diff --git a/Cargo.toml b/Cargo.toml index 32eaaec..91ea5ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ categories = [] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +protomask-tun = { path = "protomask-tun", version = "0.1.0" } tokio = { version = "1.29.1", features = [ "macros", "rt-multi-thread", @@ -36,7 +37,7 @@ bimap = "0.6.3" pnet_packet = "0.33.0" rtnetlink = "0.13.0" futures = "0.3.28" -protomask-tun = { path = "protomask-tun", version = "0.1.0" } +prometheus-client = "0.21.2" [[bin]] name = "protomask" diff --git a/src/cli/main.rs b/src/cli/main.rs index 4fdb9ec..5532f30 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -3,7 +3,7 @@ use clap::Parser; use config::Config; use logging::enable_logger; -use protomask::nat::Nat64; +use protomask::{nat::Nat64, metrics::registry::MetricRegistry}; mod cli; mod config; @@ -22,7 +22,7 @@ pub async fn main() { // Currently, only a /96 is supported if config.nat64_prefix.prefix_len() != 96 { - log::error!("Only a /96 prefix is supported for the NAT64 prefix"); + log::error!("Only a /96 length is supported for the NAT64 prefix"); std::process::exit(1); } @@ -41,6 +41,15 @@ pub async fn main() { .await .unwrap(); + // Create a metric registry + let mut metric_registry = MetricRegistry::new(); + let metric_sender = metric_registry.get_sender(); + + // Run the metric registry + tokio::spawn(async move { + metric_registry.run().await; + }); + // Handle packets - nat64.run().await.unwrap(); + nat64.run(metric_sender).await.unwrap(); } diff --git a/src/lib.rs b/src/lib.rs index 70539e1..d5d7c1f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,3 +4,4 @@ pub mod nat; mod packet; +pub mod metrics; \ No newline at end of file diff --git a/src/metrics/labels.rs b/src/metrics/labels.rs new file mode 100644 index 0000000..a086667 --- /dev/null +++ b/src/metrics/labels.rs @@ -0,0 +1,24 @@ +use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue}; + + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)] +pub enum PacketStatus { + Sent, + Accepted, + Dropped, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct PacketsMetric { + /// The protocol being counted + pub protocol: IpProtocol, + + /// The status of the packet + pub status: PacketStatus, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)] +pub enum IpProtocol { + Ipv4, + Ipv6, +} diff --git a/src/metrics/macros.rs b/src/metrics/macros.rs new file mode 100644 index 0000000..cc68b40 --- /dev/null +++ b/src/metrics/macros.rs @@ -0,0 +1,32 @@ +// Calling send with nested enums is kinda messy, so these macros clean up the calling code a bit + +#[macro_export] +#[cfg_attr(rustfmt, rustfmt_skip)] +macro_rules! count_packet { + ($sender: expr, $protocol: expr, $status: expr) => { + $sender.send( + crate::metrics::registry::MetricEvent::CounterAdd( + crate::metrics::registry::Metric::Packets( + crate::metrics::labels::PacketsMetric { + protocol: $protocol, + status: $status, + } + ), + ) + ).await + }; +} + +#[macro_export] +macro_rules! count_packet_ipv4 { + ($sender: expr, $status: expr) => { + count_packet!($sender, crate::metrics::labels::IpProtocol::Ipv4, $status) + }; +} + +#[macro_export] +macro_rules! count_packet_ipv6 { + ($sender: expr, $status: expr) => { + count_packet!($sender, crate::metrics::labels::IpProtocol::Ipv6, $status) + }; +} \ No newline at end of file diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs new file mode 100644 index 0000000..d7e83a3 --- /dev/null +++ b/src/metrics/mod.rs @@ -0,0 +1,5 @@ +//! Open Metrics support + +pub mod labels; +pub mod registry; +pub(crate) mod macros; \ No newline at end of file diff --git a/src/metrics/registry.rs b/src/metrics/registry.rs new file mode 100644 index 0000000..088671d --- /dev/null +++ b/src/metrics/registry.rs @@ -0,0 +1,83 @@ +use prometheus_client::{ + metrics::{counter::Counter, family::Family}, + registry::Registry, +}; +use tokio::sync::mpsc; + +use super::labels::PacketsMetric; + +/// A metric event (modification to a metric) +pub enum MetricEvent { + CounterAdd(Metric), +} + +/// A metric to be modified +pub enum Metric { + Packets(PacketsMetric), +} + +/// Sender that sends `MetricEvent`s +pub type MetricEventSender = mpsc::Sender; + +/// Automated metric handler +pub struct MetricRegistry { + /// Sender for things that produce metrics to inform the registry of changes + sender: MetricEventSender, + + /// Receiver used to read incoming events + receiver: mpsc::Receiver, + + // Handles on various metrics + metric_packets: Family, +} + +impl MetricRegistry { + /// Construct a new metric registry + pub fn new() -> Self { + // Build rx and tx + let (sender, receiver) = mpsc::channel(100); + + // Create the internal registry + let mut internal_registry = ::default(); + + // Create and register each metric + let metric_packets = Family::::default(); + internal_registry.register( + "packets", + "Number of packets accepted, sent, or dropped", + metric_packets.clone(), + ); + + Self { + sender, + receiver, + metric_packets, + } + } + + /// Get a copy of the sender to allow a task to send metrics + pub fn get_sender(&self) -> MetricEventSender { + self.sender.clone() + } + + /// Run the metric registry + pub async fn run(&mut self) { + // Process events in a loop + loop { + // Try to read an event + match self.receiver.recv().await { + Some(event) => match event { + MetricEvent::CounterAdd(metric) => match metric { + Metric::Packets(metric) => { + self.metric_packets.get_or_create(&metric).inc(); + } + }, + }, + None => { + // The sender has been dropped, so we should exit + break; + } + } + } + } +} diff --git a/src/nat/mod.rs b/src/nat/mod.rs index aa1ff8f..34b3eb8 100644 --- a/src/nat/mod.rs +++ b/src/nat/mod.rs @@ -1,6 +1,13 @@ -use crate::packet::{ - protocols::{ipv4::Ipv4Packet, ipv6::Ipv6Packet}, - xlat::ip::{translate_ipv4_to_ipv6, translate_ipv6_to_ipv4}, +use crate::{ + count_packet, count_packet_ipv4, + metrics::{ + labels::PacketStatus, + registry::{MetricEvent, MetricEventSender}, + }, + packet::{ + protocols::{ipv4::Ipv4Packet, ipv6::Ipv6Packet}, + xlat::ip::{translate_ipv4_to_ipv6, translate_ipv6_to_ipv4}, + }, count_packet_ipv6, }; use self::{ @@ -34,6 +41,8 @@ pub enum Nat64Error { PacketReceiveError(#[from] broadcast::error::RecvError), #[error(transparent)] PacketSendError(#[from] mpsc::error::SendError>), + #[error(transparent)] + MetricSendError(#[from] mpsc::error::SendError), } pub struct Nat64 { @@ -75,7 +84,7 @@ impl Nat64 { } /// Block and process all packets - pub async fn run(&mut self) -> Result<(), Nat64Error> { + pub async fn run(&mut self, metric_sender: MetricEventSender) -> Result<(), Nat64Error> { // Get an rx/tx pair for the interface let (tx, mut rx) = self.interface.spawn_worker().await; @@ -95,6 +104,10 @@ impl Nat64 { // Drop packets that aren't destined for a destination the table knows about if !self.table.contains(&IpAddr::V4(packet.destination_address)) { + // Update metrics + count_packet_ipv4!(metric_sender, PacketStatus::Dropped)?; + + // Drop packet continue; } @@ -105,28 +118,61 @@ impl Nat64 { self.table.get_reverse(packet.destination_address)?; // Spawn a task to process the packet + let metric_sender = metric_sender.clone(); tokio::spawn(async move { + count_packet_ipv4!(metric_sender, PacketStatus::Accepted).unwrap(); + + // Process the packet let output = translate_ipv4_to_ipv6(packet, new_source, new_destination) .unwrap(); + + // Send the translated packet tx.send(output.into()).await.unwrap(); + count_packet_ipv6!(metric_sender, PacketStatus::Sent).unwrap(); }); } 6 => { // Parse the packet let packet: Ipv6Packet> = packet.try_into()?; + // Drop packets "coming from" the NAT64 prefix + if self.ipv6_nat_prefix.contains(&packet.source_address) { + log::warn!( + "Dropping packet \"from\" NAT64 prefix: {} -> {}", + packet.source_address, + packet.destination_address + ); + count_packet_ipv6!(metric_sender, PacketStatus::Dropped)?; + continue; + } + // Get the new source and dest addresses let new_source = self.table.get_or_assign_ipv4(packet.source_address)?; let new_destination = extract_address(packet.destination_address); + // Drop packets destined for private IPv4 addresses + if new_destination.is_private() { + log::warn!( + "Dropping packet destined for private IPv4 address: {} -> {} ({})", + packet.source_address, + packet.destination_address, + new_destination + ); + count_packet_ipv6!(metric_sender, PacketStatus::Dropped)?; + continue; + } + // Spawn a task to process the packet + let metric_sender = metric_sender.clone(); tokio::spawn(async move { + count_packet_ipv6!(metric_sender, PacketStatus::Accepted).unwrap(); let output = translate_ipv6_to_ipv4(packet, new_source, new_destination) .unwrap(); tx.send(output.into()).await.unwrap(); + count_packet_ipv4!(metric_sender, PacketStatus::Sent).unwrap(); }); } n => {