From 566a323ea1f90dbdfca1424e2dc16494b1dc9436 Mon Sep 17 00:00:00 2001 From: yanhe Date: Sun, 24 Nov 2024 22:31:40 +0800 Subject: [PATCH 01/21] Add an encapsulated file stream in axum-extra to make it more convenient when you need to return a file as a response --- axum-extra/Cargo.toml | 1 + axum-extra/src/lib.rs | 2 +- axum-extra/src/response/file_stream.rs | 212 +++++++++++++++++++++++++ axum-extra/src/response/mod.rs | 4 + examples/stream-to-file/Cargo.toml | 2 + examples/stream-to-file/src/main.rs | 152 +++++++++++++++++- 6 files changed, 366 insertions(+), 7 deletions(-) create mode 100644 axum-extra/src/response/file_stream.rs diff --git a/axum-extra/Cargo.toml b/axum-extra/Cargo.toml index c357975ad4..f389545a52 100644 --- a/axum-extra/Cargo.toml +++ b/axum-extra/Cargo.toml @@ -15,6 +15,7 @@ version = "0.10.0-alpha.1" default = ["tracing", "multipart"] async-read-body = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] +fileStream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] attachment = ["dep:tracing"] error_response = ["dep:tracing", "tracing/std"] cookie = ["dep:cookie"] diff --git a/axum-extra/src/lib.rs b/axum-extra/src/lib.rs index 473c42742c..cf48ccdfef 100644 --- a/axum-extra/src/lib.rs +++ b/axum-extra/src/lib.rs @@ -25,7 +25,7 @@ //! `tracing` | Log rejections from built-in extractors | Yes //! `typed-routing` | Enables the [`TypedPath`](crate::routing::TypedPath) routing utilities | No //! `typed-header` | Enables the [`TypedHeader`] extractor and response | No -//! +//! `fileStream` | Enables the [`fileStream`](crate::response::file_stream) response | No //! [`axum`]: https://crates.io/crates/axum #![warn( diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs new file mode 100644 index 0000000000..d5517a6a00 --- /dev/null +++ b/axum-extra/src/response/file_stream.rs @@ -0,0 +1,212 @@ +use axum::{ + body, + response::{IntoResponse, Response}, + BoxError, +}; +use bytes::Bytes; +use futures_util::TryStream; +use http::{header, StatusCode}; + +/// Encapsulate the file stream. +/// The encapsulated file stream construct requires passing in a stream +/// # Examples +/// +/// ``` +/// use axum::{ +/// http::StatusCode, +/// response::{Response, IntoResponse}, +/// Router, +/// routing::get +/// }; +/// use axum_extra::response::file_stream::FileStream; +/// use tokio::fs::File; +/// use tokio_util::io::ReaderStream ; +/// async fn file_stream() -> Result { +/// let stream=ReaderStream::new(File::open("test.txt").await.map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}")))?); +/// let file_stream_resp = FileStream::new(stream) +/// .file_name("test.txt"); +// +/// Ok(file_stream_resp.into_response()) +/// } +/// let app = Router::new().route("/FileStreamDownload", get(file_stream)); +/// # let _: Router = app; +/// ``` +#[derive(Debug)] +pub struct FileStream +where + S: TryStream + Send + 'static, + S::Ok: Into, + S::Error: Into, +{ + /// stream. + pub stream: S, + /// The file name of the file. + pub file_name: Option, + /// The size of the file. + pub content_size: Option, +} + +impl FileStream +where + S: TryStream + Send + 'static, + S::Ok: Into, + S::Error: Into, +{ + /// Create a file stream. + pub fn new(stream: S) -> Self { + Self { + stream, + file_name: None, + content_size: None, + } + } + + /// Set the file name of the file. + pub fn file_name>(mut self, file_name: T) -> Self { + self.file_name = Some(file_name.into()); + self + } + + /// Set the size of the file. + pub fn content_size>(mut self, len: T) -> Self { + self.content_size = Some(len.into()); + self + } +} + +impl IntoResponse for FileStream +where + S: TryStream + Send + 'static, + S::Ok: Into, + S::Error: Into, +{ + fn into_response(self) -> Response { + let mut resp = Response::builder().header(header::CONTENT_TYPE, "application/octet-stream"); + + if let Some(file_name) = self.file_name { + resp = resp.header( + header::CONTENT_DISPOSITION, + format!("attachment; filename=\"{}\"", file_name), + ); + }; + + if let Some(content_size) = self.content_size { + resp = resp.header(header::CONTENT_LENGTH, content_size); + }; + + resp.body(body::Body::from_stream(self.stream)) + .unwrap_or_else(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("build FileStream responsec error:{}", e), + ) + .into_response() + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{extract::Request, routing::get, Router}; + use body::Body; + use http_body_util::BodyExt; + use tokio::io::AsyncSeekExt; + use std::io::{Cursor, SeekFrom}; + use tokio_util::io::ReaderStream; + use tower::ServiceExt; + + #[tokio::test] + async fn response_file_stream() -> Result<(), Box> { + let app = Router::new().route( + "/file", + get(|| async { + // Simulating a file stream + let file_content = b"Hello, this is the simulated file content!".to_vec(); + let size = file_content.len() as u64; + let reader = Cursor::new(file_content); + + // response file stream + let stream = ReaderStream::new(reader); + let resp = FileStream::new(stream) + .file_name("test") + .content_size(size) + .into_response(); + resp + }), + ); + + // Simulating a GET request + let response = app + .oneshot(Request::builder().uri("/file").body(Body::empty())?) + .await?; + + // Validate Response Status Code + assert_eq!(response.status(), StatusCode::OK); + + // Validate Response Headers + assert_eq!( + response.headers().get("content-type").unwrap(), + "application/octet-stream" + ); + assert_eq!( + response.headers().get("content-disposition").unwrap(), + "attachment; filename=\"test\"" + ); + assert_eq!(response.headers().get("content-length").unwrap(), "42"); + + // Validate Response Body + let body: &[u8] = &response.into_body().collect().await?.to_bytes(); + assert_eq!( + std::str::from_utf8(body)?, + "Hello, this is the simulated file content!" + ); + Ok(()) + } + + #[tokio::test] + async fn response_half_file() -> Result<(), Box> { + let app = Router::new().route( + "/half_file", + get(move || async move { + let mut file = tokio::fs::File::open("CHANGELOG.md").await.unwrap(); + + // get file size + let file_size = file.metadata().await.unwrap().len(); + + // seek to the middle of the file + let mid_position = file_size / 2; + file.seek(SeekFrom::Start(mid_position)).await.unwrap(); + + // response file stream + let stream = ReaderStream::new(file); + let resp = FileStream::new(stream) + .file_name("CHANGELOG.md") + .content_size(mid_position) + .into_response(); + resp + }), + ); + + // Simulating a GET request + let response = app + .oneshot(Request::builder().uri("/half_file").body(Body::empty()).unwrap()) + .await + .unwrap(); + + // Validate Response Status Code + assert_eq!(response.status(), StatusCode::OK); + + // Validate Response Headers + assert_eq!( + response.headers().get("content-type").unwrap(), + "application/octet-stream" + ); + assert_eq!( + response.headers().get("content-disposition").unwrap(), + "attachment; filename=\"CHANGELOG.md\"" + ); + assert_eq!(response.headers().get("content-length").unwrap(), "8098"); + Ok(()) + } +} diff --git a/axum-extra/src/response/mod.rs b/axum-extra/src/response/mod.rs index bac7d040fe..222792a2c6 100644 --- a/axum-extra/src/response/mod.rs +++ b/axum-extra/src/response/mod.rs @@ -12,6 +12,10 @@ pub mod multiple; #[cfg(feature = "error_response")] mod error_response; +#[cfg(feature = "fileStream")] +/// Module for handling file streams. +pub mod file_stream; + #[cfg(feature = "error_response")] pub use error_response::InternalServerError; diff --git a/examples/stream-to-file/Cargo.toml b/examples/stream-to-file/Cargo.toml index b159c871ba..d18f1676f7 100644 --- a/examples/stream-to-file/Cargo.toml +++ b/examples/stream-to-file/Cargo.toml @@ -11,3 +11,5 @@ tokio = { version = "1.0", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +async-stream = "0.3" +axum-extra = { path = "../../axum-extra", features = ["fileStream"] } \ No newline at end of file diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index 7c44286d87..b70f4341dd 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -4,20 +4,24 @@ //! cargo run -p example-stream-to-file //! ``` +use async_stream::try_stream; use axum::{ body::Bytes, extract::{Multipart, Path, Request}, http::StatusCode, - response::{Html, Redirect}, + response::{Html, IntoResponse, Redirect, Response}, routing::{get, post}, BoxError, Router, }; +use axum_extra::response::file_stream::FileStream; use futures::{Stream, TryStreamExt}; use std::io; -use tokio::{fs::File, io::BufWriter}; -use tokio_util::io::StreamReader; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncSeekExt, BufWriter}, +}; +use tokio_util::io::{ReaderStream, StreamReader}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - const UPLOADS_DIRECTORY: &str = "uploads"; #[tokio::main] @@ -36,8 +40,11 @@ async fn main() { .expect("failed to create `uploads` directory"); let app = Router::new() - .route("/", get(show_form).post(accept_form)) - .route("/file/{file_name}", post(save_request_body)); + .route("/upload", get(show_form).post(accept_form)) + .route("/", get(show_form2).post(accept_form)) + .route("/file/{file_name}", post(save_request_body)) + .route("/file_download", get(file_download_handler)) + .route("/simpler_file_download", get(simpler_file_donwload_handler)); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await @@ -84,6 +91,139 @@ async fn show_form() -> Html<&'static str> { ) } +// Handler that returns HTML for a multipart form. +async fn show_form2() -> Html<&'static str> { + Html( + r#" + + + + Upload and Download! + + +

Upload and Download Files

