Concept is proven, though it won't work for streaming

main
_ 2020-10-27 03:27:25 +00:00
parent e3aa61bb9a
commit 0cc61796c0
4 changed files with 122 additions and 21 deletions

View File

@ -1,4 +1,5 @@
[package]
name = "ptth"
version = "0.1.0"
authors = ["_"]
@ -7,7 +8,9 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.7"
hyper = "0.13.8"
reqwest = "0.10.8"
tokio = { version = "0.2", features = ["full"] }
ulid = "0.4.1"

View File

@ -26,7 +26,10 @@ use ptth::watcher::Watchers;
#[derive (Clone)]
enum Message {
Meow,
HttpRequest (String),
//HttpRequestRequest (String),
HttpRequestResponse (String),
// HttpResponseRequest (String),
HttpResponseResponse (String),
}
#[derive (Default)]
@ -65,12 +68,41 @@ async fn handle_wake (state: Arc <ServerState>, watcher_code: String)
async fn handle_http_listen (state: Arc <ServerState>, watcher_code: String)
-> Response <Body>
{
println! ("Step 1");
match Watchers::long_poll (state.watchers.clone (), watcher_code).await {
Some (Message::HttpRequest (uri)) => status_reply (StatusCode::OK, uri),
Some (Message::HttpRequestResponse (uri)) => {
println! ("Step 3");
status_reply (StatusCode::OK, uri)
},
_ => status_reply (StatusCode::GATEWAY_TIMEOUT, "no\n"),
}
}
async fn handle_http_response (
state: Arc <ServerState>,
req_id: String,
response: String
)
-> Response <Body>
{
println! ("Step 6");
{
let mut watchers = state.watchers.lock ().await;
println! ("Step 7");
if ! watchers.wake_one (Message::HttpResponseResponse (response), &req_id)
{
println! ("Step 8");
return status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n");
}
else {
println! ("Step 8");
status_reply (StatusCode::OK, "ok\n")
}
}
}
async fn handle_http_request (
state: Arc <ServerState>,
watcher_code: String,
@ -78,13 +110,42 @@ async fn handle_http_request (
)
-> Response <Body>
{
let req_id = format! ("client_{}", ulid::Ulid::new ().to_string ());
println! ("Step 2 {}", req_id);
let (s, r) = oneshot::channel ();
let timeout = Duration::from_secs (5);
let id_2 = req_id.clone ();
{
let mut that = state.watchers.lock ().await;
that.add_watcher_with_id (s, id_2)
}
tokio::spawn (async move {
{
let mut watchers = state.watchers.lock ().await;
if watchers.wake_one (Message::HttpRequest (uri), &watcher_code) {
status_reply (StatusCode::OK, "ok\n")
println! ("Step 3");
if ! watchers.wake_one (Message::HttpRequestResponse (req_id.clone ()), &watcher_code) {
watchers.remove_watcher (&req_id);
}
else {
status_reply (StatusCode::BAD_REQUEST, "no\n")
}
delay_for (timeout).await;
{
let mut that = state.watchers.lock ().await;
that.remove_watcher (&req_id);
}
});
match r.await {
Ok (Message::HttpResponseResponse (s)) => {
println! ("Step 7");
status_reply (StatusCode::OK, s)
},
_ => status_reply (StatusCode::GATEWAY_TIMEOUT, "server didn't reply in time or somethin'"),
}
}
@ -149,6 +210,16 @@ async fn handle_all (req: Request <Body>, state: Arc <ServerState>)
Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format"))
}
}
else if let Some (rest) = prefix_match (path, "/http_response/") {
if let Some (idx) = rest.find ('/') {
let request_code = &rest [0..idx];
let response = &rest [idx + 1..];
Ok (handle_http_response (state, request_code.into (), response.into ()).await)
}
else {
Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format"))
}
}
/*
else if let Some (name) = prefix_match (path, "/udp_send/") {
Ok (handle_udp_send ().await)

View File

@ -1,4 +1,7 @@
use std::error::Error;
use std::{
error::Error,
sync::Arc,
};
use hyper::{
StatusCode,
@ -9,7 +12,7 @@ use tokio::fs::File;
#[tokio::main]
async fn main () -> Result <(), Box <dyn Error>> {
let client = Client::new ();
let client = Arc::new (Client::new ());
let path = "/home/user/pictures/bzqcChY.jpg";
@ -17,11 +20,13 @@ async fn main () -> Result <(), Box <dyn Error>> {
let _uri = Uri::builder ()
.scheme ("http")
.authority ("127.0.0.1:4000")
.path_and_query ("/listen/alien_wildlands")
.path_and_query ("/http_listen/alien_wildlands")
.build ().unwrap ();
let req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands");
let response = match req.send ().await {
let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands");
println! ("Step 1");
let req_resp = match req_req.send ().await {
Err (e) => {
println! ("Err: {:?}", e);
continue;
@ -29,15 +34,38 @@ async fn main () -> Result <(), Box <dyn Error>> {
Ok (r) => r,
};
if response.status () != StatusCode::OK {
if req_resp.status () != StatusCode::OK {
continue;
}
let body = response.bytes ().await?;
let body = std::str::from_utf8 (&body)?;
println! ("Step 3");
let body = req_resp.bytes ().await?;
let body = String::from (std::str::from_utf8 (&body)?);
println! ("Client requested {}", body);
}
Ok (())
println! ("Step 4/5");
let payload = String::from ("Ha ha hue hue it worked.");
println! ("Step 6");
let client = client.clone ();
tokio::spawn (async move {
let resp_req = Uri::builder ()
.scheme ("http")
.authority ("127.0.0.1:4000")
.path_and_query ("/listen/alien_wildlands")
.build ().unwrap ();
let resp_req = client.get (&format! ("http://127.0.0.1:4000/http_response/{}/{}", body, payload));
println! ("Step 6");
match resp_req.send ().await {
Err (e) => {
println! ("Err: {:?}", e);
},
Ok (_) => (),
}
});
}
}

View File

@ -29,8 +29,8 @@ impl <T: 'static + Clone + Send + Sync> Watchers <T> {
self.senders.insert (id, s);
}
pub fn remove_watcher (&mut self, id: String) {
self.senders.remove (&id);
pub fn remove_watcher (&mut self, id: &str) {
self.senders.remove (id);
}
/*
fn wake_all (&mut self, msg: T) {
@ -63,7 +63,6 @@ impl <T: 'static + Clone + Send + Sync> Watchers <T> {
println! ("long_poll {}", id);
let (s, r) = oneshot::channel ();
let timeout = Duration::from_secs (5);
let id_2 = id.clone ();
@ -75,7 +74,7 @@ impl <T: 'static + Clone + Send + Sync> Watchers <T> {
tokio::spawn (async move {
delay_for (timeout).await;
let mut that = that.lock ().await;
that.remove_watcher (id);
that.remove_watcher (&id);
});
if let Ok (message) = r.await {