From 04923dd60100bb337556ed0b5c70424a47329e52 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Tue, 27 Aug 2024 09:59:07 +0200 Subject: [PATCH] client: expose body, add generic request methods and streaming The get/put/post/put_without_body/... methods now have a default implementation forwarding to a generic `request` method as all our implementations do the same already anyway. Additionally, in order to allow easy access to a "streaming body", the Body type is now exposed. In the future, this crate may also require a wrapper to standardize the handling of `application/json-seq` streams if we end up using them, but for now, a simple way to expose the body is enough to get going. Signed-off-by: Wolfgang Bumiller --- proxmox-client/src/client.rs | 125 ++++++++++++--------- proxmox-client/src/lib.rs | 206 +++++++++++++++++++++++++++++------ 2 files changed, 248 insertions(+), 83 deletions(-) diff --git a/proxmox-client/src/client.rs b/proxmox-client/src/client.rs index bcbcbe73..cf16247c 100644 --- a/proxmox-client/src/client.rs +++ b/proxmox-client/src/client.rs @@ -6,6 +6,7 @@ use std::sync::Mutex; use http::request::Request; use http::uri::PathAndQuery; +use http::Method; use http::{StatusCode, Uri}; use hyper::body::{Body, HttpBody}; use openssl::hash::MessageDigest; @@ -20,7 +21,7 @@ use crate::auth::AuthenticationKind; use crate::error::ParseFingerprintError; use crate::{Error, Token}; -use super::{HttpApiClient, HttpApiResponse}; +use super::{HttpApiClient, HttpApiResponse, HttpApiResponseStream}; /// See [`set_verify_callback`](openssl::ssl::SslContextBuilder::set_verify_callback()). pub type TlsCallback = dyn Fn(bool, &mut x509::X509StoreContextRef) -> bool + Send + Sync + 'static; @@ -199,14 +200,19 @@ impl Client { } /// Perform an *unauthenticated* HTTP request. - async fn authenticated_request( + async fn send_authenticated_request( client: Arc, auth: Arc, - method: http::Method, + method: Method, uri: Uri, json_body: Option, - ) -> Result { - let request = auth.set_auth_headers(Request::builder().method(method).uri(uri)); + // send an `Accept: application/json-seq` header. + streaming: bool, + ) -> Result<(http::response::Parts, hyper::Body), Error> { + let mut request = auth.set_auth_headers(Request::builder().method(method).uri(uri)); + if streaming { + request = request.header(http::header::ACCEPT, "application/json-seq"); + } let request = if let Some(body) = json_body { request @@ -224,9 +230,9 @@ impl Client { } let (response, body) = response.into_parts(); - let body = read_body(body).await?; if !response.status.is_success() { + let body = read_body(body).await?; // FIXME: Decode json errors... //match serde_json::from_slice(&data) // Ok(value) => @@ -237,6 +243,21 @@ impl Client { return Err(Error::api(response.status, data)); } + Ok((response, body)) + } + + /// Perform an *unauthenticated* HTTP request. + async fn authenticated_request( + client: Arc, + auth: Arc, + method: Method, + uri: Uri, + json_body: Option, + ) -> Result { + let (response, body) = + Self::send_authenticated_request(client, auth, method, uri, json_body, false).await?; + let body = read_body(body).await?; + let content_type = match response.headers.get(http::header::CONTENT_TYPE) { None => None, Some(value) => Some( @@ -287,7 +308,7 @@ impl Client { async fn do_login_request(&self, request: proxmox_login::Request) -> Result, Error> { let request = http::Request::builder() - .method(http::Method::POST) + .method(Method::POST) .uri(request.url) .header(http::header::CONTENT_TYPE, request.content_type) .header( @@ -386,71 +407,75 @@ impl HttpApiClient for Client { type ResponseFuture<'a> = Pin> + Send + 'a>>; - fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { - Box::pin(async move { - let auth = self.login_auth()?; - let uri = self.build_uri(path_and_query)?; - let client = Arc::clone(&self.client); - Self::authenticated_request(client, auth, http::Method::GET, uri, None).await - }) - } + type ResponseStreamFuture<'a> = + Pin, Error>> + Send + 'a>>; - fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + type Body = hyper::Body; + + fn request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseFuture<'a> where - T: ?Sized + Serialize, + T: Serialize + 'a, { - let params = serde_json::to_string(params) - .map_err(|err| Error::internal("failed to serialize parameters", err)); + let params = params + .map(|params| { + serde_json::to_string(¶ms) + .map_err(|err| Error::internal("failed to serialize parameters", err)) + }) + .transpose(); Box::pin(async move { let params = params?; let auth = self.login_auth()?; let uri = self.build_uri(path_and_query)?; let client = Arc::clone(&self.client); - Self::authenticated_request(client, auth, http::Method::POST, uri, Some(params)).await + Self::authenticated_request(client, auth, method, uri, params).await }) } - fn post_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { - Box::pin(async move { - let auth = self.login_auth()?; - let uri = self.build_uri(path_and_query)?; - let client = Arc::clone(&self.client); - Self::authenticated_request(client, auth, http::Method::POST, uri, None).await - }) - } - - fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + fn streaming_request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseStreamFuture<'a> where - T: ?Sized + Serialize, + T: Serialize + 'a, { - let params = serde_json::to_string(params) - .map_err(|err| Error::internal("failed to serialize parameters", err)); + let params = params + .map(|params| { + serde_json::to_string(¶ms) + .map_err(|err| Error::internal("failed to serialize parameters", err)) + }) + .transpose(); Box::pin(async move { let params = params?; let auth = self.login_auth()?; let uri = self.build_uri(path_and_query)?; let client = Arc::clone(&self.client); - Self::authenticated_request(client, auth, http::Method::PUT, uri, Some(params)).await - }) - } + let (response, body) = + Self::send_authenticated_request(client, auth, method, uri, params, true).await?; - fn put_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { - Box::pin(async move { - let auth = self.login_auth()?; - let uri = self.build_uri(path_and_query)?; - let client = Arc::clone(&self.client); - Self::authenticated_request(client, auth, http::Method::PUT, uri, None).await - }) - } + let content_type = match response.headers.get(http::header::CONTENT_TYPE) { + None => None, + Some(value) => Some( + value + .to_str() + .map_err(|err| Error::internal("bad Content-Type header", err))? + .to_owned(), + ), + }; - fn delete<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { - Box::pin(async move { - let auth = self.login_auth()?; - let uri = self.build_uri(path_and_query)?; - let client = Arc::clone(&self.client); - Self::authenticated_request(client, auth, http::Method::DELETE, uri, None).await + Ok(HttpApiResponseStream { + status: response.status.as_u16(), + content_type, + body: Some(body), + }) }) } } diff --git a/proxmox-client/src/lib.rs b/proxmox-client/src/lib.rs index dd57290a..c6e3cf02 100644 --- a/proxmox-client/src/lib.rs +++ b/proxmox-client/src/lib.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::future::Future; +use http::Method; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -29,43 +30,84 @@ pub trait HttpApiClient { where Self: 'a; - /// `GET` request with a path and query component (no hostname). - /// - /// For this request, authentication headers should be set! - fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a>; + /// Some requests are better "streamed" than collected in RAM, for this, the body type used by + /// the underlying client needs to be exposed. + type Body; - /// `POST` request with a path and query component (no hostname), and a serializable body. - /// - /// The body should be serialized to json and sent with `Content-type: application/json`. - /// - /// For this request, authentication headers should be set! - fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + /// Future for streamed requests. + type ResponseStreamFuture<'a>: Future, Error>> + + 'a where - T: ?Sized + Serialize; + Self: 'a; - /// `POST` request with a path and query component (no hostname), no request body. + /// An *authenticated* asynchronous request with a path and query component (no hostname), and + /// an optional body, of which the response body is read to completion. /// /// For this request, authentication headers should be set! - fn post_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a>; - - /// `PUT` request with a path and query component (no hostname), and a serializable body. - /// - /// The body should be serialized to json and sent with `Content-type: application/json`. - /// - /// For this request, authentication headers should be set! - fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + fn request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseFuture<'a> where - T: ?Sized + Serialize; + T: Serialize + 'a; - /// `PUT` request with a path and query component (no hostname), no request body. + /// An *authenticated* asynchronous request with a path and query component (no hostname), and + /// an optional body. The response status is returned, but the body is returned for the caller + /// to read from. /// /// For this request, authentication headers should be set! - fn put_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a>; + fn streaming_request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseStreamFuture<'a> + where + T: Serialize + 'a; - /// `DELETE` request with a path and query component (no hostname). - /// - /// For this request, authentication headers should be set! - fn delete<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a>; + /// This is deprecated. + /// Calls `self.request` with `Method::GET` and `None` for the body. + fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { + self.request(Method::GET, path_and_query, None::<()>) + } + + /// This is deprecated. + /// Calls `self.request` with `Method::POST`. + fn post<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a> + where + T: ?Sized + Serialize, + { + self.request(Method::POST, path_and_query, Some(params)) + } + + /// This is deprecated. + /// Calls `self.request` with `Method::POST` and `None` for the body.. + fn post_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { + self.request(Method::POST, path_and_query, None::<()>) + } + + /// This is deprecated. + /// Calls `self.request` with `Method::PUT`. + fn put<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a> + where + T: ?Sized + Serialize, + { + self.request(Method::PUT, path_and_query, Some(params)) + } + + /// This is deprecated. + /// Calls `self.request` with `Method::PUT` and `None` for the body.. + fn put_without_body<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { + self.request(Method::PUT, path_and_query, None::<()>) + } + + /// This is deprecated. + /// Calls `self.request` with `Method::DELETE`. + fn delete<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { + self.request(Method::DELETE, path_and_query, None::<()>) + } } /// A response from the HTTP API as required by the [`HttpApiClient`] trait. @@ -200,11 +242,41 @@ where where Self: 'a; + type Body = C::Body; + + type ResponseStreamFuture<'a> = C::ResponseStreamFuture<'a> + where + Self: 'a; + + fn request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseFuture<'a> + where + T: Serialize + 'a, + { + C::request(self, method, path_and_query, params) + } + + fn streaming_request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseStreamFuture<'a> + where + T: Serialize + 'a, + { + C::streaming_request(self, method, path_and_query, params) + } + fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { C::get(self, path_and_query) } - fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + fn post<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a> where T: ?Sized + Serialize, { @@ -215,7 +287,7 @@ where C::post_without_body(self, path_and_query) } - fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + fn put<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a> where T: ?Sized + Serialize, { @@ -239,11 +311,41 @@ where where Self: 'a; + type Body = C::Body; + + type ResponseStreamFuture<'a> = C::ResponseStreamFuture<'a> + where + Self: 'a; + + fn request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseFuture<'a> + where + T: Serialize + 'a, + { + C::request(self, method, path_and_query, params) + } + + fn streaming_request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseStreamFuture<'a> + where + T: Serialize + 'a, + { + C::streaming_request(self, method, path_and_query, params) + } + fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { C::get(self, path_and_query) } - fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + fn post<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a> where T: ?Sized + Serialize, { @@ -254,7 +356,7 @@ where C::post_without_body(self, path_and_query) } - fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + fn put<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a> where T: ?Sized + Serialize, { @@ -278,11 +380,41 @@ where where Self: 'a; + type Body = C::Body; + + type ResponseStreamFuture<'a> = C::ResponseStreamFuture<'a> + where + Self: 'a; + + fn request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseFuture<'a> + where + T: Serialize + 'a, + { + C::request(self, method, path_and_query, params) + } + + fn streaming_request<'a, T>( + &'a self, + method: Method, + path_and_query: &'a str, + params: Option, + ) -> Self::ResponseStreamFuture<'a> + where + T: Serialize + 'a, + { + C::streaming_request(self, method, path_and_query, params) + } + fn get<'a>(&'a self, path_and_query: &'a str) -> Self::ResponseFuture<'a> { C::get(self, path_and_query) } - fn post<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + fn post<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a> where T: ?Sized + Serialize, { @@ -293,7 +425,7 @@ where C::post_without_body(self, path_and_query) } - fn put<'a, T>(&'a self, path_and_query: &'a str, params: &T) -> Self::ResponseFuture<'a> + fn put<'a, T>(&'a self, path_and_query: &'a str, params: &'a T) -> Self::ResponseFuture<'a> where T: ?Sized + Serialize, { @@ -308,3 +440,11 @@ where C::delete(self, path_and_query) } } + +/// A streaming response from the HTTP API as required by the [`HttpApiClient`] trait. +pub struct HttpApiResponseStream { + pub status: u16, + pub content_type: Option, + /// Requests where the response has no body may put `None` here. + pub body: Option, +}