From a4b2ea29eec8fdb63feeffcba2f2364193702944 Mon Sep 17 00:00:00 2001 From: Evan Pratten Date: Thu, 21 Nov 2024 15:24:31 -0500 Subject: [PATCH] New chunking logic --- scripts/rbn-to-mqtt | 71 +++++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/scripts/rbn-to-mqtt b/scripts/rbn-to-mqtt index 1584d10..a61577d 100755 --- a/scripts/rbn-to-mqtt +++ b/scripts/rbn-to-mqtt @@ -36,6 +36,7 @@ def main() -> int: rbn_socket.sendall(b'va3ujf\r\n') # Handle incoming RBN spots + buffer = b'' while True: # Read incoming packet (may contain multiple spots) @@ -43,39 +44,45 @@ def main() -> int: if not data: break + # Append to buffer + buffer += data + # Split the packet into spots - for spot in data.splitlines(): - # Some lines aren't spots - if not spot.startswith(b'DX'): - continue - - # Parse the spot - match = RBN_SPOT_RE.match(spot.decode('utf-8')) - if not match: - logger.warning("Failed to parse spot: %s", spot) - continue - - values = match.groupdict() - - # Parse the timestamp into something more useful - utc_now = datetime.now(UTC) - timestamp = datetime(utc_now.year, utc_now.month, utc_now.day, int(values["time"][:2]), int(values["time"][2:4]), 0) - - # Sanitize into a new dict - sanitized = { - "spotter": values["spotter"].upper(), - "frequency_khz": float(values["frequency"]), - "spotted": values["spotted"].upper(), - "mode": values["mode"].upper(), - "db": int(values["db"]), - "speed": values["notes"] if values["notes"] and values["notes"].endswith("WPM") else None, - "grid": values["notes"] if values["notes"] and (not values.get("notes", "WPM").endswith("WPM")) else None, - "timestamp": timestamp.timestamp(), - } - - # Publish the spot to the MQTT broker - logger.debug("Writing spot: %s", sanitized) - mqtt_client.publish(f"radio/spots/rbn/{args.rbn_mode}/{sanitized['spotted']}", payload=json.dumps(sanitized)) + if b'\r\n' in buffer: + chunks = buffer.splitlines() + buffer = chunks.pop() # Store the un-finished chunk for later + for spot in chunks: + # Some lines aren't spots + if not spot.startswith(b'DX'): + continue + + # Parse the spot + match = RBN_SPOT_RE.match(spot.decode('utf-8')) + if not match: + logger.warning("Failed to parse spot: %s", spot) + continue + + values = match.groupdict() + + # Parse the timestamp into something more useful + utc_now = datetime.now(UTC) + timestamp = datetime(utc_now.year, utc_now.month, utc_now.day, int(values["time"][:2]), int(values["time"][2:4]), 0) + + # Sanitize into a new dict + sanitized = { + "spotter": values["spotter"].upper(), + "frequency_khz": float(values["frequency"]), + "spotted": values["spotted"].upper(), + "mode": values["mode"].upper(), + "db": int(values["db"]), + "speed": values["notes"] if values["notes"] and values["notes"].endswith("WPM") else None, + "grid": values["notes"] if values["notes"] and (not values.get("notes", "WPM").endswith("WPM")) else None, + "timestamp": timestamp.timestamp(), + } + + # Publish the spot to the MQTT broker + logger.debug("Writing spot: %s", sanitized) + mqtt_client.publish(f"radio/spots/rbn/{args.rbn_mode}/{sanitized['spotted']}", payload=json.dumps(sanitized)) return 0