router, rest-server: add StreamSync and StreamAsync API handlers

These are Iterators or Streams which continuously produce output. They
can either be formatted, in which they are serialized like the as
usually, or, if the client caccepts `application/json-seq` via an
`Accept` header, it will be streamed as a sequence directly.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2024-08-23 11:15:58 +02:00
parent c31eaf0018
commit 17bd32e90e
6 changed files with 360 additions and 105 deletions

View File

@ -94,6 +94,35 @@ fn start_data_streaming(
reader
}
struct DirectJsonFormatter;
/// Format data directly as ``application/json``.
///
/// This does not support result attributes set on `rpcenv`.
///
/// Errors generates a BAD_REQUEST containing the error message as string.
pub static DIRECT_JSON_FORMATTER: &'static dyn OutputFormatter = &DirectJsonFormatter;
impl OutputFormatter for DirectJsonFormatter {
fn format_data(&self, data: Value, _rpcenv: &dyn RpcEnvironment) -> Response<Body> {
json_data_response(data)
}
fn format_data_streaming(
&self,
data: Box<dyn SerializableReturn + Send>,
_rpcenv: &dyn RpcEnvironment,
) -> Result<Response<Body>, Error> {
let reader = start_data_streaming(Value::Null, data);
let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
json_data_response_streaming(Body::wrap_stream(stream))
}
fn format_error(&self, err: Error) -> Response<Body> {
error_to_response(err)
}
}
struct JsonFormatter();
/// Format data as ``application/json``

View File

