From 10d7bf520feaeccd46c176db6bacacf66ee57456 Mon Sep 17 00:00:00 2001 From: Evan Pratten Date: Tue, 18 Jul 2023 16:57:40 -0400 Subject: [PATCH] Fix some TCP bugs --- protomask-tun/src/tun.rs | 54 +++++++++++++++++---- src/nat/mod.rs | 97 +++++++++++++++++++++---------------- src/packet/protocols/tcp.rs | 19 ++++---- 3 files changed, 110 insertions(+), 60 deletions(-) diff --git a/protomask-tun/src/tun.rs b/protomask-tun/src/tun.rs index 99cfb6c..79511b5 100644 --- a/protomask-tun/src/tun.rs +++ b/protomask-tun/src/tun.rs @@ -31,7 +31,11 @@ impl TunDevice { /// it will be replaced with a unique number. pub async fn new(name: &str) -> Result { // Bring up an rtnetlink connection - let (rt_connection, rt_handle, _) = rtnetlink::new_connection()?; + let (rt_connection, rt_handle, _) = rtnetlink::new_connection().map_err(|err| { + log::error!("Failed to open rtnetlink connection"); + log::error!("{}", err); + err + })?; tokio::spawn(rt_connection); // Create the TUN device @@ -55,7 +59,12 @@ impl TunDevice { .set(tun_link.header.index) .up() .execute() - .await?; + .await + .map_err(|err| { + log::error!("Failed to bring up link"); + log::error!("{}", err); + err + })?; log::debug!("Brought {} up", tun_device.name()); // Read the link MTU @@ -81,7 +90,12 @@ impl TunDevice { .address() .add(self.link_index, ip_address, prefix_len) .execute() - .await?; + .await + .map_err(|err| { + log::error!("Failed to add address {} to link", ip_address); + log::error!("{}", err); + err + })?; Ok(()) } @@ -98,14 +112,24 @@ impl TunDevice { .set_prefix_length_filter(prefix_len) .execute() .try_next() - .await? + .await + .map_err(|err| { + log::error!("Failed to find address {} on link", ip_address); + log::error!("{}", err); + err + })? { // Delete the address self.rt_handle .address() .del(address_message) .execute() - .await?; + .await + .map_err(|err| { + log::error!("Failed to remove address {} from link", ip_address); + log::error!("{}", err); + err + })?; } Ok(()) @@ -119,18 +143,30 @@ impl TunDevice { .route() .add() .v4() + .output_interface(self.link_index) .destination_prefix(destination.addr(), destination.prefix_len()) .execute() - .await?; + .await + .map_err(|err| { + log::error!("Failed to add route {} to link", destination); + log::error!("{}", err); + err + })?; } IpNet::V6(destination) => { self.rt_handle .route() .add() .v6() + .output_interface(self.link_index) .destination_prefix(destination.addr(), destination.prefix_len()) .execute() - .await?; + .await + .map_err(|err| { + log::error!("Failed to add route {} to link", destination); + log::error!("{}", err); + err + })?; } } @@ -140,10 +176,10 @@ impl TunDevice { /// Spawns worker threads, and returns a tx/rx pair for the caller to interact with them pub async fn spawn_worker(&self) -> (mpsc::Sender>, broadcast::Receiver>) { // Create a channel for packets to be sent to the caller - let (tx_to_caller, rx_from_worker) = broadcast::channel(32); + let (tx_to_caller, rx_from_worker) = broadcast::channel(65535); // Create a channel for packets being received from the caller - let (tx_to_worker, mut rx_from_caller) = mpsc::channel(32); + let (tx_to_worker, mut rx_from_caller) = mpsc::channel(65535); // Clone some values for use in worker threads let mtu = self.mtu; diff --git a/src/nat/mod.rs b/src/nat/mod.rs index f0cc8a6..aa1ff8f 100644 --- a/src/nat/mod.rs +++ b/src/nat/mod.rs @@ -82,52 +82,67 @@ impl Nat64 { // Process packets in a loop loop { // Try to read a packet - let packet = rx.recv().await?; + match rx.recv().await { + Ok(packet) => { + // Clone the TX so the worker can respond with data + let tx = tx.clone(); - // Clone the TX so the worker can respond with data - let tx = tx.clone(); + // Separate logic is needed for handling IPv4 vs IPv6 packets, so a check must be done here + match packet[0] >> 4 { + 4 => { + // Parse the packet + let packet: Ipv4Packet> = packet.try_into()?; - // Separate logic is needed for handling IPv4 vs IPv6 packets, so a check must be done here - match packet[0] >> 4 { - 4 => { - // Parse the packet - let packet: Ipv4Packet> = packet.try_into()?; + // Drop packets that aren't destined for a destination the table knows about + if !self.table.contains(&IpAddr::V4(packet.destination_address)) { + continue; + } - // Drop packets that aren't destined for a destination the table knows about - if !self.table.contains(&IpAddr::V4(packet.destination_address)) { - continue; + // Get the new source and dest addresses + let new_source = + embed_address(packet.source_address, self.ipv6_nat_prefix); + let new_destination = + self.table.get_reverse(packet.destination_address)?; + + // Spawn a task to process the packet + tokio::spawn(async move { + let output = + translate_ipv4_to_ipv6(packet, new_source, new_destination) + .unwrap(); + tx.send(output.into()).await.unwrap(); + }); + } + 6 => { + // Parse the packet + let packet: Ipv6Packet> = packet.try_into()?; + + // Get the new source and dest addresses + let new_source = + self.table.get_or_assign_ipv4(packet.source_address)?; + let new_destination = extract_address(packet.destination_address); + + // Spawn a task to process the packet + tokio::spawn(async move { + let output = + translate_ipv6_to_ipv4(packet, new_source, new_destination) + .unwrap(); + tx.send(output.into()).await.unwrap(); + }); + } + n => { + log::warn!("Unknown IP version: {}", n); + } } - - // Get the new source and dest addresses - let new_source = embed_address(packet.source_address, self.ipv6_nat_prefix); - let new_destination = self.table.get_reverse(packet.destination_address)?; - - // Spawn a task to process the packet - tokio::spawn(async move { - let output = - translate_ipv4_to_ipv6(packet, new_source, new_destination).unwrap(); - tx.send(output.into()).await.unwrap(); - }); + Ok(()) } - 6 => { - // Parse the packet - let packet: Ipv6Packet> = packet.try_into()?; - - // Get the new source and dest addresses - let new_source = self.table.get_or_assign_ipv4(packet.source_address)?; - let new_destination = extract_address(packet.destination_address); - - // Spawn a task to process the packet - tokio::spawn(async move { - let output = - translate_ipv6_to_ipv4(packet, new_source, new_destination).unwrap(); - tx.send(output.into()).await.unwrap(); - }); - } - n => { - log::warn!("Unknown IP version: {}", n); - } - } + Err(error) => match error { + broadcast::error::RecvError::Lagged(count) => { + log::warn!("Translator running behind! Dropping {} packets", count); + Ok(()) + } + error => Err(error), + }, + }?; } } } diff --git a/src/packet/protocols/tcp.rs b/src/packet/protocols/tcp.rs index f74f323..1413c20 100644 --- a/src/packet/protocols/tcp.rs +++ b/src/packet/protocols/tcp.rs @@ -100,12 +100,11 @@ impl TcpPacket { } /// Get the length of the options in words - fn options_length_words(&self) -> u8 { + fn options_length(&self) -> usize { self.options .iter() - .map(|option| TcpOptionPacket::packet_size(option) as u8) - .sum::() - / 4 + .map(|option| TcpOptionPacket::packet_size(option)) + .sum::() } } @@ -182,18 +181,18 @@ impl TcpPacket { impl Into> for TcpPacket where - T: Into> , + T: Into>, { fn into(self) -> Vec { // Get the options length in words - let options_length_words = self.options_length_words(); + let options_length = self.options_length(); // Convert the payload into raw bytes let payload: Vec = self.payload.into(); // Allocate a mutable packet to write into let total_length = pnet_packet::tcp::MutableTcpPacket::minimum_packet_size() - + (options_length_words as usize * 4) + + options_length + payload.len(); let mut output = pnet_packet::tcp::MutableTcpPacket::owned(vec![0u8; total_length]).unwrap(); @@ -206,12 +205,12 @@ where output.set_sequence(self.sequence); output.set_acknowledgement(self.ack_number); + // Write the offset + output.set_data_offset(5 + (options_length / 4) as u8); + // Write the options output.set_options(&self.options); - // Write the offset - output.set_data_offset(5 + options_length_words); - // Write the flags output.set_flags(self.flags.into());