diff --git a/crates/ptth_quic/src/bin/ptth_quic_relay_server.rs b/crates/ptth_quic/src/bin/ptth_quic_relay_server.rs index 85542fc..49b6f9f 100644 --- a/crates/ptth_quic/src/bin/ptth_quic_relay_server.rs +++ b/crates/ptth_quic/src/bin/ptth_quic_relay_server.rs @@ -1,6 +1,7 @@ use tokio::sync::watch; use ptth_quic::prelude::*; +use ptth_quic::executable_relay_server as relay; #[tokio::main] async fn main () -> anyhow::Result <()> { @@ -8,7 +9,7 @@ async fn main () -> anyhow::Result <()> { tracing_subscriber::fmt::init (); - let opt = ptth_quic::executable_relay_server::Opt::from_args (); + let opt = relay::Opt::from_args (); let (running_tx, mut running_rx) = watch::channel (true); @@ -17,8 +18,15 @@ async fn main () -> anyhow::Result <()> { })?; trace! ("Set Ctrl+C handler"); + let app = relay::App::new (opt)?; + println! ("Base64 cert: {}", base64::encode (app.server_cert ())); + println! ("Listening on {}", app.listen_addr ()); + + tokio::fs::create_dir_all ("ptth_quic_output").await?; + tokio::fs::write ("ptth_quic_output/quic_server.crt", app.server_cert ()).await?; + tokio::select! { - val = ptth_quic::executable_relay_server::main (opt) => { + val = app.run () => { }, val = running_rx.changed () => { diff --git a/crates/ptth_quic/src/executable_relay_server.rs b/crates/ptth_quic/src/executable_relay_server.rs index f84b8ff..ee6fb01 100644 --- a/crates/ptth_quic/src/executable_relay_server.rs +++ b/crates/ptth_quic/src/executable_relay_server.rs @@ -18,162 +18,191 @@ use crate::prelude::*; use protocol::PeerId; #[derive (Debug, StructOpt)] -pub (crate) struct Opt { +pub struct Opt { #[structopt (long)] pub (crate) listen_addr: Option , #[structopt (long)] pub (crate) tcp_listen_port: Option , } -pub (crate) async fn main (opt: Opt) -> anyhow::Result <()> -{ - let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; - let (endpoint, server_cert) = make_server_endpoint (listen_addr)?; - println! ("Base64 cert: {}", base64::encode (&server_cert)); - println! ("Listening on {}", listen_addr); - - tokio::fs::create_dir_all ("ptth_quic_output").await?; - tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?; - - let relay_state = RelayState::default (); - if let Err (e) = relay_state.reload_config ().await { - error! ("{:?}", e); +pub struct App { + endpoint: quinn::Endpoint, + listen_addr: SocketAddr, + server_cert: Vec , + tcp_listen_port: Option , +} + +impl App { + pub fn new (opt: Opt) -> anyhow::Result { + let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; + let (endpoint, server_cert) = make_server_endpoint (listen_addr)?; + + let listen_addr = endpoint.local_addr ()?; + + Ok (Self { + endpoint, + listen_addr, + server_cert, + tcp_listen_port: opt.tcp_listen_port, + }) } - let relay_state = Arc::new (relay_state); - let make_svc = { - let relay_state = Arc::clone (&relay_state); - make_service_fn (move |_conn| { + pub fn listen_addr (&self) -> SocketAddr { + self.listen_addr + } + + pub fn server_cert (&self) -> &[u8] { + &self.server_cert + } + + pub async fn run (self) -> anyhow::Result <()> { + let Self { + endpoint, + listen_addr, + server_cert, + tcp_listen_port, + } = self; + + let relay_state = RelayState::default (); + if let Err (e) = relay_state.reload_config ().await { + error! ("{:?}", e); + } + let relay_state = Arc::new (relay_state); + + let make_svc = { let relay_state = Arc::clone (&relay_state); - - async move { - Ok::<_, String> (service_fn (move |req| { - let relay_state = Arc::clone (&relay_state); - - handle_http (req, relay_state) - })) - } - }) - }; - - let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); - let http_server = Server::bind (&http_addr); - - let _task_reload_config = { - let relay_state = Arc::clone (&relay_state); - tokio::spawn (async move { - let mut interval = tokio::time::interval (std::time::Duration::from_secs (60)); - interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); - - loop { - interval.tick ().await; - - relay_state.reload_config ().await.ok (); - } - }) - }; - - let task_quic_server = { - let relay_state = Arc::clone (&relay_state); - tokio::spawn (async move { - while let Some (conn) = endpoint.accept ().await { + make_service_fn (move |_conn| { let relay_state = Arc::clone (&relay_state); - // Each new peer QUIC connection gets its own task - tokio::spawn (async move { - let active = relay_state.stats.quic.connect (); - debug! ("QUIC connections: {}", active); - - match handle_quic_connection (Arc::clone (&relay_state), conn).await { - Ok (_) => (), - Err (e) => warn! ("handle_quic_connection `{:?}`", e), - } - - let active = relay_state.stats.quic.disconnect (); - debug! ("QUIC connections: {}", active); - }); - } - - Ok::<_, anyhow::Error> (()) - }) - }; - - let task_direc_server = { - let relay_state = Arc::clone (&relay_state); + async move { + Ok::<_, String> (service_fn (move |req| { + let relay_state = Arc::clone (&relay_state); + + handle_http (req, relay_state) + })) + } + }) + }; - tokio::spawn (async move { - let sock = UdpSocket::bind("0.0.0.0:30379").await?; - let mut buf = [0; 2048]; - loop { - let (len, addr) = sock.recv_from (&mut buf).await?; - debug! ("{:?} bytes received from {:?}", len, addr); + let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); + let http_server = Server::bind (&http_addr); + + let _task_reload_config = { + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + let mut interval = tokio::time::interval (std::time::Duration::from_secs (60)); + interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); - let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); - - { - let mut direc_cookies = relay_state.direc_cookies.lock ().await; + loop { + interval.tick ().await; - if let Some (direc_state) = direc_cookies.remove (&packet) { - debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); - direc_state.p2_addr.send (addr).ok (); - } - else { - debug! ("UDP packet didn't match any PTTH_DIREC cookie"); + relay_state.reload_config ().await.ok (); + } + }) + }; + + let task_quic_server = { + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + while let Some (conn) = endpoint.accept ().await { + let relay_state = Arc::clone (&relay_state); + + // Each new peer QUIC connection gets its own task + tokio::spawn (async move { + let active = relay_state.stats.quic.connect (); + debug! ("QUIC connections: {}", active); + + match handle_quic_connection (Arc::clone (&relay_state), conn).await { + Ok (_) => (), + Err (e) => warn! ("handle_quic_connection `{:?}`", e), + } + + let active = relay_state.stats.quic.disconnect (); + debug! ("QUIC connections: {}", active); + }); + } + + Ok::<_, anyhow::Error> (()) + }) + }; + + let task_direc_server = { + let relay_state = Arc::clone (&relay_state); + + tokio::spawn (async move { + let sock = UdpSocket::bind("0.0.0.0:30379").await?; + let mut buf = [0; 2048]; + loop { + let (len, addr) = sock.recv_from (&mut buf).await?; + debug! ("{:?} bytes received from {:?}", len, addr); + + let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); + + { + let mut direc_cookies = relay_state.direc_cookies.lock ().await; + + if let Some (direc_state) = direc_cookies.remove (&packet) { + debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); + direc_state.p2_addr.send (addr).ok (); + } + else { + debug! ("UDP packet didn't match any PTTH_DIREC cookie"); + } } } - } - + + Ok::<_, anyhow::Error> (()) + }) + }; + + let task_http_server = tokio::spawn (async move { + http_server.serve (make_svc).await?; Ok::<_, anyhow::Error> (()) - }) - }; - - let task_http_server = tokio::spawn (async move { - http_server.serve (make_svc).await?; - Ok::<_, anyhow::Error> (()) - }); - - debug! ("Serving HTTP on {:?}", http_addr); - - if let Some (tcp_listen_port) = opt.tcp_listen_port { - tokio::spawn (async move { - let cfg = udp_over_tcp::server::Config { - tcp_port: tcp_listen_port, - udp_port: listen_addr.port (), - }; - - if let Err (e) = udp_over_tcp::server::main (cfg).await { - eprintln! ("udp_over_tcp::server::main exited with err {:?}", e); - } }); - } - - { - let config = relay_state.config.load (); - dbg! (&config.webhook_url); - if let Some (webhook_url) = config.webhook_url.clone () { - let j = json! ({ - "text": "Booting up", - }).to_string (); - let http_client = relay_state.http_client.clone (); + + debug! ("Serving HTTP on {:?}", http_addr); + + if let Some (tcp_listen_port) = tcp_listen_port { tokio::spawn (async move { - http_client.post (webhook_url).body (j).send ().await + let cfg = udp_over_tcp::server::Config { + tcp_port: tcp_listen_port, + udp_port: listen_addr.port (), + }; + + if let Err (e) = udp_over_tcp::server::main (cfg).await { + eprintln! ("udp_over_tcp::server::main exited with err {:?}", e); + } }); } + + { + let config = relay_state.config.load (); + dbg! (&config.webhook_url); + if let Some (webhook_url) = config.webhook_url.clone () { + let j = json! ({ + "text": "Booting up", + }).to_string (); + let http_client = relay_state.http_client.clone (); + tokio::spawn (async move { + http_client.post (webhook_url).body (j).send ().await + }); + } + } + + tokio::select! { + _val = task_quic_server => { + eprintln! ("QUIC relay server exited, exiting"); + }, + _val = task_http_server => { + eprintln! ("HTTP server exited, exiting"); + }, + _val = task_direc_server => { + eprintln! ("PTTH_DIREC server exited, exiting"); + }, + } + + Ok (()) } - - tokio::select! { - _val = task_quic_server => { - eprintln! ("QUIC relay server exited, exiting"); - }, - _val = task_http_server => { - eprintln! ("HTTP server exited, exiting"); - }, - _val = task_direc_server => { - eprintln! ("PTTH_DIREC server exited, exiting"); - }, - } - - Ok (()) } async fn handle_http (_req: Request , relay_state: Arc ) diff --git a/crates/ptth_quic/src/tests.rs b/crates/ptth_quic/src/tests.rs index ff41920..2ffd11e 100644 --- a/crates/ptth_quic/src/tests.rs +++ b/crates/ptth_quic/src/tests.rs @@ -9,12 +9,12 @@ async fn end_to_end_async () -> anyhow::Result <()> { use crate::executable_relay_server as relay; let relay_opt = relay::Opt { - listen_addr: "127.0.0.1:30381".to_string ().into (), - tcp_listen_port: 8001.into (), + listen_addr: "127.0.0.1:0".to_string ().into (), + tcp_listen_port: None, }; + let relay_app = relay::App::new (relay_opt)?; let task_relay = tokio::spawn (async move { - relay::main (relay_opt).await?; - Ok::<_, anyhow::Error> (()) + relay_app.run ().await }); Ok (())