fix: panicking while putting a block

See related issue #386, the panic reports were popping up on GHA builds.
The reason why this fixes the issue is not entirely clear, even in
hindsight moving the tx over to the blocking task did seem like the
right thing to do, and it could had enabled other ways to cleanup the
synchronization as well.

However perhaps it is not more explcit that the synchronization key is
created and cleared in the async context.
This commit is contained in:
Joonas Koivunen 2020-09-22 13:23:19 +03:00
parent ffc65c8b5d
commit 64dc0afcc4

View File

@ -201,7 +201,6 @@ impl BlockStore for FsBlockStore {
let span = tracing::Span::current();
// launch a blocking task for the filesystem mutation.
// `tx` is moved into the task but `rx` stays in the async context.
let je = tokio::task::spawn_blocking(move || {
let _entered = span.enter();
// pick winning writer with filesystem and create_new; this error will be the 1st
@ -227,10 +226,6 @@ impl BlockStore for FsBlockStore {
match write_through_tempfile(target, &target_path, temp_path, &data) {
Ok(()) => {
let _ = tx
.send(Ok(()))
.expect("this cannot fail as we have at least one receiver on stack");
trace!("successfully wrote the block");
Ok::<_, std::io::Error>(Ok(data.len()))
}
@ -242,9 +237,6 @@ impl BlockStore for FsBlockStore {
target_path, removal
),
}
let _ = tx
.send(Err(()))
.expect("this cannot fail as we have at least one receiver on stack");
Ok(Err(e))
}
}
@ -257,7 +249,13 @@ impl BlockStore for FsBlockStore {
match je {
Ok(Ok(Ok(written))) => {
trace!(bytes = written, "block writing succeeded");
let _ = tx
.send(Ok(()))
.expect("this cannot fail as we have at least one receiver on stack");
drop(rx);
drop(tx);
self.written_bytes
.fetch_add(written as u64, Ordering::SeqCst);
@ -265,17 +263,25 @@ impl BlockStore for FsBlockStore {
Ok((cid, BlockPut::NewBlock))
}
Ok(Ok(Err(e))) => {
// write failed but hopefully the target was removed
// no point in trying to remove it now
// ignore if no one is listening
trace!("write failed but hopefully the target was removed");
let _ = tx
.send(Err(()))
.expect("this cannot fail as we have at least one receiver on stack");
drop(rx);
drop(tx);
Err(Error::new(e))
}
Ok(Err(_)) => {
Ok(Err(e)) => {
trace!("lost block writing race: {}", e);
// At least the following cases:
// - the block existed already
// - the block is being written to and we should await for this to complete
// - readonly or full filesystem prevents file creation
drop(tx);
let message = match rx.recv().await {
Ok(message) => {
trace!("synchronized with writer, write outcome: {:?}", message);