[pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module

Dominik Csapak d.csapak at proxmox.com
Tue Oct 13 11:50:41 CEST 2020


this is a module to stream a ZIP file in an API call.
the user has to give the Encoder a stream of files (Path, mtime, mode, Reader);
the resulting object itself implements Stream for easy use in a
hyper::Body

for now, this does not implement compression (it uses ZIP's STORE mode), and
does not support empty directories or hardlinks (or any other special
files)
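
roughly how a call site could look (just a sketch, not part of this patch;
it assumes hyper's 'stream' feature for Body::wrap_stream and a tokio mpsc
channel as the file source):

    let (sender, receiver) = tokio::sync::mpsc::channel(64);
    let zipstream = crate::tools::zip::ZipEncoderStream::new(receiver);
    // feed (PathBuf, mtime, mode, AsyncRead) tuples through `sender`, then
    // hand the resulting stream of Vec<u8> chunks to hyper as a response body
    let body = hyper::Body::wrap_stream(zipstream);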

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 src/tools.rs     |   1 +
 src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 469 insertions(+)
 create mode 100644 src/tools/zip.rs

diff --git a/src/tools.rs b/src/tools.rs
index 5b244c02..28f0338b 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -35,6 +35,7 @@ pub mod nom;
 pub mod logrotate;
 pub mod loopdev;
 pub mod fuse_loop;
+pub mod zip;
 
 mod parallel_handler;
 pub use parallel_handler::*;
diff --git a/src/tools/zip.rs b/src/tools/zip.rs
new file mode 100644
index 00000000..acb193cb
--- /dev/null
+++ b/src/tools/zip.rs
@@ -0,0 +1,468 @@
+use std::ffi::OsString;
+use std::collections::VecDeque;
+use std::os::unix::ffi::OsStrExt;
+use std::pin::Pin;
+use std::path::{Component, Path, PathBuf};
+
+use futures::task::{Context, Poll};
+use tokio::io::AsyncRead;
+use tokio::stream::Stream;
+use anyhow::Result;
+
+use proxmox::io_format_err;
+use proxmox::tools::{
+    byte_buffer::ByteBuffer,
+    time::gmtime,
+};
+use crc32fast::Hasher;
+
+const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
+const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
+const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
+const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
+const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
+const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
+
+const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
+const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
+const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
+
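+/// Convert a unix epoch into the 16 bit DOS (date, time) pair stored in ZIP
+/// headers: time = hours << 11 | minutes << 5 | seconds / 2 and
+/// date = (year - 1980) << 9 | month << 5 | day. Out-of-range years yield a
+/// zero date, and a failed gmtime conversion yields (0, 0).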
+fn epoch_to_dos(epoch: i64) -> (u16, u16) {
+    let gmtime = match gmtime(epoch) {
+        Ok(gmtime) => gmtime,
+        Err(_) => return (0,0),
+    };
+
+    let seconds = (gmtime.tm_sec / 2) & 0b11111;
+    let minutes = gmtime.tm_min & 0b111111;
+    let hours = gmtime.tm_hour & 0b11111;
+    let time: u16 = ((hours << 11) | (minutes << 5) | seconds) as u16;
+
+    let date: u16 = if gmtime.tm_year > (2107 - 1900) || gmtime.tm_year < (1980 - 1900) {
+        0
+    } else {
+        let day = gmtime.tm_mday & 0b11111;
+        let month = (gmtime.tm_mon + 1) & 0b1111;
+        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
+        ((year << 9) | (month << 5) | day) as u16
+    };
+
+    (date, time)
+}
+
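+/// Data for the ZIP64 "extended information" extra field (header ID 0x0001).
+/// It carries the 64 bit uncompressed and compressed sizes and, for central
+/// directory entries, the local header offset, which is used once the 32 bit
+/// header fields are set to their 0xFFFFFFFF sentinel values.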
+struct Zip64Field {
+    size: u64,
+    compressed_size: u64,
+    offset: u64,
+}
+
+impl Zip64Field {
+    fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
+        let size: u16 = if include_offset { 24 } else { 16 };
+        buf[0..2].copy_from_slice(ZIP64_TYPE);
+        buf[2..4].copy_from_slice(&size.to_le_bytes());
+        // ZIP64 extra field layout: original (uncompressed) size first, then compressed size
+        buf[4..12].copy_from_slice(&self.size.to_le_bytes());
+        buf[12..20].copy_from_slice(&self.compressed_size.to_le_bytes());
+        if include_offset {
+            buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
+        }
+    }
+}
+
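+/// Per-file bookkeeping needed to emit the local file header, the data
+/// descriptor and the matching central directory entry.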
+struct FileEntry {
+    filename: OsString,
+    mtime: i64,
+    mode: u16,
+    zip64: Zip64Field,
+    crc32: [u8; 4],
+}
+
+impl FileEntry {
+    fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
+        let mut relpath = PathBuf::new();
+
+        for comp in path.as_ref().components() {
+            if let Component::Normal(_) = comp {
+                relpath.push(comp);
+            }
+        }
+
+        Self {
+            filename: relpath.into(),
+            crc32: [0,0,0,0],
+            mtime,
+            mode,
+            zip64: Zip64Field {
+                size: 0,
+                compressed_size: 0,
+                offset: 0,
+            },
+        }
+    }
+
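+    /// Write the local file header (signature PK\x03\x04) into `buf`.
+    /// General purpose bit 3 is set (CRC and sizes follow in a data descriptor)
+    /// and the 32 bit size fields are 0xFFFFFFFF, deferring to the ZIP64 extra
+    /// field. Returns the number of bytes written, or 0 if `buf` is too small.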
+    fn local_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        let size: usize = 30 + (filename_len as usize) + 20;
+
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(LOCAL_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_NEEDED);
+        buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
+        buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
+        buf[14..26].copy_from_slice(&[
+            0x00, 0x00, 0x00, 0x00, // crc32
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+        buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
+
+        buf[28..30].copy_from_slice(&20u16.to_le_bytes()); // extra field len
+
+        buf[30..(30 + filename_len) as usize].copy_from_slice(filename);
+
+        self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false);
+
+        size
+    }
+
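+    /// Write the data descriptor (signature PK\x07\x08) that follows the file
+    /// data: the CRC-32 plus the 64 bit compressed and uncompressed sizes.
+    /// Returns the number of bytes written, or 0 if `buf` is too small.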
+    fn local_file_footer(&self, buf: &mut [u8]) -> usize {
+        let size = 24;
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(LOCAL_FF_SIG);
+        buf[4..8].copy_from_slice(&self.crc32);
+        buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
+        buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
+
+        size
+    }
+
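+    /// Write this file's central directory entry (signature PK\x01\x02) into
+    /// `buf`. Sizes and the local header offset live in the ZIP64 extra field,
+    /// and the UNIX mode goes into the upper 16 bits of the external file
+    /// attributes. Returns the number of bytes written, or 0 if `buf` is too small.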
+    fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        let size = 46 + 28 + (filename_len as usize);
+
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_MADE_BY);
+        buf[6..8].copy_from_slice(VERSION_NEEDED);
+
+        buf[8..12].copy_from_slice(&[
+            0x08, 0x00, // general purpose flags
+            0x00, 0x00, // compression
+        ]);
+
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[12..14].copy_from_slice(&time.to_le_bytes());
+        buf[14..16].copy_from_slice(&date.to_le_bytes());
+        buf[16..20].copy_from_slice(&self.crc32);
+
+        buf[20..28].copy_from_slice(&[
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+
+        buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
+        buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
+
+        buf[32..38].copy_from_slice(&[
+            0x00, 0x00, // comment len
+            0x00, 0x00, // start disk
+            0x00, 0x00, // internal flags
+        ]);
+
+        buf[38..40].copy_from_slice(&[
+            0x00, 0x00, // lower part of external attributes
+        ]);
+
+        buf[40..42].copy_from_slice(&self.mode.to_le_bytes()); // upper part: UNIX mode
+        buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
+
+        buf[46..46 + filename_len as usize].copy_from_slice(filename);
+        self.zip64.to_bytes(&mut buf[46 + filename_len as usize..size], true);
+
+        size
+    }
+}
+
+enum ZipStreamPos {
+    // writing the local file header of the file with the given index
+    FileHeader(usize),
+    // writing the data (and data descriptor) of the file with the given index
+    File(usize),
+    CentralIndex,
+    End,
+}
+
+/// Shorthand for the necessary metadata for a file inside a ZIP
+pub type File<R> = (PathBuf, i64, u16, R);
+
+/// Represents a ZIP file that can be streamed
+///
+/// This will create a ZIP file on the fly by pulling File entries from the
+/// given stream and finishes it when that stream is done.
+///
+/// Example:
+/// ```no_run
+/// use std::path::PathBuf;
+///
+/// use tokio::stream::StreamExt;
+///
+/// use proxmox_backup::tools::zip;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let (mut sender, receiver) = tokio::sync::mpsc::channel(64);
+///     let mut zip = zip::ZipEncoderStream::new(receiver);
+///
+///     tokio::spawn(async move {
+///         let _ = sender
+///             .send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty()))
+///             .await;
+///     });
+///
+///     while let Some(_chunk) = zip.next().await {} // consume until done
+/// }
+/// ```
+pub struct ZipEncoderStream<R, S>
+where
+    R: AsyncRead + Unpin,
+    S: Stream<Item = File<R>> + Unpin,
+{
+    buf: ByteBuffer,
+    bytes: usize,
+    central_dir_offset: usize,
+    central_dir_size: usize,
+    cur_reader: Option<R>,
+    entrycount: usize,
+    files: VecDeque<FileEntry>,
+    filestream: Option<S>,
+    hasher: Option<Hasher>,
+    pos: ZipStreamPos,
+}
+
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
+    pub fn new(stream: S) -> Self {
+        Self {
+            buf: ByteBuffer::with_capacity(4*1024*1024),
+            bytes: 0,
+            central_dir_offset: 0,
+            central_dir_size: 0,
+            cur_reader: None,
+            entrycount: 0,
+            files: VecDeque::new(),
+            filestream: Some(stream),
+            hasher: None,
+            pos: ZipStreamPos::FileHeader(0),
+        }
+    }
+
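+    /// Write the end of central directory record into the buffer, preceded by
+    /// a ZIP64 EOCD record and locator whenever the entry count, central
+    /// directory size or offset exceed the classic 16/32 bit limits.
+    /// Returns the number of bytes written, or 0 if they do not fit.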
+    fn eocd(&mut self) -> usize {
+        let size = if self.central_dir_size > u32::MAX as usize
+            || self.central_dir_offset > u32::MAX as usize
+            || self.entrycount > u16::MAX as usize
+        {
+            56 + 20 + 22 // ZIP64 EOCD record + ZIP64 EOCD locator + classic EOCD
+        } else {
+            22
+        };
+
+        if self.buf.free_size() < size { return 0; }
+
+        let eocd_start = size - 22;
+        let buf = self.buf.get_free_mut_slice();
+
+        let mut count = self.entrycount as u16;
+        let mut dir_size = self.central_dir_size as u32;
+        let mut offset = self.central_dir_offset as u32;
+
+        if size > 22 {
+            count = 0xFFFF;
+            dir_size = 0xFFFFFFFF;
+            offset = 0xFFFFFFFF;
+
+            buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
+            buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
+            buf[12..14].copy_from_slice(VERSION_MADE_BY);
+            buf[14..16].copy_from_slice(VERSION_NEEDED);
+            buf[16..24].copy_from_slice(&[
+                0x00, 0x00, 0x00, 0x00, // number of disk
+                0x00, 0x00, 0x00, 0x00, // number of disk of central directory
+            ]);
+            buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
+            buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
+            buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
+            buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
+
+            let locator_offset = self.central_dir_offset + self.central_dir_size;
+            buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
+            buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
+            buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
+            buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
+        }
+
+        buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR);
+        buf[eocd_start+4..eocd_start+8].copy_from_slice(&[
+            0x00, 0x00, // disk number
+            0x00, 0x00, // start disk
+        ]);
+
+        buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes());
+        buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes());
+
+        buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // central directory size
+        buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // central directory offset
+        buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len
+
+        size
+    }
+}
+
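+// Streaming works as a small state machine: for each incoming file we first
+// emit the local file header (FileHeader), then copy the reader's data while
+// updating CRC and sizes and finish with a data descriptor (File). Once the
+// input stream is exhausted, the central directory and the end records are
+// written (CentralIndex, End). Whenever the buffer fills up, or an item does
+// not fit anymore, the loop breaks and the buffered bytes are yielded.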
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
+    type Item = Result<Vec<u8>, std::io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.pos {
+                ZipStreamPos::FileHeader(idx) => {
+                    if this.files.is_empty() || idx > this.files.len() - 1 {
+                        if let Some(mut stream) = this.filestream.take() {
+                            match Pin::new(&mut stream).poll_next(cx) {
+                                Poll::Ready(Some((path, mtime, mode, file))) => {
+                                    this.filestream = Some(stream);
+                                    let entry = FileEntry::new(path, mtime, mode);
+                                    this.files.push_back(entry);
+                                    this.cur_reader = Some(file);
+                                    continue;
+                                }
+                                Poll::Pending => {
+                                    this.filestream = Some(stream);
+                                    if this.buf.is_empty() {
+                                        return Poll::Pending;
+                                    }
+                                    break;
+                                }
+                                Poll::Ready(None) => {},
+                            }
+                        }
+                        this.pos = ZipStreamPos::CentralIndex;
+                        this.central_dir_offset = this.bytes;
+                        this.entrycount = this.files.len();
+                        continue;
+                    }
+
+                    let entry = &mut this.files[idx];
+                    entry.zip64.offset = this.bytes as u64;
+                    let size = entry.local_file_header(this.buf.get_free_mut_slice());
+                    this.bytes += size;
+                    this.buf.add_size(size);
+
+                    if size == 0 {
+                        break;
+                    }
+
+                    if this.cur_reader.is_some() {
+                        this.pos = ZipStreamPos::File(idx);
+                    } else {
+                        this.pos = ZipStreamPos::FileHeader(idx + 1);
+                    }
+
+                    if this.buf.is_full() {
+                        break;
+                    }
+                },
+                ZipStreamPos::File(idx) => {
+                    let entry = &mut this.files[idx];
+                    let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got no file data"))?;
+                    match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
+                        Poll::Ready(Ok(n)) => {
+                            let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
+                            this.buf.add_size(n);
+                            if n == 0 {
+                                entry.crc32 = hasher.finalize().to_le_bytes();
+                                let size = entry.local_file_footer(this.buf.get_free_mut_slice());
+                                this.buf.add_size(size);
+                                this.bytes += size;
+
+                                if size == 0 {
+                                    break;
+                                }
+
+                                this.pos = ZipStreamPos::FileHeader(idx + 1);
+
+                                if this.buf.is_full() {
+                                    break;
+                                }
+
+                                continue;
+                            }
+
+                            this.bytes += n;
+                            entry.zip64.size += n as u64;
+                            entry.zip64.compressed_size += n as u64;
+
+                            hasher.update(&this.buf[this.buf.len() - n..]);
+                            this.hasher = Some(hasher);
+                            this.cur_reader = Some(reader);
+
+                            if this.buf.is_full() {
+                                break;
+                            }
+                        }
+                        Poll::Pending => {
+                            this.cur_reader = Some(reader);
+                            if this.buf.is_empty() {
+                                return Poll::Pending;
+                            }
+                            break;
+                        }
+                        Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
+                    }
+                },
+                ZipStreamPos::CentralIndex => {
+                    let mut finished_central_directory = true;
+                    while let Some(file) = this.files.pop_front() {
+                        let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
+                        this.buf.add_size(size);
+                        if size == 0 {
+                            this.files.push_front(file);
+                            finished_central_directory = false;
+                            break;
+                        }
+
+                        this.bytes += size;
+                        this.central_dir_size += size;
+
+                        if this.buf.is_full() {
+                            finished_central_directory = false;
+                            break;
+                        }
+                    }
+
+                    if !finished_central_directory {
+                        break;
+                    }
+
+                    let size = this.eocd();
+                    this.buf.add_size(size);
+                    if size == 0 {
+                        break;
+                    }
+
+                    this.pos = ZipStreamPos::End;
+                    break;
+                }
+                ZipStreamPos::End => return Poll::Ready(None)
+            }
+        }
+
+        Poll::Ready(Some(Ok(this.buf.remove_data(4 * 1024 * 1024).to_vec())))
+    }
+}
-- 
2.20.1