From 17bd32e90e94e4d90e88eca56203d0bf72ea0d14 Mon Sep 17 00:00:00 2001
From: Wolfgang Bumiller <w.bumiller@proxmox.com>
Date: Fri, 23 Aug 2024 11:15:58 +0200
Subject: [PATCH] router, rest-server: add StreamSync and StreamAsync API
 handlers

These are Iterators or Streams which continuously produce output. They can
either be formatted, in which case they are serialized as usual, or, if the
client accepts `application/json-seq` via an `Accept` header, the output
will be streamed as a JSON sequence directly.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
 proxmox-rest-server/src/formatter.rs |  29 ++++
 proxmox-rest-server/src/h2service.rs |   2 +-
 proxmox-rest-server/src/rest.rs      | 177 +++++++++-----------
 proxmox-router/Cargo.toml            |   5 +
 proxmox-router/src/cli/command.rs    |  15 ++
 proxmox-router/src/router.rs         | 237 +++++++++++++++++++++++++++
 6 files changed, 360 insertions(+), 105 deletions(-)

diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index 793d6b1b..32ca9936 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -94,6 +94,35 @@ fn start_data_streaming(
     reader
 }
 
+struct DirectJsonFormatter;
+
+/// Format data directly as ``application/json``.
+///
+/// This does not support result attributes set on `rpcenv`.
+///
+/// Errors generate a BAD_REQUEST response containing the error message as a string.
+pub static DIRECT_JSON_FORMATTER: &'static dyn OutputFormatter = &DirectJsonFormatter;
+
+impl OutputFormatter for DirectJsonFormatter {
+    fn format_data(&self, data: Value, _rpcenv: &dyn RpcEnvironment) -> Response<Body> {
+        json_data_response(data)
+    }
+
+    fn format_data_streaming(
+        &self,
+        data: Box<dyn SerializableReturn + Send>,
+        _rpcenv: &dyn RpcEnvironment,
+    ) -> Result<Response<Body>, Error> {
+        let reader = start_data_streaming(Value::Null, data);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+        json_data_response_streaming(Body::wrap_stream(stream))
+    }
+
+    fn format_error(&self, err: Error) -> Response<Body> {
+        error_to_response(err)
+    }
+}
+
 struct JsonFormatter();
 
 /// Format data as ``application/json``
diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs
index 84a30985..db6e3b0a 100644
--- a/proxmox-rest-server/src/h2service.rs
+++ b/proxmox-rest-server/src/h2service.rs
@@ -66,7 +66,7 @@ impl H2Service {
             Some(api_method) => crate::rest::handle_api_request(
                 self.rpcenv.clone(),
                 api_method,
-                formatter,
+                Some(formatter),
                 parts,
                 body,
                 uri_param,
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index 4a3671a6..e79639b2 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -490,21 +490,85 @@ fn access_forbidden_time() -> std::time::Instant {
     std::time::Instant::now() + std::time::Duration::from_millis(500)
 }
 
+fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result<Response<Body>, Error> {
+    let (mut send, body) = hyper::Body::channel();
+    tokio::spawn(async move {
+        use futures::StreamExt;
+
+        let mut stream = stream.into_inner();
+        while let Some(record) = stream.next().await {
+            if send.send_data(record.to_bytes().into()).await.is_err() {
+                break;
+            }
+        }
+    });
+
+    Ok(Response::builder()
+        .status(http::StatusCode::OK)
+        .header(http::header::CONTENT_TYPE, "application/json-seq")
+        .body(body)?)
+}
+
+fn handle_sync_stream_as_json_seq(
+    iter: proxmox_router::SyncStream,
+) -> Result<Response<Body>, Error> {
+    let iter = iter
+        .into_inner()
+        .map(|record| Ok::<_, Error>(record.to_bytes()));
+
+    Ok(Response::builder()
+        .status(http::StatusCode::OK)
+        .header(http::header::CONTENT_TYPE, "application/json-seq")
+        .body(Body::wrap_stream(futures::stream::iter(iter)))?)
+}
+
 pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
     mut rpcenv: Env,
     info: &'static ApiMethod,
-    formatter: &'static dyn OutputFormatter,
+    formatter: Option<&'static dyn OutputFormatter>,
     parts: Parts,
     req_body: Body,
     uri_param: HashMap<String, String, S>,
 ) -> Result<Response<Body>, Error> {
+    let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER);
+
     let compression = extract_compression_method(&parts.headers);
 
+    let accept_json_seq = parts.headers.get_all(http::header::ACCEPT).iter().any(|h| {
+        h.as_ref()
+            .split(|&b| b == b',')
+            .map(|e| e.trim_ascii_start())
+            .any(|e| e == b"application/json-seq" || e.starts_with(b"application/json-seq;"))
+    });
+
     let result = match info.handler {
         ApiHandler::AsyncHttp(handler) => {
             let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
             (handler)(parts, req_body, params, info, Box::new(rpcenv)).await
         }
+        ApiHandler::StreamSync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            match (handler)(params, info, &mut rpcenv) {
+                Ok(iter) if accept_json_seq => handle_sync_stream_as_json_seq(iter),
+                Ok(iter) => iter
+                    .try_collect()
+                    .map(|data| formatter.format_data(data, &rpcenv)),
+                Err(err) => Err(err),
+            }
+        }
+        ApiHandler::StreamAsync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            match (handler)(params, info, &mut rpcenv).await {
+                Ok(stream) if accept_json_seq => handle_stream_as_json_seq(stream),
+                Ok(stream) => stream
+                    .try_collect()
+                    .await
+                    .map(|data| formatter.format_data(data, &rpcenv)),
+                Err(err) => Err(err),
+            }
+        }
         ApiHandler::SerializingSync(handler) => {
             let params =
                 get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
@@ -575,101 +639,6 @@ pub(crate) async fn handle_api_request(
-    mut rpcenv: Env,
-    info: &'static ApiMethod,
-    parts: Parts,
-    req_body: Body,
-    uri_param: HashMap<String, String, S>,
-) -> Result<Response<Body>, Error> {
-    let compression = extract_compression_method(&parts.headers);
-
-    fn to_json_response<Env: RpcEnvironment>(
-        value: Value,
-        env: &Env,
-    ) -> Result<Response<Body>, Error> {
-        if let Some(attr) = env.result_attrib().as_object() {
-            if !attr.is_empty() {
-                http_bail!(
-                    INTERNAL_SERVER_ERROR,
-                    "result attributes are no longer supported"
-                );
-            }
-        }
-        let value = serde_json::to_string(&value)?;
-        Ok(Response::builder().status(200).body(value.into())?)
-    }
-
-    let result = match info.handler {
-        ApiHandler::AsyncHttp(handler) => {
-            let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
-            (handler)(parts, req_body, params, info, Box::new(rpcenv)).await
-        }
-        ApiHandler::Sync(handler) => {
-            let params =
-                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
-            (handler)(params, info, &mut rpcenv).and_then(|v| to_json_response(v, &rpcenv))
-        }
-        ApiHandler::Async(handler) => {
-            let params =
-                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
-            (handler)(params, info, &mut rpcenv)
-                .await
-                .and_then(|v| to_json_response(v, &rpcenv))
-        }
-        ApiHandler::SerializingSync(_) => http_bail!(
-            INTERNAL_SERVER_ERROR,
-            "old-style streaming calls not supported"
-        ),
-        ApiHandler::SerializingAsync(_) => http_bail!(
-            INTERNAL_SERVER_ERROR,
-            "old-style streaming calls not supported"
-        ),
-        _ => {
-            bail!("Unknown API handler type");
-        }
-    };
-
-    let mut resp = match result {
-        Ok(resp) => resp,
-        Err(err) => {
-            if let Some(httperr) = err.downcast_ref::<HttpError>() {
-                if httperr.code == StatusCode::UNAUTHORIZED {
-                    tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await;
-                }
-            }
-            return Err(err);
-        }
-    };
-
-    let resp = match compression {
-        Some(CompressionMethod::Deflate) => {
-            resp.headers_mut().insert(
-                header::CONTENT_ENCODING,
-                CompressionMethod::Deflate.content_encoding(),
-            );
-            resp.map(|body| {
-                Body::wrap_stream(
-                    DeflateEncoder::builder(TryStreamExt::map_err(body, |err| {
-                        proxmox_lang::io_format_err!("error during compression: {}", err)
-                    }))
-                    .zlib(true)
-                    .build(),
-                )
-            })
-        }
-        None => resp,
-    };
-
-    if info.reload_timezone {
-        unsafe {
-            tzset();
-        }
-    }
-
-    Ok(resp)
-}
-
 fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
     if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) {
         return match ext {
@@ -1029,7 +998,8 @@ impl Formatted {
                 {
                     proxy_protected_request(config, api_method, parts, body, peer).await
                 } else {
-                    handle_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await
+                    handle_api_request(rpcenv, api_method, Some(formatter), parts, body, uri_param)
+                        .await
                 };
 
                 let mut response = match result {
@@ -1129,13 +1099,12 @@ impl Unformatted {
                     return Err(err);
                 }
 
-                let result = if api_method.protected
-                    && rpcenv.env_type == RpcEnvironmentType::PUBLIC
-                {
-                    proxy_protected_request(config, api_method, parts, body, peer).await
-                } else {
-                    handle_unformatted_api_request(rpcenv, api_method, parts, body, uri_param).await
-                };
+                let result =
+                    if api_method.protected && rpcenv.env_type == RpcEnvironmentType::PUBLIC {
+                        proxy_protected_request(config, api_method, parts, body, peer).await
+                    } else {
+                        handle_api_request(rpcenv, api_method, None, parts, body, uri_param).await
+                    };
 
                 let mut response = match result {
                     Ok(resp) => resp,
diff --git a/proxmox-router/Cargo.toml b/proxmox-router/Cargo.toml
index d5efa532..e1c6a01f 100644
--- a/proxmox-router/Cargo.toml
+++ b/proxmox-router/Cargo.toml
@@ -18,6 +18,7 @@ required-features = [ "cli" ]
 [dependencies]
 anyhow.workspace = true
 env_logger = { workspace = true, optional = true }
+futures.workspace = true
 http = { workspace = true, optional = true }
 hyper = { workspace = true, features = [ "full" ], optional = true }
 nix.workspace = true
@@ -35,6 +36,10 @@ proxmox-http-error.workspace = true
 proxmox-schema.workspace = true
 proxmox-async.workspace = true
 
+[dev-dependencies]
+tokio.workspace = true
+tokio-stream.workspace = true
+
 [features]
 default = [ "cli", "server" ]
 cli = [ "dep:env_logger", "dep:libc", "dep:rustyline" ]
diff --git a/proxmox-router/src/cli/command.rs b/proxmox-router/src/cli/command.rs
index 37d9ed54..01f64d19 100644
--- a/proxmox-router/src/cli/command.rs
+++ b/proxmox-router/src/cli/command.rs
@@ -90,10 +90,19 @@ async fn handle_simple_command_future(
         ApiHandler::Sync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv),
         ApiHandler::SerializingSync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv)
             .and_then(|r| r.to_value().map_err(Error::from)),
+        ApiHandler::StreamSync(handler) => {
+            (handler)(params, cli_cmd.info, &mut rpcenv).and_then(|iter| iter.try_collect())
+        }
         ApiHandler::Async(handler) => (handler)(params, cli_cmd.info, &mut rpcenv).await,
         ApiHandler::SerializingAsync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv)
             .await
             .and_then(|r| r.to_value().map_err(Error::from)),
+        ApiHandler::StreamAsync(handler) => {
+            match (handler)(params, cli_cmd.info, &mut rpcenv).await {
+                Ok(stream) => stream.try_collect().await,
+                Err(err) => Err(err),
+            }
+        }
         #[cfg(feature = "server")]
         ApiHandler::AsyncHttp(_) => {
             bail!("CliHandler does not support ApiHandler::AsyncHttp - internal error")
@@ -130,6 +139,9 @@ pub(crate) fn handle_simple_command<'cli>(
         ApiHandler::SerializingSync(handler) => {
             (handler)(params, cli_cmd.info, rpcenv).and_then(|r| r.to_value().map_err(Error::from))
         }
+        ApiHandler::StreamSync(handler) => {
+            (handler)(params, cli_cmd.info, rpcenv).and_then(|iter| iter.try_collect())
+        }
         ApiHandler::Async(handler) => {
             let run = run.ok_or_else(|| {
                 format_err!("CliHandler does not support ApiHandler::Async - internal error")
@@ -140,6 +152,9 @@ pub(crate) fn handle_simple_command<'cli>(
         ApiHandler::SerializingAsync(_handler) => {
             bail!("CliHandler does not support ApiHandler::SerializingAsync - internal error");
         }
+        ApiHandler::StreamAsync(_handler) => {
+            bail!("CliHandler does not support ApiHandler::StreamAsync - internal error");
+        }
         #[cfg(feature = "server")]
         ApiHandler::AsyncHttp(_) => {
             bail!("CliHandler does not support ApiHandler::AsyncHttp - internal error");
diff --git a/proxmox-router/src/router.rs b/proxmox-router/src/router.rs
index c913d1ff..4585f503 100644
--- a/proxmox-router/src/router.rs
+++ b/proxmox-router/src/router.rs
@@ -11,6 +11,7 @@ use http::{Method, Response};
 #[cfg(feature = "server")]
 use hyper::Body;
 use percent_encoding::percent_decode_str;
+use serde::Serialize;
 use serde_json::Value;
 
 use proxmox_schema::{ObjectSchema, ParameterSchema, ReturnType, Schema};
@@ -77,6 +78,137 @@ pub type SerializingApiHandlerFn = &'static (dyn Fn(
     + Sync
     + 'static);
 
+#[derive(Serialize)]
+#[serde(rename_all = "kebab-case")]
+enum RecordEntry<'a> {
+    /// A successful record.
+    Data(&'a Value),
+    /// An error entry.
+    Error(Value),
+}
+
+impl<'a> From<&'a Result<Value, Error>> for RecordEntry<'a> {
+    fn from(res: &'a Result<Value, Error>) -> Self {
+        match res {
+            Ok(value) => Self::Data(value),
+            Err(err) => Self::Error(err.to_string().into()),
+        }
+    }
+}
+
+/// A record for a streaming API call. This contains a `Result` and allows formatting it as a
+/// `json-seq` string.
+///
+/// This is currently just a JSON string, but we don't want to commit to strings or byte vectors
+/// as the output of the streaming API handlers, nor to creating lots of allocated `Box` elements,
+/// so this can be turned into either without breaking the API.
+pub struct Record {
+    // direct access is only for the CLI code
+    pub(crate) data: Result<Value, Error>,
+}
+
+impl Record {
+    /// Create a new successful record from a serializable element.
+    pub fn new<T: Serialize>(data: &T) -> Self {
+        Self {
+            data: Ok(serde_json::to_value(data).expect("failed to create json string")),
+        }
+    }
+
+    /// Create a new error record from an error value.
+    pub fn error<E: Into<Error>>(error: E) -> Self {
+        Self {
+            data: Err(error.into()),
+        }
+    }
+
+    /// Create a new record from a `Result`.
+    pub fn from_result<T, E>(result: Result<T, E>) -> Self
+    where
+        T: Serialize,
+        E: Into<Error>,
+    {
+        match result {
+            Ok(res) => Self::new(&res),
+            Err(err) => Self::error(err),
+        }
+    }
+
+    /// Create/get the bytes for a complete record to be streamed as a JSON sequence according to
+    /// RFC 7464: the data is prefixed with a record separator (`\x1E`) and ends with a newline
+    /// (`\n`).
+    pub fn to_bytes(&self) -> Vec<u8> {
+        let mut data = Vec::new();
+        data.push(b'\x1E');
+        // We assume json serialization doesn't fail.
+        // Don't return special objects that can fail to serialize.
+        // As for "normal" data - we don't expect spurious errors, otherwise they could also happen
+        // when serializing *errors*...
+        serde_json::to_writer(&mut data, &RecordEntry::from(&self.data))
+            .expect("failed to create JSON record");
+        data.push(b'\n');
+        data
+    }
+}
+
+/// A synchronous streaming API handler returns an [`Iterator`] over items which should be
+/// serialized.
+///
+/// ```
+/// # use anyhow::Error;
+/// # use serde_json::{json, Value};
+/// use proxmox_router::{ApiHandler, ApiMethod, Record, RpcEnvironment, SyncStream};
+/// use proxmox_schema::ObjectSchema;
+///
+/// fn hello(
+///     param: Value,
+///     info: &ApiMethod,
+///     rpcenv: &mut dyn RpcEnvironment,
+/// ) -> Result<SyncStream, Error> {
+///     Ok([Record::new(&3u32)].into())
+/// }
+///
+/// const API_METHOD_HELLO: ApiMethod = ApiMethod::new(
+///     &ApiHandler::StreamSync(&hello),
+///     &ObjectSchema::new("Hello World Example", &[])
+/// );
+/// ```
+pub type StreamApiHandlerFn = &'static (dyn Fn(Value, &ApiMethod, &mut dyn RpcEnvironment) -> Result<SyncStream, Error>
+    + Send
+    + Sync
+    + 'static);
+
+pub struct SyncStream {
+    inner: Box<dyn Iterator<Item = Record> + Send>,
+}
+
+impl SyncStream {
+    pub fn from_boxed(inner: Box<dyn Iterator<Item = Record> + Send>) -> Self {
+        Self { inner }
+    }
+
+    pub fn into_inner(self) -> Box<dyn Iterator<Item = Record> + Send> {
+        self.inner
+    }
+
+    pub fn try_collect(self) -> Result<Value, Error> {
+        let mut acc = Vec::new();
+        for i in self.inner {
+            acc.push(i.data?);
+        }
+        Ok(Value::Array(acc))
+    }
+}
+
+impl<I> From<I> for SyncStream
+where
+    I: IntoIterator + Send + 'static,
+    I::IntoIter: Send + 'static,
+    I::Item: IntoRecord,
+{
+    fn from(iter: I) -> Self {
+        Self::from_boxed(Box::new(iter.into_iter().map(IntoRecord::into_record)))
+    }
+}
+
 /// Asynchronous API handlers
 ///
 /// Returns a future Value.
@@ -147,6 +279,103 @@ pub type SerializingApiFuture<'a> = Pin<
     Box<dyn Future<Output = Result<Box<dyn SerializableReturn + Send>, anyhow::Error>> + Send + 'a>,
 >;
 
+/// Streaming asynchronous API handlers
+///
+/// Returns a future [`Stream`] of [`Record`]s.
+/// ```
+/// # use serde_json::{json, Value};
+/// # use tokio::sync::mpsc;
+/// # use tokio::spawn;
+/// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, Record, RpcEnvironment, StreamApiFuture};
+/// use proxmox_schema::ObjectSchema;
+///
+///
+/// fn hello_future<'a>(
+///     param: Value,
+///     info: &ApiMethod,
+///     rpcenv: &'a mut dyn RpcEnvironment,
+/// ) -> StreamApiFuture<'a> {
+///     let (sender, receiver) = mpsc::channel(8);
+///     tokio::spawn(async move {
+///         sender.send(Record::new("data")).await;
+///         sender.send(Record::new("more data")).await;
+///         // ...
+///     });
+///     let receiver = tokio_stream::wrappers::ReceiverStream::new(receiver);
+///     Box::pin(async move { Ok(receiver.into()) })
+/// }
+///
+/// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new(
+///     &ApiHandler::StreamAsync(&hello_future),
+///     &ObjectSchema::new("Hello World Example (async)", &[])
+/// );
+/// ```
+pub type StreamApiAsyncHandlerFn = &'static (dyn for<'a> Fn(
+    Value,
+    &'static ApiMethod,
+    &'a mut dyn RpcEnvironment,
+) -> StreamApiFuture<'a>
+    + Send
+    + Sync);
+
+pub type StreamApiFuture<'a> =
+    Pin<Box<dyn Future<Output = Result<Stream, anyhow::Error>> + Send + 'a>>;
+
+pub struct Stream {
+    inner: Pin<Box<dyn futures::Stream<Item = Record> + Send>>,
+}
+
+impl Stream {
+    pub fn from_boxed(inner: Pin<Box<dyn futures::Stream<Item = Record> + Send>>) -> Self {
+        Self { inner }
+    }
+
+    pub fn into_inner(self) -> Pin<Box<dyn futures::Stream<Item = Record> + Send>> {
+        self.inner
+    }
+
+    pub async fn try_collect(mut self) -> Result<Value, Error> {
+        use futures::StreamExt;
+
+        let mut acc = Vec::new();
+        while let Some(i) = self.inner.next().await {
+            acc.push(i.data?);
+        }
+        Ok(Value::Array(acc))
+    }
+}
+
+impl<I> From<I> for Stream
+where
+    I: futures::Stream + Send + 'static,
+    I::Item: IntoRecord,
+{
+    fn from(stream: I) -> Self {
+        use futures::stream::StreamExt;
+        Self::from_boxed(Box::pin(stream.map(IntoRecord::into_record)))
+    }
+}
+
+/// Helper trait to allow [`Stream`] and [`SyncStream`] to be constructed from both
+/// regular streams and `TryStream`s.
+pub trait IntoRecord {
+    fn into_record(self) -> Record;
+}
+
+impl IntoRecord for Record {
+    fn into_record(self) -> Record {
+        self
+    }
+}
+
+impl IntoRecord for Result<Record, Error> {
+    fn into_record(self) -> Record {
+        match self {
+            Ok(record) => record,
+            Err(err) => Record::error(err),
+        }
+    }
+}
+
 /// Asynchronous HTTP API handlers
 ///
 /// They get low level access to request and response data. Use this
@@ -201,8 +430,10 @@ pub type ApiResponseFuture =
 pub enum ApiHandler {
     Sync(ApiHandlerFn),
     SerializingSync(SerializingApiHandlerFn),
+    StreamSync(StreamApiHandlerFn),
     Async(ApiAsyncHandlerFn),
     SerializingAsync(SerializingApiAsyncHandlerFn),
+    StreamAsync(StreamApiAsyncHandlerFn),
     #[cfg(feature = "server")]
     AsyncHttp(ApiAsyncHttpHandlerFn),
 }
@@ -221,12 +452,18 @@ impl PartialEq for ApiHandler {
             (ApiHandler::SerializingSync(l), ApiHandler::SerializingSync(r)) => {
                 core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
             }
+            (ApiHandler::StreamSync(l), ApiHandler::StreamSync(r)) => {
+                core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
+            }
             (ApiHandler::Async(l), ApiHandler::Async(r)) => {
                 core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
             }
             (ApiHandler::SerializingAsync(l), ApiHandler::SerializingAsync(r)) => {
                 core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
             }
+            (ApiHandler::StreamAsync(l), ApiHandler::StreamAsync(r)) => {
+                core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
+            }
             #[cfg(feature = "server")]
             (ApiHandler::AsyncHttp(l), ApiHandler::AsyncHttp(r)) => {
                 core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
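
To make the `json-seq` framing concrete: each record emitted by `Record::to_bytes()` is one
RFC 7464 text, i.e. the RS byte `0x1E`, the serialized `RecordEntry` (an externally tagged
serde enum with kebab-case variant names, so `{"data": ...}` for successful records and
`{"error": "..."}` for failed ones), and a terminating newline. A rough sketch of that shape,
illustrative only, since the `Record` docs above deliberately keep the exact representation an
implementation detail:

use proxmox_router::Record;

fn main() {
    // A successful record on the wire: RS byte, `{"data": ...}`, newline.
    let ok = Record::new(&42u32);
    assert_eq!(ok.to_bytes(), b"\x1e{\"data\":42}\n".to_vec());

    // An error record: the error message ends up under the `error` key.
    let err = Record::error(anyhow::format_err!("something failed"));
    assert_eq!(err.to_bytes(), b"\x1e{\"error\":\"something failed\"}\n".to_vec());
}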
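
On the consuming side, a client that sent `Accept: application/json-seq` can split the response
body on the RS byte and parse each chunk as JSON; the trailing newline is plain whitespace to a
JSON parser. A minimal sketch, where `parse_json_seq` is a made-up helper name and the body is
assumed to be fully buffered (a real client would parse records incrementally as chunks arrive):

use serde_json::Value;

/// Split a buffered `application/json-seq` body into its JSON records.
fn parse_json_seq(body: &[u8]) -> Result<Vec<Value>, serde_json::Error> {
    body.split(|&b| b == b'\x1e') // every record starts with the RS byte
        .filter(|chunk| !chunk.is_empty()) // the split yields an empty leading chunk
        .map(|chunk| serde_json::from_slice(chunk)) // the trailing '\n' is ignored as whitespace
        .collect()
}

fn main() -> Result<(), serde_json::Error> {
    // Two records as the server side above would emit them.
    let body = b"\x1e{\"data\":42}\n\x1e{\"error\":\"something failed\"}\n";
    for record in parse_json_seq(body)? {
        println!("{record}");
    }
    Ok(())
}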