wip: debug proxy now owns a filter which can drop or modify request bodies

main
_ 2021-03-06 22:58:23 +00:00
parent a980d151fc
commit 9648a9853c
5 changed files with 224 additions and 184 deletions

5
Cargo.lock generated
View File

@ -165,9 +165,9 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.42" version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d" checksum = "d3340571769500ddef1e94b45055fabed6b08a881269b7570c830b8f32ef84ef"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -468,6 +468,7 @@ name = "debug_proxy"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"futures-util", "futures-util",
"http", "http",
"hyper", "hyper",

View File

@ -9,6 +9,7 @@ license = "AGPL-3.0"
[dependencies] [dependencies]
anyhow = "1.0.34" anyhow = "1.0.34"
async-trait = "0.1.45"
futures-util = "0.3.8" futures-util = "0.3.8"
http = "0.2.1" http = "0.2.1"
hyper = { version = "0.14.4", features = ["server", "stream"] } hyper = { version = "0.14.4", features = ["server", "stream"] }

View File

@ -3,12 +3,14 @@ use std::{
sync::Arc, sync::Arc,
}; };
use async_trait::async_trait;
use futures_util::StreamExt; use futures_util::StreamExt;
use hyper::{ use hyper::{
Body, Body,
Request, Request,
Response, Response,
Server, Server,
body::Bytes,
service::{ service::{
make_service_fn, make_service_fn,
service_fn, service_fn,
@ -25,18 +27,77 @@ use tokio::{
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use ulid::Ulid; use ulid::Ulid;
struct State { #[async_trait]
client: Client, trait ProxyFilter {
upstream_authority: String, async fn request_body (&self, mut body: Body, tx: mpsc::Sender <Result <Bytes, hyper::Error>>) -> anyhow::Result <()>;
} }
async fn handle_all (req: Request <Body>, state: Arc <State>) struct PassthroughFilter {}
#[async_trait]
impl ProxyFilter for PassthroughFilter {
async fn request_body (&self, mut body: Body, tx: mpsc::Sender <Result <Bytes, hyper::Error>>) -> anyhow::Result <()> {
let mut bytes_transferred = 0;
loop {
let item = body.next ().await;
if let Some (item) = item {
if let Ok (item) = &item {
bytes_transferred += item.len ();
}
tx.send (item).await?;
}
else {
// Finished
break;
}
}
Ok (())
}
}
struct RequestBodyDropFilter {}
#[async_trait]
impl ProxyFilter for RequestBodyDropFilter {
async fn request_body (&self, mut body: Body, tx: mpsc::Sender <Result <Bytes, hyper::Error>>) -> anyhow::Result <()> {
let mut bytes_transferred = 0;
loop {
let item = body.next ().await;
if let Some (item) = item {
if let Ok (item) = &item {
bytes_transferred += item.len ();
}
// tx.send (item).await?;
tracing::debug! ("RequestBodyDropFilter dropping chunk");
}
else {
// Finished
break;
}
}
Ok (())
}
}
struct State <PF> {
client: Client,
upstream_authority: String,
proxy_filter: Arc <PF>,
}
async fn handle_all <PF: 'static + ProxyFilter + Sync + Send> (req: Request <Body>, state: Arc <State <PF>>)
-> anyhow::Result <Response <Body>> -> anyhow::Result <Response <Body>>
{ {
let req_id = Ulid::new ().to_string (); let req_id = Ulid::new ().to_string ();
let (head, mut body) = req.into_parts (); let (head, mut body) = req.into_parts ();
tracing::trace! ("{} Got URI {}", req_id, head.uri); tracing::debug! ("{} Got URI {}", req_id, head.uri);
let upstream_authority = state.upstream_authority.clone (); let upstream_authority = state.upstream_authority.clone ();
@ -55,27 +116,10 @@ async fn handle_all (req: Request <Body>, state: Arc <State>)
let (tx, rx) = mpsc::channel (1); let (tx, rx) = mpsc::channel (1);
spawn ({ spawn ({
let req_id = req_id.clone (); let req_id = req_id.clone ();
let proxy_filter = state.proxy_filter.clone ();
async move { async move {
let mut bytes_transferred = 0; proxy_filter.request_body (body, tx).await
loop {
let item = body.next ().await;
if let Some (item) = item {
if let Ok (item) = &item {
bytes_transferred += item.len ();
}
tx.send (item).await?;
}
else {
// Finished
break;
}
}
tracing::trace! ("{} Request body bytes: {}", req_id, bytes_transferred);
Ok::<_, anyhow::Error> (())
} }
}); });
@ -122,9 +166,13 @@ pub async fn run_proxy (
shutdown_oneshot: oneshot::Receiver <()>, shutdown_oneshot: oneshot::Receiver <()>,
) -> anyhow::Result <()> ) -> anyhow::Result <()>
{ {
let filter = PassthroughFilter {};
// let filter = RequestBodyDropFilter {};
let state = Arc::new (State { let state = Arc::new (State {
client: Client::builder ().build ()?, client: Client::builder ().build ()?,
upstream_authority, upstream_authority,
proxy_filter: Arc::new (filter),
}); });
let make_svc = make_service_fn (|_conn| { let make_svc = make_service_fn (|_conn| {

View File

@ -106,9 +106,8 @@ async fn handle_one_req (
if e.is_request () { if e.is_request () {
warn! ("Error while POSTing response. Client probably hung up."); warn! ("Error while POSTing response. Client probably hung up.");
} }
else {
error! ("Err: {:?}", e); error! ("Err: {:?}", e);
}
}, },
} }

View File

@ -9,7 +9,6 @@ use std::{
}; };
use tokio::{ use tokio::{
runtime::Runtime,
spawn, spawn,
sync::oneshot, sync::oneshot,
}; };
@ -42,8 +41,9 @@ async fn testing_client_checks (
assert_eq! (resp, "Relay is up\n"); assert_eq! (resp, "Relay is up\n");
let resp = client.get (&format! ("{}/frontend/servers/{}/files/COPYING", relay_url, server_name)) let req = client.get (&format! ("{}/frontend/servers/{}/files/COPYING", relay_url, server_name))
.send ().await.expect ("Couldn't find license").bytes ().await.expect ("Couldn't find license"); .send ();
let resp = tokio::time::timeout (Duration::from_secs (2), req).await.expect ("Request timed out").expect ("Couldn't find license").bytes ().await.expect ("Couldn't find license");
if blake3::hash (&resp) != blake3::Hash::from ([ if blake3::hash (&resp) != blake3::Hash::from ([
0xca, 0x02, 0x92, 0x78, 0xca, 0x02, 0x92, 0x78,
@ -174,16 +174,13 @@ impl TestingServer {
} }
} }
#[test] #[tokio::test]
fn end_to_end () { async fn end_to_end () {
// Prefer this form for tests, since all tests share one process // Prefer this form for tests, since all tests share one process
// and we don't care if another test already installed a subscriber. // and we don't care if another test already installed a subscriber.
//tracing_subscriber::fmt ().try_init ().ok (); //tracing_subscriber::fmt ().try_init ().ok ();
let rt = Runtime::new ().expect ("Can't create runtime for testing");
// Spawn the root task
rt.block_on (async {
let relay_port = 4000; let relay_port = 4000;
// No proxy // No proxy
let proxy_port = relay_port; let proxy_port = relay_port;
@ -214,15 +211,14 @@ fn end_to_end () {
testing_server.graceful_shutdown ().await; testing_server.graceful_shutdown ().await;
testing_relay.graceful_shutdown ().await; testing_relay.graceful_shutdown ().await;
});
} }
#[test] #[tokio::test]
fn debug_proxy () { async fn debug_proxy () {
tracing_subscriber::fmt ().try_init ().ok (); tracing_subscriber::fmt ()
let rt = Runtime::new ().expect ("Can't create runtime for testing"); .with_env_filter (tracing_subscriber::EnvFilter::from_default_env ())
.try_init ().ok ();
rt.block_on (async {
let relay_port = 4002; let relay_port = 4002;
let proxy_port = 11510; let proxy_port = 11510;
@ -271,14 +267,10 @@ fn debug_proxy () {
info! ("Proxy stopped"); info! ("Proxy stopped");
testing_relay.graceful_shutdown ().await; testing_relay.graceful_shutdown ().await;
});
} }
#[test] #[tokio::test]
fn scraper_endpoints () { async fn scraper_endpoints () {
let rt = Runtime::new ().expect ("Can't create runtime for testing");
rt.block_on (async {
use ptth_relay::*; use ptth_relay::*;
let config_file = config::file::Config { let config_file = config::file::Config {
@ -341,5 +333,4 @@ fn scraper_endpoints () {
stop_relay_tx.send (()).expect ("Couldn't shut down relay"); stop_relay_tx.send (()).expect ("Couldn't shut down relay");
task_relay.await.expect ("Couldn't join relay").expect ("Relay error"); task_relay.await.expect ("Couldn't join relay").expect ("Relay error");
});
} }