1
ewconfig/scripts/rbn-grep

89 lines
2.4 KiB
Python
Executable File

#! /usr/bin/env python3
import argparse
import sys
import re
import socket
def main() -> int:
# Handle program arguments
ap = argparse.ArgumentParser(
prog="rbn-grep", description="Grep against RBN data in real-time"
)
ap.add_argument(
"--callsign", "-c", help="REGEX for the TX-ing callsign", default=".*"
)
ap.add_argument(
"--spotter", "-s", help="REGEX for the spotting callsign", default=".*"
)
ap.add_argument(
"--min-frequency", "--fl", help="Minimum frequency in kc", type=float, default=0
)
ap.add_argument(
"--max-frequency",
"--fh",
help="Maximum frequency in kc",
type=float,
default=sys.maxsize,
)
ap.add_argument(
"--stream-type",
"--st",
help="Stream type",
default="analog",
choices=["analog", "digital"],
)
ap.add_argument("--login-callsign", help="Login callsign", default="n0call")
args = ap.parse_args()
# Compile regexes
callsign_regex = re.compile(args.callsign, re.IGNORECASE)
spotter_regex = re.compile(args.spotter, re.IGNORECASE)
# Connect to the RBN Telnet servers
conn = socket.create_connection(
("telnet.reversebeacon.net", 7000 if args.stream_type == "analog" else 7001)
)
# Log in
conn.send(f"{args.login_callsign}\n".encode("ascii"))
# Read lines and filter
try:
while True:
# Read a line
lines = conn.recv(1024).decode("ascii").strip()
for line in lines.splitlines():
line_split = [x for x in line.split(" ") if x]
# Ignore bad lines
if not line.startswith("DX"):
continue
# Parse the data
spotter = line_split[2]
frequency_kc = float(line_split[3])
spotted = line_split[4]
# Filter
if (
not callsign_regex.match(spotted)
or not spotter_regex.match(spotter)
or frequency_kc < args.min_frequency
or frequency_kc > args.max_frequency
):
continue
# Print the line
print(line)
except KeyboardInterrupt:
print("\nGoodbye")
conn.close()
return 0
if __name__ == "__main__":
sys.exit(main())