fix: full link block generation
This included making the compression loop on the lower level to completion, and making the algorithm modify `pending` in place instead of draining it (which moved yet-to-be-processed links). Comments and asserts were added as well.

Co-authored-by: ljedrz <ljedrz@users.noreply.github.com>
parent 27068d1e4e
commit fedf176e3d
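The core of the fix, in isolation: a toy sketch (plain `u8` depths standing in for `Link` values, a hypothetical branching factor of 2) contrasting the old drain-and-push compaction with the new overwrite-and-truncate one. Draining shifts the yet-to-be-processed links left and appends the new higher-level link out of depth order; overwriting leaves everything where the next window expects it.

```rust
fn main() {
    // old approach: drain the processed window, push the new depth-1 link;
    // the remaining depth-0 links shift left and the new link lands after them
    let mut pending: Vec<u8> = vec![1, 0, 0, 0, 0];
    pending.drain(1..3);
    pending.push(1);
    assert_eq!(pending, [1, 0, 0, 1]); // depth order broken

    // new approach: overwrite the first slot of each processed window, then
    // truncate once the whole level is done; nothing else moves in between
    let mut pending: Vec<u8> = vec![1, 0, 0, 0, 0];
    pending[1] = 1; // window 1..3 compressed into one depth-1 link
    pending[2] = 1; // window 3..5 compressed into the next
    pending.truncate(3);
    assert_eq!(pending, [1, 1, 1]);
}
```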
@@ -60,7 +60,7 @@ impl fmt::Display for LinkFormatter<'_> {
             if current == next_depth {
                 count += 1;
             } else {
-                write!(fmt, "{}/", count)?;
+                write!(fmt, "{}: {}/", current, count)?;
 
                 let steps_between = if current > next_depth {
                     current - next_depth
@@ -68,7 +68,7 @@ impl fmt::Display for LinkFormatter<'_> {
                     next_depth - current
                 };
 
-                for _ in 0..steps_between {
+                for _ in 0..steps_between - 1 {
                     write!(fmt, "0/")?;
                 }
                 count = 1;
@@ -76,7 +76,7 @@ impl fmt::Display for LinkFormatter<'_> {
             }
         }
 
-        write!(fmt, "{}]", count)
+        write!(fmt, "{}: {}]", current, count)
     }
 }
 
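For context, the two `LinkFormatter` hunks above change the debug rendering from bare run lengths to `depth: count` pairs, and stop emitting a spurious `0/` between adjacent depths (the padding loop now runs `steps_between - 1` times). A self-contained re-implementation sketch of the patched logic, with bare `u32` depths standing in for `Link`s and the surrounding `[` assumed to be written here:

```rust
use std::fmt;

struct DepthFormatter<'a>(&'a [u32]);

impl fmt::Display for DepthFormatter<'_> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "[")?;

        let mut depths = self.0.iter().copied();
        let mut current = match depths.next() {
            Some(depth) => depth,
            None => return write!(fmt, "]"),
        };
        let mut count = 1;

        for next_depth in depths {
            if current == next_depth {
                count += 1;
            } else {
                // close the current run as `depth: count`, then pad skipped
                // depths with `0/` -- the `steps_between - 1` loop in the hunk
                write!(fmt, "{}: {}/", current, count)?;
                let steps_between = if current > next_depth {
                    current - next_depth
                } else {
                    next_depth - current
                };
                for _ in 0..steps_between - 1 {
                    write!(fmt, "0/")?;
                }
                count = 1;
                current = next_depth;
            }
        }

        write!(fmt, "{}: {}]", current, count)
    }
}

fn main() {
    // two depth-1 link blocks followed by three not yet compressed depth-0 links
    assert_eq!(DepthFormatter(&[1, 1, 0, 0, 0]).to_string(), "[1: 2/0: 3]");
}
```

Under the old code the same slice would have rendered as roughly `[2/0/3]`: no depth labels, plus a zero run between the adjacent depths that does not actually exist.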
@@ -432,6 +432,9 @@ impl BalancedCollector {
         }
     }
 
+    /// In-place compression of the `pending` links to a balanced hierarchy. When `finishing`, the
+    /// links will be compressed iteratively from the lowest level to produce a single root link
+    /// block.
     fn flush_links(&mut self, pending: &mut Vec<Link>, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
         /*
 
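To make the new doc comment concrete: a back-of-the-envelope model (not the real implementation) of how many link blocks each level of the balanced hierarchy holds, assuming the go-ipfs-compatible branching factor of 174 that the adder defaults to:

```rust
// each pass replaces up to `b` links with one link on the next level,
// until a single root link block remains
fn levels(mut n: u64, b: u64) -> Vec<u64> {
    let mut counts = vec![n];
    while n > 1 {
        n = (n + b - 1) / b; // ceil(n / b) link blocks on the next level
        counts.push(n);
    }
    counts
}

fn main() {
    // a full link block: exactly `b` leaf links still collapse to one root
    assert_eq!(levels(174, 174), vec![174, 1]);
    // 30_000 leaf blocks need two levels of link blocks above the leaves
    assert_eq!(levels(30_000, 174), vec![30_000, 173, 1]);
}
```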
@@ -495,9 +498,9 @@ impl BalancedCollector {
             reused_blocksizes.reserve(need);
         }
 
-        for level in 0.. {
+        'outer: for level in 0.. {
             if pending.len() == 1 && finishing
-                || pending.len() < self.branching_factor && !finishing
+                || pending.len() <= self.branching_factor && !finishing
             {
                 // when there is just a single linking block left and we are finishing, we are
                 // done. It might not be part of the `ret` as will be the case with single chunk
@@ -507,19 +510,30 @@ impl BalancedCollector {
                 break;
             }
 
-            // this could be optimized... perhaps by maintaining an index structure?
-            let first_at = pending
+            // when finishing, we iterate the level to completion in blocks of
+            // self.branching_factor and *insert* values at the offset of the first compressed
+            // link. on following iterations this will be the index after the higher level index.
+            let mut starting_point = 0;
+
+            // when creating the link blocks, start overwriting the pending links at the first
+            // found link for this depth. this index will be incremented for successive link
+            // blocks.
+            let mut last_overwrite = None;
+
+            while let Some(mut first_at) = &pending[starting_point..]
                 .iter()
-                .position(|Link { depth, .. }| depth == &level);
-
-            if let Some(first_at) = first_at {
-                let to_compress = pending[first_at..].len();
-
-                if to_compress < self.branching_factor && !finishing {
-                    // when not finishing, recheck if we have enough work to do (as we may have had
-                    // earlier higher level links which were skipped), but we are still waiting for a
-                    // full link block
-                    break;
+                .position(|Link { depth, .. }| depth == &level)
+            {
+                // fix first_at to be an absolute index instead of one relative to the
+                // starting_point
+                first_at += starting_point;
+
+                if !finishing && pending[first_at..].len() <= self.branching_factor {
+                    if let Some(last_overwrite) = last_overwrite {
+                        // drain any processed
+                        pending.drain((last_overwrite + 1)..first_at);
+                    }
+                    break 'outer;
                 }
 
                 reused_links.clear();
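The `position` search above runs on the `pending[starting_point..]` subslice, so the returned index is relative; a tiny standalone demo of why the `first_at += starting_point` fixup is needed:

```rust
fn main() {
    // depths of the pending links: one depth-2, two depth-1, three depth-0
    let depths = [2u32, 1, 1, 0, 0, 0];
    let starting_point = 3;

    // searching only the tail gives an index relative to `starting_point`
    let relative = depths[starting_point..].iter().position(|d| *d == 0);
    assert_eq!(relative, Some(0));

    // the fixup turns it into the absolute index the overwrites need
    let absolute = relative.map(|i| i + starting_point);
    assert_eq!(absolute, Some(3));
}
```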
@@ -528,25 +542,29 @@ impl BalancedCollector {
                 let mut nested_size = 0;
                 let mut nested_total_size = 0;
 
-                for Link {
-                    depth,
-                    target: cid,
-                    total_size,
-                    file_size,
-                } in pending.drain(first_at..)
-                {
-                    assert_eq!(depth, level);
+                let last = (first_at + self.branching_factor).min(pending.len());
 
-                    reused_links.push(PBLink {
-                        Hash: Some(cid.to_bytes().into()),
-                        Name: Some("".into()),
-                        Tsize: Some(total_size),
-                    });
-                    reused_blocksizes.push(file_size);
-                    nested_total_size += total_size;
-                    nested_size += file_size;
+                for (index, link) in pending[first_at..last].iter().enumerate() {
+                    assert_eq!(
+                        link.depth,
+                        level,
+                        "unexpected link depth {} when searching at level {} index {}",
+                        link.depth,
+                        level,
+                        index + first_at
+                    );
+
+                    Self::partition_link(
+                        link,
+                        &mut reused_links,
+                        &mut reused_blocksizes,
+                        &mut nested_size,
+                        &mut nested_total_size,
+                    );
                 }
 
+                debug_assert_eq!(reused_links.len(), reused_blocksizes.len());
+
                 let inner = FlatUnixFs {
                     links: reused_links,
                     data: UnixFs {
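The new `let last = (first_at + self.branching_factor).min(pending.len())` bound is what caps each link block at `branching_factor` links. Iterated over one level it produces windows like these (a sketch with made-up sizes, ignoring the early exit taken when not finishing):

```rust
fn main() {
    let branching_factor = 174;
    let pending_len = 400; // hypothetical number of links at this level
    let mut windows = Vec::new();
    let mut first_at = 0;
    while first_at < pending_len {
        // at most `branching_factor` links per block; the last block is shorter
        let last = (first_at + branching_factor).min(pending_len);
        windows.push((first_at, last));
        first_at = last;
    }
    // two full link blocks and one partial block of 52 links
    assert_eq!(windows, vec![(0, 174), (174, 348), (348, 400)]);
}
```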
@@ -559,21 +577,38 @@ impl BalancedCollector {
 
                 let (cid, vec) = render_and_hash(&inner);
 
-                pending.push(Link {
+                // start overwriting at the first index of this level, then continue forward on
+                // next iterations.
+                let index = last_overwrite.map(|i| i + 1).unwrap_or(first_at);
+                pending[index] = Link {
                     depth: level + 1,
                     target: cid.clone(),
                     total_size: nested_total_size + vec.len() as u64,
                     file_size: nested_size,
-                });
+                };
 
                 ret.push((cid, vec));
 
                 reused_links = inner.links;
                 reused_links.clear();
 
                 reused_blocksizes = inner.data.blocksizes;
                 reused_blocksizes.clear();
+
+                starting_point = last;
+                last_overwrite = Some(index);
+            }
+
+            if let Some(last_overwrite) = last_overwrite {
+                pending.truncate(last_overwrite + 1);
+            }
+
+            // this holds regardless of finishing; we would have broken out of 'outer had there
+            // been less than full blocks left.
+            debug_assert_eq!(
+                pending.iter().position(|l| l.depth == level),
+                None,
+                "should have no more of depth {}: {}",
+                level,
+                LinkFormatter(pending.as_slice())
+            );
         }
 
         self.reused_links = reused_links;
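How the overwrite cursor and the final `truncate` cooperate, reduced to a standalone sketch (depth-0 links as `0u8`, the produced depth-1 links as `1u8`, window starts made up to match a branching factor of 2):

```rust
fn main() {
    let mut pending = vec![0u8; 5]; // five depth-0 links awaiting compression
    let mut last_overwrite: Option<usize> = None;

    // windows (0..2), (2..4), (4..5) each produce one depth-1 link
    for first_at in [0usize, 2, 4] {
        // the first block overwrites at `first_at`, later ones at the next slot
        let index = last_overwrite.map(|i| i + 1).unwrap_or(first_at);
        pending[index] = 1;
        last_overwrite = Some(index);
    }

    // drop the slots the compressed level no longer occupies
    if let Some(last_overwrite) = last_overwrite {
        pending.truncate(last_overwrite + 1);
    }

    assert_eq!(pending, [1, 1, 1]); // ceil(5 / 2) == 3 depth-1 links remain
}
```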
@@ -581,6 +616,25 @@ impl BalancedCollector {
         ret
     }
 
+    /// Each link needs to be partitioned into the four mut arguments received by this function in
+    /// order to produce the expected UnixFs output.
+    fn partition_link(
+        link: &Link,
+        links: &mut Vec<PBLink<'static>>,
+        blocksizes: &mut Vec<u64>,
+        nested_size: &mut u64,
+        nested_total_size: &mut u64,
+    ) {
+        links.push(PBLink {
+            Hash: Some(link.target.to_bytes().into()),
+            Name: Some("".into()),
+            Tsize: Some(link.total_size),
+        });
+        blocksizes.push(link.file_size);
+        *nested_size += link.file_size;
+        *nested_total_size += link.total_size;
+    }
 }
 
 #[cfg(test)]
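A simplified stand-in for `partition_link` (tuples instead of `Link`, and the `PBLink` push omitted so the sketch stays dependency-free) showing how the accumulators move in lockstep:

```rust
// (file_size, total_size) stands in for a `Link`; the real function also
// pushes a PBLink built from the link's target cid and total_size
fn partition(
    (file_size, total_size): (u64, u64),
    blocksizes: &mut Vec<u64>,
    nested_size: &mut u64,
    nested_total_size: &mut u64,
) {
    blocksizes.push(file_size);
    *nested_size += file_size;
    *nested_total_size += total_size;
}

fn main() {
    // a full 256 KiB chunk and a 100-byte tail, with made-up block overheads
    let links = [(262_144u64, 262_158u64), (100u64, 114u64)];

    let mut blocksizes = Vec::new();
    let (mut nested_size, mut nested_total_size) = (0u64, 0u64);
    for link in links {
        partition(link, &mut blocksizes, &mut nested_size, &mut nested_total_size);
    }

    assert_eq!(blocksizes, [262_144, 100]);
    assert_eq!(nested_size, 262_244); // file bytes only
    assert_eq!(nested_total_size, 262_272); // file bytes plus encoding overhead
}
```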
@@ -778,7 +832,6 @@ mod tests {
     }
 
     #[test]
-    #[ignore = "https://github.com/rs-ipfs/rust-ipfs/issues/242"]
     fn full_link_block() {
         let buf = vec![0u8; 1];
 