[pmg-devel] [PATCH v5 pmg-log-tracker 1/2] rewrite in rust

Mira Limbeck m.limbeck at proxmox.com
Mon Nov 25 10:30:07 CET 2019


Now it has the wrong start/end time when using the "YYYY-MM-DD HH:MM:SS" 
format. This and a clippy warning are already fixed and I'll wait for 
feedback before sending the next version.

I'll also add some regression tests in my local repository in hopes of 
catching issues like the ones since v3.

On 11/21/19 11:51 AM, Mira Limbeck wrote:
> pmg-log-tracker has been rewritten in Rust. Functionality is the same.
> Output sometimes has a different order than the pmg-log-tracker in C.
> This only happens when the time of the entries match.
>
> There's one change regarding the interface. In addition to the short
> versions of arguments also long versions exist.
>
> The implementation uses Rc<>, Weak<> and RefCell<> to make holding mutable
> cross-references possible, without having to change the original logic
> completely. This allowed for easier translation from C to Rust.
>
> The file debian/cargo-checksum.json is required by dh-cargo, otherwise
> it won't compile. The cargo-checksum.json should contain the upstream
> .crate file which does not exist in this case, so we just create an
> empty one with the required keys. (see 'Library package structure' in
> https://wiki.debian.org/Teams/RustPackaging/Policy)
>
> The change to the minimum version of debhelper required was done
> according to other rust packages (rust-clap, rust-bindgen, rust-ripgrep).
>
> Signed-off-by: Mira Limbeck <m.limbeck at proxmox.com>
> ---
> v5:
>   - fixed start/end time
> v4:
>   - Fixed an issue where the Rc<> was destroyed before we tried to access
>     it via a weak reference.
> v3:
>   - Use Weak<> together with Rc<> so that there are no cycles that can
>     keep the objects alive longer than necessary. Reduces memory
>     footprint from ~1.8G to 190M with ~1.8G syslog files
>   - cleanup: simplification of QEntry, SEntry and FEntry creation/lookup
>   - cleanup: moved some function/struct declarations around
>   - benchmarks using /usr/bin/time instead of zsh built-in
>
> v2:
>   - fixed an issue where the incorrect number of files got returned by
>     Parser::count_relevant_files()
>   - remove src/pmg-log-tracker.c and src/Makefile
>   - added simple benchmarks
>
> Regarding the version, how do you want to handle it? Keep the Cargo.toml
> version in sync with the package version (major, minor)?
>
> This already contains the changes @Wolfgang recommended, simplifying a
> lot of code, especially the printing. Also got rid of one RefCell and
> one Cell usage, as we can now pass the Parser as mutable reference to
> both print() functions.
>
> Some simple benchmarks: (32 syslog files (syslog to syslog.31.gz))
>
> Rust: (median of 5 runs + 1 for cache)
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -v > /dev/null
>    User time (seconds): 9.01
>    System time (seconds): 0.10
>    Elapsed (wall clock) time (h:mm:ss or m:ss): 0:09.12
>    Maximum resident set size (kbytes): 194156
>
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -vv > /dev/null
>    User time (seconds): 10.17
>    System time (seconds): 0.07
>    Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.24
>    Maximum resident set size (kbytes): 194096
>
> C: (same as for the rust version)
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -v > /dev/null
>    User time (seconds): 10.41
>    System time (seconds): 0.25
>    Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.67
>    Maximum resident set size (kbytes): 148556
>
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -vv > /dev/null
>    User time (seconds): 11.67
>    System time (seconds): 0.35
>    Elapsed (wall clock) time (h:mm:ss or m:ss): 0:12.02
>    Maximum resident set size (kbytes): 148536
>
> The -v version is used by the GUI.
>
>   Cargo.toml                 |   12 +
>   Makefile                   |    4 +-
>   debian/cargo-checksum.json |    1 +
>   debian/control             |   17 +-
>   debian/rules               |    2 +-
>   src/main.rs                | 1986 ++++++++++++++++++++++++++++++++++++
>   6 files changed, 2017 insertions(+), 5 deletions(-)
>   create mode 100644 Cargo.toml
>   create mode 100644 debian/cargo-checksum.json
>   create mode 100644 src/main.rs
>
> diff --git a/Cargo.toml b/Cargo.toml
> new file mode 100644
> index 0000000..4cf75f1
> --- /dev/null
> +++ b/Cargo.toml
> @@ -0,0 +1,12 @@
> +[package]
> +name = "pmg-log-tracker"
> +version = "2.1.0"
> +authors = ["Mira Limbeck <m.limbeck at proxmox.com>", "Dietmar Maurer <dietmar at proxmox.com>"]
> +edition = "2018"
> +
> +[dependencies]
> +clap = "2.32.0"
> +failure = "0.1.5"
> +flate2 = "1.0.6"
> +libc = "0.2.48"
> +time = "0.1.42"
> diff --git a/Makefile b/Makefile
> index 1e344dd..b50fe02 100644
> --- a/Makefile
> +++ b/Makefile
> @@ -14,7 +14,9 @@ all: ${DEB}
>   .PHONY: ${BUILDDIR}
>   ${BUILDDIR}: src
>   	rm -rf ${BUILDDIR} ${BUILDDIR}.tmp
> -	cp -a src ${BUILDDIR}.tmp
> +	mkdir ${BUILDDIR}.tmp
> +	cp -a src ${BUILDDIR}.tmp/src
> +	cp Cargo.toml ${BUILDDIR}.tmp/
>   	cp -a debian ${BUILDDIR}.tmp/debian
>   	echo "git clone git://git.proxmox.com/git/pmg-log-tracker.git\\ngit checkout ${GITVERSION}" > ${BUILDDIR}.tmp/debian/SOURCE
>   	mv ${BUILDDIR}.tmp ${BUILDDIR}
> diff --git a/debian/cargo-checksum.json b/debian/cargo-checksum.json
> new file mode 100644
> index 0000000..ce2579b
> --- /dev/null
> +++ b/debian/cargo-checksum.json
> @@ -0,0 +1 @@
> +{"package":"","files":{}}
> diff --git a/debian/control b/debian/control
> index 26210a3..e02c296 100644
> --- a/debian/control
> +++ b/debian/control
> @@ -2,13 +2,24 @@ Source: pmg-log-tracker
>   Section: admin
>   Priority: optional
>   Maintainer: Proxmox Support Team <support at proxmox.com>
> -Build-Depends: debhelper (>= 10~),
> -               libglib2.0-dev (>= 2.42.1)
> +Build-Depends: cargo:native,
> +               debhelper (>= 11),
> +               dh-cargo (>= 10),
> +               librust-clap-dev,
> +               librust-failure-dev,
> +               librust-flate2-dev,
> +               librust-libc-dev,
> +               librust-time-dev,
> +               libstd-rust-dev,
> +               rustc:native
>   Standards-Version: 3.9.8
>   Homepage: http://www.proxmox.com
>   
>   Package: pmg-log-tracker
>   Architecture: any
> -Depends: ${shlibs:Depends}, ${misc:Depends}
> +Depends: ${misc:Depends},
> +         ${shlibs:Depends}
> +Built-Using: ${cargo:Built-Using}
> +XB-X-Cargo-Built-Using: ${cargo:X-Cargo-Built-Using}
>   Description: Proxmox Mailgateway Log Tracker
>    Tools to scan mail logs.
> diff --git a/debian/rules b/debian/rules
> index a7521b6..e3ba5d4 100755
> --- a/debian/rules
> +++ b/debian/rules
> @@ -5,4 +5,4 @@
>   
>   
>   %:
> -	dh $@
> +	dh $@ --buildsystem cargo
> diff --git a/src/main.rs b/src/main.rs
> new file mode 100644
> index 0000000..926be3c
> --- /dev/null
> +++ b/src/main.rs
> @@ -0,0 +1,1986 @@
> +#[macro_use]
> +extern crate clap;
> +extern crate failure;
> +extern crate flate2;
> +extern crate libc;
> +
> +use std::cell::RefCell;
> +use std::collections::HashMap;
> +use std::ffi::CString;
> +use std::rc::{Rc, Weak};
> +
> +use std::fs::File;
> +use std::io::BufRead;
> +use std::io::BufReader;
> +use std::io::BufWriter;
> +use std::io::Write;
> +
> +use failure::Error;
> +use flate2::read;
> +
> +use clap::{App, Arg};
> +
> +fn main() -> Result<(), Error> {
> +    let matches = App::new(crate_name!())
> +        .version(crate_version!())
> +        .about(crate_description!())
> +        .arg(
> +            Arg::with_name("verbose")
> +                .short("v")
> +                .long("verbose")
> +                .help("Verbose output, can be specified multiple times")
> +                .multiple(true)
> +                .takes_value(false),
> +        )
> +        .arg(
> +            Arg::with_name("inputfile")
> +                .short("i")
> +                .long("inputfile")
> +                .help("Input file to use instead of /var/log/syslog, or '-' for stdin")
> +                .value_name("INPUTFILE"),
> +        )
> +        .arg(
> +            Arg::with_name("host")
> +                .short("h")
> +                .long("host")
> +                .help("Hostname or Server IP")
> +                .value_name("HOST"),
> +        )
> +        .arg(
> +            Arg::with_name("from")
> +                .short("f")
> +                .long("from")
> +                .help("Mails from SENDER")
> +                .value_name("SENDER"),
> +        )
> +        .arg(
> +            Arg::with_name("to")
> +                .short("t")
> +                .long("to")
> +                .help("Mails to RECIPIENT")
> +                .value_name("RECIPIENT"),
> +        )
> +        .arg(
> +            Arg::with_name("start")
> +                .short("s")
> +                .long("starttime")
> +                .help("Start time (YYYY-MM-DD HH:MM:SS) or seconds since epoch")
> +                .value_name("TIME"),
> +        )
> +        .arg(
> +            Arg::with_name("end")
> +                .short("e")
> +                .long("endtime")
> +                .help("End time (YYYY-MM-DD HH:MM:SS) or seconds since epoch")
> +                .value_name("TIME"),
> +        )
> +        .arg(
> +            Arg::with_name("msgid")
> +                .short("m")
> +                .long("message-id")
> +                .help("Message ID (exact match)")
> +                .value_name("MSGID"),
> +        )
> +        .arg(
> +            Arg::with_name("qids")
> +                .short("q")
> +                .long("queue-id")
> +                .help("Queue ID (exact match), can be specified multiple times")
> +                .value_name("QID")
> +                .multiple(true)
> +                .number_of_values(1),
> +        )
> +        .arg(
> +            Arg::with_name("search")
> +                .short("x")
> +                .long("search-string")
> +                .help("Search for string")
> +                .value_name("STRING"),
> +        )
> +        .arg(
> +            Arg::with_name("limit")
> +                .short("l")
> +                .long("limit")
> +                .help("Print MAX entries")
> +                .value_name("MAX")
> +                .default_value("0"),
> +        )
> +        .arg(
> +            Arg::with_name("exclude_greylist")
> +                .short("g")
> +                .long("exclude-greylist")
> +                .help("Exclude greylist entries"),
> +        )
> +        .arg(
> +            Arg::with_name("exclude_ndr")
> +                .short("n")
> +                .long("exclude-ndr")
> +                .help("Exclude NDR entries"),
> +        )
> +        .get_matches();
> +
> +    let mut parser = Parser::new();
> +    parser.handle_args(matches)?;
> +
> +    println!("# LogReader: {}", std::process::id());
> +    println!("# Query options");
> +    if !parser.options.from.is_empty() {
> +        println!("# Sender: {}", parser.options.from);
> +    }
> +    if !parser.options.to.is_empty() {
> +        println!("# Recipient: {}", parser.options.to);
> +    }
> +    if !parser.options.host.is_empty() {
> +        println!("# Server: {}", parser.options.host);
> +    }
> +    if !parser.options.msgid.is_empty() {
> +        println!("# MsgID: {}", parser.options.msgid);
> +    }
> +    for m in parser.options.match_list.iter() {
> +        match m {
> +            Match::Qid(b) => println!("# QID: {}", std::str::from_utf8(b)?),
> +            Match::RelLineNr(t, l) => println!("# QID: T{:8X}L{:08X}", *t as u32, *l as u32),
> +        }
> +    }
> +
> +    if !parser.options.string_match.is_empty() {
> +        println!("# Match: {}", parser.options.string_match);
> +    }
> +
> +    println!(
> +        "# Start: {} ({})",
> +        time::strftime("%F %T", &parser.start_tm)?,
> +        parser.options.start
> +    );
> +    println!(
> +        "# End: {} ({})",
> +        time::strftime("%F %T", &parser.end_tm)?,
> +        parser.options.end
> +    );
> +
> +    println!("# End Query Options\n");
> +    parser.parse_files()?;
> +
> +    Ok(())
> +}
> +
> +fn handle_pmg_smtp_filter_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let (qid, data) = match parse_qid(msg, 25) {
> +        Some((q, m)) => (q, m),
> +        None => return,
> +    };
> +    let data = &data[2..];
> +
> +    let fe = get_or_create_fentry(&mut parser.fentries, qid);
> +
> +    if parser.string_match {
> +        fe.borrow_mut().string_match = parser.string_match;
> +    }
> +
> +    fe.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    if data.starts_with(b"accept mail to <") {
> +        let data = &data[16..];
> +        let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let (to, data) = data.split_at(to_count);
> +        if !data.starts_with(b"> (") {
> +            return;
> +        }
> +        let data = &data[3..];
> +        let qid_count = data.iter().take_while(|b| (**b as char) != ')').count();
> +        let qid = &data[..qid_count];
> +
> +        fe.borrow_mut()
> +            .set_accept(to, qid, parser.current_record_state.timestamp);
> +        return;
> +    }
> +
> +    if data.starts_with(b"moved mail for <") {
> +        let data = &data[16..];
> +        let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let (to, data) = data.split_at(to_count);
> +
> +        let qid_index = match find(data, b"quarantine - ") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let data = &data[qid_index + 13..];
> +        let (qid, _) = match parse_qid(data, 25) {
> +            Some(t) => t,
> +            None => return,
> +        };
> +
> +        fe.borrow_mut()
> +            .set_quarantine(to, qid, parser.current_record_state.timestamp);
> +        return;
> +    }
> +
> +    if data.starts_with(b"block mail to <") {
> +        let data = &data[15..];
> +        let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let to = &data[..to_count];
> +
> +        fe.borrow_mut()
> +            .set_block(to, parser.current_record_state.timestamp);
> +        return;
> +    }
> +
> +    if data.starts_with(b"processing time: ") {
> +        let data = &data[17..];
> +        let time_count = data.iter().take_while(|b| !b.is_ascii_whitespace()).count();
> +        let time = &data[..time_count];
> +
> +        fe.borrow_mut().set_processing_time(time);
> +        return;
> +    }
> +}
> +
> +fn handle_postscreen_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    if !msg.starts_with(b"NOQUEUE: reject: RCPT from ") {
> +        return;
> +    }
> +    let data = &msg[27..];
> +    let client_index = match find(data, b"; client [") {
> +        Some(i) => i,
> +        None => return,
> +    };
> +    let data = &data[client_index + 10..];
> +
> +    let client_count = data.iter().take_while(|b| (**b as char) != ']').count();
> +    let (client, data) = data.split_at(client_count);
> +
> +    let from_index = match find(data, b"; from=<") {
> +        Some(i) => i,
> +        None => return,
> +    };
> +    let data = &data[from_index + 8..];
> +
> +    let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +    let (from, data) = data.split_at(from_count);
> +
> +    if !data.starts_with(b">, to=<") {
> +        return;
> +    }
> +    let data = &data[7..];
> +    let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +    let to = &data[..to_count];
> +
> +    let se = get_or_create_sentry(
> +        &mut parser.sentries,
> +        parser.current_record_state.pid,
> +        parser.rel_line_nr,
> +        parser.current_record_state.timestamp,
> +    );
> +
> +    if parser.string_match {
> +        se.borrow_mut().string_match = parser.string_match;
> +    }
> +
> +    se.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +    se.borrow_mut().add_noqueue_entry(
> +        from,
> +        to,
> +        DStatus::Noqueue,
> +        parser.current_record_state.timestamp,
> +    );
> +    se.borrow_mut().set_connect(client);
> +    se.borrow_mut().disconnect = true;
> +    se.borrow_mut().print(parser);
> +    parser.free_sentry(parser.current_record_state.pid);
> +}
> +
> +fn handle_qmgr_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let (qid, data) = match parse_qid(msg, 15) {
> +        Some(t) => t,
> +        None => return,
> +    };
> +    let data = &data[2..];
> +
> +    let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> +    if parser.string_match {
> +        qe.borrow_mut().string_match = parser.string_match;
> +    }
> +    qe.borrow_mut().cleanup = true;
> +    qe.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    if data.starts_with(b"from=<") {
> +        let data = &data[6..];
> +
> +        let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let (from, data) = data.split_at(from_count);
> +
> +        if !data.starts_with(b">, size=") {
> +            return;
> +        }
> +        let data = &data[8..];
> +
> +        let size_count = data
> +            .iter()
> +            .take_while(|b| (**b as char).is_ascii_digit())
> +            .count();
> +        let (size, _) = data.split_at(size_count);
> +        qe.borrow_mut().from = from.into();
> +        qe.borrow_mut().size = unsafe { std::str::from_utf8_unchecked(size) }
> +            .parse()
> +            .unwrap();
> +    } else if data == b"removed" {
> +        qe.borrow_mut().removed = true;
> +        qe.borrow_mut().finalize(parser);
> +    }
> +}
> +
> +fn handle_lmtp_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let (qid, data) = match parse_qid(msg, 15) {
> +        Some((q, t)) => (q, t),
> +        None => return,
> +    };
> +
> +    let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> +    if parser.string_match {
> +        qe.borrow_mut().string_match = parser.string_match;
> +    }
> +    qe.borrow_mut().cleanup = true;
> +    qe.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    let data = &data[2..];
> +    if !data.starts_with(b"to=<") {
> +        return;
> +    }
> +    let data = &data[4..];
> +    let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +    let (to, data) = data.split_at(to_count);
> +
> +    let relay_index = match find(data, b"relay=") {
> +        Some(i) => i,
> +        None => return,
> +    };
> +    let data = &data[relay_index + 6..];
> +    let relay_count = data.iter().take_while(|b| (**b as char) != ',').count();
> +    let (relay, data) = data.split_at(relay_count);
> +
> +    let dsn_index = match find(data, b"dsn=") {
> +        Some(i) => i,
> +        None => return,
> +    };
> +    let data = &data[dsn_index + 4..];
> +    let dsn = match data.iter().next() {
> +        Some(b) => {
> +            if (*b as char).is_ascii_digit() {
> +                (*b as char).to_digit(10).unwrap()
> +            } else {
> +                return;
> +            }
> +        }
> +        None => return,
> +    };
> +
> +    qe.borrow_mut().add_to_entry(
> +        to,
> +        relay,
> +        DStatus::Dsn(dsn),
> +        parser.current_record_state.timestamp,
> +    );
> +
> +    if &*parser.current_record_state.sys == b"postfix/lmtp" {
> +        let sent_index = match find(data, b"status=sent (250 2.") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let mut data = &data[sent_index + 19..];
> +        if data.starts_with(b"5.0 OK") {
> +            data = &data[8..];
> +        } else if data.starts_with(b"7.0 BLOCKED") {
> +            data = &data[13..];
> +        } else {
> +            return;
> +        }
> +
> +        let (qid, _) = match parse_qid(data, 25) {
> +            Some(t) => t,
> +            None => return,
> +        };
> +
> +        qe.borrow_mut().filtered = true;
> +        if let Some(fe) = parser.fentries.get(qid) {
> +            qe.borrow_mut().filter = Some(Rc::downgrade(fe));
> +        }
> +    }
> +}
> +
> +fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let se = get_or_create_sentry(
> +        &mut parser.sentries,
> +        parser.current_record_state.pid,
> +        parser.rel_line_nr,
> +        parser.current_record_state.timestamp,
> +    );
> +
> +    if parser.string_match {
> +        se.borrow_mut().string_match = parser.string_match;
> +    }
> +    se.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    if msg.starts_with(b"connect from ") {
> +        let addr = &msg[13..];
> +        se.borrow_mut().set_connect(addr);
> +        return;
> +    }
> +
> +    if msg.starts_with(b"disconnect from") {
> +        parser.sentries.remove(&parser.current_record_state.pid);
> +        se.borrow_mut().disconnect = true;
> +
> +        if se.borrow_mut().remove_unneeded_refs(parser) == 0 {
> +            se.borrow_mut().print(parser);
> +            parser.free_sentry(se.borrow().pid);
> +        } else {
> +            se.borrow_mut().finalize_refs(parser);
> +        }
> +        return;
> +    }
> +
> +    if msg.starts_with(b"NOQUEUE:") {
> +        let data = &msg[8..];
> +        let colon_index = match find(data, b":") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let data = &data[colon_index + 1..];
> +        let colon_index = match find(data, b":") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let data = &data[colon_index + 1..];
> +        let semicolon_index = match find(data, b";") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let (grey, data) = data.split_at(semicolon_index);
> +        let dstatus = if find(
> +            grey,
> +            b"Recipient address rejected: Service is unavailable (try later)",
> +        )
> +        .is_some()
> +        {
> +            DStatus::Greylist
> +        } else {
> +            DStatus::Noqueue
> +        };
> +
> +        if !data.starts_with(b"; from=<") {
> +            return;
> +        }
> +        let data = &data[8..];
> +        let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let (from, data) = data.split_at(from_count);
> +
> +        if !data.starts_with(b"> to=<") {
> +            return;
> +        }
> +        let data = &data[6..];
> +        let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let to = &data[..to_count];
> +
> +        se.borrow_mut()
> +            .add_noqueue_entry(from, to, dstatus, parser.current_record_state.timestamp);
> +        return;
> +    }
> +
> +    let (qid, data) = match parse_qid(msg, 15) {
> +        Some(t) => t,
> +        None => return,
> +    };
> +    let data = &data[2..];
> +
> +    let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> +    if parser.string_match {
> +        qe.borrow_mut().string_match = parser.string_match;
> +    }
> +
> +    SEntry::add_ref(&se, &qe);
> +
> +    if !data.starts_with(b"client=") {
> +        return;
> +    }
> +    let data = &data[7..];
> +    let client_count = data
> +        .iter()
> +        .take_while(|b| !(**b as char).is_ascii_whitespace())
> +        .count();
> +    let client = &data[..client_count];
> +
> +    qe.borrow_mut().set_client(client);
> +}
> +
> +fn handle_cleanup_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let (qid, data) = match parse_qid(msg, 15) {
> +        Some(t) => t,
> +        None => return,
> +    };
> +    let data = &data[2..];
> +
> +    let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> +    if parser.string_match {
> +        qe.borrow_mut().string_match = parser.string_match;
> +    }
> +    qe.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    if !data.starts_with(b"message-id=") {
> +        return;
> +    }
> +    let data = &data[11..];
> +    let msgid_count = data
> +        .iter()
> +        .take_while(|b| !(**b as char).is_ascii_whitespace())
> +        .count();
> +    let msgid = &data[..msgid_count];
> +
> +    if !msgid.is_empty() {
> +        if qe.borrow().msgid.is_empty() {
> +            qe.borrow_mut().msgid = msgid.into();
> +        }
> +        qe.borrow_mut().cleanup = true;
> +    }
> +}
> +
> +#[derive(Default, Debug)]
> +struct NoqueueEntry {
> +    from: Box<[u8]>,
> +    to: Box<[u8]>,
> +    dstatus: DStatus,
> +    timestamp: u64,
> +}
> +
> +#[derive(Debug)]
> +struct ToEntry {
> +    to: Box<[u8]>,
> +    relay: Box<[u8]>,
> +    dstatus: DStatus,
> +    timestamp: u64,
> +}
> +
> +impl Default for ToEntry {
> +    fn default() -> Self {
> +        ToEntry {
> +            to: Default::default(),
> +            relay: (&b"none"[..]).into(),
> +            dstatus: Default::default(),
> +            timestamp: Default::default(),
> +        }
> +    }
> +}
> +
> +#[derive(Debug, PartialEq)]
> +enum DStatus {
> +    Invalid,
> +    Accept,
> +    Quarantine,
> +    Block,
> +    Greylist,
> +    Noqueue,
> +    Dsn(u32),
> +}
> +
> +impl Default for DStatus {
> +    fn default() -> Self {
> +        DStatus::Invalid
> +    }
> +}
> +
> +impl std::fmt::Display for DStatus {
> +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
> +        let c = match self {
> +            DStatus::Invalid => '\0', // other default
> +            DStatus::Accept => 'A',
> +            DStatus::Quarantine => 'Q',
> +            DStatus::Block => 'B',
> +            DStatus::Greylist => 'G',
> +            DStatus::Noqueue => 'N',
> +            DStatus::Dsn(v) => std::char::from_digit(*v, 10).unwrap(),
> +        };
> +        write!(f, "{}", c)
> +    }
> +}
> +
> +impl DStatus {
> +    fn is_dsn(&self, value: Option<u32>) -> bool {
> +        match (self, value) {
> +            (DStatus::Dsn(v), Some(val)) => *v == val,
> +            (DStatus::Dsn(_), None) => true,
> +            _ => false,
> +        }
> +    }
> +}
> +
> +#[derive(Debug, Default)]
> +struct SEntry {
> +    log: Vec<(Box<[u8]>, u64)>,
> +    connect: Box<[u8]>,
> +    cursor: Box<[u8]>,
> +    pid: u64,
> +    refs: Vec<Weak<RefCell<QEntry>>>,
> +    nq_entries: Vec<NoqueueEntry>,
> +    disconnect: bool,
> +    string_match: bool,
> +    timestamp: u64,
> +    rel_line_nr: u64,
> +}
> +
> +impl SEntry {
> +    fn add_noqueue_entry(&mut self, from: &[u8], to: &[u8], dstatus: DStatus, timestamp: u64) {
> +        let ne = NoqueueEntry {
> +            to: to.into(),
> +            from: from.into(),
> +            dstatus,
> +            timestamp,
> +        };
> +        self.nq_entries.push(ne);
> +    }
> +
> +    fn set_connect(&mut self, client: &[u8]) {
> +        if self.connect.is_empty() {
> +            self.connect = client.into();
> +        }
> +    }
> +
> +    fn print(&mut self, parser: &mut Parser) {
> +        if !parser.options.msgid.is_empty() {
> +            return;
> +        }
> +
> +        if !parser.options.host.is_empty() {
> +            if self.connect.is_empty() {
> +                return;
> +            }
> +            if find_lowercase(&self.connect, parser.options.host.as_bytes()).is_none() {
> +                return;
> +            }
> +        }
> +
> +        if !parser.options.match_list.is_empty() {
> +            let mut found = false;
> +            for m in parser.options.match_list.iter() {
> +                match m {
> +                    Match::Qid(_) => return,
> +                    Match::RelLineNr(t, l) => {
> +                        if (*t as u64) == self.timestamp && *l == self.rel_line_nr {
> +                            found = true;
> +                            break;
> +                        }
> +                    }
> +                }
> +            }
> +            if !found {
> +                return;
> +            }
> +        }
> +
> +        if !parser.options.from.is_empty()
> +            || !parser.options.to.is_empty()
> +            || parser.options.exclude_greylist
> +            || parser.options.exclude_ndr
> +        {
> +            let mut found = false;
> +            for nq in self.nq_entries.iter_mut().rev() {
> +                if !parser.options.from.is_empty()
> +                    && find_lowercase(&nq.from, parser.options.from.as_bytes()).is_none()
> +                {
> +                    nq.dstatus = DStatus::Invalid;
> +                }
> +
> +                if parser.options.exclude_greylist && nq.dstatus == DStatus::Greylist {
> +                    nq.dstatus = DStatus::Invalid;
> +                }
> +                if parser.options.exclude_ndr && nq.from.is_empty() {
> +                    nq.dstatus = DStatus::Invalid;
> +                }
> +
> +                if !parser.options.to.is_empty()
> +                    && !nq.to.is_empty()
> +                    && find_lowercase(&nq.to, parser.options.to.as_bytes()).is_none()
> +                {
> +                    nq.dstatus = DStatus::Invalid;
> +                }
> +
> +                if nq.dstatus != DStatus::Invalid {
> +                    found = true;
> +                }
> +            }
> +
> +            if !found {
> +                return;
> +            }
> +        }
> +
> +        if !parser.options.string_match.is_empty() && !self.string_match {
> +            return;
> +        }
> +
> +        if parser.options.verbose > 0 {
> +            parser.write_all_ok(format!(
> +                "SMTPD: T{:8X}L{:08X}\n",
> +                self.timestamp as u32, self.rel_line_nr as u32
> +            ));
> +            parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime).as_bytes());
> +
> +            if !self.connect.is_empty() {
> +                parser.write_all_ok(b"CLIENT: ");
> +                parser.write_all_ok(&self.connect);
> +                parser.write_all_ok(b"\n");
> +            }
> +        }
> +
> +        for nq in self.nq_entries.iter().rev() {
> +            if nq.dstatus != DStatus::Invalid {
> +                parser.write_all_ok(format!(
> +                    "TO:{:X}:T{:08X}L{:08X}:{}: from <",
> +                    nq.timestamp as i32, self.timestamp as i32, self.rel_line_nr, nq.dstatus,
> +                ));
> +                parser.write_all_ok(&nq.from);
> +                parser.write_all_ok(b"> to <");
> +                parser.write_all_ok(&nq.to);
> +                parser.write_all_ok(b">\n");
> +                parser.count += 1;
> +            }
> +        }
> +
> +        if parser.options.verbose > 1 {
> +            parser.write_all_ok(b"LOGS:\n");
> +            for (log, line) in self.log.iter() {
> +                parser.write_all_ok(format!("L{:08X} ", *line as u32));
> +                parser.write_all_ok(log);
> +                parser.write_all_ok(b"\n");
> +            }
> +        }
> +        parser.write_all_ok(b"\n");
> +    }
> +
> +    fn delete_ref(&mut self, qentry: &Rc<RefCell<QEntry>>) {
> +        self.refs.retain(|q| {
> +            let q = match q.upgrade() {
> +                Some(q) => q,
> +                None => return false,
> +            };
> +            if Rc::ptr_eq(&q, qentry) {
> +                return false;
> +            }
> +            true
> +        });
> +    }
> +
> +    fn remove_unneeded_refs(&mut self, parser: &mut Parser) -> u32 {
> +        let mut count: u32 = 0;
> +        let mut to_delete = Vec::new();
> +        self.refs.retain(|q| {
> +            let q = match q.upgrade() {
> +                Some(q) => q,
> +                None => return false,
> +            };
> +            let is_cleanup = q.borrow().cleanup;
> +            if !is_cleanup {
> +                to_delete.push(q);
> +                false
> +            } else {
> +                count += 1;
> +                true
> +            }
> +        });
> +
> +        for q in to_delete {
> +            q.borrow_mut().smtpd = None;
> +            parser.free_qentry(&q.borrow().qid, Some(self));
> +        }
> +        count
> +    }
> +
> +    fn finalize_refs(&mut self, parser: &mut Parser) {
> +        let mut qentries = Vec::new();
> +        for q in self.refs.iter() {
> +            let q = match q.upgrade() {
> +                Some(q) => q,
> +                None => continue,
> +            };
> +
> +            if !q.borrow().removed {
> +                continue;
> +            }
> +
> +            let fe = &q.borrow().filter;
> +            if let Some(f) = fe {
> +                if let Some(f) = f.upgrade() {
> +                    if !f.borrow().finished {
> +                        continue;
> +                    }
> +                }
> +            }
> +
> +            qentries.push(Rc::clone(&q));
> +        }
> +
> +        for q in qentries.iter() {
> +            q.borrow_mut().print(parser, Some(self));
> +            q.borrow_mut().smtpd = None;
> +            parser.free_qentry(&q.borrow().qid, Some(self));
> +
> +            if let Some(f) = &q.borrow().filter {
> +                if let Some(f) = f.upgrade() {
> +                    parser.free_fentry(&f.borrow().logid);
> +                }
> +            }
> +        }
> +    }
> +
> +    fn add_ref(sentry: &Rc<RefCell<SEntry>>, qentry: &Rc<RefCell<QEntry>>) {
> +        let smtpd = qentry.borrow().smtpd.clone();
> +        if let Some(s) = smtpd {
> +            if !Rc::ptr_eq(sentry, &s) {
> +                eprintln!("Error: qentry ref already set");
> +            }
> +            return;
> +        }
> +
> +        for q in sentry.borrow().refs.iter() {
> +            let q = match q.upgrade() {
> +                Some(q) => q,
> +                None => continue,
> +            };
> +            if Rc::ptr_eq(&q, qentry) {
> +                return;
> +            }
> +        }
> +
> +        sentry.borrow_mut().refs.push(Rc::downgrade(qentry));
> +        qentry.borrow_mut().smtpd = Some(Rc::clone(sentry));
> +    }
> +}
> +
> +#[derive(Default, Debug)]
> +struct QEntry {
> +    log: Vec<(Box<[u8]>, u64)>,
> +    smtpd: Option<Rc<RefCell<SEntry>>>,
> +    filter: Option<Weak<RefCell<FEntry>>>,
> +    qid: Box<[u8]>,
> +    from: Box<[u8]>,
> +    client: Box<[u8]>,
> +    msgid: Box<[u8]>,
> +    size: u64,
> +    to_entries: Vec<ToEntry>,
> +    cleanup: bool,
> +    removed: bool,
> +    filtered: bool,
> +    string_match: bool,
> +}
> +
> +impl QEntry {
> +    fn add_to_entry(&mut self, to: &[u8], relay: &[u8], dstatus: DStatus, timestamp: u64) {
> +        let te = ToEntry {
> +            to: to.into(),
> +            relay: relay.into(),
> +            dstatus,
> +            timestamp,
> +        };
> +        self.to_entries.push(te);
> +    }
> +
> +    fn finalize(&mut self, parser: &mut Parser) {
> +        if self.removed {
> +            if let Some(se) = &self.smtpd {
> +                if !se.borrow().disconnect {
> +                    return;
> +                }
> +            }
> +
> +            if let Some(fe) = self.filter.clone() {
> +                if let Some(fe) = fe.upgrade() {
> +                    if !fe.borrow().finished {
> +                        return;
> +                    }
> +                }
> +
> +                match self.smtpd.clone() {
> +                    Some(s) => self.print(parser, Some(&*s.borrow())),
> +                    None => self.print(parser, None),
> +                };
> +                if let Some(se) = &self.smtpd {
> +                    parser.free_qentry(&self.qid, Some(&mut *se.borrow_mut()));
> +                } else {
> +                    parser.free_qentry(&self.qid, None);
> +                }
> +
> +                if let Some(fe) = fe.upgrade() {
> +                    parser.free_fentry(&fe.borrow().logid);
> +                }
> +            } else {
> +                if let Some(s) = self.smtpd.clone() {
> +                    self.print(parser, Some(&*s.borrow()));
> +                    parser.free_qentry(&self.qid, Some(&mut *s.borrow_mut()));
> +                } else {
> +                    self.print(parser, None);
> +                    parser.free_qentry(&self.qid, None);
> +                }
> +            }
> +        }
> +    }
> +
> +    fn msgid_matches(&self, parser: &Parser) -> bool {
> +        if !parser.options.msgid.is_empty() {
> +            if self.msgid.is_empty() {
> +                return false;
> +            }
> +            let qentry_msgid_lowercase = self.msgid.to_ascii_lowercase();
> +            let msgid_lowercase = parser.options.msgid.as_bytes().to_ascii_lowercase();
> +            if qentry_msgid_lowercase != msgid_lowercase {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn match_list_matches(&self, parser: &Parser, se: Option<&SEntry>) -> bool {
> +        let fe = &self.filter;
> +        if !parser.options.match_list.is_empty() {
> +            let mut found = false;
> +            for m in parser.options.match_list.iter() {
> +                match m {
> +                    Match::Qid(q) => {
> +                        if let Some(f) = fe {
> +                            if let Some(f) = f.upgrade() {
> +                                if &f.borrow().logid == q {
> +                                    found = true;
> +                                    break;
> +                                }
> +                            }
> +                        }
> +                        if &self.qid == q {
> +                            found = true;
> +                            break;
> +                        }
> +                    }
> +                    Match::RelLineNr(t, l) => {
> +                        if let Some(s) = se {
> +                            if s.timestamp == (*t as u64) && s.rel_line_nr == *l {
> +                                found = true;
> +                                break;
> +                            }
> +                        }
> +                    }
> +                }
> +            }
> +            if !found {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn host_matches(&self, parser: &Parser, se: Option<&SEntry>) -> bool {
> +        if !parser.options.host.is_empty() {
> +            let mut found = false;
> +            if let Some(s) = se {
> +                if !s.connect.is_empty()
> +                    && find_lowercase(&s.connect, parser.options.host.as_bytes()).is_some()
> +                {
> +                    found = true;
> +                }
> +            }
> +            if !self.client.is_empty()
> +                && find_lowercase(&self.client, parser.options.host.as_bytes()).is_some()
> +            {
> +                found = true;
> +            }
> +
> +            if !found {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn from_to_matches(&mut self, parser: &Parser) -> bool {
> +        if !parser.options.from.is_empty() {
> +            if self.from.is_empty() {
> +                return false;
> +            }
> +            if find_lowercase(&self.from, parser.options.from.as_bytes()).is_none() {
> +                return false;
> +            }
> +        } else if parser.options.exclude_ndr && self.from.is_empty() {
> +            return false;
> +        }
> +
> +        if !parser.options.to.is_empty() {
> +            let mut found = false;
> +            self.to_entries.retain(|to| {
> +                if find_lowercase(&to.to, parser.options.to.as_bytes()).is_none() {
> +                    false
> +                } else {
> +                    found = true;
> +                    true
> +                }
> +            });
> +            if !found {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn string_matches(&self, parser: &Parser, se: Option<&SEntry>) -> bool {
> +        let fe = &self.filter;
> +        if !parser.options.string_match.is_empty() {
> +            let mut string_match = self.string_match;
> +
> +            if let Some(s) = se {
> +                if s.string_match {
> +                    string_match = true;
> +                }
> +            }
> +            if let Some(f) = fe {
> +                if let Some(f) = f.upgrade() {
> +                    if f.borrow().string_match {
> +                        string_match = true;
> +                    }
> +                }
> +            }
> +            if !string_match {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn print(&mut self, parser: &mut Parser, se: Option<&SEntry>) {
> +        let fe = self.filter.clone();
> +
> +        if !self.msgid_matches(parser) {
> +            return;
> +        }
> +
> +        if !self.match_list_matches(parser, se) {
> +            return;
> +        }
> +
> +        if !self.host_matches(parser, se) {
> +            return;
> +        }
> +
> +        if !self.from_to_matches(parser) {
> +            return;
> +        }
> +
> +        if !self.string_matches(parser, se) {
> +            return;
> +        }
> +
> +        if parser.options.verbose > 0 {
> +            parser.write_all_ok(b"QENTRY: ");
> +            parser.write_all_ok(&self.qid);
> +            parser.write_all_ok(b"\n");
> +            parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime));
> +            parser.write_all_ok(format!("SIZE: {}\n", self.size));
> +
> +            if !self.client.is_empty() {
> +                parser.write_all_ok(b"CLIENT: ");
> +                parser.write_all_ok(&self.client);
> +                parser.write_all_ok(b"\n");
> +            } else if let Some(s) = se {
> +                if !s.connect.is_empty() {
> +                    parser.write_all_ok(b"CLIENT: ");
> +                    parser.write_all_ok(&s.connect);
> +                    parser.write_all_ok(b"\n");
> +                }
> +            }
> +
> +            if !self.msgid.is_empty() {
> +                parser.write_all_ok(b"MSGID: ");
> +                parser.write_all_ok(&self.msgid);
> +                parser.write_all_ok(b"\n");
> +            }
> +        }
> +
> +        for to in self.to_entries.iter().rev() {
> +            if !to.to.is_empty() {
> +                let final_rc;
> +                let final_borrow;
> +                let mut final_to: &ToEntry = to;
> +                if to.dstatus.is_dsn(Some(2)) {
> +                    if let Some(f) = &fe {
> +                        if let Some(f) = f.upgrade() {
> +                            final_rc = f;
> +                            final_borrow = final_rc.borrow();
> +                            for to2 in final_borrow.to_entries.iter().rev() {
> +                                if to.to == to2.to {
> +                                    final_to = to2;
> +                                    break;
> +                                }
> +                            }
> +                        }
> +                    }
> +                }
> +
> +                parser.write_all_ok(format!("TO:{:X}:", to.timestamp as i32,));
> +                parser.write_all_ok(&self.qid);
> +                parser.write_all_ok(format!(":{}: from <", final_to.dstatus));
> +                parser.write_all_ok(&self.from);
> +                parser.write_all_ok(b"> to <");
> +                parser.write_all_ok(&final_to.to);
> +                parser.write_all_ok(b"> (");
> +                parser.write_all_ok(&final_to.relay);
> +                parser.write_all_ok(b")\n");
> +                parser.count += 1;
> +            }
> +        }
> +
> +        if parser.options.verbose > 1 {
> +            let print_log = |parser: &mut Parser, logs: &Vec<(Box<[u8]>, u64)>| {
> +                for (log, line) in logs.iter() {
> +                    parser.write_all_ok(format!("L{:08X} ", *line as u32));
> +                    parser.write_all_ok(log);
> +                    parser.write_all_ok(b"\n");
> +                }
> +            };
> +            if let Some(s) = se {
> +                if !s.log.is_empty() {
> +                    parser.write_all_ok(b"SMTP:\n");
> +                    print_log(parser, &s.log);
> +                }
> +            }
> +
> +            if let Some(f) = fe {
> +                if let Some(f) = f.upgrade() {
> +                    if !f.borrow().log.is_empty() {
> +                        parser.write_all_ok(format!("FILTER: {}\n", unsafe {
> +                            std::str::from_utf8_unchecked(&f.borrow().logid)
> +                        }));
> +                        print_log(parser, &f.borrow().log);
> +                    }
> +                }
> +            }
> +
> +            if !self.log.is_empty() {
> +                parser.write_all_ok(b"QMGR:\n");
> +                print_log(parser, &self.log);
> +            }
> +        }
> +        parser.write_all_ok(b"\n")
> +    }
> +
> +    fn set_client(&mut self, client: &[u8]) {
> +        if self.client.is_empty() {
> +            self.client = client.into();
> +        }
> +    }
> +}
> +
> +#[derive(Default, Debug)]
> +struct FEntry {
> +    log: Vec<(Box<[u8]>, u64)>,
> +    logid: Box<[u8]>,
> +    to_entries: Vec<ToEntry>,
> +    processing_time: Box<[u8]>,
> +    string_match: bool,
> +    finished: bool,
> +}
> +
> +impl FEntry {
> +    fn set_accept(&mut self, to: &[u8], qid: &[u8], timestamp: u64) {
> +        let te = ToEntry {
> +            to: to.into(),
> +            relay: qid.into(),
> +            dstatus: DStatus::Accept,
> +            timestamp,
> +        };
> +        self.to_entries.push(te);
> +    }
> +
> +    fn set_quarantine(&mut self, to: &[u8], qid: &[u8], timestamp: u64) {
> +        let te = ToEntry {
> +            to: to.into(),
> +            relay: qid.into(),
> +            dstatus: DStatus::Quarantine,
> +            timestamp,
> +        };
> +        self.to_entries.push(te);
> +    }
> +
> +    fn set_block(&mut self, to: &[u8], timestamp: u64) {
> +        let te = ToEntry {
> +            to: to.into(),
> +            relay: (&b"none"[..]).into(),
> +            dstatus: DStatus::Block,
> +            timestamp,
> +        };
> +        self.to_entries.push(te);
> +    }
> +
> +    fn set_processing_time(&mut self, time: &[u8]) {
> +        self.processing_time = time.into();
> +        self.finished = true;
> +    }
> +}
> +
> +#[derive(Debug)]
> +struct Parser {
> +    sentries: HashMap<u64, Rc<RefCell<SEntry>>>,
> +    fentries: HashMap<Box<[u8]>, Rc<RefCell<FEntry>>>,
> +    qentries: HashMap<Box<[u8]>, Rc<RefCell<QEntry>>>,
> +
> +    current_record_state: RecordState,
> +    rel_line_nr: u64,
> +
> +    current_year: [i64; 32],
> +    current_month: i64,
> +    current_file_index: usize,
> +
> +    count: u64,
> +
> +    buffered_stdout: BufWriter<std::io::Stdout>,
> +
> +    options: Options,
> +
> +    start_tm: time::Tm,
> +    end_tm: time::Tm,
> +
> +    ctime: libc::time_t,
> +    string_match: bool,
> +
> +    lines: u64,
> +}
> +
> +impl Parser {
> +    fn new() -> Self {
> +        let mut years: [i64; 32] = [0; 32];
> +        let mut tv: libc::timeval = libc::timeval {
> +            tv_sec: 0,
> +            tv_usec: 0,
> +        };
> +        let mut ltime: *mut libc::tm;
> +
> +        for (i, year) in years.iter_mut().enumerate() {
> +            unsafe {
> +                libc::gettimeofday(&mut tv, std::ptr::null_mut());
> +            }
> +            tv.tv_sec -= (3600 * 24 * i) as i64;
> +            ltime = unsafe { libc::localtime(&tv.tv_sec) };
> +            *year = (unsafe { (*ltime).tm_year + 1900 }) as i64;
> +        }
> +
> +        Self {
> +            sentries: HashMap::new(),
> +            fentries: HashMap::new(),
> +            qentries: HashMap::new(),
> +            current_record_state: Default::default(),
> +            rel_line_nr: 0,
> +            current_year: years,
> +            current_month: 0,
> +            current_file_index: 0,
> +            count: 0,
> +            buffered_stdout: BufWriter::with_capacity(4 * 1024 * 1024, std::io::stdout()),
> +            options: Options::default(),
> +            start_tm: time::empty_tm(),
> +            end_tm: time::empty_tm(),
> +            ctime: 0,
> +            string_match: false,
> +            lines: 0,
> +        }
> +    }
> +
> +    fn free_sentry(&mut self, sentry_pid: u64) {
> +        self.sentries.remove(&sentry_pid);
> +    }
> +
> +    fn free_qentry(&mut self, qid: &[u8], se: Option<&mut SEntry>) {
> +        if let Some(qe) = self.qentries.get(qid) {
> +            if let Some(se) = se {
> +                se.delete_ref(qe);
> +            }
> +        }
> +
> +        self.qentries.remove(qid);
> +    }
> +
> +    fn free_fentry(&mut self, fentry_logid: &[u8]) {
> +        self.fentries.remove(fentry_logid);
> +    }
> +
> +    fn parse_files(&mut self) -> Result<(), Error> {
> +        if !self.options.inputfile.is_empty() {
> +            if self.options.inputfile == "-" {
> +                self.current_file_index = 0;
> +                let mut reader = BufReader::new(std::io::stdin());
> +                self.handle_input_by_line(&mut reader)?;
> +            } else if let Ok(file) = File::open(&self.options.inputfile) {
> +                self.current_file_index = 0;
> +                let mut reader = BufReader::with_capacity(4 * 1024 * 1024, file);
> +                self.handle_input_by_line(&mut reader)?;
> +            }
> +        } else {
> +            let filecount = self.count_files_in_time_range();
> +            for i in (0..filecount).rev() {
> +                self.current_month = 0;
> +                if let Ok(file) = File::open(LOGFILES[i]) {
> +                    self.current_file_index = i;
> +                    if i > 1 {
> +                        let gzdecoder = read::GzDecoder::new(file);
> +                        let mut reader = BufReader::with_capacity(4 * 1024 * 1024, gzdecoder);
> +                        self.handle_input_by_line(&mut reader)?;
> +                    } else {
> +                        let mut reader = BufReader::with_capacity(4 * 1024 * 1024, file);
> +                        self.handle_input_by_line(&mut reader)?;
> +                    }
> +                }
> +            }
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn handle_input_by_line(&mut self, reader: &mut dyn BufRead) -> Result<(), Error> {
> +        let mut buffer = Vec::<u8>::with_capacity(4096);
> +        let mut prev_time = 0;
> +        loop {
> +            if self.options.limit > 0 && (self.count >= self.options.limit) {
> +                self.write_all_ok("STATUS: aborted by limit (too many hits)\n");
> +                self.buffered_stdout.flush()?;
> +                std::process::exit(0);
> +            }
> +
> +            buffer.clear();
> +            let size = match reader.read_until(b'\n', &mut buffer) {
> +                Err(e) => return Err(e.into()),
> +                Ok(0) => return Ok(()),
> +                Ok(s) => s,
> +            };
> +            let line = &buffer[0..size - 1];
> +            let complete_line = line;
> +
> +            let (time, line) = match parse_time(
> +                line,
> +                self.current_year[self.current_file_index],
> +                &mut self.current_month,
> +            ) {
> +                Some(t) => t,
> +                None => continue,
> +            };
> +            if time != prev_time {
> +                self.rel_line_nr = 0;
> +            } else {
> +                self.rel_line_nr += 1;
> +            }
> +            prev_time = time;
> +
> +            if time < self.options.start {
> +                continue;
> +            }
> +            if time > self.options.end {
> +                break;
> +            }
> +
> +            self.lines += 1;
> +
> +            let (host, service, pid, line) = match parse_host_service_pid(line) {
> +                Some((h, s, p, l)) => (h, s, p, l),
> +                None => continue,
> +            };
> +
> +            self.ctime = time;
> +
> +            self.current_record_state.host = host.into();
> +            self.current_record_state.sys = service.into();
> +            self.current_record_state.pid = pid;
> +            self.current_record_state.timestamp = time as u64;
> +
> +            self.string_match = false;
> +            if !self.options.string_match.is_empty()
> +                && find(complete_line, self.options.string_match.as_bytes()).is_some()
> +            {
> +                self.string_match = true;
> +            }
> +
> +            if service == b"pmg-smtp-filter" {
> +                handle_pmg_smtp_filter_message(line, self, complete_line);
> +            } else if service == b"postfix/postscreen" {
> +                handle_postscreen_message(line, self, complete_line);
> +            } else if service == b"postfix/qmgr" {
> +                handle_qmgr_message(line, self, complete_line);
> +            } else if service == b"postfix/lmtp"
> +                || service == b"postfix/smtp"
> +                || service == b"postfix/local"
> +                || service == b"postfix/error"
> +            {
> +                handle_lmtp_message(line, self, complete_line);
> +            } else if service == b"postfix/smtpd" {
> +                handle_smtpd_message(line, self, complete_line);
> +            } else if service == b"postfix/cleanup" {
> +                handle_cleanup_message(line, self, complete_line);
> +            }
> +        }
> +        Ok(())
> +    }
> +
> +    /// Returns the number of files to parse. Does not error out if it can't access any file
> +    /// (permission denied)
> +    fn count_files_in_time_range(&mut self) -> usize {
> +        let mut count = 0;
> +        let mut buffer = Vec::new();
> +
> +        for (i, item) in LOGFILES.iter().enumerate() {
> +            self.current_month = 0;
> +
> +            count = i;
> +            if let Ok(file) = File::open(item) {
> +                self.current_file_index = i;
> +                buffer.clear();
> +                if i > 1 {
> +                    let gzdecoder = read::GzDecoder::new(file);
> +                    let mut reader = BufReader::new(gzdecoder);
> +                    if let Ok(size) = reader.read_until(b'\n', &mut buffer) {
> +                        if size == 0 {
> +                            return count;
> +                        }
> +                        if let Some((time, _)) = parse_time(
> +                            &buffer[0..size],
> +                            self.current_year[i],
> +                            &mut self.current_month,
> +                        ) {
> +                            if time < self.options.start {
> +                                break;
> +                            }
> +                        }
> +                    } else {
> +                        return count;
> +                    }
> +                } else {
> +                    let mut reader = BufReader::new(file);
> +                    if let Ok(size) = reader.read_until(b'\n', &mut buffer) {
> +                        if size == 0 {
> +                            return count;
> +                        }
> +                        if let Some((time, _)) = parse_time(
> +                            &buffer[0..size],
> +                            self.current_year[i],
> +                            &mut self.current_month,
> +                        ) {
> +                            if time < self.options.start {
> +                                break;
> +                            }
> +                        }
> +                    } else {
> +                        return count;
> +                    }
> +                }
> +            } else {
> +                return count;
> +            }
> +        }
> +
> +        count + 1
> +    }
> +
> +    fn handle_args(&mut self, args: clap::ArgMatches) -> Result<(), Error> {
> +        if let Some(inputfile) = args.value_of("inputfile") {
> +            self.options.inputfile = inputfile.to_string();
> +        }
> +
> +        if let Some(start) = args.value_of("start") {
> +            if let Ok(res) = time::strptime(&start, "%F %T") {
> +                self.options.start = mkgmtime(&res);
> +                self.start_tm = res;
> +            } else if let Ok(res) = time::strptime(&start, "%s") {
> +                self.options.start = mkgmtime(&res);
> +                self.start_tm = res;
> +            } else {
> +                failure::bail!(failure::err_msg("failed to parse start time"));
> +            }
> +        } else {
> +            let mut ltime = time::now();
> +            ltime.tm_sec = 0;
> +            ltime.tm_min = 0;
> +            ltime.tm_hour = 0;
> +            self.options.start = mkgmtime(&ltime);
> +            self.start_tm = ltime;
> +        }
> +
> +        if let Some(end) = args.value_of("end") {
> +            if let Ok(res) = time::strptime(&end, "%F %T") {
> +                self.options.end = mkgmtime(&res);
> +                self.end_tm = res;
> +            } else if let Ok(res) = time::strptime(&end, "%s") {
> +                self.options.end = mkgmtime(&res);
> +                self.end_tm = res;
> +            } else {
> +                failure::bail!(failure::err_msg("failed to parse end time"));
> +            }
> +        } else {
> +            let ltime = time::now();
> +            self.options.end = mkgmtime(&ltime);
> +            self.end_tm = ltime;
> +        }
> +
> +        if self.options.end < self.options.start {
> +            failure::bail!(failure::err_msg("end time before start time"));
> +        }
> +
> +        self.options.limit = match args.value_of("limit") {
> +            Some(l) => l.parse().unwrap(),
> +            None => 0,
> +        };
> +
> +        if let Some(qids) = args.values_of("qids") {
> +            for q in qids {
> +                let ltime: libc::time_t = 0;
> +                let rel_line_nr: libc::c_ulong = 0;
> +                let input = CString::new(q)?;
> +                let bytes = concat!("T%08lXL%08lX", "\0");
> +                let format =
> +                    unsafe { std::ffi::CStr::from_bytes_with_nul_unchecked(bytes.as_bytes()) };
> +                if unsafe {
> +                    libc::sscanf(input.as_ptr(), format.as_ptr(), &ltime, &rel_line_nr) == 2
> +                } {
> +                    self.options
> +                        .match_list
> +                        .push(Match::RelLineNr(ltime, rel_line_nr));
> +                } else {
> +                    self.options
> +                        .match_list
> +                        .push(Match::Qid(q.as_bytes().into()));
> +                }
> +            }
> +        }
> +
> +        if let Some(from) = args.value_of("from") {
> +            self.options.from = from.to_string();
> +        }
> +        if let Some(to) = args.value_of("to") {
> +            self.options.to = to.to_string();
> +        }
> +        if let Some(host) = args.value_of("host") {
> +            self.options.host = host.to_string();
> +        }
> +        if let Some(msgid) = args.value_of("msgid") {
> +            self.options.msgid = msgid.to_string();
> +        }
> +
> +        self.options.exclude_greylist = args.is_present("exclude_greylist");
> +        self.options.exclude_ndr = args.is_present("exclude_ndr");
> +
> +        self.options.verbose = args.occurrences_of("verbose") as _;
> +
> +        if let Some(string_match) = args.value_of("search") {
> +            self.options.string_match = string_match.to_string();
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn write_all_ok<T: AsRef<[u8]>>(&mut self, data: T) {
> +        self.buffered_stdout
> +            .write_all(data.as_ref())
> +            .expect("failed to write to stdout");
> +    }
> +}
> +
> +impl Drop for Parser {
> +    fn drop(&mut self) {
> +        let mut qentries = std::mem::replace(&mut self.qentries, HashMap::new());
> +        for q in qentries.values() {
> +            let smtpd = q.borrow().smtpd.clone();
> +            if let Some(s) = smtpd {
> +                q.borrow_mut().print(self, Some(&*s.borrow()));
> +            } else {
> +                q.borrow_mut().print(self, None);
> +            }
> +        }
> +        qentries.clear();
> +        let mut sentries = std::mem::replace(&mut self.sentries, HashMap::new());
> +        for s in sentries.values() {
> +            s.borrow_mut().print(self);
> +        }
> +        sentries.clear();
> +    }
> +}
> +
> +#[derive(Debug, Default)]
> +struct Options {
> +    match_list: Vec<Match>,
> +    inputfile: String,
> +    string_match: String,
> +    host: String,
> +    msgid: String,
> +    from: String,
> +    to: String,
> +    start: libc::time_t,
> +    end: libc::time_t,
> +    limit: u64,
> +    verbose: u32,
> +    exclude_greylist: bool,
> +    exclude_ndr: bool,
> +}
> +
> +#[derive(Debug)]
> +enum Match {
> +    Qid(Box<[u8]>),
> +    RelLineNr(libc::time_t, u64),
> +}
> +
> +#[derive(Debug, Default)]
> +struct RecordState {
> +    host: Box<[u8]>,
> +    sys: Box<[u8]>,
> +    pid: u64,
> +    timestamp: u64,
> +}
> +
> +fn get_or_create_qentry(
> +    qentries: &mut HashMap<Box<[u8]>, Rc<RefCell<QEntry>>>,
> +    qid: &[u8],
> +) -> Rc<RefCell<QEntry>> {
> +    if let Some(qe) = qentries.get(qid) {
> +        Rc::clone(qe)
> +    } else {
> +        let qe = Rc::new(RefCell::new(QEntry::default()));
> +        qe.borrow_mut().qid = qid.into();
> +        qentries.insert(qid.into(), qe.clone());
> +        qe
> +    }
> +}
> +
> +fn get_or_create_sentry(
> +    sentries: &mut HashMap<u64, Rc<RefCell<SEntry>>>,
> +    pid: u64,
> +    rel_line_nr: u64,
> +    timestamp: u64,
> +) -> Rc<RefCell<SEntry>> {
> +    if let Some(se) = sentries.get(&pid) {
> +        Rc::clone(se)
> +    } else {
> +        let se = Rc::new(RefCell::new(SEntry::default()));
> +        se.borrow_mut().rel_line_nr = rel_line_nr;
> +        se.borrow_mut().timestamp = timestamp;
> +        sentries.insert(pid, se.clone());
> +        se
> +    }
> +}
> +
> +fn get_or_create_fentry(
> +    fentries: &mut HashMap<Box<[u8]>, Rc<RefCell<FEntry>>>,
> +    qid: &[u8],
> +) -> Rc<RefCell<FEntry>> {
> +    if let Some(fe) = fentries.get(qid) {
> +        Rc::clone(fe)
> +    } else {
> +        let fe = Rc::new(RefCell::new(FEntry::default()));
> +        fe.borrow_mut().logid = qid.into();
> +        fentries.insert(qid.into(), fe.clone());
> +        fe
> +    }
> +}
> +
> +fn mkgmtime(tm: &time::Tm) -> libc::time_t {
> +    let tm = tm.to_local();
> +    let mut res: libc::time_t;
> +
> +    let mut year = (tm.tm_year + 1900) as i64;
> +    let mon = tm.tm_mon;
> +
> +    res = (year - 1970) * 365 + CAL_MTOD[mon as usize];
> +
> +    if mon <= 1 {
> +        year -= 1;
> +    }
> +
> +    res += (year - 1968) / 4;
> +    res -= (year - 1900) / 100;
> +    res += (year - 1600) / 400;
> +
> +    res += (tm.tm_mday - 1) as i64;
> +    res = res * 24 + tm.tm_hour as i64;
> +    res = res * 60 + tm.tm_min as i64;
> +    res = res * 60 + tm.tm_sec as i64;
> +
> +    res
> +}
> +
> +const LOGFILES: [&str; 32] = [
> +    "/var/log/syslog",
> +    "/var/log/syslog.1",
> +    "/var/log/syslog.2.gz",
> +    "/var/log/syslog.3.gz",
> +    "/var/log/syslog.4.gz",
> +    "/var/log/syslog.5.gz",
> +    "/var/log/syslog.6.gz",
> +    "/var/log/syslog.7.gz",
> +    "/var/log/syslog.8.gz",
> +    "/var/log/syslog.9.gz",
> +    "/var/log/syslog.10.gz",
> +    "/var/log/syslog.11.gz",
> +    "/var/log/syslog.12.gz",
> +    "/var/log/syslog.13.gz",
> +    "/var/log/syslog.14.gz",
> +    "/var/log/syslog.15.gz",
> +    "/var/log/syslog.16.gz",
> +    "/var/log/syslog.17.gz",
> +    "/var/log/syslog.18.gz",
> +    "/var/log/syslog.19.gz",
> +    "/var/log/syslog.20.gz",
> +    "/var/log/syslog.21.gz",
> +    "/var/log/syslog.22.gz",
> +    "/var/log/syslog.23.gz",
> +    "/var/log/syslog.24.gz",
> +    "/var/log/syslog.25.gz",
> +    "/var/log/syslog.26.gz",
> +    "/var/log/syslog.27.gz",
> +    "/var/log/syslog.28.gz",
> +    "/var/log/syslog.29.gz",
> +    "/var/log/syslog.30.gz",
> +    "/var/log/syslog.31.gz",
> +];
> +
> +/// Parse a QID ([A-Z]+). Returns a tuple of (qid, remaining_text) or None.
> +fn parse_qid(data: &[u8], max: usize) -> Option<(&[u8], &[u8])> {
> +    let mut bytes = data.iter();
> +    let mut count = 0;
> +    while count < max {
> +        match bytes.next() {
> +            Some(c) => {
> +                if c.is_ascii_hexdigit() {
> +                    count += 1;
> +                } else {
> +                    break;
> +                }
> +            }
> +            None => break,
> +        }
> +    }
> +    if count > 1 {
> +        return Some((&data[..count], &data[count..]));
> +    }
> +    None
> +}
> +
> +/// Parse a number. Returns a tuple of (parsed_number, remaining_text) or None.
> +fn parse_number(data: &[u8], max_digits: usize) -> Option<(usize, &[u8])> {
> +    let mut bytes = data.iter();
> +    let mut number: usize = 0;
> +    let mut digits: usize = 0;
> +    while digits < max_digits {
> +        let c = match bytes.next() {
> +            Some(c) => c,
> +            None => break,
> +        };
> +        if (*c as char).is_digit(10) {
> +            digits += 1;
> +            number *= 10;
> +            number += (*c as char).to_digit(10).unwrap() as usize;
> +        } else {
> +            break;
> +        }
> +    }
> +    if digits == 0 {
> +        None
> +    } else {
> +        Some((number, &data[digits..]))
> +    }
> +}
> +
> +/// Parse time. Returns a tuple of (parsed_time, remaining_text) or None.
> +fn parse_time<'a>(
> +    data: &'a [u8],
> +    cur_year: i64,
> +    cur_month: &mut i64,
> +) -> Option<(libc::time_t, &'a [u8])> {
> +    if data.len() < 15 {
> +        return None;
> +    }
> +
> +    let mon = match &data[0..3] {
> +        b"Jan" => 0,
> +        b"Feb" => 1,
> +        b"Mar" => 2,
> +        b"Apr" => 3,
> +        b"May" => 4,
> +        b"Jun" => 5,
> +        b"Jul" => 6,
> +        b"Aug" => 7,
> +        b"Sep" => 8,
> +        b"Oct" => 9,
> +        b"Nov" => 10,
> +        b"Dec" => 11,
> +        _ => return None,
> +    };
> +    let data = &data[3..];
> +
> +    let mut ltime: libc::time_t;
> +    let mut year = cur_year;
> +
> +    if *cur_month == 11 && mon == 0 {
> +        year += 1;
> +    }
> +    if mon > *cur_month {
> +        *cur_month = mon;
> +    }
> +
> +    ltime = (year - 1970) * 365 + CAL_MTOD[mon as usize];
> +
> +    if mon <= 1 {
> +        year -= 1;
> +    }
> +
> +    ltime += (year - 1968) / 4;
> +    ltime -= (year - 1900) / 100;
> +    ltime += (year - 1600) / 400;
> +
> +    let whitespace_count = data.iter().take_while(|b| b.is_ascii_whitespace()).count();
> +    let data = &data[whitespace_count..];
> +
> +    let (mday, data) = match parse_number(data, 2) {
> +        Some(t) => t,
> +        None => {
> +            eprintln!("no day matched");
> +            return None;
> +        }
> +    };
> +    if mday == 0 {
> +        eprintln!("mday == 0");
> +        return None;
> +    }
> +
> +    ltime += (mday - 1) as i64;
> +
> +    let data = &data[1..];
> +
> +    let (hour, data) = match parse_number(data, 2) {
> +        Some(t) => t,
> +        None => {
> +            eprintln!("no hour matched");
> +            return None;
> +        }
> +    };
> +
> +    ltime *= 24;
> +    ltime += hour as i64;
> +
> +    if let Some(c) = data.iter().next() {
> +        if (*c as char) != ':' {
> +            eprintln!("char != ':'");
> +            return None;
> +        }
> +    } else {
> +        eprintln!("no next char");
> +        return None;
> +    }
> +    let data = &data[1..];
> +
> +    let (min, data) = match parse_number(data, 2) {
> +        Some(t) => t,
> +        None => {
> +            eprintln!("no min matched");
> +            return None;
> +        }
> +    };
> +
> +    ltime *= 60;
> +    ltime += min as i64;
> +
> +    if let Some(c) = data.iter().next() {
> +        if (*c as char) != ':' {
> +            eprintln!("char != ':'");
> +            return None;
> +        }
> +    } else {
> +        eprintln!("no next char");
> +        return None;
> +    }
> +    let data = &data[1..];
> +
> +    let (sec, data) = match parse_number(data, 2) {
> +        Some(t) => t,
> +        None => {
> +            eprintln!("no sec matched");
> +            return None;
> +        }
> +    };
> +
> +    ltime *= 60;
> +    ltime += sec as i64;
> +
> +    let data = &data[1..];
> +
> +    Some((ltime, data))
> +}
> +
> +const CAL_MTOD: [i64; 12] = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334];
> +
> +type ByteSlice<'a> = &'a [u8];
> +/// Parse Host, Service and PID at the beginning of data. Returns a tuple of (host, service, pid, remaining_text).
> +fn parse_host_service_pid(data: &[u8]) -> Option<(ByteSlice, ByteSlice, u64, ByteSlice)> {
> +    let host_count = data
> +        .iter()
> +        .take_while(|b| !(**b as char).is_ascii_whitespace())
> +        .count();
> +    let host = &data[0..host_count];
> +    let data = &data[host_count + 1..]; // whitespace after host
> +
> +    let service_count = data
> +        .iter()
> +        .take_while(|b| {
> +            (**b as char).is_ascii_alphabetic() || (**b as char) == '/' || (**b as char) == '-'
> +        })
> +        .count();
> +    let service = &data[0..service_count];
> +    let data = &data[service_count..];
> +    if data.get(0) != Some(&b'[') {
> +        return None;
> +    }
> +    let data = &data[1..];
> +
> +    let pid_count = data
> +        .iter()
> +        .take_while(|b| (**b as char).is_ascii_digit())
> +        .count();
> +    let pid = match unsafe { std::str::from_utf8_unchecked(&data[0..pid_count]) }.parse() {
> +        // all ascii digits so valid utf8
> +        Ok(p) => p,
> +        Err(e) => {
> +            eprintln!("failed to parse PID: {}", e);
> +            return None;
> +        }
> +    };
> +    let data = &data[pid_count..];
> +    if !data.starts_with(b"]: ") {
> +        eprintln!("error after PID");
> +        return None;
> +    }
> +    let data = &data[3..];
> +
> +    Some((host, service, pid, data))
> +}
> +
> +/// A find implementation for [u8]. Returns the index or None.
> +fn find<T: PartialOrd>(data: &[T], needle: &[T]) -> Option<usize> {
> +    data.windows(needle.len()).position(|d| d == needle)
> +}
> +
> +/// A find implementation for [u8] that converts to lowercase before the comparison. Returns the
> +/// index or None.
> +fn find_lowercase(data: &[u8], needle: &[u8]) -> Option<usize> {
> +    let data = data.to_ascii_lowercase();
> +    let needle = needle.to_ascii_lowercase();
> +    data.windows(needle.len()).position(|d| d == &needle[..])
> +}



More information about the pmg-devel mailing list