From 3c11bcb17985d297d359b351fb7706a8104fd706 Mon Sep 17 00:00:00 2001 From: saresend Date: Mon, 23 Mar 2020 11:14:49 -0700 Subject: [PATCH] fixing edits --- src/dag.rs | 6 +- src/lib.rs | 779 ++++++++++++++++++++++++++--------------------------- 2 files changed, 387 insertions(+), 398 deletions(-) diff --git a/src/dag.rs b/src/dag.rs index 284a918b..409cd9ef 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -37,11 +37,7 @@ impl IpldDag { None => return Err(anyhow::anyhow!("expected cid")), }; let mut ipld = decode_ipld(&cid, self.repo.get_block(&cid).await?.data())?; - for sub_path in path.iter() { - - } - - + for sub_path in path.iter() {} todo!() } diff --git a/src/lib.rs b/src/lib.rs index bedc4080..895d41dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,7 @@ use self::unixfs::File; /// `IpfsTypes`. pub trait IpfsTypes: SwarmTypes + RepoTypes {} impl SwarmTypes for T { - type TStrategy = bitswap::AltruisticStrategy; + type TStrategy = bitswap::AltruisticStrategy; } impl IpfsTypes for T {} @@ -63,59 +63,58 @@ impl IpfsTypes for T {} #[derive(Clone, Debug)] pub struct Types; impl RepoTypes for Types { - type TBlockStore = repo::fs::FsBlockStore; - #[cfg(feature = "rocksdb")] - type TDataStore = repo::fs::RocksDataStore; - #[cfg(not(feature = "rocksdb"))] - type TDataStore = repo::mem::MemDataStore; + type TBlockStore = repo::fs::FsBlockStore; + #[cfg(feature = "rocksdb")] + type TDataStore = repo::fs::RocksDataStore; + #[cfg(not(feature = "rocksdb"))] + type TDataStore = repo::mem::MemDataStore; } /// Testing IPFS types #[derive(Clone, Debug)] pub struct TestTypes; impl RepoTypes for TestTypes { - type TBlockStore = repo::mem::MemBlockStore; - type TDataStore = repo::mem::MemDataStore; + type TBlockStore = repo::mem::MemBlockStore; + type TDataStore = repo::mem::MemDataStore; } /// Ipfs options #[derive(Clone)] pub struct IpfsOptions { - _marker: PhantomData, - /// The path of the ipfs repo. - pub ipfs_path: PathBuf, - /// The keypair used with libp2p. - pub keypair: Keypair, - /// Nodes dialed during startup. - pub bootstrap: Vec<(Multiaddr, PeerId)>, - /// Enables mdns for peer discovery when true. - pub mdns: bool, + _marker: PhantomData, + /// The path of the ipfs repo. + pub ipfs_path: PathBuf, + /// The keypair used with libp2p. + pub keypair: Keypair, + /// Nodes dialed during startup. + pub bootstrap: Vec<(Multiaddr, PeerId)>, + /// Enables mdns for peer discovery when true. + pub mdns: bool, } impl fmt::Debug for IpfsOptions { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - // needed since libp2p::identity::Keypair does not have a Debug impl, and the IpfsOptions - // is a struct with all public fields, so don't enforce users to use this wrapper. - fmt - .debug_struct("IpfsOptions") - .field("ipfs_path", &self.ipfs_path) - .field("bootstrap", &self.bootstrap) - .field("keypair", &DebuggableKeypair(&self.keypair)) - .field("mdns", &self.mdns) - .finish() - } + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + // needed since libp2p::identity::Keypair does not have a Debug impl, and the IpfsOptions + // is a struct with all public fields, so don't enforce users to use this wrapper. 
+ fmt.debug_struct("IpfsOptions") + .field("ipfs_path", &self.ipfs_path) + .field("bootstrap", &self.bootstrap) + .field("keypair", &DebuggableKeypair(&self.keypair)) + .field("mdns", &self.mdns) + .finish() + } } impl IpfsOptions { - /// Creates an inmemory store backed node for tests - pub fn inmemory_with_generated_keys(mdns: bool) -> Self { - Self::new( - std::env::temp_dir().into(), - Keypair::generate_ed25519(), - vec![], - mdns, - ) - } + /// Creates an inmemory store backed node for tests + pub fn inmemory_with_generated_keys(mdns: bool) -> Self { + Self::new( + std::env::temp_dir().into(), + Keypair::generate_ed25519(), + vec![], + mdns, + ) + } } /// Workaround for libp2p::identity::Keypair missing a Debug impl, works with references and owned @@ -124,80 +123,80 @@ impl IpfsOptions { struct DebuggableKeypair>(I); impl> fmt::Debug for DebuggableKeypair { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let kind = match self.get_ref() { - Keypair::Ed25519(_) => "Ed25519", - Keypair::Rsa(_) => "Rsa", - Keypair::Secp256k1(_) => "Secp256k1", - }; + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let kind = match self.get_ref() { + Keypair::Ed25519(_) => "Ed25519", + Keypair::Rsa(_) => "Rsa", + Keypair::Secp256k1(_) => "Secp256k1", + }; - write!(fmt, "Keypair::{}", kind) - } + write!(fmt, "Keypair::{}", kind) + } } impl> DebuggableKeypair { - fn get_ref(&self) -> &Keypair { - self.0.borrow() - } + fn get_ref(&self) -> &Keypair { + self.0.borrow() + } } impl IpfsOptions { - pub fn new( - ipfs_path: PathBuf, - keypair: Keypair, - bootstrap: Vec<(Multiaddr, PeerId)>, - mdns: bool, - ) -> Self { - Self { - _marker: PhantomData, - ipfs_path, - keypair, - bootstrap, - mdns, + pub fn new( + ipfs_path: PathBuf, + keypair: Keypair, + bootstrap: Vec<(Multiaddr, PeerId)>, + mdns: bool, + ) -> Self { + Self { + _marker: PhantomData, + ipfs_path, + keypair, + bootstrap, + mdns, + } } - } } impl Default for IpfsOptions { - /// Create `IpfsOptions` from environment. - fn default() -> Self { - let ipfs_path = if let Ok(path) = std::env::var("IPFS_PATH") { - PathBuf::from(path) - } else { - let root = if let Some(home) = dirs::home_dir() { - home - } else { - std::env::current_dir().unwrap() - }; - root.join(".rust-ipfs").into() - }; - let config_path = dirs::config_dir() - .unwrap() - .join("rust-ipfs") - .join("config.json"); - let config = ConfigFile::new(config_path).unwrap(); - let keypair = config.secio_key_pair(); - let bootstrap = config.bootstrap(); + /// Create `IpfsOptions` from environment. + fn default() -> Self { + let ipfs_path = if let Ok(path) = std::env::var("IPFS_PATH") { + PathBuf::from(path) + } else { + let root = if let Some(home) = dirs::home_dir() { + home + } else { + std::env::current_dir().unwrap() + }; + root.join(".rust-ipfs").into() + }; + let config_path = dirs::config_dir() + .unwrap() + .join("rust-ipfs") + .join("config.json"); + let config = ConfigFile::new(config_path).unwrap(); + let keypair = config.secio_key_pair(); + let bootstrap = config.bootstrap(); - IpfsOptions { - _marker: PhantomData, - ipfs_path, - keypair, - bootstrap, - mdns: true, + IpfsOptions { + _marker: PhantomData, + ipfs_path, + keypair, + bootstrap, + mdns: true, + } } - } } /// Ipfs struct creates a new IPFS node and is the main entry point /// for interacting with IPFS. 
#[derive(Clone, Debug)] pub struct Ipfs { - repo: Arc>, - dag: IpldDag, - ipns: Ipns, - keys: DebuggableKeypair, - to_task: Sender, + repo: Arc>, + dag: IpldDag, + ipns: Ipns, + keys: DebuggableKeypair, + to_task: Sender, } type Channel = OneshotSender>; @@ -206,365 +205,359 @@ type Channel = OneshotSender>; /// task. #[derive(Debug)] enum IpfsEvent { - /// Connect - Connect( - Multiaddr, - OneshotSender>>, - ), - /// Addresses - Addresses(Channel)>>), - /// Local addresses - Listeners(Channel>), - /// Connections - Connections(Channel>), - /// Disconnect - Disconnect(Multiaddr, Channel<()>), - /// Request background task to return the listened and external addresses - GetAddresses(OneshotSender>), - Exit, + /// Connect + Connect( + Multiaddr, + OneshotSender>>, + ), + /// Addresses + Addresses(Channel)>>), + /// Local addresses + Listeners(Channel>), + /// Connections + Connections(Channel>), + /// Disconnect + Disconnect(Multiaddr, Channel<()>), + /// Request background task to return the listened and external addresses + GetAddresses(OneshotSender>), + Exit, } /// Configured Ipfs instace or value which can be only initialized. pub struct UninitializedIpfs { - repo: Arc>, - dag: IpldDag, - ipns: Ipns, - keys: Keypair, - moved_on_init: Option<(Receiver, TSwarm)>, + repo: Arc>, + dag: IpldDag, + ipns: Ipns, + keys: Keypair, + moved_on_init: Option<(Receiver, TSwarm)>, } impl UninitializedIpfs { - /// Configures a new UninitializedIpfs with from the given options. - pub async fn new(options: IpfsOptions) -> Self { - let repo_options = RepoOptions::::from(&options); - let keys = options.keypair.clone(); - let (repo, repo_events) = create_repo(repo_options); - let swarm_options = SwarmOptions::::from(&options); - let swarm = create_swarm(swarm_options, repo.clone()).await; - let dag = IpldDag::new(repo.clone()); - let ipns = Ipns::new(repo.clone()); + /// Configures a new UninitializedIpfs with from the given options. + pub async fn new(options: IpfsOptions) -> Self { + let repo_options = RepoOptions::::from(&options); + let keys = options.keypair.clone(); + let (repo, repo_events) = create_repo(repo_options); + let swarm_options = SwarmOptions::::from(&options); + let swarm = create_swarm(swarm_options, repo.clone()).await; + let dag = IpldDag::new(repo.clone()); + let ipns = Ipns::new(repo.clone()); - UninitializedIpfs { - repo, - dag, - ipns, - keys, - moved_on_init: Some((repo_events, swarm)), + UninitializedIpfs { + repo, + dag, + ipns, + keys, + moved_on_init: Some((repo_events, swarm)), + } } - } - /// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the - /// future should be spawned on a executor as soon as possible. - pub async fn start( - mut self, - ) -> Result<(Ipfs, impl std::future::Future), Error> { - use futures::stream::StreamExt; + /// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the + /// future should be spawned on a executor as soon as possible. 
+ pub async fn start( + mut self, + ) -> Result<(Ipfs, impl std::future::Future), Error> { + use futures::stream::StreamExt; - let (repo_events, swarm) = self - .moved_on_init - .take() - .expect("start cannot be called twice"); + let (repo_events, swarm) = self + .moved_on_init + .take() + .expect("start cannot be called twice"); - self.repo.init().await?; - self.repo.init().await?; + self.repo.init().await?; + self.repo.init().await?; - let (to_task, receiver) = channel::(1); + let (to_task, receiver) = channel::(1); - let fut = IpfsFuture { - repo_events: repo_events.fuse(), - from_facade: receiver.fuse(), - swarm, - }; + let fut = IpfsFuture { + repo_events: repo_events.fuse(), + from_facade: receiver.fuse(), + swarm, + }; - let UninitializedIpfs { - repo, - dag, - ipns, - keys, - .. - } = self; + let UninitializedIpfs { + repo, + dag, + ipns, + keys, + .. + } = self; - Ok(( - Ipfs { - repo, - dag, - ipns, - keys: DebuggableKeypair(keys), - to_task, - }, - fut, - )) - } + Ok(( + Ipfs { + repo, + dag, + ipns, + keys: DebuggableKeypair(keys), + to_task, + }, + fut, + )) + } } impl Ipfs { - /// Puts a block into the ipfs repo. - pub async fn put_block(&mut self, block: Block) -> Result { - Ok(self.repo.put_block(block).await?) - } + /// Puts a block into the ipfs repo. + pub async fn put_block(&mut self, block: Block) -> Result { + Ok(self.repo.put_block(block).await?) + } - /// Retrives a block from the ipfs repo. - pub async fn get_block(&mut self, cid: &Cid) -> Result { - Ok(self.repo.get_block(cid).await?) - } + /// Retrives a block from the ipfs repo. + pub async fn get_block(&mut self, cid: &Cid) -> Result { + Ok(self.repo.get_block(cid).await?) + } - /// Remove block from the ipfs repo. - pub async fn remove_block(&mut self, cid: &Cid) -> Result<(), Error> { - Ok(self.repo.remove_block(cid).await?) - } + /// Remove block from the ipfs repo. + pub async fn remove_block(&mut self, cid: &Cid) -> Result<(), Error> { + Ok(self.repo.remove_block(cid).await?) + } - /// Pins a given Cid - pub async fn pin_block(&mut self, cid: IpfsPath) -> Result<(), Error> { - Ok( - self - .dag - .pin(self.identity().await?.0.into_peer_id(), cid) - .await?, - ) - } + /// Pins a given Cid + pub async fn pin_block(&mut self, cid: IpfsPath) -> Result<(), Error> { + Ok(self + .dag + .pin(self.identity().await?.0.into_peer_id(), cid) + .await?) + } - /// Puts an ipld dag node into the ipfs repo. - pub async fn put_dag(&self, ipld: Ipld) -> Result { - Ok(self.dag.put(ipld, Codec::DagCBOR).await?) - } + /// Puts an ipld dag node into the ipfs repo. + pub async fn put_dag(&self, ipld: Ipld) -> Result { + Ok(self.dag.put(ipld, Codec::DagCBOR).await?) + } - /// Gets an ipld dag node from the ipfs repo. - pub async fn get_dag(&self, path: IpfsPath) -> Result { - Ok(self.dag.get(path).await?) - } + /// Gets an ipld dag node from the ipfs repo. + pub async fn get_dag(&self, path: IpfsPath) -> Result { + Ok(self.dag.get(path).await?) + } - /// Adds a file into the ipfs repo. - pub async fn add(&self, path: PathBuf) -> Result { - let dag = self.dag.clone(); - let file = File::new(path).await?; - let path = file.put_unixfs_v1(&dag).await?; - Ok(path) - } + /// Adds a file into the ipfs repo. + pub async fn add(&self, path: PathBuf) -> Result { + let dag = self.dag.clone(); + let file = File::new(path).await?; + let path = file.put_unixfs_v1(&dag).await?; + Ok(path) + } - /// Gets a file from the ipfs repo. - pub async fn get(&self, path: IpfsPath) -> Result { - Ok(File::get_unixfs_v1(&self.dag, path).await?) 
- } + /// Gets a file from the ipfs repo. + pub async fn get(&self, path: IpfsPath) -> Result { + Ok(File::get_unixfs_v1(&self.dag, path).await?) + } - /// Resolves a ipns path to an ipld path. - pub async fn resolve_ipns(&self, path: &IpfsPath) -> Result { - Ok(self.ipns.resolve(path).await?) - } + /// Resolves a ipns path to an ipld path. + pub async fn resolve_ipns(&self, path: &IpfsPath) -> Result { + Ok(self.ipns.resolve(path).await?) + } - /// Publishes an ipld path. - pub async fn publish_ipns(&self, key: &PeerId, path: &IpfsPath) -> Result { - Ok(self.ipns.publish(key, path).await?) - } + /// Publishes an ipld path. + pub async fn publish_ipns(&self, key: &PeerId, path: &IpfsPath) -> Result { + Ok(self.ipns.publish(key, path).await?) + } - /// Cancel an ipns path. - pub async fn cancel_ipns(&self, key: &PeerId) -> Result<(), Error> { - self.ipns.cancel(key).await?; - Ok(()) - } + /// Cancel an ipns path. + pub async fn cancel_ipns(&self, key: &PeerId) -> Result<(), Error> { + self.ipns.cancel(key).await?; + Ok(()) + } - pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> { - let (tx, rx) = oneshot_channel(); - self - .to_task - .clone() - .send(IpfsEvent::Connect(addr, tx)) - .await?; - let subscription = rx.await?; - subscription.await.map_err(|e| format_err!("{}", e)) - } + pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> { + let (tx, rx) = oneshot_channel(); + self.to_task + .clone() + .send(IpfsEvent::Connect(addr, tx)) + .await?; + let subscription = rx.await?; + subscription.await.map_err(|e| format_err!("{}", e)) + } - pub async fn addrs(&self) -> Result)>, Error> { - let (tx, rx) = oneshot_channel(); - self.to_task.clone().send(IpfsEvent::Addresses(tx)).await?; - rx.await? - } + pub async fn addrs(&self) -> Result)>, Error> { + let (tx, rx) = oneshot_channel(); + self.to_task.clone().send(IpfsEvent::Addresses(tx)).await?; + rx.await? + } - pub async fn addrs_local(&self) -> Result, Error> { - let (tx, rx) = oneshot_channel(); - self.to_task.clone().send(IpfsEvent::Listeners(tx)).await?; - rx.await? - } + pub async fn addrs_local(&self) -> Result, Error> { + let (tx, rx) = oneshot_channel(); + self.to_task.clone().send(IpfsEvent::Listeners(tx)).await?; + rx.await? + } - pub async fn peers(&self) -> Result, Error> { - let (tx, rx) = oneshot_channel(); - self - .to_task - .clone() - .send(IpfsEvent::Connections(tx)) - .await?; - rx.await? - } + pub async fn peers(&self) -> Result, Error> { + let (tx, rx) = oneshot_channel(); + self.to_task + .clone() + .send(IpfsEvent::Connections(tx)) + .await?; + rx.await? + } - pub async fn disconnect(&self, addr: Multiaddr) -> Result<(), Error> { - let (tx, rx) = oneshot_channel(); - self - .to_task - .clone() - .send(IpfsEvent::Disconnect(addr, tx)) - .await?; - rx.await? - } + pub async fn disconnect(&self, addr: Multiaddr) -> Result<(), Error> { + let (tx, rx) = oneshot_channel(); + self.to_task + .clone() + .send(IpfsEvent::Disconnect(addr, tx)) + .await?; + rx.await? + } - pub async fn identity(&self) -> Result<(PublicKey, Vec), Error> { - let (tx, rx) = oneshot_channel(); + pub async fn identity(&self) -> Result<(PublicKey, Vec), Error> { + let (tx, rx) = oneshot_channel(); - self - .to_task - .clone() - .send(IpfsEvent::GetAddresses(tx)) - .await?; - let addresses = rx.await?; - Ok((self.keys.get_ref().public(), addresses)) - } + self.to_task + .clone() + .send(IpfsEvent::GetAddresses(tx)) + .await?; + let addresses = rx.await?; + Ok((self.keys.get_ref().public(), addresses)) + } - /// Exit daemon. 
- pub async fn exit_daemon(mut self) { - // ignoring the error because it'd mean that the background task would had already been - // dropped - let _ = self.to_task.send(IpfsEvent::Exit).await; - } + /// Exit daemon. + pub async fn exit_daemon(mut self) { + // ignoring the error because it'd mean that the background task would had already been + // dropped + let _ = self.to_task.send(IpfsEvent::Exit).await; + } } /// Background task of `Ipfs` created when calling `UninitializedIpfs::start`. // The receivers are Fuse'd so that we don't have to manage state on them being exhausted. struct IpfsFuture { - swarm: TSwarm, - repo_events: Fuse>, - from_facade: Fuse>, + swarm: TSwarm, + repo_events: Fuse>, + from_facade: Fuse>, } impl Future for IpfsFuture { - type Output = (); + type Output = (); - fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll { - use futures::Stream; - use libp2p::{swarm::SwarmEvent, Swarm}; + fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll { + use futures::Stream; + use libp2p::{swarm::SwarmEvent, Swarm}; - // begin by polling the swarm so that initially it'll first have chance to bind listeners - // and such. TODO: this no longer needs to be a swarm event but perhaps we should - // consolidate logging of these events here, if necessary? - loop { - let inner = { - let next = self.swarm.next_event(); - futures::pin_mut!(next); - match next.poll(ctx) { - Poll::Ready(inner) => inner, - Poll::Pending => break, + // begin by polling the swarm so that initially it'll first have chance to bind listeners + // and such. TODO: this no longer needs to be a swarm event but perhaps we should + // consolidate logging of these events here, if necessary? + loop { + let inner = { + let next = self.swarm.next_event(); + futures::pin_mut!(next); + match next.poll(ctx) { + Poll::Ready(inner) => inner, + Poll::Pending => break, + } + }; + match inner { + SwarmEvent::Behaviour(()) => {} + SwarmEvent::Connected(_peer_id) => {} + SwarmEvent::Disconnected(_peer_id) => {} + SwarmEvent::NewListenAddr(_addr) => {} + SwarmEvent::ExpiredListenAddr(_addr) => {} + SwarmEvent::UnreachableAddr { + peer_id: _peer_id, + address: _address, + error: _error, + } => {} + SwarmEvent::StartConnect(_peer_id) => {} + } } - }; - match inner { - SwarmEvent::Behaviour(()) => {} - SwarmEvent::Connected(_peer_id) => {} - SwarmEvent::Disconnected(_peer_id) => {} - SwarmEvent::NewListenAddr(_addr) => {} - SwarmEvent::ExpiredListenAddr(_addr) => {} - SwarmEvent::UnreachableAddr { - peer_id: _peer_id, - address: _address, - error: _error, - } => {} - SwarmEvent::StartConnect(_peer_id) => {} - } + + // temporary pinning of the receivers should be safe as we are pinning through the + // already pinned self. with the receivers we can also safely ignore exhaustion + // as those are fused. 
+ loop { + let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) { + Poll::Ready(Some(evt)) => evt, + // doing teardown also after the `Ipfs` has been dropped + Poll::Ready(None) => IpfsEvent::Exit, + Poll::Pending => break, + }; + + match inner { + IpfsEvent::Connect(addr, ret) => { + ret.send(self.swarm.connect(addr)).ok(); + } + IpfsEvent::Addresses(ret) => { + let addrs = self.swarm.addrs(); + ret.send(Ok(addrs)).ok(); + } + IpfsEvent::Listeners(ret) => { + let listeners = Swarm::listeners(&self.swarm).cloned().collect(); + ret.send(Ok(listeners)).ok(); + } + IpfsEvent::Connections(ret) => { + let connections = self.swarm.connections(); + ret.send(Ok(connections)).ok(); + } + IpfsEvent::Disconnect(addr, ret) => { + if let Some(disconnector) = self.swarm.disconnect(addr) { + disconnector.disconnect(&mut self.swarm); + } + ret.send(Ok(())).ok(); + } + IpfsEvent::GetAddresses(ret) => { + // perhaps this could be moved under `IpfsEvent` or free functions? + let mut addresses = Vec::new(); + addresses.extend(Swarm::listeners(&self.swarm).cloned()); + addresses.extend(Swarm::external_addresses(&self.swarm).cloned()); + // ignore error, perhaps caller went away already + let _ = ret.send(addresses); + } + IpfsEvent::Exit => { + // FIXME: we could do a proper teardown + return Poll::Ready(()); + } + } + } + + // Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy + // wants this to be written with a `while let`. + while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) { + match evt { + RepoEvent::WantBlock(cid) => self.swarm.want_block(cid), + RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid), + RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid), + } + } + + Poll::Pending } - - // temporary pinning of the receivers should be safe as we are pinning through the - // already pinned self. with the receivers we can also safely ignore exhaustion - // as those are fused. - loop { - let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) { - Poll::Ready(Some(evt)) => evt, - // doing teardown also after the `Ipfs` has been dropped - Poll::Ready(None) => IpfsEvent::Exit, - Poll::Pending => break, - }; - - match inner { - IpfsEvent::Connect(addr, ret) => { - ret.send(self.swarm.connect(addr)).ok(); - } - IpfsEvent::Addresses(ret) => { - let addrs = self.swarm.addrs(); - ret.send(Ok(addrs)).ok(); - } - IpfsEvent::Listeners(ret) => { - let listeners = Swarm::listeners(&self.swarm).cloned().collect(); - ret.send(Ok(listeners)).ok(); - } - IpfsEvent::Connections(ret) => { - let connections = self.swarm.connections(); - ret.send(Ok(connections)).ok(); - } - IpfsEvent::Disconnect(addr, ret) => { - if let Some(disconnector) = self.swarm.disconnect(addr) { - disconnector.disconnect(&mut self.swarm); - } - ret.send(Ok(())).ok(); - } - IpfsEvent::GetAddresses(ret) => { - // perhaps this could be moved under `IpfsEvent` or free functions? - let mut addresses = Vec::new(); - addresses.extend(Swarm::listeners(&self.swarm).cloned()); - addresses.extend(Swarm::external_addresses(&self.swarm).cloned()); - // ignore error, perhaps caller went away already - let _ = ret.send(addresses); - } - IpfsEvent::Exit => { - // FIXME: we could do a proper teardown - return Poll::Ready(()); - } - } - } - - // Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy - // wants this to be written with a `while let`. 
- while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) { - match evt { - RepoEvent::WantBlock(cid) => self.swarm.want_block(cid), - RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid), - RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid), - } - } - - Poll::Pending - } } #[cfg(test)] mod tests { - use super::*; - use async_std::task; - use libipld::ipld; - use multihash::Sha2_256; + use super::*; + use async_std::task; + use libipld::ipld; + use multihash::Sha2_256; - #[async_std::test] - async fn test_put_and_get_block() { - let options = IpfsOptions::::default(); - let data = b"hello block\n".to_vec().into_boxed_slice(); - let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data)); - let block = Block::new(data, cid); - let ipfs = UninitializedIpfs::new(options).await; - let (mut ipfs, fut) = ipfs.start().await.unwrap(); - task::spawn(fut); + #[async_std::test] + async fn test_put_and_get_block() { + let options = IpfsOptions::::default(); + let data = b"hello block\n".to_vec().into_boxed_slice(); + let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data)); + let block = Block::new(data, cid); + let ipfs = UninitializedIpfs::new(options).await; + let (mut ipfs, fut) = ipfs.start().await.unwrap(); + task::spawn(fut); - let cid: Cid = ipfs.put_block(block.clone()).await.unwrap(); - let new_block = ipfs.get_block(&cid).await.unwrap(); - assert_eq!(block, new_block); + let cid: Cid = ipfs.put_block(block.clone()).await.unwrap(); + let new_block = ipfs.get_block(&cid).await.unwrap(); + assert_eq!(block, new_block); - ipfs.exit_daemon().await; - } + ipfs.exit_daemon().await; + } - #[async_std::test] - async fn test_put_and_get_dag() { - let options = IpfsOptions::::default(); + #[async_std::test] + async fn test_put_and_get_dag() { + let options = IpfsOptions::::default(); - let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap(); - task::spawn(fut); + let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap(); + task::spawn(fut); - let data = ipld!([-1, -2, -3]); - let cid = ipfs.put_dag(data.clone()).await.unwrap(); - let new_data = ipfs.get_dag(cid.into()).await.unwrap(); - assert_eq!(data, new_data); + let data = ipld!([-1, -2, -3]); + let cid = ipfs.put_dag(data.clone()).await.unwrap(); + let new_data = ipfs.get_dag(cid.into()).await.unwrap(); + assert_eq!(data, new_data); - ipfs.exit_daemon().await; - } + ipfs.exit_daemon().await; + } }
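
Note: the tests at the end of this patch already exercise the public flow (build options, start the node, spawn the background future, round-trip data, exit). As a quick reference, below is a minimal usage sketch of the same facade from outside the crate. It is an illustration only: the generic type parameters (e.g. TestTypes) and the external crate name `ipfs` are assumed here, since the extracted diff above dropped the generics.

    use async_std::task;
    use ipfs::{IpfsOptions, TestTypes, UninitializedIpfs}; // crate name assumed
    use libipld::ipld;

    async fn demo() {
        // In-memory repo with generated keys, as provided for tests in this patch.
        let options = IpfsOptions::<TestTypes>::inmemory_with_generated_keys(false);

        // `start` returns the cloneable `Ipfs` facade plus the background future,
        // which should be spawned on an executor right away.
        let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
        task::spawn(fut);

        // Round-trip an ipld dag node through the repo.
        let data = ipld!([-1, -2, -3]);
        let cid = ipfs.put_dag(data.clone()).await.unwrap();
        assert_eq!(ipfs.get_dag(cid.into()).await.unwrap(), data);

        // Ask the background task to shut down.
        ipfs.exit_daemon().await;
    }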