280: Patch a Bitswap leak r=koivunej a=ljedrz

We currently don't properly clean up after a connection to a `Bitswap` peer is closed, which leads to leaking `Ledger`s; funnily enough, it seems that just uncommenting a pre-existing line solves the issue. In addition, add a relevant test.

Co-authored-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
bors[bot] 2020-08-06 15:08:27 +00:00 committed by GitHub
commit 70bccd8873
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 7 deletions

View File

@ -86,7 +86,7 @@ impl Stats {
pub struct Bitswap {
/// Queue of events to report to the user.
events: VecDeque<NetworkBehaviourAction<Message, BitswapEvent>>,
/// List of peers to send messages to.
/// List of prospect peers to connect to.
target_peers: FnvHashSet<PeerId>,
/// Ledger
pub connected_peers: HashMap<PeerId, Ledger>,
@ -159,11 +159,9 @@ impl Bitswap {
/// Called from a Strategy.
pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
trace!("queueing block to be sent to {}: {}", peer_id, block.cid);
let ledger = self
.connected_peers
.get_mut(&peer_id)
.expect("Peer not in ledger?!");
ledger.add_block(block);
if let Some(ledger) = self.connected_peers.get_mut(&peer_id) {
ledger.add_block(block);
}
}
/// Sends the wantlist to the peer.
@ -231,7 +229,9 @@ impl NetworkBehaviour for Bitswap {
fn inject_disconnected(&mut self, peer_id: &PeerId) {
debug!("bitswap: inject_disconnected {:?}", peer_id);
//self.connected_peers.remove(peer_id);
self.connected_peers.remove(peer_id);
// the related stats are not dropped, so that they
// persist for peers regardless of disconnects
}
fn inject_event(&mut self, source: PeerId, _connection: ConnectionId, message: MessageWrapper) {

View File

@ -263,6 +263,7 @@ enum IpfsEvent {
Bootstrap(OneshotSender<Result<SubscriptionFuture<(), String>, Error>>),
AddPeer(PeerId, Multiaddr),
GetClosestPeers(PeerId, OneshotSender<SubscriptionFuture<(), String>>),
GetBitswapPeers(OneshotSender<Vec<PeerId>>),
Exit,
}
@ -983,6 +984,16 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
let future = self.swarm.get_closest_peers(self_peer);
let _ = ret.send(future);
}
IpfsEvent::GetBitswapPeers(ret) => {
let peers = self
.swarm
.bitswap()
.connected_peers
.keys()
.cloned()
.collect();
let _ = ret.send(peers);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());
@ -1114,6 +1125,17 @@ mod node {
Ok(())
}
pub async fn get_bitswap_peers(&self) -> Result<Vec<PeerId>, Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetBitswapPeers(tx))
.await?;
rx.await.map_err(|e| anyhow!(e))
}
pub async fn shutdown(self) {
self.ipfs.exit_daemon().await;
self.bg_task.await;

39
tests/bitswap_cleanup.rs Normal file
View File

@ -0,0 +1,39 @@
use async_std::task;
use ipfs::Node;
async fn wait(millis: u64) {
task::spawn(task::sleep(std::time::Duration::from_millis(millis))).await;
}
// Ensure that the Bitswap object doesn't leak.
#[async_std::test]
async fn check_bitswap_cleanups() {
// create a few nodes
let node_a = Node::new("a").await;
let node_b = Node::new("b").await;
let node_c = Node::new("c").await;
// connect node a to node b...
let (_, mut b_addrs) = node_b.identity().await.unwrap();
node_a.connect(b_addrs.pop().unwrap()).await.unwrap();
let bitswap_peers = node_a.get_bitswap_peers().await.unwrap();
assert_eq!(bitswap_peers.len(), 1);
// ...and to node c
let (_, mut c_addrs) = node_c.identity().await.unwrap();
node_a.connect(c_addrs.pop().unwrap()).await.unwrap();
let bitswap_peers = node_a.get_bitswap_peers().await.unwrap();
assert_eq!(bitswap_peers.len(), 2);
// node b says goodbye; check the number of bitswap peers
node_b.shutdown().await;
wait(200).await;
let bitswap_peers = node_a.get_bitswap_peers().await.unwrap();
assert_eq!(bitswap_peers.len(), 1);
// node c says goodbye; check the number of bitswap peers
node_c.shutdown().await;
wait(200).await;
let bitswap_peers = node_a.get_bitswap_peers().await.unwrap();
assert!(bitswap_peers.is_empty());
}