From 6f796db523ec9474dd55f0d5ed43031b609dbefd Mon Sep 17 00:00:00 2001
From: rustdesk <info@rustdesk.com>
Date: Sat, 26 Jun 2021 00:47:45 +0800
Subject: [PATCH] new namedpipe

---
 libs/parity-tokio-ipc/examples/client.rs |   4 +-
 libs/parity-tokio-ipc/examples/server.rs |   3 +-
 libs/parity-tokio-ipc/src/lib.rs         |  15 +-
 libs/parity-tokio-ipc/src/unix.rs        |   2 +
 libs/parity-tokio-ipc/src/win.rs         | 184 ++++++-----------------
 5 files changed, 63 insertions(+), 145 deletions(-)

diff --git a/libs/parity-tokio-ipc/examples/client.rs b/libs/parity-tokio-ipc/examples/client.rs
index 418461d34..a74cfed8c 100644
--- a/libs/parity-tokio-ipc/examples/client.rs
+++ b/libs/parity-tokio-ipc/examples/client.rs
@@ -1,4 +1,4 @@
-use tokio::{self, prelude::*};
+use tokio::{self, io::*};
 use parity_tokio_ipc::Endpoint;
 
 #[tokio::main]
@@ -19,6 +19,6 @@ async fn main() {
 			break;
 		}
 
-		tokio::time::delay_for(std::time::Duration::from_secs(2)).await;
+		tokio::time::sleep(std::time::Duration::from_secs(2)).await;
 	}
 }
diff --git a/libs/parity-tokio-ipc/examples/server.rs b/libs/parity-tokio-ipc/examples/server.rs
index 9c53ab377..3b66153d6 100644
--- a/libs/parity-tokio-ipc/examples/server.rs
+++ b/libs/parity-tokio-ipc/examples/server.rs
@@ -1,6 +1,5 @@
-use futures::StreamExt as _;
 use tokio::{
-	prelude::*,
+	io::*,
 	self,
 	io::split,
 };
diff --git a/libs/parity-tokio-ipc/src/lib.rs b/libs/parity-tokio-ipc/src/lib.rs
index 0627287e7..37c0ff269 100644
--- a/libs/parity-tokio-ipc/src/lib.rs
+++ b/libs/parity-tokio-ipc/src/lib.rs
@@ -31,9 +31,9 @@ mod unix;
 /// }
 ///```
 #[cfg(windows)]
-pub use win::{SecurityAttributes, Endpoint, Connection, Incoming};
+pub use win::{SecurityAttributes, Endpoint, Connection, ConnectionClient, Incoming};
 #[cfg(unix)]
