♻️ refactor: extract stream_file

main
_ 2021-04-03 16:27:31 +00:00
parent ff73f501a4
commit 1df0f0f677
1 changed files with 47 additions and 39 deletions

View File

@ -159,45 +159,7 @@ async fn serve_file (
if body.is_some () { if body.is_some () {
tokio::spawn (async move { tokio::spawn (async move {
let mut bytes_sent = 0; stream_file (f, content_length, tx).await;
let mut bytes_left = content_length;
let mark_interval = 200_000;
let mut next_mark = mark_interval;
loop {
let mut buffer = vec! [0_u8; 65_536];
let bytes_read = f.read (&mut buffer).await.expect ("Couldn't read from file");
if bytes_read == 0 {
break;
}
buffer.truncate (bytes_read);
let bytes_read_64 = u64::try_from (bytes_read).expect ("Couldn't fit usize into u64");
let bytes_read_64 = min (bytes_left, bytes_read_64);
if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () {
warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, content_length);
break;
}
bytes_left -= bytes_read_64;
if bytes_left == 0 {
debug! ("Finished");
break;
}
bytes_sent += bytes_read_64;
while next_mark <= bytes_sent {
trace! ("Sent {} bytes", next_mark);
next_mark += mark_interval;
}
//delay_for (Duration::from_millis (50)).await;
}
}); });
} }
@ -254,6 +216,52 @@ async fn get_file_etag (f: &File) -> Option <String>
None None
} }
async fn stream_file (
mut f: File,
content_length: u64,
tx: tokio::sync::mpsc::Sender <Result <Vec <u8>, Infallible>>,
) {
let mut bytes_sent = 0;
let mut bytes_left = content_length;
let mark_interval = 200_000;
let mut next_mark = mark_interval;
loop {
let mut buffer = vec! [0_u8; 65_536];
let bytes_read = f.read (&mut buffer).await.expect ("Couldn't read from file");
if bytes_read == 0 {
break;
}
buffer.truncate (bytes_read);
let bytes_read_64 = u64::try_from (bytes_read).expect ("Couldn't fit usize into u64");
let bytes_read_64 = min (bytes_left, bytes_read_64);
if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () {
warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, content_length);
break;
}
bytes_left -= bytes_read_64;
if bytes_left == 0 {
debug! ("Finished");
break;
}
bytes_sent += bytes_read_64;
while next_mark <= bytes_sent {
trace! ("Sent {} bytes", next_mark);
next_mark += mark_interval;
}
//delay_for (Duration::from_millis (50)).await;
}
}
// Pass a request to the internal decision-making logic. // Pass a request to the internal decision-making logic.
// When it returns, prettify it as HTML or JSON based on what the client // When it returns, prettify it as HTML or JSON based on what the client
// asked for. // asked for.