use tokio::{ net::TcpListener, sync::watch, task::JoinHandle, }; use crate::prelude::*; pub struct ForwardingInstance { task: JoinHandle >, shutdown_flag: watch::Sender , local_port: u16, } impl ForwardingInstance { pub async fn new ( connection_p2_p3: quinn::Connection, params: ForwardingParams, ) -> anyhow::Result { let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true); let listener = TcpListener::bind (("127.0.0.1", params.client_tcp_port)).await?; let local_port = listener.local_addr ()?.port (); trace! ("Accepting local TCP connections from P1 on {}", local_port); let task = tokio::spawn (forward_port ( listener, connection_p2_p3, params, shutdown_flag_rx )); Ok (Self { task, shutdown_flag, local_port, }) } pub async fn close (self) -> anyhow::Result <()> { if self.shutdown_flag.send (false).is_err () { warn! ("Trying to gracefully shutdown forwarding task but it appears to already be shut down"); } self.task.await .context ("awaiting ForwardingInstance task")? .context ("inside ForwardingInstance task")?; Ok (()) } pub fn local_port (&self) -> u16 { self.local_port } } pub struct ForwardingParams { pub client_tcp_port: u16, pub server_id: String, pub server_tcp_port: u16, } /// Starts a TCP listener that can forward any number of TCP streams to /// the same client:server port combination pub async fn forward_port ( listener: TcpListener, connection_p2_p3: quinn::Connection, params: ForwardingParams, mut shutdown_flag_rx: tokio::sync::watch::Receiver , ) -> anyhow::Result <()> { let ForwardingParams { server_id, server_tcp_port, .. } = params; while *shutdown_flag_rx.borrow () { tokio::select! { x = listener.accept () => { let (tcp_socket, _) = x?; let connection = connection_p2_p3.clone (); let server_id = server_id.clone (); let shutdown_flag_rx = shutdown_flag_rx.clone (); tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx)); }, _ = shutdown_flag_rx.changed () => (), }; } Ok::<_, anyhow::Error> (()) } async fn handle_p1 ( connection: quinn::Connection, server_id: String, server_tcp_port: u16, tcp_socket: tokio::net::TcpStream, shutdown_flag_rx: tokio::sync::watch::Receiver , ) -> anyhow::Result <()> { let (mut local_recv, mut local_send) = tcp_socket.into_split (); debug! ("Starting PTTH connection"); let (mut relay_send, mut relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; trace! ("Relaying bytes..."); let task_blue = { let mut shutdown_flag_rx = shutdown_flag_rx.clone (); tokio::spawn (async move { let mut buf = vec! [0u8; 65_536]; while *shutdown_flag_rx.borrow () { trace! ("Blue reading from QUIC..."); tokio::select! { x = relay_recv.read (&mut buf) => { let bytes_read = match x? { None => break, Some (0) => break, Some (x) => x, }; let buf_slice = &buf [0..bytes_read]; trace! ("Uplink relaying {} bytes", bytes_read); local_send.write_all (buf_slice).await?; }, _ = shutdown_flag_rx.changed () => (), }; } debug! ("Blue QUIC --> TCP closed"); Ok::<_, anyhow::Error> (()) }) }; let task_green = { let mut shutdown_flag_rx = shutdown_flag_rx.clone (); tokio::spawn (async move { let mut buf = vec! [0u8; 65_536]; while *shutdown_flag_rx.borrow () { trace! ("Green reading from TCP..."); tokio::select! { x = local_recv.read (&mut buf) => { let bytes_read = match x? { 0 => break, x => x, }; let buf_slice = &buf [0..bytes_read]; trace! ("Downlink relaying {} bytes", bytes_read); relay_send.write_all (buf_slice).await?; }, _ = shutdown_flag_rx.changed () => (), }; } debug! ("Green TCP --> QUIC closed"); Ok::<_, anyhow::Error> (()) }) }; task_blue.await??; task_green.await??; debug! ("Ended PTTH connection"); Ok (()) }