[pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module

Wolfgang Bumiller w.bumiller at proxmox.com
Tue Oct 13 13:02:51 CEST 2020


partial review only for now:

On Tue, Oct 13, 2020 at 11:50:41AM +0200, Dominik Csapak wrote:
> this is a module to stream a zip file in an api call
> the user has to give the Encoder a stream of files (Path, mtime, mode, Reader)
> the resulting object itself implements stream for easy use in a
> hyper::Body
> 
> for now, this does not implement compression (uses ZIPs STORE mode), and
> does not support empty directories or hardlinks (or any other special
> files)
> 
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> ---
>  src/tools.rs     |   1 +
>  src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 469 insertions(+)
>  create mode 100644 src/tools/zip.rs
> 
> diff --git a/src/tools.rs b/src/tools.rs
> index 5b244c02..28f0338b 100644
> --- a/src/tools.rs
> +++ b/src/tools.rs
> @@ -35,6 +35,7 @@ pub mod nom;
>  pub mod logrotate;
>  pub mod loopdev;
>  pub mod fuse_loop;
> +pub mod zip;
>  
>  mod parallel_handler;
>  pub use parallel_handler::*;
> diff --git a/src/tools/zip.rs b/src/tools/zip.rs
> new file mode 100644
> index 00000000..acb193cb
> --- /dev/null
> +++ b/src/tools/zip.rs
> @@ -0,0 +1,468 @@
> +use std::ffi::OsString;
> +use std::collections::VecDeque;
> +use std::os::unix::ffi::OsStrExt;
> +use std::pin::Pin;
> +use std::path::{Component, Path, PathBuf};
> +
> +use futures::task::{Context, Poll};
> +use tokio::io::AsyncRead;
> +use tokio::stream::Stream;
> +use anyhow::Result;
> +
> +use proxmox::io_format_err;
> +use proxmox::tools::{
> +    byte_buffer::ByteBuffer,
> +    time::gmtime,
> +};
> +use crc32fast::Hasher;
> +
> +const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
> +const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
> +const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
> +const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
> +const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
> +const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
> +
> +const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
> +const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
> +const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
> +
> +fn epoch_to_dos(epoch: i64) -> (u16, u16) {
> +    let gmtime = match gmtime(epoch) {
> +        Ok(gmtime) => gmtime,
> +        Err(_) => return (0,0),
> +    };
> +
> +    let seconds = gmtime.tm_sec/2 & 0b11111;

Instead of stripping spaces, please add parentheses for grouping (clippy
hint).
Also, for binary literals please use underscores for readability (group
them in nibbles; that way a group always represents a hex digit):

    let seconds = (gmtime.tm_sec / 2) & 0b1_1111;

same for all the ones below
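
For illustration, the remaining lines with both suggestions applied would
look roughly like this (assuming the minutes mask is meant to be the 6-bit
binary literal 0b11_1111; the patch currently has `0xb111111`, which is a
hex literal):

    let minutes = gmtime.tm_min & 0b11_1111; // assuming a 6-bit mask was intended
    let hours = gmtime.tm_hour & 0b1_1111;
    let time: u16 = ((hours << 11) | (minutes << 5) | seconds) as u16;

    let day = gmtime.tm_mday & 0b1_1111;
    let month = (gmtime.tm_mon + 1) & 0b1111;
    let year = (gmtime.tm_year + 1900 - 1980) & 0b111_1111;
    ((year << 9) | (month << 5) | day) as u16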

> +    let minutes = gmtime.tm_min & 0xb111111;
> +    let hours = gmtime.tm_hour & 0b11111;
> +    let time: u16 = ((hours<<11)|(minutes<<5)|(seconds)) as u16;
> +
> +    let date: u16 = if gmtime.tm_year > (2108-1900) || gmtime.tm_year < (1980-1900) {
> +        0
> +    } else {
> +        let day = gmtime.tm_mday & 0b11111;
> +        let month = (gmtime.tm_mon + 1) & 0b1111;
> +        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
> +        ((year<<9)|(month<<5)|(day)) as u16
> +    };
> +
> +    (date, time)
> +}
> +
> +struct Zip64Field {
> +    size: u64,
> +    compressed_size: u64,
> +    offset: u64,
> +}
> +
> +impl Zip64Field {
> +    fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
> +        let size :u16 = if include_offset { 24 }  else { 16 };
> +        buf[0..2].copy_from_slice(ZIP64_TYPE);
> +        buf[2..4].copy_from_slice(&size.to_le_bytes());
> +        buf[4..12].copy_from_slice(&self.compressed_size.to_le_bytes());
> +        buf[12..20].copy_from_slice(&self.size.to_le_bytes());
> +        if include_offset {
> +            buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
> +        }
> +    }
> +}
> +
> +struct FileEntry {
> +    filename: OsString,
> +    mtime: i64,
> +    mode: u16,
> +    zip64: Zip64Field,
> +    crc32: [u8; 4],
> +}
> +
> +impl FileEntry {
> +    fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
> +        let mut relpath = PathBuf::new();
> +
> +        for comp in path.as_ref().components() {
> +            match comp {
> +                Component::Normal(_) => {
> +                    relpath.push(comp);
> +                },
> +                _ => {},
> +            }

clippy hint: `if let` would be shorter here:

          if let Component::Normal(_) = comp {
              relpath.push(comp);
          }


> +        }
> +
> +        Self {
> +            filename: relpath.into(),
> +            crc32: [0,0,0,0],
> +            mtime,
> +            mode,
> +            zip64: Zip64Field {
> +                size: 0,
> +                compressed_size: 0,
> +                offset: 0,
> +            },
> +        }
> +    }
> +
> +    fn local_file_header(&self, buf: &mut [u8]) -> usize {

method name & signature don't really make the purpose of this function clear
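
From the body, it writes the local file header into `buf` and returns the
number of bytes written, or 0 if the buffer is too small. Something along
these lines would make that obvious at the call site (name and doc wording
are only a suggestion):

    /// Write the ZIP local file header (including the zip64 extra field)
    /// for this entry into `buf`.
    ///
    /// Returns the number of bytes written, or 0 if `buf` is too small.
    // (illustrative name; body unchanged)
    fn write_local_file_header(&self, buf: &mut [u8]) -> usize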

> +        let filename = self.filename.as_bytes();
> +        let filename_len: u16 = filename.len() as u16;
> +        let size: usize = 30 + (filename_len as usize) + 20;
> +
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(LOCAL_FH_SIG);
> +        buf[4..6].copy_from_slice(VERSION_NEEDED);
> +        buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression
> +        let (date, time) = epoch_to_dos(self.mtime);
> +
> +        buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
> +        buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
> +        buf[14..26].copy_from_slice(&[
> +            0x00, 0x00, 0x00, 0x00, // crc32
> +            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> +            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> +        ]);
> +        buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
> +
> +        buf[28..30].copy_from_slice(&[20, 00]); // extra field len

^ Is there a reason you chose to do it this way rather than, say, defining a

    #[derive(Endian)]
    #[repr(C)]
    struct LocalFileHeader {
        <contents up to the file name>
    }

you could do this for all structs and have a helper like:

    fn write_struct<E, T>(output: &mut T, data: E) -> io::Result<()>
    where
        T: Write + ?Sized,
        E: Endian,
    {
        let data = data.to_le();
        output.write_all(unsafe {
            std::slice::from_raw_parts(
                &data as *const E as *const u8,
                size_of_val(&data),
            )
        })
    }

and use `write_struct(&mut buffer, LocalFileHeader { ... })?`
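
To make that concrete, a rough sketch of how the local file header could
look as such a struct, with the field layout taken from the byte offsets
above (field names are illustrative; `packed` is needed so the struct
matches the 30-byte on-disk layout without padding):

    #[derive(Endian)]
    #[repr(C, packed)]
    // layout follows the writes in local_file_header(); names are illustrative
    struct LocalFileHeader {
        signature: u32,         // 0x0403_4b50, same bytes as LOCAL_FH_SIG
        version_needed: u16,
        flags: u16,
        compression: u16,
        time: u16,
        date: u16,
        crc32: u32,
        compressed_size: u32,   // 0xFFFF_FFFF, real value in the zip64 field
        uncompressed_size: u32, // 0xFFFF_FFFF, real value in the zip64 field
        filename_len: u16,
        extra_field_len: u16,
    }

followed by writing the filename bytes and the zip64 extra field as before.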

> +
> +        buf[30..(30 + filename_len) as usize].copy_from_slice(filename);
> +
> +        self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false);
> +
> +        size
> +    }
> +
> +    fn local_file_footer(&self, buf: &mut [u8]) -> usize {
> +        let size = 24;
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(LOCAL_FF_SIG);
> +        buf[4..8].copy_from_slice(&self.crc32);
> +        buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
> +        buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
> +
> +        size
> +    }
> +
> +    fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
> +        let filename = self.filename.as_bytes();
> +        let filename_len: u16 = filename.len() as u16;
> +        let size = 46 + 28 + (filename_len as usize);
> +
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
> +        buf[4..6].copy_from_slice(VERSION_MADE_BY);
> +        buf[6..8].copy_from_slice(VERSION_NEEDED);
> +
> +        buf[8..12].copy_from_slice(&[
> +            0x08, 0x00, // general purpose flags
> +            0x00, 0x00, // compression
> +        ]);
> +
> +        let (date, time) = epoch_to_dos(self.mtime);
> +
> +        buf[12..14].copy_from_slice(&time.to_le_bytes());
> +        buf[14..16].copy_from_slice(&date.to_le_bytes());
> +        buf[16..20].copy_from_slice(&self.crc32);
> +
> +        buf[20..28].copy_from_slice(&[
> +            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> +            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> +        ]);
> +
> +        buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
> +        buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
> +
> +        buf[32..38].copy_from_slice(&[
> +            0x00, 0x00, // comment len
> +            0x00, 0x00, // start disk
> +            0x00, 0x00, // internal flags
> +        ]);
> +
> +        buf[38..40].copy_from_slice(&[
> +            0x00, 0x00, // upper part of external flags
> +        ]);
> +
> +        buf[40..42].copy_from_slice(&self.mode.to_le_bytes());
> +        buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
> +
> +        buf[46..46+filename_len as usize].copy_from_slice(&filename);
> +        self.zip64.to_bytes(&mut buf[46+filename_len as usize..size], true);
> +
> +        size
> +    }
> +}
> +
> +enum ZipStreamPos {
> +    // index of file and if the header is already finished
> +    FileHeader(usize),
> +    File(usize),
> +    CentralIndex,
> +    End,
> +}
> +
> +/// Shorthand for the necessary metadata for a file inside a ZIP
> +pub type File<R> = (PathBuf, i64, u16, R);
> +
> +/// Represents a ZIP file that can be streamed
> +///
> +/// This will create a ZIP file on the fly by getting File entries from the
> +/// given stream, and finishes it when the stream is done
> +/// Example:
> +/// ```no_run
> +/// use proxmox_backup::tools::zip;
> +///
> +/// #[tokio::async]
> +/// async fn main() {
> +///     let (sender, receiver) = tokio::sync::mpsc::channel();
> +///     let zip = zip::ZipEncoderStream::new(receiver);
> +///
> +///     tokio::spawn(async move {
> +///         sender.send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty())).await;
> +///     });
> +///
> +///     zip.next().await; // until done
> +///
> +/// }
> +/// ```
> +pub struct ZipEncoderStream<R, S>
> +where
> +    R: AsyncRead + Unpin,
> +    S: Stream<Item = File<R>> + Unpin,
> +{
> +    buf: ByteBuffer,
> +    bytes: usize,
> +    central_dir_offset: usize,
> +    central_dir_size: usize,
> +    cur_reader: Option<R>,
> +    entrycount: usize,
> +    files: VecDeque<FileEntry>,
> +    filestream: Option<S>,
> +    hasher: Option<Hasher>,
> +    pos: ZipStreamPos,
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
> +    pub fn new(stream: S) -> Self {
> +        Self {
> +            buf: ByteBuffer::with_capacity(4*1024*1024),
> +            bytes: 0,
> +            central_dir_offset: 0,
> +            central_dir_size: 0,
> +            cur_reader: None,
> +            entrycount: 0,
> +            files: VecDeque::new(),
> +            filestream: Some(stream),
> +            hasher: None,
> +            pos: ZipStreamPos::FileHeader(0),
> +        }
> +    }
> +
> +    fn eocd(&mut self) -> usize {
> +        let size = if self.central_dir_size > u32::MAX as usize
> +            || self.central_dir_offset > u32::MAX as  usize
> +            || self.entrycount > u16::MAX as usize
> +        {
> +            56+20+22
> +        } else {
> +            22
> +        };
> +
> +
> +        if self.buf.free_size() < size { return 0; }
> +
> +        let eocd_start = size - 22;
> +        let buf = self.buf.get_free_mut_slice();
> +
> +        let mut count = self.entrycount as u16;
> +        let mut dir_size = self.central_dir_size as u32;
> +        let mut offset = self.central_dir_offset as u32;
> +
> +        if size > 22 {
> +            count = 0xFFFF;
> +            dir_size = 0xFFFFFFFF;
> +            offset = 0xFFFFFFFF;
> +
> +            buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
> +            buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
> +            buf[12..14].copy_from_slice(VERSION_MADE_BY);
> +            buf[14..16].copy_from_slice(VERSION_NEEDED);
> +            buf[16..24].copy_from_slice(&[
> +                0x00, 0x00, 0x00, 0x00, // number of disk
> +                0x00, 0x00, 0x00, 0x00, // number of disk of central directory
> +            ]);
> +            buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
> +            buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
> +            buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
> +            buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
> +
> +            let locator_offset = self.central_dir_offset + self.central_dir_size;
> +            buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
> +            buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
> +            buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
> +            buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
> +        }
> +
> +        buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR);
> +        buf[eocd_start+4..eocd_start+8].copy_from_slice(&[
> +            0x00, 0x00, // disk number
> +            0x00, 0x00, // start disk
> +        ]);
> +
> +        buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes());
> +        buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes());
> +
> +        buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // entry count
> +        buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // entry count
> +        buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len
> +
> +        size
> +    }
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
> +    type Item = Result<Vec<u8>, std::io::Error>;
> +
> +    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +
> +        loop {
> +            match this.pos {
> +                ZipStreamPos::FileHeader(idx) => {
> +                    if this.files.is_empty() || idx > this.files.len() - 1 {
> +                        if let Some(mut stream) = this.filestream.take() {
> +                            match Pin::new(&mut stream).poll_next(cx) {
> +                                Poll::Ready(Some((path, mtime, mode, file))) => {
> +                                    this.filestream = Some(stream);
> +                                    let entry = FileEntry::new(path, mtime, mode);
> +                                    this.files.push_back(entry);
> +                                    this.cur_reader = Some(file);
> +                                    continue;
> +                                }
> +                                Poll::Pending => {
> +                                    this.filestream = Some(stream);
> +                                    if this.buf.is_empty() {
> +                                        return Poll::Pending;
> +                                    }
> +                                    break;
> +                                }
> +                                Poll::Ready(None) => {},
> +                            }
> +                        }
> +                        this.pos = ZipStreamPos::CentralIndex;
> +                        this.central_dir_offset = this.bytes;
> +                        this.entrycount = this.files.len();
> +                        continue;
> +                    }
> +
> +                    let mut entry = &mut this.files[idx];
> +                    entry.zip64.offset = this.bytes as u64;
> +                    let size = entry.local_file_header(this.buf.get_free_mut_slice());
> +                    this.bytes += size;
> +                    this.buf.add_size(size as usize);
> +
> +                    if size == 0 {
> +                        break;
> +                    }
> +
> +                    if this.cur_reader.is_some() {
> +                        this.pos = ZipStreamPos::File(idx);
> +                    } else {
> +                        this.pos = ZipStreamPos::FileHeader(idx + 1);
> +                    }
> +
> +                    if this.buf.is_full() {
> +                        break;
> +                    }
> +                },
> +                ZipStreamPos::File(idx) => {
> +                    let mut entry = &mut this.files[idx];
> +                    let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got not file data"))?;
> +                    match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
> +                        Poll::Ready(Ok(n)) => {
> +                            let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
> +                            this.buf.add_size(n);
> +                            if n == 0 {
> +                                entry.crc32 = hasher.finalize().to_le_bytes();
> +                                let size = entry.local_file_footer(this.buf.get_free_mut_slice());
> +                                this.buf.add_size(size);
> +                                this.bytes += size;
> +
> +                                if size == 0 {
> +                                    break;
> +                                }
> +
> +                                this.pos = ZipStreamPos::FileHeader(idx + 1);
> +
> +                                if this.buf.is_full() {
> +                                    break;
> +                                }
> +
> +                                continue;
> +                            }
> +
> +                            this.bytes += n;
> +                            entry.zip64.size += n as u64;
> +                            entry.zip64.compressed_size += n as u64;
> +
> +                            hasher.update(&this.buf[this.buf.len() - n..]);
> +                            this.hasher = Some(hasher);
> +                            this.cur_reader = Some(reader);
> +
> +                            if this.buf.is_full() {
> +                                break;
> +                            }
> +                        }
> +                        Poll::Pending => {
> +                            this.cur_reader = Some(reader);
> +                            if this.buf.is_empty() {
> +                                return Poll::Pending;
> +                            }
> +                            break;
> +                        }
> +                        Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
> +                    }
> +                },
> +                ZipStreamPos::CentralIndex => {
> +                    let mut finished_central_directory = true;
> +                    while this.files.len() > 0 {
> +                        let file = this.files.pop_front().unwrap();
> +                        let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
> +                        this.buf.add_size(size);
> +                        if size == 0 {
> +                            this.files.push_front(file);
> +                            finished_central_directory = false;
> +                            break;
> +                        }
> +
> +                        this.bytes += size;
> +                        this.central_dir_size += size;
> +
> +                        if this.buf.is_full() {
> +                            finished_central_directory = false;
> +                            break;
> +                        }
> +                    }
> +
> +                    if !finished_central_directory {
> +                        break;
> +                    }
> +
> +                    let size = this.eocd();
> +                    this.buf.add_size(size);
> +                    if size == 0 {
> +                        break;
> +                    }
> +
> +                    this.pos = ZipStreamPos::End;
> +                    break;
> +                }
> +                ZipStreamPos::End => return Poll::Ready(None)
> +            }
> +        }
> +
> +        return Poll::Ready(Some(Ok(this.buf.remove_data(4*1024*1024).to_vec())));
> +    }
> +}
> -- 
> 2.20.1




