1

wip metrics

This commit is contained in:
Evan Pratten 2023-07-19 14:47:46 -04:00
parent c6db02c80a
commit 24cc6545ca
8 changed files with 209 additions and 8 deletions

View File

@ -16,6 +16,7 @@ categories = []
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
protomask-tun = { path = "protomask-tun", version = "0.1.0" }
tokio = { version = "1.29.1", features = [ tokio = { version = "1.29.1", features = [
"macros", "macros",
"rt-multi-thread", "rt-multi-thread",
@ -36,7 +37,7 @@ bimap = "0.6.3"
pnet_packet = "0.33.0" pnet_packet = "0.33.0"
rtnetlink = "0.13.0" rtnetlink = "0.13.0"
futures = "0.3.28" futures = "0.3.28"
protomask-tun = { path = "protomask-tun", version = "0.1.0" } prometheus-client = "0.21.2"
[[bin]] [[bin]]
name = "protomask" name = "protomask"

View File

@ -3,7 +3,7 @@
use clap::Parser; use clap::Parser;
use config::Config; use config::Config;
use logging::enable_logger; use logging::enable_logger;
use protomask::nat::Nat64; use protomask::{nat::Nat64, metrics::registry::MetricRegistry};
mod cli; mod cli;
mod config; mod config;
@ -22,7 +22,7 @@ pub async fn main() {
// Currently, only a /96 is supported // Currently, only a /96 is supported
if config.nat64_prefix.prefix_len() != 96 { if config.nat64_prefix.prefix_len() != 96 {
log::error!("Only a /96 prefix is supported for the NAT64 prefix"); log::error!("Only a /96 length is supported for the NAT64 prefix");
std::process::exit(1); std::process::exit(1);
} }
@ -41,6 +41,15 @@ pub async fn main() {
.await .await
.unwrap(); .unwrap();
// Create a metric registry
let mut metric_registry = MetricRegistry::new();
let metric_sender = metric_registry.get_sender();
// Run the metric registry
tokio::spawn(async move {
metric_registry.run().await;
});
// Handle packets // Handle packets
nat64.run().await.unwrap(); nat64.run(metric_sender).await.unwrap();
} }

View File

@ -4,3 +4,4 @@
pub mod nat; pub mod nat;
mod packet; mod packet;
pub mod metrics;

24
src/metrics/labels.rs Normal file
View File

@ -0,0 +1,24 @@
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum PacketStatus {
Sent,
Accepted,
Dropped,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct PacketsMetric {
/// The protocol being counted
pub protocol: IpProtocol,
/// The status of the packet
pub status: PacketStatus,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum IpProtocol {
Ipv4,
Ipv6,
}

32
src/metrics/macros.rs Normal file
View File

@ -0,0 +1,32 @@
// Calling send with nested enums is kinda messy, so these macros clean up the calling code a bit
#[macro_export]
#[cfg_attr(rustfmt, rustfmt_skip)]
macro_rules! count_packet {
($sender: expr, $protocol: expr, $status: expr) => {
$sender.send(
crate::metrics::registry::MetricEvent::CounterAdd(
crate::metrics::registry::Metric::Packets(
crate::metrics::labels::PacketsMetric {
protocol: $protocol,
status: $status,
}
),
)
).await
};
}
#[macro_export]
macro_rules! count_packet_ipv4 {
($sender: expr, $status: expr) => {
count_packet!($sender, crate::metrics::labels::IpProtocol::Ipv4, $status)
};
}
#[macro_export]
macro_rules! count_packet_ipv6 {
($sender: expr, $status: expr) => {
count_packet!($sender, crate::metrics::labels::IpProtocol::Ipv6, $status)
};
}

5
src/metrics/mod.rs Normal file
View File

@ -0,0 +1,5 @@
//! Open Metrics support
pub mod labels;
pub mod registry;
pub(crate) mod macros;

83
src/metrics/registry.rs Normal file
View File

@ -0,0 +1,83 @@
use prometheus_client::{
metrics::{counter::Counter, family::Family},
registry::Registry,
};
use tokio::sync::mpsc;
use super::labels::PacketsMetric;
/// A metric event (modification to a metric)
pub enum MetricEvent {
CounterAdd(Metric),
}
/// A metric to be modified
pub enum Metric {
Packets(PacketsMetric),
}
/// Sender that sends `MetricEvent`s
pub type MetricEventSender = mpsc::Sender<MetricEvent>;
/// Automated metric handler
pub struct MetricRegistry {
/// Sender for things that produce metrics to inform the registry of changes
sender: MetricEventSender,
/// Receiver used to read incoming events
receiver: mpsc::Receiver<MetricEvent>,
// Handles on various metrics
metric_packets: Family<PacketsMetric, Counter>,
}
impl MetricRegistry {
/// Construct a new metric registry
pub fn new() -> Self {
// Build rx and tx
let (sender, receiver) = mpsc::channel(100);
// Create the internal registry
let mut internal_registry = <Registry>::default();
// Create and register each metric
let metric_packets = Family::<PacketsMetric, Counter>::default();
internal_registry.register(
"packets",
"Number of packets accepted, sent, or dropped",
metric_packets.clone(),
);
Self {
sender,
receiver,
metric_packets,
}
}
/// Get a copy of the sender to allow a task to send metrics
pub fn get_sender(&self) -> MetricEventSender {
self.sender.clone()
}
/// Run the metric registry
pub async fn run(&mut self) {
// Process events in a loop
loop {
// Try to read an event
match self.receiver.recv().await {
Some(event) => match event {
MetricEvent::CounterAdd(metric) => match metric {
Metric::Packets(metric) => {
self.metric_packets.get_or_create(&metric).inc();
}
},
},
None => {
// The sender has been dropped, so we should exit
break;
}
}
}
}
}

View File

@ -1,6 +1,13 @@
use crate::packet::{ use crate::{
protocols::{ipv4::Ipv4Packet, ipv6::Ipv6Packet}, count_packet, count_packet_ipv4,
xlat::ip::{translate_ipv4_to_ipv6, translate_ipv6_to_ipv4}, metrics::{
labels::PacketStatus,
registry::{MetricEvent, MetricEventSender},
},
packet::{
protocols::{ipv4::Ipv4Packet, ipv6::Ipv6Packet},
xlat::ip::{translate_ipv4_to_ipv6, translate_ipv6_to_ipv4},
}, count_packet_ipv6,
}; };
use self::{ use self::{
@ -34,6 +41,8 @@ pub enum Nat64Error {
PacketReceiveError(#[from] broadcast::error::RecvError), PacketReceiveError(#[from] broadcast::error::RecvError),
#[error(transparent)] #[error(transparent)]
PacketSendError(#[from] mpsc::error::SendError<Vec<u8>>), PacketSendError(#[from] mpsc::error::SendError<Vec<u8>>),
#[error(transparent)]
MetricSendError(#[from] mpsc::error::SendError<MetricEvent>),
} }
pub struct Nat64 { pub struct Nat64 {
@ -75,7 +84,7 @@ impl Nat64 {
} }
/// Block and process all packets /// Block and process all packets
pub async fn run(&mut self) -> Result<(), Nat64Error> { pub async fn run(&mut self, metric_sender: MetricEventSender) -> Result<(), Nat64Error> {
// Get an rx/tx pair for the interface // Get an rx/tx pair for the interface
let (tx, mut rx) = self.interface.spawn_worker().await; let (tx, mut rx) = self.interface.spawn_worker().await;
@ -95,6 +104,10 @@ impl Nat64 {
// Drop packets that aren't destined for a destination the table knows about // Drop packets that aren't destined for a destination the table knows about
if !self.table.contains(&IpAddr::V4(packet.destination_address)) { if !self.table.contains(&IpAddr::V4(packet.destination_address)) {
// Update metrics
count_packet_ipv4!(metric_sender, PacketStatus::Dropped)?;
// Drop packet
continue; continue;
} }
@ -105,28 +118,61 @@ impl Nat64 {
self.table.get_reverse(packet.destination_address)?; self.table.get_reverse(packet.destination_address)?;
// Spawn a task to process the packet // Spawn a task to process the packet
let metric_sender = metric_sender.clone();
tokio::spawn(async move { tokio::spawn(async move {
count_packet_ipv4!(metric_sender, PacketStatus::Accepted).unwrap();
// Process the packet
let output = let output =
translate_ipv4_to_ipv6(packet, new_source, new_destination) translate_ipv4_to_ipv6(packet, new_source, new_destination)
.unwrap(); .unwrap();
// Send the translated packet
tx.send(output.into()).await.unwrap(); tx.send(output.into()).await.unwrap();
count_packet_ipv6!(metric_sender, PacketStatus::Sent).unwrap();
}); });
} }
6 => { 6 => {
// Parse the packet // Parse the packet
let packet: Ipv6Packet<Vec<u8>> = packet.try_into()?; let packet: Ipv6Packet<Vec<u8>> = packet.try_into()?;
// Drop packets "coming from" the NAT64 prefix
if self.ipv6_nat_prefix.contains(&packet.source_address) {
log::warn!(
"Dropping packet \"from\" NAT64 prefix: {} -> {}",
packet.source_address,
packet.destination_address
);
count_packet_ipv6!(metric_sender, PacketStatus::Dropped)?;
continue;
}
// Get the new source and dest addresses // Get the new source and dest addresses
let new_source = let new_source =
self.table.get_or_assign_ipv4(packet.source_address)?; self.table.get_or_assign_ipv4(packet.source_address)?;
let new_destination = extract_address(packet.destination_address); let new_destination = extract_address(packet.destination_address);
// Drop packets destined for private IPv4 addresses
if new_destination.is_private() {
log::warn!(
"Dropping packet destined for private IPv4 address: {} -> {} ({})",
packet.source_address,
packet.destination_address,
new_destination
);
count_packet_ipv6!(metric_sender, PacketStatus::Dropped)?;
continue;
}
// Spawn a task to process the packet // Spawn a task to process the packet
let metric_sender = metric_sender.clone();
tokio::spawn(async move { tokio::spawn(async move {
count_packet_ipv6!(metric_sender, PacketStatus::Accepted).unwrap();
let output = let output =
translate_ipv6_to_ipv4(packet, new_source, new_destination) translate_ipv6_to_ipv4(packet, new_source, new_destination)
.unwrap(); .unwrap();
tx.send(output.into()).await.unwrap(); tx.send(output.into()).await.unwrap();
count_packet_ipv4!(metric_sender, PacketStatus::Sent).unwrap();
}); });
} }
n => { n => {