diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs index 793d6b1b..32ca9936 100644 --- a/proxmox-rest-server/src/formatter.rs +++ b/proxmox-rest-server/src/formatter.rs @@ -94,6 +94,35 @@ fn start_data_streaming( reader } +struct DirectJsonFormatter; + +/// Format data directly as ``application/json``. +/// +/// This does not support result attributes set on `rpcenv`. +/// +/// Errors generates a BAD_REQUEST containing the error message as string. +pub static DIRECT_JSON_FORMATTER: &'static dyn OutputFormatter = &DirectJsonFormatter; + +impl OutputFormatter for DirectJsonFormatter { + fn format_data(&self, data: Value, _rpcenv: &dyn RpcEnvironment) -> Response { + json_data_response(data) + } + + fn format_data_streaming( + &self, + data: Box, + _rpcenv: &dyn RpcEnvironment, + ) -> Result, Error> { + let reader = start_data_streaming(Value::Null, data); + let stream = tokio_stream::wrappers::ReceiverStream::new(reader); + json_data_response_streaming(Body::wrap_stream(stream)) + } + + fn format_error(&self, err: Error) -> Response { + error_to_response(err) + } +} + struct JsonFormatter(); /// Format data as ``application/json`` diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs index 84a30985..db6e3b0a 100644 --- a/proxmox-rest-server/src/h2service.rs +++ b/proxmox-rest-server/src/h2service.rs @@ -66,7 +66,7 @@ impl H2Service { Some(api_method) => crate::rest::handle_api_request( self.rpcenv.clone(), api_method, - formatter, + Some(formatter), parts, body, uri_param, diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs index 4a3671a6..e79639b2 100644 --- a/proxmox-rest-server/src/rest.rs +++ b/proxmox-rest-server/src/rest.rs @@ -490,21 +490,85 @@ fn access_forbidden_time() -> std::time::Instant { std::time::Instant::now() + std::time::Duration::from_millis(500) } +fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result, Error> { + let (mut send, body) = hyper::Body::channel(); + tokio::spawn(async move { + use futures::StreamExt; + + let mut stream = stream.into_inner(); + while let Some(record) = stream.next().await { + if send.send_data(record.to_bytes().into()).await.is_err() { + break; + } + } + }); + + Ok(Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json-seq") + .body(body)?) +} + +fn handle_sync_stream_as_json_seq( + iter: proxmox_router::SyncStream, +) -> Result, Error> { + let iter = iter + .into_inner() + .map(|record| Ok::<_, Error>(record.to_bytes())); + + Ok(Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json-seq") + .body(Body::wrap_stream(futures::stream::iter(iter)))?) +} + pub(crate) async fn handle_api_request( mut rpcenv: Env, info: &'static ApiMethod, - formatter: &'static dyn OutputFormatter, + formatter: Option<&'static dyn OutputFormatter>, parts: Parts, req_body: Body, uri_param: HashMap, ) -> Result, Error> { + let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER); + let compression = extract_compression_method(&parts.headers); + let accept_json_seq = parts.headers.get_all(http::header::ACCEPT).iter().any(|h| { + h.as_ref() + .split(|&b| b == b',') + .map(|e| e.trim_ascii_start()) + .any(|e| e == b"application/json-seq" || e.starts_with(b"application/json-seq;")) + }); + let result = match info.handler { ApiHandler::AsyncHttp(handler) => { let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?; (handler)(parts, req_body, params, info, Box::new(rpcenv)).await } + ApiHandler::StreamSync(handler) => { + let params = + get_request_parameters(info.parameters, parts, req_body, uri_param).await?; + match (handler)(params, info, &mut rpcenv) { + Ok(iter) if accept_json_seq => handle_sync_stream_as_json_seq(iter), + Ok(iter) => iter + .try_collect() + .map(|data| formatter.format_data(data, &rpcenv)), + Err(err) => Err(err), + } + } + ApiHandler::StreamAsync(handler) => { + let params = + get_request_parameters(info.parameters, parts, req_body, uri_param).await?; + match (handler)(params, info, &mut rpcenv).await { + Ok(stream) if accept_json_seq => handle_stream_as_json_seq(stream), + Ok(stream) => stream + .try_collect() + .await + .map(|data| formatter.format_data(data, &rpcenv)), + Err(err) => Err(err), + } + } ApiHandler::SerializingSync(handler) => { let params = get_request_parameters(info.parameters, parts, req_body, uri_param).await?; @@ -575,101 +639,6 @@ pub(crate) async fn handle_api_request( - mut rpcenv: Env, - info: &'static ApiMethod, - parts: Parts, - req_body: Body, - uri_param: HashMap, -) -> Result, Error> { - let compression = extract_compression_method(&parts.headers); - - fn to_json_response( - value: Value, - env: &Env, - ) -> Result, Error> { - if let Some(attr) = env.result_attrib().as_object() { - if !attr.is_empty() { - http_bail!( - INTERNAL_SERVER_ERROR, - "result attributes are no longer supported" - ); - } - } - let value = serde_json::to_string(&value)?; - Ok(Response::builder().status(200).body(value.into())?) - } - - let result = match info.handler { - ApiHandler::AsyncHttp(handler) => { - let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?; - (handler)(parts, req_body, params, info, Box::new(rpcenv)).await - } - ApiHandler::Sync(handler) => { - let params = - get_request_parameters(info.parameters, parts, req_body, uri_param).await?; - (handler)(params, info, &mut rpcenv).and_then(|v| to_json_response(v, &rpcenv)) - } - ApiHandler::Async(handler) => { - let params = - get_request_parameters(info.parameters, parts, req_body, uri_param).await?; - (handler)(params, info, &mut rpcenv) - .await - .and_then(|v| to_json_response(v, &rpcenv)) - } - ApiHandler::SerializingSync(_) => http_bail!( - INTERNAL_SERVER_ERROR, - "old-style streaming calls not supported" - ), - ApiHandler::SerializingAsync(_) => http_bail!( - INTERNAL_SERVER_ERROR, - "old-style streaming calls not supported" - ), - _ => { - bail!("Unknown API handler type"); - } - }; - - let mut resp = match result { - Ok(resp) => resp, - Err(err) => { - if let Some(httperr) = err.downcast_ref::() { - if httperr.code == StatusCode::UNAUTHORIZED { - tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await; - } - } - return Err(err); - } - }; - - let resp = match compression { - Some(CompressionMethod::Deflate) => { - resp.headers_mut().insert( - header::CONTENT_ENCODING, - CompressionMethod::Deflate.content_encoding(), - ); - resp.map(|body| { - Body::wrap_stream( - DeflateEncoder::builder(TryStreamExt::map_err(body, |err| { - proxmox_lang::io_format_err!("error during compression: {}", err) - })) - .zlib(true) - .build(), - ) - }) - } - None => resp, - }; - - if info.reload_timezone { - unsafe { - tzset(); - } - } - - Ok(resp) -} - fn extension_to_content_type(filename: &Path) -> (&'static str, bool) { if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) { return match ext { @@ -1029,7 +998,8 @@ impl Formatted { { proxy_protected_request(config, api_method, parts, body, peer).await } else { - handle_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await + handle_api_request(rpcenv, api_method, Some(formatter), parts, body, uri_param) + .await }; let mut response = match result { @@ -1129,13 +1099,12 @@ impl Unformatted { return Err(err); } - let result = if api_method.protected - && rpcenv.env_type == RpcEnvironmentType::PUBLIC - { - proxy_protected_request(config, api_method, parts, body, peer).await - } else { - handle_unformatted_api_request(rpcenv, api_method, parts, body, uri_param).await - }; + let result = + if api_method.protected && rpcenv.env_type == RpcEnvironmentType::PUBLIC { + proxy_protected_request(config, api_method, parts, body, peer).await + } else { + handle_api_request(rpcenv, api_method, None, parts, body, uri_param).await + }; let mut response = match result { Ok(resp) => resp, diff --git a/proxmox-router/Cargo.toml b/proxmox-router/Cargo.toml index d5efa532..e1c6a01f 100644 --- a/proxmox-router/Cargo.toml +++ b/proxmox-router/Cargo.toml @@ -18,6 +18,7 @@ required-features = [ "cli" ] [dependencies] anyhow.workspace = true env_logger = { workspace = true, optional = true } +futures.workspace = true http = { workspace = true, optional = true } hyper = { workspace = true, features = [ "full" ], optional = true } nix.workspace = true @@ -35,6 +36,10 @@ proxmox-http-error.workspace = true proxmox-schema.workspace = true proxmox-async.workspace = true +[dev-dependencies] +tokio.workspace = true +tokio-stream.workspace = true + [features] default = [ "cli", "server" ] cli = [ "dep:env_logger", "dep:libc", "dep:rustyline" ] diff --git a/proxmox-router/src/cli/command.rs b/proxmox-router/src/cli/command.rs index 37d9ed54..01f64d19 100644 --- a/proxmox-router/src/cli/command.rs +++ b/proxmox-router/src/cli/command.rs @@ -90,10 +90,19 @@ async fn handle_simple_command_future( ApiHandler::Sync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv), ApiHandler::SerializingSync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv) .and_then(|r| r.to_value().map_err(Error::from)), + ApiHandler::StreamSync(handler) => { + (handler)(params, cli_cmd.info, &mut rpcenv).and_then(|iter| iter.try_collect()) + } ApiHandler::Async(handler) => (handler)(params, cli_cmd.info, &mut rpcenv).await, ApiHandler::SerializingAsync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv) .await .and_then(|r| r.to_value().map_err(Error::from)), + ApiHandler::StreamAsync(handler) => { + match (handler)(params, cli_cmd.info, &mut rpcenv).await { + Ok(stream) => stream.try_collect().await, + Err(err) => Err(err), + } + } #[cfg(feature = "server")] ApiHandler::AsyncHttp(_) => { bail!("CliHandler does not support ApiHandler::AsyncHttp - internal error") @@ -130,6 +139,9 @@ pub(crate) fn handle_simple_command<'cli>( ApiHandler::SerializingSync(handler) => { (handler)(params, cli_cmd.info, rpcenv).and_then(|r| r.to_value().map_err(Error::from)) } + ApiHandler::StreamSync(handler) => { + (handler)(params, cli_cmd.info, rpcenv).and_then(|iter| iter.try_collect()) + } ApiHandler::Async(handler) => { let run = run.ok_or_else(|| { format_err!("CliHandler does not support ApiHandler::Async - internal error") @@ -140,6 +152,9 @@ pub(crate) fn handle_simple_command<'cli>( ApiHandler::SerializingAsync(_handler) => { bail!("CliHandler does not support ApiHandler::SerializingAsync - internal error"); } + ApiHandler::StreamAsync(_handler) => { + bail!("CliHandler does not support ApiHandler::StreamAsync - internal error"); + } #[cfg(feature = "server")] ApiHandler::AsyncHttp(_) => { bail!("CliHandler does not support ApiHandler::AsyncHttp - internal error"); diff --git a/proxmox-router/src/router.rs b/proxmox-router/src/router.rs index c913d1ff..4585f503 100644 --- a/proxmox-router/src/router.rs +++ b/proxmox-router/src/router.rs @@ -11,6 +11,7 @@ use http::{Method, Response}; #[cfg(feature = "server")] use hyper::Body; use percent_encoding::percent_decode_str; +use serde::Serialize; use serde_json::Value; use proxmox_schema::{ObjectSchema, ParameterSchema, ReturnType, Schema}; @@ -77,6 +78,137 @@ pub type SerializingApiHandlerFn = &'static (dyn Fn( + Sync + 'static); +#[derive(Serialize)] +#[serde(rename_all = "kebab-case")] +enum RecordEntry<'a> { + /// A successful record. + Data(&'a Value), + /// An error entry. + Error(Value), +} + +impl<'a> From<&'a Result> for RecordEntry<'a> { + fn from(res: &'a Result) -> Self { + match res { + Ok(value) => Self::Data(value), + Err(err) => Self::Error(err.to_string().into()), + } + } +} + +/// A record for a streaming API call. This contains a `Result` and allows formatting +/// as a `json-seq` formatted string. +/// +/// This is currently just a json string, but we don't want to fixate strings or byte vectors as +/// output for the streaming API handler, but also not commit to creating lots of allocated +/// `Box` elements, so this can be turned into either without breaking the +/// API. +pub struct Record { + // direct access is only for the CLI code + pub(crate) data: Result, +} + +impl Record { + /// Create a new successful record from a serializeable element. + pub fn new(data: &T) -> Self { + Self { + data: Ok(serde_json::to_value(data).expect("failed to create json string")), + } + } + + /// Create a new error record from an error value. + pub fn error>(error: E) -> Self { + Self { + data: Err(error.into()), + } + } + + /// Create a new error record from `Result`. + pub fn from_result(result: Result) -> Self + where + T: Serialize, + E: Into, + { + match result { + Ok(res) => Self::new(&res), + Err(err) => Self::error(err), + } + } + + /// Create/get the bytes for a complete record to be streamed as a json sequence according to + /// RFC7464: the data is prefixed with a record separator (`\x1E`) and ends with a newline + /// (`'\n'). + pub fn to_bytes(&self) -> Vec { + let mut data = Vec::new(); + data.push(b'\x1E'); + // We assume json serialization doesn't fail. + // Don't return special objects that can fail to serialize. + // As for "normal" data - we don't expect spurious errors, otherwise they could also happen + // when serializing *errors*... + serde_json::to_writer(&mut data, &RecordEntry::from(&self.data)) + .expect("failed to create JSON record"); + data.push(b'\n'); + data + } +} + +/// A synchronous API handler returns an [`Iterator`] over items which should be serialized. +/// +/// ``` +/// # use anyhow::Error; +/// # use serde_json::{json, Value}; +/// use proxmox_router::{ApiHandler, ApiMethod, Record, RpcEnvironment, SyncStream}; +/// use proxmox_schema::ObjectSchema; +/// +/// fn hello( +/// param: Value, +/// info: &ApiMethod, +/// rpcenv: &mut dyn RpcEnvironment, +/// ) -> Result { +/// Ok([Record::new(&3u32)].into()) +/// } +/// +/// const API_METHOD_HELLO: ApiMethod = ApiMethod::new( +/// &ApiHandler::StreamSync(&hello), +/// &ObjectSchema::new("Hello World Example", &[]) +/// ); +/// ``` +pub type StreamApiHandlerFn = &'static (dyn Fn(Value, &ApiMethod, &mut dyn RpcEnvironment) -> Result + + Send + + Sync + + 'static); + +pub struct SyncStream { + inner: Box + Send>, +} + +impl SyncStream { + pub fn from_boxed(inner: Box + Send>) -> Self { + Self { inner } + } + + pub fn into_inner(self) -> Box + Send> { + self.inner + } + + pub fn try_collect(self) -> Result { + let mut acc = Vec::new(); + for i in self.inner { + acc.push(i.data?); + } + Ok(Value::Array(acc)) + } +} + +impl From for SyncStream +where + I: IntoIterator + Send + 'static, +{ + fn from(iter: I) -> Self { + Self::from_boxed(Box::new(iter.into_iter().map(IntoRecord::into_record))) + } +} + /// Asynchronous API handlers /// /// Returns a future Value. @@ -147,6 +279,103 @@ pub type SerializingApiFuture<'a> = Pin< Box, anyhow::Error>> + Send + 'a>, >; +/// Streaming asynchronous API handlers +/// +/// Returns a future Value. +/// ``` +/// # use serde_json::{json, Value}; +/// # use tokio::sync::mpsc; +/// # use tokio::spawn; +/// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, Record, RpcEnvironment, StreamApiFuture}; +/// use proxmox_schema::ObjectSchema; +/// +/// +/// fn hello_future<'a>( +/// param: Value, +/// info: &ApiMethod, +/// rpcenv: &'a mut dyn RpcEnvironment, +/// ) -> StreamApiFuture<'a> { +/// let (sender, receiver) = mpsc::channel(8); +/// tokio::spawn(async move { +/// sender.send(Record::new("data")).await; +/// sender.send(Record::new("more data")).await; +/// // ... +/// }); +/// let receiver = tokio_stream::wrappers::ReceiverStream::new(receiver); +/// Box::pin(async move { Ok(receiver.into()) }) +/// } +/// +/// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new( +/// &ApiHandler::StreamAsync(&hello_future), +/// &ObjectSchema::new("Hello World Example (async)", &[]) +/// ); +/// ``` +pub type StreamApiAsyncHandlerFn = &'static (dyn for<'a> Fn( + Value, + &'static ApiMethod, + &'a mut dyn RpcEnvironment, +) -> StreamApiFuture<'a> + + Send + + Sync); + +pub type StreamApiFuture<'a> = + Pin> + Send + 'a>>; + +pub struct Stream { + inner: Pin + Send>>, +} + +impl Stream { + pub fn from_boxed(inner: Pin + Send>>) -> Self { + Self { inner } + } + + pub fn into_inner(self) -> Pin + Send>> { + self.inner + } + + pub async fn try_collect(mut self) -> Result { + use futures::StreamExt; + + let mut acc = Vec::new(); + while let Some(i) = self.inner.next().await { + acc.push(i.data?); + } + Ok(Value::Array(acc)) + } +} + +impl From for Stream +where + I: futures::Stream + Send + 'static, +{ + fn from(stream: I) -> Self { + use futures::stream::StreamExt; + Self::from_boxed(Box::pin(stream.map(IntoRecord::into_record))) + } +} + +/// Helper trait to allow [`Stream`] and [`SyncStream`] to be constructed from both +/// regular streams and "`TryStreams`". +pub trait IntoRecord { + fn into_record(self) -> Record; +} + +impl IntoRecord for Record { + fn into_record(self) -> Record { + self + } +} + +impl IntoRecord for Result { + fn into_record(self) -> Record { + match self { + Ok(record) => record, + Err(err) => Record::error(err), + } + } +} + /// Asynchronous HTTP API handlers /// /// They get low level access to request and response data. Use this @@ -201,8 +430,10 @@ pub type ApiResponseFuture = pub enum ApiHandler { Sync(ApiHandlerFn), SerializingSync(SerializingApiHandlerFn), + StreamSync(StreamApiHandlerFn), Async(ApiAsyncHandlerFn), SerializingAsync(SerializingApiAsyncHandlerFn), + StreamAsync(StreamApiAsyncHandlerFn), #[cfg(feature = "server")] AsyncHttp(ApiAsyncHttpHandlerFn), } @@ -221,12 +452,18 @@ impl PartialEq for ApiHandler { (ApiHandler::SerializingSync(l), ApiHandler::SerializingSync(r)) => { core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) } + (ApiHandler::StreamSync(l), ApiHandler::StreamSync(r)) => { + core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) + } (ApiHandler::Async(l), ApiHandler::Async(r)) => { core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) } (ApiHandler::SerializingAsync(l), ApiHandler::SerializingAsync(r)) => { core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) } + (ApiHandler::StreamAsync(l), ApiHandler::StreamAsync(r)) => { + core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r) + } #[cfg(feature = "server")] (ApiHandler::AsyncHttp(l), ApiHandler::AsyncHttp(r)) => { core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)