refactor: unify StreamResponses

these were simply forgotten behind, which lead to responses returning
different headers.
This commit is contained in:
Joonas Koivunen 2020-06-09 12:08:09 +03:00
parent ddb58cfbf3
commit d9a273fc9b
6 changed files with 40 additions and 71 deletions

View File

@ -1,18 +1,15 @@
use crate::v0::support::unshared::Unshared;
use crate::v0::support::{with_ipfs, HandledErr, InvalidMultipartFormData, StringError};
use crate::v0::support::{
with_ipfs, HandledErr, InvalidMultipartFormData, StreamResponse, StringError,
};
use futures::stream::FuturesOrdered;
use futures::stream::StreamExt;
use futures::stream::{TryStream, TryStreamExt};
use ipfs::error::Error;
use ipfs::{Ipfs, IpfsTypes};
use libipld::cid::{Cid, Codec, Version};
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::error::Error as StdError;
use warp::hyper::body::Bytes;
use warp::{
http::Response, hyper::Body, multipart, path, query, reply, Buf, Filter, Rejection, Reply,
};
use warp::{http::Response, multipart, path, query, reply, Buf, Filter, Rejection, Reply};
mod options;
use options::RmOptions;
@ -234,16 +231,3 @@ pub fn stat<T: IpfsTypes>(
.and(query::<StatQuery>())
.and_then(stat_query)
}
pub struct StreamResponse<S>(pub S);
impl<S> Reply for StreamResponse<S>
where
S: TryStream + Send + Sync + 'static,
S::Ok: Into<Bytes>,
S::Error: StdError + Send + Sync + 'static,
{
fn into_response(self) -> warp::reply::Response {
Response::new(Body::wrap_stream(self.0.into_stream()))
}
}

View File

@ -29,7 +29,9 @@ use std::time::Duration;
use warp::hyper::body::Bytes;
use warp::Filter;
use super::support::{with_ipfs, NonUtf8Topic, RequiredArgumentMissing, StringError};
use crate::v0::support::{
with_ipfs, NonUtf8Topic, RequiredArgumentMissing, StreamResponse, StringError,
};
#[derive(Default)]
pub struct Pubsub {
@ -371,32 +373,6 @@ fn preformat(msg: impl AsRef<ipfs::PubsubMessage>) -> Result<PreformattedJsonMes
})
}
struct StreamResponse<S>(S);
impl<S> warp::Reply for StreamResponse<S>
where
S: futures::stream::TryStream + Send + Sync + 'static,
S::Ok: Into<Bytes>,
S::Error: std::error::Error + Send + Sync + 'static,
{
fn into_response(self) -> warp::reply::Response {
use warp::http::header::{HeaderValue, CONTENT_TYPE, TRAILER};
use warp::hyper::Body;
// while it may seem like the S::Error is handled somehow it currently just means the
// response will stop. hopefully later it can be used to become trailer headers.
let mut resp = warp::reply::Response::new(Body::wrap_stream(self.0.into_stream()));
let headers = resp.headers_mut();
// FIXME: unable to send this header with warp/hyper right now
headers.insert(TRAILER, HeaderValue::from_static("X-Stream-Error"));
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
headers.insert("X-Chunked-Output", HeaderValue::from_static("1"));
resp
}
}
/// The "arg" for `pubsub/sub`
#[derive(Debug, Deserialize)]
struct TopicParameter {

View File

@ -22,11 +22,7 @@ mod path;
pub use path::{IpfsPath, WalkSuccess};
use crate::v0::support::unshared::Unshared;
mod support;
use support::StreamResponse;
use crate::v0::support::HandledErr;
use crate::v0::support::{HandledErr, StreamResponse};
/// https://docs-beta.ipfs.io/reference/http/api/#api-v0-refs
pub fn refs<T: IpfsTypes>(

View File

@ -1,19 +0,0 @@
use std::error::Error as StdError;
use warp::hyper::body::Bytes;
use warp::hyper::Body;
use warp::{reply::Response, Reply};
use futures::stream::{TryStream, TryStreamExt};
pub struct StreamResponse<S>(pub S);
impl<S> Reply for StreamResponse<S>
where
S: TryStream + Send + Sync + 'static,
S::Ok: Into<Bytes>,
S::Error: StdError + Send + Sync + 'static,
{
fn into_response(self) -> Response {
Response::new(Body::wrap_stream(self.0.into_stream()))
}
}

View File

@ -7,7 +7,9 @@ use std::error::Error as StdError;
use std::fmt;
pub mod option_parsing;
mod stream;
pub mod unshared;
pub use stream::StreamResponse;
/// The common responses apparently returned by the go-ipfs HTTP api on errors.
/// See also: https://github.com/ferristseng/rust-ipfs-api/blob/master/ipfs-api/src/response/error.rs

View File

@ -0,0 +1,30 @@
use futures::stream::{TryStream, TryStreamExt};
use std::error::Error as StdError;
use warp::http::header::{HeaderValue, CONTENT_TYPE, TRAILER};
use warp::http::Response;
use warp::hyper::body::Bytes;
use warp::hyper::Body;
use warp::Reply;
pub struct StreamResponse<S>(pub S);
impl<S> Reply for StreamResponse<S>
where
S: TryStream + Send + Sync + 'static,
S::Ok: Into<Bytes>,
S::Error: StdError + Send + Sync + 'static,
{
fn into_response(self) -> warp::reply::Response {
// while it may seem like the S::Error is handled somehow it currently just means the
// response will stop. hopefully later it can be used to become trailer headers.
let mut resp = Response::new(Body::wrap_stream(self.0.into_stream()));
let headers = resp.headers_mut();
// FIXME: unable to send this header with warp/hyper right now
headers.insert(TRAILER, HeaderValue::from_static("X-Stream-Error"));
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
headers.insert("X-Chunked-Output", HeaderValue::from_static("1"));
resp
}
}