use hyper::{ Body, Request, Response, Server, service::{ make_service_fn, service_fn, }, StatusCode, }; use structopt::StructOpt; use tokio::{ net::UdpSocket, }; use crate::prelude::*; use protocol::PeerId; #[derive (Debug, StructOpt)] pub struct Opt { #[structopt (long)] pub (crate) listen_addr: Option , #[structopt (long)] pub (crate) tcp_listen_port: Option , } pub struct App { endpoint: quinn::Endpoint, listen_addr: SocketAddr, pub (crate) metrics: Arc >, server_cert: Vec , tcp_listen_port: Option , } #[derive (Default)] pub (crate) struct Metrics { pub (crate) connected_end_servers: usize, } impl App { pub fn new (opt: Opt) -> anyhow::Result { let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; let (endpoint, server_cert) = make_server_endpoint (listen_addr)?; let listen_addr = endpoint.local_addr ()?; Ok (Self { endpoint, listen_addr, metrics: Default::default (), server_cert, tcp_listen_port: opt.tcp_listen_port, }) } pub fn listen_addr (&self) -> SocketAddr { self.listen_addr } pub fn server_cert (&self) -> &[u8] { &self.server_cert } pub async fn run (self) -> anyhow::Result <()> { let Self { endpoint, listen_addr, metrics, server_cert, tcp_listen_port, } = self; let mut relay_state = RelayState::default (); relay_state.metrics = metrics; if let Err (e) = relay_state.reload_config ().await { error! ("{:?}", e); } let relay_state = Arc::new (relay_state); let make_svc = { let relay_state = Arc::clone (&relay_state); make_service_fn (move |_conn| { let relay_state = Arc::clone (&relay_state); async move { Ok::<_, String> (service_fn (move |req| { let relay_state = Arc::clone (&relay_state); handle_http (req, relay_state) })) } }) }; let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); let http_server = Server::bind (&http_addr); let _task_reload_config = { let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { let mut interval = tokio::time::interval (std::time::Duration::from_secs (60)); interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); loop { interval.tick ().await; relay_state.reload_config ().await.ok (); } }) }; let task_quic_server = { let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { while let Some (conn) = endpoint.accept ().await { let relay_state = Arc::clone (&relay_state); // Each new peer QUIC connection gets its own task tokio::spawn (async move { let active = relay_state.stats.quic.connect (); debug! ("QUIC connections: {}", active); match handle_quic_connection (Arc::clone (&relay_state), conn).await { Ok (_) => (), Err (e) => warn! ("handle_quic_connection `{:?}`", e), } let active = relay_state.stats.quic.disconnect (); debug! ("QUIC connections: {}", active); }); } Ok::<_, anyhow::Error> (()) }) }; let task_direc_server = { let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { let sock = UdpSocket::bind("0.0.0.0:30379").await?; let mut buf = [0; 2048]; loop { let (len, addr) = sock.recv_from (&mut buf).await?; debug! ("{:?} bytes received from {:?}", len, addr); let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); { let mut direc_cookies = relay_state.direc_cookies.lock ().await; if let Some (direc_state) = direc_cookies.remove (&packet) { debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); direc_state.p2_addr.send (addr).ok (); } else { debug! ("UDP packet didn't match any PTTH_DIREC cookie"); } } } Ok::<_, anyhow::Error> (()) }) }; let task_http_server = tokio::spawn (async move { http_server.serve (make_svc).await?; Ok::<_, anyhow::Error> (()) }); debug! ("Serving HTTP on {:?}", http_addr); if let Some (tcp_listen_port) = tcp_listen_port { tokio::spawn (async move { let cfg = udp_over_tcp::server::Config { tcp_port: tcp_listen_port, udp_port: listen_addr.port (), }; if let Err (e) = udp_over_tcp::server::main (cfg).await { eprintln! ("udp_over_tcp::server::main exited with err {:?}", e); } }); } { let config = relay_state.config.load (); dbg! (&config.webhook_url); if let Some (webhook_url) = config.webhook_url.clone () { let j = json! ({ "text": "Booting up", }).to_string (); let http_client = relay_state.http_client.clone (); tokio::spawn (async move { http_client.post (webhook_url).body (j).send ().await }); } } tokio::select! { _val = task_quic_server => { eprintln! ("QUIC relay server exited, exiting"); }, _val = task_http_server => { eprintln! ("HTTP server exited, exiting"); }, _val = task_direc_server => { eprintln! ("PTTH_DIREC server exited, exiting"); }, } Ok (()) } } async fn handle_http (_req: Request , relay_state: Arc ) -> anyhow::Result > { let debug_string; { let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; debug_string = format! ("{:#?}\n", p4_server_proxies.keys ().collect::> ()); } let resp = Response::builder () .status (StatusCode::OK) .header ("content-type", "text/plain") .body (Body::from (debug_string))?; Ok (resp) } #[derive (Default)] struct RelayState { config: arc_swap::ArcSwap , p4_server_proxies: Mutex >, direc_cookies: Mutex , DirecState>>, metrics: Arc >, stats: Stats, http_client: reqwest::Client, } #[derive (Default)] struct Config { ip_nicknames: BTreeMap <[u8; 4], String>, webhook_url: Option , } impl From for Config { fn from (x: ConfigFile) -> Self { Self { ip_nicknames: x.ip_nicknames.into_iter ().collect (), webhook_url: x.webhook_url, } } } #[derive (Deserialize)] struct ConfigFile { ip_nicknames: Vec <([u8; 4], String)>, webhook_url: Option , } struct DirecState { start_time: Instant, p2_id: PeerId, p2_addr: tokio::sync::oneshot::Sender , } #[derive (Default)] struct Stats { quic: ConnectEvents, } #[derive (Default)] struct ConnectEvents { connects: AtomicU64, disconnects: AtomicU64, } impl ConnectEvents { fn connect (&self) -> u64 { let connects = self.connects.fetch_add (1, Ordering::Relaxed) + 1; let disconnects = self.disconnects.load (Ordering::Relaxed); connects - disconnects } fn disconnect (&self) -> u64 { let disconnects = self.disconnects.fetch_add (1, Ordering::Relaxed) + 1; let connects = self.connects.load (Ordering::Relaxed); connects - disconnects } fn _active (&self) -> u64 { let connects = self.connects.load (Ordering::Relaxed); let disconnects = self.disconnects.load (Ordering::Relaxed); connects - disconnects } } struct P4State { req_channel: mpsc::Sender , } impl RelayState { async fn reload_config (&self) -> anyhow::Result <()> { let s = tokio::fs::read_to_string ("config/ptth_quic_relay_server.json").await?; let config: ConfigFile = serde_json::from_str (&s)?; let config = Arc::new (Config::from (config)); self.config.store (config); Ok (()) } } struct RequestP2ToP4 { client_send: quinn::SendStream, client_recv: quinn::RecvStream, client_id: String, } struct PtthNewConnection { client_send: quinn::SendStream, client_recv: quinn::RecvStream, server_send: quinn::SendStream, server_recv: quinn::RecvStream, } struct PtthConnection { uplink_task: JoinHandle >, downlink_task: JoinHandle >, } impl PtthNewConnection { fn build (self) -> PtthConnection { let Self { mut client_send, mut client_recv, mut server_send, mut server_recv, } = self; let uplink_task = tokio::spawn (async move { // Uplink - Client to end server let mut buf = vec! [0u8; 65_536]; while let Some (bytes_read) = client_recv.read (&mut buf).await? { if bytes_read == 0 { break; } let buf_slice = &buf [0..bytes_read]; trace! ("Uplink relaying {} bytes", bytes_read); server_send.write_all (buf_slice).await?; } trace! ("Uplink closed"); Ok::<_, anyhow::Error> (()) }); let downlink_task = tokio::spawn (async move { // Downlink - End server to client let mut buf = vec! [0u8; 65_536]; while let Some (bytes_read) = server_recv.read (&mut buf).await? { let buf_slice = &buf [0..bytes_read]; trace! ("Downlink relaying {} bytes", bytes_read); client_send.write_all (buf_slice).await?; } trace! ("Downlink closed"); Ok::<_, anyhow::Error> (()) }); PtthConnection { uplink_task, downlink_task, } } } async fn handle_quic_connection ( relay_state: Arc , conn: quinn::Connecting, ) -> anyhow::Result <()> { let id = Ulid::generate (); let config = relay_state.config.load (); let remote_addr = conn.remote_address (); let ip_nickname = match remote_addr { SocketAddr::V4 (x) => { let ip = x.ip ().octets (); match config.ip_nicknames.get (&ip) { Some (nick) => nick.as_str (), _ => "Unknown", } }, _ => "Unknown, not IPv4", }; debug! ("EHG7NVUD Incoming QUIC connection {} from {:?} ({})", id, remote_addr, ip_nickname); if let Some (webhook_url) = config.webhook_url.clone () { let j = json! ({ "text": format! ("Incoming QUIC connection from {:?} ({})", remote_addr, ip_nickname), }).to_string (); let http_client = relay_state.http_client.clone (); tokio::spawn (async move { http_client.post (webhook_url).body (j).send ().await }); } let conn = conn.await?; // Everyone who connects must identify themselves with the first // bi stream // TODO: Timeout let (mut send, mut recv) = conn.accept_bi ().await?; let peer = protocol::p3_accept_peer (&mut recv).await?; match peer { protocol::P3Peer::P2ClientProxy (peer) => { trace! ("H36JTVE5 Handling connection {} as P2 client", id); // TODO: Check authorization for P2 peers protocol::p3_authorize_p2_peer (&mut send).await?; handle_p2_connection (relay_state, conn, peer).await?; }, protocol::P3Peer::P4ServerProxy (peer) => { trace! ("LRHUKB7K Handling connection {} as P4 end server", id); // TODO: Check authorization for P4 peers protocol::p3_authorize_p4_peer (&mut send).await?; let metrics = Arc::clone (&relay_state.metrics); { let mut m = metrics.write ().await; m.connected_end_servers += 1; } handle_p4_connection (relay_state, conn, peer).await?; { let mut m = metrics.write ().await; m.connected_end_servers -= 1; } }, } Ok::<_, anyhow::Error> (()) } async fn handle_p2_connection ( relay_state: Arc , conn: quinn::Connection, peer: protocol::P2ClientProxy, ) -> anyhow::Result <()> { let client_id = peer.id; while let Ok ((send, mut recv)) = conn.accept_bi ().await { let relay_state = Arc::clone (&relay_state); let client_id = client_id.clone (); tokio::spawn (async move { debug! ("Request started for P2"); match protocol::p3_accept_p2_stream (&mut recv).await? { protocol::P2ToP3Stream::ConnectP2ToP4 { server_id, } => { handle_request_p2_to_p4 ( relay_state, client_id, server_id, send, recv ).await? }, protocol::P2ToP3Stream::DirecP2ToP4 { server_id, cookie, } => { handle_direc_p2_to_p4 ( relay_state, client_id, server_id, cookie, send, recv ).await? }, } debug! ("Request ended for P2"); Ok::<_, anyhow::Error> (()) }); } debug! ("P2 {} disconnected", client_id); Ok (()) } async fn handle_request_p2_to_p4 ( relay_state: Arc , client_id: String, server_id: PeerId, mut client_send: quinn::SendStream, client_recv: quinn::RecvStream, ) -> anyhow::Result <()> { trace! ("P2 {} wants to connect to P4 {}", client_id, server_id); // TODO: Check authorization for P2 to connect to P4 protocol::p3_authorize_p2_to_p4_connection (&mut client_send).await?; { let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; match p4_server_proxies.get (&server_id) { Some (p4_state) => { p4_state.req_channel.send (RequestP2ToP4 { client_send, client_recv, client_id, }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; }, None => warn! ("That server isn't connected"), } } Ok (()) } async fn handle_direc_p2_to_p4 ( relay_state: Arc , client_id: String, server_id: PeerId, cookie: Vec , mut client_send: quinn::SendStream, client_recv: quinn::RecvStream, ) -> anyhow::Result <()> { debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id); // TODO: Check authorization protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?; let (tx, rx) = tokio::sync::oneshot::channel (); { let mut direc_cookies = relay_state.direc_cookies.lock ().await; direc_cookies.insert (cookie, DirecState { start_time: Instant::now (), p2_id: client_id.clone (), p2_addr: tx, }); } debug! ("Waiting to learn P2's WAN address..."); let wan_addr = rx.await?; debug! ("And that WAN address is {}", wan_addr); Ok (()) } async fn handle_p4_connection ( relay_state: Arc , connection: quinn::Connection, peer: protocol::P4ServerProxy, ) -> anyhow::Result <()> { let server_id = peer.id; let (tx, mut rx) = mpsc::channel (2); let p4_state = P4State { req_channel: tx, }; { let mut p4_server_proxies = relay_state.p4_server_proxies.lock ().await; p4_server_proxies.insert (server_id.clone (), p4_state); } while let Some (req) = rx.recv ().await { let connection = connection.clone (); let server_id = server_id.clone (); tokio::spawn (async move { let RequestP2ToP4 { client_send, client_recv, client_id, } = req; debug! ("P4 {} got a request from P2 {}", server_id, client_id); let (server_send, server_recv) = protocol::p3_connect_p2_to_p4 (&connection, &client_id).await?; trace! ("Relaying bytes..."); let ptth_conn = PtthNewConnection { client_send, client_recv, server_send, server_recv, }.build (); ptth_conn.uplink_task.await??; ptth_conn.downlink_task.await??; debug! ("Request ended for P4"); Ok::<_, anyhow::Error> (()) }); } debug! ("P4 {} disconnected", server_id); Ok (()) }