add debug_proxy which I can probably use to inject network problems during tests
parent
56a9c0cbeb
commit
27336d8571
|
@ -479,6 +479,19 @@ dependencies = [
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "debug_proxy"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"http",
|
||||||
|
"hyper",
|
||||||
|
"reqwest",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"tracing-subscriber",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "digest"
|
name = "digest"
|
||||||
version = "0.8.1"
|
version = "0.8.1"
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
[package]
|
||||||
|
|
||||||
|
name = "debug_proxy"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Trish"]
|
||||||
|
edition = "2018"
|
||||||
|
license = "AGPL-3.0"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
|
||||||
|
anyhow = "1.0.34"
|
||||||
|
http = "0.2.1"
|
||||||
|
hyper = "0.13.8"
|
||||||
|
reqwest = { version = "0.10.8", features = ["stream"] }
|
||||||
|
tokio = { version = "0.2.22", features = ["full"] }
|
||||||
|
tracing = "0.1.21"
|
||||||
|
tracing-subscriber = "0.2.15"
|
|
@ -0,0 +1,120 @@
|
||||||
|
use std::{
|
||||||
|
net::SocketAddr,
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use hyper::{
|
||||||
|
Body,
|
||||||
|
Request,
|
||||||
|
Response,
|
||||||
|
Server,
|
||||||
|
service::{
|
||||||
|
make_service_fn,
|
||||||
|
service_fn,
|
||||||
|
},
|
||||||
|
StatusCode,
|
||||||
|
};
|
||||||
|
use reqwest::Client;
|
||||||
|
use tokio::{
|
||||||
|
spawn,
|
||||||
|
stream::StreamExt,
|
||||||
|
sync::mpsc,
|
||||||
|
};
|
||||||
|
|
||||||
|
struct State {
|
||||||
|
client: Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_all (req: Request <Body>, state: Arc <State>)
|
||||||
|
-> anyhow::Result <Response <Body>>
|
||||||
|
{
|
||||||
|
let (head, mut body) = req.into_parts ();
|
||||||
|
|
||||||
|
tracing::trace! ("Got URI {}", head.uri);
|
||||||
|
|
||||||
|
let mut new_uri = head.uri.clone ().into_parts ();
|
||||||
|
new_uri.scheme = Some (http::uri::Scheme::HTTPS);
|
||||||
|
new_uri.authority = Some (http::uri::Authority::from_static ("example.com"));
|
||||||
|
let new_uri = http::Uri::from_parts (new_uri)?;
|
||||||
|
|
||||||
|
tracing::trace! ("Rebuilt URI as {}", new_uri);
|
||||||
|
|
||||||
|
let mut upstream_req = state.client.request (head.method, &new_uri.to_string ());
|
||||||
|
for (k, v) in &head.headers {
|
||||||
|
// upstream_req = upstream_req.header (k, v);
|
||||||
|
}
|
||||||
|
|
||||||
|
let (mut tx, rx) = mpsc::channel (1);
|
||||||
|
spawn (async move {
|
||||||
|
loop {
|
||||||
|
let item = body.next ().await;
|
||||||
|
|
||||||
|
if let Some (item) = item {
|
||||||
|
tx.send (item).await?;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Finished
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok::<_, anyhow::Error> (())
|
||||||
|
});
|
||||||
|
|
||||||
|
let upstream_resp = upstream_req.body (reqwest::Body::wrap_stream (rx)).send ().await?;
|
||||||
|
|
||||||
|
let mut resp = Response::builder ()
|
||||||
|
.status (upstream_resp.status ());
|
||||||
|
|
||||||
|
for (k, v) in upstream_resp.headers () {
|
||||||
|
resp = resp.header (k, v);
|
||||||
|
}
|
||||||
|
|
||||||
|
let (mut tx, rx) = mpsc::channel (1);
|
||||||
|
spawn (async move {
|
||||||
|
let mut body = upstream_resp.bytes_stream ();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let item = body.next ().await;
|
||||||
|
|
||||||
|
if let Some (item) = item {
|
||||||
|
tx.send (item).await?;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Finished
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok::<_, anyhow::Error> (())
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok (resp.body (Body::wrap_stream (rx))?)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main () -> anyhow::Result <()> {
|
||||||
|
tracing_subscriber::fmt::init ();
|
||||||
|
|
||||||
|
let addr = SocketAddr::from(([0, 0, 0, 0], 11509));
|
||||||
|
|
||||||
|
let state = Arc::new (State {
|
||||||
|
client: Client::builder ().build ()?,
|
||||||
|
});
|
||||||
|
|
||||||
|
let make_svc = make_service_fn (|_conn| {
|
||||||
|
let state = state.clone ();
|
||||||
|
|
||||||
|
async {
|
||||||
|
Ok::<_, String> (service_fn (move |req| {
|
||||||
|
let state = state.clone ();
|
||||||
|
|
||||||
|
handle_all (req, state)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tracing::info! ("Binding to {}", addr);
|
||||||
|
Ok (Server::bind (&addr)
|
||||||
|
.serve (make_svc).await?)
|
||||||
|
}
|
|
@ -35,7 +35,7 @@ use ptth_server::{
|
||||||
};
|
};
|
||||||
|
|
||||||
async fn handle_all (req: Request <Body>, state: Arc <State>)
|
async fn handle_all (req: Request <Body>, state: Arc <State>)
|
||||||
-> Result <Response <Body>, anyhow::Error>
|
-> anyhow::Result <Response <Body>>
|
||||||
{
|
{
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use hyper::header::HeaderName;
|
use hyper::header::HeaderName;
|
||||||
|
@ -82,7 +82,7 @@ pub struct ConfigFile {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main () -> Result <(), anyhow::Error> {
|
async fn main () -> anyhow::Result <()> {
|
||||||
tracing_subscriber::fmt::init ();
|
tracing_subscriber::fmt::init ();
|
||||||
|
|
||||||
let path = PathBuf::from ("./config/ptth_server.toml");
|
let path = PathBuf::from ("./config/ptth_server.toml");
|
||||||
|
|
Loading…
Reference in New Issue