client: expose body, add generic request methods and streaming

The get/put/post/put_without_body/... methods now have a default
implementation forwarding to a generic `request` method as all our
implementations do the same already anyway.

Additionally, in order to allow easy access to a "streaming body", the
Body type is now exposed.

In the future, this crate may also require a wrapper to standardize
the handling of `application/json-seq` streams if we end up using
them, but for now, a simple way to expose the body is enough to get
going.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2024-08-27 09:59:07 +02:00
parent 8021f0a7f6
commit 04923dd601
2 changed files with 248 additions and 83 deletions

View File

@ -6,6 +6,7 @@ use std::sync::Mutex;
use http::request::Request;
use http::uri::PathAndQuery;
use http::Method;
use http::{StatusCode, Uri};
use hyper::body::{Body, HttpBody};
use openssl::hash::MessageDigest;
@ -20,7 +21,7 @@ use crate::auth::AuthenticationKind;
use crate::error::ParseFingerprintError;
use crate::{Error, Token};
use super::{HttpApiClient, HttpApiResponse};
use super::{HttpApiClient, HttpApiResponse, HttpApiResponseStream};
/// See [`set_verify_callback`](openssl::ssl::SslContextBuilder::set_verify_callback()).
pub type TlsCallback = dyn Fn(bool, &mut x509::X509StoreContextRef) -> bool + Send + Sync + 'static;
@ -199,14 +200,19 @@ impl Client {
}
/// Perform an *unauthenticated* HTTP request.
async fn authenticated_request(
async fn send_authenticated_request(
client: Arc<proxmox_http::client::Client>,
auth: Arc<AuthenticationKind>,
method: http::Method,
method: Method,
uri: Uri,
json_body: Option<String>,
) -> Result<HttpApiResponse, Error> {
let request = auth.set_auth_headers(Request::builder().method(method).uri(uri));
// send an `Accept: application/json-seq` header.
streaming: bool,
) -> Result<(http::response::Parts, hyper::Body), Error> {
let mut request = auth.set_auth_headers(Request::builder().method(method).uri(uri));
if streaming {
request = request.header(http::header::ACCEPT, "application/json-seq");
}
let request = if let Some(body) = json_body {
request
@ -224,9 +230,9 @@ impl Client {
}
let (response, body) = response.into_parts();
let body = read_body(body).await?;
if !response.status.is_success() {
let body = read_body(body).await?;
// FIXME: Decode json errors...
//match serde_json::from_slice(&data)
// Ok(value) =>
@ -237,6 +243,21 @@ impl Client {
return Err(Error::api(response.status, data));
}
Ok((response, body))
}
/// Perform an *unauthenticated* HTTP request.
async fn authenticated_request(
client: Arc<proxmox_http::client::Client>,
auth: Arc<AuthenticationKind>,
method: Method,
uri: Uri,
json_body: Option<String>,
) -> Result<HttpApiResponse, Error> {
let (response, body) =
Self::send_authenticated_request(client, auth, method, uri, json_body, false).await?;
let body = read_body(body).await?;
let content_type = match response.headers.get(http::header::CONTENT_TYPE) {
None => None,
Some(value) => Some(
@ -287,7 +308,7 @@ impl Client {
async fn do_login_request(&self, request: proxmox_login::Request) -> Result<Vec<u8>, Error> {
let request = http::Request::builder()
.method(http::Method::POST)
.method(Method::POST)
.uri(request.url)
.header(http::header::CONTENT_TYPE, request.content_type)
.header(
@ -386,71 +407,75 @@ impl HttpApiClient for Client {
type ResponseFuture<'a> =
Pin<Box<dyn Future<Output = Result<HttpApiResponse, Error>> + Send + 'a>>;
fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
Box::pin(async move {
let auth = self.login_auth()?;
let uri = self.build_uri(path_and_query)?;
let client = Arc::clone(&self.client);
Self::authenticated_request(client, auth, http::Method::GET, uri, None).await
})
}
type ResponseStreamFuture<'a> =
Pin<Box<dyn Future<Output = Result<HttpApiResponseStream<Self::Body>, Error>> + Send + 'a>>;
fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
type Body = hyper::Body;
fn request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize,
T: Serialize + 'a,
{
let params = serde_json::to_string(params)
.map_err(|err| Error::internal("failed to serialize parameters", err));
let params = params
.map(|params| {
serde_json::to_string(&params)
.map_err(|err| Error::internal("failed to serialize parameters", err))
})
.transpose();
Box::pin(async move {
let params = params?;
let auth = self.login_auth()?;
let uri = self.build_uri(path_and_query)?;
let client = Arc::clone(&self.client);
Self::authenticated_request(client, auth, http::Method::POST, uri, Some(params)).await
Self::authenticated_request(client, auth, method, uri, params).await
})
}
fn post_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
Box::pin(async move {
let auth = self.login_auth()?;
let uri = self.build_uri(path_and_query)?;
let client = Arc::clone(&self.client);
Self::authenticated_request(client, auth, http::Method::POST, uri, None).await
})
}
fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
fn streaming_request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseStreamFuture<'a>
where
T: ?Sized + Serialize,
T: Serialize + 'a,
{
let params = serde_json::to_string(params)
.map_err(|err| Error::internal("failed to serialize parameters", err));
let params = params
.map(|params| {
serde_json::to_string(&params)
.map_err(|err| Error::internal("failed to serialize parameters", err))
})
.transpose();
Box::pin(async move {
let params = params?;
let auth = self.login_auth()?;
let uri = self.build_uri(path_and_query)?;
let client = Arc::clone(&self.client);
Self::authenticated_request(client, auth, http::Method::PUT, uri, Some(params)).await
})
}
let (response, body) =
Self::send_authenticated_request(client, auth, method, uri, params, true).await?;
fn put_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
Box::pin(async move {
let auth = self.login_auth()?;
let uri = self.build_uri(path_and_query)?;
let client = Arc::clone(&self.client);
Self::authenticated_request(client, auth, http::Method::PUT, uri, None).await
})
}
let content_type = match response.headers.get(http::header::CONTENT_TYPE) {
None => None,
Some(value) => Some(
value
.to_str()
.map_err(|err| Error::internal("bad Content-Type header", err))?
.to_owned(),
),
};
fn delete<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
Box::pin(async move {
let auth = self.login_auth()?;
let uri = self.build_uri(path_and_query)?;
let client = Arc::clone(&self.client);
Self::authenticated_request(client, auth, http::Method::DELETE, uri, None).await
Ok(HttpApiResponseStream {
status: response.status.as_u16(),
content_type,
body: Some(body),
})
})
}
}

View File

@ -3,6 +3,7 @@
use std::collections::HashMap;
use std::future::Future;
use http::Method;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -29,43 +30,84 @@ pub trait HttpApiClient {
where
Self: 'a;
/// `GET` request with a path and query component (no hostname).
///
/// For this request, authentication headers should be set!
fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a>;
/// Some requests are better "streamed" than collected in RAM, for this, the body type used by
/// the underlying client needs to be exposed.
type Body;
/// `POST` request with a path and query component (no hostname), and a serializable body.
///
/// The body should be serialized to json and sent with `Content-type: application/json`.
///
/// For this request, authentication headers should be set!
fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
/// Future for streamed requests.
type ResponseStreamFuture<'a>: Future<Output = Result<HttpApiResponseStream<Self::Body>, Error>>
+ 'a
where
T: ?Sized + Serialize;
Self: 'a;
/// `POST` request with a path and query component (no hostname), no request body.
/// An *authenticated* asynchronous request with a path and query component (no hostname), and
/// an optional body, of which the response body is read to completion.
///
/// For this request, authentication headers should be set!
fn post_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a>;
/// `PUT` request with a path and query component (no hostname), and a serializable body.
///
/// The body should be serialized to json and sent with `Content-type: application/json`.
///
/// For this request, authentication headers should be set!
fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
fn request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize;
T: Serialize + 'a;
/// `PUT` request with a path and query component (no hostname), no request body.
/// An *authenticated* asynchronous request with a path and query component (no hostname), and
/// an optional body. The response status is returned, but the body is returned for the caller
/// to read from.
///
/// For this request, authentication headers should be set!
fn put_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a>;
fn streaming_request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseStreamFuture<'a>
where
T: Serialize + 'a;
/// `DELETE` request with a path and query component (no hostname).
///
/// For this request, authentication headers should be set!
fn delete<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a>;
/// This is deprecated.
/// Calls `self.request` with `Method::GET` and `None` for the body.
fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
self.request(Method::GET, path_and_query, None::<()>)
}
/// This is deprecated.
/// Calls `self.request` with `Method::POST`.
fn post<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize,
{
self.request(Method::POST, path_and_query, Some(params))
}
/// This is deprecated.
/// Calls `self.request` with `Method::POST` and `None` for the body..
fn post_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
self.request(Method::POST, path_and_query, None::<()>)
}
/// This is deprecated.
/// Calls `self.request` with `Method::PUT`.
fn put<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize,
{
self.request(Method::PUT, path_and_query, Some(params))
}
/// This is deprecated.
/// Calls `self.request` with `Method::PUT` and `None` for the body..
fn put_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
self.request(Method::PUT, path_and_query, None::<()>)
}
/// This is deprecated.
/// Calls `self.request` with `Method::DELETE`.
fn delete<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
self.request(Method::DELETE, path_and_query, None::<()>)
}
}
/// A response from the HTTP API as required by the [`HttpApiClient`] trait.
@ -200,11 +242,41 @@ where
where
Self: 'a;
type Body = C::Body;
type ResponseStreamFuture<'a> = C::ResponseStreamFuture<'a>
where
Self: 'a;
fn request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseFuture<'a>
where
T: Serialize + 'a,
{
C::request(self, method, path_and_query, params)
}
fn streaming_request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseStreamFuture<'a>
where
T: Serialize + 'a,
{
C::streaming_request(self, method, path_and_query, params)
}
fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
C::get(self, path_and_query)
}
fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
fn post<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize,
{
@ -215,7 +287,7 @@ where
C::post_without_body(self, path_and_query)
}
fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
fn put<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize,
{
@ -239,11 +311,41 @@ where
where
Self: 'a;
type Body = C::Body;
type ResponseStreamFuture<'a> = C::ResponseStreamFuture<'a>
where
Self: 'a;
fn request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseFuture<'a>
where
T: Serialize + 'a,
{
C::request(self, method, path_and_query, params)
}
fn streaming_request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseStreamFuture<'a>
where
T: Serialize + 'a,
{
C::streaming_request(self, method, path_and_query, params)
}
fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
C::get(self, path_and_query)
}
fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
fn post<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize,
{
@ -254,7 +356,7 @@ where
C::post_without_body(self, path_and_query)
}
fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
fn put<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize,
{
@ -278,11 +380,41 @@ where
where
Self: 'a;
type Body = C::Body;
type ResponseStreamFuture<'a> = C::ResponseStreamFuture<'a>
where
Self: 'a;
fn request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseFuture<'a>
where
T: Serialize + 'a,
{
C::request(self, method, path_and_query, params)
}
fn streaming_request<'a, T>(
&'a self,
method: Method,
path_and_query: &'a str,
params: Option<T>,
) -> Self::ResponseStreamFuture<'a>
where
T: Serialize + 'a,
{
C::streaming_request(self, method, path_and_query, params)
}
fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> {
C::get(self, path_and_query)
}
fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
fn post<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize,
{
@ -293,7 +425,7 @@ where
C::post_without_body(self, path_and_query)
}
fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a>
fn put<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a>
where
T: ?Sized + Serialize,
{
@ -308,3 +440,11 @@ where
C::delete(self, path_and_query)
}
}
/// A streaming response from the HTTP API as required by the [`HttpApiClient`] trait.
pub struct HttpApiResponseStream<Body> {
pub status: u16,
pub content_type: Option<String>,
/// Requests where the response has no body may put `None` here.
pub body: Option<Body>,
}