🚧 wip: working on a way to make P2 optional

main
_ 2021-09-10 15:36:03 -05:00
parent e1da97a517
commit 16bde3a2cf
3 changed files with 109 additions and 50 deletions

View File

@ -46,37 +46,45 @@ async fn main () -> anyhow::Result <()> {
let server_tcp_port = opt.server_tcp_port.unwrap_or (30382); let server_tcp_port = opt.server_tcp_port.unwrap_or (30382);
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?; let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
debug! ("Accepting local TCP connections from P1");
// End of per-port stuff // End of per-port stuff
// Beginning of per-connection stuff // Beginning of per-connection stuff
loop { let task_tcp_server = tokio::spawn (async move {
let (tcp_socket, _) = listener.accept ().await?; loop {
let connection = connection.clone (); let (tcp_socket, _) = listener.accept ().await?;
let server_id = server_id.clone (); let connection = connection.clone ();
let server_id = server_id.clone ();
tokio::spawn (async move {
let (local_recv, local_send) = tcp_socket.into_split ();
debug! ("Starting PTTH connection");
let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
trace! ("Relaying bytes...");
let ptth_conn = quic_demo::connection::NewConnection {
local_send,
local_recv,
relay_send,
relay_recv,
}.build ();
ptth_conn.wait_for_close ().await?;
debug! ("Ended PTTH connection");
Ok::<_, anyhow::Error> (())
});
}
tokio::spawn (async move { Ok::<_, anyhow::Error> (())
let (local_recv, local_send) = tcp_socket.into_split (); });
debug! ("Starting PTTH connection"); debug! ("Accepting local TCP connections from P1");
let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; task_tcp_server.await??;
trace! ("Relaying bytes..."); Ok (())
let ptth_conn = quic_demo::connection::NewConnection {
local_send,
local_recv,
relay_send,
relay_recv,
}.build ();
ptth_conn.wait_for_close ().await?;
debug! ("Ended PTTH connection");
Ok::<_, anyhow::Error> (())
});
}
} }

View File

@ -49,37 +49,87 @@ async fn main () -> anyhow::Result <()> {
}; };
let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004));
let http_server = Server::bind (&http_addr); let http_server = Server::bind (&http_addr);
tokio::spawn (async move { let tcp_port = 30382;
http_server.serve (make_svc).await let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?;
let task_quic_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
while let Some (conn) = incoming.next ().await {
let relay_state = Arc::clone (&relay_state);
// Each new peer QUIC connection gets its own task
tokio::spawn (async move {
let active = relay_state.stats.quic.connect ();
debug! ("QUIC connections: {}", active);
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
Ok (_) => (),
Err (e) => warn! ("handle_quic_connection {:?}", e),
}
let active = relay_state.stats.quic.disconnect ();
debug! ("QUIC connections: {}", active);
});
}
Ok::<_, anyhow::Error> (())
})
};
let task_http_server = tokio::spawn (async move {
http_server.serve (make_svc).await?;
Ok::<_, anyhow::Error> (())
}); });
let task_tcp_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
loop {
let (tcp_socket, _) = tcp_listener.accept ().await?;
let server_id = "bogus_server".to_string ();
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
let (client_recv, client_send) = tcp_socket.into_split ();
debug! ("Accepted direct TCP connection P1 --> P3");
let p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
let p4 = match p4_server_proxies.get ("bogus_server") {
Some (x) => x,
None => bail! ("That server isn't connected"),
};
unimplemented! ();
/*
p4.req_channel.send (RequestP2ToP4 {
client_send,
client_recv,
client_id: "bogus_client".to_string (),
}).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?;
*/
Ok (())
});
}
Ok::<_, anyhow::Error> (())
})
};
debug! ("Serving HTTP on {:?}", http_addr); debug! ("Serving HTTP on {:?}", http_addr);
while let Some (conn) = incoming.next ().await { task_quic_server.await??;
let relay_state = Arc::clone (&relay_state); task_http_server.await??;
task_tcp_server.await??;
// Each new peer QUIC connection gets its own task
tokio::spawn (async move {
let active = relay_state.stats.quic.connect ();
debug! ("QUIC connections: {}", active);
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
Ok (_) => (),
Err (e) => warn! ("handle_quic_connection {:?}", e),
}
let active = relay_state.stats.quic.disconnect ();
debug! ("QUIC connections: {}", active);
});
}
Ok (()) Ok (())
} }
async fn handle_http (req: Request <Body>, relay_state: Arc <RelayState>) async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
-> anyhow::Result <Response <Body>> -> anyhow::Result <Response <Body>>
{ {
let debug_string; let debug_string;

View File

@ -21,6 +21,7 @@ pub use tokio::{
AsyncReadExt, AsyncReadExt,
AsyncWriteExt, AsyncWriteExt,
}, },
net::TcpListener,
sync::{ sync::{
Mutex, Mutex,
mpsc, mpsc,