[pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module
Dominik Csapak
d.csapak at proxmox.com
Tue Oct 13 11:50:41 CEST 2020
This is a module to stream a ZIP file in an API call.
The user has to give the encoder a stream of files (Path, mtime, mode, Reader);
the resulting object itself implements Stream for easy use in a
hyper::Body.
For now, this does not implement compression (it uses ZIP's STORE mode), and
it does not support empty directories or hardlinks (or any other special
files).
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
src/tools.rs | 1 +
src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 469 insertions(+)
create mode 100644 src/tools/zip.rs
diff --git a/src/tools.rs b/src/tools.rs
index 5b244c02..28f0338b 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -35,6 +35,7 @@ pub mod nom;
pub mod logrotate;
pub mod loopdev;
pub mod fuse_loop;
+pub mod zip;
mod parallel_handler;
pub use parallel_handler::*;
diff --git a/src/tools/zip.rs b/src/tools/zip.rs
new file mode 100644
index 00000000..acb193cb
--- /dev/null
+++ b/src/tools/zip.rs
@@ -0,0 +1,468 @@
+use std::ffi::OsString;
+use std::collections::VecDeque;
+use std::os::unix::ffi::OsStrExt;
+use std::pin::Pin;
+use std::path::{Component, Path, PathBuf};
+
+use futures::task::{Context, Poll};
+use tokio::io::AsyncRead;
+use tokio::stream::Stream;
+use anyhow::Result;
+
+use proxmox::io_format_err;
+use proxmox::tools::{
+ byte_buffer::ByteBuffer,
+ time::gmtime,
+};
+use crc32fast::Hasher;
+
+// Record signatures and version fields written into the archive by this
+// module. Every signature starts with the bytes 0x50 0x4b (ASCII "PK").
+
+// first four bytes of each local file header
+const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
+// first four bytes of the footer written after each file's data
+const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
+// first four bytes of each central directory file header
+const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
+// first four bytes of the end of central directory record
+const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
+const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
+const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
+
+// first two bytes (type id) of the zip64 extra field
+const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
+// first four bytes of the zip64 end of central directory record
+const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
+// first four bytes of the zip64 end of central directory locator
+const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
+
+/// Converts a UNIX epoch (seconds) into an MS-DOS `(date, time)` pair.
+///
+/// The time word is laid out as `hours << 11 | minutes << 5 | seconds / 2`,
+/// the date word as `(year - 1980) << 9 | month << 5 | day`.
+///
+/// Returns `(0, 0)` if `gmtime` fails. If the year cannot be represented in
+/// the 7 bit year field (1980 to 2107 inclusive), the date is set to `0`
+/// while the time is still encoded.
+fn epoch_to_dos(epoch: i64) -> (u16, u16) {
+    let gmtime = match gmtime(epoch) {
+        Ok(gmtime) => gmtime,
+        Err(_) => return (0, 0),
+    };
+
+    // seconds are stored with a resolution of two seconds (5 bits)
+    let seconds = (gmtime.tm_sec / 2) & 0b11111;
+    // 6 bits for the minutes (binary mask, not a hex literal)
+    let minutes = gmtime.tm_min & 0b111111;
+    let hours = gmtime.tm_hour & 0b11111;
+    let time: u16 = ((hours << 11) | (minutes << 5) | seconds) as u16;
+
+    // `tm_year` counts from 1900; the 7 bit DOS year counts from 1980, so the
+    // last representable year is 1980 + 127 = 2107
+    let date: u16 = if gmtime.tm_year > (2107 - 1900) || gmtime.tm_year < (1980 - 1900) {
+        0
+    } else {
+        let day = gmtime.tm_mday & 0b11111;
+        // `tm_mon` is zero based, DOS months start at 1
+        let month = (gmtime.tm_mon + 1) & 0b1111;
+        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
+        ((year << 9) | (month << 5) | day) as u16
+    };
+
+    (date, time)
+}
+
+/// Values of the zip64 extended information extra field of one file.
+struct Zip64Field {
+    /// uncompressed size of the file data
+    size: u64,
+    /// size of the file data as stored in the archive
+    compressed_size: u64,
+    /// offset of the local file header from the start of the archive
+    offset: u64,
+}
+
+impl Zip64Field {
+    /// Serializes the extra field into `buf`.
+    ///
+    /// Writes 20 bytes (type, length and both sizes), or 28 bytes if
+    /// `include_offset` is set (additionally the local header offset).
+    ///
+    /// Panics if `buf` is shorter than the number of bytes written.
+    fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
+        // length of the payload following the 4 byte type/length prefix
+        let size: u16 = if include_offset { 24 } else { 16 };
+        buf[0..2].copy_from_slice(ZIP64_TYPE);
+        buf[2..4].copy_from_slice(&size.to_le_bytes());
+        // the zip64 extra field stores the original (uncompressed) size first,
+        // followed by the compressed size
+        buf[4..12].copy_from_slice(&self.size.to_le_bytes());
+        buf[12..20].copy_from_slice(&self.compressed_size.to_le_bytes());
+        if include_offset {
+            buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
+        }
+    }
+}
+
+/// Metadata of one file in the archive, used to write its local header,
+/// its footer and its central directory entry.
+struct FileEntry {
+    /// path inside the archive (only the normal components of the given path)
+    filename: OsString,
+    /// modification time as UNIX epoch
+    mtime: i64,
+    /// mode bits written into the external attributes of the central directory
+    mode: u16,
+    /// sizes and local header offset
+    zip64: Zip64Field,
+    /// little endian CRC32 of the file data
+    crc32: [u8; 4],
+}
+
+impl FileEntry {
+    /// Creates an entry with zeroed sizes, offset and checksum.
+    ///
+    /// Only `Component::Normal` parts of `path` are kept, so root, prefix,
+    /// `.` and `..` components are dropped from the stored file name.
+    fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
+        let mut relpath = PathBuf::new();
+
+        for comp in path.as_ref().components() {
+            if let Component::Normal(_) = comp {
+                relpath.push(comp);
+            }
+        }
+
+        Self {
+            filename: relpath.into(),
+            crc32: [0, 0, 0, 0],
+            mtime,
+            mode,
+            zip64: Zip64Field {
+                size: 0,
+                compressed_size: 0,
+                offset: 0,
+            },
+        }
+    }
+
+    /// Writes the local file header (30 bytes + file name + 20 byte zip64
+    /// extra field) to the start of `buf`.
+    ///
+    /// Returns the number of bytes written, or `0` if `buf` is too small.
+    /// File names longer than `u16::MAX` bytes are not supported.
+    fn local_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        // do the offset arithmetic in usize, adding in u16 could overflow
+        let extra_start: usize = 30 + (filename_len as usize);
+        let size: usize = extra_start + 20;
+
+        if size > buf.len() {
+            return 0;
+        }
+
+        buf[0..4].copy_from_slice(LOCAL_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_NEEDED);
+        buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
+        buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
+        // checksum and sizes are not known yet, they follow in the footer
+        buf[14..26].copy_from_slice(&[
+            0x00, 0x00, 0x00, 0x00, // crc32
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+        buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
+
+        buf[28..30].copy_from_slice(&20u16.to_le_bytes()); // extra field len
+
+        buf[30..extra_start].copy_from_slice(filename);
+
+        self.zip64.to_bytes(&mut buf[extra_start..size], false);
+
+        size
+    }
+
+    /// Writes the 24 byte footer (signature, crc32 and the two 64 bit sizes)
+    /// to the start of `buf`.
+    ///
+    /// Returns the number of bytes written, or `0` if `buf` is too small.
+    fn local_file_footer(&self, buf: &mut [u8]) -> usize {
+        let size = 24;
+        if size > buf.len() {
+            return 0;
+        }
+
+        buf[0..4].copy_from_slice(LOCAL_FF_SIG);
+        buf[4..8].copy_from_slice(&self.crc32);
+        buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
+        buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
+
+        size
+    }
+
+    /// Writes the central directory file header (46 bytes + file name +
+    /// 28 byte zip64 extra field) to the start of `buf`.
+    ///
+    /// Returns the number of bytes written, or `0` if `buf` is too small.
+    /// File names longer than `u16::MAX` bytes are not supported.
+    fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        let extra_start: usize = 46 + (filename_len as usize);
+        let size: usize = extra_start + 28;
+
+        if size > buf.len() {
+            return 0;
+        }
+
+        buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_MADE_BY);
+        buf[6..8].copy_from_slice(VERSION_NEEDED);
+
+        buf[8..12].copy_from_slice(&[
+            0x08, 0x00, // general purpose flags
+            0x00, 0x00, // compression
+        ]);
+
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[12..14].copy_from_slice(&time.to_le_bytes());
+        buf[14..16].copy_from_slice(&date.to_le_bytes());
+        buf[16..20].copy_from_slice(&self.crc32);
+
+        // the real sizes are stored in the zip64 extra field
+        buf[20..28].copy_from_slice(&[
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+
+        buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
+        buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
+
+        buf[32..38].copy_from_slice(&[
+            0x00, 0x00, // comment len
+            0x00, 0x00, // start disk
+            0x00, 0x00, // internal flags
+        ]);
+
+        // the external flags are a 32 bit little endian value: bytes 38..40
+        // are its lower half, bytes 40..42 its upper half (holding the mode)
+        buf[38..40].copy_from_slice(&[
+            0x00, 0x00, // lower part of external flags
+        ]);
+
+        buf[40..42].copy_from_slice(&self.mode.to_le_bytes());
+        buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
+
+        buf[46..extra_start].copy_from_slice(filename);
+        self.zip64.to_bytes(&mut buf[extra_start..size], true);
+
+        size
+    }
+}
+
+// State of the encoder, i.e. which part of the archive is written next.
+enum ZipStreamPos {
+ // the local file header of the entry with the given index is written
+ // next; if no such entry exists yet, the next file is fetched from the
+ // input stream
+ FileHeader(usize),
+ // the data (and afterwards the footer) of the entry with the given
+ // index is being written
+ File(usize),
+ // all files are done, the central directory and the end of central
+ // directory record are being written
+ CentralIndex,
+ // the archive is complete, the stream yields no more items
+ End,
+}
+
+/// Shorthand for the necessary metadata for a file inside a ZIP
+///
+/// The tuple consists of the path of the file, its mtime (UNIX epoch),
+/// its mode and a reader that yields the file contents.
+pub type File<R> = (PathBuf, i64, u16, R);
+
+/// Represents a ZIP file that can be streamed
+///
+/// This will create a ZIP file on the fly by getting File entries from the
+/// given stream, and finishes it when the stream is done
+///
+/// Example:
+/// ```no_run
+/// use std::path::PathBuf;
+///
+/// use proxmox_backup::tools::zip;
+/// use tokio::stream::StreamExt;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let (mut sender, receiver) = tokio::sync::mpsc::channel(16);
+///     let mut zip = zip::ZipEncoderStream::new(receiver);
+///
+///     tokio::spawn(async move {
+///         let _ = sender
+///             .send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty()))
+///             .await;
+///     });
+///
+///     // each item is a chunk of the ZIP file (or an I/O error)
+///     while let Some(_chunk) = zip.next().await {}
+/// }
+/// ```
+pub struct ZipEncoderStream<R, S>
+where
+ R: AsyncRead + Unpin,
+ S: Stream<Item = File<R>> + Unpin,
+{
+ // output buffer; its contents are handed out by `poll_next`
+ buf: ByteBuffer,
+ // bytes written so far for headers, file data, footers and central
+ // directory entries; used as offset into the archive
+ bytes: usize,
+ // value of `bytes` at the time the central directory starts
+ central_dir_offset: usize,
+ // sum of the sizes of all central directory entries written so far
+ central_dir_size: usize,
+ // reader of the file whose data is currently written
+ cur_reader: Option<R>,
+ // number of files, set when the central directory starts
+ entrycount: usize,
+ // entries of all files seen so far; drained while the central directory
+ // is written
+ files: VecDeque<FileEntry>,
+ // input stream of files; `None` once it returned its last item
+ filestream: Option<S>,
+ // checksum state of the file whose data is currently written
+ hasher: Option<Hasher>,
+ // which part of the archive is written next
+ pos: ZipStreamPos,
+}
+
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
+    /// Creates an encoder that reads the files to archive from `stream`.
+    ///
+    /// The output is buffered in a 4 MiB buffer.
+    pub fn new(stream: S) -> Self {
+        Self {
+            buf: ByteBuffer::with_capacity(4 * 1024 * 1024),
+            bytes: 0,
+            central_dir_offset: 0,
+            central_dir_size: 0,
+            cur_reader: None,
+            entrycount: 0,
+            files: VecDeque::new(),
+            filestream: Some(stream),
+            hasher: None,
+            pos: ZipStreamPos::FileHeader(0),
+        }
+    }
+
+    /// Writes the end of central directory record into the free part of the
+    /// buffer, preceded by the zip64 record and locator if necessary.
+    ///
+    /// Returns the number of bytes written, or `0` if the buffer has not
+    /// enough free space. The caller has to account for the written bytes
+    /// in the buffer.
+    fn eocd(&mut self) -> usize {
+        // 0xFFFF / 0xFFFFFFFF in the classic record mean "see the zip64
+        // record", so values equal to them need the zip64 variant as well
+        let use_zip64 = self.central_dir_size >= u32::MAX as usize
+            || self.central_dir_offset >= u32::MAX as usize
+            || self.entrycount >= u16::MAX as usize;
+
+        // zip64 record (56) + zip64 locator (20) + classic record (22)
+        let size = if use_zip64 { 56 + 20 + 22 } else { 22 };
+
+        if self.buf.free_size() < size {
+            return 0;
+        }
+
+        let eocd_start = size - 22;
+        let buf = self.buf.get_free_mut_slice();
+
+        let mut count = self.entrycount as u16;
+        let mut dir_size = self.central_dir_size as u32;
+        let mut offset = self.central_dir_offset as u32;
+
+        if use_zip64 {
+            count = 0xFFFF;
+            dir_size = 0xFFFFFFFF;
+            offset = 0xFFFFFFFF;
+
+            buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
+            buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
+            buf[12..14].copy_from_slice(VERSION_MADE_BY);
+            buf[14..16].copy_from_slice(VERSION_NEEDED);
+            buf[16..24].copy_from_slice(&[
+                0x00, 0x00, 0x00, 0x00, // number of disk
+                0x00, 0x00, 0x00, 0x00, // number of disk of central directory
+            ]);
+            buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
+            buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
+            buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
+            buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
+
+            // the zip64 record directly follows the central directory
+            let locator_offset = self.central_dir_offset + self.central_dir_size;
+            buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
+            buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
+            buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
+            buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
+        }
+
+        buf[eocd_start..eocd_start + 4].copy_from_slice(END_OF_CENTRAL_DIR);
+        buf[eocd_start + 4..eocd_start + 8].copy_from_slice(&[
+            0x00, 0x00, // disk number
+            0x00, 0x00, // start disk
+        ]);
+
+        buf[eocd_start + 8..eocd_start + 10].copy_from_slice(&count.to_le_bytes()); // num entries on disk
+        buf[eocd_start + 10..eocd_start + 12].copy_from_slice(&count.to_le_bytes()); // num entries total
+
+        buf[eocd_start + 12..eocd_start + 16].copy_from_slice(&dir_size.to_le_bytes()); // central directory size
+        buf[eocd_start + 16..eocd_start + 20].copy_from_slice(&offset.to_le_bytes()); // central directory offset
+        buf[eocd_start + 20..eocd_start + 22].copy_from_slice(&[0x00, 0x00]); // comment len
+
+        size
+    }
+}
+
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
+    type Item = Result<Vec<u8>, std::io::Error>;
+
+    /// Produces the next chunk of the ZIP file.
+    ///
+    /// Fills the internal buffer with local headers, file data, footers and
+    /// finally the central directory, and hands out the buffered data
+    /// whenever the buffer is full, nothing more fits, or an input is
+    /// pending while data is buffered. Read errors of a file are passed on
+    /// as `Err` item.
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        // number of bytes written by `FileEntry::local_file_footer`
+        const LOCAL_FF_SIZE: usize = 24;
+
+        let this = self.get_mut();
+
+        loop {
+            match this.pos {
+                ZipStreamPos::FileHeader(idx) => {
+                    if idx >= this.files.len() {
+                        // no entry for this index yet, try to get the next file
+                        if let Some(mut stream) = this.filestream.take() {
+                            match Pin::new(&mut stream).poll_next(cx) {
+                                Poll::Ready(Some((path, mtime, mode, file))) => {
+                                    this.filestream = Some(stream);
+                                    let entry = FileEntry::new(path, mtime, mode);
+                                    this.files.push_back(entry);
+                                    this.cur_reader = Some(file);
+                                    continue;
+                                }
+                                Poll::Pending => {
+                                    this.filestream = Some(stream);
+                                    if this.buf.is_empty() {
+                                        return Poll::Pending;
+                                    }
+                                    // hand out what we have while waiting
+                                    break;
+                                }
+                                // input is done, `filestream` stays `None`
+                                Poll::Ready(None) => {}
+                            }
+                        }
+                        this.pos = ZipStreamPos::CentralIndex;
+                        this.central_dir_offset = this.bytes;
+                        this.entrycount = this.files.len();
+                        continue;
+                    }
+
+                    let entry = &mut this.files[idx];
+                    entry.zip64.offset = this.bytes as u64;
+                    let size = entry.local_file_header(this.buf.get_free_mut_slice());
+                    this.bytes += size;
+                    this.buf.add_size(size as usize);
+
+                    if size == 0 {
+                        // header did not fit, flush the buffer first
+                        break;
+                    }
+
+                    if this.cur_reader.is_some() {
+                        this.pos = ZipStreamPos::File(idx);
+                    } else {
+                        this.pos = ZipStreamPos::FileHeader(idx + 1);
+                    }
+
+                    if this.buf.is_full() {
+                        break;
+                    }
+                }
+                ZipStreamPos::File(idx) => {
+                    // Make sure the footer fits before reading: at EOF the
+                    // reader and hasher are consumed, so a footer that does
+                    // not fit could not be retried on the next poll. The
+                    // buffer cannot be empty here, so flushing makes progress.
+                    if this.buf.free_size() < LOCAL_FF_SIZE {
+                        break;
+                    }
+
+                    let entry = &mut this.files[idx];
+                    let mut reader = this
+                        .cur_reader
+                        .take()
+                        .ok_or_else(|| io_format_err!("got not file data"))?;
+                    match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
+                        Poll::Ready(Ok(n)) => {
+                            let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
+                            this.buf.add_size(n);
+                            if n == 0 {
+                                // end of file: finish checksum, write footer
+                                entry.crc32 = hasher.finalize().to_le_bytes();
+                                let size = entry.local_file_footer(this.buf.get_free_mut_slice());
+                                this.buf.add_size(size);
+                                this.bytes += size;
+
+                                if size == 0 {
+                                    break;
+                                }
+
+                                this.pos = ZipStreamPos::FileHeader(idx + 1);
+
+                                if this.buf.is_full() {
+                                    break;
+                                }
+
+                                continue;
+                            }
+
+                            this.bytes += n;
+                            entry.zip64.size += n as u64;
+                            entry.zip64.compressed_size += n as u64;
+
+                            // checksum the `n` bytes that were just appended
+                            hasher.update(&this.buf[this.buf.len() - n..]);
+                            this.hasher = Some(hasher);
+                            this.cur_reader = Some(reader);
+
+                            if this.buf.is_full() {
+                                break;
+                            }
+                        }
+                        Poll::Pending => {
+                            this.cur_reader = Some(reader);
+                            if this.buf.is_empty() {
+                                return Poll::Pending;
+                            }
+                            break;
+                        }
+                        Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
+                    }
+                }
+                ZipStreamPos::CentralIndex => {
+                    let mut finished_central_directory = true;
+                    while let Some(file) = this.files.pop_front() {
+                        let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
+                        this.buf.add_size(size);
+                        if size == 0 {
+                            // entry did not fit, retry it after flushing
+                            this.files.push_front(file);
+                            finished_central_directory = false;
+                            break;
+                        }
+
+                        this.bytes += size;
+                        this.central_dir_size += size;
+
+                        if this.buf.is_full() {
+                            finished_central_directory = false;
+                            break;
+                        }
+                    }
+
+                    if !finished_central_directory {
+                        break;
+                    }
+
+                    let size = this.eocd();
+                    this.buf.add_size(size);
+                    if size == 0 {
+                        break;
+                    }
+
+                    this.pos = ZipStreamPos::End;
+                    break;
+                }
+                ZipStreamPos::End => return Poll::Ready(None),
+            }
+        }
+
+        Poll::Ready(Some(Ok(this.buf.remove_data(4 * 1024 * 1024).to_vec())))
+    }
+}
--
2.20.1
More information about the pbs-devel
mailing list