♻️ Package interesting request fields into MsgPack

main
_ 2020-10-27 20:20:06 -05:00
parent 15b18a9335
commit 406b13c3b1
4 changed files with 52 additions and 41 deletions

View File

@ -1,16 +1,17 @@
use std::{ use std::{
collections::*, collections::*,
error::Error, error::Error,
convert::{
Infallible,
TryInto,
},
net::SocketAddr,
str::FromStr,
sync::{
Arc
},
time::{Duration, Instant},
}; };
use std::convert::{Infallible};
use std::io::Write;
use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;
use std::sync::{
Arc
};
use std::time::{Duration, Instant};
use futures::channel::oneshot; use futures::channel::oneshot;
use hyper::{ use hyper::{
@ -24,22 +25,18 @@ use hyper::{
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use tokio::{ use tokio::{
sync::mpsc::{
channel,
Receiver,
},
sync::Mutex, sync::Mutex,
time::delay_for, time::delay_for,
}; };
use ptth::watcher::Watchers; use ptth::{
http_serde::*,
watcher::Watchers,
};
enum Message { enum Message {
Meow, Meow,
//HttpRequestRequest (String), HttpRequestResponse (RequestParts),
HttpRequestResponse (String),
// HttpResponseRequest (String),
HttpResponseResponse (Vec <u8>),
HttpResponseResponseStream (Body), HttpResponseResponseStream (Body),
} }
@ -79,11 +76,11 @@ async fn handle_wake (state: Arc <ServerState>, watcher_code: String)
async fn handle_http_listen (state: Arc <ServerState>, watcher_code: String) async fn handle_http_listen (state: Arc <ServerState>, watcher_code: String)
-> Response <Body> -> Response <Body>
{ {
println! ("Step 1"); //println! ("Step 1");
match Watchers::long_poll (state.watchers.clone (), watcher_code).await { match Watchers::long_poll (state.watchers.clone (), watcher_code).await {
Some (Message::HttpRequestResponse (uri)) => { Some (Message::HttpRequestResponse (parts)) => {
println! ("Step 3"); println! ("Step 3");
status_reply (StatusCode::OK, uri) status_reply (StatusCode::OK, rmp_serde::to_vec (&parts).unwrap ())
}, },
_ => status_reply (StatusCode::GATEWAY_TIMEOUT, "no\n"), _ => status_reply (StatusCode::GATEWAY_TIMEOUT, "no\n"),
} }
@ -105,7 +102,7 @@ async fn handle_http_response (
println! ("Step 7"); println! ("Step 7");
if ! watchers.wake_one (Message::HttpResponseResponseStream (body), &req_id) if ! watchers.wake_one (Message::HttpResponseResponseStream (body), &req_id)
{ {
println! ("Step 8"); println! ("Step 8 (bad thing)");
return status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n"); return status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n");
} }
else { else {
@ -116,31 +113,31 @@ async fn handle_http_response (
} }
async fn handle_http_request ( async fn handle_http_request (
parts: RequestParts,
state: Arc <ServerState>, state: Arc <ServerState>,
watcher_code: String, watcher_code: String
uri: String
) )
-> Response <Body> -> Response <Body>
{ {
let req_id = format! ("client_{}", ulid::Ulid::new ().to_string ()); println! ("Step 2 {}", parts.id);
println! ("Step 2 {}", req_id);
let (s, r) = oneshot::channel (); let (s, r) = oneshot::channel ();
let timeout = Duration::from_secs (5); let timeout = Duration::from_secs (5);
let id_2 = req_id.clone (); let id_2 = parts.id.clone ();
{ {
let mut that = state.watchers.lock ().await; let mut that = state.watchers.lock ().await;
that.add_watcher_with_id (s, id_2) that.add_watcher_with_id (s, id_2)
} }
let req_id = parts.id.clone ();
tokio::spawn (async move { tokio::spawn (async move {
{ {
let mut watchers = state.watchers.lock ().await; let mut watchers = state.watchers.lock ().await;
println! ("Step 3"); println! ("Step 3");
if ! watchers.wake_one (Message::HttpRequestResponse (req_id.clone ()), &watcher_code) { if ! watchers.wake_one (Message::HttpRequestResponse (parts), &watcher_code) {
watchers.remove_watcher (&req_id); watchers.remove_watcher (&req_id);
} }
} }
@ -201,7 +198,7 @@ async fn handle_all (req: Request <Body>, state: Arc <ServerState>)
-> Result <Response <Body>, Infallible> -> Result <Response <Body>, Infallible>
{ {
let path = req.uri ().path (); let path = req.uri ().path ();
println! ("{}", path); //println! ("{}", path);
if req.method () == Method::POST { if req.method () == Method::POST {
return Ok (if let Some (request_code) = prefix_match (path, "/http_response/") { return Ok (if let Some (request_code) = prefix_match (path, "/http_response/") {
@ -224,9 +221,14 @@ async fn handle_all (req: Request <Body>, state: Arc <ServerState>)
} }
else if let Some (rest) = prefix_match (path, "/http_request/") { else if let Some (rest) = prefix_match (path, "/http_request/") {
if let Some (idx) = rest.find ('/') { if let Some (idx) = rest.find ('/') {
let listen_code = &rest [0..idx]; let listen_code = String::from (&rest [0..idx]);
let path = &rest [idx + 1..]; let path = String::from (&rest [idx + 1..]);
Ok (handle_http_request (state, listen_code.into (), path.into ()).await) let (parts, _) = req.into_parts ();
let parts = match parts.try_into () {
Ok (x) => x,
_ => return Ok (status_reply (StatusCode::BAD_REQUEST, "Couldn't convert request")),
};
Ok (handle_http_request (parts, state, listen_code).await)
} }
else { else {
Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format")) Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format"))

View File

@ -23,6 +23,8 @@ use tokio::{
time::delay_for, time::delay_for,
}; };
use ptth::http_serde::*;
#[tokio::main] #[tokio::main]
async fn main () -> Result <(), Box <dyn Error>> { async fn main () -> Result <(), Box <dyn Error>> {
let client = Arc::new (Client::new ()); let client = Arc::new (Client::new ());
@ -42,7 +44,7 @@ async fn main () -> Result <(), Box <dyn Error>> {
let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands"); let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands");
println! ("Step 1"); //println! ("Step 1");
let req_resp = match req_req.send ().await { let req_resp = match req_req.send ().await {
Err (e) => { Err (e) => {
println! ("Err: {:?}", e); println! ("Err: {:?}", e);
@ -62,12 +64,15 @@ async fn main () -> Result <(), Box <dyn Error>> {
println! ("Step 3"); println! ("Step 3");
let body = req_resp.bytes ().await?; let body = req_resp.bytes ().await?;
let body = String::from (std::str::from_utf8 (&body)?); let parts: RequestParts = match rmp_serde::from_read_ref (&body)
{
Ok (x) => x,
_ => continue,
};
println! ("Client requested {}", body); println! ("Client requested {}", parts.uri);
println! ("Step 4/5"); println! ("Step 4/5");
let payload = String::from ("Ha ha hue hue it worked.\n");
println! ("Step 6"); println! ("Step 6");
let client = client.clone (); let client = client.clone ();
@ -84,6 +89,8 @@ async fn main () -> Result <(), Box <dyn Error>> {
tokio::spawn (async move { tokio::spawn (async move {
let path = "/home/user/pictures/bzqcChY.jpg"; let path = "/home/user/pictures/bzqcChY.jpg";
let path = "/home/user/videos/Decearing Egg.webm"; let path = "/home/user/videos/Decearing Egg.webm";
let path = "/home/user/projects/2020/ptth/README.md";
let mut f = File::open (path).await.unwrap (); let mut f = File::open (path).await.unwrap ();
let mut tx = tx; let mut tx = tx;
let mut bytes_sent = 0; let mut bytes_sent = 0;
@ -98,7 +105,7 @@ async fn main () -> Result <(), Box <dyn Error>> {
break; break;
} }
tx.send (Ok::<_, Infallible> (buffer)).await; tx.send (Ok::<_, Infallible> (buffer)).await.unwrap ();
bytes_sent += bytes_read; bytes_sent += bytes_read;
println! ("Sent {} bytes", bytes_sent); println! ("Sent {} bytes", bytes_sent);
@ -107,7 +114,7 @@ async fn main () -> Result <(), Box <dyn Error>> {
} }
}); });
let resp_req = client.post (&format! ("http://127.0.0.1:4000/http_response/{}", body)).body (Body::wrap_stream (rx)); let resp_req = client.post (&format! ("http://127.0.0.1:4000/http_response/{}", parts.id)).body (Body::wrap_stream (rx));
println! ("Step 6"); println! ("Step 6");
match resp_req.send ().await { match resp_req.send ().await {

View File

@ -44,6 +44,7 @@ impl From <Method> for hyper::Method {
#[derive (Deserialize, Serialize)] #[derive (Deserialize, Serialize)]
pub struct RequestParts { pub struct RequestParts {
pub id: String,
pub method: Method, pub method: Method,
// Technically URIs are subtle and complex but I don't care // Technically URIs are subtle and complex but I don't care
@ -70,6 +71,7 @@ impl TryFrom <http::request::Parts> for RequestParts {
); );
Ok (Self { Ok (Self {
id: ulid::Ulid::new ().to_string (),
method, method,
uri, uri,
headers, headers,

View File

@ -34,7 +34,7 @@ impl <T: 'static + Send + Sync> Watchers <T> {
} }
pub fn wake_one (&mut self, msg: T, id: &str) -> bool { pub fn wake_one (&mut self, msg: T, id: &str) -> bool {
println! ("wake_one {}", id); //println! ("wake_one {}", id);
if let Some (waiter) = self.senders.remove (id) { if let Some (waiter) = self.senders.remove (id) {
waiter.send (msg).ok (); waiter.send (msg).ok ();
@ -50,7 +50,7 @@ impl <T: 'static + Send + Sync> Watchers <T> {
} }
pub async fn long_poll (that: Arc <Mutex <Self>>, id: String) -> Option <T> { pub async fn long_poll (that: Arc <Mutex <Self>>, id: String) -> Option <T> {
println! ("long_poll {}", id); //println! ("long_poll {}", id);
let (s, r) = oneshot::channel (); let (s, r) = oneshot::channel ();
let timeout = Duration::from_secs (5); let timeout = Duration::from_secs (5);