use hyper::{ Body, Request, Response, Server, service::{ make_service_fn, service_fn, }, StatusCode, }; use structopt::StructOpt; use quic_demo::prelude::*; use protocol::PeerId; #[derive (Debug, StructOpt)] struct Opt { #[structopt (long)] listen_addr: Option , } #[tokio::main] async fn main () -> anyhow::Result <()> { tracing_subscriber::fmt::init (); let opt = Opt::from_args (); let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?; println! ("Base64 cert: {}", base64::encode (&server_cert)); tokio::fs::create_dir_all ("ptth_quic_output").await?; tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?; let relay_state = Arc::new (RelayState::default ()); let make_svc = { let relay_state = Arc::clone (&relay_state); make_service_fn (move |_conn| { let relay_state = Arc::clone (&relay_state); async move { Ok::<_, String> (service_fn (move |req| { let relay_state = Arc::clone (&relay_state); handle_http (req, relay_state) })) } }) }; let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); let http_server = Server::bind (&http_addr); let tcp_port = 30382; let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?; let task_quic_server = { let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { while let Some (conn) = incoming.next ().await { let relay_state = Arc::clone (&relay_state); // Each new peer QUIC connection gets its own task tokio::spawn (async move { let active = relay_state.stats.quic.connect (); debug! ("QUIC connections: {}", active); match handle_quic_connection (Arc::clone (&relay_state), conn).await { Ok (_) => (), Err (e) => warn! ("handle_quic_connection {:?}", e), } let active = relay_state.stats.quic.disconnect (); debug! ("QUIC connections: {}", active); }); } Ok::<_, anyhow::Error> (()) }) }; let task_http_server = tokio::spawn (async move { http_server.serve (make_svc).await?; Ok::<_, anyhow::Error> (()) }); let task_tcp_server = { let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { loop { let (tcp_socket, _) = tcp_listener.accept ().await?; let server_id = "bogus_server".to_string (); let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { let (client_recv, client_send) = tcp_socket.into_split (); debug! ("Accepted direct TCP connection P1 --> P3"); let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; let p4 = match p4_server_proxies.get ("bogus_server") { Some (x) => x, None => bail! ("That server isn't connected"), }; unimplemented! (); /* p4.req_channel.send (RequestP2ToP4 { client_send, client_recv, client_id: "bogus_client".to_string (), }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; */ Ok (()) }); } Ok::<_, anyhow::Error> (()) }) }; debug! ("Serving HTTP on {:?}", http_addr); task_quic_server.await??; task_http_server.await??; task_tcp_server.await??; Ok (()) } async fn handle_http (_req: Request , relay_state: Arc ) -> anyhow::Result > { let debug_string; { let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; debug_string = format! ("{:#?}\n", p4_server_proxies.keys ().collect::> ()); } let resp = Response::builder () .status (StatusCode::OK) .header ("content-type", "text/plain") .body (Body::from (debug_string))?; Ok (resp) } #[derive (Default)] struct RelayState { p4_server_proxies: Mutex >, stats: Stats, } #[derive (Default)] struct Stats { quic: ConnectEvents, } #[derive (Default)] struct ConnectEvents { connects: AtomicU64, disconnects: AtomicU64, } impl ConnectEvents { fn connect (&self) -> u64 { let connects = self.connects.fetch_add (1, Ordering::Relaxed) + 1; let disconnects = self.disconnects.load (Ordering::Relaxed); connects - disconnects } fn disconnect (&self) -> u64 { let disconnects = self.disconnects.fetch_add (1, Ordering::Relaxed) + 1; let connects = self.connects.load (Ordering::Relaxed); connects - disconnects } fn _active (&self) -> u64 { let connects = self.connects.load (Ordering::Relaxed); let disconnects = self.disconnects.load (Ordering::Relaxed); connects - disconnects } } struct P4State { req_channel: mpsc::Sender , } impl RelayState { } struct RequestP2ToP4 { client_send: quinn::SendStream, client_recv: quinn::RecvStream, client_id: String, } struct PtthNewConnection { client_send: quinn::SendStream, client_recv: quinn::RecvStream, server_send: quinn::SendStream, server_recv: quinn::RecvStream, } struct PtthConnection { uplink_task: JoinHandle >, downlink_task: JoinHandle >, } impl PtthNewConnection { fn build (self) -> PtthConnection { let Self { mut client_send, mut client_recv, mut server_send, mut server_recv, } = self; let uplink_task = tokio::spawn (async move { // Uplink - Client to end server let mut buf = vec! [0u8; 65_536]; while let Some (bytes_read) = client_recv.read (&mut buf).await? { if bytes_read == 0 { break; } let buf_slice = &buf [0..bytes_read]; trace! ("Uplink relaying {} bytes", bytes_read); server_send.write_all (buf_slice).await?; } trace! ("Uplink closed"); Ok::<_, anyhow::Error> (()) }); let downlink_task = tokio::spawn (async move { // Downlink - End server to client let mut buf = vec! [0u8; 65_536]; while let Some (bytes_read) = server_recv.read (&mut buf).await? { let buf_slice = &buf [0..bytes_read]; trace! ("Downlink relaying {} bytes", bytes_read); client_send.write_all (buf_slice).await?; } trace! ("Downlink closed"); Ok::<_, anyhow::Error> (()) }); PtthConnection { uplink_task, downlink_task, } } } async fn handle_quic_connection ( relay_state: Arc , conn: quinn::Connecting, ) -> anyhow::Result <()> { let mut conn = conn.await?; // Everyone who connects must identify themselves with the first // bi stream // TODO: Timeout let (mut send, mut recv) = conn.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??; let peer = protocol::p3_accept_peer (&mut recv).await?; match peer { protocol::P3Peer::P2ClientProxy (peer) => { // TODO: Check authorization for P2 peers protocol::p3_authorize_p2_peer (&mut send).await?; handle_p2_connection (relay_state, conn, peer).await?; }, protocol::P3Peer::P4ServerProxy (peer) => { // TODO: Check authorization for P2 peers protocol::p3_authorize_p4_peer (&mut send).await?; handle_p4_connection (relay_state, conn, peer).await?; }, } Ok::<_, anyhow::Error> (()) } async fn handle_p2_connection ( relay_state: Arc , conn: quinn::NewConnection, peer: protocol::P2ClientProxy, ) -> anyhow::Result <()> { let client_id = peer.id; let quinn::NewConnection { mut bi_streams, .. } = conn; while let Some (bi_stream) = bi_streams.next ().await { let (send, mut recv) = bi_stream?; let relay_state = Arc::clone (&relay_state); let client_id = client_id.clone (); tokio::spawn (async move { debug! ("Request started for P2"); match protocol::p3_accept_p2_stream (&mut recv).await? { protocol::P2ToP3Stream::ConnectP2ToP4 { server_id, } => handle_request_p2_to_p4 (relay_state, client_id, server_id, send, recv).await?, } debug! ("Request ended for P2"); Ok::<_, anyhow::Error> (()) }); } debug! ("P2 {} disconnected", client_id); Ok (()) } async fn handle_request_p2_to_p4 ( relay_state: Arc , client_id: String, server_id: PeerId, mut client_send: quinn::SendStream, client_recv: quinn::RecvStream, ) -> anyhow::Result <()> { trace! ("P2 {} wants to connect to P4 {}", client_id, server_id); // TODO: Check authorization for P2 to connect to P4 protocol::p3_authorize_p2_to_p4_connection (&mut client_send).await?; { let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; match p4_server_proxies.get (&server_id) { Some (p4_state) => { p4_state.req_channel.send (RequestP2ToP4 { client_send, client_recv, client_id, }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; }, None => warn! ("That server isn't connected"), } } Ok (()) } async fn handle_p4_connection ( relay_state: Arc , conn: quinn::NewConnection, peer: protocol::P4ServerProxy, ) -> anyhow::Result <()> { let server_id = peer.id; let quinn::NewConnection { connection, .. } = conn; let (tx, mut rx) = mpsc::channel (2); let p4_state = P4State { req_channel: tx, }; { let mut p4_server_proxies = relay_state.p4_server_proxies.lock ().await; p4_server_proxies.insert (server_id.clone (), p4_state); } while let Some (req) = rx.recv ().await { let connection = connection.clone (); let server_id = server_id.clone (); tokio::spawn (async move { let RequestP2ToP4 { client_send, client_recv, client_id, } = req; debug! ("P4 {} got a request from P2 {}", server_id, client_id); let (server_send, server_recv) = protocol::p3_connect_p2_to_p4 (&connection, &client_id).await?; trace! ("Relaying bytes..."); let ptth_conn = PtthNewConnection { client_send, client_recv, server_send, server_recv, }.build (); ptth_conn.uplink_task.await??; ptth_conn.downlink_task.await??; debug! ("Request ended for P4"); Ok::<_, anyhow::Error> (()) }); } debug! ("P4 {} disconnected", server_id); Ok (()) }