@ -66,7 +66,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
Some(api_method) => crate::rest::handle_api_request(
self.rpcenv.clone(),
api_method,
formatter,
Some(formatter),
parts,
body,
uri_param,

View File

@ -490,21 +490,85 @@ fn access_forbidden_time() -> std::time::Instant {
std::time::Instant::now() + std::time::Duration::from_millis(500)
}
fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result<Response<Body>, Error> {
let (mut send, body) = hyper::Body::channel();
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = stream.into_inner();
while let Some(record) = stream.next().await {
if send.send_data(record.to_bytes().into()).await.is_err() {
break;
}
}
});
Ok(Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json-seq")
.body(body)?)
}
fn handle_sync_stream_as_json_seq(
iter: proxmox_router::SyncStream,
) -> Result<Response<Body>, Error> {
let iter = iter
.into_inner()
.map(|record| Ok::<_, Error>(record.to_bytes()));
Ok(Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json-seq")
.body(Body::wrap_stream(futures::stream::iter(iter)))?)
}
pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
mut rpcenv: Env,
info: &'static ApiMethod,
formatter: &'static dyn OutputFormatter,
formatter: Option<&'static dyn OutputFormatter>,
parts: Parts,
req_body: Body,
uri_param: HashMap<String, String, S>,
) -> Result<Response<Body>, Error> {
let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER);
let compression = extract_compression_method(&parts.headers);
let accept_json_seq = parts.headers.get_all(http::header::ACCEPT).iter().any(|h| {
h.as_ref()
.split(|&b| b == b',')
.map(|e| e.trim_ascii_start())
.any(|e| e == b"application/json-seq" || e.starts_with(b"application/json-seq;"))
});
let result = match info.handler {
ApiHandler::AsyncHttp(handler) => {
let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
(handler)(parts, req_body, params, info, Box::new(rpcenv)).await
}
ApiHandler::StreamSync(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
match (handler)(params, info, &mut rpcenv) {
Ok(iter) if accept_json_seq => handle_sync_stream_as_json_seq(iter),
Ok(iter) => iter
.try_collect()
.map(|data| formatter.format_data(data, &rpcenv)),
Err(err) => Err(err),
}
}
ApiHandler::StreamAsync(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
match (handler)(params, info, &mut rpcenv).await {
Ok(stream) if accept_json_seq => handle_stream_as_json_seq(stream),
Ok(stream) => stream
.try_collect()
.await
.map(|data| formatter.format_data(data, &rpcenv)),
Err(err) => Err(err),
}
}
ApiHandler::SerializingSync(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
@ -575,101 +639,6 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
Ok(resp)
}
async fn handle_unformatted_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
mut rpcenv: Env,
info: &'static ApiMethod,
parts: Parts,
req_body: Body,
uri_param: HashMap<String, String, S>,
) -> Result<Response<Body>, Error> {
let compression = extract_compression_method(&parts.headers);
fn to_json_response<Env: RpcEnvironment>(
value: Value,
env: &Env,
) -> Result<Response<Body>, Error> {
if let Some(attr) = env.result_attrib().as_object() {
if !attr.is_empty() {
http_bail!(
INTERNAL_SERVER_ERROR,
"result attributes are no longer supported"
);
}
}
let value = serde_json::to_string(&value)?;
Ok(Response::builder().status(200).body(value.into())?)
}
let result = match info.handler {
ApiHandler::AsyncHttp(handler) => {
let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
(handler)(parts, req_body, params, info, Box::new(rpcenv)).await
}
ApiHandler::Sync(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
(handler)(params, info, &mut rpcenv).and_then(|v| to_json_response(v, &rpcenv))
}
ApiHandler::Async(handler) => {
let params =
get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
(handler)(params, info, &mut rpcenv)
.await
.and_then(|v| to_json_response(v, &rpcenv))
}
ApiHandler::SerializingSync(_) => http_bail!(
INTERNAL_SERVER_ERROR,
"old-style streaming calls not supported"
),
ApiHandler::SerializingAsync(_) => http_bail!(
INTERNAL_SERVER_ERROR,
"old-style streaming calls not supported"
),
_ => {
bail!("Unknown API handler type");
}
};
let mut resp = match result {
Ok(resp) => resp,
Err(err) => {
if let Some(httperr) = err.downcast_ref::<HttpError>() {
if httperr.code == StatusCode::UNAUTHORIZED {
tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await;
}
}
return Err(err);
}
};
let resp = match compression {
Some(CompressionMethod::Deflate) => {
resp.headers_mut().insert(
header::CONTENT_ENCODING,
CompressionMethod::Deflate.content_encoding(),
);
resp.map(|body| {
Body::wrap_stream(
DeflateEncoder::builder(TryStreamExt::map_err(body, |err| {
proxmox_lang::io_format_err!("error during compression: {}", err)
}))
.zlib(true)
.build(),
)
})
}
None => resp,
};
if info.reload_timezone {
unsafe {
tzset();
}
}
Ok(resp)
}
fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) {
return match ext {
@ -1029,7 +998,8 @@ impl Formatted {
{
proxy_protected_request(config, api_method, parts, body, peer).await
} else {
handle_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await
handle_api_request(rpcenv, api_method, Some(formatter), parts, body, uri_param)
.await
};
let mut response = match result {
@ -1129,13 +1099,12 @@ impl Unformatted {
return Err(err);
}
let result = if api_method.protected
&& rpcenv.env_type == RpcEnvironmentType::PUBLIC
{
proxy_protected_request(config, api_method, parts, body, peer).await
} else {
handle_unformatted_api_request(rpcenv, api_method, parts, body, uri_param).await
};
let result =
if api_method.protected && rpcenv.env_type == RpcEnvironmentType::PUBLIC {
proxy_protected_request(config, api_method, parts, body, peer).await
} else {
handle_api_request(rpcenv, api_method, None, parts, body, uri_param).await
};
let mut response = match result {
Ok(resp) => resp,

View File

@ -18,6 +18,7 @@ required-features = [ "cli" ]
[dependencies]
anyhow.workspace = true
env_logger = { workspace = true, optional = true }
futures.workspace = true
http = { workspace = true, optional = true }
hyper = { workspace = true, features = [ "full" ], optional = true }
nix.workspace = true
@ -35,6 +36,10 @@ proxmox-http-error.workspace = true
proxmox-schema.workspace = true
proxmox-async.workspace = true
[dev-dependencies]
tokio.workspace = true
tokio-stream.workspace = true
[features]
default = [ "cli", "server" ]
cli = [ "dep:env_logger", "dep:libc", "dep:rustyline" ]

View File

@ -90,10 +90,19 @@ async fn handle_simple_command_future(
ApiHandler::Sync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv),
ApiHandler::SerializingSync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv)
.and_then(|r| r.to_value().map_err(Error::from)),
ApiHandler::StreamSync(handler) => {
(handler)(params, cli_cmd.info, &mut rpcenv).and_then(|iter| iter.try_collect())
}
ApiHandler::Async(handler) => (handler)(params, cli_cmd.info, &mut rpcenv).await,
ApiHandler::SerializingAsync(handler) => (handler)(params, cli_cmd.info, &mut rpcenv)
.await
.and_then(|r| r.to_value().map_err(Error::from)),
ApiHandler::StreamAsync(handler) => {
match (handler)(params, cli_cmd.info, &mut rpcenv).await {
Ok(stream) => stream.try_collect().await,
Err(err) => Err(err),
}
}
#[cfg(feature = "server")]
ApiHandler::AsyncHttp(_) => {
bail!("CliHandler does not support ApiHandler::AsyncHttp - internal error")
@ -130,6 +139,9 @@ pub(crate) fn handle_simple_command<'cli>(
ApiHandler::SerializingSync(handler) => {
(handler)(params, cli_cmd.info, rpcenv).and_then(|r| r.to_value().map_err(Error::from))
}
ApiHandler::StreamSync(handler) => {
(handler)(params, cli_cmd.info, rpcenv).and_then(|iter| iter.try_collect())
}
ApiHandler::Async(handler) => {
let run = run.ok_or_else(|| {
format_err!("CliHandler does not support ApiHandler::Async - internal error")
@ -140,6 +152,9 @@ pub(crate) fn handle_simple_command<'cli>(
ApiHandler::SerializingAsync(_handler) => {
bail!("CliHandler does not support ApiHandler::SerializingAsync - internal error");
}
ApiHandler::StreamAsync(_handler) => {
bail!("CliHandler does not support ApiHandler::StreamAsync - internal error");
}
#[cfg(feature = "server")]
ApiHandler::AsyncHttp(_) => {
bail!("CliHandler does not support ApiHandler::AsyncHttp - internal error");

View File

@ -11,6 +11,7 @@ use http::{Method, Response};
#[cfg(feature = "server")]
use hyper::Body;
use percent_encoding::percent_decode_str;
use serde::Serialize;
use serde_json::Value;
use proxmox_schema::{ObjectSchema, ParameterSchema, ReturnType, Schema};
@ -77,6 +78,137 @@ pub type SerializingApiHandlerFn = &'static (dyn Fn(
+ Sync
+ 'static);
#[derive(Serialize)]
#[serde(rename_all = "kebab-case")]
enum RecordEntry<'a> {
/// A successful record.
Data(&'a Value),
/// An error entry.
Error(Value),
}
impl<'a> From<&'a Result<Value, Error>> for RecordEntry<'a> {
fn from(res: &'a Result<Value, Error>) -> Self {
match res {
Ok(value) => Self::Data(value),
Err(err) => Self::Error(err.to_string().into()),
}
}
}
/// A record for a streaming API call. This contains a `Result<Value, Error>` and allows formatting
/// as a `json-seq` formatted string.
///
/// This is currently just a json string, but we don't want to fixate strings or byte vectors as
/// output for the streaming API handler, but also not commit to creating lots of allocated
/// `Box<dyn SerializableReturn>` elements, so this can be turned into either without breaking the
/// API.
pub struct Record {
// direct access is only for the CLI code
pub(crate) data: Result<Value, Error>,
}
impl Record {
/// Create a new successful record from a serializeable element.
pub fn new<T: ?Sized + Serialize>(data: &T) -> Self {
Self {
data: Ok(serde_json::to_value(data).expect("failed to create json string")),
}
}
/// Create a new error record from an error value.
pub fn error<E: Into<Error>>(error: E) -> Self {
Self {
data: Err(error.into()),
}
}
/// Create a new error record from `Result`.
pub fn from_result<T, E>(result: Result<T, E>) -> Self
where
T: Serialize,
E: Into<Error>,
{
match result {
Ok(res) => Self::new(&res),
Err(err) => Self::error(err),
}
}
/// Create/get the bytes for a complete record to be streamed as a json sequence according to
/// RFC7464: the data is prefixed with a record separator (`\x1E`) and ends with a newline
/// (`'\n').
pub fn to_bytes(&self) -> Vec<u8> {
let mut data = Vec::new();
data.push(b'\x1E');
// We assume json serialization doesn't fail.
// Don't return special objects that can fail to serialize.
// As for "normal" data - we don't expect spurious errors, otherwise they could also happen
// when serializing *errors*...
serde_json::to_writer(&mut data, &RecordEntry::from(&self.data))
.expect("failed to create JSON record");
data.push(b'\n');
data
}
}
/// A synchronous API handler returns an [`Iterator`] over items which should be serialized.
///
/// ```
/// # use anyhow::Error;
/// # use serde_json::{json, Value};
/// use proxmox_router::{ApiHandler, ApiMethod, Record, RpcEnvironment, SyncStream};
/// use proxmox_schema::ObjectSchema;
///
/// fn hello(
/// param: Value,
/// info: &ApiMethod,
/// rpcenv: &mut dyn RpcEnvironment,
/// ) -> Result<SyncStream, Error> {
/// Ok([Record::new(&3u32)].into())
/// }
///
/// const API_METHOD_HELLO: ApiMethod = ApiMethod::new(
/// &ApiHandler::StreamSync(&hello),
/// &ObjectSchema::new("Hello World Example", &[])
/// );
/// ```
pub type StreamApiHandlerFn = &'static (dyn Fn(Value, &ApiMethod, &mut dyn RpcEnvironment) -> Result<SyncStream, Error>
+ Send
+ Sync
+ 'static);
pub struct SyncStream {
inner: Box<dyn Iterator<Item = Record> + Send>,
}
impl SyncStream {
pub fn from_boxed(inner: Box<dyn Iterator<Item = Record> + Send>) -> Self {
Self { inner }
}
pub fn into_inner(self) -> Box<dyn Iterator<Item = Record> + Send> {
self.inner
}
pub fn try_collect(self) -> Result<Value, Error> {
let mut acc = Vec::new();
for i in self.inner {
acc.push(i.data?);
}
Ok(Value::Array(acc))
}
}
impl<I> From<I> for SyncStream
where
I: IntoIterator<Item: IntoRecord, IntoIter: Send> + Send + 'static,
{
fn from(iter: I) -> Self {
Self::from_boxed(Box::new(iter.into_iter().map(IntoRecord::into_record)))
}
}
/// Asynchronous API handlers
///
/// Returns a future Value.
@ -147,6 +279,103 @@ pub type SerializingApiFuture<'a> = Pin<
Box<dyn Future<Output = Result<Box<dyn SerializableReturn + Send>, anyhow::Error>> + Send + 'a>,
>;
/// Streaming asynchronous API handlers
///
/// Returns a future Value.
/// ```
/// # use serde_json::{json, Value};
/// # use tokio::sync::mpsc;
/// # use tokio::spawn;
/// use proxmox_router::{ApiFuture, ApiHandler, ApiMethod, Record, RpcEnvironment, StreamApiFuture};
/// use proxmox_schema::ObjectSchema;
///
///
/// fn hello_future<'a>(
/// param: Value,
/// info: &ApiMethod,
/// rpcenv: &'a mut dyn RpcEnvironment,
/// ) -> StreamApiFuture<'a> {
/// let (sender, receiver) = mpsc::channel(8);
/// tokio::spawn(async move {
/// sender.send(Record::new("data")).await;
/// sender.send(Record::new("more data")).await;
/// // ...
/// });
/// let receiver = tokio_stream::wrappers::ReceiverStream::new(receiver);
/// Box::pin(async move { Ok(receiver.into()) })
/// }
///
/// const API_METHOD_HELLO_FUTURE: ApiMethod = ApiMethod::new(
/// &ApiHandler::StreamAsync(&hello_future),
/// &ObjectSchema::new("Hello World Example (async)", &[])
/// );
/// ```
pub type StreamApiAsyncHandlerFn = &'static (dyn for<'a> Fn(
Value,
&'static ApiMethod,
&'a mut dyn RpcEnvironment,
) -> StreamApiFuture<'a>
+ Send
+ Sync);
pub type StreamApiFuture<'a> =
Pin<Box<dyn Future<Output = Result<Stream, anyhow::Error>> + Send + 'a>>;
pub struct Stream {
inner: Pin<Box<dyn futures::Stream<Item = Record> + Send>>,
}
impl Stream {
pub fn from_boxed(inner: Pin<Box<dyn futures::Stream<Item = Record> + Send>>) -> Self {
Self { inner }
}
pub fn into_inner(self) -> Pin<Box<dyn futures::Stream<Item = Record> + Send>> {
self.inner
}
pub async fn try_collect(mut self) -> Result<Value, Error> {
use futures::StreamExt;
let mut acc = Vec::new();
while let Some(i) = self.inner.next().await {
acc.push(i.data?);
}
Ok(Value::Array(acc))
}
}
impl<I> From<I> for Stream
where
I: futures::Stream<Item: IntoRecord> + Send + 'static,
{
fn from(stream: I) -> Self {
use futures::stream::StreamExt;
Self::from_boxed(Box::pin(stream.map(IntoRecord::into_record)))
}
}
/// Helper trait to allow [`Stream`] and [`SyncStream`] to be constructed from both
/// regular streams and "`TryStreams`".
pub trait IntoRecord {
fn into_record(self) -> Record;
}
impl IntoRecord for Record {
fn into_record(self) -> Record {
self
}
}
impl IntoRecord for Result<Record, Error> {
fn into_record(self) -> Record {
match self {
Ok(record) => record,
Err(err) => Record::error(err),
}
}
}
/// Asynchronous HTTP API handlers
///
/// They get low level access to request and response data. Use this
@ -201,8 +430,10 @@ pub type ApiResponseFuture =
pub enum ApiHandler {
Sync(ApiHandlerFn),
SerializingSync(SerializingApiHandlerFn),
StreamSync(StreamApiHandlerFn),
Async(ApiAsyncHandlerFn),
SerializingAsync(SerializingApiAsyncHandlerFn),
StreamAsync(StreamApiAsyncHandlerFn),
#[cfg(feature = "server")]
AsyncHttp(ApiAsyncHttpHandlerFn),
}
@ -221,12 +452,18 @@ impl PartialEq for ApiHandler {
(ApiHandler::SerializingSync(l), ApiHandler::SerializingSync(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
}
(ApiHandler::StreamSync(l), ApiHandler::StreamSync(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
}
(ApiHandler::Async(l), ApiHandler::Async(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
}
(ApiHandler::SerializingAsync(l), ApiHandler::SerializingAsync(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
}
(ApiHandler::StreamAsync(l), ApiHandler::StreamAsync(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)
}
#[cfg(feature = "server")]
(ApiHandler::AsyncHttp(l), ApiHandler::AsyncHttp(r)) => {
core::mem::transmute::<_, usize>(l) == core::mem::transmute::<_, usize>(r)