[pmg-devel] [PATCH v4 pmg-log-tracker 1/2] rewrite in rust

Mira Limbeck m.limbeck at proxmox.com
Wed Nov 20 15:58:50 CET 2019


Looks like there is something wrong with the start time.

On 11/20/19 3:31 PM, Mira Limbeck wrote:
> pmg-log-tracker has been rewritten in Rust. Functionality is the same.
> Output sometimes has a different order than the pmg-log-tracker in C.
> This only happens when the time of the entries match.
>
> There's one change regarding the interface. In addition to the short
> versions of arguments also long versions exist.
>
> The implementation uses Rc<>, Weak<> and RefCell<> to make holding mutable
> cross-references possible, without having to change the original logic
> completely. This allowed for easier translation from C to Rust.
>
> The file debian/cargo-checksum.json is required by dh-cargo, otherwise
> it won't compile. The cargo-checksum.json should contain the upstream
> .crate file which does not exist in this case, so we just create an
> empty one with the required keys. (see 'Library package structure' in
> https://wiki.debian.org/Teams/RustPackaging/Policy)
>
> The change to the minimum version of debhelper required was done
> according to other rust packages (rust-clap, rust-bindgen, rust-ripgrep).
>
> Signed-off-by: Mira Limbeck <m.limbeck at proxmox.com>
> ---
> v4:
>   - Fixed an issue where the Rc<> was destroyed before we tried to access
>     it via a weak reference.
> v3:
>   - Use Weak<> together with Rc<> so that there are no cycles that can
>     keep the objects alive longer than necessary. Reduces memory
>     footprint from ~1.8G to 190M with ~1.8G syslog files
>   - cleanup: simplification of QEntry, SEntry and FEntry creation/lookup
>   - cleanup: moved some function/struct declarations around
>   - benchmarks using /usr/bin/time instead of zsh built-in
>
> v2:
>   - fixed an issue where the incorrect number of files got returned by
>     Parser::count_relevant_files()
>   - remove src/pmg-log-tracker.c and src/Makefile
>   - added simple benchmarks
>
> Regarding the version, how do you want to handle it? Keep the Cargo.toml
> version in sync with the package version (major, minor)?
>
> This already contains the changes @Wolfgang recommended, simplifying a
> lot of code, especially the printing. Also got rid of one RefCell and
> one Cell usage, as we can now pass the Parser as mutable reference to
> both print() functions.
>
> Some simple benchmarks: (32 syslog files (syslog to syslog.31.gz))
>
> Rust: (median of 5 runs + 1 for cache)
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -v > /dev/null
>    User time (seconds): 9.01
>    System time (seconds): 0.10
>    Elapsed (wall clock) time (h:mm:ss or m:ss): 0:09.12
>    Maximum resident set size (kbytes): 194156
>
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -vv > /dev/null
>    User time (seconds): 10.17
>    System time (seconds): 0.07
>    Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.24
>    Maximum resident set size (kbytes): 194096
>
> C: (same as for the rust version)
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -v > /dev/null
>    User time (seconds): 10.41
>    System time (seconds): 0.25
>    Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.67
>    Maximum resident set size (kbytes): 148556
>
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -vv > /dev/null
>    User time (seconds): 11.67
>    System time (seconds): 0.35
>    Elapsed (wall clock) time (h:mm:ss or m:ss): 0:12.02
>    Maximum resident set size (kbytes): 148536
>
> The -v version is used by the GUI.
>
>   Cargo.toml                 |   12 +
>   Makefile                   |    4 +-
>   debian/cargo-checksum.json |    1 +
>   debian/control             |   17 +-
>   debian/rules               |    2 +-
>   src/main.rs                | 1985 ++++++++++++++++++++++++++++++++++++
>   6 files changed, 2016 insertions(+), 5 deletions(-)
>   create mode 100644 Cargo.toml
>   create mode 100644 debian/cargo-checksum.json
>   create mode 100644 src/main.rs
>
> diff --git a/Cargo.toml b/Cargo.toml
> new file mode 100644
> index 0000000..4cf75f1
> --- /dev/null
> +++ b/Cargo.toml
> @@ -0,0 +1,12 @@
> +[package]
> +name = "pmg-log-tracker"
> +version = "2.1.0"
> +authors = ["Mira Limbeck <m.limbeck at proxmox.com>", "Dietmar Maurer <dietmar at proxmox.com>"]
> +edition = "2018"
> +
> +[dependencies]
> +clap = "2.32.0"
> +failure = "0.1.5"
> +flate2 = "1.0.6"
> +libc = "0.2.48"
> +time = "0.1.42"
> diff --git a/Makefile b/Makefile
> index 1e344dd..b50fe02 100644
> --- a/Makefile
> +++ b/Makefile
> @@ -14,7 +14,9 @@ all: ${DEB}
>   .PHONY: ${BUILDDIR}
>   ${BUILDDIR}: src
>   	rm -rf ${BUILDDIR} ${BUILDDIR}.tmp
> -	cp -a src ${BUILDDIR}.tmp
> +	mkdir ${BUILDDIR}.tmp
> +	cp -a src ${BUILDDIR}.tmp/src
> +	cp Cargo.toml ${BUILDDIR}.tmp/
>   	cp -a debian ${BUILDDIR}.tmp/debian
>   	echo "git clone git://git.proxmox.com/git/pmg-log-tracker.git\\ngit checkout ${GITVERSION}" > ${BUILDDIR}.tmp/debian/SOURCE
>   	mv ${BUILDDIR}.tmp ${BUILDDIR}
> diff --git a/debian/cargo-checksum.json b/debian/cargo-checksum.json
> new file mode 100644
> index 0000000..ce2579b
> --- /dev/null
> +++ b/debian/cargo-checksum.json
> @@ -0,0 +1 @@
> +{"package":"","files":{}}
> diff --git a/debian/control b/debian/control
> index 26210a3..e02c296 100644
> --- a/debian/control
> +++ b/debian/control
> @@ -2,13 +2,24 @@ Source: pmg-log-tracker
>   Section: admin
>   Priority: optional
>   Maintainer: Proxmox Support Team <support at proxmox.com>
> -Build-Depends: debhelper (>= 10~),
> -               libglib2.0-dev (>= 2.42.1)
> +Build-Depends: cargo:native,
> +               debhelper (>= 11),
> +               dh-cargo (>= 10),
> +               librust-clap-dev,
> +               librust-failure-dev,
> +               librust-flate2-dev,
> +               librust-libc-dev,
> +               librust-time-dev,
> +               libstd-rust-dev,
> +               rustc:native
>   Standards-Version: 3.9.8
>   Homepage: http://www.proxmox.com
>   
>   Package: pmg-log-tracker
>   Architecture: any
> -Depends: ${shlibs:Depends}, ${misc:Depends}
> +Depends: ${misc:Depends},
> +         ${shlibs:Depends}
> +Built-Using: ${cargo:Built-Using}
> +XB-X-Cargo-Built-Using: ${cargo:X-Cargo-Built-Using}
>   Description: Proxmox Mailgateway Log Tracker
>    Tools to scan mail logs.
> diff --git a/debian/rules b/debian/rules
> index a7521b6..e3ba5d4 100755
> --- a/debian/rules
> +++ b/debian/rules
> @@ -5,4 +5,4 @@
>   
>   
>   %:
> -	dh $@
> +	dh $@ --buildsystem cargo
> diff --git a/src/main.rs b/src/main.rs
> new file mode 100644
> index 0000000..9714e04
> --- /dev/null
> +++ b/src/main.rs
> @@ -0,0 +1,1985 @@
> +#[macro_use]
> +extern crate clap;
> +extern crate failure;
> +extern crate flate2;
> +extern crate libc;
> +
> +use std::cell::RefCell;
> +use std::collections::HashMap;
> +use std::ffi::CString;
> +use std::rc::{Rc, Weak};
> +
> +use std::fs::File;
> +use std::io::BufRead;
> +use std::io::BufReader;
> +use std::io::BufWriter;
> +use std::io::Write;
> +
> +use failure::Error;
> +use flate2::read;
> +
> +use clap::{App, Arg};
> +
> +fn main() -> Result<(), Error> {
> +    let matches = App::new(crate_name!())
> +        .version(crate_version!())
> +        .about(crate_description!())
> +        .arg(
> +            Arg::with_name("verbose")
> +                .short("v")
> +                .long("verbose")
> +                .help("Verbose output, can be specified multiple times")
> +                .multiple(true)
> +                .takes_value(false),
> +        )
> +        .arg(
> +            Arg::with_name("inputfile")
> +                .short("i")
> +                .long("inputfile")
> +                .help("Input file to use instead of /var/log/syslog, or '-' for stdin")
> +                .value_name("INPUTFILE"),
> +        )
> +        .arg(
> +            Arg::with_name("host")
> +                .short("h")
> +                .long("host")
> +                .help("Hostname or Server IP")
> +                .value_name("HOST"),
> +        )
> +        .arg(
> +            Arg::with_name("from")
> +                .short("f")
> +                .long("from")
> +                .help("Mails from SENDER")
> +                .value_name("SENDER"),
> +        )
> +        .arg(
> +            Arg::with_name("to")
> +                .short("t")
> +                .long("to")
> +                .help("Mails to RECIPIENT")
> +                .value_name("RECIPIENT"),
> +        )
> +        .arg(
> +            Arg::with_name("start")
> +                .short("s")
> +                .long("starttime")
> +                .help("Start time (YYYY-MM-DD HH:MM:SS) or seconds since epoch")
> +                .value_name("TIME"),
> +        )
> +        .arg(
> +            Arg::with_name("end")
> +                .short("e")
> +                .long("endtime")
> +                .help("End time (YYYY-MM-DD HH:MM:SS) or seconds since epoch")
> +                .value_name("TIME"),
> +        )
> +        .arg(
> +            Arg::with_name("msgid")
> +                .short("m")
> +                .long("message-id")
> +                .help("Message ID (exact match)")
> +                .value_name("MSGID"),
> +        )
> +        .arg(
> +            Arg::with_name("qids")
> +                .short("q")
> +                .long("queue-id")
> +                .help("Queue ID (exact match), can be specified multiple times")
> +                .value_name("QID")
> +                .multiple(true)
> +                .number_of_values(1),
> +        )
> +        .arg(
> +            Arg::with_name("search")
> +                .short("x")
> +                .long("search-string")
> +                .help("Search for string")
> +                .value_name("STRING"),
> +        )
> +        .arg(
> +            Arg::with_name("limit")
> +                .short("l")
> +                .long("limit")
> +                .help("Print MAX entries")
> +                .value_name("MAX")
> +                .default_value("0"),
> +        )
> +        .arg(
> +            Arg::with_name("exclude_greylist")
> +                .short("g")
> +                .long("exclude-greylist")
> +                .help("Exclude greylist entries"),
> +        )
> +        .arg(
> +            Arg::with_name("exclude_ndr")
> +                .short("n")
> +                .long("exclude-ndr")
> +                .help("Exclude NDR entries"),
> +        )
> +        .get_matches();
> +
> +    let mut parser = Parser::new();
> +    parser.handle_args(matches)?;
> +
> +    println!("# LogReader: {}", std::process::id());
> +    println!("# Query options");
> +    if !parser.options.from.is_empty() {
> +        println!("# Sender: {}", parser.options.from);
> +    }
> +    if !parser.options.to.is_empty() {
> +        println!("# Recipient: {}", parser.options.to);
> +    }
> +    if !parser.options.host.is_empty() {
> +        println!("# Server: {}", parser.options.host);
> +    }
> +    if !parser.options.msgid.is_empty() {
> +        println!("# MsgID: {}", parser.options.msgid);
> +    }
> +    for m in parser.options.match_list.iter() {
> +        match m {
> +            Match::Qid(b) => println!("# QID: {}", std::str::from_utf8(b)?),
> +            Match::RelLineNr(t, l) => println!("# QID: T{:8X}L{:08X}", *t as u32, *l as u32),
> +        }
> +    }
> +
> +    if !parser.options.string_match.is_empty() {
> +        println!("# Match: {}", parser.options.string_match);
> +    }
> +
> +    println!(
> +        "# Start: {} ({})",
> +        time::strftime("%F %T", &parser.start_tm)?,
> +        parser.options.start
> +    );
> +    println!(
> +        "# End: {} ({})",
> +        time::strftime("%F %T", &parser.end_tm)?,
> +        parser.options.end
> +    );
> +
> +    println!("# End Query Options\n");
> +    parser.parse_files()?;
> +
> +    Ok(())
> +}
> +
> +fn handle_pmg_smtp_filter_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let (qid, data) = match parse_qid(msg, 25) {
> +        Some((q, m)) => (q, m),
> +        None => return,
> +    };
> +    let data = &data[2..];
> +
> +    let fe = get_or_create_fentry(&mut parser.fentries, qid);
> +
> +    if parser.string_match {
> +        fe.borrow_mut().string_match = parser.string_match;
> +    }
> +
> +    fe.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    if data.starts_with(b"accept mail to <") {
> +        let data = &data[16..];
> +        let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let (to, data) = data.split_at(to_count);
> +        if !data.starts_with(b"> (") {
> +            return;
> +        }
> +        let data = &data[3..];
> +        let qid_count = data.iter().take_while(|b| (**b as char) != ')').count();
> +        let qid = &data[..qid_count];
> +
> +        fe.borrow_mut()
> +            .set_accept(to, qid, parser.current_record_state.timestamp);
> +        return;
> +    }
> +
> +    if data.starts_with(b"moved mail for <") {
> +        let data = &data[16..];
> +        let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let (to, data) = data.split_at(to_count);
> +
> +        let qid_index = match find(data, b"quarantine - ") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let data = &data[qid_index + 13..];
> +        let (qid, _) = match parse_qid(data, 25) {
> +            Some(t) => t,
> +            None => return,
> +        };
> +
> +        fe.borrow_mut()
> +            .set_quarantine(to, qid, parser.current_record_state.timestamp);
> +        return;
> +    }
> +
> +    if data.starts_with(b"block mail to <") {
> +        let data = &data[15..];
> +        let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let to = &data[..to_count];
> +
> +        fe.borrow_mut()
> +            .set_block(to, parser.current_record_state.timestamp);
> +        return;
> +    }
> +
> +    if data.starts_with(b"processing time: ") {
> +        let data = &data[17..];
> +        let time_count = data.iter().take_while(|b| !b.is_ascii_whitespace()).count();
> +        let time = &data[..time_count];
> +
> +        fe.borrow_mut().set_processing_time(time);
> +        return;
> +    }
> +}
> +
> +fn handle_postscreen_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    if !msg.starts_with(b"NOQUEUE: reject: RCPT from ") {
> +        return;
> +    }
> +    let data = &msg[27..];
> +    let client_index = match find(data, b"; client [") {
> +        Some(i) => i,
> +        None => return,
> +    };
> +    let data = &data[client_index + 10..];
> +
> +    let client_count = data.iter().take_while(|b| (**b as char) != ']').count();
> +    let (client, data) = data.split_at(client_count);
> +
> +    let from_index = match find(data, b"; from=<") {
> +        Some(i) => i,
> +        None => return,
> +    };
> +    let data = &data[from_index + 8..];
> +
> +    let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +    let (from, data) = data.split_at(from_count);
> +
> +    if !data.starts_with(b">, to=<") {
> +        return;
> +    }
> +    let data = &data[7..];
> +    let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +    let to = &data[..to_count];
> +
> +    let se = get_or_create_sentry(
> +        &mut parser.sentries,
> +        parser.current_record_state.pid,
> +        parser.rel_line_nr,
> +        parser.current_record_state.timestamp,
> +    );
> +
> +    if parser.string_match {
> +        se.borrow_mut().string_match = parser.string_match;
> +    }
> +
> +    se.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +    se.borrow_mut().add_noqueue_entry(
> +        from,
> +        to,
> +        DStatus::Noqueue,
> +        parser.current_record_state.timestamp,
> +    );
> +    se.borrow_mut().set_connect(client);
> +    se.borrow_mut().disconnect = true;
> +    se.borrow_mut().print(parser);
> +    parser.free_sentry(parser.current_record_state.pid);
> +}
> +
> +fn handle_qmgr_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let (qid, data) = match parse_qid(msg, 15) {
> +        Some(t) => t,
> +        None => return,
> +    };
> +    let data = &data[2..];
> +
> +    let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> +    if parser.string_match {
> +        qe.borrow_mut().string_match = parser.string_match;
> +    }
> +    qe.borrow_mut().cleanup = true;
> +    qe.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    if data.starts_with(b"from=<") {
> +        let data = &data[6..];
> +
> +        let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let (from, data) = data.split_at(from_count);
> +
> +        if !data.starts_with(b">, size=") {
> +            return;
> +        }
> +        let data = &data[8..];
> +
> +        let size_count = data
> +            .iter()
> +            .take_while(|b| (**b as char).is_ascii_digit())
> +            .count();
> +        let (size, _) = data.split_at(size_count);
> +        qe.borrow_mut().from = from.into();
> +        qe.borrow_mut().size = unsafe { std::str::from_utf8_unchecked(size) }
> +            .parse()
> +            .unwrap();
> +    } else if data == b"removed" {
> +        qe.borrow_mut().removed = true;
> +        qe.borrow_mut().finalize(parser);
> +    }
> +}
> +
> +fn handle_lmtp_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let (qid, data) = match parse_qid(msg, 15) {
> +        Some((q, t)) => (q, t),
> +        None => return,
> +    };
> +
> +    let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> +    if parser.string_match {
> +        qe.borrow_mut().string_match = parser.string_match;
> +    }
> +    qe.borrow_mut().cleanup = true;
> +    qe.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    let data = &data[2..];
> +    if !data.starts_with(b"to=<") {
> +        return;
> +    }
> +    let data = &data[4..];
> +    let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +    let (to, data) = data.split_at(to_count);
> +
> +    let relay_index = match find(data, b"relay=") {
> +        Some(i) => i,
> +        None => return,
> +    };
> +    let data = &data[relay_index + 6..];
> +    let relay_count = data.iter().take_while(|b| (**b as char) != ',').count();
> +    let (relay, data) = data.split_at(relay_count);
> +
> +    let dsn_index = match find(data, b"dsn=") {
> +        Some(i) => i,
> +        None => return,
> +    };
> +    let data = &data[dsn_index + 4..];
> +    let dsn = match data.iter().next() {
> +        Some(b) => {
> +            if (*b as char).is_ascii_digit() {
> +                (*b as char).to_digit(10).unwrap()
> +            } else {
> +                return;
> +            }
> +        }
> +        None => return,
> +    };
> +
> +    qe.borrow_mut().add_to_entry(
> +        to,
> +        relay,
> +        DStatus::Dsn(dsn),
> +        parser.current_record_state.timestamp,
> +    );
> +
> +    if &*parser.current_record_state.sys == b"postfix/lmtp" {
> +        let sent_index = match find(data, b"status=sent (250 2.") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let mut data = &data[sent_index + 19..];
> +        if data.starts_with(b"5.0 OK") {
> +            data = &data[8..];
> +        } else if data.starts_with(b"7.0 BLOCKED") {
> +            data = &data[13..];
> +        } else {
> +            return;
> +        }
> +
> +        let (qid, _) = match parse_qid(data, 25) {
> +            Some(t) => t,
> +            None => return,
> +        };
> +
> +        qe.borrow_mut().filtered = true;
> +        if let Some(fe) = parser.fentries.get(qid) {
> +            qe.borrow_mut().filter = Some(Rc::downgrade(fe));
> +        }
> +    }
> +}
> +
> +fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let se = get_or_create_sentry(
> +        &mut parser.sentries,
> +        parser.current_record_state.pid,
> +        parser.rel_line_nr,
> +        parser.current_record_state.timestamp,
> +    );
> +
> +    if parser.string_match {
> +        se.borrow_mut().string_match = parser.string_match;
> +    }
> +    se.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    if msg.starts_with(b"connect from ") {
> +        let addr = &msg[13..];
> +        se.borrow_mut().set_connect(addr);
> +        return;
> +    }
> +
> +    if msg.starts_with(b"disconnect from") {
> +        parser.sentries.remove(&parser.current_record_state.pid);
> +        se.borrow_mut().disconnect = true;
> +
> +        if se.borrow_mut().remove_unneeded_refs(parser) == 0 {
> +            se.borrow_mut().print(parser);
> +            parser.free_sentry(se.borrow().pid);
> +        } else {
> +            se.borrow_mut().finalize_refs(parser);
> +        }
> +        return;
> +    }
> +
> +    if msg.starts_with(b"NOQUEUE:") {
> +        let data = &msg[8..];
> +        let colon_index = match find(data, b":") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let data = &data[colon_index + 1..];
> +        let colon_index = match find(data, b":") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let data = &data[colon_index + 1..];
> +        let semicolon_index = match find(data, b";") {
> +            Some(i) => i,
> +            None => return,
> +        };
> +        let (grey, data) = data.split_at(semicolon_index);
> +        let dstatus = if find(
> +            grey,
> +            b"Recipient address rejected: Service is unavailable (try later)",
> +        )
> +        .is_some()
> +        {
> +            DStatus::Greylist
> +        } else {
> +            DStatus::Noqueue
> +        };
> +
> +        if !data.starts_with(b"; from=<") {
> +            return;
> +        }
> +        let data = &data[8..];
> +        let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let (from, data) = data.split_at(from_count);
> +
> +        if !data.starts_with(b"> to=<") {
> +            return;
> +        }
> +        let data = &data[6..];
> +        let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> +        let to = &data[..to_count];
> +
> +        se.borrow_mut()
> +            .add_noqueue_entry(from, to, dstatus, parser.current_record_state.timestamp);
> +        return;
> +    }
> +
> +    let (qid, data) = match parse_qid(msg, 15) {
> +        Some(t) => t,
> +        None => return,
> +    };
> +    let data = &data[2..];
> +
> +    let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> +    if parser.string_match {
> +        qe.borrow_mut().string_match = parser.string_match;
> +    }
> +
> +    SEntry::add_ref(&se, &qe);
> +
> +    if !data.starts_with(b"client=") {
> +        return;
> +    }
> +    let data = &data[7..];
> +    let client_count = data
> +        .iter()
> +        .take_while(|b| !(**b as char).is_ascii_whitespace())
> +        .count();
> +    let client = &data[..client_count];
> +
> +    qe.borrow_mut().set_client(client);
> +}
> +
> +fn handle_cleanup_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> +    let (qid, data) = match parse_qid(msg, 15) {
> +        Some(t) => t,
> +        None => return,
> +    };
> +    let data = &data[2..];
> +
> +    let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> +    if parser.string_match {
> +        qe.borrow_mut().string_match = parser.string_match;
> +    }
> +    qe.borrow_mut()
> +        .log
> +        .push((complete_line.into(), parser.lines));
> +
> +    if !data.starts_with(b"message-id=") {
> +        return;
> +    }
> +    let data = &data[11..];
> +    let msgid_count = data
> +        .iter()
> +        .take_while(|b| !(**b as char).is_ascii_whitespace())
> +        .count();
> +    let msgid = &data[..msgid_count];
> +
> +    if !msgid.is_empty() {
> +        if qe.borrow().msgid.is_empty() {
> +            qe.borrow_mut().msgid = msgid.into();
> +        }
> +        qe.borrow_mut().cleanup = true;
> +    }
> +}
> +
> +#[derive(Default, Debug)]
> +struct NoqueueEntry {
> +    from: Box<[u8]>,
> +    to: Box<[u8]>,
> +    dstatus: DStatus,
> +    timestamp: u64,
> +}
> +
> +#[derive(Debug)]
> +struct ToEntry {
> +    to: Box<[u8]>,
> +    relay: Box<[u8]>,
> +    dstatus: DStatus,
> +    timestamp: u64,
> +}
> +
> +impl Default for ToEntry {
> +    fn default() -> Self {
> +        ToEntry {
> +            to: Default::default(),
> +            relay: (&b"none"[..]).into(),
> +            dstatus: Default::default(),
> +            timestamp: Default::default(),
> +        }
> +    }
> +}
> +
> +#[derive(Debug, PartialEq)]
> +enum DStatus {
> +    Invalid,
> +    Accept,
> +    Quarantine,
> +    Block,
> +    Greylist,
> +    Noqueue,
> +    Dsn(u32),
> +}
> +
> +impl Default for DStatus {
> +    fn default() -> Self {
> +        DStatus::Invalid
> +    }
> +}
> +
> +impl std::fmt::Display for DStatus {
> +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
> +        let c = match self {
> +            DStatus::Invalid => '\0', // other default
> +            DStatus::Accept => 'A',
> +            DStatus::Quarantine => 'Q',
> +            DStatus::Block => 'B',
> +            DStatus::Greylist => 'G',
> +            DStatus::Noqueue => 'N',
> +            DStatus::Dsn(v) => std::char::from_digit(*v, 10).unwrap(),
> +        };
> +        write!(f, "{}", c)
> +    }
> +}
> +
> +impl DStatus {
> +    fn is_dsn(&self, value: Option<u32>) -> bool {
> +        match (self, value) {
> +            (DStatus::Dsn(v), Some(val)) => *v == val,
> +            (DStatus::Dsn(_), None) => true,
> +            _ => false,
> +        }
> +    }
> +}
> +
> +#[derive(Debug, Default)]
> +struct SEntry {
> +    log: Vec<(Box<[u8]>, u64)>,
> +    connect: Box<[u8]>,
> +    cursor: Box<[u8]>,
> +    pid: u64,
> +    refs: Vec<Weak<RefCell<QEntry>>>,
> +    nq_entries: Vec<NoqueueEntry>,
> +    disconnect: bool,
> +    string_match: bool,
> +    timestamp: u64,
> +    rel_line_nr: u64,
> +}
> +
> +impl SEntry {
> +    fn add_noqueue_entry(&mut self, from: &[u8], to: &[u8], dstatus: DStatus, timestamp: u64) {
> +        let ne = NoqueueEntry {
> +            to: to.into(),
> +            from: from.into(),
> +            dstatus,
> +            timestamp,
> +        };
> +        self.nq_entries.push(ne);
> +    }
> +
> +    fn set_connect(&mut self, client: &[u8]) {
> +        if self.connect.is_empty() {
> +            self.connect = client.into();
> +        }
> +    }
> +
> +    fn print(&mut self, parser: &mut Parser) {
> +        if !parser.options.msgid.is_empty() {
> +            return;
> +        }
> +
> +        if !parser.options.host.is_empty() {
> +            if self.connect.is_empty() {
> +                return;
> +            }
> +            if find_lowercase(&self.connect, parser.options.host.as_bytes()).is_none() {
> +                return;
> +            }
> +        }
> +
> +        if !parser.options.match_list.is_empty() {
> +            let mut found = false;
> +            for m in parser.options.match_list.iter() {
> +                match m {
> +                    Match::Qid(_) => return,
> +                    Match::RelLineNr(t, l) => {
> +                        if (*t as u64) == self.timestamp && *l == self.rel_line_nr {
> +                            found = true;
> +                            break;
> +                        }
> +                    }
> +                }
> +            }
> +            if !found {
> +                return;
> +            }
> +        }
> +
> +        if !parser.options.from.is_empty()
> +            || !parser.options.to.is_empty()
> +            || parser.options.exclude_greylist
> +            || parser.options.exclude_ndr
> +        {
> +            let mut found = false;
> +            for nq in self.nq_entries.iter_mut().rev() {
> +                if !parser.options.from.is_empty()
> +                    && find_lowercase(&nq.from, parser.options.from.as_bytes()).is_none()
> +                {
> +                    nq.dstatus = DStatus::Invalid;
> +                }
> +
> +                if parser.options.exclude_greylist && nq.dstatus == DStatus::Greylist {
> +                    nq.dstatus = DStatus::Invalid;
> +                }
> +                if parser.options.exclude_ndr && nq.from.is_empty() {
> +                    nq.dstatus = DStatus::Invalid;
> +                }
> +
> +                if !parser.options.to.is_empty()
> +                    && !nq.to.is_empty()
> +                    && find_lowercase(&nq.to, parser.options.to.as_bytes()).is_none()
> +                {
> +                    nq.dstatus = DStatus::Invalid;
> +                }
> +
> +                if nq.dstatus != DStatus::Invalid {
> +                    found = true;
> +                }
> +            }
> +
> +            if !found {
> +                return;
> +            }
> +        }
> +
> +        if !parser.options.string_match.is_empty() && !self.string_match {
> +            return;
> +        }
> +
> +        if parser.options.verbose > 0 {
> +            parser.write_all_ok(format!(
> +                "SMTPD: T{:8X}L{:08X}\n",
> +                self.timestamp as u32, self.rel_line_nr as u32
> +            ));
> +            parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime).as_bytes());
> +
> +            if !self.connect.is_empty() {
> +                parser.write_all_ok(b"CLIENT: ");
> +                parser.write_all_ok(&self.connect);
> +                parser.write_all_ok(b"\n");
> +            }
> +        }
> +
> +        for nq in self.nq_entries.iter().rev() {
> +            if nq.dstatus != DStatus::Invalid {
> +                parser.write_all_ok(format!(
> +                    "TO:{:X}:T{:08X}L{:08X}:{}: from <",
> +                    nq.timestamp as i32, self.timestamp as i32, self.rel_line_nr, nq.dstatus,
> +                ));
> +                parser.write_all_ok(&nq.from);
> +                parser.write_all_ok(b"> to <");
> +                parser.write_all_ok(&nq.to);
> +                parser.write_all_ok(b">\n");
> +                parser.count += 1;
> +            }
> +        }
> +
> +        if parser.options.verbose > 1 {
> +            parser.write_all_ok(b"LOGS:\n");
> +            for (log, line) in self.log.iter() {
> +                parser.write_all_ok(format!("L{:08X} ", *line as u32));
> +                parser.write_all_ok(log);
> +                parser.write_all_ok(b"\n");
> +            }
> +        }
> +        parser.write_all_ok(b"\n");
> +    }
> +
> +    fn delete_ref(&mut self, qentry: &Rc<RefCell<QEntry>>) {
> +        self.refs.retain(|q| {
> +            let q = match q.upgrade() {
> +                Some(q) => q,
> +                None => return false,
> +            };
> +            if Rc::ptr_eq(&q, qentry) {
> +                return false;
> +            }
> +            true
> +        });
> +    }
> +
> +    fn remove_unneeded_refs(&mut self, parser: &mut Parser) -> u32 {
> +        let mut count: u32 = 0;
> +        let mut to_delete = Vec::new();
> +        self.refs.retain(|q| {
> +            let q = match q.upgrade() {
> +                Some(q) => q,
> +                None => return false,
> +            };
> +            let is_cleanup = q.borrow().cleanup;
> +            if !is_cleanup {
> +                to_delete.push(q);
> +                false
> +            } else {
> +                count += 1;
> +                true
> +            }
> +        });
> +
> +        for q in to_delete {
> +            q.borrow_mut().smtpd = None;
> +            parser.free_qentry(&q.borrow().qid, Some(self));
> +        }
> +        count
> +    }
> +
> +    fn finalize_refs(&mut self, parser: &mut Parser) {
> +        let mut qentries = Vec::new();
> +        for q in self.refs.iter() {
> +            let q = match q.upgrade() {
> +                Some(q) => q,
> +                None => continue,
> +            };
> +
> +            if !q.borrow().removed {
> +                continue;
> +            }
> +
> +            let fe = &q.borrow().filter;
> +            if let Some(f) = fe {
> +                if let Some(f) = f.upgrade() {
> +                    if !f.borrow().finished {
> +                        continue;
> +                    }
> +                }
> +            }
> +
> +            qentries.push(Rc::clone(&q));
> +        }
> +
> +        for q in qentries.iter() {
> +            q.borrow_mut().print(parser, Some(self));
> +            q.borrow_mut().smtpd = None;
> +            parser.free_qentry(&q.borrow().qid, Some(self));
> +
> +            if let Some(f) = &q.borrow().filter {
> +                if let Some(f) = f.upgrade() {
> +                    parser.free_fentry(&f.borrow().logid);
> +                }
> +            }
> +        }
> +    }
> +
> +    fn add_ref(sentry: &Rc<RefCell<SEntry>>, qentry: &Rc<RefCell<QEntry>>) {
> +        let smtpd = qentry.borrow().smtpd.clone();
> +        if let Some(s) = smtpd {
> +            if !Rc::ptr_eq(sentry, &s) {
> +                eprintln!("Error: qentry ref already set");
> +            }
> +            return;
> +        }
> +
> +        for q in sentry.borrow().refs.iter() {
> +            let q = match q.upgrade() {
> +                Some(q) => q,
> +                None => continue,
> +            };
> +            if Rc::ptr_eq(&q, qentry) {
> +                return;
> +            }
> +        }
> +
> +        sentry.borrow_mut().refs.push(Rc::downgrade(qentry));
> +        qentry.borrow_mut().smtpd = Some(Rc::clone(sentry));
> +    }
> +}
> +
> +#[derive(Default, Debug)]
> +struct QEntry {
> +    log: Vec<(Box<[u8]>, u64)>,
> +    smtpd: Option<Rc<RefCell<SEntry>>>,
> +    filter: Option<Weak<RefCell<FEntry>>>,
> +    qid: Box<[u8]>,
> +    from: Box<[u8]>,
> +    client: Box<[u8]>,
> +    msgid: Box<[u8]>,
> +    size: u64,
> +    to_entries: Vec<ToEntry>,
> +    cleanup: bool,
> +    removed: bool,
> +    filtered: bool,
> +    string_match: bool,
> +}
> +
> +impl QEntry {
> +    fn add_to_entry(&mut self, to: &[u8], relay: &[u8], dstatus: DStatus, timestamp: u64) {
> +        let te = ToEntry {
> +            to: to.into(),
> +            relay: relay.into(),
> +            dstatus,
> +            timestamp,
> +        };
> +        self.to_entries.push(te);
> +    }
> +
> +    fn finalize(&mut self, parser: &mut Parser) {
> +        if self.removed {
> +            if let Some(se) = &self.smtpd {
> +                if !se.borrow().disconnect {
> +                    return;
> +                }
> +            }
> +
> +            if let Some(fe) = self.filter.clone() {
> +                if let Some(fe) = fe.upgrade() {
> +                    if !fe.borrow().finished {
> +                        return;
> +                    }
> +                }
> +
> +                match self.smtpd.clone() {
> +                    Some(s) => self.print(parser, Some(&*s.borrow())),
> +                    None => self.print(parser, None),
> +                };
> +                if let Some(se) = &self.smtpd {
> +                    parser.free_qentry(&self.qid, Some(&mut *se.borrow_mut()));
> +                } else {
> +                    parser.free_qentry(&self.qid, None);
> +                }
> +
> +                if let Some(fe) = fe.upgrade() {
> +                    parser.free_fentry(&fe.borrow().logid);
> +                }
> +            } else {
> +                if let Some(s) = self.smtpd.clone() {
> +                    self.print(parser, Some(&*s.borrow()));
> +                    parser.free_qentry(&self.qid, Some(&mut *s.borrow_mut()));
> +                } else {
> +                    self.print(parser, None);
> +                    parser.free_qentry(&self.qid, None);
> +                }
> +            }
> +        }
> +    }
> +
> +    fn msgid_matches(&self, parser: &Parser) -> bool {
> +        if !parser.options.msgid.is_empty() {
> +            if self.msgid.is_empty() {
> +                return false;
> +            }
> +            let qentry_msgid_lowercase = self.msgid.to_ascii_lowercase();
> +            let msgid_lowercase = parser.options.msgid.as_bytes().to_ascii_lowercase();
> +            if qentry_msgid_lowercase != msgid_lowercase {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn match_list_matches(&self, parser: &Parser, se: Option<&SEntry>) -> bool {
> +        let fe = &self.filter;
> +        if !parser.options.match_list.is_empty() {
> +            let mut found = false;
> +            for m in parser.options.match_list.iter() {
> +                match m {
> +                    Match::Qid(q) => {
> +                        if let Some(f) = fe {
> +                            if let Some(f) = f.upgrade() {
> +                                if &f.borrow().logid == q {
> +                                    found = true;
> +                                    break;
> +                                }
> +                            }
> +                        }
> +                        if &self.qid == q {
> +                            found = true;
> +                            break;
> +                        }
> +                    }
> +                    Match::RelLineNr(t, l) => {
> +                        if let Some(s) = se {
> +                            if s.timestamp == (*t as u64) && s.rel_line_nr == *l {
> +                                found = true;
> +                                break;
> +                            }
> +                        }
> +                    }
> +                }
> +            }
> +            if !found {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn host_matches(&self, parser: &Parser, se: Option<&SEntry>) -> bool {
> +        if !parser.options.host.is_empty() {
> +            let mut found = false;
> +            if let Some(s) = se {
> +                if !s.connect.is_empty()
> +                    && find_lowercase(&s.connect, parser.options.host.as_bytes()).is_some()
> +                {
> +                    found = true;
> +                }
> +            }
> +            if !self.client.is_empty()
> +                && find_lowercase(&self.client, parser.options.host.as_bytes()).is_some()
> +            {
> +                found = true;
> +            }
> +
> +            if !found {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn from_to_matches(&mut self, parser: &Parser) -> bool {
> +        if !parser.options.from.is_empty() {
> +            if self.from.is_empty() {
> +                return false;
> +            }
> +            if find_lowercase(&self.from, parser.options.from.as_bytes()).is_none() {
> +                return false;
> +            }
> +        } else if parser.options.exclude_ndr && self.from.is_empty() {
> +            return false;
> +        }
> +
> +        if !parser.options.to.is_empty() {
> +            let mut found = false;
> +            self.to_entries.retain(|to| {
> +                if find_lowercase(&to.to, parser.options.to.as_bytes()).is_none() {
> +                    false
> +                } else {
> +                    found = true;
> +                    true
> +                }
> +            });
> +            if !found {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn string_matches(&self, parser: &Parser, se: Option<&SEntry>) -> bool {
> +        let fe = &self.filter;
> +        if !parser.options.string_match.is_empty() {
> +            let mut string_match = self.string_match;
> +
> +            if let Some(s) = se {
> +                if s.string_match {
> +                    string_match = true;
> +                }
> +            }
> +            if let Some(f) = fe {
> +                if let Some(f) = f.upgrade() {
> +                    if f.borrow().string_match {
> +                        string_match = true;
> +                    }
> +                }
> +            }
> +            if !string_match {
> +                return false;
> +            }
> +        }
> +        true
> +    }
> +
> +    fn print(&mut self, parser: &mut Parser, se: Option<&SEntry>) {
> +        let fe = self.filter.clone();
> +
> +        if !self.msgid_matches(parser) {
> +            return;
> +        }
> +
> +        if !self.match_list_matches(parser, se) {
> +            return;
> +        }
> +
> +        if !self.host_matches(parser, se) {
> +            return;
> +        }
> +
> +        if !self.from_to_matches(parser) {
> +            return;
> +        }
> +
> +        if !self.string_matches(parser, se) {
> +            return;
> +        }
> +
> +        if parser.options.verbose > 0 {
> +            parser.write_all_ok(b"QENTRY: ");
> +            parser.write_all_ok(&self.qid);
> +            parser.write_all_ok(b"\n");
> +            parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime));
> +            parser.write_all_ok(format!("SIZE: {}\n", self.size));
> +
> +            if !self.client.is_empty() {
> +                parser.write_all_ok(b"CLIENT: ");
> +                parser.write_all_ok(&self.client);
> +                parser.write_all_ok(b"\n");
> +            } else if let Some(s) = se {
> +                if !s.connect.is_empty() {
> +                    parser.write_all_ok(b"CLIENT: ");
> +                    parser.write_all_ok(&s.connect);
> +                    parser.write_all_ok(b"\n");
> +                }
> +            }
> +
> +            if !self.msgid.is_empty() {
> +                parser.write_all_ok(b"MSGID: ");
> +                parser.write_all_ok(&self.msgid);
> +                parser.write_all_ok(b"\n");
> +            }
> +        }
> +
> +        for to in self.to_entries.iter().rev() {
> +            if !to.to.is_empty() {
> +                let final_rc;
> +                let final_borrow;
> +                let mut final_to: &ToEntry = to;
> +                if to.dstatus.is_dsn(Some(2)) {
> +                    if let Some(f) = &fe {
> +                        if let Some(f) = f.upgrade() {
> +                            final_rc = f;
> +                            final_borrow = final_rc.borrow();
> +                            for to2 in final_borrow.to_entries.iter().rev() {
> +                                if to.to == to2.to {
> +                                    final_to = to2;
> +                                    break;
> +                                }
> +                            }
> +                        }
> +                    }
> +                }
> +
> +                parser.write_all_ok(format!("TO:{:X}:", to.timestamp as i32,));
> +                parser.write_all_ok(&self.qid);
> +                parser.write_all_ok(format!(":{}: from <", final_to.dstatus));
> +                parser.write_all_ok(&self.from);
> +                parser.write_all_ok(b"> to <");
> +                parser.write_all_ok(&final_to.to);
> +                parser.write_all_ok(b"> (");
> +                parser.write_all_ok(&final_to.relay);
> +                parser.write_all_ok(b")\n");
> +                parser.count += 1;
> +            }
> +        }
> +
> +        if parser.options.verbose > 1 {
> +            let print_log = |parser: &mut Parser, logs: &Vec<(Box<[u8]>, u64)>| {
> +                for (log, line) in logs.iter() {
> +                    parser.write_all_ok(format!("L{:08X} ", *line as u32));
> +                    parser.write_all_ok(log);
> +                    parser.write_all_ok(b"\n");
> +                }
> +            };
> +            if let Some(s) = se {
> +                if !s.log.is_empty() {
> +                    parser.write_all_ok(b"SMTP:\n");
> +                    print_log(parser, &s.log);
> +                }
> +            }
> +
> +            if let Some(f) = fe {
> +                if let Some(f) = f.upgrade() {
> +                    if !f.borrow().log.is_empty() {
> +                        parser.write_all_ok(format!("FILTER: {}\n", unsafe {
> +                            std::str::from_utf8_unchecked(&f.borrow().logid)
> +                        }));
> +                        print_log(parser, &f.borrow().log);
> +                    }
> +                }
> +            }
> +
> +            if !self.log.is_empty() {
> +                parser.write_all_ok(b"QMGR:\n");
> +                print_log(parser, &self.log);
> +            }
> +        }
> +        parser.write_all_ok(b"\n")
> +    }
> +
> +    fn set_client(&mut self, client: &[u8]) {
> +        if self.client.is_empty() {
> +            self.client = client.into();
> +        }
> +    }
> +}
> +
> +#[derive(Default, Debug)]
> +struct FEntry {
> +    log: Vec<(Box<[u8]>, u64)>,
> +    logid: Box<[u8]>,
> +    to_entries: Vec<ToEntry>,
> +    processing_time: Box<[u8]>,
> +    string_match: bool,
> +    finished: bool,
> +}
> +
> +impl FEntry {
> +    fn set_accept(&mut self, to: &[u8], qid: &[u8], timestamp: u64) {
> +        let te = ToEntry {
> +            to: to.into(),
> +            relay: qid.into(),
> +            dstatus: DStatus::Accept,
> +            timestamp,
> +        };
> +        self.to_entries.push(te);
> +    }
> +
> +    fn set_quarantine(&mut self, to: &[u8], qid: &[u8], timestamp: u64) {
> +        let te = ToEntry {
> +            to: to.into(),
> +            relay: qid.into(),
> +            dstatus: DStatus::Quarantine,
> +            timestamp,
> +        };
> +        self.to_entries.push(te);
> +    }
> +
> +    fn set_block(&mut self, to: &[u8], timestamp: u64) {
> +        let te = ToEntry {
> +            to: to.into(),
> +            relay: (&b"none"[..]).into(),
> +            dstatus: DStatus::Block,
> +            timestamp,
> +        };
> +        self.to_entries.push(te);
> +    }
> +
> +    fn set_processing_time(&mut self, time: &[u8]) {
> +        self.processing_time = time.into();
> +        self.finished = true;
> +    }
> +}
> +
> +#[derive(Debug)]
> +struct Parser {
> +    sentries: HashMap<u64, Rc<RefCell<SEntry>>>,
> +    fentries: HashMap<Box<[u8]>, Rc<RefCell<FEntry>>>,
> +    qentries: HashMap<Box<[u8]>, Rc<RefCell<QEntry>>>,
> +
> +    current_record_state: RecordState,
> +    rel_line_nr: u64,
> +
> +    current_year: [i64; 32],
> +    current_month: i64,
> +    current_file_index: usize,
> +
> +    count: u64,
> +
> +    buffered_stdout: BufWriter<std::io::Stdout>,
> +
> +    options: Options,
> +
> +    start_tm: time::Tm,
> +    end_tm: time::Tm,
> +
> +    ctime: libc::time_t,
> +    string_match: bool,
> +
> +    lines: u64,
> +}
> +
> +impl Parser {
> +    fn new() -> Self {
> +        let mut years: [i64; 32] = [0; 32];
> +        let mut tv: libc::timeval = libc::timeval {
> +            tv_sec: 0,
> +            tv_usec: 0,
> +        };
> +        let mut ltime: *mut libc::tm;
> +
> +        for (i, year) in years.iter_mut().enumerate() {
> +            unsafe {
> +                libc::gettimeofday(&mut tv, std::ptr::null_mut());
> +            }
> +            tv.tv_sec -= (3600 * 24 * i) as i64;
> +            ltime = unsafe { libc::localtime(&tv.tv_sec) };
> +            *year = (unsafe { (*ltime).tm_year + 1900 }) as i64;
> +        }
> +
> +        Self {
> +            sentries: HashMap::new(),
> +            fentries: HashMap::new(),
> +            qentries: HashMap::new(),
> +            current_record_state: Default::default(),
> +            rel_line_nr: 0,
> +            current_year: years,
> +            current_month: 0,
> +            current_file_index: 0,
> +            count: 0,
> +            buffered_stdout: BufWriter::with_capacity(4 * 1024 * 1024, std::io::stdout()),
> +            options: Options::default(),
> +            start_tm: time::empty_tm(),
> +            end_tm: time::empty_tm(),
> +            ctime: 0,
> +            string_match: false,
> +            lines: 0,
> +        }
> +    }
> +
> +    fn free_sentry(&mut self, sentry_pid: u64) {
> +        self.sentries.remove(&sentry_pid);
> +    }
> +
> +    fn free_qentry(&mut self, qid: &[u8], se: Option<&mut SEntry>) {
> +        if let Some(qe) = self.qentries.get(qid) {
> +            if let Some(se) = se {
> +                se.delete_ref(qe);
> +            }
> +        }
> +
> +        self.qentries.remove(qid);
> +    }
> +
> +    fn free_fentry(&mut self, fentry_logid: &[u8]) {
> +        self.fentries.remove(fentry_logid);
> +    }
> +
> +    fn parse_files(&mut self) -> Result<(), Error> {
> +        if !self.options.inputfile.is_empty() {
> +            if self.options.inputfile == "-" {
> +                self.current_file_index = 0;
> +                let mut reader = BufReader::new(std::io::stdin());
> +                self.handle_input_by_line(&mut reader)?;
> +            } else if let Ok(file) = File::open(&self.options.inputfile) {
> +                self.current_file_index = 0;
> +                let mut reader = BufReader::with_capacity(4 * 1024 * 1024, file);
> +                self.handle_input_by_line(&mut reader)?;
> +            }
> +        } else {
> +            let filecount = self.count_files_in_time_range();
> +            for i in (0..filecount).rev() {
> +                self.current_month = 0;
> +                if let Ok(file) = File::open(LOGFILES[i]) {
> +                    self.current_file_index = i;
> +                    if i > 1 {
> +                        let gzdecoder = read::GzDecoder::new(file);
> +                        let mut reader = BufReader::with_capacity(4 * 1024 * 1024, gzdecoder);
> +                        self.handle_input_by_line(&mut reader)?;
> +                    } else {
> +                        let mut reader = BufReader::with_capacity(4 * 1024 * 1024, file);
> +                        self.handle_input_by_line(&mut reader)?;
> +                    }
> +                }
> +            }
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn handle_input_by_line(&mut self, reader: &mut dyn BufRead) -> Result<(), Error> {
> +        let mut buffer = Vec::<u8>::with_capacity(4096);
> +        let mut prev_time = 0;
> +        loop {
> +            if self.options.limit > 0 && (self.count >= self.options.limit) {
> +                self.write_all_ok("STATUS: aborted by limit (too many hits)\n");
> +                self.buffered_stdout.flush()?;
> +                std::process::exit(0);
> +            }
> +
> +            buffer.clear();
> +            let size = match reader.read_until(b'\n', &mut buffer) {
> +                Err(e) => return Err(e.into()),
> +                Ok(0) => return Ok(()),
> +                Ok(s) => s,
> +            };
> +            let line = &buffer[0..size - 1];
> +            let complete_line = line;
> +
> +            let (time, line) = match parse_time(
> +                line,
> +                self.current_year[self.current_file_index],
> +                &mut self.current_month,
> +            ) {
> +                Some(t) => t,
> +                None => continue,
> +            };
> +            if time != prev_time {
> +                self.rel_line_nr = 0;
> +            } else {
> +                self.rel_line_nr += 1;
> +            }
> +            prev_time = time;
> +
> +            if time < self.options.start {
> +                continue;
> +            }
> +            if time > self.options.end {
> +                break;
> +            }
> +
> +            self.lines += 1;
> +
> +            let (host, service, pid, line) = match parse_host_service_pid(line) {
> +                Some((h, s, p, l)) => (h, s, p, l),
> +                None => continue,
> +            };
> +
> +            self.ctime = time;
> +
> +            self.current_record_state.host = host.into();
> +            self.current_record_state.sys = service.into();
> +            self.current_record_state.pid = pid;
> +            self.current_record_state.timestamp = time as u64;
> +
> +            self.string_match = false;
> +            if !self.options.string_match.is_empty()
> +                && find(complete_line, self.options.string_match.as_bytes()).is_some()
> +            {
> +                self.string_match = true;
> +            }
> +
> +            if service == b"pmg-smtp-filter" {
> +                handle_pmg_smtp_filter_message(line, self, complete_line);
> +            } else if service == b"postfix/postscreen" {
> +                handle_postscreen_message(line, self, complete_line);
> +            } else if service == b"postfix/qmgr" {
> +                handle_qmgr_message(line, self, complete_line);
> +            } else if service == b"postfix/lmtp"
> +                || service == b"postfix/smtp"
> +                || service == b"postfix/local"
> +                || service == b"postfix/error"
> +            {
> +                handle_lmtp_message(line, self, complete_line);
> +            } else if service == b"postfix/smtpd" {
> +                handle_smtpd_message(line, self, complete_line);
> +            } else if service == b"postfix/cleanup" {
> +                handle_cleanup_message(line, self, complete_line);
> +            }
> +        }
> +        Ok(())
> +    }
> +
> +    /// Returns the number of files to parse. Does not error out if it can't access any file
> +    /// (permission denied)
> +    fn count_files_in_time_range(&mut self) -> usize {
> +        let mut count = 0;
> +        let mut buffer = Vec::new();
> +
> +        for (i, item) in LOGFILES.iter().enumerate() {
> +            self.current_month = 0;
> +
> +            count = i;
> +            if let Ok(file) = File::open(item) {
> +                self.current_file_index = i;
> +                buffer.clear();
> +                if i > 1 {
> +                    let gzdecoder = read::GzDecoder::new(file);
> +                    let mut reader = BufReader::new(gzdecoder);
> +                    if let Ok(size) = reader.read_until(b'\n', &mut buffer) {
> +                        if size == 0 {
> +                            return count;
> +                        }
> +                        if let Some((time, _)) = parse_time(
> +                            &buffer[0..size],
> +                            self.current_year[i],
> +                            &mut self.current_month,
> +                        ) {
> +                            if time < self.options.start {
> +                                break;
> +                            }
> +                        }
> +                    } else {
> +                        return count;
> +                    }
> +                } else {
> +                    let mut reader = BufReader::new(file);
> +                    if let Ok(size) = reader.read_until(b'\n', &mut buffer) {
> +                        if size == 0 {
> +                            return count;
> +                        }
> +                        if let Some((time, _)) = parse_time(
> +                            &buffer[0..size],
> +                            self.current_year[i],
> +                            &mut self.current_month,
> +                        ) {
> +                            if time < self.options.start {
> +                                break;
> +                            }
> +                        }
> +                    } else {
> +                        return count;
> +                    }
> +                }
> +            } else {
> +                return count;
> +            }
> +        }
> +
> +        count + 1
> +    }
> +
> +    fn handle_args(&mut self, args: clap::ArgMatches) -> Result<(), Error> {
> +        if let Some(inputfile) = args.value_of("inputfile") {
> +            self.options.inputfile = inputfile.to_string();
> +        }
> +
> +        if let Some(start) = args.value_of("start") {
> +            if let Ok(res) = time::strptime(&start, "%F %T") {
> +                self.options.start = mkgmtime(&res);
> +                self.start_tm = res;
> +            } else if let Ok(res) = time::strptime(&start, "%s") {
> +                self.options.start = mkgmtime(&res);
> +                self.start_tm = res;
> +            } else {
> +                failure::bail!(failure::err_msg("failed to parse start time"));
> +            }
> +        } else {
> +            let mut ltime = time::now();
> +            ltime.tm_sec = 0;
> +            ltime.tm_min = 0;
> +            ltime.tm_hour = 0;
> +            self.options.start = mkgmtime(&ltime);
> +            self.start_tm = ltime;
> +        }
> +
> +        if let Some(end) = args.value_of("end") {
> +            if let Ok(res) = time::strptime(&end, "%F %T") {
> +                self.options.end = mkgmtime(&res);
> +                self.end_tm = res;
> +            } else if let Ok(res) = time::strptime(&end, "%s") {
> +                self.options.end = mkgmtime(&res);
> +                self.end_tm = res;
> +            } else {
> +                failure::bail!(failure::err_msg("failed to parse end time"));
> +            }
> +        } else {
> +            let ltime = time::now();
> +            self.options.end = mkgmtime(&ltime);
> +            self.end_tm = ltime;
> +        }
> +
> +        if self.options.end < self.options.start {
> +            failure::bail!(failure::err_msg("end time before start time"));
> +        }
> +
> +        self.options.limit = match args.value_of("limit") {
> +            Some(l) => l.parse().unwrap(),
> +            None => 0,
> +        };
> +
> +        if let Some(qids) = args.values_of("qids") {
> +            for q in qids {
> +                let ltime: libc::time_t = 0;
> +                let rel_line_nr: libc::c_ulong = 0;
> +                let input = CString::new(q)?;
> +                let bytes = concat!("T%08lXL%08lX", "\0");
> +                let format =
> +                    unsafe { std::ffi::CStr::from_bytes_with_nul_unchecked(bytes.as_bytes()) };
> +                if unsafe {
> +                    libc::sscanf(input.as_ptr(), format.as_ptr(), &ltime, &rel_line_nr) == 2
> +                } {
> +                    self.options
> +                        .match_list
> +                        .push(Match::RelLineNr(ltime, rel_line_nr));
> +                } else {
> +                    self.options
> +                        .match_list
> +                        .push(Match::Qid(q.as_bytes().into()));
> +                }
> +            }
> +        }
> +
> +        if let Some(from) = args.value_of("from") {
> +            self.options.from = from.to_string();
> +        }
> +        if let Some(to) = args.value_of("to") {
> +            self.options.to = to.to_string();
> +        }
> +        if let Some(host) = args.value_of("host") {
> +            self.options.host = host.to_string();
> +        }
> +        if let Some(msgid) = args.value_of("msgid") {
> +            self.options.msgid = msgid.to_string();
> +        }
> +
> +        self.options.exclude_greylist = args.is_present("exclude_greylist");
> +        self.options.exclude_ndr = args.is_present("exclude_ndr");
> +
> +        self.options.verbose = args.occurrences_of("verbose") as _;
> +
> +        if let Some(string_match) = args.value_of("search") {
> +            self.options.string_match = string_match.to_string();
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn write_all_ok<T: AsRef<[u8]>>(&mut self, data: T) {
> +        self.buffered_stdout
> +            .write_all(data.as_ref())
> +            .expect("failed to write to stdout");
> +    }
> +}
> +
> +impl Drop for Parser {
> +    fn drop(&mut self) {
> +        let mut qentries = std::mem::replace(&mut self.qentries, HashMap::new());
> +        for q in qentries.values() {
> +            let smtpd = q.borrow().smtpd.clone();
> +            if let Some(s) = smtpd {
> +                q.borrow_mut().print(self, Some(&*s.borrow()));
> +            } else {
> +                q.borrow_mut().print(self, None);
> +            }
> +        }
> +        qentries.clear();
> +        let mut sentries = std::mem::replace(&mut self.sentries, HashMap::new());
> +        for s in sentries.values() {
> +            s.borrow_mut().print(self);
> +        }
> +        sentries.clear();
> +    }
> +}
> +
> +#[derive(Debug, Default)]
> +struct Options {
> +    match_list: Vec<Match>,
> +    inputfile: String,
> +    string_match: String,
> +    host: String,
> +    msgid: String,
> +    from: String,
> +    to: String,
> +    start: libc::time_t,
> +    end: libc::time_t,
> +    limit: u64,
> +    verbose: u32,
> +    exclude_greylist: bool,
> +    exclude_ndr: bool,
> +}
> +
> +#[derive(Debug)]
> +enum Match {
> +    Qid(Box<[u8]>),
> +    RelLineNr(libc::time_t, u64),
> +}
> +
> +#[derive(Debug, Default)]
> +struct RecordState {
> +    host: Box<[u8]>,
> +    sys: Box<[u8]>,
> +    pid: u64,
> +    timestamp: u64,
> +}
> +
> +fn get_or_create_qentry(
> +    qentries: &mut HashMap<Box<[u8]>, Rc<RefCell<QEntry>>>,
> +    qid: &[u8],
> +) -> Rc<RefCell<QEntry>> {
> +    if let Some(qe) = qentries.get(qid) {
> +        Rc::clone(qe)
> +    } else {
> +        let qe = Rc::new(RefCell::new(QEntry::default()));
> +        qe.borrow_mut().qid = qid.into();
> +        qentries.insert(qid.into(), qe.clone());
> +        qe
> +    }
> +}
> +
> +fn get_or_create_sentry(
> +    sentries: &mut HashMap<u64, Rc<RefCell<SEntry>>>,
> +    pid: u64,
> +    rel_line_nr: u64,
> +    timestamp: u64,
> +) -> Rc<RefCell<SEntry>> {
> +    if let Some(se) = sentries.get(&pid) {
> +        Rc::clone(se)
> +    } else {
> +        let se = Rc::new(RefCell::new(SEntry::default()));
> +        se.borrow_mut().rel_line_nr = rel_line_nr;
> +        se.borrow_mut().timestamp = timestamp;
> +        sentries.insert(pid, se.clone());
> +        se
> +    }
> +}
> +
> +fn get_or_create_fentry(
> +    fentries: &mut HashMap<Box<[u8]>, Rc<RefCell<FEntry>>>,
> +    qid: &[u8],
> +) -> Rc<RefCell<FEntry>> {
> +    if let Some(fe) = fentries.get(qid) {
> +        Rc::clone(fe)
> +    } else {
> +        let fe = Rc::new(RefCell::new(FEntry::default()));
> +        fe.borrow_mut().logid = qid.into();
> +        fentries.insert(qid.into(), fe.clone());
> +        fe
> +    }
> +}
> +
> +fn mkgmtime(tm: &time::Tm) -> libc::time_t {
> +    let mut res: libc::time_t;
> +
> +    let mut year = (tm.tm_year + 1900) as i64;
> +    let mon = tm.tm_mon;
> +
> +    res = (year - 1970) * 365 + CAL_MTOD[mon as usize];
> +
> +    if mon <= 1 {
> +        year -= 1;
> +    }
> +
> +    res += (year - 1968) / 4;
> +    res -= (year - 1900) / 100;
> +    res += (year - 1600) / 400;
> +
> +    res += (tm.tm_mday - 1) as i64;
> +    res = res * 24 + tm.tm_hour as i64;
> +    res = res * 60 + tm.tm_min as i64;
> +    res = res * 60 + tm.tm_sec as i64;
> +
> +    res
> +}
> +
> +const LOGFILES: [&str; 32] = [
> +    "/var/log/syslog",
> +    "/var/log/syslog.1",
> +    "/var/log/syslog.2.gz",
> +    "/var/log/syslog.3.gz",
> +    "/var/log/syslog.4.gz",
> +    "/var/log/syslog.5.gz",
> +    "/var/log/syslog.6.gz",
> +    "/var/log/syslog.7.gz",
> +    "/var/log/syslog.8.gz",
> +    "/var/log/syslog.9.gz",
> +    "/var/log/syslog.10.gz",
> +    "/var/log/syslog.11.gz",
> +    "/var/log/syslog.12.gz",
> +    "/var/log/syslog.13.gz",
> +    "/var/log/syslog.14.gz",
> +    "/var/log/syslog.15.gz",
> +    "/var/log/syslog.16.gz",
> +    "/var/log/syslog.17.gz",
> +    "/var/log/syslog.18.gz",
> +    "/var/log/syslog.19.gz",
> +    "/var/log/syslog.20.gz",
> +    "/var/log/syslog.21.gz",
> +    "/var/log/syslog.22.gz",
> +    "/var/log/syslog.23.gz",
> +    "/var/log/syslog.24.gz",
> +    "/var/log/syslog.25.gz",
> +    "/var/log/syslog.26.gz",
> +    "/var/log/syslog.27.gz",
> +    "/var/log/syslog.28.gz",
> +    "/var/log/syslog.29.gz",
> +    "/var/log/syslog.30.gz",
> +    "/var/log/syslog.31.gz",
> +];
> +
> +/// Parse a QID ([A-Z]+). Returns a tuple of (qid, remaining_text) or None.
> +fn parse_qid(data: &[u8], max: usize) -> Option<(&[u8], &[u8])> {
> +    let mut bytes = data.iter();
> +    let mut count = 0;
> +    while count < max {
> +        match bytes.next() {
> +            Some(c) => {
> +                if c.is_ascii_hexdigit() {
> +                    count += 1;
> +                } else {
> +                    break;
> +                }
> +            }
> +            None => break,
> +        }
> +    }
> +    if count > 1 {
> +        return Some((&data[..count], &data[count..]));
> +    }
> +    None
> +}
> +
> +/// Parse a number. Returns a tuple of (parsed_number, remaining_text) or None.
> +fn parse_number(data: &[u8], max_digits: usize) -> Option<(usize, &[u8])> {
> +    let mut bytes = data.iter();
> +    let mut number: usize = 0;
> +    let mut digits: usize = 0;
> +    while digits < max_digits {
> +        let c = match bytes.next() {
> +            Some(c) => c,
> +            None => break,
> +        };
> +        if (*c as char).is_digit(10) {
> +            digits += 1;
> +            number *= 10;
> +            number += (*c as char).to_digit(10).unwrap() as usize;
> +        } else {
> +            break;
> +        }
> +    }
> +    if digits == 0 {
> +        None
> +    } else {
> +        Some((number, &data[digits..]))
> +    }
> +}
> +
> +/// Parse time. Returns a tuple of (parsed_time, remaining_text) or None.
> +fn parse_time<'a>(
> +    data: &'a [u8],
> +    cur_year: i64,
> +    cur_month: &mut i64,
> +) -> Option<(libc::time_t, &'a [u8])> {
> +    if data.len() < 15 {
> +        return None;
> +    }
> +
> +    let mon = match &data[0..3] {
> +        b"Jan" => 0,
> +        b"Feb" => 1,
> +        b"Mar" => 2,
> +        b"Apr" => 3,
> +        b"May" => 4,
> +        b"Jun" => 5,
> +        b"Jul" => 6,
> +        b"Aug" => 7,
> +        b"Sep" => 8,
> +        b"Oct" => 9,
> +        b"Nov" => 10,
> +        b"Dec" => 11,
> +        _ => return None,
> +    };
> +    let data = &data[3..];
> +
> +    let mut ltime: libc::time_t;
> +    let mut year = cur_year;
> +
> +    if *cur_month == 11 && mon == 0 {
> +        year += 1;
> +    }
> +    if mon > *cur_month {
> +        *cur_month = mon;
> +    }
> +
> +    ltime = (year - 1970) * 365 + CAL_MTOD[mon as usize];
> +
> +    if mon <= 1 {
> +        year -= 1;
> +    }
> +
> +    ltime += (year - 1968) / 4;
> +    ltime -= (year - 1900) / 100;
> +    ltime += (year - 1600) / 400;
> +
> +    let whitespace_count = data.iter().take_while(|b| b.is_ascii_whitespace()).count();
> +    let data = &data[whitespace_count..];
> +
> +    let (mday, data) = match parse_number(data, 2) {
> +        Some(t) => t,
> +        None => {
> +            eprintln!("no day matched");
> +            return None;
> +        }
> +    };
> +    if mday == 0 {
> +        eprintln!("mday == 0");
> +        return None;
> +    }
> +
> +    ltime += (mday - 1) as i64;
> +
> +    let data = &data[1..];
> +
> +    let (hour, data) = match parse_number(data, 2) {
> +        Some(t) => t,
> +        None => {
> +            eprintln!("no hour matched");
> +            return None;
> +        }
> +    };
> +
> +    ltime *= 24;
> +    ltime += hour as i64;
> +
> +    if let Some(c) = data.iter().next() {
> +        if (*c as char) != ':' {
> +            eprintln!("char != ':'");
> +            return None;
> +        }
> +    } else {
> +        eprintln!("no next char");
> +        return None;
> +    }
> +    let data = &data[1..];
> +
> +    let (min, data) = match parse_number(data, 2) {
> +        Some(t) => t,
> +        None => {
> +            eprintln!("no min matched");
> +            return None;
> +        }
> +    };
> +
> +    ltime *= 60;
> +    ltime += min as i64;
> +
> +    if let Some(c) = data.iter().next() {
> +        if (*c as char) != ':' {
> +            eprintln!("char != ':'");
> +            return None;
> +        }
> +    } else {
> +        eprintln!("no next char");
> +        return None;
> +    }
> +    let data = &data[1..];
> +
> +    let (sec, data) = match parse_number(data, 2) {
> +        Some(t) => t,
> +        None => {
> +            eprintln!("no sec matched");
> +            return None;
> +        }
> +    };
> +
> +    ltime *= 60;
> +    ltime += sec as i64;
> +
> +    let data = &data[1..];
> +
> +    Some((ltime, data))
> +}
> +
> +const CAL_MTOD: [i64; 12] = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334];
> +
> +type ByteSlice<'a> = &'a [u8];
> +/// Parse Host, Service and PID at the beginning of data. Returns a tuple of (host, service, pid, remaining_text).
> +fn parse_host_service_pid(data: &[u8]) -> Option<(ByteSlice, ByteSlice, u64, ByteSlice)> {
> +    let host_count = data
> +        .iter()
> +        .take_while(|b| !(**b as char).is_ascii_whitespace())
> +        .count();
> +    let host = &data[0..host_count];
> +    let data = &data[host_count + 1..]; // whitespace after host
> +
> +    let service_count = data
> +        .iter()
> +        .take_while(|b| {
> +            (**b as char).is_ascii_alphabetic() || (**b as char) == '/' || (**b as char) == '-'
> +        })
> +        .count();
> +    let service = &data[0..service_count];
> +    let data = &data[service_count..];
> +    if data.get(0) != Some(&b'[') {
> +        return None;
> +    }
> +    let data = &data[1..];
> +
> +    let pid_count = data
> +        .iter()
> +        .take_while(|b| (**b as char).is_ascii_digit())
> +        .count();
> +    let pid = match unsafe { std::str::from_utf8_unchecked(&data[0..pid_count]) }.parse() {
> +        // all ascii digits so valid utf8
> +        Ok(p) => p,
> +        Err(e) => {
> +            eprintln!("failed to parse PID: {}", e);
> +            return None;
> +        }
> +    };
> +    let data = &data[pid_count..];
> +    if !data.starts_with(b"]: ") {
> +        eprintln!("error after PID");
> +        return None;
> +    }
> +    let data = &data[3..];
> +
> +    Some((host, service, pid, data))
> +}
> +
> +/// A find implementation for [u8]. Returns the index or None.
> +fn find<T: PartialOrd>(data: &[T], needle: &[T]) -> Option<usize> {
> +    data.windows(needle.len()).position(|d| d == needle)
> +}
> +
> +/// A find implementation for [u8] that converts to lowercase before the comparison. Returns the
> +/// index or None.
> +fn find_lowercase(data: &[u8], needle: &[u8]) -> Option<usize> {
> +    let data = data.to_ascii_lowercase();
> +    let needle = needle.to_ascii_lowercase();
> +    data.windows(needle.len()).position(|d| d == &needle[..])
> +}



More information about the pmg-devel mailing list