ifpi v1
This commit is contained in:
parent
7c2fe29696
commit
ee0180e618
163
scripts/ifpi
Executable file
163
scripts/ifpi
Executable file
@ -0,0 +1,163 @@
|
||||
#! /usr/bin/env python3
|
||||
import argparse
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
import subprocess
|
||||
import ipaddress
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HOSTNAME_CACHE = {}
|
||||
|
||||
HOSTNAME_PATTERNS = {
|
||||
"GitHub": [re.compile(r".*github\.com\.")],
|
||||
"Google": [re.compile(r".*google\.com\."), re.compile(r".*1e100\.net\.")],
|
||||
"Google Cloud": [re.compile(r".*googleusercontent\.com\.")],
|
||||
"Amazon Web Services": [re.compile(r".*amazonaws\.com\.")],
|
||||
"Cloudfront": [re.compile(r".*cloudfront\.net\.")],
|
||||
"Evan's Infrastructure": [re.compile(r".*ewp\.fyi\.")],
|
||||
}
|
||||
|
||||
|
||||
def classify_traffic(
|
||||
destination_addr: ipaddress.IPv4Address, destination_port: int, packet_proto: str
|
||||
) -> str:
|
||||
packet_proto = packet_proto.upper().replace(",", "")
|
||||
|
||||
# Handle some easy cases
|
||||
if destination_addr.is_multicast:
|
||||
return "multicast"
|
||||
if destination_port == 22 and packet_proto == "TCP":
|
||||
return "SSH"
|
||||
if destination_port == 53 and packet_proto == "UDP":
|
||||
return "DNS"
|
||||
|
||||
# Use nslookup to get the hostname
|
||||
if destination_addr not in HOSTNAME_CACHE:
|
||||
try:
|
||||
hostname = (
|
||||
subprocess.check_output(
|
||||
["nslookup", str(destination_addr)], stderr=subprocess.DEVNULL
|
||||
)
|
||||
.decode("utf-8")
|
||||
.split("\n")[0]
|
||||
.split(" ")[-1]
|
||||
)
|
||||
HOSTNAME_CACHE[destination_addr] = hostname
|
||||
except subprocess.CalledProcessError:
|
||||
HOSTNAME_CACHE[destination_addr] = None
|
||||
|
||||
# Get the hostname
|
||||
hostname = HOSTNAME_CACHE[destination_addr] or destination_addr
|
||||
|
||||
# If this is HTTP/HTTPS traffic, try to figure out the service
|
||||
if (packet_proto == "TCP" and destination_port in [80, 443]) or (
|
||||
packet_proto == "UDP" and destination_port == 443
|
||||
):
|
||||
for service, patterns in HOSTNAME_PATTERNS.items():
|
||||
for pattern in patterns:
|
||||
if pattern.match(str(hostname)):
|
||||
return service
|
||||
|
||||
# Fallbacks in case we can't figure anything else out
|
||||
if packet_proto == "TCP" and destination_port == 443:
|
||||
return f"HTTPS ({hostname})"
|
||||
if packet_proto == "TCP" and destination_port == 80:
|
||||
return f"HTTP ({hostname})"
|
||||
if packet_proto == "TCP" and destination_port == 443:
|
||||
return f"QUIC ({hostname})"
|
||||
|
||||
return f"Unknown({packet_proto}, {destination_port}, {hostname})"
|
||||
|
||||
|
||||
def main() -> int:
|
||||
# Handle program arguments
|
||||
ap = argparse.ArgumentParser(prog="ifpi", description="Interface Packet Inspector")
|
||||
ap.add_argument("interface", help="Interface to listen on")
|
||||
ap.add_argument(
|
||||
"--local-subnet",
|
||||
"-l",
|
||||
help="Subnet(s) to consider local",
|
||||
action="append",
|
||||
)
|
||||
ap.add_argument(
|
||||
"-v", "--verbose", help="Enable verbose logging", action="store_true"
|
||||
)
|
||||
args = ap.parse_args()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
format="%(levelname)s: %(message)s",
|
||||
)
|
||||
|
||||
# If we are not root, re-launch ourselves with sudo
|
||||
if not os.geteuid() == 0:
|
||||
return subprocess.call(["sudo"] + sys.argv)
|
||||
|
||||
# Convert the local subnets to IPNetwork objects
|
||||
local_subnets = []
|
||||
if args.local_subnet:
|
||||
for subnet in args.local_subnet:
|
||||
local_subnets.append(ipaddress.ip_network(subnet))
|
||||
|
||||
# Launch tcpdump
|
||||
tcpdump_args = [
|
||||
"tcpdump",
|
||||
"-i",
|
||||
args.interface,
|
||||
"-nn",
|
||||
"-tt",
|
||||
"-q",
|
||||
]
|
||||
process = subprocess.Popen(
|
||||
tcpdump_args,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
)
|
||||
|
||||
# Read all lines as they are printed
|
||||
for line in process.stdout:
|
||||
line = line.decode("utf-8").strip()
|
||||
|
||||
# The format is (time, proto, data)
|
||||
timestamp, protocol, data = line.split(" ", 2)
|
||||
|
||||
# We will only handle IP packets
|
||||
if protocol not in ["IP", "IP6"]:
|
||||
continue
|
||||
|
||||
# Extract source and destination IPs, along with the metadata
|
||||
routing, metadata = data.split(": ", 1)
|
||||
source, destination = routing.split(" > ")
|
||||
source_port, destination_port = (
|
||||
source.split(".")[-1],
|
||||
destination.split(".")[-1],
|
||||
)
|
||||
try:
|
||||
source = ipaddress.ip_address(".".join(source.split(".")[:-1]))
|
||||
destination = ipaddress.ip_address(".".join(destination.split(".")[:-1]))
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
# Only pay attention to source addrs that are local
|
||||
for subnet in local_subnets:
|
||||
if source in subnet:
|
||||
break
|
||||
else:
|
||||
continue
|
||||
|
||||
# Classify the traffic
|
||||
classification = classify_traffic(
|
||||
destination, int(destination_port), metadata.lstrip().split(" ")[0]
|
||||
)
|
||||
|
||||
print(f"{source}\t{classification}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
Loading…
x
Reference in New Issue
Block a user