router: split streaming reader impl into 'stream' feature

so 'no-default-features' compiles again

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2024-09-04 11:20:11 +02:00
parent 2a4cf83799
commit 3d6b3c4786
4 changed files with 43 additions and 36 deletions

View File

@ -42,6 +42,7 @@ tokio-stream.workspace = true
[features]
default = [ "cli", "server" ]
cli = [ "dep:env_logger", "dep:libc", "dep:rustyline" ]
cli = [ "stream", "dep:env_logger", "dep:libc", "dep:rustyline" ]
server = [ "dep:http", "dep:hyper" ]
test-harness = [ "proxmox-schema/test-harness" ]
stream = [ "dep:hyper" ]

View File

@ -80,11 +80,6 @@ pub type SerializingApiHandlerFn = &'static (dyn Fn(
/// A record for a streaming API call. This contains a `Result<Value, Error>` and allows formatting
/// as a `json-seq` formatted string.
///
/// This is currently just a json string, but we don't want to fixate strings or byte vectors as
/// output for the streaming API handler, but also not commit to creating lots of allocated
/// `Box<dyn SerializableReturn>` elements, so this can be turned into either without breaking the
/// API.
pub struct Record {
// direct access is only for the CLI code
pub(crate) data: crate::stream::Record<Value>,
@ -204,6 +199,7 @@ impl SyncStream {
self.inner
}
#[cfg(feature = "stream")]
pub fn try_collect(self) -> Result<Value, Error> {
let mut acc = Vec::new();
for i in self.inner {
@ -347,6 +343,7 @@ impl Stream {
self.inner
}
#[cfg(feature = "stream")]
pub async fn try_collect(mut self) -> Result<Value, Error> {
use futures::StreamExt;

View File

@ -0,0 +1,36 @@
use anyhow::Error;
use serde::{Deserialize, Serialize};
#[cfg(feature = "stream")]
mod parsing;
#[cfg(feature = "stream")]
pub use parsing::{BodyBufReader, JsonRecords, Records};
/// Streamed JSON records can contain either "data" or an error.
///
/// Errors can be a simple string or structured data.
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum Record<T> {
/// A successful record.
Data(T),
/// An error entry.
Error(serde_json::Value),
}
impl<T> Record<T> {
/// Convenience method to turn the record into a `Result`.
///
/// The error is converted to either a message or, for structured errors, a json
/// representation.
pub fn into_result(self) -> Result<T, Error> {
match self {
Self::Data(data) => Ok(data),
Self::Error(serde_json::Value::String(s)) => Err(Error::msg(s)),
Self::Error(other) => match serde_json::to_string(&other) {
Ok(s) => Err(Error::msg(s)),
Err(err) => Err(Error::from(err)),
},
}
}
}

View File

@ -6,7 +6,9 @@ use std::task::{ready, Poll};
use anyhow::{format_err, Context as _, Error};
use futures::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, BufReader};
use hyper::body::{Body, Bytes};
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use super::Record;
pub struct Records<R = BodyBufReader>
where
@ -308,32 +310,3 @@ impl AsyncBufRead for BodyBufReader {
}
}
}
/// Streamed JSON records can contain either "data" or an error.
///
/// Errors can be a simple string or structured data.
///
/// For convenience, an [`into_result()`](Record::into_result) method is provided to turn the
/// record into a regular `Result`, where the error is converted to either a message or, for
/// structured errors, a json representation.
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum Record<T> {
/// A successful record.
Data(T),
/// An error entry.
Error(serde_json::Value),
}
impl<T> Record<T> {
pub fn into_result(self) -> Result<T, Error> {
match self {
Self::Data(data) => Ok(data),
Self::Error(serde_json::Value::String(s)) => Err(Error::msg(s)),
Self::Error(other) => match serde_json::to_string(&other) {
Ok(s) => Err(Error::msg(s)),
Err(err) => Err(Error::from(err)),
},
}
}
}