1

Fix some TCP bugs

This commit is contained in:
Evan Pratten 2023-07-18 16:57:40 -04:00
parent deaa9e87c5
commit 10d7bf520f
3 changed files with 110 additions and 60 deletions

View File

@ -31,7 +31,11 @@ impl TunDevice {
/// it will be replaced with a unique number.
pub async fn new(name: &str) -> Result<Self> {
// Bring up an rtnetlink connection
let (rt_connection, rt_handle, _) = rtnetlink::new_connection()?;
let (rt_connection, rt_handle, _) = rtnetlink::new_connection().map_err(|err| {
log::error!("Failed to open rtnetlink connection");
log::error!("{}", err);
err
})?;
tokio::spawn(rt_connection);
// Create the TUN device
@ -55,7 +59,12 @@ impl TunDevice {
.set(tun_link.header.index)
.up()
.execute()
.await?;
.await
.map_err(|err| {
log::error!("Failed to bring up link");
log::error!("{}", err);
err
})?;
log::debug!("Brought {} up", tun_device.name());
// Read the link MTU
@ -81,7 +90,12 @@ impl TunDevice {
.address()
.add(self.link_index, ip_address, prefix_len)
.execute()
.await?;
.await
.map_err(|err| {
log::error!("Failed to add address {} to link", ip_address);
log::error!("{}", err);
err
})?;
Ok(())
}
@ -98,14 +112,24 @@ impl TunDevice {
.set_prefix_length_filter(prefix_len)
.execute()
.try_next()
.await?
.await
.map_err(|err| {
log::error!("Failed to find address {} on link", ip_address);
log::error!("{}", err);
err
})?
{
// Delete the address
self.rt_handle
.address()
.del(address_message)
.execute()
.await?;
.await
.map_err(|err| {
log::error!("Failed to remove address {} from link", ip_address);
log::error!("{}", err);
err
})?;
}
Ok(())
@ -119,18 +143,30 @@ impl TunDevice {
.route()
.add()
.v4()
.output_interface(self.link_index)
.destination_prefix(destination.addr(), destination.prefix_len())
.execute()
.await?;
.await
.map_err(|err| {
log::error!("Failed to add route {} to link", destination);
log::error!("{}", err);
err
})?;
}
IpNet::V6(destination) => {
self.rt_handle
.route()
.add()
.v6()
.output_interface(self.link_index)
.destination_prefix(destination.addr(), destination.prefix_len())
.execute()
.await?;
.await
.map_err(|err| {
log::error!("Failed to add route {} to link", destination);
log::error!("{}", err);
err
})?;
}
}
@ -140,10 +176,10 @@ impl TunDevice {
/// Spawns worker threads, and returns a tx/rx pair for the caller to interact with them
pub async fn spawn_worker(&self) -> (mpsc::Sender<Vec<u8>>, broadcast::Receiver<Vec<u8>>) {
// Create a channel for packets to be sent to the caller
let (tx_to_caller, rx_from_worker) = broadcast::channel(32);
let (tx_to_caller, rx_from_worker) = broadcast::channel(65535);
// Create a channel for packets being received from the caller
let (tx_to_worker, mut rx_from_caller) = mpsc::channel(32);
let (tx_to_worker, mut rx_from_caller) = mpsc::channel(65535);
// Clone some values for use in worker threads
let mtu = self.mtu;

View File

@ -82,52 +82,67 @@ impl Nat64 {
// Process packets in a loop
loop {
// Try to read a packet
let packet = rx.recv().await?;
match rx.recv().await {
Ok(packet) => {
// Clone the TX so the worker can respond with data
let tx = tx.clone();
// Clone the TX so the worker can respond with data
let tx = tx.clone();
// Separate logic is needed for handling IPv4 vs IPv6 packets, so a check must be done here
match packet[0] >> 4 {
4 => {
// Parse the packet
let packet: Ipv4Packet<Vec<u8>> = packet.try_into()?;
// Separate logic is needed for handling IPv4 vs IPv6 packets, so a check must be done here
match packet[0] >> 4 {
4 => {
// Parse the packet
let packet: Ipv4Packet<Vec<u8>> = packet.try_into()?;
// Drop packets that aren't destined for a destination the table knows about
if !self.table.contains(&IpAddr::V4(packet.destination_address)) {
continue;
}
// Drop packets that aren't destined for a destination the table knows about
if !self.table.contains(&IpAddr::V4(packet.destination_address)) {
continue;
// Get the new source and dest addresses
let new_source =
embed_address(packet.source_address, self.ipv6_nat_prefix);
let new_destination =
self.table.get_reverse(packet.destination_address)?;
// Spawn a task to process the packet
tokio::spawn(async move {
let output =
translate_ipv4_to_ipv6(packet, new_source, new_destination)
.unwrap();
tx.send(output.into()).await.unwrap();
});
}
6 => {
// Parse the packet
let packet: Ipv6Packet<Vec<u8>> = packet.try_into()?;
// Get the new source and dest addresses
let new_source =
self.table.get_or_assign_ipv4(packet.source_address)?;
let new_destination = extract_address(packet.destination_address);
// Spawn a task to process the packet
tokio::spawn(async move {
let output =
translate_ipv6_to_ipv4(packet, new_source, new_destination)
.unwrap();
tx.send(output.into()).await.unwrap();
});
}
n => {
log::warn!("Unknown IP version: {}", n);
}
}
// Get the new source and dest addresses
let new_source = embed_address(packet.source_address, self.ipv6_nat_prefix);
let new_destination = self.table.get_reverse(packet.destination_address)?;
// Spawn a task to process the packet
tokio::spawn(async move {
let output =
translate_ipv4_to_ipv6(packet, new_source, new_destination).unwrap();
tx.send(output.into()).await.unwrap();
});
Ok(())
}
6 => {
// Parse the packet
let packet: Ipv6Packet<Vec<u8>> = packet.try_into()?;
// Get the new source and dest addresses
let new_source = self.table.get_or_assign_ipv4(packet.source_address)?;
let new_destination = extract_address(packet.destination_address);
// Spawn a task to process the packet
tokio::spawn(async move {
let output =
translate_ipv6_to_ipv4(packet, new_source, new_destination).unwrap();
tx.send(output.into()).await.unwrap();
});
}
n => {
log::warn!("Unknown IP version: {}", n);
}
}
Err(error) => match error {
broadcast::error::RecvError::Lagged(count) => {
log::warn!("Translator running behind! Dropping {} packets", count);
Ok(())
}
error => Err(error),
},
}?;
}
}
}

View File

@ -100,12 +100,11 @@ impl<T> TcpPacket<T> {
}
/// Get the length of the options in words
fn options_length_words(&self) -> u8 {
fn options_length(&self) -> usize {
self.options
.iter()
.map(|option| TcpOptionPacket::packet_size(option) as u8)
.sum::<u8>()
/ 4
.map(|option| TcpOptionPacket::packet_size(option))
.sum::<usize>()
}
}
@ -182,18 +181,18 @@ impl TcpPacket<RawBytes> {
impl<T> Into<Vec<u8>> for TcpPacket<T>
where
T: Into<Vec<u8>> ,
T: Into<Vec<u8>>,
{
fn into(self) -> Vec<u8> {
// Get the options length in words
let options_length_words = self.options_length_words();
let options_length = self.options_length();
// Convert the payload into raw bytes
let payload: Vec<u8> = self.payload.into();
// Allocate a mutable packet to write into
let total_length = pnet_packet::tcp::MutableTcpPacket::minimum_packet_size()
+ (options_length_words as usize * 4)
+ options_length
+ payload.len();
let mut output =
pnet_packet::tcp::MutableTcpPacket::owned(vec![0u8; total_length]).unwrap();
@ -206,12 +205,12 @@ where
output.set_sequence(self.sequence);
output.set_acknowledgement(self.ack_number);
// Write the offset
output.set_data_offset(5 + (options_length / 4) as u8);
// Write the options
output.set_options(&self.options);
// Write the offset
output.set_data_offset(5 + options_length_words);
// Write the flags
output.set_flags(self.flags.into());