[pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module

Wolfgang Bumiller w.bumiller at proxmox.com
Tue Oct 13 17:04:26 CEST 2020


Okay we really need to consider adding async generator support or
finding some better solution for this.

An idea would be to have a buffer with two accessors: one implementing
AsyncWrite in such a way that it returns `Pending` to "yield" when the
buffer is full, and one which allows taking the current data out of it
(both may have to be usable with mutable borrows simultaneously, hence
the two accessors). That might help get rid of the "return size 0 if
the buffer is too full" strategy: basically, write the whole code as if
there was no buffer to fill, as an `async fn`, and have the Stream's
`poll_next` just poll that future and take any available data out of
the buffer, if you get the idea. I just find the current state machine
quite hard to follow, likely also because all of the actual data is
written manually with a bunch of magic numbers...
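
Very rough, untested sketch of what I mean (all names are made up;
Rc/RefCell only to keep the sketch short, a Send version would need
Arc/Mutex or manual splitting):

// Untested sketch: a shared buffer with two accessors. The writer half is
// handed to the zip-encoding `async fn`, the drain half stays with the
// Stream impl.
use std::cell::RefCell;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

use tokio::io::AsyncWrite;

pub fn yield_buffer(capacity: usize) -> (YieldWriter, YieldDrain) {
    let inner = Rc::new(RefCell::new(Vec::new()));
    (
        YieldWriter { inner: Rc::clone(&inner), capacity },
        YieldDrain { inner },
    )
}

/// Accessor 1: AsyncWrite that returns `Pending` once the buffer is full.
pub struct YieldWriter {
    inner: Rc<RefCell<Vec<u8>>>,
    capacity: usize,
}

/// Accessor 2: lets the Stream's `poll_next` take out whatever is buffered.
pub struct YieldDrain {
    inner: Rc<RefCell<Vec<u8>>>,
}

impl YieldDrain {
    pub fn take_data(&self) -> Vec<u8> {
        std::mem::take(&mut *self.inner.borrow_mut())
    }
}

impl AsyncWrite for YieldWriter {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context,
        data: &[u8],
    ) -> Poll<io::Result<usize>> {
        let mut buf = self.inner.borrow_mut();
        let free = self.capacity.saturating_sub(buf.len());
        if free == 0 {
            // "yield": the owning Stream drains the buffer and polls the
            // encoder future again on its next poll_next, so not registering
            // a waker here is fine in that particular setup.
            return Poll::Pending;
        }
        let n = free.min(data.len());
        buf.extend_from_slice(&data[..n]);
        Poll::Ready(Ok(n))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}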

On Tue, Oct 13, 2020 at 11:50:41AM +0200, Dominik Csapak wrote:
> this is a module to stream a zip file in an api call
> the user has to give the Encoder a stream of files (Path, mtime, mode, Reader)
> the resulting object itself implements stream for easy use in a
> hyper::Body
> 
> for now, this does not implement compression (uses ZIP's STORE mode), and
> does not support empty directories or hardlinks (or any other special
> files)
> 
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> ---
>  src/tools.rs     |   1 +
>  src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 469 insertions(+)
>  create mode 100644 src/tools/zip.rs
> 
> diff --git a/src/tools.rs b/src/tools.rs
> index 5b244c02..28f0338b 100644
> --- a/src/tools.rs
> +++ b/src/tools.rs
> @@ -35,6 +35,7 @@ pub mod nom;
>  pub mod logrotate;
>  pub mod loopdev;
>  pub mod fuse_loop;
> +pub mod zip;
>  
>  mod parallel_handler;
>  pub use parallel_handler::*;
> diff --git a/src/tools/zip.rs b/src/tools/zip.rs
> new file mode 100644
> index 00000000..acb193cb
> --- /dev/null
> +++ b/src/tools/zip.rs
> @@ -0,0 +1,468 @@
> +use std::ffi::OsString;
> +use std::collections::VecDeque;
> +use std::os::unix::ffi::OsStrExt;
> +use std::pin::Pin;
> +use std::path::{Component, Path, PathBuf};
> +
> +use futures::task::{Context, Poll};
> +use tokio::io::AsyncRead;
> +use tokio::stream::Stream;
> +use anyhow::Result;
> +
> +use proxmox::io_format_err;
> +use proxmox::tools::{
> +    byte_buffer::ByteBuffer,
> +    time::gmtime,
> +};
> +use crc32fast::Hasher;
> +
> +const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
> +const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
> +const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
> +const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
> +const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
> +const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
> +
> +const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
> +const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
> +const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
> +
> +fn epoch_to_dos(epoch: i64) -> (u16, u16) {
> +    let gmtime = match gmtime(epoch) {
> +        Ok(gmtime) => gmtime,
> +        Err(_) => return (0,0),
> +    };
> +
> +    let seconds = gmtime.tm_sec/2 & 0b11111;
> +    let minutes = gmtime.tm_min & 0b111111;
> +    let hours = gmtime.tm_hour & 0b11111;
> +    let time: u16 = ((hours<<11)|(minutes<<5)|(seconds)) as u16;
> +
> +    let date: u16 = if gmtime.tm_year > (2108-1900) || gmtime.tm_year < (1980-1900) {
> +        0
> +    } else {
> +        let day = gmtime.tm_mday & 0b11111;
> +        let month = (gmtime.tm_mon + 1) & 0b1111;
> +        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
> +        ((year<<9)|(month<<5)|(day)) as u16
> +    };
> +
> +    (date, time)
> +}
> +
> +struct Zip64Field {
> +    size: u64,
> +    compressed_size: u64,
> +    offset: u64,
> +}
> +
> +impl Zip64Field {
> +    fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
> +        let size: u16 = if include_offset { 24 } else { 16 };
> +        buf[0..2].copy_from_slice(ZIP64_TYPE);
> +        buf[2..4].copy_from_slice(&size.to_le_bytes());
> +        buf[4..12].copy_from_slice(&self.compressed_size.to_le_bytes());
> +        buf[12..20].copy_from_slice(&self.size.to_le_bytes());
> +        if include_offset {
> +            buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
> +        }
> +    }
> +}
> +
> +struct FileEntry {
> +    filename: OsString,
> +    mtime: i64,
> +    mode: u16,
> +    zip64: Zip64Field,
> +    crc32: [u8; 4],
> +}
> +
> +impl FileEntry {
> +    fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
> +        let mut relpath = PathBuf::new();
> +
> +        for comp in path.as_ref().components() {
> +            match comp {
> +                Component::Normal(_) => {
> +                    relpath.push(comp);
> +                },
> +                _ => {},
> +            }
> +        }
> +
> +        Self {
> +            filename: relpath.into(),
> +            crc32: [0,0,0,0],
> +            mtime,
> +            mode,
> +            zip64: Zip64Field {
> +                size: 0,
> +                compressed_size: 0,
> +                offset: 0,
> +            },
> +        }
> +    }
> +
> +    fn local_file_header(&self, buf: &mut [u8]) -> usize {
> +        let filename = self.filename.as_bytes();
> +        let filename_len: u16 = filename.len() as u16;
> +        let size: usize = 30 + (filename_len as usize) + 20;
> +
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(LOCAL_FH_SIG);
> +        buf[4..6].copy_from_slice(VERSION_NEEDED);
> +        buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression
> +        let (date, time) = epoch_to_dos(self.mtime);
> +
> +        buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
> +        buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
> +        buf[14..26].copy_from_slice(&[
> +            0x00, 0x00, 0x00, 0x00, // crc32
> +            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> +            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> +        ]);
> +        buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
> +
> +        buf[28..30].copy_from_slice(&[20, 00]); // extra field len
> +
> +        buf[30..(30 + filename_len) as usize].copy_from_slice(filename);
> +
> +        self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false);
> +
> +        size
> +    }
> +
> +    fn local_file_footer(&self, buf: &mut [u8]) -> usize {
> +        let size = 24;
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(LOCAL_FF_SIG);
> +        buf[4..8].copy_from_slice(&self.crc32);
> +        buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
> +        buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
> +
> +        size
> +    }
> +
> +    fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
> +        let filename = self.filename.as_bytes();
> +        let filename_len: u16 = filename.len() as u16;
> +        let size = 46 + 28 + (filename_len as usize);
> +
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
> +        buf[4..6].copy_from_slice(VERSION_MADE_BY);
> +        buf[6..8].copy_from_slice(VERSION_NEEDED);
> +
> +        buf[8..12].copy_from_slice(&[
> +            0x08, 0x00, // general purpose flags
> +            0x00, 0x00, // compression
> +        ]);
> +
> +        let (date, time) = epoch_to_dos(self.mtime);
> +
> +        buf[12..14].copy_from_slice(&time.to_le_bytes());
> +        buf[14..16].copy_from_slice(&date.to_le_bytes());
> +        buf[16..20].copy_from_slice(&self.crc32);
> +
> +        buf[20..28].copy_from_slice(&[
> +            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> +            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> +        ]);
> +
> +        buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
> +        buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
> +
> +        buf[32..38].copy_from_slice(&[
> +            0x00, 0x00, // comment len
> +            0x00, 0x00, // start disk
> +            0x00, 0x00, // internal flags
> +        ]);
> +
> +        buf[38..40].copy_from_slice(&[
> +            0x00, 0x00, // upper part of external flags
> +        ]);
> +
> +        buf[40..42].copy_from_slice(&self.mode.to_le_bytes());
> +        buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
> +
> +        buf[46..46+filename_len as usize].copy_from_slice(&filename);
> +        self.zip64.to_bytes(&mut buf[46+filename_len as usize..size], true);
> +
> +        size
> +    }
> +}
> +
> +enum ZipStreamPos {
> +    // index of the current file
> +    FileHeader(usize),
> +    File(usize),
> +    CentralIndex,
> +    End,
> +}
> +
> +/// Shorthand for the necessary metadata for a file inside a ZIP
> +pub type File<R> = (PathBuf, i64, u16, R);

^ Please make this a struct with named members.
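
Something like (names are just a suggestion):

use std::path::PathBuf;

/// Suggested shape only; names are made up.
pub struct FileInfo<R> {
    pub path: PathBuf,
    pub mtime: i64,
    pub mode: u16,
    pub reader: R,
}

and have the stream yield that instead of the tuple.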

> +
> +/// Represents a ZIP file that can be streamed
> +///
> +/// This will create a ZIP file on the fly by getting File entries from the
> +/// given stream, and finishes it when the stream is done
> +/// Example:
> +/// ```no_run
> +/// use std::path::PathBuf;
> +/// use tokio::stream::StreamExt;
> +///
> +/// use proxmox_backup::tools::zip;
> +///
> +/// #[tokio::main]
> +/// async fn main() {
> +///     let (mut sender, receiver) = tokio::sync::mpsc::channel(64);
> +///     let mut zip = zip::ZipEncoderStream::new(receiver);
> +///
> +///     tokio::spawn(async move {
> +///         sender.send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty())).await;
> +///     });
> +///
> +///     while zip.next().await.is_some() {} // drive the stream until it is done
> +/// }
> +/// ```
> +pub struct ZipEncoderStream<R, S>
> +where
> +    R: AsyncRead + Unpin,
> +    S: Stream<Item = File<R>> + Unpin,
> +{
> +    buf: ByteBuffer,
> +    bytes: usize,
> +    central_dir_offset: usize,
> +    central_dir_size: usize,
> +    cur_reader: Option<R>,
> +    entrycount: usize,
> +    files: VecDeque<FileEntry>,
> +    filestream: Option<S>,
> +    hasher: Option<Hasher>,
> +    pos: ZipStreamPos,
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
> +    pub fn new(stream: S) -> Self {
> +        Self {
> +            buf: ByteBuffer::with_capacity(4*1024*1024),

You use this constant again further below. Does it need to appear
explicitly at the bottom? If so, please make it a named const.
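
E.g. (name is just a suggestion):

const ZIP_BUFFER_SIZE: usize = 4 * 1024 * 1024;

and then use it both here in `with_capacity()` and in the `remove_data()`
call at the bottom.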

> +            bytes: 0,
> +            central_dir_offset: 0,
> +            central_dir_size: 0,
> +            cur_reader: None,
> +            entrycount: 0,
> +            files: VecDeque::new(),
> +            filestream: Some(stream),
> +            hasher: None,
> +            pos: ZipStreamPos::FileHeader(0),
> +        }
> +    }
> +
> +    fn eocd(&mut self) -> usize {
> +        let size = if self.central_dir_size > u32::MAX as usize
> +            || self.central_dir_offset > u32::MAX as  usize
> +            || self.entrycount > u16::MAX as usize
> +        {
> +            56+20+22

magic numbers?

> +        } else {
> +            22
> +        };
> +
> +
> +        if self.buf.free_size() < size { return 0; }
> +
> +        let eocd_start = size - 22;
> +        let buf = self.buf.get_free_mut_slice();
> +
> +        let mut count = self.entrycount as u16;
> +        let mut dir_size = self.central_dir_size as u32;
> +        let mut offset = self.central_dir_offset as u32;
> +
> +        if size > 22 {

Named consts, and structs with `size_of`, would be easier to follow...
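
E.g. (untested; names are made up, the values are the ones currently
hardcoded in eocd()):

const END_OF_CENTRAL_DIR_SIZE: usize = 22;
const ZIP64_EOCD_RECORD_SIZE: usize = 56;
const ZIP64_EOCD_LOCATOR_SIZE: usize = 20;

fn eocd_size(need_zip64: bool) -> usize {
    if need_zip64 {
        ZIP64_EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE + END_OF_CENTRAL_DIR_SIZE
    } else {
        END_OF_CENTRAL_DIR_SIZE
    }
}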

> +            count = 0xFFFF;
> +            dir_size = 0xFFFFFFFF;
> +            offset = 0xFFFFFFFF;
> +
> +            buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
> +            buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
> +            buf[12..14].copy_from_slice(VERSION_MADE_BY);
> +            buf[14..16].copy_from_slice(VERSION_NEEDED);
> +            buf[16..24].copy_from_slice(&[
> +                0x00, 0x00, 0x00, 0x00, // number of disk
> +                0x00, 0x00, 0x00, 0x00, // number of disk of central directory
> +            ]);
> +            buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
> +            buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
> +            buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
> +            buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
> +
> +            let locator_offset = self.central_dir_offset + self.central_dir_size;
> +            buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
> +            buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
> +            buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
> +            buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
> +        }
> +
> +        buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR);
> +        buf[eocd_start+4..eocd_start+8].copy_from_slice(&[
> +            0x00, 0x00, // disk number
> +            0x00, 0x00, // start disk
> +        ]);
> +
> +        buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes());
> +        buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes());
> +
> +        buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // central directory size
> +        buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // central directory offset
> +        buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len
> +
> +        size
> +    }
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
> +    type Item = Result<Vec<u8>, std::io::Error>;
> +
> +    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +
> +        loop {
> +            match this.pos {

Please use separate methods for each case.
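
Roughly this shape (untested; `NextStep` and the per-state method names
are made up):

// made-up helper enum: each state method reports what the dispatcher should do
enum NextStep {
    Continue,               // state handled, re-check `pos`
    Flush,                  // buffer full / entry did not fit, emit data
    Pending,                // nothing buffered and the source is pending
    Error(std::io::Error),
}

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
    let this = self.get_mut();

    loop {
        let step = match this.pos {
            ZipStreamPos::FileHeader(idx) => this.poll_file_header(cx, idx),
            ZipStreamPos::File(idx) => this.poll_file_data(cx, idx),
            ZipStreamPos::CentralIndex => this.write_central_directory(),
            ZipStreamPos::End => return Poll::Ready(None),
        };

        match step {
            NextStep::Continue => continue,
            NextStep::Flush => break,
            NextStep::Pending => return Poll::Pending,
            NextStep::Error(err) => return Poll::Ready(Some(Err(err))),
        }
    }

    Poll::Ready(Some(Ok(this.buf.remove_data(4 * 1024 * 1024).to_vec())))
}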

> +                ZipStreamPos::FileHeader(idx) => {
> +                    if this.files.is_empty() || idx > this.files.len() - 1 {
> +                        if let Some(mut stream) = this.filestream.take() {

Also for this, with some enum return value, because...

> +                            match Pin::new(&mut stream).poll_next(cx) {
> +                                Poll::Ready(Some((path, mtime, mode, file))) => {
> +                                    this.filestream = Some(stream);
> +                                    let entry = FileEntry::new(path, mtime, mode);
> +                                    this.files.push_back(entry);
> +                                    this.cur_reader = Some(file);
> +                                    continue;
> +                                }
> +                                Poll::Pending => {
> +                                    this.filestream = Some(stream);
> +                                    if this.buf.is_empty() {
> +                                        return Poll::Pending;
> +                                    }
> +                                    break;

...because this is quite the jump.
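
I.e. pull the stream polling out into its own helper with an enum
result, e.g. (untested; names are made up):

// made-up names; lets the caller decide what to do without break-ing
// out of a deeply nested match
enum FetchedEntry<R> {
    Next(R),    // got a new file entry, `files` was updated
    Done,       // input stream finished, move on to the central directory
    NotReady,   // input stream pending, caller flushes or returns Pending
}

fn poll_next_entry(&mut self, cx: &mut Context) -> FetchedEntry<R> {
    let mut stream = match self.filestream.take() {
        Some(stream) => stream,
        None => return FetchedEntry::Done,
    };

    match Pin::new(&mut stream).poll_next(cx) {
        Poll::Ready(Some((path, mtime, mode, file))) => {
            self.filestream = Some(stream);
            self.files.push_back(FileEntry::new(path, mtime, mode));
            FetchedEntry::Next(file)
        }
        Poll::Ready(None) => FetchedEntry::Done,
        Poll::Pending => {
            self.filestream = Some(stream);
            FetchedEntry::NotReady
        }
    }
}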

> +                                }
> +                                Poll::Ready(None) => {},
> +                            }
> +                        }
> +                        this.pos = ZipStreamPos::CentralIndex;
> +                        this.central_dir_offset = this.bytes;
> +                        this.entrycount = this.files.len();
> +                        continue;
> +                    }
> +
> +                    let mut entry = &mut this.files[idx];
> +                    entry.zip64.offset = this.bytes as u64;
> +                    let size = entry.local_file_header(this.buf.get_free_mut_slice());
> +                    this.bytes += size;
> +                    this.buf.add_size(size as usize);
> +
> +                    if size == 0 {

Rather put the separating blank line before the `let size =` line, and
do the `size == 0` check before the `add`s.

Consider moving the size-adding into a separate method, have the writer
methods go through it, and have them only return a boolean?

Because...

> +                        break;
> +                    }
> +
> +                    if this.cur_reader.is_some() {
> +                        this.pos = ZipStreamPos::File(idx);
> +                    } else {
> +                        this.pos = ZipStreamPos::FileHeader(idx + 1);
> +                    }
> +
> +                    if this.buf.is_full() {
> +                        break;
> +                    }
> +                },
> +                ZipStreamPos::File(idx) => {
> +                    let mut entry = &mut this.files[idx];
> +                    let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got no file data"))?;
> +                    match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
> +                        Poll::Ready(Ok(n)) => {
> +                            let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
> +                            this.buf.add_size(n);
> +                            if n == 0 {
> +                                entry.crc32 = hasher.finalize().to_le_bytes();
> +                                let size = entry.local_file_footer(this.buf.get_free_mut_slice());
> +                                this.buf.add_size(size);
> +                                this.bytes += size;
> +
> +                                if size == 0 {
> +                                    break;
> +                                }

... same pattern here ...

> +
> +                                this.pos = ZipStreamPos::FileHeader(idx + 1);
> +
> +                                if this.buf.is_full() {
> +                                    break;
> +                                }
> +
> +                                continue;
> +                            }
> +
> +                            this.bytes += n;
> +                            entry.zip64.size += n as u64;
> +                            entry.zip64.compressed_size += n as u64;
> +
> +                            hasher.update(&this.buf[this.buf.len() - n..]);
> +                            this.hasher = Some(hasher);
> +                            this.cur_reader = Some(reader);
> +
> +                            if this.buf.is_full() {
> +                                break;
> +                            }
> +                        }
> +                        Poll::Pending => {
> +                            this.cur_reader = Some(reader);
> +                            if this.buf.is_empty() {
> +                                return Poll::Pending;
> +                            }
> +                            break;
> +                        }
> +                        Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
> +                    }
> +                },
> +                ZipStreamPos::CentralIndex => {
> +                    let mut finished_central_directory = true;
> +                    while this.files.len() > 0 {
> +                        let file = this.files.pop_front().unwrap();
> +                        let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
> +                        this.buf.add_size(size);
> +                        if size == 0 {

... and here.
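
What I mean is roughly (untested; `write_buffered` is a made-up name,
and the `entry` borrows may need a bit of shuffling):

// made-up helper: run a writer on the free part of the buffer, account for
// the written bytes, and tell the caller whether anything fit
fn write_buffered<F>(&mut self, write: F) -> bool
where
    F: FnOnce(&mut [u8]) -> usize,
{
    let size = write(self.buf.get_free_mut_slice());
    self.buf.add_size(size);
    self.bytes += size;
    size != 0
}

so the call sites become a plain `if !this.write_buffered(...) { break; }`
instead of repeating the add/check dance in three places.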

> +                            this.files.push_front(file);
> +                            finished_central_directory = false;
> +                            break;
> +                        }
> +
> +                        this.bytes += size;
> +                        this.central_dir_size += size;
> +
> +                        if this.buf.is_full() {
> +                            finished_central_directory = false;
> +                            break;
> +                        }
> +                    }
> +
> +                    if !finished_central_directory {
> +                        break;
> +                    }
> +
> +                    let size = this.eocd();
> +                    this.buf.add_size(size);
> +                    if size == 0 {
> +                        break;
> +                    }
> +
> +                    this.pos = ZipStreamPos::End;
> +                    break;
> +                }
> +                ZipStreamPos::End => return Poll::Ready(None)
> +            }
> +        }
> +
> +        return Poll::Ready(Some(Ok(this.buf.remove_data(4*1024*1024).to_vec())));
> +    }
> +}
> -- 
> 2.20.1




