From 4aa54c32d0946654dbcfb793f7d8d8895817eab8 Mon Sep 17 00:00:00 2001
From: _ <>
Date: Sat, 6 Mar 2021 18:55:05 +0000
Subject: [PATCH] :white_check_mark: test: add a test that uses debug_proxy
---
Cargo.lock | 1 +
Cargo.toml | 14 +--
crates/debug_proxy/src/lib.rs | 24 ++++--
crates/debug_proxy/src/main.rs | 5 +-
src/main.rs | 150 +++++++++++++++++++++++++++++++++
src/tests.rs | 131 ++++++++++++++++++++++++++++
6 files changed, 313 insertions(+), 12 deletions(-)
create mode 100644 src/main.rs
diff --git a/Cargo.lock b/Cargo.lock
index c10fff5..5b7b1d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1626,6 +1626,7 @@ dependencies = [
name = "ptth"
version = "1.1.0"
dependencies = [
+ "anyhow",
"base64 0.13.0",
"blake3",
"chrono",
diff --git a/Cargo.toml b/Cargo.toml
index 719a1a8..520a9f7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,19 +23,21 @@ exclude = [
[dependencies]
-[dev-dependencies]
-
-base64 = "0.13.0"
+anyhow = "1.0.34"
blake3 = "0.3.7"
-chrono = {version = "0.4.19", features = ["serde"]}
-reqwest = { version = "0.10.8", features = ["stream"] }
tokio = { version = "0.2.22", features = ["full"] }
-tracing = "0.1.21"
tracing-subscriber = "0.2.15"
debug_proxy = { path = "crates/debug_proxy" }
ptth_relay = { path = "crates/ptth_relay" }
ptth_server = { path = "crates/ptth_server" }
+reqwest = { version = "0.10.8", features = ["stream"] }
+tracing = "0.1.21"
+
+[dev-dependencies]
+
+base64 = "0.13.0"
+chrono = {version = "0.4.19", features = ["serde"]}
[workspace]
diff --git a/crates/debug_proxy/src/lib.rs b/crates/debug_proxy/src/lib.rs
index 1c5ab43..ca33300 100644
--- a/crates/debug_proxy/src/lib.rs
+++ b/crates/debug_proxy/src/lib.rs
@@ -17,7 +17,10 @@ use reqwest::Client;
use tokio::{
spawn,
stream::StreamExt,
- sync::mpsc,
+ sync::{
+ mpsc,
+ oneshot,
+ },
};
use ulid::Ulid;
@@ -112,7 +115,12 @@ async fn handle_all (req: Request
, state: Arc )
Ok (resp.body (Body::wrap_stream (rx))?)
}
-pub async fn run_proxy (addr: SocketAddr, upstream_authority: String) -> anyhow::Result <()> {
+pub async fn run_proxy (
+ addr: SocketAddr,
+ upstream_authority: String,
+ shutdown_oneshot: oneshot::Receiver <()>,
+) -> anyhow::Result <()>
+{
let state = Arc::new (State {
client: Client::builder ().build ()?,
upstream_authority,
@@ -130,7 +138,13 @@ pub async fn run_proxy (addr: SocketAddr, upstream_authority: String) -> anyhow:
}
});
- tracing::info! ("Binding to {}", addr);
- Ok (Server::bind (&addr)
- .serve (make_svc).await?)
+ let server = Server::bind (&addr)
+ .serve (make_svc);
+
+ tracing::info! ("Proxy binding to {}", addr);
+ server.with_graceful_shutdown (async {
+ shutdown_oneshot.await.ok ();
+ }).await?;
+
+ Ok (())
}
diff --git a/crates/debug_proxy/src/main.rs b/crates/debug_proxy/src/main.rs
index feddf5f..2653190 100644
--- a/crates/debug_proxy/src/main.rs
+++ b/crates/debug_proxy/src/main.rs
@@ -2,9 +2,12 @@ use std::{
net::SocketAddr,
};
+use tokio::sync::oneshot;
+
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let addr = SocketAddr::from(([0, 0, 0, 0], 11509));
- debug_proxy::run_proxy (addr, "127.0.0.1:4000".to_string ()).await
+ let (_stop_proxy_tx, stop_proxy_rx) = oneshot::channel ();
+ debug_proxy::run_proxy (addr, "127.0.0.1:4000".to_string (), stop_proxy_rx).await
}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..db63cd1
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,150 @@
+use std::{
+ convert::TryFrom,
+ net::SocketAddr,
+ path::PathBuf,
+ sync::Arc,
+ time::Duration,
+};
+
+use reqwest::Client;
+use tokio::{
+ spawn,
+ sync::oneshot,
+ time::delay_for,
+};
+use tracing::{
+ debug,
+ info,
+};
+
+#[tokio::main]
+async fn main () -> anyhow::Result <()> {
+ use ptth_relay::key_validity::BlakeHashWrapper;
+
+ tracing_subscriber::fmt ().try_init ().ok ();
+
+ let relay_port = 4002;
+ let proxy_port = 11510;
+
+ // Start relay
+
+ let server_name = "aliens_wildland";
+ let api_key = "AnacondaHardcoverGrannyUnlatchLankinessMutate";
+ let tripcode = BlakeHashWrapper::from_key (api_key.as_bytes ());
+ debug! ("Relay is expecting tripcode {}", tripcode.encode_base64 ());
+ let config_file = ptth_relay::config::file::Config {
+ iso: Default::default (),
+ port: Some (relay_port),
+ servers: vec! [
+ ptth_relay::config::file::Server {
+ name: server_name.to_string (),
+ tripcode,
+ display_name: None,
+ },
+ ],
+ scraper_keys: vec! [],
+ };
+
+ let config = ptth_relay::config::Config::try_from (config_file).expect ("Can't load config");
+
+ let relay_state = Arc::new (ptth_relay::RelayState::try_from (config).expect ("Can't create relay state"));
+
+ let (stop_relay_tx, stop_relay_rx) = oneshot::channel ();
+ let task_relay = spawn ({
+ let relay_state = relay_state.clone ();
+ async move {
+ ptth_relay::run_relay (
+ relay_state,
+ Arc::new (ptth_relay::load_templates (&PathBuf::new ())?),
+ stop_relay_rx,
+ None
+ ).await
+ }
+ });
+
+ assert! (relay_state.list_servers ().await.is_empty ());
+
+ // Start proxy
+
+ let (_stop_proxy_tx, stop_proxy_rx) = oneshot::channel ();
+ let task_proxy = spawn (async move {
+ debug_proxy::run_proxy (SocketAddr::from (([0, 0, 0, 0], proxy_port)), format! ("127.0.0.1:{}", relay_port), stop_proxy_rx).await
+ });
+
+ // Start server
+
+ let relay_url = format! ("http://127.0.0.1:{}", proxy_port);
+
+ let config_file = ptth_server::ConfigFile {
+ name: server_name.into (),
+ api_key: api_key.into (),
+ relay_url: format! ("{}/7ZSFUKGV", relay_url),
+ file_server_root: None,
+ };
+
+ let (stop_server_tx, stop_server_rx) = oneshot::channel ();
+ let task_server = {
+ spawn (async move {
+ ptth_server::run_server (config_file, stop_server_rx, None, None).await
+ })
+ };
+
+ delay_for (Duration::from_millis (1000)).await;
+
+ assert_eq! (relay_state.list_servers ().await, vec! [
+ server_name.to_string (),
+ ]);
+
+ let client = Client::builder ()
+ .timeout (Duration::from_secs (2))
+ .build ().expect ("Couldn't build HTTP client");
+
+ let resp = client.get (&format! ("{}/frontend/relay_up_check", relay_url))
+ .send ().await.expect ("Couldn't check if relay is up").bytes ().await.expect ("Couldn't check if relay is up");
+
+ assert_eq! (resp, "Relay is up\n");
+
+ let resp = client.get (&format! ("{}/frontend/servers/{}/files/COPYING", relay_url, server_name))
+ .send ().await.expect ("Couldn't find license").bytes ().await.expect ("Couldn't find license");
+
+ if blake3::hash (&resp) != blake3::Hash::from ([
+ 0xca, 0x02, 0x92, 0x78,
+ 0x9c, 0x0a, 0x0e, 0xcb,
+ 0xa7, 0x06, 0xf4, 0xb3,
+ 0xf3, 0x49, 0x30, 0x07,
+
+ 0xa9, 0x95, 0x17, 0x31,
+ 0xc1, 0xd4, 0x32, 0xc5,
+ 0x2c, 0x4a, 0xac, 0x1f,
+ 0x1a, 0xbb, 0xa8, 0xef,
+ ]) {
+ panic! ("{}", String::from_utf8 (resp.to_vec ()).expect ("???"));
+ }
+
+ // Requesting a file from a server that isn't registered
+ // will error out
+
+ let resp = client.get (&format! ("{}/frontend/servers/obviously_this_server_does_not_exist/files/COPYING", relay_url))
+ .send ().await.expect ("Couldn't send request to bogus server");
+
+ assert_eq! (resp.status (), reqwest::StatusCode::NOT_FOUND);
+
+ delay_for (Duration::from_secs (60)).await;
+
+ info! ("Shutting down end-to-end test");
+
+ stop_server_tx.send (()).expect ("Couldn't shut down server");
+ stop_relay_tx.send (()).expect ("Couldn't shut down relay");
+
+ info! ("Sent stop messages");
+
+ task_relay.await.expect ("Couldn't join relay").expect ("Relay error");
+ info! ("Relay stopped");
+
+ task_proxy.await.expect ("Couldn't join proxy");
+
+ task_server.await.expect ("Couldn't join server").expect ("Server error");
+ info! ("Server stopped");
+
+ Ok (())
+}
diff --git a/src/tests.rs b/src/tests.rs
index 45e2f91..4ebdae4 100644
--- a/src/tests.rs
+++ b/src/tests.rs
@@ -1,5 +1,6 @@
use std::{
convert::{TryFrom, TryInto},
+ net::SocketAddr,
path::PathBuf,
sync::{
Arc,
@@ -136,6 +137,136 @@ fn end_to_end () {
});
}
+#[test]
+fn debug_proxy () {
+ use ptth_relay::key_validity::BlakeHashWrapper;
+
+ tracing_subscriber::fmt ().try_init ().ok ();
+ let mut rt = Runtime::new ().expect ("Can't create runtime for testing");
+
+ rt.block_on (async {
+ let relay_port = 4002;
+ let proxy_port = 11510;
+
+ // Start relay
+
+ let server_name = "aliens_wildland";
+ let api_key = "AnacondaHardcoverGrannyUnlatchLankinessMutate";
+ let tripcode = BlakeHashWrapper::from_key (api_key.as_bytes ());
+ debug! ("Relay is expecting tripcode {}", tripcode.encode_base64 ());
+ let config_file = ptth_relay::config::file::Config {
+ iso: Default::default (),
+ port: Some (relay_port),
+ servers: vec! [
+ ptth_relay::config::file::Server {
+ name: server_name.to_string (),
+ tripcode,
+ display_name: None,
+ },
+ ],
+ scraper_keys: vec! [],
+ };
+
+ let config = ptth_relay::config::Config::try_from (config_file).expect ("Can't load config");
+
+ let relay_state = Arc::new (ptth_relay::RelayState::try_from (config).expect ("Can't create relay state"));
+
+ let (stop_relay_tx, stop_relay_rx) = oneshot::channel ();
+ let task_relay = spawn ({
+ let relay_state = relay_state.clone ();
+ async move {
+ ptth_relay::run_relay (
+ relay_state,
+ Arc::new (load_templates (&PathBuf::new ())?),
+ stop_relay_rx,
+ None
+ ).await
+ }
+ });
+
+ assert! (relay_state.list_servers ().await.is_empty ());
+
+ // Start proxy
+
+ let (stop_proxy_tx, stop_proxy_rx) = oneshot::channel ();
+ let task_proxy = spawn (async move {
+ debug_proxy::run_proxy (SocketAddr::from (([0, 0, 0, 0], proxy_port)), format! ("127.0.0.1:{}", relay_port), stop_proxy_rx).await
+ });
+
+ // Start server
+
+ let relay_url = format! ("http://127.0.0.1:{}", proxy_port);
+
+ let config_file = ptth_server::ConfigFile {
+ name: server_name.into (),
+ api_key: api_key.into (),
+ relay_url: format! ("{}/7ZSFUKGV", relay_url),
+ file_server_root: None,
+ };
+
+ let (stop_server_tx, stop_server_rx) = oneshot::channel ();
+ let task_server = {
+ spawn (async move {
+ ptth_server::run_server (config_file, stop_server_rx, None, None).await
+ })
+ };
+
+ delay_for (Duration::from_millis (1000)).await;
+
+ assert_eq! (relay_state.list_servers ().await, vec! [
+ server_name.to_string (),
+ ]);
+
+ let client = Client::builder ()
+ .timeout (Duration::from_secs (2))
+ .build ().expect ("Couldn't build HTTP client");
+
+ let resp = client.get (&format! ("{}/frontend/relay_up_check", relay_url))
+ .send ().await.expect ("Couldn't check if relay is up").bytes ().await.expect ("Couldn't check if relay is up");
+
+ assert_eq! (resp, "Relay is up\n");
+
+ let resp = client.get (&format! ("{}/frontend/servers/{}/files/COPYING", relay_url, server_name))
+ .send ().await.expect ("Couldn't find license").bytes ().await.expect ("Couldn't find license");
+
+ if blake3::hash (&resp) != blake3::Hash::from ([
+ 0xca, 0x02, 0x92, 0x78,
+ 0x9c, 0x0a, 0x0e, 0xcb,
+ 0xa7, 0x06, 0xf4, 0xb3,
+ 0xf3, 0x49, 0x30, 0x07,
+
+ 0xa9, 0x95, 0x17, 0x31,
+ 0xc1, 0xd4, 0x32, 0xc5,
+ 0x2c, 0x4a, 0xac, 0x1f,
+ 0x1a, 0xbb, 0xa8, 0xef,
+ ]) {
+ panic! ("{}", String::from_utf8 (resp.to_vec ()).expect ("???"));
+ }
+
+ // Requesting a file from a server that isn't registered
+ // will error out
+
+ let resp = client.get (&format! ("{}/frontend/servers/obviously_this_server_does_not_exist/files/COPYING", relay_url))
+ .send ().await.expect ("Couldn't send request to bogus server");
+
+ assert_eq! (resp.status (), reqwest::StatusCode::NOT_FOUND);
+
+ info! ("Shutting down end-to-end test");
+
+ stop_server_tx.send (()).expect ("Couldn't shut down server");
+ task_server.await.expect ("Couldn't join server").expect ("Server error");
+ info! ("Server stopped");
+
+ stop_proxy_tx.send (()).expect ("Couldn't shut down proxy");
+ task_proxy.await.expect ("Couldn't join proxy").expect ("Proxy error");
+ info! ("Proxy stopped");
+
+ stop_relay_tx.send (()).expect ("Couldn't shut down relay");
+ task_relay.await.expect ("Couldn't join relay").expect ("Relay error");
+ info! ("Relay stopped");
+ });
+}
+
#[test]
fn scraper_endpoints () {
let mut rt = Runtime::new ().expect ("Can't create runtime for testing");