+ + +
+
+ +
+ +
+ +
+
+ +
+ +
+
+ +
+
+ + + "#, + ) +} + +/// A simpler file download handler that uses the `FileStream` response. +/// Returns the entire file as a stream. +async fn simpler_file_donwload_handler() -> Response { + let Ok(file) = File::open("./CHANGELOG.md").await else { + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response(); + }; + + let Ok(file_metatdata) = file.metadata().await else { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to get file metatdata", + ) + .into_response(); + }; + + // Constructing a Stream with ReaderStream + let stream = ReaderStream::new(file); + + // Use FileStream to return and set some information. + // Will set application/octet-stream in the header. + let file_stream_resp = FileStream::new(stream) + .file_name("test.txt") + .content_size(file_metatdata.len()); + + //It is also possible to set only the stream FileStream will be automatically set on the http header. + //let file_stream_resp = FileStream::new(stream); + + file_stream_resp.into_response() +} + +/// If you want to control the returned files in more detail you can implement a Stream +/// For example, use the try_stream! macro to construct a file stream and set which parts are needed. +async fn file_download_handler() -> Response { + let file_stream = match try_stream("./CHANGELOG.md", 5, 25, 10).await { + Ok(file_stream) => file_stream, + Err(e) => { + println!("{e}"); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed try stream!").into_response(); + }, + }; + + // Use FileStream to return and set some information. + // Will set application/octet-stream in the header. + let file_stream_resp = FileStream::new(Box::pin(file_stream)) + .file_name("test.txt").content_size(20_u64); + + file_stream_resp.into_response() +} + +/// More complex manipulation of files and conversion to a stream +async fn try_stream( + file_path: &str, + start: u64, + mut end: u64, + buffer_size: usize, +) -> Result, std::io::Error>>, String> { + + let mut file = File::open(file_path) + .await + .map_err(|e| format!("open file:{file_path} err:{e}"))?; + + file.seek(std::io::SeekFrom::Start(start)) + .await + .map_err(|e| format!("file:{file_path} seek err:{e}"))?; + + if end == 0 { + let metadata = file + .metadata() + .await + .map_err(|e| format!("file:{file_path} get metadata err:{e}"))?; + end = metadata.len() as u64; + } + + let mut buffer = vec![0; buffer_size]; + + let stream = try_stream! { + let mut total_read = 0; + + while total_read < end { + let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); + let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + if n == 0 { + break; // EOF + } + total_read += n as u64; + yield buffer[..n].to_vec(); + + } + }; + Ok(stream) +} + // Handler that accepts a multipart form upload and streams each field to a file. async fn accept_form(mut multipart: Multipart) -> Result { while let Ok(Some(field)) = multipart.next_field().await { From f0df9ce8ab229a5681c4243f5695897510ca09f6 Mon Sep 17 00:00:00 2001 From: yanhe Date: Sun, 24 Nov 2024 23:21:15 +0800 Subject: [PATCH 02/21] Correction of ci checks --- axum-extra/src/lib.rs | 2 +- examples/stream-to-file/src/main.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/axum-extra/src/lib.rs b/axum-extra/src/lib.rs index cf48ccdfef..4583be7e26 100644 --- a/axum-extra/src/lib.rs +++ b/axum-extra/src/lib.rs @@ -26,7 +26,7 @@ //! `typed-routing` | Enables the [`TypedPath`](crate::routing::TypedPath) routing utilities | No //! `typed-header` | Enables the [`TypedHeader`] extractor and response | No //! `fileStream` | Enables the [`fileStream`](crate::response::file_stream) response | No -//! [`axum`]: https://crates.io/crates/axum +//! [`axum`]: #![warn( clippy::all, diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index b70f4341dd..6fcd2f1d36 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -44,7 +44,7 @@ async fn main() { .route("/", get(show_form2).post(accept_form)) .route("/file/{file_name}", post(save_request_body)) .route("/file_download", get(file_download_handler)) - .route("/simpler_file_download", get(simpler_file_donwload_handler)); + .route("/simpler_file_download", get(simpler_file_download_handler)); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await @@ -132,15 +132,15 @@ async fn show_form2() -> Html<&'static str> { /// A simpler file download handler that uses the `FileStream` response. /// Returns the entire file as a stream. -async fn simpler_file_donwload_handler() -> Response { +async fn simpler_file_download_handler() -> Response { let Ok(file) = File::open("./CHANGELOG.md").await else { return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response(); }; - let Ok(file_metatdata) = file.metadata().await else { + let Ok(file_metadata) = file.metadata().await else { return ( StatusCode::INTERNAL_SERVER_ERROR, - "Failed to get file metatdata", + "Failed to get file metadata", ) .into_response(); }; @@ -152,7 +152,7 @@ async fn simpler_file_donwload_handler() -> Response { // Will set application/octet-stream in the header. let file_stream_resp = FileStream::new(stream) .file_name("test.txt") - .content_size(file_metatdata.len()); + .content_size(file_metadata.len()); //It is also possible to set only the stream FileStream will be automatically set on the http header. //let file_stream_resp = FileStream::new(stream); @@ -200,7 +200,7 @@ async fn try_stream( .metadata() .await .map_err(|e| format!("file:{file_path} get metadata err:{e}"))?; - end = metadata.len() as u64; + end = metadata.len(); } let mut buffer = vec![0; buffer_size]; From 4b45d89c557139b067e9ddb89bdb60e645d4625f Mon Sep 17 00:00:00 2001 From: yanhe Date: Sun, 24 Nov 2024 23:37:41 +0800 Subject: [PATCH 03/21] fix clippy warnings --- axum-extra/src/response/file_stream.rs | 10 ++++------ examples/stream-to-file/Cargo.toml | 6 +++--- examples/stream-to-file/src/main.rs | 12 ++++++------ 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index d5517a6a00..fd7f07db8f 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -128,11 +128,10 @@ mod tests { // response file stream let stream = ReaderStream::new(reader); - let resp = FileStream::new(stream) + FileStream::new(stream) .file_name("test") .content_size(size) - .into_response(); - resp + .into_response() }), ); @@ -180,11 +179,10 @@ mod tests { // response file stream let stream = ReaderStream::new(file); - let resp = FileStream::new(stream) + FileStream::new(stream) .file_name("CHANGELOG.md") .content_size(mid_position) - .into_response(); - resp + .into_response() }), ); diff --git a/examples/stream-to-file/Cargo.toml b/examples/stream-to-file/Cargo.toml index d18f1676f7..9a5197445c 100644 --- a/examples/stream-to-file/Cargo.toml +++ b/examples/stream-to-file/Cargo.toml @@ -6,10 +6,10 @@ publish = false [dependencies] axum = { path = "../../axum", features = ["multipart"] } +axum-extra = { path = "../../axum-extra", features = ["fileStream"] } +async-stream = "0.3" futures = "0.3" tokio = { version = "1.0", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -async-stream = "0.3" -axum-extra = { path = "../../axum-extra", features = ["fileStream"] } \ No newline at end of file +tracing-subscriber = { version = "0.3", features = ["env-filter"] } \ No newline at end of file diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index 6fcd2f1d36..d866c65c45 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -4,7 +4,11 @@ //! cargo run -p example-stream-to-file //! ``` -use async_stream::try_stream; +use std::io; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncSeekExt, BufWriter}, +}; use axum::{ body::Bytes, extract::{Multipart, Path, Request}, @@ -15,11 +19,7 @@ use axum::{ }; use axum_extra::response::file_stream::FileStream; use futures::{Stream, TryStreamExt}; -use std::io; -use tokio::{ - fs::File, - io::{AsyncReadExt, AsyncSeekExt, BufWriter}, -}; +use async_stream::try_stream; use tokio_util::io::{ReaderStream, StreamReader}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const UPLOADS_DIRECTORY: &str = "uploads"; From 13547b6472bbce49858a1fdc50558f45f5a8e15d Mon Sep 17 00:00:00 2001 From: yanhe Date: Sun, 24 Nov 2024 23:58:30 +0800 Subject: [PATCH 04/21] Fix review warnings --- axum-extra/src/response/file_stream.rs | 26 ++++++++++++++++++++++---- examples/stream-to-file/Cargo.toml | 2 +- examples/stream-to-file/src/main.rs | 26 +++++++++++++------------- 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index fd7f07db8f..408d404adf 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -111,8 +111,8 @@ mod tests { use axum::{extract::Request, routing::get, Router}; use body::Body; use http_body_util::BodyExt; - use tokio::io::AsyncSeekExt; use std::io::{Cursor, SeekFrom}; + use tokio::io::AsyncSeekExt; use tokio_util::io::ReaderStream; use tower::ServiceExt; @@ -172,7 +172,7 @@ mod tests { // get file size let file_size = file.metadata().await.unwrap().len(); - + // seek to the middle of the file let mid_position = file_size / 2; file.seek(SeekFrom::Start(mid_position)).await.unwrap(); @@ -188,7 +188,12 @@ mod tests { // Simulating a GET request let response = app - .oneshot(Request::builder().uri("/half_file").body(Body::empty()).unwrap()) + .oneshot( + Request::builder() + .uri("/half_file") + .body(Body::empty()) + .unwrap(), + ) .await .unwrap(); @@ -204,7 +209,20 @@ mod tests { response.headers().get("content-disposition").unwrap(), "attachment; filename=\"CHANGELOG.md\"" ); - assert_eq!(response.headers().get("content-length").unwrap(), "8098"); + + let file = tokio::fs::File::open("CHANGELOG.md").await.unwrap(); + // get file size + let content_length = file.metadata().await.unwrap().len() / 2; + + assert_eq!( + response + .headers() + .get("content-length") + .unwrap() + .to_str() + .unwrap(), + content_length.to_string() + ); Ok(()) } } diff --git a/examples/stream-to-file/Cargo.toml b/examples/stream-to-file/Cargo.toml index 9a5197445c..c70e34ae4e 100644 --- a/examples/stream-to-file/Cargo.toml +++ b/examples/stream-to-file/Cargo.toml @@ -12,4 +12,4 @@ futures = "0.3" tokio = { version = "1.0", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } \ No newline at end of file +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index d866c65c45..532e2d48ed 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -4,11 +4,7 @@ //! cargo run -p example-stream-to-file //! ``` -use std::io; -use tokio::{ - fs::File, - io::{AsyncReadExt, AsyncSeekExt, BufWriter}, -}; +use async_stream::try_stream; use axum::{ body::Bytes, extract::{Multipart, Path, Request}, @@ -19,7 +15,11 @@ use axum::{ }; use axum_extra::response::file_stream::FileStream; use futures::{Stream, TryStreamExt}; -use async_stream::try_stream; +use std::io; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncSeekExt, BufWriter}, +}; use tokio_util::io::{ReaderStream, StreamReader}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const UPLOADS_DIRECTORY: &str = "uploads"; @@ -153,10 +153,10 @@ async fn simpler_file_download_handler() -> Response { let file_stream_resp = FileStream::new(stream) .file_name("test.txt") .content_size(file_metadata.len()); - - //It is also possible to set only the stream FileStream will be automatically set on the http header. + + //It is also possible to set only the stream FileStream will be automatically set on the http header. //let file_stream_resp = FileStream::new(stream); - + file_stream_resp.into_response() } @@ -168,13 +168,14 @@ async fn file_download_handler() -> Response { Err(e) => { println!("{e}"); return (StatusCode::INTERNAL_SERVER_ERROR, "Failed try stream!").into_response(); - }, + } }; - + // Use FileStream to return and set some information. // Will set application/octet-stream in the header. let file_stream_resp = FileStream::new(Box::pin(file_stream)) - .file_name("test.txt").content_size(20_u64); + .file_name("test.txt") + .content_size(20_u64); file_stream_resp.into_response() } @@ -186,7 +187,6 @@ async fn try_stream( mut end: u64, buffer_size: usize, ) -> Result, std::io::Error>>, String> { - let mut file = File::open(file_path) .await .map_err(|e| format!("open file:{file_path} err:{e}"))?; From 6967a97cd20fb353d54f924c6d5b42d18351616b Mon Sep 17 00:00:00 2001 From: yanhe Date: Mon, 25 Nov 2024 00:07:57 +0800 Subject: [PATCH 05/21] fix dependency tables check --- examples/stream-to-file/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/stream-to-file/Cargo.toml b/examples/stream-to-file/Cargo.toml index c70e34ae4e..c255aed557 100644 --- a/examples/stream-to-file/Cargo.toml +++ b/examples/stream-to-file/Cargo.toml @@ -5,9 +5,9 @@ edition = "2021" publish = false [dependencies] +async-stream = "0.3" axum = { path = "../../axum", features = ["multipart"] } axum-extra = { path = "../../axum-extra", features = ["fileStream"] } -async-stream = "0.3" futures = "0.3" tokio = { version = "1.0", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } From 1f2d3146e6f1fe0598ec04b41027c9807e9d2f9c Mon Sep 17 00:00:00 2001 From: yanhe Date: Mon, 25 Nov 2024 22:21:53 +0800 Subject: [PATCH 06/21] Add a from_path function to make it easier to return a stream of files with file name and content size. --- axum-extra/src/response/file_stream.rs | 85 +++++++++++++++++++------- examples/stream-to-file/src/main.rs | 35 +++-------- 2 files changed, 72 insertions(+), 48 deletions(-) diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index 408d404adf..88d05a5116 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -1,3 +1,5 @@ +use std::{io, path::PathBuf}; + use axum::{ body, response::{IntoResponse, Response}, @@ -6,6 +8,11 @@ use axum::{ use bytes::Bytes; use futures_util::TryStream; use http::{header, StatusCode}; +use tokio::fs::File; +use tokio_util::io::ReaderStream; + +/// Alias for `tokio_util::io::ReaderStream`. +pub type AsyncReaderStream = ReaderStream; /// Encapsulate the file stream. /// The encapsulated file stream construct requires passing in a stream @@ -20,7 +27,7 @@ use http::{header, StatusCode}; /// }; /// use axum_extra::response::file_stream::FileStream; /// use tokio::fs::File; -/// use tokio_util::io::ReaderStream ; +/// use tokio_util::io::ReaderStream; /// async fn file_stream() -> Result { /// let stream=ReaderStream::new(File::open("test.txt").await.map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}")))?); /// let file_stream_resp = FileStream::new(stream) @@ -61,6 +68,54 @@ where } } + /// Create a file stream from a file path. + /// # Examples + /// ``` + /// use axum::{ + /// http::StatusCode, + /// response::{Response, IntoResponse}, + /// Router, + /// routing::get + /// }; + /// use axum_extra::response::file_stream::FileStream; + /// use std::path::PathBuf; + /// use tokio_util::io::ReaderStream; + /// use tokio::fs::File; + /// async fn file_stream() -> Result { + /// Ok(FileStream::>::from_path(PathBuf::from("test.txt")) + /// .await + /// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}")))? + /// .into_response()) + /// } + /// let app = Router::new().route("/FileStreamDownload", get(file_stream)); + /// # let _: Router = app; + /// ``` + pub async fn from_path(path: PathBuf) -> io::Result> { + // open file + let file = File::open(&path).await?; + let mut content_size = None; + let mut file_name = None; + + // get file metadata length + if let Ok(metadata) = file.metadata().await { + content_size = Some(metadata.len()); + } + + // get file name + if let Some(file_name_os) = path.file_name() { + if let Some(file_name_str) = file_name_os.to_str() { + file_name = Some(file_name_str.to_owned()); + } + } + + // return FileStream + Ok(FileStream { + stream: ReaderStream::new(file), + file_name, + content_size, + }) + } + /// Set the file name of the file. pub fn file_name>(mut self, file_name: T) -> Self { self.file_name = Some(file_name.into()); @@ -111,8 +166,7 @@ mod tests { use axum::{extract::Request, routing::get, Router}; use body::Body; use http_body_util::BodyExt; - use std::io::{Cursor, SeekFrom}; - use tokio::io::AsyncSeekExt; + use std::io::Cursor; use tokio_util::io::ReaderStream; use tower::ServiceExt; @@ -164,24 +218,13 @@ mod tests { } #[tokio::test] - async fn response_half_file() -> Result<(), Box> { + async fn response_from_path() -> Result<(), Box> { let app = Router::new().route( - "/half_file", + "/from_path", get(move || async move { - let mut file = tokio::fs::File::open("CHANGELOG.md").await.unwrap(); - - // get file size - let file_size = file.metadata().await.unwrap().len(); - - // seek to the middle of the file - let mid_position = file_size / 2; - file.seek(SeekFrom::Start(mid_position)).await.unwrap(); - - // response file stream - let stream = ReaderStream::new(file); - FileStream::new(stream) - .file_name("CHANGELOG.md") - .content_size(mid_position) + FileStream::::from_path("CHANGELOG.md".into()) + .await + .unwrap() .into_response() }), ); @@ -190,7 +233,7 @@ mod tests { let response = app .oneshot( Request::builder() - .uri("/half_file") + .uri("/from_path") .body(Body::empty()) .unwrap(), ) @@ -212,7 +255,7 @@ mod tests { let file = tokio::fs::File::open("CHANGELOG.md").await.unwrap(); // get file size - let content_length = file.metadata().await.unwrap().len() / 2; + let content_length = file.metadata().await.unwrap().len(); assert_eq!( response diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index 532e2d48ed..e443681a17 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -13,14 +13,14 @@ use axum::{ routing::{get, post}, BoxError, Router, }; -use axum_extra::response::file_stream::FileStream; +use axum_extra::response::file_stream::{AsyncReaderStream, FileStream}; use futures::{Stream, TryStreamExt}; use std::io; use tokio::{ fs::File, io::{AsyncReadExt, AsyncSeekExt, BufWriter}, }; -use tokio_util::io::{ReaderStream, StreamReader}; +use tokio_util::io::StreamReader; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const UPLOADS_DIRECTORY: &str = "uploads"; @@ -133,31 +133,12 @@ async fn show_form2() -> Html<&'static str> { /// A simpler file download handler that uses the `FileStream` response. /// Returns the entire file as a stream. async fn simpler_file_download_handler() -> Response { - let Ok(file) = File::open("./CHANGELOG.md").await else { - return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response(); - }; - - let Ok(file_metadata) = file.metadata().await else { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to get file metadata", - ) - .into_response(); - }; - - // Constructing a Stream with ReaderStream - let stream = ReaderStream::new(file); - - // Use FileStream to return and set some information. - // Will set application/octet-stream in the header. - let file_stream_resp = FileStream::new(stream) - .file_name("test.txt") - .content_size(file_metadata.len()); - - //It is also possible to set only the stream FileStream will be automatically set on the http header. - //let file_stream_resp = FileStream::new(stream); - - file_stream_resp.into_response() + //If you want to simply return a file as a stream + // you can use the from_path method directly, passing in the path of the file to construct a stream with a header and length. + FileStream::::from_path("./CHANGELOG.md".into()) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response()) + .into_response() } /// If you want to control the returned files in more detail you can implement a Stream From f3fd30b93a59540da46e9b96815d3dc159201966 Mon Sep 17 00:00:00 2001 From: yanhe Date: Tue, 26 Nov 2024 21:46:43 +0800 Subject: [PATCH 07/21] Corrected the features format, the example creates the file first to test it. --- axum-extra/Cargo.toml | 4 +-- axum-extra/src/response/file_stream.rs | 15 +++++----- axum-extra/src/response/mod.rs | 2 +- examples/stream-to-file/Cargo.toml | 2 +- examples/stream-to-file/src/main.rs | 41 +++++++++++++++++++++----- 5 files changed, 44 insertions(+), 20 deletions(-) diff --git a/axum-extra/Cargo.toml b/axum-extra/Cargo.toml index f389545a52..12cb5345b3 100644 --- a/axum-extra/Cargo.toml +++ b/axum-extra/Cargo.toml @@ -15,7 +15,7 @@ version = "0.10.0-alpha.1" default = ["tracing", "multipart"] async-read-body = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] -fileStream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] +file-stream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] attachment = ["dep:tracing"] error_response = ["dep:tracing", "tracing/std"] cookie = ["dep:cookie"] @@ -68,7 +68,7 @@ prost = { version = "0.13", optional = true } serde_html_form = { version = "0.2.0", optional = true } serde_json = { version = "1.0.71", optional = true } serde_path_to_error = { version = "0.1.8", optional = true } -tokio = { version = "1.19", optional = true } +tokio = { version = "1.19", optional = true, features = ["fs"] } tokio-stream = { version = "0.1.9", optional = true } tokio-util = { version = "0.7", optional = true } tracing = { version = "0.1.37", default-features = false, optional = true } diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index 88d05a5116..a6b4686737 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -1,5 +1,3 @@ -use std::{io, path::PathBuf}; - use axum::{ body, response::{IntoResponse, Response}, @@ -8,6 +6,7 @@ use axum::{ use bytes::Bytes; use futures_util::TryStream; use http::{header, StatusCode}; +use std::{io, path::PathBuf}; use tokio::fs::File; use tokio_util::io::ReaderStream; @@ -79,13 +78,13 @@ where /// }; /// use axum_extra::response::file_stream::FileStream; /// use std::path::PathBuf; - /// use tokio_util::io::ReaderStream; /// use tokio::fs::File; - /// async fn file_stream() -> Result { - /// Ok(FileStream::>::from_path(PathBuf::from("test.txt")) + /// use tokio_util::io::ReaderStream; + /// async fn file_stream() -> Response { + /// FileStream::>::from_path(PathBuf::from("test.txt")) /// .await - /// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}")))? - /// .into_response()) + /// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}"))) + /// .into_response() /// } /// let app = Router::new().route("/FileStreamDownload", get(file_stream)); /// # let _: Router = app; @@ -253,7 +252,7 @@ mod tests { "attachment; filename=\"CHANGELOG.md\"" ); - let file = tokio::fs::File::open("CHANGELOG.md").await.unwrap(); + let file = File::open("CHANGELOG.md").await.unwrap(); // get file size let content_length = file.metadata().await.unwrap().len(); diff --git a/axum-extra/src/response/mod.rs b/axum-extra/src/response/mod.rs index 222792a2c6..8140cf65e8 100644 --- a/axum-extra/src/response/mod.rs +++ b/axum-extra/src/response/mod.rs @@ -12,7 +12,7 @@ pub mod multiple; #[cfg(feature = "error_response")] mod error_response; -#[cfg(feature = "fileStream")] +#[cfg(feature = "file-stream")] /// Module for handling file streams. pub mod file_stream; diff --git a/examples/stream-to-file/Cargo.toml b/examples/stream-to-file/Cargo.toml index c255aed557..fee5e0ebbe 100644 --- a/examples/stream-to-file/Cargo.toml +++ b/examples/stream-to-file/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] async-stream = "0.3" axum = { path = "../../axum", features = ["multipart"] } -axum-extra = { path = "../../axum-extra", features = ["fileStream"] } +axum-extra = { path = "../../axum-extra", features = ["file-stream"] } futures = "0.3" tokio = { version = "1.0", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index e443681a17..50fa50a39e 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -15,15 +15,15 @@ use axum::{ }; use axum_extra::response::file_stream::{AsyncReaderStream, FileStream}; use futures::{Stream, TryStreamExt}; -use std::io; +use std::{io, path::PathBuf}; use tokio::{ fs::File, - io::{AsyncReadExt, AsyncSeekExt, BufWriter}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter}, }; use tokio_util::io::StreamReader; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const UPLOADS_DIRECTORY: &str = "uploads"; - +const DOWNLOAD_DIRECTORY: &str = "downloads"; #[tokio::main] async fn main() { tracing_subscriber::registry() @@ -39,6 +39,15 @@ async fn main() { .await .expect("failed to create `uploads` directory"); + tokio::fs::create_dir(DOWNLOAD_DIRECTORY) + .await + .expect("failed to create `downloads` directory"); + + //create a file to download + create_test_file(std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt")) + .await + .expect("failed to create test file"); + let app = Router::new() .route("/upload", get(show_form).post(accept_form)) .route("/", get(show_form2).post(accept_form)) @@ -53,6 +62,19 @@ async fn main() { axum::serve(listener, app).await.unwrap(); } +async fn create_test_file(path: PathBuf) -> io::Result<()> { + let mut file = File::create(path).await?; + for i in 1..=30 { + let line = format!( + "Hello, this is the simulated file content! This is line {}\n", + i + ); + file.write_all(line.as_bytes()).await?; + } + file.flush().await?; + Ok(()) +} + // Handler that streams the request body to a file. // // POST'ing to `/file/foo.txt` will create a file called `foo.txt`. @@ -135,16 +157,19 @@ async fn show_form2() -> Html<&'static str> { async fn simpler_file_download_handler() -> Response { //If you want to simply return a file as a stream // you can use the from_path method directly, passing in the path of the file to construct a stream with a header and length. - FileStream::::from_path("./CHANGELOG.md".into()) - .await - .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response()) - .into_response() + FileStream::::from_path( + std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt"), + ) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response()) + .into_response() } /// If you want to control the returned files in more detail you can implement a Stream /// For example, use the try_stream! macro to construct a file stream and set which parts are needed. async fn file_download_handler() -> Response { - let file_stream = match try_stream("./CHANGELOG.md", 5, 25, 10).await { + let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); + let file_stream = match try_stream(&file_path, 5, 25, 10).await { Ok(file_stream) => file_stream, Err(e) => { println!("{e}"); From bcfaf3d06e6fd2fa9c1583f8460f515f7f6a2a2e Mon Sep 17 00:00:00 2001 From: yanhe Date: Tue, 26 Nov 2024 21:56:12 +0800 Subject: [PATCH 08/21] fix ci warnings --- axum-extra/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/axum-extra/Cargo.toml b/axum-extra/Cargo.toml index 12cb5345b3..e2896ae11a 100644 --- a/axum-extra/Cargo.toml +++ b/axum-extra/Cargo.toml @@ -105,6 +105,7 @@ allowed = [ "prost", "serde", "tokio", + "tokio-util", "tower_layer", "tower_service", ] From f61fdcbe2b1df74afa4d3bfa5cbfe83c765aebf1 Mon Sep 17 00:00:00 2001 From: yanhe Date: Sun, 1 Dec 2024 09:08:24 +0800 Subject: [PATCH 09/21] add more tests --- axum-extra/src/response/file_stream.rs | 125 ++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index a6b4686737..057a10cd8f 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -170,7 +170,128 @@ mod tests { use tower::ServiceExt; #[tokio::test] - async fn response_file_stream() -> Result<(), Box> { + async fn response() -> Result<(), Box> { + let app = Router::new().route( + "/file", + get(|| async { + // Simulating a file stream + let file_content = b"Hello, this is the simulated file content!".to_vec(); + let reader = Cursor::new(file_content); + + // Response file stream + // Content size and file name are not attached by default + let stream = ReaderStream::new(reader); + FileStream::new(stream).into_response() + }), + ); + + // Simulating a GET request + let response = app + .oneshot(Request::builder().uri("/file").body(Body::empty())?) + .await?; + + // Validate Response Status Code + assert_eq!(response.status(), StatusCode::OK); + + // Validate Response Headers + assert_eq!( + response.headers().get("content-type").unwrap(), + "application/octet-stream" + ); + + // Validate Response Body + let body: &[u8] = &response.into_body().collect().await?.to_bytes(); + assert_eq!( + std::str::from_utf8(body)?, + "Hello, this is the simulated file content!" + ); + Ok(()) + } + + #[tokio::test] + async fn response_not_set_filename() -> Result<(), Box> { + let app = Router::new().route( + "/file", + get(|| async { + // Simulating a file stream + let file_content = b"Hello, this is the simulated file content!".to_vec(); + let size = file_content.len() as u64; + let reader = Cursor::new(file_content); + + // Response file stream + let stream = ReaderStream::new(reader); + FileStream::new(stream).content_size(size).into_response() + }), + ); + + // Simulating a GET request + let response = app + .oneshot(Request::builder().uri("/file").body(Body::empty())?) + .await?; + + // Validate Response Status Code + assert_eq!(response.status(), StatusCode::OK); + + // Validate Response Headers + assert_eq!( + response.headers().get("content-type").unwrap(), + "application/octet-stream" + ); + assert_eq!(response.headers().get("content-length").unwrap(), "42"); + + // Validate Response Body + let body: &[u8] = &response.into_body().collect().await?.to_bytes(); + assert_eq!( + std::str::from_utf8(body)?, + "Hello, this is the simulated file content!" + ); + Ok(()) + } + + #[tokio::test] + async fn response_not_set_content_size() -> Result<(), Box> { + let app = Router::new().route( + "/file", + get(|| async { + // Simulating a file stream + let file_content = b"Hello, this is the simulated file content!".to_vec(); + let reader = Cursor::new(file_content); + + // Response file stream + let stream = ReaderStream::new(reader); + FileStream::new(stream).file_name("test").into_response() + }), + ); + + // Simulating a GET request + let response = app + .oneshot(Request::builder().uri("/file").body(Body::empty())?) + .await?; + + // Validate Response Status Code + assert_eq!(response.status(), StatusCode::OK); + + // Validate Response Headers + assert_eq!( + response.headers().get("content-type").unwrap(), + "application/octet-stream" + ); + assert_eq!( + response.headers().get("content-disposition").unwrap(), + "attachment; filename=\"test\"" + ); + + // Validate Response Body + let body: &[u8] = &response.into_body().collect().await?.to_bytes(); + assert_eq!( + std::str::from_utf8(body)?, + "Hello, this is the simulated file content!" + ); + Ok(()) + } + + #[tokio::test] + async fn response_with_content_size_and_filename() -> Result<(), Box> { let app = Router::new().route( "/file", get(|| async { @@ -179,7 +300,7 @@ mod tests { let size = file_content.len() as u64; let reader = Cursor::new(file_content); - // response file stream + // Response file stream let stream = ReaderStream::new(reader); FileStream::new(stream) .file_name("test") From 721333c8e0e755d7e1648425b9a6f63e863e7a63 Mon Sep 17 00:00:00 2001 From: yanhe Date: Sun, 1 Dec 2024 21:11:44 +0800 Subject: [PATCH 10/21] Added entries to CHANGELOG.md and modified axum-extra/Cargo.toml features. --- axum-extra/CHANGELOG.md | 1 + axum-extra/Cargo.toml | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/axum-extra/CHANGELOG.md b/axum-extra/CHANGELOG.md index 327e3edbb4..7b1230cb90 100644 --- a/axum-extra/CHANGELOG.md +++ b/axum-extra/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning]. - **fixed:** `Host` extractor includes port number when parsing authority ([#2242]) - **added:** Add `RouterExt::typed_connect` ([#2961]) - **added:** Add `json!` for easy construction of JSON responses ([#2962]) +- **added:** Add `FileStream` for easy construction of file stream responses ([#3047]) [#2242]: https://github.com/tokio-rs/axum/pull/2242 [#2961]: https://github.com/tokio-rs/axum/pull/2961 diff --git a/axum-extra/Cargo.toml b/axum-extra/Cargo.toml index e2896ae11a..e1a258260d 100644 --- a/axum-extra/Cargo.toml +++ b/axum-extra/Cargo.toml @@ -15,7 +15,7 @@ version = "0.10.0-alpha.1" default = ["tracing", "multipart"] async-read-body = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] -file-stream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] +file-stream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio", "tokio?/fs"] attachment = ["dep:tracing"] error_response = ["dep:tracing", "tracing/std"] cookie = ["dep:cookie"] @@ -68,7 +68,7 @@ prost = { version = "0.13", optional = true } serde_html_form = { version = "0.2.0", optional = true } serde_json = { version = "1.0.71", optional = true } serde_path_to_error = { version = "0.1.8", optional = true } -tokio = { version = "1.19", optional = true, features = ["fs"] } +tokio = { version = "1.19", optional = true } tokio-stream = { version = "0.1.9", optional = true } tokio-util = { version = "0.7", optional = true } tracing = { version = "0.1.37", default-features = false, optional = true } @@ -105,7 +105,6 @@ allowed = [ "prost", "serde", "tokio", - "tokio-util", "tower_layer", "tower_service", ] From d569225f7d5a53bbab44ae11c48afd7435d8ca65 Mon Sep 17 00:00:00 2001 From: yanhe Date: Mon, 2 Dec 2024 08:50:43 +0800 Subject: [PATCH 11/21] Improvement code --- axum-extra/src/lib.rs | 1 + axum-extra/src/response/file_stream.rs | 4 ++-- axum-extra/src/response/mod.rs | 6 ++++++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/axum-extra/src/lib.rs b/axum-extra/src/lib.rs index 4583be7e26..db197edf72 100644 --- a/axum-extra/src/lib.rs +++ b/axum-extra/src/lib.rs @@ -26,6 +26,7 @@ //! `typed-routing` | Enables the [`TypedPath`](crate::routing::TypedPath) routing utilities | No //! `typed-header` | Enables the [`TypedHeader`] extractor and response | No //! `fileStream` | Enables the [`fileStream`](crate::response::file_stream) response | No +//! //! [`axum`]: #![warn( diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index 057a10cd8f..38d475e4cb 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -142,11 +142,11 @@ where header::CONTENT_DISPOSITION, format!("attachment; filename=\"{}\"", file_name), ); - }; + } if let Some(content_size) = self.content_size { resp = resp.header(header::CONTENT_LENGTH, content_size); - }; + } resp.body(body::Body::from_stream(self.stream)) .unwrap_or_else(|e| { diff --git a/axum-extra/src/response/mod.rs b/axum-extra/src/response/mod.rs index 8140cf65e8..9240c11a23 100644 --- a/axum-extra/src/response/mod.rs +++ b/axum-extra/src/response/mod.rs @@ -16,6 +16,12 @@ mod error_response; /// Module for handling file streams. pub mod file_stream; +#[cfg(feature = "file-stream")] +pub use file_stream::FileStream; + +#[cfg(feature = "file-stream")] +pub use file_stream::AsyncReaderStream; + #[cfg(feature = "error_response")] pub use error_response::InternalServerError; From 32c121b3ca222fe8bb6db2c6ce70b4a45c638998 Mon Sep 17 00:00:00 2001 From: yanhe Date: Mon, 2 Dec 2024 18:47:03 +0800 Subject: [PATCH 12/21] fix CHANGELOG --- axum-extra/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/axum-extra/CHANGELOG.md b/axum-extra/CHANGELOG.md index 7b1230cb90..4690a49465 100644 --- a/axum-extra/CHANGELOG.md +++ b/axum-extra/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning]. [#2242]: https://github.com/tokio-rs/axum/pull/2242 [#2961]: https://github.com/tokio-rs/axum/pull/2961 [#2962]: https://github.com/tokio-rs/axum/pull/2962 +[#3047]: https://github.com/tokio-rs/axum/pull/3047 # 0.10.0 From 3cd7fc36d905d23adf1758a2544696fcaa0aa3f2 Mon Sep 17 00:00:00 2001 From: yanhe Date: Tue, 3 Dec 2024 22:58:58 +0800 Subject: [PATCH 13/21] Support for range mode returns --- axum-extra/Cargo.toml | 6 +- axum-extra/src/response/file_stream.rs | 261 ++++++++++++++++++++++++- examples/stream-to-file/src/main.rs | 135 ++++++++++++- 3 files changed, 390 insertions(+), 12 deletions(-) diff --git a/axum-extra/Cargo.toml b/axum-extra/Cargo.toml index e1a258260d..24ec50b3aa 100644 --- a/axum-extra/Cargo.toml +++ b/axum-extra/Cargo.toml @@ -12,10 +12,10 @@ repository = "https://github.com/tokio-rs/axum" version = "0.10.0-alpha.1" [features] -default = ["tracing", "multipart"] +default = ["tracing", "multipart", "file-stream"] async-read-body = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] -file-stream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio", "tokio?/fs"] +file-stream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio", "tokio?/fs", "tokio?/io-util", "dep:async-stream"] attachment = ["dep:tracing"] error_response = ["dep:tracing", "tracing/std"] cookie = ["dep:cookie"] @@ -57,6 +57,7 @@ tower-layer = "0.3" tower-service = "0.3" # optional dependencies +async-stream = { version = "0.3", optional = true } axum-macros = { path = "../axum-macros", version = "0.5.0-alpha.1", optional = true } cookie = { package = "cookie", version = "0.18.0", features = ["percent-encode"], optional = true } fastrand = { version = "2.1.0", optional = true } @@ -75,6 +76,7 @@ tracing = { version = "0.1.37", default-features = false, optional = true } typed-json = { version = "0.1.1", optional = true } [dev-dependencies] +async-stream = "0.3" axum = { path = "../axum", features = ["macros"] } axum-macros = { path = "../axum-macros", features = ["__private"] } hyper = "1.0.0" diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index 38d475e4cb..69c9f9083c 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -4,10 +4,13 @@ use axum::{ BoxError, }; use bytes::Bytes; -use futures_util::TryStream; +use futures_util::{Stream, TryStream}; use http::{header, StatusCode}; -use std::{io, path::PathBuf}; -use tokio::fs::File; +use std::{io, path::Path}; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncSeekExt}, +}; use tokio_util::io::ReaderStream; /// Alias for `tokio_util::io::ReaderStream`. @@ -27,6 +30,7 @@ pub type AsyncReaderStream = ReaderStream; /// use axum_extra::response::file_stream::FileStream; /// use tokio::fs::File; /// use tokio_util::io::ReaderStream; +/// /// async fn file_stream() -> Result { /// let stream=ReaderStream::new(File::open("test.txt").await.map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}")))?); /// let file_stream_resp = FileStream::new(stream) @@ -70,6 +74,8 @@ where /// Create a file stream from a file path. /// # Examples /// ``` + /// use std::path::Path; + /// /// use axum::{ /// http::StatusCode, /// response::{Response, IntoResponse}, @@ -80,8 +86,9 @@ where /// use std::path::PathBuf; /// use tokio::fs::File; /// use tokio_util::io::ReaderStream; + /// /// async fn file_stream() -> Response { - /// FileStream::>::from_path(PathBuf::from("test.txt")) + /// FileStream::>::from_path(&PathBuf::from("test.txt")) /// .await /// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}"))) /// .into_response() @@ -89,7 +96,7 @@ where /// let app = Router::new().route("/FileStreamDownload", get(file_stream)); /// # let _: Router = app; /// ``` - pub async fn from_path(path: PathBuf) -> io::Result> { + pub async fn from_path(path: &Path) -> io::Result> { // open file let file = File::open(&path).await?; let mut content_size = None; @@ -126,6 +133,165 @@ where self.content_size = Some(len.into()); self } + + /// return a range response + /// range: (start, end, total_size) + /// # Examples + /// + /// ``` + /// use axum::{ + /// http::StatusCode, + /// response::{Response, IntoResponse}, + /// Router, + /// routing::get + /// }; + /// use axum_extra::response::file_stream::FileStream; + /// use tokio::fs::File; + /// use tokio_util::io::ReaderStream; + /// use tokio::io::AsyncSeekExt; + /// + /// async fn range_response() -> Result { + /// let mut file=File::open("test.txt").await.map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}")))?; + /// let mut file_size=file.metadata().await.map_err(|e| (StatusCode::NOT_FOUND, format!("Get file size: {e}")))?.len(); + /// file.seek(std::io::SeekFrom::Start(10)).await.map_err(|e| (StatusCode::NOT_FOUND, format!("File seek error: {e}")))?; + /// let stream=ReaderStream::new(file); + /// + /// Ok(FileStream::new(stream).into_range_response(10, file_size-1, file_size)) + /// } + /// let app = Router::new().route("/FileStreamRange", get(range_response)); + /// # let _: Router = app; + /// ``` + pub fn into_range_response(self, start: u64, end: u64, total_size: u64) -> Response { + let mut resp = Response::builder().header(header::CONTENT_TYPE, "application/octet-stream"); + resp = resp.status(StatusCode::PARTIAL_CONTENT); + + resp = resp.header( + header::CONTENT_RANGE, + format!("bytes {}-{}/{}", start, end, total_size), + ); + + resp.body(body::Body::from_stream(self.stream)) + .unwrap_or_else(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("build FileStream responsec error: {}", e), + ) + .into_response() + }) + } + + /// Attempts to return RANGE requests directly from the file path + /// # Arguments + /// * `file_path` - The path of the file to be streamed + /// * `start` - The start position of the range, if start > file size or start > end return Range Not Satisfiable + /// * `end` - The end position of the range if end == 0 end = file size - 1 + /// * `buffer_size` - The buffer size of the range + /// # Examples + /// ``` + /// use axum::{ + /// http::StatusCode, + /// response::{Response, IntoResponse}, + /// Router, + /// routing::get + /// }; + /// use std::path::Path; + /// use axum_extra::response::file_stream::FileStream; + /// use tokio::fs::File; + /// use tokio_util::io::ReaderStream; + /// use tokio::io::AsyncSeekExt; + /// use axum_extra::response::AsyncReaderStream; + /// + /// async fn range_stream() -> Response { + /// let range_start = 0; + /// let range_end = 1024; + /// let buffer_size = 1024; + /// + /// FileStream::::try_range_response(Path::new("CHANGELOG.md"),range_start,range_end,buffer_size).await + /// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}"))) + /// .into_response() + /// + /// } + /// let app = Router::new().route("/FileStreamRange", get(range_stream)); + /// # let _: Router = app; + /// ``` + pub async fn try_range_response( + file_path: &Path, + start: u64, + mut end: u64, + buffer_size: usize, + ) -> io::Result { + // open file + let file = File::open(file_path).await?; + + // get file metadata + let metadata = file.metadata().await?; + let total_size = metadata.len(); + + if end == 0 { + end = total_size - 1; + } + + // range check + if start > total_size { + return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range Not Satisfiable").into_response()); + } + if start > end { + return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range Not Satisfiable").into_response()); + } + if end >= total_size { + return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range Not Satisfiable").into_response()); + } + + // get file stream + let stream = try_stream(file, start, end, buffer_size).await?; + let mut resp = Response::builder().header(header::CONTENT_TYPE, "application/octet-stream"); + resp = resp.status(StatusCode::PARTIAL_CONTENT); + + resp = resp.header( + header::CONTENT_RANGE, + format!("bytes {}-{}/{}", start, end, total_size), + ); + + Ok(resp + .body(body::Body::from_stream(stream)) + .unwrap_or_else(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("build FileStream responsec error: {}", e), + ) + .into_response() + })) + } +} + +/// More complex manipulation of files and conversion to a stream +async fn try_stream( + mut file: File, + start: u64, + end: u64, + buffer_size: usize, +) -> Result, io::Error>>, io::Error> { + file.seek(std::io::SeekFrom::Start(start)).await?; + + let mut buffer = vec![0; buffer_size]; + + let stream = async_stream::try_stream! { + let mut total_read = 0; + + while total_read < end { + let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); + let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + if n == 0 { + break; // EOF + } + total_read += n as u64; + yield buffer[..n].to_vec(); + + } + }; + Ok(stream) } impl IntoResponse for FileStream @@ -152,7 +318,7 @@ where .unwrap_or_else(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, - format!("build FileStream responsec error:{}", e), + format!("build FileStream responsec error: {}", e), ) .into_response() }) @@ -164,6 +330,7 @@ mod tests { use super::*; use axum::{extract::Request, routing::get, Router}; use body::Body; + use http::HeaderMap; use http_body_util::BodyExt; use std::io::Cursor; use tokio_util::io::ReaderStream; @@ -342,7 +509,7 @@ mod tests { let app = Router::new().route( "/from_path", get(move || async move { - FileStream::::from_path("CHANGELOG.md".into()) + FileStream::::from_path(Path::new("CHANGELOG.md")) .await .unwrap() .into_response() @@ -388,4 +555,84 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn response_range_file() -> Result<(), Box> { + let app = Router::new().route("/range_response", get(range_stream)); + + // Simulating a GET request + let response = app + .oneshot( + Request::builder() + .uri("/range_response") + .header(header::RANGE, "bytes=20-1000") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + // Validate Response Status Code + assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT); + + // Validate Response Headers + assert_eq!( + response.headers().get("content-type").unwrap(), + "application/octet-stream" + ); + + let file = File::open("CHANGELOG.md").await.unwrap(); + // get file size + let content_length = file.metadata().await.unwrap().len(); + + assert_eq!( + response + .headers() + .get("content-range") + .unwrap() + .to_str() + .unwrap(), + format!("bytes 20-1000/{}", content_length) + ); + Ok(()) + } + + async fn range_stream(headers: HeaderMap) -> Response { + let range_header = headers + .get(header::RANGE) + .and_then(|value| value.to_str().ok()); + + let (start, end) = if let Some(range) = range_header { + if let Some(range) = parse_range_header(range) { + range + } else { + return (StatusCode::RANGE_NOT_SATISFIABLE, "Invalid Range").into_response(); + } + } else { + (0, 0) // default range end = 0, if end = 0 end == file size - 1 + }; + + FileStream::::try_range_response( + Path::new("CHANGELOG.md"), + start, + end, + 1024, + ) + .await + .unwrap() + } + + fn parse_range_header(range: &str) -> Option<(u64, u64)> { + let range = range.strip_prefix("bytes=")?; + let mut parts = range.split('-'); + let start = parts.next()?.parse::().ok()?; + let end = parts + .next() + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + if start > end { + return None; + } + Some((start, end)) + } } diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index 50fa50a39e..81faea1f82 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -8,7 +8,7 @@ use async_stream::try_stream; use axum::{ body::Bytes, extract::{Multipart, Path, Request}, - http::StatusCode, + http::{header, HeaderMap, StatusCode}, response::{Html, IntoResponse, Redirect, Response}, routing::{get, post}, BoxError, Router, @@ -53,7 +53,9 @@ async fn main() { .route("/", get(show_form2).post(accept_form)) .route("/file/{file_name}", post(save_request_body)) .route("/file_download", get(file_download_handler)) - .route("/simpler_file_download", get(simpler_file_download_handler)); + .route("/simpler_file_download", get(simpler_file_download_handler)) + .route("/range_file", get(file_range_handler)) + .route("/range_file_stream", get(try_file_range_handler)); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await @@ -146,6 +148,15 @@ async fn show_form2() -> Html<&'static str> { + + +
+
+ + +
+
+ "#, @@ -158,7 +169,7 @@ async fn simpler_file_download_handler() -> Response { //If you want to simply return a file as a stream // you can use the from_path method directly, passing in the path of the file to construct a stream with a header and length. FileStream::::from_path( - std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt"), + &std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt"), ) .await .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response()) @@ -230,6 +241,124 @@ async fn try_stream( Ok(stream) } +async fn try_stream2( + mut file: File, + start: u64, + mut end: u64, + buffer_size: usize, +) -> Result, std::io::Error>>, String> { + file.seek(std::io::SeekFrom::Start(start)) + .await + .map_err(|e| format!("file seek err:{e}"))?; + + if end == 0 { + let metadata = file + .metadata() + .await + .map_err(|e| format!("file get metadata err:{e}"))?; + end = metadata.len(); + } + + let mut buffer = vec![0; buffer_size]; + + let stream = try_stream! { + let mut total_read = 0; + + while total_read < end { + let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); + let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + if n == 0 { + break; // EOF + } + total_read += n as u64; + yield buffer[..n].to_vec(); + + } + }; + Ok(stream) +} + +/// A file download handler that accepts a range header and returns a partial file as a stream. +/// You can return directly from the path +/// But you can't download this stream directly from your browser, you need to use a tool like curl or Postman. +async fn try_file_range_handler(headers: HeaderMap) -> Response { + let range_header = headers + .get(header::RANGE) + .and_then(|value| value.to_str().ok()); + + let (start, end) = if let Some(range) = range_header { + if let Some(range) = parse_range_header(range) { + range + } else { + return (StatusCode::RANGE_NOT_SATISFIABLE, "Invalid Range").into_response(); + } + } else { + (0, 0) // default range end = 0, if end = 0 end == file size - 1 + }; + + let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); + FileStream::::try_range_response( + std::path::Path::new(&file_path), + start, + end, + 1024, + ) + .await + .unwrap() +} + +/// If you want to control the stream yourself +async fn file_range_handler(headers: HeaderMap) -> Response { + // Parse the range header to get the start and end values. + let range_header = headers + .get(header::RANGE) + .and_then(|value| value.to_str().ok()); + + // If the range header is invalid, return a 416 Range Not Satisfiable response. + let (start, end) = if let Some(range) = range_header { + if let Some(range) = parse_range_header(range) { + range + } else { + return (StatusCode::RANGE_NOT_SATISFIABLE, "Invalid Range").into_response(); + } + } else { + (0, 0) // default range end = 0, if end = 0 end == file size - 1 + }; + + let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); + + let file = File::open(file_path).await.unwrap(); + + let file_size = file.metadata().await.unwrap().len(); + + let file_stream = match try_stream2(file, start, end, 256).await { + Ok(file_stream) => file_stream, + Err(e) => { + println!("{e}"); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed try stream!").into_response(); + } + }; + + FileStream::new(Box::pin(file_stream)).into_range_response(start, end, file_size) +} + +/// Parse the range header and return the start and end values. +fn parse_range_header(range: &str) -> Option<(u64, u64)> { + let range = range.strip_prefix("bytes=")?; + let mut parts = range.split('-'); + let start = parts.next()?.parse::().ok()?; + let end = parts + .next() + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + if start > end { + return None; + } + Some((start, end)) +} + // Handler that accepts a multipart form upload and streams each field to a file. async fn accept_form(mut multipart: Multipart) -> Result { while let Ok(Some(field)) = multipart.next_field().await { From 8e6e969fcc8cc4f6b3ea6a52d31eb7bd21aac653 Mon Sep 17 00:00:00 2001 From: yanhe Date: Tue, 3 Dec 2024 22:59:50 +0800 Subject: [PATCH 14/21] fix Cargo.toml --- axum-extra/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axum-extra/Cargo.toml b/axum-extra/Cargo.toml index 24ec50b3aa..6c37e5c24b 100644 --- a/axum-extra/Cargo.toml +++ b/axum-extra/Cargo.toml @@ -12,7 +12,7 @@ repository = "https://github.com/tokio-rs/axum" version = "0.10.0-alpha.1" [features] -default = ["tracing", "multipart", "file-stream"] +default = ["tracing", "multipart"] async-read-body = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] file-stream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio", "tokio?/fs", "tokio?/io-util", "dep:async-stream"] From 1ded31e014c4ade0df6d42e26117b6647abf74b3 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 3 Dec 2024 21:58:55 +0100 Subject: [PATCH 15/21] Clean up Cargo.toml --- axum-extra/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axum-extra/src/lib.rs b/axum-extra/src/lib.rs index db197edf72..a44664ee52 100644 --- a/axum-extra/src/lib.rs +++ b/axum-extra/src/lib.rs @@ -27,7 +27,7 @@ //! `typed-header` | Enables the [`TypedHeader`] extractor and response | No //! `fileStream` | Enables the [`fileStream`](crate::response::file_stream) response | No //! -//! [`axum`]: +//! [`axum`]: https://crates.io/crates/axum #![warn( clippy::all, From 6e1b21661873cac7f3a7f74a0e6eee8c8270406a Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 3 Dec 2024 21:59:27 +0100 Subject: [PATCH 16/21] Update axum-extra/src/lib.rs --- axum-extra/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axum-extra/src/lib.rs b/axum-extra/src/lib.rs index a44664ee52..468658081a 100644 --- a/axum-extra/src/lib.rs +++ b/axum-extra/src/lib.rs @@ -25,7 +25,7 @@ //! `tracing` | Log rejections from built-in extractors | Yes //! `typed-routing` | Enables the [`TypedPath`](crate::routing::TypedPath) routing utilities | No //! `typed-header` | Enables the [`TypedHeader`] extractor and response | No -//! `fileStream` | Enables the [`fileStream`](crate::response::file_stream) response | No +//! `FileStream` | Enables the [`FileStream`](crate::response::FileStream) response | No //! //! [`axum`]: https://crates.io/crates/axum From b5cdd1d3ad54d5ba2e4c939f68e7d04b260fba01 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 3 Dec 2024 22:13:34 +0100 Subject: [PATCH 17/21] Minor improvements --- axum-extra/src/response/file_stream.rs | 76 +++++++++++++------------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index 69c9f9083c..164b905712 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -17,7 +17,9 @@ use tokio_util::io::ReaderStream; pub type AsyncReaderStream = ReaderStream; /// Encapsulate the file stream. -/// The encapsulated file stream construct requires passing in a stream +/// +/// The encapsulated file stream construct requires passing in a stream. +/// /// # Examples /// /// ``` @@ -42,12 +44,7 @@ pub type AsyncReaderStream = ReaderStream; /// # let _: Router = app; /// ``` #[derive(Debug)] -pub struct FileStream -where - S: TryStream + Send + 'static, - S::Ok: Into, - S::Error: Into, -{ +pub struct FileStream { /// stream. pub stream: S, /// The file name of the file. @@ -72,10 +69,10 @@ where } /// Create a file stream from a file path. + /// /// # Examples - /// ``` - /// use std::path::Path; /// + /// ``` /// use axum::{ /// http::StatusCode, /// response::{Response, IntoResponse}, @@ -83,22 +80,21 @@ where /// routing::get /// }; /// use axum_extra::response::file_stream::FileStream; - /// use std::path::PathBuf; /// use tokio::fs::File; /// use tokio_util::io::ReaderStream; /// /// async fn file_stream() -> Response { - /// FileStream::>::from_path(&PathBuf::from("test.txt")) - /// .await - /// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}"))) - /// .into_response() + /// FileStream::>::from_path("test.txt") + /// .await + /// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}"))) + /// .into_response() /// } /// let app = Router::new().route("/FileStreamDownload", get(file_stream)); /// # let _: Router = app; /// ``` - pub async fn from_path(path: &Path) -> io::Result> { + pub async fn from_path(path: impl AsRef) -> io::Result> { // open file - let file = File::open(&path).await?; + let file = File::open(path).await?; let mut content_size = None; let mut file_name = None; @@ -123,19 +119,21 @@ where } /// Set the file name of the file. - pub fn file_name>(mut self, file_name: T) -> Self { + pub fn file_name(mut self, file_name: impl Into) -> Self { self.file_name = Some(file_name.into()); self } /// Set the size of the file. - pub fn content_size>(mut self, len: T) -> Self { - self.content_size = Some(len.into()); + pub fn content_size(mut self, len: u64) -> Self { + self.content_size = Some(len); self } - /// return a range response + /// Return a range response. + /// /// range: (start, end, total_size) + /// /// # Examples /// /// ``` @@ -180,13 +178,17 @@ where }) } - /// Attempts to return RANGE requests directly from the file path + /// Attempts to return RANGE requests directly from the file path. + /// /// # Arguments + /// /// * `file_path` - The path of the file to be streamed /// * `start` - The start position of the range, if start > file size or start > end return Range Not Satisfiable /// * `end` - The end position of the range if end == 0 end = file size - 1 /// * `buffer_size` - The buffer size of the range + /// /// # Examples + /// /// ``` /// use axum::{ /// http::StatusCode, @@ -206,7 +208,7 @@ where /// let range_end = 1024; /// let buffer_size = 1024; /// - /// FileStream::::try_range_response(Path::new("CHANGELOG.md"),range_start,range_end,buffer_size).await + /// FileStream::::try_range_response("CHANGELOG.md", range_start, range_end, buffer_size).await /// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}"))) /// .into_response() /// @@ -215,7 +217,7 @@ where /// # let _: Router = app; /// ``` pub async fn try_range_response( - file_path: &Path, + file_path: impl AsRef, start: u64, mut end: u64, buffer_size: usize, @@ -249,7 +251,7 @@ where resp = resp.header( header::CONTENT_RANGE, - format!("bytes {}-{}/{}", start, end, total_size), + format!("bytes {start}-{end}/{total_size}"), ); Ok(resp @@ -257,7 +259,7 @@ where .unwrap_or_else(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, - format!("build FileStream responsec error: {}", e), + format!("build FileStream responsec error: {e}"), ) .into_response() })) @@ -278,19 +280,19 @@ async fn try_stream( let stream = async_stream::try_stream! { let mut total_read = 0; - while total_read < end { - let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); - let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - if n == 0 { - break; // EOF - } - total_read += n as u64; - yield buffer[..n].to_vec(); - + while total_read < end { + let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); + let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + if n == 0 { + break; // EOF + } + total_read += n as u64; + yield buffer[..n].to_vec(); } }; + Ok(stream) } @@ -318,7 +320,7 @@ where .unwrap_or_else(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, - format!("build FileStream responsec error: {}", e), + format!("build FileStream responsec error: {e}"), ) .into_response() }) From 250888acc4a2d15ddd9e9cced7680a07a5334bda Mon Sep 17 00:00:00 2001 From: yanhe Date: Wed, 4 Dec 2024 20:10:31 +0800 Subject: [PATCH 18/21] improvements code --- axum-extra/Cargo.toml | 3 +- axum-extra/src/response/file_stream.rs | 85 +++++--------------------- axum-extra/src/response/mod.rs | 3 - examples/stream-to-file/src/main.rs | 9 ++- 4 files changed, 21 insertions(+), 79 deletions(-) diff --git a/axum-extra/Cargo.toml b/axum-extra/Cargo.toml index 036bba2478..633f323872 100644 --- a/axum-extra/Cargo.toml +++ b/axum-extra/Cargo.toml @@ -15,7 +15,7 @@ version = "0.10.0-alpha.1" default = ["tracing"] async-read-body = ["dep:tokio-util", "tokio-util?/io", "dep:tokio"] -file-stream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio", "tokio?/fs", "tokio?/io-util", "dep:async-stream"] +file-stream = ["dep:tokio-util", "tokio-util?/io", "dep:tokio", "tokio?/fs", "tokio?/io-util"] attachment = ["dep:tracing"] error-response = ["dep:tracing", "tracing/std"] cookie = ["dep:cookie"] @@ -76,7 +76,6 @@ tracing = { version = "0.1.37", default-features = false, optional = true } typed-json = { version = "0.1.1", optional = true } [dev-dependencies] -async-stream = "0.3" axum = { path = "../axum", features = ["macros"] } axum-macros = { path = "../axum-macros", features = ["__private"] } hyper = "1.0.0" diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index 164b905712..ea8f2da5e1 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -4,7 +4,7 @@ use axum::{ BoxError, }; use bytes::Bytes; -use futures_util::{Stream, TryStream}; +use futures_util::TryStream; use http::{header, StatusCode}; use std::{io, path::Path}; use tokio::{ @@ -13,9 +13,6 @@ use tokio::{ }; use tokio_util::io::ReaderStream; -/// Alias for `tokio_util::io::ReaderStream`. -pub type AsyncReaderStream = ReaderStream; - /// Encapsulate the file stream. /// /// The encapsulated file stream construct requires passing in a stream. @@ -92,9 +89,9 @@ where /// let app = Router::new().route("/FileStreamDownload", get(file_stream)); /// # let _: Router = app; /// ``` - pub async fn from_path(path: impl AsRef) -> io::Result> { + pub async fn from_path(path: impl AsRef) -> io::Result>> { // open file - let file = File::open(path).await?; + let file = File::open(&path).await?; let mut content_size = None; let mut file_name = None; @@ -104,7 +101,7 @@ where } // get file name - if let Some(file_name_os) = path.file_name() { + if let Some(file_name_os) = path.as_ref().file_name() { if let Some(file_name_str) = file_name_os.to_str() { file_name = Some(file_name_str.to_owned()); } @@ -185,7 +182,6 @@ where /// * `file_path` - The path of the file to be streamed /// * `start` - The start position of the range, if start > file size or start > end return Range Not Satisfiable /// * `end` - The end position of the range if end == 0 end = file size - 1 - /// * `buffer_size` - The buffer size of the range /// /// # Examples /// @@ -201,14 +197,12 @@ where /// use tokio::fs::File; /// use tokio_util::io::ReaderStream; /// use tokio::io::AsyncSeekExt; - /// use axum_extra::response::AsyncReaderStream; /// /// async fn range_stream() -> Response { /// let range_start = 0; /// let range_end = 1024; - /// let buffer_size = 1024; /// - /// FileStream::::try_range_response("CHANGELOG.md", range_start, range_end, buffer_size).await + /// FileStream::>::try_range_response("CHANGELOG.md", range_start, range_end).await /// .map_err(|e| (StatusCode::NOT_FOUND, format!("File not found: {e}"))) /// .into_response() /// @@ -220,10 +214,9 @@ where file_path: impl AsRef, start: u64, mut end: u64, - buffer_size: usize, ) -> io::Result { // open file - let file = File::open(file_path).await?; + let mut file = File::open(file_path).await?; // get file metadata let metadata = file.metadata().await?; @@ -244,58 +237,17 @@ where return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range Not Satisfiable").into_response()); } - // get file stream - let stream = try_stream(file, start, end, buffer_size).await?; - let mut resp = Response::builder().header(header::CONTENT_TYPE, "application/octet-stream"); - resp = resp.status(StatusCode::PARTIAL_CONTENT); + // get file stream and seek to start to return range response + file.seek(std::io::SeekFrom::Start(start)).await?; - resp = resp.header( - header::CONTENT_RANGE, - format!("bytes {start}-{end}/{total_size}"), - ); + // lenght = end - start + 1 exmple: 0-10 = 11 bytes + let stream = ReaderStream::new(file.take(end - start + 1)); - Ok(resp - .body(body::Body::from_stream(stream)) - .unwrap_or_else(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("build FileStream responsec error: {e}"), - ) - .into_response() - })) + Ok(FileStream::new(stream).into_range_response(start, end, total_size)) } } -/// More complex manipulation of files and conversion to a stream -async fn try_stream( - mut file: File, - start: u64, - end: u64, - buffer_size: usize, -) -> Result, io::Error>>, io::Error> { - file.seek(std::io::SeekFrom::Start(start)).await?; - - let mut buffer = vec![0; buffer_size]; - - let stream = async_stream::try_stream! { - let mut total_read = 0; - - while total_read < end { - let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); - let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - if n == 0 { - break; // EOF - } - total_read += n as u64; - yield buffer[..n].to_vec(); - } - }; - - Ok(stream) -} - +/// default response is application/octet-stream and attachment mode; impl IntoResponse for FileStream where S: TryStream + Send + 'static, @@ -511,7 +463,7 @@ mod tests { let app = Router::new().route( "/from_path", get(move || async move { - FileStream::::from_path(Path::new("CHANGELOG.md")) + FileStream::>::from_path(Path::new("CHANGELOG.md")) .await .unwrap() .into_response() @@ -614,14 +566,9 @@ mod tests { (0, 0) // default range end = 0, if end = 0 end == file size - 1 }; - FileStream::::try_range_response( - Path::new("CHANGELOG.md"), - start, - end, - 1024, - ) - .await - .unwrap() + FileStream::>::try_range_response(Path::new("CHANGELOG.md"), start, end) + .await + .unwrap() } fn parse_range_header(range: &str) -> Option<(u64, u64)> { diff --git a/axum-extra/src/response/mod.rs b/axum-extra/src/response/mod.rs index 044d2b90b1..40a549f93c 100644 --- a/axum-extra/src/response/mod.rs +++ b/axum-extra/src/response/mod.rs @@ -19,9 +19,6 @@ pub mod file_stream; #[cfg(feature = "file-stream")] pub use file_stream::FileStream; -#[cfg(feature = "file-stream")] -pub use file_stream::AsyncReaderStream; - #[cfg(feature = "error-response")] pub use error_response::InternalServerError; diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index 81faea1f82..ded7c0e68c 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -13,14 +13,14 @@ use axum::{ routing::{get, post}, BoxError, Router, }; -use axum_extra::response::file_stream::{AsyncReaderStream, FileStream}; +use axum_extra::response::file_stream::FileStream; use futures::{Stream, TryStreamExt}; use std::{io, path::PathBuf}; use tokio::{ fs::File, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter}, }; -use tokio_util::io::StreamReader; +use tokio_util::io::{ReaderStream, StreamReader}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const UPLOADS_DIRECTORY: &str = "uploads"; const DOWNLOAD_DIRECTORY: &str = "downloads"; @@ -168,7 +168,7 @@ async fn show_form2() -> Html<&'static str> { async fn simpler_file_download_handler() -> Response { //If you want to simply return a file as a stream // you can use the from_path method directly, passing in the path of the file to construct a stream with a header and length. - FileStream::::from_path( + FileStream::>::from_path( &std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt"), ) .await @@ -299,11 +299,10 @@ async fn try_file_range_handler(headers: HeaderMap) -> Response { }; let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); - FileStream::::try_range_response( + FileStream::>::try_range_response( std::path::Path::new(&file_path), start, end, - 1024, ) .await .unwrap() From 55c5f522ec4421e6786f1ce0f3f8eb32cbe2bc80 Mon Sep 17 00:00:00 2001 From: yanhe Date: Wed, 4 Dec 2024 20:17:28 +0800 Subject: [PATCH 19/21] fix ci --- axum-extra/Cargo.toml | 1 + axum-extra/src/response/file_stream.rs | 7 +++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/axum-extra/Cargo.toml b/axum-extra/Cargo.toml index 633f323872..1240d34a35 100644 --- a/axum-extra/Cargo.toml +++ b/axum-extra/Cargo.toml @@ -109,6 +109,7 @@ allowed = [ "prost", "serde", "tokio", + "tokio_util", "tower_layer", "tower_service", ] diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index ea8f2da5e1..a779787780 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -162,14 +162,14 @@ where resp = resp.header( header::CONTENT_RANGE, - format!("bytes {}-{}/{}", start, end, total_size), + format!("bytes {start}-{end}/{total_size}"), ); resp.body(body::Body::from_stream(self.stream)) .unwrap_or_else(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, - format!("build FileStream responsec error: {}", e), + format!("build FileStream responsec error: {e}"), ) .into_response() }) @@ -240,7 +240,6 @@ where // get file stream and seek to start to return range response file.seek(std::io::SeekFrom::Start(start)).await?; - // lenght = end - start + 1 exmple: 0-10 = 11 bytes let stream = ReaderStream::new(file.take(end - start + 1)); Ok(FileStream::new(stream).into_range_response(start, end, total_size)) @@ -260,7 +259,7 @@ where if let Some(file_name) = self.file_name { resp = resp.header( header::CONTENT_DISPOSITION, - format!("attachment; filename=\"{}\"", file_name), + format!("attachment; filename=\"{file_name}\""), ); } From 33124c12eba3a7cc321b93e524a1e4379a6c77d6 Mon Sep 17 00:00:00 2001 From: yanhe Date: Wed, 4 Dec 2024 20:23:44 +0800 Subject: [PATCH 20/21] split the example change --- axum-extra/src/response/file_stream.rs | 2 +- examples/stream-to-file/Cargo.toml | 4 +- examples/stream-to-file/src/main.rs | 294 +------------------------ 3 files changed, 12 insertions(+), 288 deletions(-) diff --git a/axum-extra/src/response/file_stream.rs b/axum-extra/src/response/file_stream.rs index a779787780..03271410c5 100644 --- a/axum-extra/src/response/file_stream.rs +++ b/axum-extra/src/response/file_stream.rs @@ -545,7 +545,7 @@ mod tests { .unwrap() .to_str() .unwrap(), - format!("bytes 20-1000/{}", content_length) + format!("bytes 20-1000/{content_length}") ); Ok(()) } diff --git a/examples/stream-to-file/Cargo.toml b/examples/stream-to-file/Cargo.toml index fee5e0ebbe..534324f846 100644 --- a/examples/stream-to-file/Cargo.toml +++ b/examples/stream-to-file/Cargo.toml @@ -5,11 +5,9 @@ edition = "2021" publish = false [dependencies] -async-stream = "0.3" axum = { path = "../../axum", features = ["multipart"] } -axum-extra = { path = "../../axum-extra", features = ["file-stream"] } futures = "0.3" tokio = { version = "1.0", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } \ No newline at end of file diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index ded7c0e68c..46272cf98f 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -4,26 +4,22 @@ //! cargo run -p example-stream-to-file //! ``` -use async_stream::try_stream; use axum::{ body::Bytes, extract::{Multipart, Path, Request}, - http::{header, HeaderMap, StatusCode}, - response::{Html, IntoResponse, Redirect, Response}, + http::StatusCode, + response::{Html, Redirect}, routing::{get, post}, BoxError, Router, }; -use axum_extra::response::file_stream::FileStream; use futures::{Stream, TryStreamExt}; -use std::{io, path::PathBuf}; -use tokio::{ - fs::File, - io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter}, -}; -use tokio_util::io::{ReaderStream, StreamReader}; +use std::io; +use tokio::{fs::File, io::BufWriter}; +use tokio_util::io::StreamReader; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + const UPLOADS_DIRECTORY: &str = "uploads"; -const DOWNLOAD_DIRECTORY: &str = "downloads"; + #[tokio::main] async fn main() { tracing_subscriber::registry() @@ -39,23 +35,9 @@ async fn main() { .await .expect("failed to create `uploads` directory"); - tokio::fs::create_dir(DOWNLOAD_DIRECTORY) - .await - .expect("failed to create `downloads` directory"); - - //create a file to download - create_test_file(std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt")) - .await - .expect("failed to create test file"); - let app = Router::new() - .route("/upload", get(show_form).post(accept_form)) - .route("/", get(show_form2).post(accept_form)) - .route("/file/{file_name}", post(save_request_body)) - .route("/file_download", get(file_download_handler)) - .route("/simpler_file_download", get(simpler_file_download_handler)) - .route("/range_file", get(file_range_handler)) - .route("/range_file_stream", get(try_file_range_handler)); + .route("/", get(show_form).post(accept_form)) + .route("/file/{file_name}", post(save_request_body)); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await @@ -64,19 +46,6 @@ async fn main() { axum::serve(listener, app).await.unwrap(); } -async fn create_test_file(path: PathBuf) -> io::Result<()> { - let mut file = File::create(path).await?; - for i in 1..=30 { - let line = format!( - "Hello, this is the simulated file content! This is line {}\n", - i - ); - file.write_all(line.as_bytes()).await?; - } - file.flush().await?; - Ok(()) -} - // Handler that streams the request body to a file. // // POST'ing to `/file/foo.txt` will create a file called `foo.txt`. @@ -115,249 +84,6 @@ async fn show_form() -> Html<&'static str> { ) } -// Handler that returns HTML for a multipart form. -async fn show_form2() -> Html<&'static str> { - Html( - r#" - - - - Upload and Download! - - -

