use hyper::{ Body, Request, Response, Server, service::{ make_service_fn, service_fn, }, StatusCode, }; use structopt::StructOpt; use tokio::{ net::UdpSocket, sync::watch, }; use quic_demo::prelude::*; use protocol::PeerId; #[derive (Debug, StructOpt)] struct Opt { #[structopt (long)] listen_addr: Option , } #[tokio::main] async fn main () -> anyhow::Result <()> { tracing_subscriber::fmt::init (); let opt = Opt::from_args (); let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?; println! ("Base64 cert: {}", base64::encode (&server_cert)); tokio::fs::create_dir_all ("ptth_quic_output").await?; tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?; let relay_state = Arc::new (RelayState::default ()); let make_svc = { let relay_state = Arc::clone (&relay_state); make_service_fn (move |_conn| { let relay_state = Arc::clone (&relay_state); async move { Ok::<_, String> (service_fn (move |req| { let relay_state = Arc::clone (&relay_state); handle_http (req, relay_state) })) } }) }; let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); let http_server = Server::bind (&http_addr); let tcp_port = 30382; let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?; let (_running_tx, running_rx) = watch::channel (true); let task_quic_server = { let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { while let Some (conn) = incoming.next ().await { let relay_state = Arc::clone (&relay_state); // Each new peer QUIC connection gets its own task tokio::spawn (async move { let active = relay_state.stats.quic.connect (); debug! ("QUIC connections: {}", active); match handle_quic_connection (Arc::clone (&relay_state), conn).await { Ok (_) => (), Err (e) => warn! ("handle_quic_connection {:?}", e), } let active = relay_state.stats.quic.disconnect (); debug! ("QUIC connections: {}", active); }); } Ok::<_, anyhow::Error> (()) }) }; let task_direc_server = { let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { let sock = UdpSocket::bind("0.0.0.0:30379").await?; let mut buf = [0; 2048]; loop { let (len, addr) = sock.recv_from (&mut buf).await?; debug! ("{:?} bytes received from {:?}", len, addr); let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); { let mut direc_cookies = relay_state.direc_cookies.lock ().await; if let Some (direc_state) = direc_cookies.remove (&packet) { debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); direc_state.p2_addr.send (addr).ok (); } else { debug! ("UDP packet didn't match any PTTH_DIREC cookie"); } } } Ok::<_, anyhow::Error> (()) }) }; let task_http_server = tokio::spawn (async move { http_server.serve (make_svc).await?; Ok::<_, anyhow::Error> (()) }); let task_tcp_server = { let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { while *running_rx.borrow () { let (tcp_socket, _) = tcp_listener.accept ().await?; let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { let (_client_recv, _client_send) = tcp_socket.into_split (); debug! ("Accepted direct TCP connection P1 --> P3"); let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; let _p4 = match p4_server_proxies.get ("bogus_server") { Some (x) => x, None => bail! ("That server isn't connected"), }; // unimplemented! (); /* p4.req_channel.send (RequestP2ToP4 { client_send, client_recv, client_id: "bogus_client".to_string (), }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; */ Ok (()) }); } Ok::<_, anyhow::Error> (()) }) }; debug! ("Serving HTTP on {:?}", http_addr); task_quic_server.await??; task_http_server.await??; task_tcp_server.await??; task_direc_server.await??; Ok (()) } async fn handle_http (_req: Request , relay_state: Arc ) -> anyhow::Result > { let debug_string; { let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; debug_string = format! ("{:#?}\n", p4_server_proxies.keys ().collect::> ()); } let resp = Response::builder () .status (StatusCode::OK) .header ("content-type", "text/plain") .body (Body::from (debug_string))?; Ok (resp) } #[derive (Default)] struct RelayState { p4_server_proxies: Mutex >, direc_cookies: Mutex , DirecState>>, stats: Stats, } struct DirecState { start_time: Instant, p2_id: PeerId, p2_addr: tokio::sync::oneshot::Sender , } #[derive (Default)] struct Stats { quic: ConnectEvents, } #[derive (Default)] struct ConnectEvents { connects: AtomicU64, disconnects: AtomicU64, } impl ConnectEvents { fn connect (&self) -> u64 { let connects = self.connects.fetch_add (1, Ordering::Relaxed) + 1; let disconnects = self.disconnects.load (Ordering::Relaxed); connects - disconnects } fn disconnect (&self) -> u64 { let disconnects = self.disconnects.fetch_add (1, Ordering::Relaxed) + 1; let connects = self.connects.load (Ordering::Relaxed); connects - disconnects } fn _active (&self) -> u64 { let connects = self.connects.load (Ordering::Relaxed); let disconnects = self.disconnects.load (Ordering::Relaxed); connects - disconnects } } struct P4State { req_channel: mpsc::Sender , } impl RelayState { } struct RequestP2ToP4 { client_send: quinn::SendStream, client_recv: quinn::RecvStream, client_id: String, } struct PtthNewConnection { client_send: quinn::SendStream, client_recv: quinn::RecvStream, server_send: quinn::SendStream, server_recv: quinn::RecvStream, } struct PtthConnection { uplink_task: JoinHandle >, downlink_task: JoinHandle >, } impl PtthNewConnection { fn build (self) -> PtthConnection { let Self { mut client_send, mut client_recv, mut server_send, mut server_recv, } = self; let uplink_task = tokio::spawn (async move { // Uplink - Client to end server let mut buf = vec! [0u8; 65_536]; while let Some (bytes_read) = client_recv.read (&mut buf).await? { if bytes_read == 0 { break; } let buf_slice = &buf [0..bytes_read]; trace! ("Uplink relaying {} bytes", bytes_read); server_send.write_all (buf_slice).await?; } trace! ("Uplink closed"); Ok::<_, anyhow::Error> (()) }); let downlink_task = tokio::spawn (async move { // Downlink - End server to client let mut buf = vec! [0u8; 65_536]; while let Some (bytes_read) = server_recv.read (&mut buf).await? { let buf_slice = &buf [0..bytes_read]; trace! ("Downlink relaying {} bytes", bytes_read); client_send.write_all (buf_slice).await?; } trace! ("Downlink closed"); Ok::<_, anyhow::Error> (()) }); PtthConnection { uplink_task, downlink_task, } } } async fn handle_quic_connection ( relay_state: Arc , conn: quinn::Connecting, ) -> anyhow::Result <()> { let mut conn = conn.await?; // Everyone who connects must identify themselves with the first // bi stream // TODO: Timeout let (mut send, mut recv) = conn.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??; let peer = protocol::p3_accept_peer (&mut recv).await?; match peer { protocol::P3Peer::P2ClientProxy (peer) => { // TODO: Check authorization for P2 peers protocol::p3_authorize_p2_peer (&mut send).await?; handle_p2_connection (relay_state, conn, peer).await?; }, protocol::P3Peer::P4ServerProxy (peer) => { // TODO: Check authorization for P2 peers protocol::p3_authorize_p4_peer (&mut send).await?; handle_p4_connection (relay_state, conn, peer).await?; }, } Ok::<_, anyhow::Error> (()) } async fn handle_p2_connection ( relay_state: Arc , conn: quinn::NewConnection, peer: protocol::P2ClientProxy, ) -> anyhow::Result <()> { let client_id = peer.id; let quinn::NewConnection { mut bi_streams, .. } = conn; while let Some (bi_stream) = bi_streams.next ().await { let (send, mut recv) = bi_stream?; let relay_state = Arc::clone (&relay_state); let client_id = client_id.clone (); tokio::spawn (async move { debug! ("Request started for P2"); match protocol::p3_accept_p2_stream (&mut recv).await? { protocol::P2ToP3Stream::ConnectP2ToP4 { server_id, } => { handle_request_p2_to_p4 ( relay_state, client_id, server_id, send, recv ).await? }, protocol::P2ToP3Stream::DirecP2ToP4 { server_id, cookie, } => { handle_direc_p2_to_p4 ( relay_state, client_id, server_id, cookie, send, recv ).await? }, } debug! ("Request ended for P2"); Ok::<_, anyhow::Error> (()) }); } debug! ("P2 {} disconnected", client_id); Ok (()) } async fn handle_request_p2_to_p4 ( relay_state: Arc , client_id: String, server_id: PeerId, mut client_send: quinn::SendStream, client_recv: quinn::RecvStream, ) -> anyhow::Result <()> { trace! ("P2 {} wants to connect to P4 {}", client_id, server_id); // TODO: Check authorization for P2 to connect to P4 protocol::p3_authorize_p2_to_p4_connection (&mut client_send).await?; { let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; match p4_server_proxies.get (&server_id) { Some (p4_state) => { p4_state.req_channel.send (RequestP2ToP4 { client_send, client_recv, client_id, }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; }, None => warn! ("That server isn't connected"), } } Ok (()) } async fn handle_direc_p2_to_p4 ( relay_state: Arc , client_id: String, server_id: PeerId, cookie: Vec , mut client_send: quinn::SendStream, client_recv: quinn::RecvStream, ) -> anyhow::Result <()> { debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id); // TODO: Check authorization protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?; let (tx, rx) = tokio::sync::oneshot::channel (); { let mut direc_cookies = relay_state.direc_cookies.lock ().await; direc_cookies.insert (cookie, DirecState { start_time: Instant::now (), p2_id: client_id.clone (), p2_addr: tx, }); } debug! ("Waiting to learn P2's WAN address..."); let wan_addr = rx.await?; debug! ("And that WAN address is {}", wan_addr); Ok (()) } async fn handle_p4_connection ( relay_state: Arc , conn: quinn::NewConnection, peer: protocol::P4ServerProxy, ) -> anyhow::Result <()> { let server_id = peer.id; let quinn::NewConnection { connection, .. } = conn; let (tx, mut rx) = mpsc::channel (2); let p4_state = P4State { req_channel: tx, }; { let mut p4_server_proxies = relay_state.p4_server_proxies.lock ().await; p4_server_proxies.insert (server_id.clone (), p4_state); } while let Some (req) = rx.recv ().await { let connection = connection.clone (); let server_id = server_id.clone (); tokio::spawn (async move { let RequestP2ToP4 { client_send, client_recv, client_id, } = req; debug! ("P4 {} got a request from P2 {}", server_id, client_id); let (server_send, server_recv) = protocol::p3_connect_p2_to_p4 (&connection, &client_id).await?; trace! ("Relaying bytes..."); let ptth_conn = PtthNewConnection { client_send, client_recv, server_send, server_recv, }.build (); ptth_conn.uplink_task.await??; ptth_conn.downlink_task.await??; debug! ("Request ended for P4"); Ok::<_, anyhow::Error> (()) }); } debug! ("P4 {} disconnected", server_id); Ok (()) }