178 lines
5.2 KiB
Python
Executable File
178 lines
5.2 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
import argparse
|
|
import sys
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
import shutil
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
NETSTAT_REPORT_RE = re.compile(
|
|
r"([a-z\d]+)\s+(\d+)\s+(\d+)\s+([\d\.\:]+):(\d+)\s+([\d\.\:\*]+)\s+([A-Z\d]+)?\s+(?:(\d+)/)?"
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_netstat_report(report: List[str]) -> List[Dict[str, Any]]:
|
|
# Build the output
|
|
output = []
|
|
|
|
# Handle each line
|
|
for line in report:
|
|
match = NETSTAT_REPORT_RE.match(line)
|
|
if match is None:
|
|
logger.debug(f"Failed to parse line: {line}")
|
|
continue
|
|
|
|
# Parse the match
|
|
proto, recvq, sendq, local, port, remote, state, pid = match.groups()
|
|
output.append(
|
|
{
|
|
"proto": proto,
|
|
"recvq": int(recvq),
|
|
"sendq": int(sendq),
|
|
"local": local,
|
|
"port": int(port),
|
|
"remote": remote,
|
|
"state": state,
|
|
"pid": int(pid) if pid is not None else None,
|
|
}
|
|
)
|
|
|
|
return output
|
|
|
|
|
|
def print_changes(
|
|
args: argparse.Namespace,
|
|
last_report: List[Dict[str, Any]],
|
|
new_report: List[Dict[str, Any]],
|
|
) -> None:
|
|
# Allocate a list of additions and removals
|
|
changes = []
|
|
|
|
# Find additions
|
|
for new_entry in new_report:
|
|
if new_entry not in last_report:
|
|
new_entry = new_entry.copy()
|
|
new_entry["action"] = "add"
|
|
changes.append(new_entry)
|
|
|
|
# Find removals
|
|
for last_entry in last_report:
|
|
if last_entry not in new_report:
|
|
last_entry = last_entry.copy()
|
|
last_entry["action"] = "remove"
|
|
changes.append(last_entry)
|
|
|
|
# Print the changes
|
|
for change in changes:
|
|
# If we aren't showing loopback and this is a loopback bind, skip it
|
|
if not args.show_loopback and change["local"].startswith("127."):
|
|
continue
|
|
|
|
# Determine the process name
|
|
proc_name = (
|
|
f"PID {change['pid']}"
|
|
if change["pid"] is not None
|
|
else "an unknown process"
|
|
)
|
|
|
|
# Determine the action
|
|
action = "started" if change["action"] == "add" else "stopped"
|
|
|
|
# Clean the protocol
|
|
proto_clean = change["proto"].replace("6", "").replace("4", "")
|
|
if proto_clean == "raw":
|
|
proto_clean = ""
|
|
else:
|
|
proto_clean = f"/{proto_clean}"
|
|
|
|
# Print the change
|
|
message = f"{proc_name} has {action} listening on {change['local']} port {change['port']}{proto_clean}"
|
|
message = message[0].upper() + message[1:]
|
|
if args.output_mode == "print":
|
|
print(message, flush=True)
|
|
else:
|
|
logger.info(message)
|
|
|
|
|
|
def main() -> int:
|
|
# Handle program arguments
|
|
ap = argparse.ArgumentParser(
|
|
prog="watch-ports", description="Displays changes to open ports"
|
|
)
|
|
ap.add_argument(
|
|
"--output-mode",
|
|
help="How to display messages",
|
|
choices=["print", "log"],
|
|
default="print",
|
|
)
|
|
ap.add_argument(
|
|
"--no-tail", help="Don't show the tail of the log", action="store_true"
|
|
)
|
|
ap.add_argument(
|
|
"--show-loopback", help="Also show loopback binds", action="store_true"
|
|
)
|
|
ap.add_argument(
|
|
"-v", "--verbose", help="Enable verbose logging", action="store_true"
|
|
)
|
|
args = ap.parse_args()
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG if args.verbose else logging.INFO,
|
|
format="%(levelname)s: %(message)s",
|
|
)
|
|
|
|
# If we don't have netstat, we can't do anything
|
|
if shutil.which("netstat") is None:
|
|
logger.error("`netstat` not found in $PATH")
|
|
return 1
|
|
|
|
# Launch netstat
|
|
netstat = subprocess.Popen(
|
|
["netstat", "-64lpnWc"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
|
|
)
|
|
|
|
# Netstat returns a full list in a loop. So we need to read lines into a buffer,
|
|
# then parse the whole buffer when a new update is sent.
|
|
info_buffer: List[str] = []
|
|
last_report: Optional[List[Dict[str, Any]]] = None
|
|
try:
|
|
for line in netstat.stdout:
|
|
line = line.decode("utf-8").strip()
|
|
|
|
# If this line marks the start of a new section, parse the buffer
|
|
if line.startswith("Proto"):
|
|
|
|
# Parse
|
|
parsed_info = parse_netstat_report(info_buffer)
|
|
|
|
# Handle the changes
|
|
if last_report is not None:
|
|
print_changes(args, last_report, parsed_info)
|
|
|
|
# Update the last report
|
|
# NOTE: the logic here makes more sense if you think about it for a moment ;)
|
|
if args.no_tail or parsed_info:
|
|
last_report = parsed_info
|
|
info_buffer = []
|
|
continue
|
|
|
|
# If the line starts with the word "Active" we can skip it
|
|
if line.startswith("Active"):
|
|
continue
|
|
|
|
# Otherwise, add the line to the buffer
|
|
info_buffer.append(line)
|
|
except KeyboardInterrupt:
|
|
netstat.kill()
|
|
return 0
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|