use std::{ convert::{TryInto}, net::SocketAddr, path::PathBuf, sync::{ Arc, }, time::Duration, }; use tokio::{ spawn, sync::oneshot, }; use reqwest::Client; use tracing::{debug, info}; // Poll for a few seconds till the server is ready // If this takes more than 5 seconds-ish, it's bad, the test should // fail async fn wait_for_any_server (relay_state: &ptth_relay::Relay) { for _ in 0..50 { tokio::time::sleep (Duration::from_millis (100)).await; if ! relay_state.list_servers ().await.is_empty () { break; } } } async fn testing_client_checks ( testing_config: &TestingConfig, client: &reqwest::Client ) { let relay_url = testing_config.relay_url (); let server_name = testing_config.server_name; let resp = client.get (&format! ("{}/frontend/relay_up_check", relay_url)) .send ().await.expect ("Couldn't check if relay is up").bytes ().await.expect ("Couldn't check if relay is up"); assert_eq! (resp, "Relay is up\n"); let req = client.get (&format! ("{}/frontend/servers/{}/files/COPYING", relay_url, server_name)) .send (); let resp = tokio::time::timeout (Duration::from_secs (2), req).await.expect ("Request timed out").expect ("Couldn't find license").bytes ().await.expect ("Couldn't find license"); if blake3::hash (&resp) != [ 0xca, 0x02, 0x92, 0x78, 0x9c, 0x0a, 0x0e, 0xcb, 0xa7, 0x06, 0xf4, 0xb3, 0xf3, 0x49, 0x30, 0x07, 0xa9, 0x95, 0x17, 0x31, 0xc1, 0xd4, 0x32, 0xc5, 0x2c, 0x4a, 0xac, 0x1f, 0x1a, 0xbb, 0xa8, 0xef, ] { panic! ("{}", String::from_utf8 (resp.to_vec ()).expect ("???")); } // Requesting a file from a server that isn't registered // will error out let resp = client.get (&format! ("{}/frontend/servers/obviously_this_server_does_not_exist/files/COPYING", relay_url)) .send ().await.expect ("Couldn't send request to bogus server"); assert_eq! (resp.status (), reqwest::StatusCode::NOT_FOUND); } struct TestingConfig { server_name: &'static str, api_key: &'static str, proxy_port: u16, relay_port: u16, } impl TestingConfig { fn relay_url (&self) -> String { format! ("http://127.0.0.1:{}", self.proxy_port) } } struct TestingRelay { state: Arc , task: tokio::task::JoinHandle >, stop_tx: oneshot::Sender <()>, } struct TestingServer { task: tokio::task::JoinHandle >, stop_tx: oneshot::Sender <()>, } impl TestingRelay { async fn new (testing_config: &TestingConfig) -> Self { use ptth_relay::*; let tripcode = key_validity::BlakeHashWrapper::from_key (testing_config.api_key.as_bytes ()); debug! ("Relay is expecting tripcode {}", tripcode.encode_base64 ()); let state = Relay::build () .port (testing_config.relay_port) .server (config::file::Server { name: testing_config.server_name.to_string (), tripcode, display_name: None, }) .build ().expect ("Can't create relay state"); let state = Arc::new (state); let (stop_tx, stop_rx) = oneshot::channel (); let task = spawn ({ let state = state.clone (); async move { run_relay ( state, &PathBuf::new (), stop_rx, None ).await } }); assert! (state.list_servers ().await.is_empty ()); Self { task, stop_tx, state, } } async fn graceful_shutdown (self) { self.stop_tx.send (()).expect ("Couldn't shut down relay"); self.task.await.expect ("Couldn't join relay").expect ("Relay error"); info! ("Relay stopped"); } } impl TestingServer { async fn new (testing_config: &TestingConfig) -> Self { let config_file = ptth_server::ConfigFile::new ( testing_config.server_name.into (), testing_config.api_key.into (), format! ("{}/7ZSFUKGV", testing_config.relay_url ()), ); let (stop_tx, stop_rx) = oneshot::channel (); let task = { spawn (async move { ptth_server::run_server (config_file, stop_rx, None, None, None).await }) }; Self { task, stop_tx, } } async fn graceful_shutdown (self) { self.stop_tx.send (()).expect ("Couldn't shut down server"); self.task.await.expect ("Couldn't join server").expect ("Server error"); info! ("Server stopped"); } } #[tokio::test] async fn end_to_end () { // Prefer this form for tests, since all tests share one process // and we don't care if another test already installed a subscriber. //tracing_subscriber::fmt ().try_init ().ok (); let relay_port = 40000; // No proxy let proxy_port = relay_port; let server_name = "aliens_wildland"; let testing_config = TestingConfig { server_name, api_key: "AnacondaHardcoverGrannyUnlatchLankinessMutate", proxy_port, relay_port, }; let testing_relay = TestingRelay::new (&testing_config).await; let testing_server = TestingServer::new (&testing_config).await; wait_for_any_server (&testing_relay.state).await; assert_eq! (testing_relay.state.list_servers ().await, vec! [ server_name.to_string (), ]); let client = Client::builder () .build ().expect ("Couldn't build HTTP client"); testing_client_checks (&testing_config, &client).await; info! ("Shutting down end-to-end test"); testing_server.graceful_shutdown ().await; testing_relay.graceful_shutdown ().await; } #[tokio::test] async fn debug_proxy () { tracing_subscriber::fmt () .with_env_filter (tracing_subscriber::EnvFilter::from_default_env ()) .try_init ().ok (); let relay_port = 4002; let proxy_port = 11510; // Start relay let server_name = "aliens_wildland"; let testing_config = TestingConfig { server_name, api_key: "AnacondaHardcoverGrannyUnlatchLankinessMutate", proxy_port, relay_port, }; let testing_relay = TestingRelay::new (&testing_config).await; // Start proxy let (stop_proxy_tx, stop_proxy_rx) = oneshot::channel (); let task_proxy = spawn (async move { debug_proxy::run_proxy (SocketAddr::from (([0, 0, 0, 0], proxy_port)), format! ("127.0.0.1:{}", relay_port), stop_proxy_rx).await }); // Start server let testing_server = TestingServer::new (&testing_config).await; wait_for_any_server (&testing_relay.state).await; assert_eq! (testing_relay.state.list_servers ().await, vec! [ server_name.to_string (), ]); let client = Client::builder () .build ().expect ("Couldn't build HTTP client"); testing_client_checks (&testing_config, &client).await; info! ("Shutting down end-to-end test"); testing_server.graceful_shutdown ().await; stop_proxy_tx.send (()).expect ("Couldn't shut down proxy"); task_proxy.await.expect ("Couldn't join proxy").expect ("Proxy error"); info! ("Proxy stopped"); testing_relay.graceful_shutdown ().await; } #[tokio::test] async fn scraper_endpoints () { use ptth_relay::*; let relay_state = Relay::build () .port (4001) .enable_scraper_api (true) .scraper_key (key_validity::ScraperKey::new_30_day ("automated testing", b"bogus")) .build ().expect ("Can't create relay state"); let relay_state = Arc::new (relay_state); let relay_state_2 = relay_state.clone (); let (stop_relay_tx, stop_relay_rx) = oneshot::channel (); let task_relay = spawn (async move { run_relay ( relay_state_2, &PathBuf::new (), stop_relay_rx, None ).await }); let relay_url = "http://127.0.0.1:4001"; let mut headers = reqwest::header::HeaderMap::new (); headers.insert ("X-ApiKey", "bogus".try_into ().unwrap ()); let client = Client::builder () .default_headers (headers) .timeout (Duration::from_secs (2)) .build ().expect ("Couldn't build HTTP client"); let mut resp = None; for _ in 0usize..5 { let x = client.get (&format! ("{}/scraper/api/test", relay_url)) .send ().await; match x { Err (_) => { // Probably a reqwest error cause the port is in // use or something. Try again. }, Ok (x) => { resp = Some (x); break; }, }; tokio::time::sleep (Duration::from_millis (200)).await; } let resp = resp.expect ("Reqwest repeatedly failed to connect to the relay"); let resp = resp.bytes ().await.expect ("Couldn't check if relay is up"); assert_eq! (resp, "You're valid!\n"); stop_relay_tx.send (()).expect ("Couldn't shut down relay"); task_relay.await.expect ("Couldn't join relay").expect ("Relay error"); }