👕 refactor: break apart the relay's main fn so we can see which ports it bound
parent
605c15468a
commit
3f0272ed09
|
@ -1,6 +1,7 @@
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use ptth_quic::prelude::*;
|
use ptth_quic::prelude::*;
|
||||||
|
use ptth_quic::executable_relay_server as relay;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main () -> anyhow::Result <()> {
|
async fn main () -> anyhow::Result <()> {
|
||||||
|
@ -8,7 +9,7 @@ async fn main () -> anyhow::Result <()> {
|
||||||
|
|
||||||
tracing_subscriber::fmt::init ();
|
tracing_subscriber::fmt::init ();
|
||||||
|
|
||||||
let opt = ptth_quic::executable_relay_server::Opt::from_args ();
|
let opt = relay::Opt::from_args ();
|
||||||
|
|
||||||
let (running_tx, mut running_rx) = watch::channel (true);
|
let (running_tx, mut running_rx) = watch::channel (true);
|
||||||
|
|
||||||
|
@ -17,8 +18,15 @@ async fn main () -> anyhow::Result <()> {
|
||||||
})?;
|
})?;
|
||||||
trace! ("Set Ctrl+C handler");
|
trace! ("Set Ctrl+C handler");
|
||||||
|
|
||||||
|
let app = relay::App::new (opt)?;
|
||||||
|
println! ("Base64 cert: {}", base64::encode (app.server_cert ()));
|
||||||
|
println! ("Listening on {}", app.listen_addr ());
|
||||||
|
|
||||||
|
tokio::fs::create_dir_all ("ptth_quic_output").await?;
|
||||||
|
tokio::fs::write ("ptth_quic_output/quic_server.crt", app.server_cert ()).await?;
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
val = ptth_quic::executable_relay_server::main (opt) => {
|
val = app.run () => {
|
||||||
|
|
||||||
},
|
},
|
||||||
val = running_rx.changed () => {
|
val = running_rx.changed () => {
|
||||||
|
|
|
@ -18,162 +18,191 @@ use crate::prelude::*;
|
||||||
use protocol::PeerId;
|
use protocol::PeerId;
|
||||||
|
|
||||||
#[derive (Debug, StructOpt)]
|
#[derive (Debug, StructOpt)]
|
||||||
pub (crate) struct Opt {
|
pub struct Opt {
|
||||||
#[structopt (long)]
|
#[structopt (long)]
|
||||||
pub (crate) listen_addr: Option <String>,
|
pub (crate) listen_addr: Option <String>,
|
||||||
#[structopt (long)]
|
#[structopt (long)]
|
||||||
pub (crate) tcp_listen_port: Option <u16>,
|
pub (crate) tcp_listen_port: Option <u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub (crate) async fn main (opt: Opt) -> anyhow::Result <()>
|
pub struct App {
|
||||||
{
|
endpoint: quinn::Endpoint,
|
||||||
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
|
listen_addr: SocketAddr,
|
||||||
let (endpoint, server_cert) = make_server_endpoint (listen_addr)?;
|
server_cert: Vec <u8>,
|
||||||
println! ("Base64 cert: {}", base64::encode (&server_cert));
|
tcp_listen_port: Option <u16>,
|
||||||
println! ("Listening on {}", listen_addr);
|
}
|
||||||
|
|
||||||
tokio::fs::create_dir_all ("ptth_quic_output").await?;
|
impl App {
|
||||||
tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?;
|
pub fn new (opt: Opt) -> anyhow::Result <Self> {
|
||||||
|
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
|
||||||
let relay_state = RelayState::default ();
|
let (endpoint, server_cert) = make_server_endpoint (listen_addr)?;
|
||||||
if let Err (e) = relay_state.reload_config ().await {
|
|
||||||
error! ("{:?}", e);
|
let listen_addr = endpoint.local_addr ()?;
|
||||||
|
|
||||||
|
Ok (Self {
|
||||||
|
endpoint,
|
||||||
|
listen_addr,
|
||||||
|
server_cert,
|
||||||
|
tcp_listen_port: opt.tcp_listen_port,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
let relay_state = Arc::new (relay_state);
|
|
||||||
|
|
||||||
let make_svc = {
|
pub fn listen_addr (&self) -> SocketAddr {
|
||||||
let relay_state = Arc::clone (&relay_state);
|
self.listen_addr
|
||||||
make_service_fn (move |_conn| {
|
}
|
||||||
|
|
||||||
|
pub fn server_cert (&self) -> &[u8] {
|
||||||
|
&self.server_cert
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run (self) -> anyhow::Result <()> {
|
||||||
|
let Self {
|
||||||
|
endpoint,
|
||||||
|
listen_addr,
|
||||||
|
server_cert,
|
||||||
|
tcp_listen_port,
|
||||||
|
} = self;
|
||||||
|
|
||||||
|
let relay_state = RelayState::default ();
|
||||||
|
if let Err (e) = relay_state.reload_config ().await {
|
||||||
|
error! ("{:?}", e);
|
||||||
|
}
|
||||||
|
let relay_state = Arc::new (relay_state);
|
||||||
|
|
||||||
|
let make_svc = {
|
||||||
let relay_state = Arc::clone (&relay_state);
|
let relay_state = Arc::clone (&relay_state);
|
||||||
|
make_service_fn (move |_conn| {
|
||||||
async move {
|
|
||||||
Ok::<_, String> (service_fn (move |req| {
|
|
||||||
let relay_state = Arc::clone (&relay_state);
|
|
||||||
|
|
||||||
handle_http (req, relay_state)
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004));
|
|
||||||
let http_server = Server::bind (&http_addr);
|
|
||||||
|
|
||||||
let _task_reload_config = {
|
|
||||||
let relay_state = Arc::clone (&relay_state);
|
|
||||||
tokio::spawn (async move {
|
|
||||||
let mut interval = tokio::time::interval (std::time::Duration::from_secs (60));
|
|
||||||
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
interval.tick ().await;
|
|
||||||
|
|
||||||
relay_state.reload_config ().await.ok ();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
let task_quic_server = {
|
|
||||||
let relay_state = Arc::clone (&relay_state);
|
|
||||||
tokio::spawn (async move {
|
|
||||||
while let Some (conn) = endpoint.accept ().await {
|
|
||||||
let relay_state = Arc::clone (&relay_state);
|
let relay_state = Arc::clone (&relay_state);
|
||||||
|
|
||||||
// Each new peer QUIC connection gets its own task
|
async move {
|
||||||
tokio::spawn (async move {
|
Ok::<_, String> (service_fn (move |req| {
|
||||||
let active = relay_state.stats.quic.connect ();
|
let relay_state = Arc::clone (&relay_state);
|
||||||
debug! ("QUIC connections: {}", active);
|
|
||||||
|
handle_http (req, relay_state)
|
||||||
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
|
}))
|
||||||
Ok (_) => (),
|
}
|
||||||
Err (e) => warn! ("handle_quic_connection `{:?}`", e),
|
})
|
||||||
}
|
};
|
||||||
|
|
||||||
let active = relay_state.stats.quic.disconnect ();
|
|
||||||
debug! ("QUIC connections: {}", active);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok::<_, anyhow::Error> (())
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
let task_direc_server = {
|
|
||||||
let relay_state = Arc::clone (&relay_state);
|
|
||||||
|
|
||||||
tokio::spawn (async move {
|
let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004));
|
||||||
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
|
let http_server = Server::bind (&http_addr);
|
||||||
let mut buf = [0; 2048];
|
|
||||||
loop {
|
let _task_reload_config = {
|
||||||
let (len, addr) = sock.recv_from (&mut buf).await?;
|
let relay_state = Arc::clone (&relay_state);
|
||||||
debug! ("{:?} bytes received from {:?}", len, addr);
|
tokio::spawn (async move {
|
||||||
|
let mut interval = tokio::time::interval (std::time::Duration::from_secs (60));
|
||||||
|
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
|
loop {
|
||||||
|
interval.tick ().await;
|
||||||
{
|
|
||||||
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
|
|
||||||
|
|
||||||
if let Some (direc_state) = direc_cookies.remove (&packet) {
|
relay_state.reload_config ().await.ok ();
|
||||||
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
|
}
|
||||||
direc_state.p2_addr.send (addr).ok ();
|
})
|
||||||
}
|
};
|
||||||
else {
|
|
||||||
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
|
let task_quic_server = {
|
||||||
|
let relay_state = Arc::clone (&relay_state);
|
||||||
|
tokio::spawn (async move {
|
||||||
|
while let Some (conn) = endpoint.accept ().await {
|
||||||
|
let relay_state = Arc::clone (&relay_state);
|
||||||
|
|
||||||
|
// Each new peer QUIC connection gets its own task
|
||||||
|
tokio::spawn (async move {
|
||||||
|
let active = relay_state.stats.quic.connect ();
|
||||||
|
debug! ("QUIC connections: {}", active);
|
||||||
|
|
||||||
|
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
|
||||||
|
Ok (_) => (),
|
||||||
|
Err (e) => warn! ("handle_quic_connection `{:?}`", e),
|
||||||
|
}
|
||||||
|
|
||||||
|
let active = relay_state.stats.quic.disconnect ();
|
||||||
|
debug! ("QUIC connections: {}", active);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok::<_, anyhow::Error> (())
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let task_direc_server = {
|
||||||
|
let relay_state = Arc::clone (&relay_state);
|
||||||
|
|
||||||
|
tokio::spawn (async move {
|
||||||
|
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
|
||||||
|
let mut buf = [0; 2048];
|
||||||
|
loop {
|
||||||
|
let (len, addr) = sock.recv_from (&mut buf).await?;
|
||||||
|
debug! ("{:?} bytes received from {:?}", len, addr);
|
||||||
|
|
||||||
|
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
|
||||||
|
|
||||||
|
if let Some (direc_state) = direc_cookies.remove (&packet) {
|
||||||
|
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
|
||||||
|
direc_state.p2_addr.send (addr).ok ();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
Ok::<_, anyhow::Error> (())
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let task_http_server = tokio::spawn (async move {
|
||||||
|
http_server.serve (make_svc).await?;
|
||||||
Ok::<_, anyhow::Error> (())
|
Ok::<_, anyhow::Error> (())
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
let task_http_server = tokio::spawn (async move {
|
|
||||||
http_server.serve (make_svc).await?;
|
|
||||||
Ok::<_, anyhow::Error> (())
|
|
||||||
});
|
|
||||||
|
|
||||||
debug! ("Serving HTTP on {:?}", http_addr);
|
|
||||||
|
|
||||||
if let Some (tcp_listen_port) = opt.tcp_listen_port {
|
|
||||||
tokio::spawn (async move {
|
|
||||||
let cfg = udp_over_tcp::server::Config {
|
|
||||||
tcp_port: tcp_listen_port,
|
|
||||||
udp_port: listen_addr.port (),
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err (e) = udp_over_tcp::server::main (cfg).await {
|
|
||||||
eprintln! ("udp_over_tcp::server::main exited with err {:?}", e);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
debug! ("Serving HTTP on {:?}", http_addr);
|
||||||
{
|
|
||||||
let config = relay_state.config.load ();
|
if let Some (tcp_listen_port) = tcp_listen_port {
|
||||||
dbg! (&config.webhook_url);
|
|
||||||
if let Some (webhook_url) = config.webhook_url.clone () {
|
|
||||||
let j = json! ({
|
|
||||||
"text": "Booting up",
|
|
||||||
}).to_string ();
|
|
||||||
let http_client = relay_state.http_client.clone ();
|
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
http_client.post (webhook_url).body (j).send ().await
|
let cfg = udp_over_tcp::server::Config {
|
||||||
|
tcp_port: tcp_listen_port,
|
||||||
|
udp_port: listen_addr.port (),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err (e) = udp_over_tcp::server::main (cfg).await {
|
||||||
|
eprintln! ("udp_over_tcp::server::main exited with err {:?}", e);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let config = relay_state.config.load ();
|
||||||
|
dbg! (&config.webhook_url);
|
||||||
|
if let Some (webhook_url) = config.webhook_url.clone () {
|
||||||
|
let j = json! ({
|
||||||
|
"text": "Booting up",
|
||||||
|
}).to_string ();
|
||||||
|
let http_client = relay_state.http_client.clone ();
|
||||||
|
tokio::spawn (async move {
|
||||||
|
http_client.post (webhook_url).body (j).send ().await
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_val = task_quic_server => {
|
||||||
|
eprintln! ("QUIC relay server exited, exiting");
|
||||||
|
},
|
||||||
|
_val = task_http_server => {
|
||||||
|
eprintln! ("HTTP server exited, exiting");
|
||||||
|
},
|
||||||
|
_val = task_direc_server => {
|
||||||
|
eprintln! ("PTTH_DIREC server exited, exiting");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
_val = task_quic_server => {
|
|
||||||
eprintln! ("QUIC relay server exited, exiting");
|
|
||||||
},
|
|
||||||
_val = task_http_server => {
|
|
||||||
eprintln! ("HTTP server exited, exiting");
|
|
||||||
},
|
|
||||||
_val = task_direc_server => {
|
|
||||||
eprintln! ("PTTH_DIREC server exited, exiting");
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok (())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
|
async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
|
||||||
|
|
|
@ -9,12 +9,12 @@ async fn end_to_end_async () -> anyhow::Result <()> {
|
||||||
use crate::executable_relay_server as relay;
|
use crate::executable_relay_server as relay;
|
||||||
|
|
||||||
let relay_opt = relay::Opt {
|
let relay_opt = relay::Opt {
|
||||||
listen_addr: "127.0.0.1:30381".to_string ().into (),
|
listen_addr: "127.0.0.1:0".to_string ().into (),
|
||||||
tcp_listen_port: 8001.into (),
|
tcp_listen_port: None,
|
||||||
};
|
};
|
||||||
|
let relay_app = relay::App::new (relay_opt)?;
|
||||||
let task_relay = tokio::spawn (async move {
|
let task_relay = tokio::spawn (async move {
|
||||||
relay::main (relay_opt).await?;
|
relay_app.run ().await
|
||||||
Ok::<_, anyhow::Error> (())
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok (())
|
Ok (())
|
||||||
|
|
Loading…
Reference in New Issue