🚧 wip: idea for tunneling TCP over HTTP

There's a lot of missing pieces, but the big picture is like this:

- Use 2 completely separate HTTP streams, and try to keep them alive as long
as possible, each in basically half-duplex mode
- Each stream has a long-running PUT and GET, sort of like station307
- Each end has to be terminated by a native app that either connects to a local
TCP server, or acts as a local TCP server
- No clue how it would work for multiple connections on the same port. Poorly,
I guess?
- It's probably gonna run like garbage because we're splitting TCP into
2 TCP streams, and although backpressure might work, the ACKs will be less
efficient. And the congestion control might get confused

My only goal is to tunnel Tracy over it, so that I can have that remotely.
main
_ 2021-01-12 00:22:55 +00:00
parent 574f660c1a
commit 0cb24695d0
5 changed files with 236 additions and 0 deletions

34
Cargo.lock generated
View File

@ -1656,6 +1656,40 @@ dependencies = [
"uom",
]
[[package]]
name = "ptth_forwarding"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.12.3",
"futures",
"reqwest",
"rmp-serde",
"serde",
"serde_json",
"structopt",
"thiserror",
"tokio",
"tracing",
"tracing-futures",
"tracing-subscriber",
]
[[package]]
name = "ptth_forwarding_relay"
version = "0.1.0"
dependencies = [
"anyhow",
"futures",
"hyper",
"thiserror",
"tokio",
"tracing",
"tracing-futures",
"tracing-subscriber",
"ulid",
]
[[package]]
name = "ptth_kv"
version = "0.1.0"

View File

@ -0,0 +1,22 @@
[package]
name = "ptth_forwarding"
version = "0.1.0"
authors = ["_"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.34"
base64 = "0.12.3"
futures = "0.3.7"
reqwest = { version = "0.10.8", features = ["stream"] }
rmp-serde = "0.14.4"
serde = {version = "1.0.117", features = ["derive"]}
serde_json = "1.0.60"
structopt = "0.3.20"
thiserror = "1.0.22"
tokio = { version = "0.2.22", features = ["full"] }
tracing = "0.1.21"
tracing-futures = "0.2.4"
tracing-subscriber = "0.2.15"

View File

@ -0,0 +1,24 @@
use reqwest::Client;
use tokio::{
io::AsyncWriteExt,
net::TcpStream,
stream::StreamExt,
};
#[tokio::main]
async fn main () -> anyhow::Result <()> {
let client = Client::builder ()
.build ()?;
let mut tcp_stream = TcpStream::connect ("127.0.0.1:4010").await?;
let resp = client.get ("http://127.0.0.1:4003/").send ().await?;
let mut downstream = resp.bytes_stream ();
while let Some (Ok (item)) = downstream.next ().await {
println! ("Chunk: {:?}", item);
tcp_stream.write_all (&item).await?;
}
Ok (())
}

View File

@ -0,0 +1,18 @@
[package]
name = "ptth_forwarding_relay"
version = "0.1.0"
authors = ["_"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.34"
futures = "0.3.7"
hyper = "0.13.8"
thiserror = "1.0.22"
tokio = { version = "0.2.22", features = ["full"] }
tracing = "0.1.21"
tracing-futures = "0.2.4"
tracing-subscriber = "0.2.15"
ulid = "0.4.1"

View File

@ -0,0 +1,138 @@
use std::{
sync::Arc,
time::Duration,
};
use hyper::{
Body,
Method,
Request,
Response,
StatusCode,
};
use tokio::{
spawn,
sync::mpsc,
time::interval,
};
use tracing::{
info, trace,
};
use tracing_subscriber::{
fmt,
fmt::format::FmtSpan,
EnvFilter,
};
use ulid::Ulid;
pub struct RelayState {
}
pub struct HttpService {
state: Arc <RelayState>
}
impl HttpService {
pub fn new () -> Self {
Self {
state: Arc::new (RelayState {}),
}
}
pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> {
use std::net::SocketAddr;
use hyper::{
server::Server,
service::{
make_service_fn,
service_fn,
},
};
let make_svc = make_service_fn (|_conn| {
let state = self.state.clone ();
async {
Ok::<_, String> (service_fn (move |req| {
let state = state.clone ();
Self::handle_all (req, state)
}))
}
});
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let server = Server::bind (&addr)
.serve (make_svc)
;
server.await
}
async fn handle_all (req: Request <Body>, state: Arc <RelayState>)
-> Result <Response <Body>, anyhow::Error>
{
if req.method () == Method::GET {
return Self::handle_gets (req, &*state).await;
}
Ok::<_, anyhow::Error> (Response::builder ()
.body (Body::from ("hello\n"))?)
}
async fn handle_gets (req: Request <Body>, state: &RelayState)
-> Result <Response <Body>, anyhow::Error>
{
let (mut tx, rx) = mpsc::channel (1);
spawn (async move {
let id = Ulid::new ().to_string ();
trace! ("Downstream {} started", id);
Self::handle_downstream (tx).await.ok ();
trace! ("Downstream {} ended", id);
});
Ok::<_, anyhow::Error> (Response::builder ()
.body (Body::wrap_stream (rx))?)
}
async fn handle_downstream (mut tx: mpsc::Sender <anyhow::Result <String>>) -> Result <(), anyhow::Error> {
let mut int = interval (Duration::from_secs (1));
let mut counter = 0u64;
loop {
int.tick ().await;
tx.send (Ok::<_, anyhow::Error> (format! ("Counter: {}\n", counter))).await?;
counter += 1;
}
Ok (())
}
}
#[tokio::main]
async fn main () -> Result <(), anyhow::Error> {
use std::time::Duration;
use tokio::{
spawn,
time::interval,
};
fmt ()
.with_env_filter (EnvFilter::from_default_env ())
.with_span_events (FmtSpan::CLOSE)
.init ()
;
let service = HttpService::new ();
info! ("Starting relay");
Ok (service.serve (4003).await?)
}