
272 lines
6.1 KiB
Raw Normal View History

2020-10-27 01:42:10 +00:00
use std::{
use std::convert::{Infallible};
use std::io::Write;
use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;
use std::sync::{
use std::time::{Duration, Instant};
use futures::channel::oneshot;
use hyper::{
2020-10-27 01:42:10 +00:00
use hyper::service::{make_service_fn, service_fn};
use tokio::{
2020-10-27 01:42:10 +00:00
2020-10-27 01:55:47 +00:00
use ptth::watcher::Watchers;
2020-10-27 01:42:10 +00:00
enum Message {
//HttpRequestRequest (String),
HttpRequestResponse (String),
// HttpResponseRequest (String),
HttpResponseResponse (Vec <u8>),
HttpResponseResponseStream (Body),
2020-10-27 01:42:10 +00:00
#[derive (Default)]
struct ServerState {
watchers: Arc <Mutex <Watchers <Message>>>,
fn status_reply <B: Into <Body>> (status: StatusCode, b: B)
-> Response <Body>
Response::builder ().status (status).body (b.into ()).unwrap ()
2020-10-27 02:20:37 +00:00
async fn handle_watch (state: Arc <ServerState>, watcher_code: String)
2020-10-27 01:42:10 +00:00
-> Response <Body>
2020-10-27 02:20:37 +00:00
match Watchers::long_poll (state.watchers.clone (), watcher_code).await {
2020-10-27 01:42:10 +00:00
None => status_reply (StatusCode::OK, "no\n"),
Some (_) => status_reply (StatusCode::OK, "actually, yes\n"),
2020-10-27 02:20:37 +00:00
async fn handle_wake (state: Arc <ServerState>, watcher_code: String)
2020-10-27 01:42:10 +00:00
-> Response <Body>
let mut watchers = state.watchers.lock ().await;
2020-10-27 02:20:37 +00:00
if watchers.wake_one (Message::Meow, &watcher_code) {
status_reply (StatusCode::OK, "ok\n")
else {
status_reply (StatusCode::BAD_REQUEST, "no\n")
2020-10-27 01:42:10 +00:00
async fn handle_http_listen (state: Arc <ServerState>, watcher_code: String)
-> Response <Body>
println! ("Step 1");
match Watchers::long_poll (state.watchers.clone (), watcher_code).await {
Some (Message::HttpRequestResponse (uri)) => {
println! ("Step 3");
status_reply (StatusCode::OK, uri)
_ => status_reply (StatusCode::GATEWAY_TIMEOUT, "no\n"),
async fn handle_http_response (
req: Request <Body>,
state: Arc <ServerState>,
req_id: String,
-> Response <Body>
println! ("Step 6");
let body = req.into_body ();
let mut watchers = state.watchers.lock ().await;
println! ("Step 7");
if ! watchers.wake_one (Message::HttpResponseResponseStream (body), &req_id)
println! ("Step 8");
return status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n");
else {
println! ("Step 8");
status_reply (StatusCode::OK, "ok\n")
async fn handle_http_request (
state: Arc <ServerState>,
watcher_code: String,
uri: String
-> Response <Body>
let req_id = format! ("client_{}", ulid::Ulid::new ().to_string ());
println! ("Step 2 {}", req_id);
let (s, r) = oneshot::channel ();
let timeout = Duration::from_secs (5);
let id_2 = req_id.clone ();
let mut that = state.watchers.lock ().await;
that.add_watcher_with_id (s, id_2)
tokio::spawn (async move {
let mut watchers = state.watchers.lock ().await;
println! ("Step 3");
if ! watchers.wake_one (Message::HttpRequestResponse (req_id.clone ()), &watcher_code) {
watchers.remove_watcher (&req_id);
delay_for (timeout).await;
let mut that = state.watchers.lock ().await;
that.remove_watcher (&req_id);
match r.await {
Ok (Message::HttpResponseResponseStream (body)) => {
println! ("Step 7");
status_reply (StatusCode::OK, body)
_ => status_reply (StatusCode::GATEWAY_TIMEOUT, "server didn't reply in time or somethin'"),
2020-10-27 01:55:47 +00:00
async fn handle_udp_send () -> Response <Body>
use tokio::net::UdpSocket;
let mut sock = UdpSocket::bind (SocketAddr::from (([0,0,0,0], 0))).await.unwrap ();
sock.send_to (b"handle_udp_send\n", SocketAddr::from (([127,0,0,1], 9000u16))).await.unwrap ();
status_reply (StatusCode::OK, "ok\n")
2020-10-27 02:00:09 +00:00
async fn handle_udp_recv () -> Response <Body>
use tokio::net::UdpSocket;
let mut sock = UdpSocket::bind (SocketAddr::from (([0,0,0,0], 4001))).await.unwrap ();
let mut buffer = vec! [0u8; 4096];
let (bytes_received, _addr) = sock.recv_from (&mut buffer [..]).await.unwrap ();
buffer.truncate (bytes_received);
status_reply (StatusCode::OK, buffer)
2020-10-27 02:20:37 +00:00
/// Returns the part of `hay` that follows `needle` when `hay` starts with
/// `needle`, and `None` otherwise.
///
/// An empty `needle` matches everything and returns `hay` unchanged.
fn prefix_match <'a> (hay: &'a str, needle: &str) -> Option <&'a str>
{
    hay.strip_prefix (needle)
}
2020-10-27 01:42:10 +00:00
async fn handle_all (req: Request <Body>, state: Arc <ServerState>)
-> Result <Response <Body>, Infallible>
2020-10-27 02:20:37 +00:00
let path = req.uri ().path ();
println! ("{}", path);
2020-10-27 01:42:10 +00:00
if req.method () == Method::POST {
return Ok (if let Some (request_code) = prefix_match (path, "/http_response/") {
let request_code = request_code.into ();
handle_http_response (req, state, request_code).await
else {
status_reply (StatusCode::BAD_REQUEST, "Can't POST this\n")
2020-10-27 02:20:37 +00:00
if let Some (watch_code) = prefix_match (path, "/watch/") {
Ok (handle_watch (state, watch_code.into ()).await)
2020-10-27 01:42:10 +00:00
2020-10-27 02:20:37 +00:00
else if let Some (watch_code) = prefix_match (path, "/wake/") {
Ok (handle_wake (state, watch_code.into ()).await)
2020-10-27 01:42:10 +00:00
else if let Some (listen_code) = prefix_match (path, "/http_listen/") {
Ok (handle_http_listen (state, listen_code.into ()).await)
else if let Some (rest) = prefix_match (path, "/http_request/") {
if let Some (idx) = rest.find ('/') {
let listen_code = &rest [0..idx];
let path = &rest [idx + 1..];
Ok (handle_http_request (state, listen_code.into (), path.into ()).await)
else {
Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format"))
2020-10-27 02:20:37 +00:00
else if let Some (name) = prefix_match (path, "/udp_send/") {
2020-10-27 01:55:47 +00:00
Ok (handle_udp_send ().await)
2020-10-27 02:20:37 +00:00
else if let Some (name) = prefix_match (path, "/udp_recv/") {
2020-10-27 02:00:09 +00:00
Ok (handle_udp_recv ().await)
2020-10-27 02:20:37 +00:00
2020-10-27 01:42:10 +00:00
else {
Ok (status_reply (StatusCode::OK, "Hi\n"))
async fn main () -> Result <(), Box <dyn Error>> {
let addr = SocketAddr::from(([0, 0, 0, 0], 4000));
let state = Arc::new (ServerState::default ());
let make_svc = make_service_fn (|_conn| {
let state = state.clone ();
async {
Ok::<_, Infallible> (service_fn (move |req| {
let state = state.clone ();
handle_all (req, state)
let server = Server::bind (&addr).serve (make_svc);
Ok (())