[pbs-devel] [PATCH v3 proxmox-backup 38/58] upload stream: impl reused chunk injector

Christian Ebner c.ebner at proxmox.com
Fri Apr 5 12:26:11 CEST 2024


On 4/4/24 16:24, Fabian Grünbichler wrote:
> 
> but I'd like the following even better, since it allows us to get rid of
> the buffer altogether:
> 
>      fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
>          let mut this = self.project();
> 
>          let mut injections = this.injection_queue.lock().unwrap();
> 
>          // check whether we have something to inject
>          if let Some(inject) = injections.pop_front() {
>              let offset = this.stream_len.load(Ordering::SeqCst) as u64;
> 
>              if inject.boundary == offset {
>                  // inject now
>                  let mut chunks = Vec::new();
>                  let mut csum = this.index_csum.lock().unwrap();
> 
>                  // account for injected chunks
>                  for chunk in inject.chunks {
>                      let offset = this
>                          .stream_len
>                          .fetch_add(chunk.size() as usize, Ordering::SeqCst)
>                          as u64;
>                      this.reused_len
>                          .fetch_add(chunk.size() as usize, Ordering::SeqCst);
>                      let digest = chunk.digest();
>                      chunks.push((offset, digest));
>                      let end_offset = offset + chunk.size();
>                      csum.update(&end_offset.to_le_bytes());
>                      csum.update(&digest);
>                  }
>                  let chunk_info = InjectedChunksInfo::Known(chunks);
>                  return Poll::Ready(Some(Ok(chunk_info)));
>              } else if inject.boundary < offset {
>                  // incoming new chunks and injections didn't line up?
>                  return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
>              } else {
>                  // inject later
>                  injections.push_front(inject);
>              }
>          }
> 
>          // nothing to inject now, let's see if there's further input
>          match ready!(this.input.as_mut().poll_next(cx)) {
>              None => Poll::Ready(None),
>              Some(Err(err)) => Poll::Ready(Some(Err(err))),
>              Some(Ok(raw)) if raw.is_empty() => {
>                  Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))))
>              }
>              Some(Ok(raw)) => {
>                  let offset = this.stream_len.fetch_add(raw.len(), Ordering::SeqCst) as u64;
>                  let data = InjectedChunksInfo::Raw((offset, raw));
> 
>                  Poll::Ready(Some(Ok(data)))
>              }
>          }
>      }
> 
> but technically all this accounting could move back to the backup_writer
> as well, if the injected chunk info also contained the size..
> 

Yes, this is much more compact! Also, moving the accounting to the
backup writer as suggested should allow reducing the code there even
further; at least in my initial refactoring it seems to behave just fine.
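
For illustration, a rough sketch of how that accounting could look on the
consumer side, assuming the injected chunk info carries the size next to
the digest. The names `Accounting`, `account` and `csum_input` are
hypothetical, the enum is a simplified stand-in rather than the actual
proxmox-backup type, and the index checksum is replaced by a plain byte
buffer to keep the example free of dependencies:

```rust
/// Simplified stand-in for the stream items (not the real proxmox-backup type).
pub enum InjectedChunksInfo {
    /// Reused chunks as (size, digest) pairs; carrying the size is the assumption.
    Known(Vec<(u64, [u8; 32])>),
    /// Raw data for a new chunk.
    Raw(Vec<u8>),
}

/// Hypothetical accounting state kept by the consumer instead of the stream.
#[derive(Default)]
pub struct Accounting {
    pub stream_len: u64,
    pub reused_len: u64,
    /// Bytes that would be fed to the index checksum (stand-in for a hasher).
    pub csum_input: Vec<u8>,
}

impl Accounting {
    /// Accounts for one stream item and returns the start offset of each chunk.
    pub fn account(&mut self, item: &InjectedChunksInfo) -> Vec<u64> {
        match item {
            InjectedChunksInfo::Known(chunks) => chunks
                .iter()
                .map(|(size, digest)| {
                    let offset = self.stream_len;
                    self.stream_len += size;
                    self.reused_len += size;
                    // same order as in the stream: end offset first, then digest
                    self.csum_input
                        .extend_from_slice(&self.stream_len.to_le_bytes());
                    self.csum_input.extend_from_slice(digest);
                    offset
                })
                .collect(),
            InjectedChunksInfo::Raw(raw) => {
                // checksum update for new chunks is left out of this sketch
                let offset = self.stream_len;
                self.stream_len += raw.len() as u64;
                vec![offset]
            }
        }
    }
}
```

With this, the stream would only have to decide when to inject, while
offsets, the reused length and the checksum input are derived where the
items are consumed.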
