more updating of types

This commit is contained in:
Addy Bryant 2022-03-18 11:54:06 -04:00 committed by Joonas Koivunen
parent 4e5ff4d23e
commit 918d4d8f6d
5 changed files with 85 additions and 43 deletions

View File

@ -208,7 +208,6 @@ impl<I: Borrow<Keypair>> fmt::Debug for DebuggableKeypair<I> {
let kind = match self.get_ref() {
Keypair::Ed25519(_) => "Ed25519",
Keypair::Rsa(_) => "Rsa",
Keypair::Secp256k1(_) => "Secp256k1",
};
write!(fmt, "Keypair::{}", kind)
@ -1476,12 +1475,14 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
IpfsEvent::RemoveListeningAddress(addr, ret) => {
let removed = if let Some((id, _)) = self.listening_addresses.remove(&addr)
{
self.swarm.remove_listener(id).map_err(|_: ()| {
format_err!(
if !self.swarm.remove_listener(id) {
Err(format_err!(
"Failed to remove previously added listening address: {}",
addr
)
})
))
} else {
Ok(())
}
} else {
Err(format_err!("Address was not listened to before: {}", addr))
};

View File

@ -620,7 +620,7 @@ impl<Types: IpfsTypes> Behaviour<Types> {
pub fn dht_get(&mut self, key: Key, quorum: Quorum) -> SubscriptionFuture<KadResult, String> {
self.kad_subscriptions
.create_subscription(self.kademlia.get_record(&key, quorum).into(), None)
.create_subscription(self.kademlia.get_record(key, quorum).into(), None)
}
pub fn dht_put(

View File

@ -37,7 +37,7 @@ pub struct SwarmOptions {
impl From<&IpfsOptions> for SwarmOptions {
fn from(options: &IpfsOptions) -> Self {
let keypair = options.keypair.clone();
let peer_id = keypair.public().into_peer_id();
let peer_id = keypair.public().to_peer_id();
let bootstrap = options.bootstrap.clone();
let mdns = options.mdns;
let kad_protocol = options.kad_protocol.clone();

View File

@ -12,7 +12,10 @@ use libp2p::core::{
Multiaddr, PeerId,
};
use libp2p::floodsub::{Floodsub, FloodsubConfig, FloodsubEvent, FloodsubMessage, Topic};
use libp2p::swarm::{ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::swarm::{
ConnectionHandler, DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
};
/// Currently a thin wrapper around Floodsub, perhaps supporting both Gossipsub and Floodsub later.
/// Allows single subscription to a topic with only unbounded senders. Tracks the peers subscribed
@ -235,6 +238,7 @@ impl Pubsub {
type PubsubNetworkBehaviourAction = NetworkBehaviourAction<
<<Pubsub as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::OutEvent,
<Pubsub as NetworkBehaviour>::ConnectionHandler,
<<Pubsub as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::InEvent,
>;
impl NetworkBehaviour for Pubsub {
@ -253,20 +257,34 @@ impl NetworkBehaviour for Pubsub {
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
connected_point: &ConnectedPoint,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.floodsub
.inject_connection_established(peer_id, connection_id, connected_point)
self.floodsub.inject_connection_established(
peer_id,
connection_id,
endpoint,
failed_addresses,
other_established,
)
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
connected_point: &ConnectedPoint,
endpoint: &ConnectedPoint,
handler: Self::ConnectionHandler,
remaining_established: usize,
) {
self.floodsub
.inject_connection_closed(peer_id, connection_id, connected_point)
self.floodsub.inject_connection_closed(
peer_id,
connection_id,
endpoint,
handler,
remaining_established,
)
}
fn inject_event(
@ -278,8 +296,13 @@ impl NetworkBehaviour for Pubsub {
self.floodsub.inject_event(peer_id, connection, event)
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
self.floodsub.inject_dial_failure(peer_id)
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ConnectionHandler,
error: &DialError,
) {
self.floodsub.inject_dial_failure(peer_id, handler, error)
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
@ -390,11 +413,8 @@ impl NetworkBehaviour for Pubsub {
continue;
}
NetworkBehaviourAction::DialAddress { address } => {
return Poll::Ready(NetworkBehaviourAction::DialAddress { address });
}
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition });
action @ NetworkBehaviourAction::Dial { .. } => {
return Poll::Ready(action);
}
NetworkBehaviourAction::NotifyHandler {
peer_id,

View File

@ -6,7 +6,7 @@ use libp2p::swarm::handler::DummyConnectionHandler;
use libp2p::swarm::{
self,
dial_opts::{DialOpts, PeerCondition},
ConnectionHandler, NetworkBehaviour, PollParameters, Swarm,
ConnectionHandler, DialError, NetworkBehaviour, PollParameters, Swarm,
};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use std::convert::{TryFrom, TryInto};
@ -114,9 +114,10 @@ impl SwarmApi {
self.events.push_back(NetworkBehaviourAction::Dial {
// rationale: this is sort of explicit command, perhaps the old address is no longer
// valid. Always would be even better but it's bugged at the moment.
opt: DialOpts::peer_id(addr.peer_id)
opts: DialOpts::peer_id(addr.peer_id)
.condition(PeerCondition::NotDialing)
.build(),
handler: todo!(),
});
self.pending_addresses
@ -193,7 +194,11 @@ impl NetworkBehaviour for SwarmApi {
);
}
if let ConnectedPoint::Dialer { address } = endpoint {
if let ConnectedPoint::Dialer {
address,
role_override,
} = endpoint
{
// we dialed to the `address`
match self.pending_connections.entry(*peer_id) {
Entry::Occupied(mut oe) => {
@ -225,10 +230,12 @@ impl NetworkBehaviour for SwarmApi {
&mut self,
peer_id: &PeerId,
_id: &ConnectionId,
cp: &ConnectedPoint,
endpoint: &ConnectedPoint,
handler: Self::ConnectionHandler,
remaining_established: usize,
) {
trace!("inject_connection_closed {} {:?}", peer_id, cp);
let closed_addr = connection_point_addr(cp);
trace!("inject_connection_closed {} {:?}", peer_id, endpoint);
let closed_addr = connection_point_addr(endpoint);
match self.connected_peers.entry(*peer_id) {
Entry::Occupied(mut oe) => {
@ -255,7 +262,7 @@ impl NetworkBehaviour for SwarmApi {
closed_addr
);
if let ConnectedPoint::Dialer { .. } = cp {
if let ConnectedPoint::Dialer { .. } = endpoint {
let addr = MultiaddrWithPeerId::from((closed_addr, peer_id.to_owned()));
match self.pending_connections.entry(*peer_id) {
@ -290,24 +297,35 @@ impl NetworkBehaviour for SwarmApi {
fn inject_event(&mut self, _peer_id: PeerId, _connection: ConnectionId, _event: void::Void) {}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
trace!("inject_dial_failure: {}", peer_id);
if self.pending_addresses.contains_key(peer_id) {
// it is possible that these addresses have not been tried yet; they will be asked
// for soon.
self.events
.push_back(swarm::NetworkBehaviourAction::DialPeer {
opt: DialOpts::peer_id(peer_id)
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ConnectionHandler,
error: &DialError,
) {
trace!("inject_dial_failure: {:?}", peer_id);
if let Some(peer_id) = peer_id {
if self.pending_addresses.contains_key(&peer_id) {
// it is possible that these addresses have not been tried yet; they will be asked
// for soon.
self.events.push_back(swarm::NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(peer_id)
.condition(PeerCondition::NotDialing)
.build(),
handler: todo!(),
});
}
}
// this should not be executed once, but probably will be in case unsupported addresses or something
// surprising happens.
for failed in self.pending_connections.remove(peer_id).unwrap_or_default() {
self.connect_registry
.finish_subscription(failed.into(), Err("addresses exhausted".into()));
// this should not be executed once, but probably will be in case unsupported addresses or something
// surprising happens.
for failed in self
.pending_connections
.remove(&peer_id)
.unwrap_or_default()
{
self.connect_registry
.finish_subscription(failed.into(), Err("addresses exhausted".into()));
}
}
}
@ -326,7 +344,10 @@ impl NetworkBehaviour for SwarmApi {
fn connection_point_addr(cp: &ConnectedPoint) -> MultiaddrWithoutPeerId {
match cp {
ConnectedPoint::Dialer { address } => MultiaddrWithPeerId::try_from(address.to_owned())
ConnectedPoint::Dialer {
address,
role_override,
} => MultiaddrWithPeerId::try_from(address.to_owned())
.expect("dialed address contains peerid in libp2p 0.38")
.into(),
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr