1

Add prometheus support

This commit is contained in:
Evan Pratten 2023-07-20 10:44:14 -04:00
parent 61caeef321
commit 77266548c8
10 changed files with 172 additions and 12 deletions

View File

@ -12,10 +12,9 @@ license = "GPL-3.0"
keywords = [] keywords = []
categories = [] categories = []
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
protomask-tun = { path = "protomask-tun", version = "0.1.0" }
tokio = { version = "1.29.1", features = [ tokio = { version = "1.29.1", features = [
"macros", "macros",
"rt-multi-thread", "rt-multi-thread",
@ -25,6 +24,7 @@ tokio = { version = "1.29.1", features = [
clap = { version = "4.3.11", features = ["derive"] } clap = { version = "4.3.11", features = ["derive"] }
serde = { version = "1.0.171", features = ["derive"] } serde = { version = "1.0.171", features = ["derive"] }
ipnet = { version = "2.8.0", features = ["serde"] } ipnet = { version = "2.8.0", features = ["serde"] }
hyper = { version = "0.14.27", features = ["server", "http1", "tcp"] }
toml = "0.7.6" toml = "0.7.6"
log = "0.4.19" log = "0.4.19"
fern = "0.6.2" fern = "0.6.2"
@ -36,7 +36,8 @@ bimap = "0.6.3"
pnet_packet = "0.33.0" pnet_packet = "0.33.0"
rtnetlink = "0.13.0" rtnetlink = "0.13.0"
futures = "0.3.28" futures = "0.3.28"
protomask-tun = { path = "protomask-tun", version = "0.1.0" } prometheus = "0.13.3"
lazy_static = "1.4.0"
[[bin]] [[bin]]
name = "protomask" name = "protomask"

View File

@ -1,7 +1,7 @@
//! Serde definitions for the config file //! Serde definitions for the config file
use std::{ use std::{
net::{Ipv4Addr, Ipv6Addr}, net::{Ipv4Addr, Ipv6Addr, SocketAddr},
path::Path, path::Path,
time::Duration, time::Duration,
}; };
@ -50,6 +50,9 @@ pub struct Config {
/// The NAT64 prefix /// The NAT64 prefix
#[serde(rename = "Nat64Prefix")] #[serde(rename = "Nat64Prefix")]
pub nat64_prefix: Ipv6Net, pub nat64_prefix: Ipv6Net,
/// Address to bind to for prometheus support
#[serde(rename = "Prometheus")]
pub prom_bind_addr: Option<SocketAddr>,
/// Pool configuration /// Pool configuration
#[serde(rename = "Pool")] #[serde(rename = "Pool")]
pub pool: PoolConfig, pub pool: PoolConfig,

View File

@ -41,6 +41,12 @@ pub async fn main() {
.await .await
.unwrap(); .unwrap();
// Handle metrics requests
if let Some(bind_addr) = config.prom_bind_addr {
log::info!("Enabling metrics server on {}", bind_addr);
tokio::spawn(protomask::metrics::serve_metrics(bind_addr));
}
// Handle packets // Handle packets
nat64.run().await.unwrap(); nat64.run().await.unwrap();
} }

View File

@ -4,3 +4,4 @@
pub mod nat; pub mod nat;
mod packet; mod packet;
pub mod metrics;

44
src/metrics/http.rs Normal file
View File

@ -0,0 +1,44 @@
use std::{convert::Infallible, net::SocketAddr};
use hyper::{
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server,
};
use prometheus::{Encoder, TextEncoder};
/// Handle an HTTP request
async fn handle_request(request: Request<Body>) -> Result<Response<Body>, Infallible> {
// If the request is targeting the metrics endpoint
if request.method() == Method::GET && request.uri().path() == "/metrics" {
// Gather metrics
let metric_families = prometheus::gather();
let body = {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
};
// Return the response
return Ok(Response::new(Body::from(body)));
}
// Otherwise, just return a 404
Ok(Response::builder()
.status(404)
.body(Body::from("Not found"))
.unwrap())
}
/// Bring up an HTTP server that listens for metrics requests
pub async fn serve_metrics(bind_addr: SocketAddr) {
// Set up the server
let make_service =
make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(handle_request)) });
let server = Server::bind(&bind_addr).serve(make_service);
// Run the server
if let Err(e) = server.await {
eprintln!("Metrics server error: {}", e);
}
}

34
src/metrics/metrics.rs Normal file
View File

@ -0,0 +1,34 @@
use lazy_static::lazy_static;
use prometheus::{
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, IntCounterVec, IntGauge,
IntGaugeVec,
};
lazy_static! {
/// Counter for the number of packets processes
pub static ref PACKET_COUNTER: IntCounterVec = register_int_counter_vec!(
"packets",
"Number of packets processed",
&["protocol", "status"]
).unwrap();
/// Counter for ICMP packet types
pub static ref ICMP_COUNTER: IntCounterVec = register_int_counter_vec!(
"icmp",
"Number of ICMP packets processed",
&["protocol", "type", "code"]
).unwrap();
/// Gauge for the number of addresses in the IPv4 pool
pub static ref IPV4_POOL_SIZE: IntGauge = register_int_gauge!(
"ipv4_pool_size",
"Number of IPv4 addresses in the pool"
).unwrap();
/// Gauge for the number of addresses currently reserved in the IPv4 pool
pub static ref IPV4_POOL_RESERVED: IntGaugeVec = register_int_gauge_vec!(
"ipv4_pool_reserved",
"Number of IPv4 addresses currently reserved",
&["static"]
).unwrap();
}

5
src/metrics/mod.rs Normal file
View File

@ -0,0 +1,5 @@
mod http;
mod metrics;
pub(crate) use metrics::*;
pub use http::serve_metrics;

View File

@ -1,6 +1,9 @@
use crate::packet::{ use crate::{
protocols::{ipv4::Ipv4Packet, ipv6::Ipv6Packet}, metrics::PACKET_COUNTER,
xlat::ip::{translate_ipv4_to_ipv6, translate_ipv6_to_ipv4}, packet::{
protocols::{ipv4::Ipv4Packet, ipv6::Ipv6Packet},
xlat::ip::{translate_ipv4_to_ipv6, translate_ipv6_to_ipv4},
},
}; };
use self::{ use self::{
@ -79,6 +82,7 @@ impl Nat64 {
// Drop packets that aren't destined for a destination the table knows about // Drop packets that aren't destined for a destination the table knows about
if !self.table.contains(&IpAddr::V4(packet.destination_address)) { if !self.table.contains(&IpAddr::V4(packet.destination_address)) {
PACKET_COUNTER.with_label_values(&["ipv4", "dropped"]).inc();
continue; continue;
} }
@ -88,12 +92,18 @@ impl Nat64 {
let new_destination = let new_destination =
self.table.get_reverse(packet.destination_address)?; self.table.get_reverse(packet.destination_address)?;
// Mark the packet as accepted
PACKET_COUNTER
.with_label_values(&["ipv4", "accepted"])
.inc();
// Spawn a task to process the packet // Spawn a task to process the packet
tokio::spawn(async move { tokio::spawn(async move {
let output = let output =
translate_ipv4_to_ipv6(packet, new_source, new_destination) translate_ipv4_to_ipv6(packet, new_source, new_destination)
.unwrap(); .unwrap();
tx.send(output.into()).await.unwrap(); tx.send(output.into()).await.unwrap();
PACKET_COUNTER.with_label_values(&["ipv6", "sent"]).inc();
}); });
} }
6 => { 6 => {
@ -107,6 +117,7 @@ impl Nat64 {
packet.source_address, packet.source_address,
packet.destination_address packet.destination_address
); );
PACKET_COUNTER.with_label_values(&["ipv6", "dropped"]).inc();
continue; continue;
} }
@ -123,15 +134,22 @@ impl Nat64 {
packet.destination_address, packet.destination_address,
new_destination new_destination
); );
PACKET_COUNTER.with_label_values(&["ipv6", "dropped"]).inc();
continue; continue;
} }
// Mark the packet as accepted
PACKET_COUNTER
.with_label_values(&["ipv6", "accepted"])
.inc();
// Spawn a task to process the packet // Spawn a task to process the packet
tokio::spawn(async move { tokio::spawn(async move {
let output = let output =
translate_ipv6_to_ipv4(packet, new_source, new_destination) translate_ipv6_to_ipv4(packet, new_source, new_destination)
.unwrap(); .unwrap();
tx.send(output.into()).await.unwrap(); tx.send(output.into()).await.unwrap();
PACKET_COUNTER.with_label_values(&["ipv4", "sent"]).inc();
}); });
} }
n => { n => {

View File

@ -7,6 +7,8 @@ use std::{
use bimap::BiHashMap; use bimap::BiHashMap;
use ipnet::{Ipv4Net, Ipv6Net}; use ipnet::{Ipv4Net, Ipv6Net};
use crate::metrics::{IPV4_POOL_RESERVED, IPV4_POOL_SIZE};
/// Possible errors thrown in the address reservation process /// Possible errors thrown in the address reservation process
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum TableError { pub enum TableError {
@ -38,6 +40,10 @@ impl Nat64Table {
/// - `ipv4_pool`: The pool of IPv4 addresses to use in the mapping process /// - `ipv4_pool`: The pool of IPv4 addresses to use in the mapping process
/// - `reservation_timeout`: The amount of time to reserve an address pair for /// - `reservation_timeout`: The amount of time to reserve an address pair for
pub fn new(ipv4_pool: Vec<Ipv4Net>, reservation_timeout: Duration) -> Self { pub fn new(ipv4_pool: Vec<Ipv4Net>, reservation_timeout: Duration) -> Self {
// Track the total pool size
let total_size: usize = ipv4_pool.iter().map(|net| net.hosts().count()).sum();
IPV4_POOL_SIZE.set(total_size as i64);
Self { Self {
ipv4_pool, ipv4_pool,
reservations: BiHashMap::new(), reservations: BiHashMap::new(),
@ -54,6 +60,7 @@ impl Nat64Table {
) -> Result<(), TableError> { ) -> Result<(), TableError> {
// Check if either address is already reserved // Check if either address is already reserved
self.prune(); self.prune();
self.track_utilization();
if self.reservations.contains_left(&ipv6) { if self.reservations.contains_left(&ipv6) {
return Err(TableError::AddressAlreadyReserved(ipv6.into())); return Err(TableError::AddressAlreadyReserved(ipv6.into()));
} else if self.reservations.contains_right(&ipv4) { } else if self.reservations.contains_right(&ipv4) {
@ -79,6 +86,7 @@ impl Nat64Table {
pub fn get_or_assign_ipv4(&mut self, ipv6: Ipv6Addr) -> Result<Ipv4Addr, TableError> { pub fn get_or_assign_ipv4(&mut self, ipv6: Ipv6Addr) -> Result<Ipv4Addr, TableError> {
// Prune old reservations // Prune old reservations
self.prune(); self.prune();
self.track_utilization();
// If the IPv6 address is already reserved, return the IPv4 address // If the IPv6 address is already reserved, return the IPv4 address
if let Some(ipv4) = self.reservations.get_by_left(&ipv6) { if let Some(ipv4) = self.reservations.get_by_left(&ipv6) {
@ -113,6 +121,7 @@ impl Nat64Table {
pub fn get_reverse(&mut self, ipv4: Ipv4Addr) -> Result<Ipv6Addr, TableError> { pub fn get_reverse(&mut self, ipv4: Ipv4Addr) -> Result<Ipv6Addr, TableError> {
// Prune old reservations // Prune old reservations
self.prune(); self.prune();
self.track_utilization();
// If the IPv4 address is already reserved, return the IPv6 address // If the IPv4 address is already reserved, return the IPv6 address
if let Some(ipv6) = self.reservations.get_by_right(&ipv4) { if let Some(ipv6) = self.reservations.get_by_right(&ipv4) {
@ -186,7 +195,7 @@ impl Nat64Table {
impl Nat64Table { impl Nat64Table {
/// Prune old reservations /// Prune old reservations
pub fn prune(&mut self) { fn prune(&mut self) {
let now = Instant::now(); let now = Instant::now();
// Prune from the reservation map // Prune from the reservation map
@ -211,6 +220,26 @@ impl Nat64Table {
self.reservations.contains_left(v6) && self.reservations.contains_right(v4) self.reservations.contains_left(v6) && self.reservations.contains_right(v4)
}); });
} }
fn track_utilization(&self) {
// Count static and dynamic in a single pass
let (total_dynamic_reservations, total_static_reservations) = self
.reservation_times
.iter()
.map(|((_v6, _v4), time)| match time {
Some(_) => (1, 0),
None => (0, 1),
})
.fold((0, 0), |(a1, a2), (b1, b2)| (a1 + b1, a2 + b2));
// Track the values
IPV4_POOL_RESERVED
.with_label_values(&["dynamic"])
.set(total_dynamic_reservations as i64);
IPV4_POOL_RESERVED
.with_label_values(&["static"])
.set(total_static_reservations as i64);
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -2,9 +2,12 @@ use std::net::{Ipv4Addr, Ipv6Addr};
use pnet_packet::{icmp::IcmpTypes, icmpv6::Icmpv6Types}; use pnet_packet::{icmp::IcmpTypes, icmpv6::Icmpv6Types};
use crate::packet::{ use crate::{
error::PacketError, metrics::ICMP_COUNTER,
protocols::{icmp::IcmpPacket, icmpv6::Icmpv6Packet, raw::RawBytes}, packet::{
error::PacketError,
protocols::{icmp::IcmpPacket, icmpv6::Icmpv6Packet, raw::RawBytes},
},
}; };
use super::ip::{translate_ipv4_to_ipv6, translate_ipv6_to_ipv4}; use super::ip::{translate_ipv4_to_ipv6, translate_ipv6_to_ipv4};
@ -17,6 +20,14 @@ pub fn translate_icmp_to_icmpv6(
new_source: Ipv6Addr, new_source: Ipv6Addr,
new_destination: Ipv6Addr, new_destination: Ipv6Addr,
) -> Result<Icmpv6Packet<RawBytes>, PacketError> { ) -> Result<Icmpv6Packet<RawBytes>, PacketError> {
ICMP_COUNTER
.with_label_values(&[
"icmp",
&input.icmp_type.0.to_string(),
&input.icmp_code.0.to_string(),
])
.inc();
// Translate the type and code // Translate the type and code
let (icmpv6_type, icmpv6_code) = let (icmpv6_type, icmpv6_code) =
type_code::translate_type_and_code_4_to_6(input.icmp_type, input.icmp_code)?; type_code::translate_type_and_code_4_to_6(input.icmp_type, input.icmp_code)?;
@ -60,6 +71,14 @@ pub fn translate_icmpv6_to_icmp(
new_source: Ipv4Addr, new_source: Ipv4Addr,
new_destination: Ipv4Addr, new_destination: Ipv4Addr,
) -> Result<IcmpPacket<RawBytes>, PacketError> { ) -> Result<IcmpPacket<RawBytes>, PacketError> {
ICMP_COUNTER
.with_label_values(&[
"icmpv6",
&input.icmp_type.0.to_string(),
&input.icmp_code.0.to_string(),
])
.inc();
// Translate the type and code // Translate the type and code
let (icmp_type, icmp_code) = let (icmp_type, icmp_code) =
type_code::translate_type_and_code_6_to_4(input.icmp_type, input.icmp_code)?; type_code::translate_type_and_code_6_to_4(input.icmp_type, input.icmp_code)?;
@ -83,7 +102,7 @@ pub fn translate_icmpv6_to_icmp(
buffer.extend_from_slice(&inner_payload); buffer.extend_from_slice(&inner_payload);
buffer buffer
}) })
}, }
_ => input.payload, _ => input.payload,
}; };