[pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module
Wolfgang Bumiller
w.bumiller at proxmox.com
Tue Oct 13 13:02:51 CEST 2020
partial review only for now:
On Tue, Oct 13, 2020 at 11:50:41AM +0200, Dominik Csapak wrote:
> this is a module to stream a zip file in an api call
> the user has to give the Encoder a stream of files (Path, mtime, mode, Reader)
> the resulting object itself implements stream for easy use in a
> hyper::Body
>
> for now, this does not implement compression (uses ZIPs STORE mode), and
> does not support empty directories or hardlinks (or any other special
> files)
>
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> ---
> src/tools.rs | 1 +
> src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
> 2 files changed, 469 insertions(+)
> create mode 100644 src/tools/zip.rs
>
> diff --git a/src/tools.rs b/src/tools.rs
> index 5b244c02..28f0338b 100644
> --- a/src/tools.rs
> +++ b/src/tools.rs
> @@ -35,6 +35,7 @@ pub mod nom;
> pub mod logrotate;
> pub mod loopdev;
> pub mod fuse_loop;
> +pub mod zip;
>
> mod parallel_handler;
> pub use parallel_handler::*;
> diff --git a/src/tools/zip.rs b/src/tools/zip.rs
> new file mode 100644
> index 00000000..acb193cb
> --- /dev/null
> +++ b/src/tools/zip.rs
> @@ -0,0 +1,468 @@
> +use std::ffi::OsString;
> +use std::collections::VecDeque;
> +use std::os::unix::ffi::OsStrExt;
> +use std::pin::Pin;
> +use std::path::{Component, Path, PathBuf};
> +
> +use futures::task::{Context, Poll};
> +use tokio::io::AsyncRead;
> +use tokio::stream::Stream;
> +use anyhow::Result;
> +
> +use proxmox::io_format_err;
> +use proxmox::tools::{
> + byte_buffer::ByteBuffer,
> + time::gmtime,
> +};
> +use crc32fast::Hasher;
> +
> +const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
> +const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
> +const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
> +const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
> +const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
> +const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
> +
> +const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
> +const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
> +const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
> +
> +fn epoch_to_dos(epoch: i64) -> (u16, u16) {
> + let gmtime = match gmtime(epoch) {
> + Ok(gmtime) => gmtime,
> + Err(_) => return (0,0),
> + };
> +
> + let seconds = gmtime.tm_sec/2 & 0b11111;
Instead of stripping spaces, please add parentheses for grouping (clippy
hint).
Also, for binary literals please use underscores for readability (group
them in nibbles, that way a group always represents a hex digit):
let seconds = (gmtime.tm_sec / 2) & 0b1_1111;
same for all the ones below
> + let minutes = gmtime.tm_min & 0xb111111;
> + let hours = gmtime.tm_hour & 0b11111;
> + let time: u16 = ((hours<<11)|(minutes<<5)|(seconds)) as u16;
> +
> + let date: u16 = if gmtime.tm_year > (2108-1900) || gmtime.tm_year < (1980-1900) {
> + 0
> + } else {
> + let day = gmtime.tm_mday & 0b11111;
> + let month = (gmtime.tm_mon + 1) & 0b1111;
> + let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
> + ((year<<9)|(month<<5)|(day)) as u16
> + };
> +
> + (date, time)
> +}
> +
> +struct Zip64Field {
> + size: u64,
> + compressed_size: u64,
> + offset: u64,
> +}
> +
> +impl Zip64Field {
> + fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
> + let size :u16 = if include_offset { 24 } else { 16 };
> + buf[0..2].copy_from_slice(ZIP64_TYPE);
> + buf[2..4].copy_from_slice(&size.to_le_bytes());
> + buf[4..12].copy_from_slice(&self.compressed_size.to_le_bytes());
> + buf[12..20].copy_from_slice(&self.size.to_le_bytes());
> + if include_offset {
> + buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
> + }
> + }
> +}
> +
> +struct FileEntry {
> + filename: OsString,
> + mtime: i64,
> + mode: u16,
> + zip64: Zip64Field,
> + crc32: [u8; 4],
> +}
> +
> +impl FileEntry {
> + fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
> + let mut relpath = PathBuf::new();
> +
> + for comp in path.as_ref().components() {
> + match comp {
> + Component::Normal(_) => {
> + relpath.push(comp);
> + },
> + _ => {},
> + }
clippy hint: `if let` would be shorter here:
if let Component::Normal(_) = comp {
relpath.push(comp);
}
> + }
> +
> + Self {
> + filename: relpath.into(),
> + crc32: [0,0,0,0],
> + mtime,
> + mode,
> + zip64: Zip64Field {
> + size: 0,
> + compressed_size: 0,
> + offset: 0,
> + },
> + }
> + }
> +
> + fn local_file_header(&self, buf: &mut [u8]) -> usize {
method name & signature doesn't really make the purpose of this function clear
> + let filename = self.filename.as_bytes();
> + let filename_len: u16 = filename.len() as u16;
> + let size: usize = 30 + (filename_len as usize) + 20;
> +
> + if size > buf.len() { return 0; }
> +
> + buf[0..4].copy_from_slice(LOCAL_FH_SIG);
> + buf[4..6].copy_from_slice(VERSION_NEEDED);
> + buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression
> + let (date, time) = epoch_to_dos(self.mtime);
> +
> + buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
> + buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
> + buf[14..26].copy_from_slice(&[
> + 0x00, 0x00, 0x00, 0x00, // crc32
> + 0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> + 0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> + ]);
> + buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
> +
> + buf[28..30].copy_from_slice(&[20, 00]); // extra field len
^ Is there a reason you chose to do it this way rather than, say, defining a
#[derive(Endian)]
#[repr(C)]
struct LocalFileHeader {
<contents up to the file name>
}
you could do this for all structs and have a helper like:
fn write_struct<E, T>(output: &mut T, data: E) -> io::Result<()>
where
T: Write + ?Sized,
E: Endian,
{
let data = data.to_le();
output.write_all(unsafe {
std::slice::from_raw_parts(
&data as *const E as *const u8,
size_of_val(&data),
)
})
}
and use `write_struct(&mut buffer, LocalFileHeader { ... })?`
> +
> + buf[30..(30 + filename_len) as usize].copy_from_slice(filename);
> +
> + self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false);
> +
> + size
> + }
> +
> + fn local_file_footer(&self, buf: &mut [u8]) -> usize {
> + let size = 24;
> + if size > buf.len() { return 0; }
> +
> + buf[0..4].copy_from_slice(LOCAL_FF_SIG);
> + buf[4..8].copy_from_slice(&self.crc32);
> + buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
> + buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
> +
> + size
> + }
> +
> + fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
> + let filename = self.filename.as_bytes();
> + let filename_len: u16 = filename.len() as u16;
> + let size = 46 + 28 + (filename_len as usize);
> +
> + if size > buf.len() { return 0; }
> +
> + buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
> + buf[4..6].copy_from_slice(VERSION_MADE_BY);
> + buf[6..8].copy_from_slice(VERSION_NEEDED);
> +
> + buf[8..12].copy_from_slice(&[
> + 0x08, 0x00, // general purpose flags
> + 0x00, 0x00, // compression
> + ]);
> +
> + let (date, time) = epoch_to_dos(self.mtime);
> +
> + buf[12..14].copy_from_slice(&time.to_le_bytes());
> + buf[14..16].copy_from_slice(&date.to_le_bytes());
> + buf[16..20].copy_from_slice(&self.crc32);
> +
> + buf[20..28].copy_from_slice(&[
> + 0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> + 0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> + ]);
> +
> + buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
> + buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
> +
> + buf[32..38].copy_from_slice(&[
> + 0x00, 0x00, // comment len
> + 0x00, 0x00, // start disk
> + 0x00, 0x00, // internal flags
> + ]);
> +
> + buf[38..40].copy_from_slice(&[
> + 0x00, 0x00, // upper part of external flags
> + ]);
> +
> + buf[40..42].copy_from_slice(&self.mode.to_le_bytes());
> + buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
> +
> + buf[46..46+filename_len as usize].copy_from_slice(&filename);
> + self.zip64.to_bytes(&mut buf[46+filename_len as usize..size], true);
> +
> + size
> + }
> +}
> +
> +enum ZipStreamPos {
> + // index of file and if the header is already finished
> + FileHeader(usize),
> + File(usize),
> + CentralIndex,
> + End,
> +}
> +
> +/// Shorthand for the necessary metadata for a file inside a ZIP
> +pub type File<R> = (PathBuf, i64, u16, R);
> +
> +/// Represents a ZIP file that can be streamed
> +///
> +/// This will create a ZIP file on the fly by getting File entries from the
> +/// given stream, and finishes it when the stream is done
> +/// Example:
> +/// ```no_run
> +/// use proxmox_backup::tools::zip;
> +///
> +/// #[tokio::async]
> +/// async fn main() {
> +/// let (sender, receiver) = tokio::sync::mpsc::channel();
> +/// let zip = zip::ZipEncoderStream::new(receiver);
> +///
> +/// tokio::spawn(async move {
> +/// sender.send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty())).await;
> +/// });
> +///
> +/// zip.next().await; // until done
> +///
> +/// }
> +/// ```
> +pub struct ZipEncoderStream<R, S>
> +where
> + R: AsyncRead + Unpin,
> + S: Stream<Item = File<R>> + Unpin,
> +{
> + buf: ByteBuffer,
> + bytes: usize,
> + central_dir_offset: usize,
> + central_dir_size: usize,
> + cur_reader: Option<R>,
> + entrycount: usize,
> + files: VecDeque<FileEntry>,
> + filestream: Option<S>,
> + hasher: Option<Hasher>,
> + pos: ZipStreamPos,
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
> + pub fn new(stream: S) -> Self {
> + Self {
> + buf: ByteBuffer::with_capacity(4*1024*1024),
> + bytes: 0,
> + central_dir_offset: 0,
> + central_dir_size: 0,
> + cur_reader: None,
> + entrycount: 0,
> + files: VecDeque::new(),
> + filestream: Some(stream),
> + hasher: None,
> + pos: ZipStreamPos::FileHeader(0),
> + }
> + }
> +
> + fn eocd(&mut self) -> usize {
> + let size = if self.central_dir_size > u32::MAX as usize
> + || self.central_dir_offset > u32::MAX as usize
> + || self.entrycount > u16::MAX as usize
> + {
> + 56+20+22
> + } else {
> + 22
> + };
> +
> +
> + if self.buf.free_size() < size { return 0; }
> +
> + let eocd_start = size - 22;
> + let buf = self.buf.get_free_mut_slice();
> +
> + let mut count = self.entrycount as u16;
> + let mut dir_size = self.central_dir_size as u32;
> + let mut offset = self.central_dir_offset as u32;
> +
> + if size > 22 {
> + count = 0xFFFF;
> + dir_size = 0xFFFFFFFF;
> + offset = 0xFFFFFFFF;
> +
> + buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
> + buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
> + buf[12..14].copy_from_slice(VERSION_MADE_BY);
> + buf[14..16].copy_from_slice(VERSION_NEEDED);
> + buf[16..24].copy_from_slice(&[
> + 0x00, 0x00, 0x00, 0x00, // number of disk
> + 0x00, 0x00, 0x00, 0x00, // number of disk of central directory
> + ]);
> + buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
> + buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
> + buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
> + buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
> +
> + let locator_offset = self.central_dir_offset + self.central_dir_size;
> + buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
> + buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
> + buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
> + buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
> + }
> +
> + buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR);
> + buf[eocd_start+4..eocd_start+8].copy_from_slice(&[
> + 0x00, 0x00, // disk number
> + 0x00, 0x00, // start disk
> + ]);
> +
> + buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes());
> + buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes());
> +
> + buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // entry count
> + buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // entry count
> + buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len
> +
> + size
> + }
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
> + type Item = Result<Vec<u8>, std::io::Error>;
> +
> + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> + let this = self.get_mut();
> +
> + loop {
> + match this.pos {
> + ZipStreamPos::FileHeader(idx) => {
> + if this.files.is_empty() || idx > this.files.len() - 1 {
> + if let Some(mut stream) = this.filestream.take() {
> + match Pin::new(&mut stream).poll_next(cx) {
> + Poll::Ready(Some((path, mtime, mode, file))) => {
> + this.filestream = Some(stream);
> + let entry = FileEntry::new(path, mtime, mode);
> + this.files.push_back(entry);
> + this.cur_reader = Some(file);
> + continue;
> + }
> + Poll::Pending => {
> + this.filestream = Some(stream);
> + if this.buf.is_empty() {
> + return Poll::Pending;
> + }
> + break;
> + }
> + Poll::Ready(None) => {},
> + }
> + }
> + this.pos = ZipStreamPos::CentralIndex;
> + this.central_dir_offset = this.bytes;
> + this.entrycount = this.files.len();
> + continue;
> + }
> +
> + let mut entry = &mut this.files[idx];
> + entry.zip64.offset = this.bytes as u64;
> + let size = entry.local_file_header(this.buf.get_free_mut_slice());
> + this.bytes += size;
> + this.buf.add_size(size as usize);
> +
> + if size == 0 {
> + break;
> + }
> +
> + if this.cur_reader.is_some() {
> + this.pos = ZipStreamPos::File(idx);
> + } else {
> + this.pos = ZipStreamPos::FileHeader(idx + 1);
> + }
> +
> + if this.buf.is_full() {
> + break;
> + }
> + },
> + ZipStreamPos::File(idx) => {
> + let mut entry = &mut this.files[idx];
> + let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got not file data"))?;
> + match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
> + Poll::Ready(Ok(n)) => {
> + let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
> + this.buf.add_size(n);
> + if n == 0 {
> + entry.crc32 = hasher.finalize().to_le_bytes();
> + let size = entry.local_file_footer(this.buf.get_free_mut_slice());
> + this.buf.add_size(size);
> + this.bytes += size;
> +
> + if size == 0 {
> + break;
> + }
> +
> + this.pos = ZipStreamPos::FileHeader(idx + 1);
> +
> + if this.buf.is_full() {
> + break;
> + }
> +
> + continue;
> + }
> +
> + this.bytes += n;
> + entry.zip64.size += n as u64;
> + entry.zip64.compressed_size += n as u64;
> +
> + hasher.update(&this.buf[this.buf.len() - n..]);
> + this.hasher = Some(hasher);
> + this.cur_reader = Some(reader);
> +
> + if this.buf.is_full() {
> + break;
> + }
> + }
> + Poll::Pending => {
> + this.cur_reader = Some(reader);
> + if this.buf.is_empty() {
> + return Poll::Pending;
> + }
> + break;
> + }
> + Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
> + }
> + },
> + ZipStreamPos::CentralIndex => {
> + let mut finished_central_directory = true;
> + while this.files.len() > 0 {
> + let file = this.files.pop_front().unwrap();
> + let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
> + this.buf.add_size(size);
> + if size == 0 {
> + this.files.push_front(file);
> + finished_central_directory = false;
> + break;
> + }
> +
> + this.bytes += size;
> + this.central_dir_size += size;
> +
> + if this.buf.is_full() {
> + finished_central_directory = false;
> + break;
> + }
> + }
> +
> + if !finished_central_directory {
> + break;
> + }
> +
> + let size = this.eocd();
> + this.buf.add_size(size);
> + if size == 0 {
> + break;
> + }
> +
> + this.pos = ZipStreamPos::End;
> + break;
> + }
> + ZipStreamPos::End => return Poll::Ready(None)
> + }
> + }
> +
> + return Poll::Ready(Some(Ok(this.buf.remove_data(4*1024*1024).to_vec())));
> + }
> +}
> --
> 2.20.1
More information about the pbs-devel
mailing list