-pub use unix::{SecurityAttributes, Endpoint, Connection, Incoming};
+pub use unix::{SecurityAttributes, Endpoint, Connection, ConnectionClient, Incoming};
 
 /// For testing/examples
 pub fn dummy_endpoint() -> String {
@@ -47,13 +47,14 @@ pub fn dummy_endpoint() -> String {
 
 #[cfg(test)]
 mod tests {
-	use tokio::prelude::*;
-	use futures::{channel::oneshot, StreamExt as _, FutureExt as _};
+	use futures::{channel::oneshot, FutureExt as _};
 	use std::time::Duration;
 	use tokio::{
 		self,
 		io::split,
 	};
+	use tokio::io::AsyncWriteExt;
+	use tokio::io::AsyncReadExt;
 
 	use super::{dummy_endpoint, Endpoint, SecurityAttributes};
 	use std::path::Path;
@@ -100,12 +101,12 @@ mod tests {
 			});
 		tokio::spawn(server);
 
-		tokio::time::delay_for(Duration::from_secs(2)).await;
+		tokio::time::sleep(Duration::from_secs(2)).await;
 
 		println!("Connecting to client 0...");
 		let mut client_0 = Endpoint::connect(&path).await
 			.expect("failed to open client_0");
-		tokio::time::delay_for(Duration::from_secs(2)).await;
+		tokio::time::sleep(Duration::from_secs(2)).await;
 		println!("Connecting to client 1...");
 		let mut client_1 = Endpoint::connect(&path).await
 			.expect("failed to open client_1");
@@ -125,7 +126,7 @@ mod tests {
 		// shutdown server
 		if let Ok(()) = shutdown_tx.send(()) {
 			// wait one second for the file to be deleted.
-			tokio::time::delay_for(Duration::from_secs(1)).await;
+			tokio::time::sleep(Duration::from_secs(1)).await;
 			let path = Path::new(&path);
 			// assert that it has
 			assert!(!path.exists());
diff --git a/libs/parity-tokio-ipc/src/unix.rs b/libs/parity-tokio-ipc/src/unix.rs
index 80cbdb137..d8b363b8e 100644
--- a/libs/parity-tokio-ipc/src/unix.rs
+++ b/libs/parity-tokio-ipc/src/unix.rs
@@ -156,3 +156,5 @@ impl AsyncWrite for Connection {
         Pin::new(&mut this.inner).poll_shutdown(ctx)
     }
 }
+
+pub type ConnectionClient = Connection;
diff --git a/libs/parity-tokio-ipc/src/win.rs b/libs/parity-tokio-ipc/src/win.rs
index 20dbac071..c9263a230 100644
--- a/libs/parity-tokio-ipc/src/win.rs
+++ b/libs/parity-tokio-ipc/src/win.rs
@@ -9,17 +9,12 @@ use winapi::um::winnt::*;
 use std::io;
 use std::marker;
 use std::mem;
-use std::ptr;
-use futures::Stream;
-use tokio::prelude::*;
-use std::pin::Pin;
-use std::task::{Context, Poll};
 use std::path::Path;
-use tokio::io::PollEvented;
+use std::ptr;
+use tokio::net::windows::named_pipe::*;
 
-type NamedPipe = PollEvented<mio_named_pipes::NamedPipe>;
-
-const PIPE_AVAILABILITY_TIMEOUT: u64 = 5000;
+pub type ConnectionClient = NamedPipeClient;
+pub type Connection = NamedPipeServer;
 
 /// Endpoint implementation for windows
 pub struct Endpoint {
@@ -27,6 +22,18 @@ pub struct Endpoint {
     security_attributes: SecurityAttributes,
 }
 
+fn create_server(path: &str, first: bool, attr: *mut libc::c_void) -> io::Result<NamedPipeServer> {
+    unsafe {
+        ServerOptions::new()
+            .access_inbound(true)
+            .access_outbound(true)
+            .out_buffer_size(65536)
+            .in_buffer_size(65536)
+            .first_pipe_instance(first)
+            .create_with_security_attributes_raw(path, attr)
+    }
+}
+
 impl Endpoint {
     /// Stream of incoming connections
     pub fn incoming(mut self) -> io::Result<Incoming> {
@@ -42,23 +49,8 @@ impl Endpoint {
     }
 
     /// Inner platform-dependant state of the endpoint
-    fn inner(&mut self) -> io::Result<NamedPipe> {
-        use miow::pipe::NamedPipeBuilder;
-        use std::os::windows::io::*;
-
-        let raw_handle = unsafe {
-            NamedPipeBuilder::new(&self.path)
-                .first(true)
-                .inbound(true)
-                .outbound(true)
-                .out_buffer_size(65536)
-                .in_buffer_size(65536)
-                .with_security_attributes(self.security_attributes.as_ptr())?
-                .into_raw_handle()
-        };
-
-        let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
-        NamedPipe::new(mio_pipe)
+    fn inner(&mut self) -> io::Result<NamedPipeServer> {
+        unsafe { create_server(&self.path, true, self.security_attributes.as_ptr() as _) }
     }
 
     /// Set security attributes for the connection
@@ -72,30 +64,25 @@ impl Endpoint {
     }
 
     /// Make new connection using the provided path and running event pool.
-    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Connection> {
-        Ok(Connection::wrap(Self::connect_inner(path.as_ref())?))
+    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<ConnectionClient> {
+        Self::connect_inner(path.as_ref()).await
     }
 
-    fn connect_inner(path: &Path) -> io::Result<NamedPipe> {
-        use std::fs::OpenOptions;
-        use std::os::windows::fs::OpenOptionsExt;
-        use std::os::windows::io::{FromRawHandle, IntoRawHandle};
-        use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
-
-        // Wait for the pipe to become available or fail after 5 seconds.
-        miow::pipe::NamedPipe::wait(
-            path,
-            Some(std::time::Duration::from_millis(PIPE_AVAILABILITY_TIMEOUT)),
-        )?;
-        let file = OpenOptions::new()
-            .read(true)
-            .write(true)
-            .custom_flags(FILE_FLAG_OVERLAPPED)
-            .open(path)?;
-        let mio_pipe =
-            unsafe { mio_named_pipes::NamedPipe::from_raw_handle(file.into_raw_handle()) };
-        let pipe = NamedPipe::new(mio_pipe)?;
-        Ok(pipe)
+    async fn connect_inner(path: &Path) -> io::Result<NamedPipeClient> {
+        let client = loop {
+            match ClientOptions::new().read(true).write(true).open(path) {
+                Ok(client) => break client,
+                Err(e)
+                    if e.raw_os_error()
+                        == Some(winapi::shared::winerror::ERROR_PIPE_BUSY as i32) =>
+                {
+                    ()
+                }
+                Err(e) => return Err(e),
+            }
+            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
+        };
+        Ok(client)
     }
 
     /// New IPC endpoint at the given path
@@ -109,31 +96,10 @@ impl Endpoint {
 
 struct NamedPipeSupport {
     path: String,
-    pipe: NamedPipe,
+    pipe: NamedPipeServer,
     security_attributes: SecurityAttributes,
 }
 
-impl NamedPipeSupport {
-    fn replacement_pipe(&mut self) -> io::Result<NamedPipe> {
-        use miow::pipe::NamedPipeBuilder;
-        use std::os::windows::io::*;
-
-        let raw_handle = unsafe {
-            NamedPipeBuilder::new(&self.path)
-                .first(false)
-                .inbound(true)
-                .outbound(true)
-                .out_buffer_size(65536)
-                .in_buffer_size(65536)
-                .with_security_attributes(self.security_attributes.as_ptr())?
-                .into_raw_handle()
-        };
-
-        let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
-        NamedPipe::new(mio_pipe)
-    }
-}
-
 /// Stream of incoming connections
 pub struct Incoming {
     #[allow(dead_code)]
@@ -141,70 +107,20 @@ pub struct Incoming {
     inner: NamedPipeSupport,
 }
 
-impl Stream for Incoming {
-    type Item = tokio::io::Result<Connection>;
-
-    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match self.inner.pipe.get_ref().connect() {
-            Ok(()) => {
-                log::trace!("Incoming connection polled successfully");
-                let new_listener = self.inner.replacement_pipe()?;
-                Poll::Ready(
-                    Some(Ok(Connection::wrap(std::mem::replace(&mut self.inner.pipe, new_listener))))
-                )
-            }
-            Err(e) => {
-                if e.kind() == io::ErrorKind::WouldBlock {
-                    self.inner.pipe.clear_write_ready(ctx);
-                    Poll::Pending
-                } else {
-                    Poll::Ready(Some(Err(e)))
-                }
-            }
-        }
+impl Incoming {
+    async fn next_(&mut self) -> io::Result<NamedPipeServer> {
+        self.inner.pipe.connect().await?;
+        let new_listener = unsafe {
+            create_server(
+                &self.inner.path,
+                false,
+                self.inner.security_attributes.as_ptr() as _,
+            )?
+        };
+        Ok(std::mem::replace(&mut self.inner.pipe, new_listener))
     }
-}
-
-/// IPC connection.
-pub struct Connection {
-    inner: NamedPipe,
-}
-
-impl Connection {
-    pub fn wrap(pipe: NamedPipe) -> Self {
-        Self { inner: pipe }
-	}
-}
-
-impl AsyncRead for Connection {
-    fn poll_read(
-        self: Pin<&mut Self>,
-        ctx: &mut Context<'_>,
-        buf: &mut tokio::io::ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        let this = Pin::into_inner(self);
-        Pin::new(&mut this.inner).poll_read(ctx, buf)
-    }
-}
-
-impl AsyncWrite for Connection {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        ctx: &mut Context<'_>,
-        buf: &[u8],
-    ) -> Poll<Result<usize>> {
-        let this = Pin::into_inner(self);
-        Pin::new(&mut this.inner).poll_write(ctx, buf)
-    }
-
-    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<()>> {
-        let this = Pin::into_inner(self);
-        Pin::new(&mut this.inner).poll_flush(ctx)
-    }
-
-    fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<()>> {
-        let this = Pin::into_inner(self);
-        Pin::new(&mut this.inner).poll_shutdown(ctx)
+    pub async fn next(&mut self) -> Option<io::Result<NamedPipeServer>> {
+        Some(self.next_().await)
     }
 }
 
@@ -259,6 +175,7 @@ struct Sid {
 impl Sid {
     fn everyone_sid() -> io::Result<Sid> {
         let mut sid_ptr = ptr::null_mut();
+        #[allow(const_item_mutation)]
         let result = unsafe {
             AllocateAndInitializeSid(
                 SECURITY_WORLD_SID_AUTHORITY.as_mut_ptr() as *mut _,
@@ -479,5 +396,4 @@ mod test {
             .allow_everyone_connect()
             .expect("failed to create security attributes that allow everyone to read and write to/from a pipe");
     }
-
 }