Upload and Download Files

- - -
-
- -
- -
- -
-
- -
- -
-
- -
-
- - -
-
- - -
-
- - - - "#, - ) -} - -/// A simpler file download handler that uses the `FileStream` response. -/// Returns the entire file as a stream. -async fn simpler_file_download_handler() -> Response { - //If you want to simply return a file as a stream - // you can use the from_path method directly, passing in the path of the file to construct a stream with a header and length. - FileStream::>::from_path( - &std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt"), - ) - .await - .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response()) - .into_response() -} - -/// If you want to control the returned files in more detail you can implement a Stream -/// For example, use the try_stream! macro to construct a file stream and set which parts are needed. -async fn file_download_handler() -> Response { - let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); - let file_stream = match try_stream(&file_path, 5, 25, 10).await { - Ok(file_stream) => file_stream, - Err(e) => { - println!("{e}"); - return (StatusCode::INTERNAL_SERVER_ERROR, "Failed try stream!").into_response(); - } - }; - - // Use FileStream to return and set some information. - // Will set application/octet-stream in the header. - let file_stream_resp = FileStream::new(Box::pin(file_stream)) - .file_name("test.txt") - .content_size(20_u64); - - file_stream_resp.into_response() -} - -/// More complex manipulation of files and conversion to a stream -async fn try_stream( - file_path: &str, - start: u64, - mut end: u64, - buffer_size: usize, -) -> Result, std::io::Error>>, String> { - let mut file = File::open(file_path) - .await - .map_err(|e| format!("open file:{file_path} err:{e}"))?; - - file.seek(std::io::SeekFrom::Start(start)) - .await - .map_err(|e| format!("file:{file_path} seek err:{e}"))?; - - if end == 0 { - let metadata = file - .metadata() - .await - .map_err(|e| format!("file:{file_path} get metadata err:{e}"))?; - end = metadata.len(); - } - - let mut buffer = vec![0; buffer_size]; - - let stream = try_stream! { - let mut total_read = 0; - - while total_read < end { - let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); - let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - if n == 0 { - break; // EOF - } - total_read += n as u64; - yield buffer[..n].to_vec(); - - } - }; - Ok(stream) -} - -async fn try_stream2( - mut file: File, - start: u64, - mut end: u64, - buffer_size: usize, -) -> Result, std::io::Error>>, String> { - file.seek(std::io::SeekFrom::Start(start)) - .await - .map_err(|e| format!("file seek err:{e}"))?; - - if end == 0 { - let metadata = file - .metadata() - .await - .map_err(|e| format!("file get metadata err:{e}"))?; - end = metadata.len(); - } - - let mut buffer = vec![0; buffer_size]; - - let stream = try_stream! { - let mut total_read = 0; - - while total_read < end { - let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); - let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - if n == 0 { - break; // EOF - } - total_read += n as u64; - yield buffer[..n].to_vec(); - - } - }; - Ok(stream) -} - -/// A file download handler that accepts a range header and returns a partial file as a stream. -/// You can return directly from the path -/// But you can't download this stream directly from your browser, you need to use a tool like curl or Postman. -async fn try_file_range_handler(headers: HeaderMap) -> Response { - let range_header = headers - .get(header::RANGE) - .and_then(|value| value.to_str().ok()); - - let (start, end) = if let Some(range) = range_header { - if let Some(range) = parse_range_header(range) { - range - } else { - return (StatusCode::RANGE_NOT_SATISFIABLE, "Invalid Range").into_response(); - } - } else { - (0, 0) // default range end = 0, if end = 0 end == file size - 1 - }; - - let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); - FileStream::>::try_range_response( - std::path::Path::new(&file_path), - start, - end, - ) - .await - .unwrap() -} - -/// If you want to control the stream yourself -async fn file_range_handler(headers: HeaderMap) -> Response { - // Parse the range header to get the start and end values. - let range_header = headers - .get(header::RANGE) - .and_then(|value| value.to_str().ok()); - - // If the range header is invalid, return a 416 Range Not Satisfiable response. - let (start, end) = if let Some(range) = range_header { - if let Some(range) = parse_range_header(range) { - range - } else { - return (StatusCode::RANGE_NOT_SATISFIABLE, "Invalid Range").into_response(); - } - } else { - (0, 0) // default range end = 0, if end = 0 end == file size - 1 - }; - - let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); - - let file = File::open(file_path).await.unwrap(); - - let file_size = file.metadata().await.unwrap().len(); - - let file_stream = match try_stream2(file, start, end, 256).await { - Ok(file_stream) => file_stream, - Err(e) => { - println!("{e}"); - return (StatusCode::INTERNAL_SERVER_ERROR, "Failed try stream!").into_response(); - } - }; - - FileStream::new(Box::pin(file_stream)).into_range_response(start, end, file_size) -} - -/// Parse the range header and return the start and end values. -fn parse_range_header(range: &str) -> Option<(u64, u64)> { - let range = range.strip_prefix("bytes=")?; - let mut parts = range.split('-'); - let start = parts.next()?.parse::().ok()?; - let end = parts - .next() - .and_then(|s| s.parse::().ok()) - .unwrap_or(0); - if start > end { - return None; - } - Some((start, end)) -} - // Handler that accepts a multipart form upload and streams each field to a file. async fn accept_form(mut multipart: Multipart) -> Result { while let Ok(Some(field)) = multipart.next_field().await { @@ -415,4 +141,4 @@ fn path_is_valid(path: &str) -> bool { } components.count() == 1 -} +} \ No newline at end of file From 444bfbf41423f3da1dcaccf9bffde04fdad6d0f8 Mon Sep 17 00:00:00 2001 From: yanhe Date: Wed, 4 Dec 2024 20:26:02 +0800 Subject: [PATCH 21/21] fix ci warn --- examples/stream-to-file/Cargo.toml | 2 +- examples/stream-to-file/src/main.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/stream-to-file/Cargo.toml b/examples/stream-to-file/Cargo.toml index 534324f846..b159c871ba 100644 --- a/examples/stream-to-file/Cargo.toml +++ b/examples/stream-to-file/Cargo.toml @@ -10,4 +10,4 @@ futures = "0.3" tokio = { version = "1.0", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } \ No newline at end of file +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index 46272cf98f..7c44286d87 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -141,4 +141,4 @@ fn path_is_valid(path: &str) -> bool { } components.count() == 1 -} \ No newline at end of file +}