From 221732a616d9bce2206477994b13675a07b9385c Mon Sep 17 00:00:00 2001 From: _ <> Date: Sat, 6 Mar 2021 19:17:42 +0000 Subject: [PATCH] :recycle: refactor: extract TestingRelay --- src/main.rs | 19 ++++---- src/tests.rs | 125 +++++++++++++++++++++++++++++++++++---------------- 2 files changed, 96 insertions(+), 48 deletions(-) diff --git a/src/main.rs b/src/main.rs index db63cd1..c6e3dab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,7 +66,7 @@ async fn main () -> anyhow::Result <()> { // Start proxy - let (_stop_proxy_tx, stop_proxy_rx) = oneshot::channel (); + let (stop_proxy_tx, stop_proxy_rx) = oneshot::channel (); let task_proxy = spawn (async move { debug_proxy::run_proxy (SocketAddr::from (([0, 0, 0, 0], proxy_port)), format! ("127.0.0.1:{}", relay_port), stop_proxy_rx).await }); @@ -134,17 +134,16 @@ async fn main () -> anyhow::Result <()> { info! ("Shutting down end-to-end test"); stop_server_tx.send (()).expect ("Couldn't shut down server"); - stop_relay_tx.send (()).expect ("Couldn't shut down relay"); - - info! ("Sent stop messages"); - - task_relay.await.expect ("Couldn't join relay").expect ("Relay error"); - info! ("Relay stopped"); - - task_proxy.await.expect ("Couldn't join proxy"); - task_server.await.expect ("Couldn't join server").expect ("Server error"); info! ("Server stopped"); + stop_proxy_tx.send (()).expect ("Couldn't shut down proxy"); + task_proxy.await.expect ("Couldn't join proxy").expect ("Proxy error"); + info! ("Proxy stopped"); + + stop_relay_tx.send (()).expect ("Couldn't shut down relay"); + task_relay.await.expect ("Couldn't join relay").expect ("Relay error"); + info! ("Relay stopped"); + Ok (()) } diff --git a/src/tests.rs b/src/tests.rs index 4ebdae4..f15c65c 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -20,6 +20,84 @@ use tracing::{debug, info}; use ptth_relay::load_templates; +// Poll for a few seconds till the server is ready +// If this takes more than 5 seconds-ish, it's bad, the test should +// fail + +async fn wait_for_any_server (relay_state: &ptth_relay::RelayState) { + for _ in 0..50 { + delay_for (Duration::from_millis (100)).await; + if ! relay_state.list_servers ().await.is_empty () { + break; + } + } +} + +struct TestingConfig { + server_name: &'static str, + api_key: &'static str, + + relay_port: u16, +} + +struct TestingRelay { + state: Arc , + task: tokio::task::JoinHandle >, + stop_tx: oneshot::Sender <()>, +} + +impl TestingRelay { + async fn new (testing_config: &TestingConfig) -> Self { + use ptth_relay::*; + + let tripcode = key_validity::BlakeHashWrapper::from_key (testing_config.api_key.as_bytes ()); + debug! ("Relay is expecting tripcode {}", tripcode.encode_base64 ()); + let config_file = config::file::Config { + iso: Default::default (), + port: Some (testing_config.relay_port), + servers: vec! [ + config::file::Server { + name: testing_config.server_name.to_string (), + tripcode, + display_name: None, + }, + ], + scraper_keys: vec! [], + }; + + let cfg = config::Config::try_from (config_file).expect ("Can't load config"); + + let state = Arc::new (RelayState::try_from (cfg).expect ("Can't create relay state")); + + let (stop_tx, stop_relay_rx) = oneshot::channel (); + let task = spawn ({ + let state = state.clone (); + async move { + run_relay ( + state, + Arc::new (load_templates (&PathBuf::new ())?), + stop_relay_rx, + None + ).await + } + }); + + assert! (state.list_servers ().await.is_empty ()); + + Self { + task, + stop_tx, + state, + } + } + + async fn graceful_shutdown (self) { + self.stop_tx.send (()).expect ("Couldn't shut down relay"); + self.task.await.expect ("Couldn't join relay").expect ("Relay error"); + info! ("Relay stopped"); + } +} + #[test] fn end_to_end () { use ptth_relay::key_validity::BlakeHashWrapper; @@ -139,8 +217,6 @@ fn end_to_end () { #[test] fn debug_proxy () { - use ptth_relay::key_validity::BlakeHashWrapper; - tracing_subscriber::fmt ().try_init ().ok (); let mut rt = Runtime::new ().expect ("Can't create runtime for testing"); @@ -152,39 +228,14 @@ fn debug_proxy () { let server_name = "aliens_wildland"; let api_key = "AnacondaHardcoverGrannyUnlatchLankinessMutate"; - let tripcode = BlakeHashWrapper::from_key (api_key.as_bytes ()); - debug! ("Relay is expecting tripcode {}", tripcode.encode_base64 ()); - let config_file = ptth_relay::config::file::Config { - iso: Default::default (), - port: Some (relay_port), - servers: vec! [ - ptth_relay::config::file::Server { - name: server_name.to_string (), - tripcode, - display_name: None, - }, - ], - scraper_keys: vec! [], + + let testing_config = TestingConfig { + server_name, + api_key, + relay_port, }; - let config = ptth_relay::config::Config::try_from (config_file).expect ("Can't load config"); - - let relay_state = Arc::new (ptth_relay::RelayState::try_from (config).expect ("Can't create relay state")); - - let (stop_relay_tx, stop_relay_rx) = oneshot::channel (); - let task_relay = spawn ({ - let relay_state = relay_state.clone (); - async move { - ptth_relay::run_relay ( - relay_state, - Arc::new (load_templates (&PathBuf::new ())?), - stop_relay_rx, - None - ).await - } - }); - - assert! (relay_state.list_servers ().await.is_empty ()); + let testing_relay = TestingRelay::new (&testing_config).await; // Start proxy @@ -211,9 +262,9 @@ fn debug_proxy () { }) }; - delay_for (Duration::from_millis (1000)).await; + wait_for_any_server (&testing_relay.state).await; - assert_eq! (relay_state.list_servers ().await, vec! [ + assert_eq! (testing_relay.state.list_servers ().await, vec! [ server_name.to_string (), ]); @@ -261,9 +312,7 @@ fn debug_proxy () { task_proxy.await.expect ("Couldn't join proxy").expect ("Proxy error"); info! ("Proxy stopped"); - stop_relay_tx.send (()).expect ("Couldn't shut down relay"); - task_relay.await.expect ("Couldn't join relay").expect ("Relay error"); - info! ("Relay stopped"); + testing_relay.graceful_shutdown ().await; }); }