From cffb888ac846d6e2971214ef2d3e593476a440bc Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Fri, 25 Mar 2022 00:44:43 +0000 Subject: [PATCH] :heavy_plus_sign: add download subcommand --- Cargo.lock | 13 ++- crates/ptth_multi_call_server/Cargo.toml | 3 + crates/ptth_multi_call_server/src/download.rs | 79 ++++++++++++++++++- 3 files changed, 90 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 99e697e..62dc089 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -584,6 +584,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.5" @@ -1203,11 +1209,14 @@ version = "0.1.0" dependencies = [ "anyhow", "ctrlc", + "futures-util", + "hex", "ptth_file_server", "ptth_server", "quic_demo", "reqwest", "rusty_ulid", + "sha2", "tokio", "tracing", "tracing-subscriber", @@ -1776,9 +1785,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.9.8" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa" +checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" dependencies = [ "block-buffer 0.9.0", "cfg-if", diff --git a/crates/ptth_multi_call_server/Cargo.toml b/crates/ptth_multi_call_server/Cargo.toml index 72affb4..b854b1e 100644 --- a/crates/ptth_multi_call_server/Cargo.toml +++ b/crates/ptth_multi_call_server/Cargo.toml @@ -14,10 +14,13 @@ license = "AGPL-3.0" anyhow = "1.0.38" ctrlc = "3.2.1" +futures-util = "0.3.9" +hex = "0.4.3" ptth_file_server = { path = "../ptth_file_server_bin" } ptth_server = { path = "../ptth_server" } quic_demo = { path = "../../prototypes/quic_demo" } rusty_ulid = "0.10.1" +sha2 = "0.9.8" tokio = { version = "1.8.1", features = ["full"] } tracing-subscriber = "0.2.16" tracing = "0.1.25" diff --git a/crates/ptth_multi_call_server/src/download.rs b/crates/ptth_multi_call_server/src/download.rs index 25891d5..364b26c 100644 --- a/crates/ptth_multi_call_server/src/download.rs +++ b/crates/ptth_multi_call_server/src/download.rs @@ -1,5 +1,9 @@ use std::{ ffi::OsString, + io::{ + self, + Write, + }, time::Duration, }; @@ -8,9 +12,28 @@ use anyhow::{ bail, }; +use futures_util::StreamExt; + +use reqwest::{ + StatusCode, +}; + +use sha2::{ + Digest, + Sha512, +}; + +use tokio::{ + sync::mpsc, + task::{ + spawn, + spawn_blocking, + }, +}; + pub async fn main (args: &[OsString]) -> anyhow::Result <()> { let mut url = None; - let mut sha512 = None; + let mut expected_sha512 = None; let mut args = args [1..].into_iter (); @@ -23,8 +46,9 @@ pub async fn main (args: &[OsString]) -> anyhow::Result <()> { match arg.to_str ().ok_or_else (|| anyhow! ("All arguments must be valid UTF-8"))? { "--help" => println! ("For now, just look at the source code"), - "--sha512" => { - sha512 = args.next (); + "--expect-sha512" => { + let expected = args.next ().ok_or_else (|| anyhow! ("--expect-sha512 needs an argument"))?; + expected_sha512 = Some (expected.to_str ().ok_or_else (|| anyhow! ("--expect-sha512's argument must be valid Unicode"))?); } arg => { url = Some (arg); @@ -45,5 +69,54 @@ pub async fn main (args: &[OsString]) -> anyhow::Result <()> { .connect_timeout (Duration::from_secs (30)) .build ()?; + let resp = client.get (url) + .send ().await?; + + if resp.status () != StatusCode::OK { + bail! ("Expected 200 OK, got {}", resp.status ()); + } + + let mut resp_stream = resp.bytes_stream (); + + // The hasher is owned by a task because it makes ownership simpler + let (mut hash_tx, mut hash_rx) = mpsc::channel (1); + + let hasher_task = spawn_blocking (move || { + let mut hasher = Sha512::new (); + + while let Some (chunk) = tokio::runtime::Handle::current ().block_on (hash_rx.recv ()) { + hasher.update (&chunk); + } + + anyhow::Result::<_>::Ok (hasher.finalize ()) + }); + + while let Some (chunk) = resp_stream.next ().await { + let chunk = chunk?; + + hash_tx.send (chunk.clone ()).await?; + + { + let chunk = chunk.clone (); + spawn_blocking (move || { + io::stdout ().write_all (&chunk)?; + anyhow::Result::<_>::Ok (()) + }).await??; + } + } + + drop (hash_tx); + + let hash = hasher_task.await??; + + let actual_sha512 = hex::encode (&hash); + + match expected_sha512 { + None => eprintln! ("Actual SHA512 = {}", actual_sha512), + Some (expected) => if ! actual_sha512.starts_with (&expected) { + bail! ("Expected SHA512 prefix {}, actual SHA512 {}", expected, actual_sha512); + }, + } + Ok (()) }