diff --git a/libs/parity-tokio-ipc/examples/client.rs b/libs/parity-tokio-ipc/examples/client.rs index 418461d34..a74cfed8c 100644 --- a/libs/parity-tokio-ipc/examples/client.rs +++ b/libs/parity-tokio-ipc/examples/client.rs @@ -1,4 +1,4 @@ -use tokio::{self, prelude::*}; +use tokio::{self, io::*}; use parity_tokio_ipc::Endpoint; #[tokio::main] @@ -19,6 +19,6 @@ async fn main() { break; } - tokio::time::delay_for(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } diff --git a/libs/parity-tokio-ipc/examples/server.rs b/libs/parity-tokio-ipc/examples/server.rs index 9c53ab377..3b66153d6 100644 --- a/libs/parity-tokio-ipc/examples/server.rs +++ b/libs/parity-tokio-ipc/examples/server.rs @@ -1,6 +1,5 @@ -use futures::StreamExt as _; use tokio::{ - prelude::*, + io::*, self, io::split, }; diff --git a/libs/parity-tokio-ipc/src/lib.rs b/libs/parity-tokio-ipc/src/lib.rs index 0627287e7..37c0ff269 100644 --- a/libs/parity-tokio-ipc/src/lib.rs +++ b/libs/parity-tokio-ipc/src/lib.rs @@ -31,9 +31,9 @@ mod unix; /// } ///``` #[cfg(windows)] -pub use win::{SecurityAttributes, Endpoint, Connection, Incoming}; +pub use win::{SecurityAttributes, Endpoint, Connection, ConnectionClient, Incoming}; #[cfg(unix)] -pub use unix::{SecurityAttributes, Endpoint, Connection, Incoming}; +pub use unix::{SecurityAttributes, Endpoint, Connection, ConnectionClient, Incoming}; /// For testing/examples pub fn dummy_endpoint() -> String { @@ -47,13 +47,14 @@ pub fn dummy_endpoint() -> String { #[cfg(test)] mod tests { - use tokio::prelude::*; - use futures::{channel::oneshot, StreamExt as _, FutureExt as _}; + use futures::{channel::oneshot, FutureExt as _}; use std::time::Duration; use tokio::{ self, io::split, }; + use tokio::io::AsyncWriteExt; + use tokio::io::AsyncReadExt; use super::{dummy_endpoint, Endpoint, SecurityAttributes}; use std::path::Path; @@ -100,12 +101,12 @@ mod tests { }); tokio::spawn(server); - tokio::time::delay_for(Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_secs(2)).await; println!("Connecting to client 0..."); let mut client_0 = Endpoint::connect(&path).await .expect("failed to open client_0"); - tokio::time::delay_for(Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_secs(2)).await; println!("Connecting to client 1..."); let mut client_1 = Endpoint::connect(&path).await .expect("failed to open client_1"); @@ -125,7 +126,7 @@ mod tests { // shutdown server if let Ok(()) = shutdown_tx.send(()) { // wait one second for the file to be deleted. - tokio::time::delay_for(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; let path = Path::new(&path); // assert that it has assert!(!path.exists()); diff --git a/libs/parity-tokio-ipc/src/unix.rs b/libs/parity-tokio-ipc/src/unix.rs index 80cbdb137..d8b363b8e 100644 --- a/libs/parity-tokio-ipc/src/unix.rs +++ b/libs/parity-tokio-ipc/src/unix.rs @@ -156,3 +156,5 @@ impl AsyncWrite for Connection { Pin::new(&mut this.inner).poll_shutdown(ctx) } } + +pub type ConnectionClient = Connection; diff --git a/libs/parity-tokio-ipc/src/win.rs b/libs/parity-tokio-ipc/src/win.rs index 20dbac071..c9263a230 100644 --- a/libs/parity-tokio-ipc/src/win.rs +++ b/libs/parity-tokio-ipc/src/win.rs @@ -9,17 +9,12 @@ use winapi::um::winnt::*; use std::io; use std::marker; use std::mem; -use std::ptr; -use futures::Stream; -use tokio::prelude::*; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::path::Path; -use tokio::io::PollEvented; +use std::ptr; +use tokio::net::windows::named_pipe::*; -type NamedPipe = PollEvented; - -const PIPE_AVAILABILITY_TIMEOUT: u64 = 5000; +pub type ConnectionClient = NamedPipeClient; +pub type Connection = NamedPipeServer; /// Endpoint implementation for windows pub struct Endpoint { @@ -27,6 +22,18 @@ pub struct Endpoint { security_attributes: SecurityAttributes, } +fn create_server(path: &str, first: bool, attr: *mut libc::c_void) -> io::Result { + unsafe { + ServerOptions::new() + .access_inbound(true) + .access_outbound(true) + .out_buffer_size(65536) + .in_buffer_size(65536) + .first_pipe_instance(first) + .create_with_security_attributes_raw(path, attr) + } +} + impl Endpoint { /// Stream of incoming connections pub fn incoming(mut self) -> io::Result { @@ -42,23 +49,8 @@ impl Endpoint { } /// Inner platform-dependant state of the endpoint - fn inner(&mut self) -> io::Result { - use miow::pipe::NamedPipeBuilder; - use std::os::windows::io::*; - - let raw_handle = unsafe { - NamedPipeBuilder::new(&self.path) - .first(true) - .inbound(true) - .outbound(true) - .out_buffer_size(65536) - .in_buffer_size(65536) - .with_security_attributes(self.security_attributes.as_ptr())? - .into_raw_handle() - }; - - let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) }; - NamedPipe::new(mio_pipe) + fn inner(&mut self) -> io::Result { + unsafe { create_server(&self.path, true, self.security_attributes.as_ptr() as _) } } /// Set security attributes for the connection @@ -72,30 +64,25 @@ impl Endpoint { } /// Make new connection using the provided path and running event pool. - pub async fn connect>(path: P) -> io::Result { - Ok(Connection::wrap(Self::connect_inner(path.as_ref())?)) + pub async fn connect>(path: P) -> io::Result { + Self::connect_inner(path.as_ref()).await } - fn connect_inner(path: &Path) -> io::Result { - use std::fs::OpenOptions; - use std::os::windows::fs::OpenOptionsExt; - use std::os::windows::io::{FromRawHandle, IntoRawHandle}; - use winapi::um::winbase::FILE_FLAG_OVERLAPPED; - - // Wait for the pipe to become available or fail after 5 seconds. - miow::pipe::NamedPipe::wait( - path, - Some(std::time::Duration::from_millis(PIPE_AVAILABILITY_TIMEOUT)), - )?; - let file = OpenOptions::new() - .read(true) - .write(true) - .custom_flags(FILE_FLAG_OVERLAPPED) - .open(path)?; - let mio_pipe = - unsafe { mio_named_pipes::NamedPipe::from_raw_handle(file.into_raw_handle()) }; - let pipe = NamedPipe::new(mio_pipe)?; - Ok(pipe) + async fn connect_inner(path: &Path) -> io::Result { + let client = loop { + match ClientOptions::new().read(true).write(true).open(path) { + Ok(client) => break client, + Err(e) + if e.raw_os_error() + == Some(winapi::shared::winerror::ERROR_PIPE_BUSY as i32) => + { + () + } + Err(e) => return Err(e), + } + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; + Ok(client) } /// New IPC endpoint at the given path @@ -109,31 +96,10 @@ impl Endpoint { struct NamedPipeSupport { path: String, - pipe: NamedPipe, + pipe: NamedPipeServer, security_attributes: SecurityAttributes, } -impl NamedPipeSupport { - fn replacement_pipe(&mut self) -> io::Result { - use miow::pipe::NamedPipeBuilder; - use std::os::windows::io::*; - - let raw_handle = unsafe { - NamedPipeBuilder::new(&self.path) - .first(false) - .inbound(true) - .outbound(true) - .out_buffer_size(65536) - .in_buffer_size(65536) - .with_security_attributes(self.security_attributes.as_ptr())? - .into_raw_handle() - }; - - let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) }; - NamedPipe::new(mio_pipe) - } -} - /// Stream of incoming connections pub struct Incoming { #[allow(dead_code)] @@ -141,70 +107,20 @@ pub struct Incoming { inner: NamedPipeSupport, } -impl Stream for Incoming { - type Item = tokio::io::Result; - - fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - match self.inner.pipe.get_ref().connect() { - Ok(()) => { - log::trace!("Incoming connection polled successfully"); - let new_listener = self.inner.replacement_pipe()?; - Poll::Ready( - Some(Ok(Connection::wrap(std::mem::replace(&mut self.inner.pipe, new_listener)))) - ) - } - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - self.inner.pipe.clear_write_ready(ctx); - Poll::Pending - } else { - Poll::Ready(Some(Err(e))) - } - } - } +impl Incoming { + async fn next_(&mut self) -> io::Result { + self.inner.pipe.connect().await?; + let new_listener = unsafe { + create_server( + &self.inner.path, + false, + self.inner.security_attributes.as_ptr() as _, + )? + }; + Ok(std::mem::replace(&mut self.inner.pipe, new_listener)) } -} - -/// IPC connection. -pub struct Connection { - inner: NamedPipe, -} - -impl Connection { - pub fn wrap(pipe: NamedPipe) -> Self { - Self { inner: pipe } - } -} - -impl AsyncRead for Connection { - fn poll_read( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - let this = Pin::into_inner(self); - Pin::new(&mut this.inner).poll_read(ctx, buf) - } -} - -impl AsyncWrite for Connection { - fn poll_write( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let this = Pin::into_inner(self); - Pin::new(&mut this.inner).poll_write(ctx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - let this = Pin::into_inner(self); - Pin::new(&mut this.inner).poll_flush(ctx) - } - - fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - let this = Pin::into_inner(self); - Pin::new(&mut this.inner).poll_shutdown(ctx) + pub async fn next(&mut self) -> Option> { + Some(self.next_().await) } } @@ -259,6 +175,7 @@ struct Sid { impl Sid { fn everyone_sid() -> io::Result { let mut sid_ptr = ptr::null_mut(); + #[allow(const_item_mutation)] let result = unsafe { AllocateAndInitializeSid( SECURITY_WORLD_SID_AUTHORITY.as_mut_ptr() as *mut _, @@ -479,5 +396,4 @@ mod test { .allow_everyone_connect() .expect("failed to create security attributes that allow everyone to read and write to/from a pipe"); } - }