diff --git a/Cargo.lock b/Cargo.lock index c10fff5..5b7b1d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1626,6 +1626,7 @@ dependencies = [ name = "ptth" version = "1.1.0" dependencies = [ + "anyhow", "base64 0.13.0", "blake3", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 719a1a8..520a9f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,19 +23,21 @@ exclude = [ [dependencies] -[dev-dependencies] - -base64 = "0.13.0" +anyhow = "1.0.34" blake3 = "0.3.7" -chrono = {version = "0.4.19", features = ["serde"]} -reqwest = { version = "0.10.8", features = ["stream"] } tokio = { version = "0.2.22", features = ["full"] } -tracing = "0.1.21" tracing-subscriber = "0.2.15" debug_proxy = { path = "crates/debug_proxy" } ptth_relay = { path = "crates/ptth_relay" } ptth_server = { path = "crates/ptth_server" } +reqwest = { version = "0.10.8", features = ["stream"] } +tracing = "0.1.21" + +[dev-dependencies] + +base64 = "0.13.0" +chrono = {version = "0.4.19", features = ["serde"]} [workspace] diff --git a/crates/debug_proxy/src/lib.rs b/crates/debug_proxy/src/lib.rs index 1c5ab43..ca33300 100644 --- a/crates/debug_proxy/src/lib.rs +++ b/crates/debug_proxy/src/lib.rs @@ -17,7 +17,10 @@ use reqwest::Client; use tokio::{ spawn, stream::StreamExt, - sync::mpsc, + sync::{ + mpsc, + oneshot, + }, }; use ulid::Ulid; @@ -112,7 +115,12 @@ async fn handle_all (req: Request , state: Arc ) Ok (resp.body (Body::wrap_stream (rx))?) } -pub async fn run_proxy (addr: SocketAddr, upstream_authority: String) -> anyhow::Result <()> { +pub async fn run_proxy ( + addr: SocketAddr, + upstream_authority: String, + shutdown_oneshot: oneshot::Receiver <()>, +) -> anyhow::Result <()> +{ let state = Arc::new (State { client: Client::builder ().build ()?, upstream_authority, @@ -130,7 +138,13 @@ pub async fn run_proxy (addr: SocketAddr, upstream_authority: String) -> anyhow: } }); - tracing::info! ("Binding to {}", addr); - Ok (Server::bind (&addr) - .serve (make_svc).await?) + let server = Server::bind (&addr) + .serve (make_svc); + + tracing::info! ("Proxy binding to {}", addr); + server.with_graceful_shutdown (async { + shutdown_oneshot.await.ok (); + }).await?; + + Ok (()) } diff --git a/crates/debug_proxy/src/main.rs b/crates/debug_proxy/src/main.rs index feddf5f..2653190 100644 --- a/crates/debug_proxy/src/main.rs +++ b/crates/debug_proxy/src/main.rs @@ -2,9 +2,12 @@ use std::{ net::SocketAddr, }; +use tokio::sync::oneshot; + #[tokio::main] async fn main () -> anyhow::Result <()> { tracing_subscriber::fmt::init (); let addr = SocketAddr::from(([0, 0, 0, 0], 11509)); - debug_proxy::run_proxy (addr, "127.0.0.1:4000".to_string ()).await + let (_stop_proxy_tx, stop_proxy_rx) = oneshot::channel (); + debug_proxy::run_proxy (addr, "127.0.0.1:4000".to_string (), stop_proxy_rx).await } diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..db63cd1 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,150 @@ +use std::{ + convert::TryFrom, + net::SocketAddr, + path::PathBuf, + sync::Arc, + time::Duration, +}; + +use reqwest::Client; +use tokio::{ + spawn, + sync::oneshot, + time::delay_for, +}; +use tracing::{ + debug, + info, +}; + +#[tokio::main] +async fn main () -> anyhow::Result <()> { + use ptth_relay::key_validity::BlakeHashWrapper; + + tracing_subscriber::fmt ().try_init ().ok (); + + let relay_port = 4002; + let proxy_port = 11510; + + // Start relay + + let server_name = "aliens_wildland"; + let api_key = "AnacondaHardcoverGrannyUnlatchLankinessMutate"; + let tripcode = BlakeHashWrapper::from_key (api_key.as_bytes ()); + debug! ("Relay is expecting tripcode {}", tripcode.encode_base64 ()); + let config_file = ptth_relay::config::file::Config { + iso: Default::default (), + port: Some (relay_port), + servers: vec! [ + ptth_relay::config::file::Server { + name: server_name.to_string (), + tripcode, + display_name: None, + }, + ], + scraper_keys: vec! [], + }; + + let config = ptth_relay::config::Config::try_from (config_file).expect ("Can't load config"); + + let relay_state = Arc::new (ptth_relay::RelayState::try_from (config).expect ("Can't create relay state")); + + let (stop_relay_tx, stop_relay_rx) = oneshot::channel (); + let task_relay = spawn ({ + let relay_state = relay_state.clone (); + async move { + ptth_relay::run_relay ( + relay_state, + Arc::new (ptth_relay::load_templates (&PathBuf::new ())?), + stop_relay_rx, + None + ).await + } + }); + + assert! (relay_state.list_servers ().await.is_empty ()); + + // Start proxy + + let (_stop_proxy_tx, stop_proxy_rx) = oneshot::channel (); + let task_proxy = spawn (async move { + debug_proxy::run_proxy (SocketAddr::from (([0, 0, 0, 0], proxy_port)), format! ("127.0.0.1:{}", relay_port), stop_proxy_rx).await + }); + + // Start server + + let relay_url = format! ("http://127.0.0.1:{}", proxy_port); + + let config_file = ptth_server::ConfigFile { + name: server_name.into (), + api_key: api_key.into (), + relay_url: format! ("{}/7ZSFUKGV", relay_url), + file_server_root: None, + }; + + let (stop_server_tx, stop_server_rx) = oneshot::channel (); + let task_server = { + spawn (async move { + ptth_server::run_server (config_file, stop_server_rx, None, None).await + }) + }; + + delay_for (Duration::from_millis (1000)).await; + + assert_eq! (relay_state.list_servers ().await, vec! [ + server_name.to_string (), + ]); + + let client = Client::builder () + .timeout (Duration::from_secs (2)) + .build ().expect ("Couldn't build HTTP client"); + + let resp = client.get (&format! ("{}/frontend/relay_up_check", relay_url)) + .send ().await.expect ("Couldn't check if relay is up").bytes ().await.expect ("Couldn't check if relay is up"); + + assert_eq! (resp, "Relay is up\n"); + + let resp = client.get (&format! ("{}/frontend/servers/{}/files/COPYING", relay_url, server_name)) + .send ().await.expect ("Couldn't find license").bytes ().await.expect ("Couldn't find license"); + + if blake3::hash (&resp) != blake3::Hash::from ([ + 0xca, 0x02, 0x92, 0x78, + 0x9c, 0x0a, 0x0e, 0xcb, + 0xa7, 0x06, 0xf4, 0xb3, + 0xf3, 0x49, 0x30, 0x07, + + 0xa9, 0x95, 0x17, 0x31, + 0xc1, 0xd4, 0x32, 0xc5, + 0x2c, 0x4a, 0xac, 0x1f, + 0x1a, 0xbb, 0xa8, 0xef, + ]) { + panic! ("{}", String::from_utf8 (resp.to_vec ()).expect ("???")); + } + + // Requesting a file from a server that isn't registered + // will error out + + let resp = client.get (&format! ("{}/frontend/servers/obviously_this_server_does_not_exist/files/COPYING", relay_url)) + .send ().await.expect ("Couldn't send request to bogus server"); + + assert_eq! (resp.status (), reqwest::StatusCode::NOT_FOUND); + + delay_for (Duration::from_secs (60)).await; + + info! ("Shutting down end-to-end test"); + + stop_server_tx.send (()).expect ("Couldn't shut down server"); + stop_relay_tx.send (()).expect ("Couldn't shut down relay"); + + info! ("Sent stop messages"); + + task_relay.await.expect ("Couldn't join relay").expect ("Relay error"); + info! ("Relay stopped"); + + task_proxy.await.expect ("Couldn't join proxy"); + + task_server.await.expect ("Couldn't join server").expect ("Server error"); + info! ("Server stopped"); + + Ok (()) +} diff --git a/src/tests.rs b/src/tests.rs index 45e2f91..4ebdae4 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,5 +1,6 @@ use std::{ convert::{TryFrom, TryInto}, + net::SocketAddr, path::PathBuf, sync::{ Arc, @@ -136,6 +137,136 @@ fn end_to_end () { }); } +#[test] +fn debug_proxy () { + use ptth_relay::key_validity::BlakeHashWrapper; + + tracing_subscriber::fmt ().try_init ().ok (); + let mut rt = Runtime::new ().expect ("Can't create runtime for testing"); + + rt.block_on (async { + let relay_port = 4002; + let proxy_port = 11510; + + // Start relay + + let server_name = "aliens_wildland"; + let api_key = "AnacondaHardcoverGrannyUnlatchLankinessMutate"; + let tripcode = BlakeHashWrapper::from_key (api_key.as_bytes ()); + debug! ("Relay is expecting tripcode {}", tripcode.encode_base64 ()); + let config_file = ptth_relay::config::file::Config { + iso: Default::default (), + port: Some (relay_port), + servers: vec! [ + ptth_relay::config::file::Server { + name: server_name.to_string (), + tripcode, + display_name: None, + }, + ], + scraper_keys: vec! [], + }; + + let config = ptth_relay::config::Config::try_from (config_file).expect ("Can't load config"); + + let relay_state = Arc::new (ptth_relay::RelayState::try_from (config).expect ("Can't create relay state")); + + let (stop_relay_tx, stop_relay_rx) = oneshot::channel (); + let task_relay = spawn ({ + let relay_state = relay_state.clone (); + async move { + ptth_relay::run_relay ( + relay_state, + Arc::new (load_templates (&PathBuf::new ())?), + stop_relay_rx, + None + ).await + } + }); + + assert! (relay_state.list_servers ().await.is_empty ()); + + // Start proxy + + let (stop_proxy_tx, stop_proxy_rx) = oneshot::channel (); + let task_proxy = spawn (async move { + debug_proxy::run_proxy (SocketAddr::from (([0, 0, 0, 0], proxy_port)), format! ("127.0.0.1:{}", relay_port), stop_proxy_rx).await + }); + + // Start server + + let relay_url = format! ("http://127.0.0.1:{}", proxy_port); + + let config_file = ptth_server::ConfigFile { + name: server_name.into (), + api_key: api_key.into (), + relay_url: format! ("{}/7ZSFUKGV", relay_url), + file_server_root: None, + }; + + let (stop_server_tx, stop_server_rx) = oneshot::channel (); + let task_server = { + spawn (async move { + ptth_server::run_server (config_file, stop_server_rx, None, None).await + }) + }; + + delay_for (Duration::from_millis (1000)).await; + + assert_eq! (relay_state.list_servers ().await, vec! [ + server_name.to_string (), + ]); + + let client = Client::builder () + .timeout (Duration::from_secs (2)) + .build ().expect ("Couldn't build HTTP client"); + + let resp = client.get (&format! ("{}/frontend/relay_up_check", relay_url)) + .send ().await.expect ("Couldn't check if relay is up").bytes ().await.expect ("Couldn't check if relay is up"); + + assert_eq! (resp, "Relay is up\n"); + + let resp = client.get (&format! ("{}/frontend/servers/{}/files/COPYING", relay_url, server_name)) + .send ().await.expect ("Couldn't find license").bytes ().await.expect ("Couldn't find license"); + + if blake3::hash (&resp) != blake3::Hash::from ([ + 0xca, 0x02, 0x92, 0x78, + 0x9c, 0x0a, 0x0e, 0xcb, + 0xa7, 0x06, 0xf4, 0xb3, + 0xf3, 0x49, 0x30, 0x07, + + 0xa9, 0x95, 0x17, 0x31, + 0xc1, 0xd4, 0x32, 0xc5, + 0x2c, 0x4a, 0xac, 0x1f, + 0x1a, 0xbb, 0xa8, 0xef, + ]) { + panic! ("{}", String::from_utf8 (resp.to_vec ()).expect ("???")); + } + + // Requesting a file from a server that isn't registered + // will error out + + let resp = client.get (&format! ("{}/frontend/servers/obviously_this_server_does_not_exist/files/COPYING", relay_url)) + .send ().await.expect ("Couldn't send request to bogus server"); + + assert_eq! (resp.status (), reqwest::StatusCode::NOT_FOUND); + + info! ("Shutting down end-to-end test"); + + stop_server_tx.send (()).expect ("Couldn't shut down server"); + task_server.await.expect ("Couldn't join server").expect ("Server error"); + info! ("Server stopped"); + + stop_proxy_tx.send (()).expect ("Couldn't shut down proxy"); + task_proxy.await.expect ("Couldn't join proxy").expect ("Proxy error"); + info! ("Proxy stopped"); + + stop_relay_tx.send (()).expect ("Couldn't shut down relay"); + task_relay.await.expect ("Couldn't join relay").expect ("Relay error"); + info! ("Relay stopped"); + }); +} + #[test] fn scraper_endpoints () { let mut rt = Runtime::new ().expect ("Can't create runtime for testing");