fixing edits

saresend 2020-03-23 11:14:49 -07:00
parent 3ad149494d
commit 3c11bcb179
2 changed files with 387 additions and 398 deletions


@@ -37,11 +37,7 @@ impl<Types: RepoTypes> IpldDag<Types> {
            None => return Err(anyhow::anyhow!("expected cid")),
        };
        let mut ipld = decode_ipld(&cid, self.repo.get_block(&cid).await?.data())?;
        for sub_path in path.iter() {}
        todo!()
    }
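The traversal loop is still a `todo!()` in this commit. Purely as an illustration of what each iteration has to do with the decoded value in `ipld` (and not the crate's eventual implementation), a per-segment step over an `Ipld` value could look roughly like the sketch below; a real resolver would additionally need to follow `Ipld::Link` segments by fetching the linked block.

    // Hypothetical helper, not in this crate: resolve one path segment against a
    // decoded Ipld value, indexing maps by key and lists by numeric index.
    fn step(ipld: Ipld, seg: &str) -> Result<Ipld, anyhow::Error> {
        match ipld {
            Ipld::Map(mut map) => map
                .remove(seg)
                .ok_or_else(|| anyhow::anyhow!("no such key: {}", seg)),
            Ipld::List(mut list) => {
                let index: usize = seg.parse()?;
                if index < list.len() {
                    Ok(list.swap_remove(index))
                } else {
                    Err(anyhow::anyhow!("index {} out of range", index))
                }
            }
            other => Err(anyhow::anyhow!("cannot index into {:?}", other)),
        }
    }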


@@ -55,7 +55,7 @@ use self::unixfs::File;
/// `IpfsTypes`.
pub trait IpfsTypes: SwarmTypes + RepoTypes {}
impl<T: RepoTypes> SwarmTypes for T {
    type TStrategy = bitswap::AltruisticStrategy;
}
impl<T: SwarmTypes + RepoTypes> IpfsTypes for T {}
@@ -63,59 +63,58 @@ impl<T: SwarmTypes + RepoTypes> IpfsTypes for T {}
#[derive(Clone, Debug)]
pub struct Types;
impl RepoTypes for Types {
    type TBlockStore = repo::fs::FsBlockStore;
    #[cfg(feature = "rocksdb")]
    type TDataStore = repo::fs::RocksDataStore;
    #[cfg(not(feature = "rocksdb"))]
    type TDataStore = repo::mem::MemDataStore;
}
/// Testing IPFS types
#[derive(Clone, Debug)]
pub struct TestTypes;
impl RepoTypes for TestTypes {
    type TBlockStore = repo::mem::MemBlockStore;
    type TDataStore = repo::mem::MemDataStore;
}
/// Ipfs options
#[derive(Clone)]
pub struct IpfsOptions<Types: IpfsTypes> {
    _marker: PhantomData<Types>,
    /// The path of the ipfs repo.
    pub ipfs_path: PathBuf,
    /// The keypair used with libp2p.
    pub keypair: Keypair,
    /// Nodes dialed during startup.
    pub bootstrap: Vec<(Multiaddr, PeerId)>,
    /// Enables mdns for peer discovery when true.
    pub mdns: bool,
}
impl<Types: IpfsTypes> fmt::Debug for IpfsOptions<Types> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        // needed since libp2p::identity::Keypair does not have a Debug impl, and IpfsOptions is a
        // struct with all public fields, so users shouldn't be forced to go through this wrapper.
        fmt.debug_struct("IpfsOptions")
            .field("ipfs_path", &self.ipfs_path)
            .field("bootstrap", &self.bootstrap)
            .field("keypair", &DebuggableKeypair(&self.keypair))
            .field("mdns", &self.mdns)
            .finish()
    }
}
impl IpfsOptions<TestTypes> {
    /// Creates an inmemory store backed node for tests
    pub fn inmemory_with_generated_keys(mdns: bool) -> Self {
        Self::new(
            std::env::temp_dir().into(),
            Keypair::generate_ed25519(),
            vec![],
            mdns,
        )
    }
}
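A usage sketch (not part of this diff): tests can grab ready-made in-memory options through this helper, with the flag toggling mdns.

    // Illustrative only: equivalent to calling IpfsOptions::new with a temp dir,
    // a freshly generated ed25519 keypair, no bootstrap peers, and mdns enabled.
    let options = IpfsOptions::<TestTypes>::inmemory_with_generated_keys(true);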
/// Workaround for libp2p::identity::Keypair missing a Debug impl, works with references and owned
@@ -124,80 +123,80 @@ impl IpfsOptions<TestTypes> {
struct DebuggableKeypair<I: Borrow<Keypair>>(I);
impl<I: Borrow<Keypair>> fmt::Debug for DebuggableKeypair<I> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let kind = match self.get_ref() {
            Keypair::Ed25519(_) => "Ed25519",
            Keypair::Rsa(_) => "Rsa",
            Keypair::Secp256k1(_) => "Secp256k1",
        };
        write!(fmt, "Keypair::{}", kind)
    }
}
impl<I: Borrow<Keypair>> DebuggableKeypair<I> {
    fn get_ref(&self) -> &Keypair {
        self.0.borrow()
    }
}
impl<Types: IpfsTypes> IpfsOptions<Types> {
    pub fn new(
        ipfs_path: PathBuf,
        keypair: Keypair,
        bootstrap: Vec<(Multiaddr, PeerId)>,
        mdns: bool,
    ) -> Self {
        Self {
            _marker: PhantomData,
            ipfs_path,
            keypair,
            bootstrap,
            mdns,
        }
    }
}
impl<T: IpfsTypes> Default for IpfsOptions<T> {
    /// Create `IpfsOptions` from environment.
    fn default() -> Self {
        let ipfs_path = if let Ok(path) = std::env::var("IPFS_PATH") {
            PathBuf::from(path)
        } else {
            let root = if let Some(home) = dirs::home_dir() {
                home
            } else {
                std::env::current_dir().unwrap()
            };
            root.join(".rust-ipfs").into()
        };
        let config_path = dirs::config_dir()
            .unwrap()
            .join("rust-ipfs")
            .join("config.json");
        let config = ConfigFile::new(config_path).unwrap();
        let keypair = config.secio_key_pair();
        let bootstrap = config.bootstrap();
        IpfsOptions {
            _marker: PhantomData,
            ipfs_path,
            keypair,
            bootstrap,
            mdns: true,
        }
    }
}
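A small illustration (not from this diff): `Default` reads the repo location from the `IPFS_PATH` environment variable, falling back to `~/.rust-ipfs`, and loads the keypair and bootstrap peers from the config file, so a caller might do roughly the following.

    // Illustrative only: point the default options at a custom repo directory.
    std::env::set_var("IPFS_PATH", "/tmp/rust-ipfs-example");
    let options = IpfsOptions::<Types>::default();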
/// Ipfs struct creates a new IPFS node and is the main entry point
/// for interacting with IPFS.
#[derive(Clone, Debug)]
pub struct Ipfs<Types: IpfsTypes> {
    repo: Arc<Repo<Types>>,
    dag: IpldDag<Types>,
    ipns: Ipns<Types>,
    keys: DebuggableKeypair<Keypair>,
    to_task: Sender<IpfsEvent>,
}
type Channel<T> = OneshotSender<Result<T, Error>>;
@@ -206,365 +205,359 @@ type Channel<T> = OneshotSender<Result<T, Error>>;
/// task.
#[derive(Debug)]
enum IpfsEvent {
    /// Connect
    Connect(
        Multiaddr,
        OneshotSender<SubscriptionFuture<Result<(), String>>>,
    ),
    /// Addresses
    Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
    /// Local addresses
    Listeners(Channel<Vec<Multiaddr>>),
    /// Connections
    Connections(Channel<Vec<Connection>>),
    /// Disconnect
    Disconnect(Multiaddr, Channel<()>),
    /// Request background task to return the listened and external addresses
    GetAddresses(OneshotSender<Vec<Multiaddr>>),
    Exit,
}
/// Configured Ipfs instance, or a value which can only be initialized.
pub struct UninitializedIpfs<Types: IpfsTypes> {
    repo: Arc<Repo<Types>>,
    dag: IpldDag<Types>,
    ipns: Ipns<Types>,
    keys: Keypair,
    moved_on_init: Option<(Receiver<RepoEvent>, TSwarm<Types>)>,
}
impl<Types: IpfsTypes> UninitializedIpfs<Types> {
    /// Configures a new UninitializedIpfs from the given options.
    pub async fn new(options: IpfsOptions<Types>) -> Self {
        let repo_options = RepoOptions::<Types>::from(&options);
        let keys = options.keypair.clone();
        let (repo, repo_events) = create_repo(repo_options);
        let swarm_options = SwarmOptions::<Types>::from(&options);
        let swarm = create_swarm(swarm_options, repo.clone()).await;
        let dag = IpldDag::new(repo.clone());
        let ipns = Ipns::new(repo.clone());
        UninitializedIpfs {
            repo,
            dag,
            ipns,
            keys,
            moved_on_init: Some((repo_events, swarm)),
        }
    }
    /// Initialize the ipfs node. The returned `Ipfs` value is cloneable, Send and Sync, and the
    /// future should be spawned on an executor as soon as possible.
    pub async fn start(
        mut self,
    ) -> Result<(Ipfs<Types>, impl std::future::Future<Output = ()>), Error> {
        use futures::stream::StreamExt;
        let (repo_events, swarm) = self
            .moved_on_init
            .take()
            .expect("start cannot be called twice");
        self.repo.init().await?;
        self.repo.init().await?;
        let (to_task, receiver) = channel::<IpfsEvent>(1);
        let fut = IpfsFuture {
            repo_events: repo_events.fuse(),
            from_facade: receiver.fuse(),
            swarm,
        };
        let UninitializedIpfs {
            repo,
            dag,
            ipns,
            keys,
            ..
        } = self;
        Ok((
            Ipfs {
                repo,
                dag,
                ipns,
                keys: DebuggableKeypair(keys),
                to_task,
            },
            fut,
        ))
    }
}
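A startup sketch mirroring the tests at the bottom of this file; spawning on the async-std executor is how the tests do it, not something this diff mandates.

    // Illustrative only: build the node, then drive the returned background future.
    let options = IpfsOptions::<TestTypes>::inmemory_with_generated_keys(false);
    let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
    async_std::task::spawn(fut);
    // `ipfs` is cloneable and can now be used from any task, e.g. via put_dag/get_dag.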
impl<Types: IpfsTypes> Ipfs<Types> {
    /// Puts a block into the ipfs repo.
    pub async fn put_block(&mut self, block: Block) -> Result<Cid, Error> {
        Ok(self.repo.put_block(block).await?)
    }

    /// Retrieves a block from the ipfs repo.
    pub async fn get_block(&mut self, cid: &Cid) -> Result<Block, Error> {
        Ok(self.repo.get_block(cid).await?)
    }

    /// Removes a block from the ipfs repo.
    pub async fn remove_block(&mut self, cid: &Cid) -> Result<(), Error> {
        Ok(self.repo.remove_block(cid).await?)
    }

    /// Pins a given Cid
    pub async fn pin_block(&mut self, cid: IpfsPath) -> Result<(), Error> {
        Ok(self
            .dag
            .pin(self.identity().await?.0.into_peer_id(), cid)
            .await?)
    }

    /// Puts an ipld dag node into the ipfs repo.
    pub async fn put_dag(&self, ipld: Ipld) -> Result<Cid, Error> {
        Ok(self.dag.put(ipld, Codec::DagCBOR).await?)
    }

    /// Gets an ipld dag node from the ipfs repo.
    pub async fn get_dag(&self, path: IpfsPath) -> Result<Ipld, Error> {
        Ok(self.dag.get(path).await?)
    }

    /// Adds a file into the ipfs repo.
    pub async fn add(&self, path: PathBuf) -> Result<Cid, Error> {
        let dag = self.dag.clone();
        let file = File::new(path).await?;
        let path = file.put_unixfs_v1(&dag).await?;
        Ok(path)
    }

    /// Gets a file from the ipfs repo.
    pub async fn get(&self, path: IpfsPath) -> Result<File, Error> {
        Ok(File::get_unixfs_v1(&self.dag, path).await?)
    }

    /// Resolves an ipns path to an ipld path.
    pub async fn resolve_ipns(&self, path: &IpfsPath) -> Result<IpfsPath, Error> {
        Ok(self.ipns.resolve(path).await?)
    }

    /// Publishes an ipld path.
    pub async fn publish_ipns(&self, key: &PeerId, path: &IpfsPath) -> Result<IpfsPath, Error> {
        Ok(self.ipns.publish(key, path).await?)
    }

    /// Cancel an ipns path.
    pub async fn cancel_ipns(&self, key: &PeerId) -> Result<(), Error> {
        self.ipns.cancel(key).await?;
        Ok(())
    }

    /// Connects to the peer at the given multiaddress.
    pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> {
        let (tx, rx) = oneshot_channel();
        self.to_task
            .clone()
            .send(IpfsEvent::Connect(addr, tx))
            .await?;
        let subscription = rx.await?;
        subscription.await.map_err(|e| format_err!("{}", e))
    }

    /// Returns the known peers and their addresses.
    pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
        let (tx, rx) = oneshot_channel();
        self.to_task.clone().send(IpfsEvent::Addresses(tx)).await?;
        rx.await?
    }

    /// Returns the local listening addresses.
    pub async fn addrs_local(&self) -> Result<Vec<Multiaddr>, Error> {
        let (tx, rx) = oneshot_channel();
        self.to_task.clone().send(IpfsEvent::Listeners(tx)).await?;
        rx.await?
    }

    /// Returns the current connections.
    pub async fn peers(&self) -> Result<Vec<Connection>, Error> {
        let (tx, rx) = oneshot_channel();
        self.to_task
            .clone()
            .send(IpfsEvent::Connections(tx))
            .await?;
        rx.await?
    }

    /// Disconnects from the given multiaddress.
    pub async fn disconnect(&self, addr: Multiaddr) -> Result<(), Error> {
        let (tx, rx) = oneshot_channel();
        self.to_task
            .clone()
            .send(IpfsEvent::Disconnect(addr, tx))
            .await?;
        rx.await?
    }

    /// Returns the local node's public key and its listened and external addresses.
    pub async fn identity(&self) -> Result<(PublicKey, Vec<Multiaddr>), Error> {
        let (tx, rx) = oneshot_channel();
        self.to_task
            .clone()
            .send(IpfsEvent::GetAddresses(tx))
            .await?;
        let addresses = rx.await?;
        Ok((self.keys.get_ref().public(), addresses))
    }

    /// Exit daemon.
    pub async fn exit_daemon(mut self) {
        // ignoring the error because it'd mean that the background task would have already been
        // dropped
        let _ = self.to_task.send(IpfsEvent::Exit).await;
    }
}
/// Background task of `Ipfs` created when calling `UninitializedIpfs::start`.
// The receivers are Fuse'd so that we don't have to manage state on them being exhausted.
struct IpfsFuture<Types: SwarmTypes> {
    swarm: TSwarm<Types>,
    repo_events: Fuse<Receiver<RepoEvent>>,
    from_facade: Fuse<Receiver<IpfsEvent>>,
}
impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        use futures::Stream;
        use libp2p::{swarm::SwarmEvent, Swarm};
        // begin by polling the swarm so that initially it'll first have a chance to bind listeners
        // and such. TODO: this no longer needs to be a swarm event but perhaps we should
        // consolidate logging of these events here, if necessary?
        loop {
            let inner = {
                let next = self.swarm.next_event();
                futures::pin_mut!(next);
                match next.poll(ctx) {
                    Poll::Ready(inner) => inner,
                    Poll::Pending => break,
                }
            };
            match inner {
                SwarmEvent::Behaviour(()) => {}
                SwarmEvent::Connected(_peer_id) => {}
                SwarmEvent::Disconnected(_peer_id) => {}
                SwarmEvent::NewListenAddr(_addr) => {}
                SwarmEvent::ExpiredListenAddr(_addr) => {}
                SwarmEvent::UnreachableAddr {
                    peer_id: _peer_id,
                    address: _address,
                    error: _error,
                } => {}
                SwarmEvent::StartConnect(_peer_id) => {}
            }
        }
        // temporary pinning of the receivers should be safe as we are pinning through the
        // already pinned self. with the receivers we can also safely ignore exhaustion
        // as those are fused.
        loop {
            let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) {
                Poll::Ready(Some(evt)) => evt,
                // doing teardown also after the `Ipfs` has been dropped
                Poll::Ready(None) => IpfsEvent::Exit,
                Poll::Pending => break,
            };
            match inner {
                IpfsEvent::Connect(addr, ret) => {
                    ret.send(self.swarm.connect(addr)).ok();
                }
                IpfsEvent::Addresses(ret) => {
                    let addrs = self.swarm.addrs();
                    ret.send(Ok(addrs)).ok();
                }
                IpfsEvent::Listeners(ret) => {
                    let listeners = Swarm::listeners(&self.swarm).cloned().collect();
                    ret.send(Ok(listeners)).ok();
                }
                IpfsEvent::Connections(ret) => {
                    let connections = self.swarm.connections();
                    ret.send(Ok(connections)).ok();
                }
                IpfsEvent::Disconnect(addr, ret) => {
                    if let Some(disconnector) = self.swarm.disconnect(addr) {
                        disconnector.disconnect(&mut self.swarm);
                    }
                    ret.send(Ok(())).ok();
                }
                IpfsEvent::GetAddresses(ret) => {
                    // perhaps this could be moved under `IpfsEvent` or free functions?
                    let mut addresses = Vec::new();
                    addresses.extend(Swarm::listeners(&self.swarm).cloned());
                    addresses.extend(Swarm::external_addresses(&self.swarm).cloned());
                    // ignore error, perhaps caller went away already
                    let _ = ret.send(addresses);
                }
                IpfsEvent::Exit => {
                    // FIXME: we could do a proper teardown
                    return Poll::Ready(());
                }
            }
        }
        // Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy
        // wants this to be written with a `while let`.
        while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) {
            match evt {
                RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
                RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid),
                RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid),
            }
        }
        Poll::Pending
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use async_std::task;
    use libipld::ipld;
    use multihash::Sha2_256;

    #[async_std::test]
    async fn test_put_and_get_block() {
        let options = IpfsOptions::<TestTypes>::default();
        let data = b"hello block\n".to_vec().into_boxed_slice();
        let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
        let block = Block::new(data, cid);
        let ipfs = UninitializedIpfs::new(options).await;
        let (mut ipfs, fut) = ipfs.start().await.unwrap();
        task::spawn(fut);
        let cid: Cid = ipfs.put_block(block.clone()).await.unwrap();
        let new_block = ipfs.get_block(&cid).await.unwrap();
        assert_eq!(block, new_block);
        ipfs.exit_daemon().await;
    }

    #[async_std::test]
    async fn test_put_and_get_dag() {
        let options = IpfsOptions::<TestTypes>::default();
        let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
        task::spawn(fut);
        let data = ipld!([-1, -2, -3]);
        let cid = ipfs.put_dag(data.clone()).await.unwrap();
        let new_data = ipfs.get_dag(cid.into()).await.unwrap();
        assert_eq!(data, new_data);
        ipfs.exit_daemon().await;
    }
}