diff --git a/scripts/watch-ports b/scripts/watch-ports new file mode 100755 index 0000000..47f8e01 --- /dev/null +++ b/scripts/watch-ports @@ -0,0 +1,177 @@ +#! /usr/bin/env python3 +import argparse +import sys +import logging +import re +import subprocess +import shutil +from typing import Any, Dict, List, Optional + +NETSTAT_REPORT_RE = re.compile( + r"([a-z\d]+)\s+(\d+)\s+(\d+)\s+([\d\.\:]+):(\d+)\s+([\d\.\:\*]+)\s+([A-Z\d]+)?\s+(?:(\d+)/)?" +) + +logger = logging.getLogger(__name__) + + +def parse_netstat_report(report: List[str]) -> List[Dict[str, Any]]: + # Build the output + output = [] + + # Handle each line + for line in report: + match = NETSTAT_REPORT_RE.match(line) + if match is None: + logger.debug(f"Failed to parse line: {line}") + continue + + # Parse the match + proto, recvq, sendq, local, port, remote, state, pid = match.groups() + output.append( + { + "proto": proto, + "recvq": int(recvq), + "sendq": int(sendq), + "local": local, + "port": int(port), + "remote": remote, + "state": state, + "pid": int(pid) if pid is not None else None, + } + ) + + return output + + +def print_changes( + args: argparse.Namespace, + last_report: List[Dict[str, Any]], + new_report: List[Dict[str, Any]], +) -> None: + # Allocate a list of additions and removals + changes = [] + + # Find additions + for new_entry in new_report: + if new_entry not in last_report: + new_entry = new_entry.copy() + new_entry["action"] = "add" + changes.append(new_entry) + + # Find removals + for last_entry in last_report: + if last_entry not in new_report: + last_entry = last_entry.copy() + last_entry["action"] = "remove" + changes.append(last_entry) + + # Print the changes + for change in changes: + # If we aren't showing loopback and this is a loopback bind, skip it + if not args.show_loopback and change["local"].startswith("127."): + continue + + # Determine the process name + proc_name = ( + f"PID {change['pid']}" + if change["pid"] is not None + else "an unknown process" + ) + + # Determine the action + action = "started" if change["action"] == "add" else "stopped" + + # Clean the protocol + proto_clean = change["proto"].replace("6", "").replace("4", "") + if proto_clean == "raw": + proto_clean = "" + else: + proto_clean = f"/{proto_clean}" + + # Print the change + message = f"{proc_name} has {action} listening on {change['local']} port {change['port']}{proto_clean}" + message = message[0].upper() + message[1:] + if args.output_mode == "print": + print(message) + else: + logger.info(message) + + +def main() -> int: + # Handle program arguments + ap = argparse.ArgumentParser( + prog="watch-ports", description="Displays changes to open ports" + ) + ap.add_argument( + "--output-mode", + help="How to display messages", + choices=["print", "log"], + default="print", + ) + ap.add_argument( + "--no-tail", help="Don't show the tail of the log", action="store_true" + ) + ap.add_argument( + "--show-loopback", help="Also show loopback binds", action="store_true" + ) + ap.add_argument( + "-v", "--verbose", help="Enable verbose logging", action="store_true" + ) + args = ap.parse_args() + + # Configure logging + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(levelname)s: %(message)s", + ) + + # If we don't have netstat, we can't do anything + if shutil.which("netstat") is None: + logger.error("`netstat` not found in $PATH") + return 1 + + # Launch netstat + netstat = subprocess.Popen( + ["netstat", "-64lpnWc"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL + ) + + # Netstat returns a full list in a loop. So we need to read lines into a buffer, + # then parse the whole buffer when a new update is sent. + info_buffer: List[str] = [] + last_report: Optional[List[Dict[str, Any]]] = None + try: + for line in netstat.stdout: + line = line.decode("utf-8").strip() + + # If this line marks the start of a new section, parse the buffer + if line.startswith("Proto"): + + # Parse + parsed_info = parse_netstat_report(info_buffer) + + # Handle the changes + if last_report is not None: + print_changes(args, last_report, parsed_info) + + # Update the last report + # NOTE: the logic here makes more sense if you think about it for a moment ;) + if args.no_tail or parsed_info: + last_report = parsed_info + info_buffer = [] + continue + + # If the line starts with the word "Active" we can skip it + if line.startswith("Active"): + continue + + # Otherwise, add the line to the buffer + info_buffer.append(line) + except KeyboardInterrupt: + netstat.kill() + return 0 + + return 0 + + +if __name__ == "__main__": + sys.exit(main())