Merge pull request #191 from eqlabs/update_hyper_warp

Update hyper, warp
This commit is contained in:
Joonas Koivunen 2020-06-17 23:39:48 +03:00 committed by GitHub
commit 7f161ca516
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 13 additions and 68 deletions

11
Cargo.lock generated
View File

@ -1303,9 +1303,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "0.13.3"
version = "0.13.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7b15203263d1faa615f9337d79c1d37959439dc46c2b4faab33286fadc2a1c5"
checksum = "a6e7655b9594024ad0ee439f3b5a7299369dc2a3f459b47c696f9ff676f9aa1f"
dependencies = [
"bytes 0.5.4",
"futures-channel",
@ -1317,8 +1317,8 @@ dependencies = [
"httparse",
"itoa",
"log 0.4.8",
"net2",
"pin-project",
"socket2",
"time",
"tokio 0.2.16",
"tower-service",
@ -1418,6 +1418,7 @@ dependencies = [
"futures 0.3.4",
"hex",
"hex-literal",
"hyper",
"ipfs",
"libipld 0.1.0 (git+https://github.com/ipfs-rust/rust-ipld?rev=b2286c53c13f3eeec2a3766387f2926838e8e4c9)",
"log 0.4.8",
@ -3959,9 +3960,9 @@ dependencies = [
[[package]]
name = "warp"
version = "0.2.2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54cd1e2b3eb3539284d88b76a9afcf5e20f2ef2fab74db5b21a1c30d7d945e82"
checksum = "0e95175b7a927258ecbb816bdada3cc469cb68593e7940b96a60f4af366a9970"
dependencies = [
"bytes 0.5.4",
"futures 0.3.4",

View File

@ -27,7 +27,8 @@ serde_json = "1.0.51"
structopt = "0.3.12"
thiserror = "1.0.14"
tokio = { version = "0.2.16", features = ["full"] }
warp = "0.2.2"
warp = "0.2.3"
hyper = "0.13.6"
async-stream = "0.2.1"
pin-project = "0.4.8"
url = "2.1.1"

View File

@ -1,4 +1,3 @@
use crate::v0::support::unshared::Unshared;
use crate::v0::support::{
with_ipfs, HandledErr, InvalidMultipartFormData, StreamResponse, StringError,
};
@ -195,7 +194,7 @@ async fn rm_query<T: IpfsTypes>(
});
let st = futures::stream::iter(responses);
Ok(StreamResponse(Unshared::new(st)))
Ok(StreamResponse(st))
}
#[derive(Debug, Deserialize)]

View File

@ -21,7 +21,6 @@ use format::EdgeFormatter;
pub(crate) mod path;
pub use path::{IpfsPath, WalkSuccess};
use crate::v0::support::unshared::Unshared;
use crate::v0::support::{HandledErr, StreamResponse};
/// https://docs-beta.ipfs.io/reference/http/api/#api-v0-refs
@ -98,9 +97,7 @@ async fn refs_inner<T: IpfsTypes>(
}
});
// Note: Unshared has the unsafe impl Sync which sadly is needed.
// See documentation for `Unshared` for more information.
Ok(StreamResponse(Unshared::new(st)))
Ok(StreamResponse(st))
}
#[derive(Debug, Serialize, Deserialize)]

View File

@ -1,5 +1,4 @@
use crate::v0::refs::{walk_path, IpfsPath};
use crate::v0::support::unshared::Unshared;
use crate::v0::support::{with_ipfs, StreamResponse, StringError};
use async_stream::try_stream;
use bytes::Bytes;
@ -66,7 +65,7 @@ async fn cat_inner<T: IpfsTypes>(ipfs: Ipfs<T>, args: CatArgs) -> Result<impl Re
Err(e) => return Err(StringError::from(e).into()),
};
Ok(StreamResponse(Unshared::new(stream)))
Ok(StreamResponse(stream))
}
#[derive(Deserialize)]
@ -97,7 +96,7 @@ async fn get_inner<T: IpfsTypes>(ipfs: Ipfs<T>, args: GetArgs) -> Result<impl Re
return Err(StringError::from("unknown node type").into());
}
Ok(StreamResponse(Unshared::new(walk(ipfs, cid).into_stream())))
Ok(StreamResponse(walk(ipfs, cid).into_stream()))
}
fn walk<Types: IpfsTypes>(

View File

@ -8,7 +8,6 @@ use std::fmt;
pub mod option_parsing;
mod stream;
pub mod unshared;
pub use stream::StreamResponse;
/// The common responses apparently returned by the go-ipfs HTTP api on errors.

View File

@ -10,7 +10,7 @@ pub struct StreamResponse<S>(pub S);
impl<S> Reply for StreamResponse<S>
where
S: TryStream + Send + Sync + 'static,
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: StdError + Send + Sync + 'static,
{

View File

@ -1,51 +0,0 @@
use futures::stream::Stream;
use pin_project::pin_project;
use std::fmt;
use std::{
pin::Pin,
task::{Context, Poll},
};
/// Copied from https://docs.rs/crate/async-compression/0.3.2/source/src/unshared.rs ... Did not
/// keep the safety discussion comment because I am unsure if this is safe with the pinned
/// projections.
///
/// The reason why this is needed is because `warp` or `hyper` needs it. `hyper` needs it because
/// of compiler bug https://github.com/hyperium/hyper/issues/2159 and the future or stream we
/// combine up with `async-stream` is not `Sync`, because the `async_trait` builds up a
/// `Pin<Box<dyn std::future::Future<Output = _> + Send + '_>>`. The lifetime of those futures is
/// not an issue, because at higher level (`refs_path`) those are within the owned values that
/// method receives. It is unclear for me at least if the compiler is too strict with the `Sync`
/// requirement which is derives for any reference or if the root cause here is that `hyper`
/// suffers from that compiler issue.
///
/// Related: https://internals.rust-lang.org/t/what-shall-sync-mean-across-an-await/12020
/// Related: https://github.com/dtolnay/async-trait/issues/77
#[pin_project]
pub struct Unshared<T> {
#[pin]
inner: T,
}
impl<T: Send> Unshared<T> {
pub fn new(inner: T) -> Self {
Unshared { inner }
}
}
unsafe impl<T: Send> Sync for Unshared<T> {}
impl<T> fmt::Debug for Unshared<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(core::any::type_name::<T>()).finish()
}
}
impl<S: Stream + Send> Stream for Unshared<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
this.inner.poll_next(ctx)
}
}