2020-07-13 12:44:18 +02:00
use ipfs ::repo ::CidUpgradedRef ;
2020-07-09 15:11:36 +02:00
use async_std ::{
future ::{ pending , timeout } ,
task ,
} ;
use futures ::future ::select ;
use futures ::future ::FutureExt ;
use libipld ::Cid ;
use std ::{ convert ::TryFrom , future ::Future , time ::Duration } ;
/// Polls a condition up to `n_times`, pausing `sleep_between` between consecutive attempts.
///
/// Every attempt calls `future()` to obtain a fresh future, drives it to completion with
/// `futures::executor::block_on`, and hands its output to `check`.
///
/// Returns `Ok(n)` with the zero-based index of the first attempt for which `check` returned
/// `true`. If no attempt succeeds (including when `n_times` is zero), returns `Err(elapsed)`
/// with the time measured since this function was entered.
///
/// Note that this blocks the calling thread, both while the future is being driven and while
/// sleeping between attempts.
fn bounded_retry<Fun, Fut, F, T>(
    n_times: usize,
    sleep_between: Duration,
    mut future: Fun,
    check: F,
) -> Result<usize, Duration>
where
    Fun: FnMut() -> Fut,
    Fut: Future<Output = T>,
    F: Fn(T) -> bool,
{
    let started = std::time::Instant::now();

    for n in 0..n_times {
        if check(futures::executor::block_on(future())) {
            return Ok(n);
        }

        // Pause only *between* attempts: sleeping after the last one would merely delay
        // reporting the failure to the caller.
        if n + 1 < n_times {
            std::thread::sleep(sleep_between);
        }
    }

    Err(started.elapsed())
}
/// Check if canceling a Cid affects the wantlist.
///
/// The test starts a single in-memory node and issues two `get_block` requests for the same
/// Cid. Each request is raced against a 200 ms timeout wrapped around a never-completing
/// future, and the result of that race is kept alive in a local. The requests are then dropped
/// one at a time:
///
/// * after the first drop the Cid must still be listed in the wantlist,
/// * after the second drop the wantlist must become empty within three polls spaced 200 ms
///   apart.
#[async_std::test]
async fn wantlist_cancellation() {
    // start a single node
    let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(false);
    let (ipfs, ipfs_fut) = ipfs::UninitializedIpfs::new(opts)
        .await
        .start()
        .await
        .unwrap();
    let _fut_task = task::spawn(ipfs_fut);

    // execute a get_block request
    // The literal must be the bare base58 string; surrounding whitespace would make parsing
    // fail and the `unwrap` panic.
    let cid = Cid::try_from("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KaGa").unwrap();

    let ipfs_clone = ipfs.clone();
    let cid_clone = cid.clone();

    // start a get_request future and give it some time
    let get_request1 = ipfs_clone.get_block(&cid_clone);
    let get_timeout = timeout(Duration::from_millis(200), pending::<()>());
    // Keep the outcome of the race bound to a local: presumably it still owns the unfinished
    // request, so the request stays alive until the explicit `drop` below.
    let get_request1 = select(get_timeout.boxed(), get_request1.boxed()).await;

    // verify that the requested Cid is in the wantlist
    let wantlist = ipfs.bitswap_wantlist(None).await;
    assert!(wantlist
        .iter()
        .flat_map(|list| list.iter())
        .any(|(c, _)| *c == cid));

    // fire up an additional get request
    let get_request2 = ipfs_clone.get_block(&cid_clone);
    let get_timeout = timeout(Duration::from_millis(200), pending::<()>());
    let get_request2 = select(get_timeout.boxed(), get_request2.boxed()).await;

    // cancel the first requested Cid
    drop(get_request1);

    // verify that the requested Cid is STILL in the wantlist
    let wantlist = ipfs.bitswap_wantlist(None).await;
    assert!(wantlist
        .iter()
        .flat_map(|list| list.iter())
        .any(|(c, _)| *c == cid));

    // cancel the second requested Cid
    drop(get_request2);

    // verify that the requested Cid is no longer in the wantlist
    let wantlist_cleared = bounded_retry(
        3,
        Duration::from_millis(200),
        || ipfs.bitswap_wantlist(None),
        |ret| ret.unwrap().is_empty(),
    );

    assert!(
        wantlist_cleared.is_ok(),
        " a block was not removed from the wantlist after all its subscriptions had died "
    );
}