[pmg-devel] [PATCH v3 pmg-log-tracker 1/2] rewrite in rust
Mira Limbeck
m.limbeck at proxmox.com
Thu Nov 14 16:06:02 CET 2019
Somehow the output now differs from that of the C version (some smtpd
log entries are missing). Please don't apply yet.
On 11/8/19 4:09 PM, Mira Limbeck wrote:
> pmg-log-tracker has been rewritten in Rust. The functionality is the same.
> The output order sometimes differs from that of the C version of
> pmg-log-tracker. This only happens when the timestamps of the entries match.
>
> There is one change to the interface: in addition to the short versions
> of the arguments, long versions now exist as well (e.g. -v/--verbose).
>
> The implementation uses Rc<>, Weak<> and RefCell<> to make holding mutable
> cross-references possible, without having to change the original logic
> completely. This allowed for easier translation from C to Rust.
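A reduced, self-contained sketch of the ownership pattern described above may help review. The types `Session`, `Queue` and `Tables` and the helper `session_pid()` are hypothetical stand-ins for SEntry, QEntry and the Parser's entry tables, not the patch's code. The assumption shown is that the HashMaps are the only strong owners, while the entries point at each other only through Weak<> links inside RefCell<>.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::{Rc, Weak};

// Hypothetical, reduced stand-ins for SEntry (smtpd session) and QEntry (queue entry).
#[derive(Default)]
struct Session {
    pid: u64,
    refs: Vec<Weak<RefCell<Queue>>>, // non-owning links to queue entries
}

#[derive(Default)]
struct Queue {
    qid: String,
    smtpd: Option<Weak<RefCell<Session>>>, // non-owning back-link to the session
}

// The maps are the only strong owners, similar to the Parser's sentries/qentries.
#[derive(Default)]
struct Tables {
    sessions: HashMap<u64, Rc<RefCell<Session>>>,
    queues: HashMap<String, Rc<RefCell<Queue>>>,
}

// Link a session and a queue entry in both directions without a strong cycle.
fn add_ref(session: &Rc<RefCell<Session>>, queue: &Rc<RefCell<Queue>>) {
    session.borrow_mut().refs.push(Rc::downgrade(queue));
    queue.borrow_mut().smtpd = Some(Rc::downgrade(session));
}

// Follow the back-link; returns None once the session has been freed.
fn session_pid(queue: &Rc<RefCell<Queue>>) -> Option<u64> {
    let back = queue.borrow().smtpd.as_ref()?.upgrade()?;
    let pid = back.borrow().pid;
    Some(pid)
}

fn main() {
    let mut tables = Tables::default();
    let s = Rc::new(RefCell::new(Session { pid: 42, ..Default::default() }));
    let q = Rc::new(RefCell::new(Queue { qid: "4E35A1F0C3".into(), ..Default::default() }));
    tables.sessions.insert(42, Rc::clone(&s));
    tables.queues.insert(q.borrow().qid.clone(), Rc::clone(&q));
    add_ref(&s, &q);
    drop(s); // from here on only the table owns the session

    println!("before free: {:?}", session_pid(&q)); // Some(42)
    // Removing the table entry (assumed to be roughly what free_sentry() does) frees it.
    tables.sessions.remove(&42);
    println!("after free: {:?}", session_pid(&q)); // None
}
```

Because no entry owns another, removing an entry from its table is enough to free it; any remaining Weak<> link simply fails to upgrade afterwards.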
>
> dh-cargo requires the file debian/cargo-checksum.json, otherwise the
> package won't build. The cargo-checksum.json should contain the checksum
> of the upstream .crate file, which does not exist in this case, so we just
> create an empty one with the required keys (see 'Library package structure'
> in https://wiki.debian.org/Teams/RustPackaging/Policy).
>
> The minimum required debhelper version was raised to match other Rust
> packages (rust-clap, rust-bindgen, rust-ripgrep).
>
> Signed-off-by: Mira Limbeck <m.limbeck at proxmox.com>
> ---
> v3:
> - Use Weak<> together with Rc<> so that there are no reference cycles
> that keep objects alive longer than necessary. This reduces the memory
> footprint from ~1.8G to ~190M with ~1.8G of syslog files
> - cleanup: simplification of QEntry, SEntry and FEntry creation/lookup
> - cleanup: moved some function/struct declarations around
> - benchmarks now use /usr/bin/time instead of the zsh built-in
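The effect of the first v3 item (Weak<> instead of strong Rc<> links) can be reproduced in isolation. The following is a minimal illustration with hypothetical node types, not code from the patch: two nodes that point at each other with Rc<> stay alive after all outside handles are dropped, while the same shape built with Weak<> links is freed.

```rust
use std::cell::RefCell;
use std::rc::{Rc, Weak};

// Hypothetical node types: strong links in both directions vs. weak links.
struct StrongNode {
    other: RefCell<Option<Rc<StrongNode>>>,
}

struct WeakNode {
    other: RefCell<Option<Weak<WeakNode>>>,
}

// Build a two-node cycle of strong edges, drop our handles, report whether `a` survived.
fn strong_cycle_survives() -> bool {
    let a = Rc::new(StrongNode { other: RefCell::new(None) });
    let b = Rc::new(StrongNode { other: RefCell::new(Some(Rc::clone(&a))) });
    *a.other.borrow_mut() = Some(Rc::clone(&b));
    let probe = Rc::downgrade(&a);
    drop(a);
    drop(b);
    probe.upgrade().is_some() // true: each node keeps the other alive (leaked)
}

// Same shape with Weak edges: nothing but our own handles owns the nodes.
fn weak_cycle_survives() -> bool {
    let a = Rc::new(WeakNode { other: RefCell::new(None) });
    let b = Rc::new(WeakNode { other: RefCell::new(Some(Rc::downgrade(&a))) });
    *a.other.borrow_mut() = Some(Rc::downgrade(&b));
    let probe = Rc::downgrade(&a);
    drop(a);
    drop(b);
    probe.upgrade().is_some() // false: dropping the last strong handle frees the node
}

fn main() {
    println!("strong cycle leaked: {}", strong_cycle_survives());
    println!("weak cycle leaked: {}", weak_cycle_survives());
}
```

With millions of log entries, leaked cycles like the first case plausibly account for the kind of memory growth mentioned above.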
>
> v2:
> - fixed an issue where Parser::count_relevant_files() returned an
> incorrect number of files
> - removed src/pmg-log-tracker.c and src/Makefile
> - added simple benchmarks
>
> Regarding the version, how do you want to handle it? Keep the Cargo.toml
> version in sync with the package version (major, minor)?
>
> This already contains the changes @Wolfgang recommended, which simplify a
> lot of code, especially the printing. It also gets rid of one RefCell and
> one Cell, as the Parser can now be passed as a mutable reference to
> both print() functions.
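A reduced sketch of why passing the Parser as a mutable reference removes the need for Cell/RefCell on its fields. `Parser`, `Entry` and the `write_all_ok()` signature here are hypothetical simplifications: the assumption is that the output buffer and counter are plain fields, and each print() borrows the parser mutably only for the duration of the call. The patch's print() functions additionally take `&mut self`, which this sketch omits.

```rust
// Hypothetical, reduced Parser: plain fields, no Cell/RefCell needed,
// because print() receives `&mut Parser`.
struct Parser {
    out: Vec<u8>,
    count: u64,
}

impl Parser {
    // Assumed signature: accepts both byte slices and formatted Strings.
    fn write_all_ok<T: AsRef<[u8]>>(&mut self, data: T) {
        self.out.extend_from_slice(data.as_ref());
    }
}

// Hypothetical stand-in for a queue entry.
struct Entry {
    qid: Vec<u8>,
}

impl Entry {
    // The parser is mutably borrowed only while this call runs.
    fn print(&self, parser: &mut Parser) {
        parser.write_all_ok(b"QENTRY: ");
        parser.write_all_ok(&self.qid);
        parser.write_all_ok(b"\n");
        parser.count += 1; // mutate directly instead of going through a Cell
    }
}

fn main() {
    let mut parser = Parser { out: Vec::new(), count: 0 };
    let entry = Entry { qid: b"4E35A1F0C3".to_vec() };
    entry.print(&mut parser);
    entry.print(&mut parser);
    print!("{}", String::from_utf8_lossy(&parser.out));
    println!("printed {} entries", parser.count);
}
```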
>
> Some simple benchmarks with 32 syslog files (syslog to syslog.31.gz):
>
> Rust (median of 5 runs, plus 1 extra run to warm the cache):
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -v > /dev/null
> User time (seconds): 9.01
> System time (seconds): 0.10
> Elapsed (wall clock) time (h:mm:ss or m:ss): 0:09.12
> Maximum resident set size (kbytes): 194156
>
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -vv > /dev/null
> User time (seconds): 10.17
> System time (seconds): 0.07
> Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.24
> Maximum resident set size (kbytes): 194096
>
> C (same method as for the Rust version):
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -v > /dev/null
> User time (seconds): 10.41
> System time (seconds): 0.25
> Elapsed (wall clock) time (h:mm:ss or m:ss): 0:10.67
> Maximum resident set size (kbytes): 148556
>
> /usr/bin/time -v sudo pmg-log-tracker -s 0 -vv > /dev/null
> User time (seconds): 11.67
> System time (seconds): 0.35
> Elapsed (wall clock) time (h:mm:ss or m:ss): 0:12.02
> Maximum resident set size (kbytes): 148536
>
> The -v version is used by the GUI.
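To put the numbers above into ratios, the small calculation below uses the wall-clock times and maximum RSS values copied from the /usr/bin/time output; `Run` and `compare()` are just illustrative helpers. By these figures, the C version takes about 1.17 times as long as the Rust version for both -v and -vv, while the Rust version's peak RSS is about 1.31 times that of the C version.

```rust
// Wall-clock seconds and max RSS (kbytes) from the /usr/bin/time output above.
struct Run {
    wall_s: f64,
    rss_kb: f64,
}

// Returns (C wall time / Rust wall time, Rust max RSS / C max RSS).
fn compare(rust: &Run, c: &Run) -> (f64, f64) {
    (c.wall_s / rust.wall_s, rust.rss_kb / c.rss_kb)
}

fn main() {
    let cases = [
        ("-v", Run { wall_s: 9.12, rss_kb: 194156.0 }, Run { wall_s: 10.67, rss_kb: 148556.0 }),
        ("-vv", Run { wall_s: 10.24, rss_kb: 194096.0 }, Run { wall_s: 12.02, rss_kb: 148536.0 }),
    ];
    for (flag, rust, c) in cases.iter() {
        let (time_ratio, mem_ratio) = compare(rust, c);
        println!("{}: C/Rust wall time = {:.2}, Rust/C max RSS = {:.2}", flag, time_ratio, mem_ratio);
    }
}
```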
>
> Cargo.toml | 12 +
> Makefile | 4 +-
> debian/cargo-checksum.json | 1 +
> debian/control | 17 +-
> debian/rules | 2 +-
> src/main.rs | 2006 ++++++++++++++++++++++++++++++++++++
> 6 files changed, 2037 insertions(+), 5 deletions(-)
> create mode 100644 Cargo.toml
> create mode 100644 debian/cargo-checksum.json
> create mode 100644 src/main.rs
>
> diff --git a/Cargo.toml b/Cargo.toml
> new file mode 100644
> index 0000000..4cf75f1
> --- /dev/null
> +++ b/Cargo.toml
> @@ -0,0 +1,12 @@
> +[package]
> +name = "pmg-log-tracker"
> +version = "2.1.0"
> +authors = ["Mira Limbeck <m.limbeck at proxmox.com>", "Dietmar Maurer <dietmar at proxmox.com>"]
> +edition = "2018"
> +
> +[dependencies]
> +clap = "2.32.0"
> +failure = "0.1.5"
> +flate2 = "1.0.6"
> +libc = "0.2.48"
> +time = "0.1.42"
> diff --git a/Makefile b/Makefile
> index 1e344dd..b50fe02 100644
> --- a/Makefile
> +++ b/Makefile
> @@ -14,7 +14,9 @@ all: ${DEB}
> .PHONY: ${BUILDDIR}
> ${BUILDDIR}: src
> rm -rf ${BUILDDIR} ${BUILDDIR}.tmp
> - cp -a src ${BUILDDIR}.tmp
> + mkdir ${BUILDDIR}.tmp
> + cp -a src ${BUILDDIR}.tmp/src
> + cp Cargo.toml ${BUILDDIR}.tmp/
> cp -a debian ${BUILDDIR}.tmp/debian
> echo "git clone git://git.proxmox.com/git/pmg-log-tracker.git\\ngit checkout ${GITVERSION}" > ${BUILDDIR}.tmp/debian/SOURCE
> mv ${BUILDDIR}.tmp ${BUILDDIR}
> diff --git a/debian/cargo-checksum.json b/debian/cargo-checksum.json
> new file mode 100644
> index 0000000..ce2579b
> --- /dev/null
> +++ b/debian/cargo-checksum.json
> @@ -0,0 +1 @@
> +{"package":"","files":{}}
> diff --git a/debian/control b/debian/control
> index 26210a3..e02c296 100644
> --- a/debian/control
> +++ b/debian/control
> @@ -2,13 +2,24 @@ Source: pmg-log-tracker
> Section: admin
> Priority: optional
> Maintainer: Proxmox Support Team <support at proxmox.com>
> -Build-Depends: debhelper (>= 10~),
> - libglib2.0-dev (>= 2.42.1)
> +Build-Depends: cargo:native,
> + debhelper (>= 11),
> + dh-cargo (>= 10),
> + librust-clap-dev,
> + librust-failure-dev,
> + librust-flate2-dev,
> + librust-libc-dev,
> + librust-time-dev,
> + libstd-rust-dev,
> + rustc:native
> Standards-Version: 3.9.8
> Homepage: http://www.proxmox.com
>
> Package: pmg-log-tracker
> Architecture: any
> -Depends: ${shlibs:Depends}, ${misc:Depends}
> +Depends: ${misc:Depends},
> + ${shlibs:Depends}
> +Built-Using: ${cargo:Built-Using}
> +XB-X-Cargo-Built-Using: ${cargo:X-Cargo-Built-Using}
> Description: Proxmox Mailgateway Log Tracker
> Tools to scan mail logs.
> diff --git a/debian/rules b/debian/rules
> index a7521b6..e3ba5d4 100755
> --- a/debian/rules
> +++ b/debian/rules
> @@ -5,4 +5,4 @@
>
>
> %:
> - dh $@
> + dh $@ --buildsystem cargo
> diff --git a/src/main.rs b/src/main.rs
> new file mode 100644
> index 0000000..076be2b
> --- /dev/null
> +++ b/src/main.rs
> @@ -0,0 +1,2006 @@
> +#[macro_use]
> +extern crate clap;
> +extern crate failure;
> +extern crate flate2;
> +extern crate libc;
> +
> +use std::cell::RefCell;
> +use std::collections::HashMap;
> +use std::ffi::CString;
> +use std::rc::{Rc, Weak};
> +
> +use std::fs::File;
> +use std::io::BufRead;
> +use std::io::BufReader;
> +use std::io::BufWriter;
> +use std::io::Write;
> +
> +use failure::Error;
> +use flate2::read;
> +
> +use clap::{App, Arg};
> +
> +fn main() -> Result<(), Error> {
> + let matches = App::new(crate_name!())
> + .version(crate_version!())
> + .about(crate_description!())
> + .arg(
> + Arg::with_name("verbose")
> + .short("v")
> + .long("verbose")
> + .help("Verbose output, can be specified multiple times")
> + .multiple(true)
> + .takes_value(false),
> + )
> + .arg(
> + Arg::with_name("inputfile")
> + .short("i")
> + .long("inputfile")
> + .help("Input file to use instead of /var/log/syslog, or '-' for stdin")
> + .value_name("INPUTFILE"),
> + )
> + .arg(
> + Arg::with_name("host")
> + .short("h")
> + .long("host")
> + .help("Hostname or Server IP")
> + .value_name("HOST"),
> + )
> + .arg(
> + Arg::with_name("from")
> + .short("f")
> + .long("from")
> + .help("Mails from SENDER")
> + .value_name("SENDER"),
> + )
> + .arg(
> + Arg::with_name("to")
> + .short("t")
> + .long("to")
> + .help("Mails to RECIPIENT")
> + .value_name("RECIPIENT"),
> + )
> + .arg(
> + Arg::with_name("start")
> + .short("s")
> + .long("starttime")
> + .help("Start time (YYYY-MM-DD HH:MM:SS) or seconds since epoch")
> + .value_name("TIME"),
> + )
> + .arg(
> + Arg::with_name("end")
> + .short("e")
> + .long("endtime")
> + .help("End time (YYYY-MM-DD HH:MM:SS) or seconds since epoch")
> + .value_name("TIME"),
> + )
> + .arg(
> + Arg::with_name("msgid")
> + .short("m")
> + .long("message-id")
> + .help("Message ID (exact match)")
> + .value_name("MSGID"),
> + )
> + .arg(
> + Arg::with_name("qids")
> + .short("q")
> + .long("queue-id")
> + .help("Queue ID (exact match), can be specified multiple times")
> + .value_name("QID")
> + .multiple(true)
> + .number_of_values(1),
> + )
> + .arg(
> + Arg::with_name("search")
> + .short("x")
> + .long("search-string")
> + .help("Search for string")
> + .value_name("STRING"),
> + )
> + .arg(
> + Arg::with_name("limit")
> + .short("l")
> + .long("limit")
> + .help("Print MAX entries")
> + .value_name("MAX")
> + .default_value("0"),
> + )
> + .arg(
> + Arg::with_name("exclude_greylist")
> + .short("g")
> + .long("exclude-greylist")
> + .help("Exclude greylist entries"),
> + )
> + .arg(
> + Arg::with_name("exclude_ndr")
> + .short("n")
> + .long("exclude-ndr")
> + .help("Exclude NDR entries"),
> + )
> + .get_matches();
> +
> + let mut parser = Parser::new();
> + parser.handle_args(matches)?;
> +
> + println!("# LogReader: {}", std::process::id());
> + println!("# Query options");
> + if !parser.options.from.is_empty() {
> + println!("# Sender: {}", parser.options.from);
> + }
> + if !parser.options.to.is_empty() {
> + println!("# Recipient: {}", parser.options.to);
> + }
> + if !parser.options.host.is_empty() {
> + println!("# Server: {}", parser.options.host);
> + }
> + if !parser.options.msgid.is_empty() {
> + println!("# MsgID: {}", parser.options.msgid);
> + }
> + for m in parser.options.match_list.iter() {
> + match m {
> + Match::Qid(b) => println!("# QID: {}", std::str::from_utf8(b)?),
> + Match::RelLineNr(t, l) => println!("# QID: T{:8X}L{:08X}", *t as u32, *l as u32),
> + }
> + }
> +
> + if !parser.options.string_match.is_empty() {
> + println!("# Match: {}", parser.options.string_match);
> + }
> +
> + println!(
> + "# Start: {} ({})",
> + time::strftime("%F %T", &parser.start_tm)?,
> + parser.options.start
> + );
> + println!(
> + "# End: {} ({})",
> + time::strftime("%F %T", &parser.end_tm)?,
> + parser.options.end
> + );
> +
> + println!("# End Query Options\n");
> + parser.parse_files()?;
> +
> + Ok(())
> +}
> +
> +fn handle_pmg_smtp_filter_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> + let (qid, data) = match parse_qid(msg, 25) {
> + Some((q, m)) => (q, m),
> + None => return,
> + };
> + let data = &data[2..];
> +
> + let fe = get_or_create_fentry(&mut parser.fentries, qid);
> +
> + if parser.string_match {
> + fe.borrow_mut().string_match = parser.string_match;
> + }
> +
> + fe.borrow_mut()
> + .log
> + .push((complete_line.into(), parser.lines));
> +
> + if data.starts_with(b"accept mail to <") {
> + let data = &data[16..];
> + let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> + let (to, data) = data.split_at(to_count);
> + if !data.starts_with(b"> (") {
> + return;
> + }
> + let data = &data[3..];
> + let qid_count = data.iter().take_while(|b| (**b as char) != ')').count();
> + let qid = &data[..qid_count];
> +
> + fe.borrow_mut()
> + .set_accept(to, qid, parser.current_record_state.timestamp);
> + return;
> + }
> +
> + if data.starts_with(b"moved mail for <") {
> + let data = &data[16..];
> + let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> + let (to, data) = data.split_at(to_count);
> +
> + let qid_index = match find(data, b"quarantine - ") {
> + Some(i) => i,
> + None => return,
> + };
> + let data = &data[qid_index + 13..];
> + let (qid, _) = match parse_qid(data, 25) {
> + Some(t) => t,
> + None => return,
> + };
> +
> + fe.borrow_mut()
> + .set_quarantine(to, qid, parser.current_record_state.timestamp);
> + return;
> + }
> +
> + if data.starts_with(b"block mail to <") {
> + let data = &data[15..];
> + let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> + let to = &data[..to_count];
> +
> + fe.borrow_mut()
> + .set_block(to, parser.current_record_state.timestamp);
> + return;
> + }
> +
> + if data.starts_with(b"processing time: ") {
> + let data = &data[17..];
> + let time_count = data.iter().take_while(|b| !b.is_ascii_whitespace()).count();
> + let time = &data[..time_count];
> +
> + fe.borrow_mut().set_processing_time(time);
> + return;
> + }
> +}
> +
> +fn handle_postscreen_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> + if !msg.starts_with(b"NOQUEUE: reject: RCPT from ") {
> + return;
> + }
> + let data = &msg[27..];
> + let client_index = match find(data, b"; client [") {
> + Some(i) => i,
> + None => return,
> + };
> + let data = &data[client_index + 10..];
> +
> + let client_count = data.iter().take_while(|b| (**b as char) != ']').count();
> + let (client, data) = data.split_at(client_count);
> +
> + let from_index = match find(data, b"; from=<") {
> + Some(i) => i,
> + None => return,
> + };
> + let data = &data[from_index + 8..];
> +
> + let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
> + let (from, data) = data.split_at(from_count);
> +
> + if !data.starts_with(b">, to=<") {
> + return;
> + }
> + let data = &data[7..];
> + let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> + let to = &data[..to_count];
> +
> + let se = get_or_create_sentry(
> + &mut parser.sentries,
> + parser.current_record_state.pid,
> + parser.rel_line_nr,
> + parser.current_record_state.timestamp,
> + );
> +
> + if parser.string_match {
> + se.borrow_mut().string_match = parser.string_match;
> + }
> +
> + se.borrow_mut()
> + .log
> + .push((complete_line.into(), parser.lines));
> + se.borrow_mut().add_noqueue_entry(
> + from,
> + to,
> + DStatus::Noqueue,
> + parser.current_record_state.timestamp,
> + );
> + se.borrow_mut().set_connect(client);
> + se.borrow_mut().disconnect = true;
> + se.borrow_mut().print(parser);
> + parser.free_sentry(parser.current_record_state.pid);
> +}
> +
> +fn handle_qmgr_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> + let (qid, data) = match parse_qid(msg, 15) {
> + Some(t) => t,
> + None => return,
> + };
> + let data = &data[2..];
> +
> + let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> + if parser.string_match {
> + qe.borrow_mut().string_match = parser.string_match;
> + }
> + qe.borrow_mut().cleanup = true;
> + qe.borrow_mut()
> + .log
> + .push((complete_line.into(), parser.lines));
> +
> + if data.starts_with(b"from=<") {
> + let data = &data[6..];
> +
> + let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
> + let (from, data) = data.split_at(from_count);
> +
> + if !data.starts_with(b">, size=") {
> + return;
> + }
> + let data = &data[8..];
> +
> + let size_count = data
> + .iter()
> + .take_while(|b| (**b as char).is_ascii_digit())
> + .count();
> + let (size, _) = data.split_at(size_count);
> + qe.borrow_mut().from = from.into();
> + qe.borrow_mut().size = unsafe { std::str::from_utf8_unchecked(size) }
> + .parse()
> + .unwrap_or(0);
> + } else if data == b"removed" {
> + qe.borrow_mut().removed = true;
> + qe.borrow_mut().finalize(parser);
> + }
> +}
> +
> +fn handle_lmtp_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> + let (qid, data) = match parse_qid(msg, 15) {
> + Some((q, t)) => (q, t),
> + None => return,
> + };
> +
> + let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> + if parser.string_match {
> + qe.borrow_mut().string_match = parser.string_match;
> + }
> + qe.borrow_mut().cleanup = true;
> + qe.borrow_mut()
> + .log
> + .push((complete_line.into(), parser.lines));
> +
> + let data = &data[2..];
> + if !data.starts_with(b"to=<") {
> + return;
> + }
> + let data = &data[4..];
> + let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> + let (to, data) = data.split_at(to_count);
> +
> + let relay_index = match find(data, b"relay=") {
> + Some(i) => i,
> + None => return,
> + };
> + let data = &data[relay_index + 6..];
> + let relay_count = data.iter().take_while(|b| (**b as char) != ',').count();
> + let (relay, data) = data.split_at(relay_count);
> +
> + let dsn_index = match find(data, b"dsn=") {
> + Some(i) => i,
> + None => return,
> + };
> + let data = &data[dsn_index + 4..];
> + let dsn = match data.iter().next() {
> + Some(b) => {
> + if (*b as char).is_ascii_digit() {
> + (*b as char).to_digit(10).unwrap()
> + } else {
> + return;
> + }
> + }
> + None => return,
> + };
> +
> + qe.borrow_mut().add_to_entry(
> + to,
> + relay,
> + DStatus::Dsn(dsn),
> + parser.current_record_state.timestamp,
> + );
> +
> + if &*parser.current_record_state.sys == b"postfix/lmtp" {
> + let sent_index = match find(data, b"status=sent (250 2.") {
> + Some(i) => i,
> + None => return,
> + };
> + let mut data = &data[sent_index + 19..];
> + if data.starts_with(b"5.0 OK") {
> + data = &data[8..];
> + } else if data.starts_with(b"7.0 BLOCKED") {
> + data = &data[13..];
> + } else {
> + return;
> + }
> +
> + let (qid, _) = match parse_qid(data, 25) {
> + Some(t) => t,
> + None => return,
> + };
> +
> + qe.borrow_mut().filtered = true;
> + if let Some(fe) = parser.fentries.get(qid) {
> + qe.borrow_mut().filter = Some(Rc::downgrade(fe));
> + }
> + }
> +}
> +
> +fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> + let se = get_or_create_sentry(
> + &mut parser.sentries,
> + parser.current_record_state.pid,
> + parser.rel_line_nr,
> + parser.current_record_state.timestamp,
> + );
> +
> + if parser.string_match {
> + se.borrow_mut().string_match = parser.string_match;
> + }
> + se.borrow_mut()
> + .log
> + .push((complete_line.into(), parser.lines));
> +
> + if msg.starts_with(b"connect from ") {
> + let addr = &msg[13..];
> + se.borrow_mut().set_connect(addr);
> + return;
> + }
> +
> + if msg.starts_with(b"disconnect from") {
> + parser.sentries.remove(&parser.current_record_state.pid);
> + se.borrow_mut().disconnect = true;
> +
> + if se.borrow_mut().remove_unneeded_refs(parser) == 0 {
> + se.borrow_mut().print(parser);
> + parser.free_sentry(se.borrow().pid);
> + } else {
> + se.borrow_mut().finalize_refs(parser);
> + }
> + return;
> + }
> +
> + if msg.starts_with(b"NOQUEUE:") {
> + let data = &msg[8..];
> + let colon_index = match find(data, b":") {
> + Some(i) => i,
> + None => return,
> + };
> + let data = &data[colon_index + 1..];
> + let colon_index = match find(data, b":") {
> + Some(i) => i,
> + None => return,
> + };
> + let data = &data[colon_index + 1..];
> + let semicolon_index = match find(data, b";") {
> + Some(i) => i,
> + None => return,
> + };
> + let (grey, data) = data.split_at(semicolon_index);
> + let dstatus = if find(
> + grey,
> + b"Recipient address rejected: Service is unavailable (try later)",
> + )
> + .is_some()
> + {
> + DStatus::Greylist
> + } else {
> + DStatus::Noqueue
> + };
> +
> + if !data.starts_with(b"; from=<") {
> + return;
> + }
> + let data = &data[8..];
> + let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
> + let (from, data) = data.split_at(from_count);
> +
> + if !data.starts_with(b"> to=<") {
> + return;
> + }
> + let data = &data[6..];
> + let to_count = data.iter().take_while(|b| (**b as char) != '>').count();
> + let to = &data[..to_count];
> +
> + se.borrow_mut()
> + .add_noqueue_entry(from, to, dstatus, parser.current_record_state.timestamp);
> + return;
> + }
> +
> + let (qid, data) = match parse_qid(msg, 15) {
> + Some(t) => t,
> + None => return,
> + };
> + let data = &data[2..];
> +
> + let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> + if parser.string_match {
> + qe.borrow_mut().string_match = parser.string_match;
> + }
> +
> + SEntry::add_ref(&se, &qe);
> +
> + if !data.starts_with(b"client=") {
> + return;
> + }
> + let data = &data[7..];
> + let client_count = data
> + .iter()
> + .take_while(|b| !(**b as char).is_ascii_whitespace())
> + .count();
> + let client = &data[..client_count];
> +
> + qe.borrow_mut().set_client(client);
> +}
> +
> +fn handle_cleanup_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
> + let (qid, data) = match parse_qid(msg, 15) {
> + Some(t) => t,
> + None => return,
> + };
> + let data = &data[2..];
> +
> + let qe = get_or_create_qentry(&mut parser.qentries, qid);
> +
> + if parser.string_match {
> + qe.borrow_mut().string_match = parser.string_match;
> + }
> + qe.borrow_mut()
> + .log
> + .push((complete_line.into(), parser.lines));
> +
> + if !data.starts_with(b"message-id=") {
> + return;
> + }
> + let data = &data[11..];
> + let msgid_count = data
> + .iter()
> + .take_while(|b| !(**b as char).is_ascii_whitespace())
> + .count();
> + let msgid = &data[..msgid_count];
> +
> + if !msgid.is_empty() {
> + if qe.borrow().msgid.is_empty() {
> + qe.borrow_mut().msgid = msgid.into();
> + }
> + qe.borrow_mut().cleanup = true;
> + }
> +}
> +
> +#[derive(Default, Debug)]
> +struct NoqueueEntry {
> + from: Box<[u8]>,
> + to: Box<[u8]>,
> + dstatus: DStatus,
> + timestamp: u64,
> +}
> +
> +#[derive(Debug)]
> +struct ToEntry {
> + to: Box<[u8]>,
> + relay: Box<[u8]>,
> + dstatus: DStatus,
> + timestamp: u64,
> +}
> +
> +impl Default for ToEntry {
> + fn default() -> Self {
> + ToEntry {
> + to: Default::default(),
> + relay: (&b"none"[..]).into(),
> + dstatus: Default::default(),
> + timestamp: Default::default(),
> + }
> + }
> +}
> +
> +#[derive(Debug, PartialEq)]
> +enum DStatus {
> + Invalid,
> + Accept,
> + Quarantine,
> + Block,
> + Greylist,
> + Noqueue,
> + Dsn(u32),
> +}
> +
> +impl Default for DStatus {
> + fn default() -> Self {
> + DStatus::Invalid
> + }
> +}
> +
> +impl std::fmt::Display for DStatus {
> + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
> + let c = match self {
> + DStatus::Invalid => '\0', // other default
> + DStatus::Accept => 'A',
> + DStatus::Quarantine => 'Q',
> + DStatus::Block => 'B',
> + DStatus::Greylist => 'G',
> + DStatus::Noqueue => 'N',
> + DStatus::Dsn(v) => std::char::from_digit(*v, 10).unwrap(),
> + };
> + write!(f, "{}", c)
> + }
> +}
> +
> +impl DStatus {
> + fn is_dsn(&self, value: Option<u32>) -> bool {
> + match (self, value) {
> + (DStatus::Dsn(v), Some(val)) => *v == val,
> + (DStatus::Dsn(_), None) => true,
> + _ => false,
> + }
> + }
> +}
> +
> +#[derive(Debug, Default)]
> +struct SEntry {
> + log: Vec<(Box<[u8]>, u64)>,
> + connect: Box<[u8]>,
> + cursor: Box<[u8]>,
> + pid: u64,
> + refs: Vec<Weak<RefCell<QEntry>>>,
> + nq_entries: Vec<NoqueueEntry>,
> + disconnect: bool,
> + string_match: bool,
> + timestamp: u64,
> + rel_line_nr: u64,
> +}
> +
> +impl SEntry {
> + fn add_noqueue_entry(&mut self, from: &[u8], to: &[u8], dstatus: DStatus, timestamp: u64) {
> + let ne = NoqueueEntry {
> + to: to.into(),
> + from: from.into(),
> + dstatus,
> + timestamp,
> + };
> + self.nq_entries.push(ne);
> + }
> +
> + fn set_connect(&mut self, client: &[u8]) {
> + if self.connect.is_empty() {
> + self.connect = client.into();
> + }
> + }
> +
> + fn print(&mut self, parser: &mut Parser) {
> + if !parser.options.msgid.is_empty() {
> + return;
> + }
> +
> + if !parser.options.host.is_empty() {
> + if self.connect.is_empty() {
> + return;
> + }
> + if find_lowercase(&self.connect, parser.options.host.as_bytes()).is_none() {
> + return;
> + }
> + }
> +
> + if !parser.options.match_list.is_empty() {
> + let mut found = false;
> + for m in parser.options.match_list.iter() {
> + match m {
> + Match::Qid(_) => return,
> + Match::RelLineNr(t, l) => {
> + if (*t as u64) == self.timestamp && *l == self.rel_line_nr {
> + found = true;
> + break;
> + }
> + }
> + }
> + }
> + if !found {
> + return;
> + }
> + }
> +
> + if !parser.options.from.is_empty()
> + || !parser.options.to.is_empty()
> + || parser.options.exclude_greylist
> + || parser.options.exclude_ndr
> + {
> + let mut found = false;
> + for nq in self.nq_entries.iter_mut().rev() {
> + if !parser.options.from.is_empty()
> + && find_lowercase(&nq.from, parser.options.from.as_bytes()).is_none()
> + {
> + nq.dstatus = DStatus::Invalid;
> + }
> +
> + if parser.options.exclude_greylist && nq.dstatus == DStatus::Greylist {
> + nq.dstatus = DStatus::Invalid;
> + }
> + if parser.options.exclude_ndr && nq.from.is_empty() {
> + nq.dstatus = DStatus::Invalid;
> + }
> +
> + if !parser.options.to.is_empty()
> + && !nq.to.is_empty()
> + && find_lowercase(&nq.to, parser.options.to.as_bytes()).is_none()
> + {
> + nq.dstatus = DStatus::Invalid;
> + }
> +
> + if nq.dstatus != DStatus::Invalid {
> + found = true;
> + }
> + }
> +
> + if !found {
> + return;
> + }
> + }
> +
> + if !parser.options.string_match.is_empty() && !self.string_match {
> + return;
> + }
> +
> + if parser.options.verbose > 0 {
> + parser.write_all_ok(format!(
> + "SMTPD: T{:8X}L{:08X}\n",
> + self.timestamp as u32, self.rel_line_nr as u32
> + ));
> + parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime).as_bytes());
> +
> + if !self.connect.is_empty() {
> + parser.write_all_ok(b"CLIENT: ");
> + parser.write_all_ok(&self.connect);
> + parser.write_all_ok(b"\n");
> + }
> + }
> +
> + for nq in self.nq_entries.iter().rev() {
> + if nq.dstatus != DStatus::Invalid {
> + parser.write_all_ok(format!(
> + "TO:{:X}:T{:08X}L{:08X}:{}: from <",
> + nq.timestamp as i32, self.timestamp as i32, self.rel_line_nr, nq.dstatus,
> + ));
> + parser.write_all_ok(&nq.from);
> + parser.write_all_ok(b"> to <");
> + parser.write_all_ok(&nq.to);
> + parser.write_all_ok(b">\n");
> + parser.count += 1;
> + }
> + }
> +
> + if parser.options.verbose > 1 {
> + parser.write_all_ok(b"LOGS:\n");
> + for (log, line) in self.log.iter() {
> + parser.write_all_ok(format!("L{:08X} ", *line as u32));
> + parser.write_all_ok(log);
> + parser.write_all_ok(b"\n");
> + }
> + }
> + parser.write_all_ok(b"\n");
> + }
> +
> + fn delete_ref(&mut self, qentry: &QEntry) -> u32 {
> + let mut count: u32 = 0;
> + self.refs.retain(|q| {
> + let q = match q.upgrade() {
> + Some(q) => q,
> + None => return false,
> + };
> + if std::ptr::eq(&*q.borrow(), qentry) {
> + return false;
> + } else if qentry.cleanup {
> + count += 1;
> + }
> + true
> + });
> + count
> + }
> +
> + fn remove_unneeded_refs(&mut self, parser: &mut Parser) -> u32 {
> + let mut count: u32 = 0;
> + self.refs.retain(|q| {
> + let q = match q.upgrade() {
> + Some(q) => q,
> + None => return false,
> + };
> + let is_cleanup = q.borrow().cleanup;
> + if !is_cleanup {
> + q.borrow_mut().smtpd = None;
> + parser.free_qentry(&q.borrow().qid);
> + false
> + } else {
> + count += 1;
> + true
> + }
> + });
> + count
> + }
> +
> + fn finalize_refs(&mut self, parser: &mut Parser) {
> + let mut qentries = Vec::new();
> + for q in self.refs.iter() {
> + let q = match q.upgrade() {
> + Some(q) => q,
> + None => continue,
> + };
> +
> + if !q.borrow().removed {
> + continue;
> + }
> +
> + let fe = &q.borrow().filter;
> + if let Some(f) = fe {
> + if let Some(f) = f.upgrade() {
> + if !f.borrow().finished {
> + continue;
> + }
> + }
> + }
> +
> + qentries.push(Rc::clone(&q));
> + }
> +
> + for q in qentries.iter() {
> + q.borrow_mut().print(parser, Some(self));
> + q.borrow_mut().smtpd = None;
> + parser.free_qentry(&q.borrow().qid);
> +
> + if let Some(f) = &q.borrow().filter {
> + if let Some(f) = f.upgrade() {
> + parser.free_fentry(&f.borrow().logid);
> + }
> + }
> + }
> + }
> +
> + fn add_ref(sentry: &Rc<RefCell<SEntry>>, qentry: &Rc<RefCell<QEntry>>) {
> + let smtpd = qentry.borrow().smtpd.clone();
> + if let Some(s) = smtpd {
> + if let Some(s) = s.upgrade() {
> + if !Rc::ptr_eq(sentry, &s) {
> + eprintln!("Error: qentry ref already set");
> + }
> + }
> + return;
> + }
> +
> + for q in sentry.borrow().refs.iter() {
> + let q = match q.upgrade() {
> + Some(q) => q,
> + None => continue,
> + };
> + if Rc::ptr_eq(&q, qentry) {
> + return;
> + }
> + }
> +
> + sentry.borrow_mut().refs.push(Rc::downgrade(qentry));
> + qentry.borrow_mut().smtpd = Some(Rc::downgrade(sentry));
> + }
> +}
> +
> +#[derive(Default, Debug)]
> +struct QEntry {
> + log: Vec<(Box<[u8]>, u64)>,
> + smtpd: Option<Weak<RefCell<SEntry>>>,
> + filter: Option<Weak<RefCell<FEntry>>>,
> + qid: Box<[u8]>,
> + from: Box<[u8]>,
> + client: Box<[u8]>,
> + msgid: Box<[u8]>,
> + size: u64,
> + to_entries: Vec<ToEntry>,
> + cleanup: bool,
> + removed: bool,
> + filtered: bool,
> + string_match: bool,
> +}
> +
> +impl QEntry {
> + fn add_to_entry(&mut self, to: &[u8], relay: &[u8], dstatus: DStatus, timestamp: u64) {
> + let te = ToEntry {
> + to: to.into(),
> + relay: relay.into(),
> + dstatus,
> + timestamp,
> + };
> + self.to_entries.push(te);
> + }
> +
> + fn finalize(&mut self, parser: &mut Parser) {
> + if self.removed {
> + if let Some(se) = &self.smtpd {
> + if let Some(se) = se.upgrade() {
> + if !se.borrow().disconnect {
> + return;
> + }
> + }
> + }
> +
> + if let Some(fe) = self.filter.clone() {
> + if let Some(fe) = fe.upgrade() {
> + if !fe.borrow().finished {
> + return;
> + }
> + }
> +
> + match self.smtpd.clone() {
> + Some(s) => {
> + if let Some(s) = s.upgrade() {
> + self.print(parser, Some(&*s.borrow()))
> + } else {
> + self.print(parser, None)
> + }
> + }
> + None => self.print(parser, None),
> + };
> + parser.free_qentry(&self.qid);
> +
> + if let Some(fe) = fe.upgrade() {
> + parser.free_fentry(&fe.borrow().logid);
> + }
> + } else {
> + match self.smtpd.clone() {
> + Some(s) => {
> + if let Some(s) = s.upgrade() {
> + self.print(parser, Some(&*s.borrow()))
> + } else {
> + self.print(parser, None)
> + }
> + }
> + None => self.print(parser, None),
> + };
> + parser.free_qentry(&self.qid);
> + }
> + }
> + }
> +
> + fn msgid_matches(&self, parser: &Parser) -> bool {
> + if !parser.options.msgid.is_empty() {
> + if self.msgid.is_empty() {
> + return false;
> + }
> + let qentry_msgid_lowercase = self.msgid.to_ascii_lowercase();
> + let msgid_lowercase = parser.options.msgid.as_bytes().to_ascii_lowercase();
> + if qentry_msgid_lowercase != msgid_lowercase {
> + return false;
> + }
> + }
> + true
> + }
> +
> + fn match_list_matches(&self, parser: &Parser, se: Option<&SEntry>) -> bool {
> + let fe = &self.filter;
> + if !parser.options.match_list.is_empty() {
> + let mut found = false;
> + for m in parser.options.match_list.iter() {
> + match m {
> + Match::Qid(q) => {
> + if let Some(f) = fe {
> + if let Some(f) = f.upgrade() {
> + if &f.borrow().logid == q {
> + found = true;
> + break;
> + }
> + }
> + }
> + if &self.qid == q {
> + found = true;
> + break;
> + }
> + }
> + Match::RelLineNr(t, l) => {
> + if let Some(s) = se {
> + if s.timestamp == (*t as u64) && s.rel_line_nr == *l {
> + found = true;
> + break;
> + }
> + }
> + }
> + }
> + }
> + if !found {
> + return false;
> + }
> + }
> + true
> + }
> +
> + fn host_matches(&self, parser: &Parser, se: Option<&SEntry>) -> bool {
> + if !parser.options.host.is_empty() {
> + let mut found = false;
> + if let Some(s) = se {
> + if !s.connect.is_empty()
> + && find_lowercase(&s.connect, parser.options.host.as_bytes()).is_some()
> + {
> + found = true;
> + }
> + }
> + if !self.client.is_empty()
> + && find_lowercase(&self.client, parser.options.host.as_bytes()).is_some()
> + {
> + found = true;
> + }
> +
> + if !found {
> + return false;
> + }
> + }
> + true
> + }
> +
> + fn from_to_matches(&mut self, parser: &Parser) -> bool {
> + if !parser.options.from.is_empty() {
> + if self.from.is_empty() {
> + return false;
> + }
> + if find_lowercase(&self.from, parser.options.from.as_bytes()).is_none() {
> + return false;
> + }
> + } else if parser.options.exclude_ndr && self.from.is_empty() {
> + return false;
> + }
> +
> + if !parser.options.to.is_empty() {
> + let mut found = false;
> + self.to_entries.retain(|to| {
> + if find_lowercase(&to.to, parser.options.to.as_bytes()).is_none() {
> + false
> + } else {
> + found = true;
> + true
> + }
> + });
> + if !found {
> + return false;
> + }
> + }
> + true
> + }
> +
> + fn string_matches(&self, parser: &Parser, se: Option<&SEntry>) -> bool {
> + let fe = &self.filter;
> + if !parser.options.string_match.is_empty() {
> + let mut string_match = self.string_match;
> +
> + if let Some(s) = se {
> + if s.string_match {
> + string_match = true;
> + }
> + }
> + if let Some(f) = fe {
> + if let Some(f) = f.upgrade() {
> + if f.borrow().string_match {
> + string_match = true;
> + }
> + }
> + }
> + if !string_match {
> + return false;
> + }
> + }
> + true
> + }
> +
> + fn print(&mut self, parser: &mut Parser, se: Option<&SEntry>) {
> + let fe = self.filter.clone();
> +
> + if !self.msgid_matches(parser) {
> + return;
> + }
> +
> + if !self.match_list_matches(parser, se) {
> + return;
> + }
> +
> + if !self.host_matches(parser, se) {
> + return;
> + }
> +
> + if !self.from_to_matches(parser) {
> + return;
> + }
> +
> + if !self.string_matches(parser, se) {
> + return;
> + }
> +
> + if parser.options.verbose > 0 {
> + parser.write_all_ok(b"QENTRY: ");
> + parser.write_all_ok(&self.qid);
> + parser.write_all_ok(b"\n");
> + parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime));
> + parser.write_all_ok(format!("SIZE: {}\n", self.size));
> +
> + if !self.client.is_empty() {
> + parser.write_all_ok(b"CLIENT: ");
> + parser.write_all_ok(&self.client);
> + parser.write_all_ok(b"\n");
> + } else if let Some(s) = se {
> + if !s.connect.is_empty() {
> + parser.write_all_ok(b"CLIENT: ");
> + parser.write_all_ok(&s.connect);
> + parser.write_all_ok(b"\n");
> + }
> + }
> +
> + if !self.msgid.is_empty() {
> + parser.write_all_ok(b"MSGID: ");
> + parser.write_all_ok(&self.msgid);
> + parser.write_all_ok(b"\n");
> + }
> + }
> +
> + for to in self.to_entries.iter().rev() {
> + if !to.to.is_empty() {
> + let final_rc;
> + let final_borrow;
> + let mut final_to: &ToEntry = to;
> + if to.dstatus.is_dsn(Some(2)) {
> + if let Some(f) = &fe {
> + if let Some(f) = f.upgrade() {
> + final_rc = f;
> + final_borrow = final_rc.borrow();
> + for to2 in final_borrow.to_entries.iter().rev() {
> + if to.to == to2.to {
> + final_to = to2;
> + break;
> + }
> + }
> + }
> + }
> + }
> +
> + parser.write_all_ok(format!("TO:{:X}:", to.timestamp as i32));
> + parser.write_all_ok(&self.qid);
> + parser.write_all_ok(format!(":{}: from <", final_to.dstatus));
> + parser.write_all_ok(&self.from);
> + parser.write_all_ok(b"> to <");
> + parser.write_all_ok(&final_to.to);
> + parser.write_all_ok(b"> (");
> + parser.write_all_ok(&final_to.relay);
> + parser.write_all_ok(b")\n");
> + parser.count += 1;
> + }
> + }
> +
> + if parser.options.verbose > 1 {
> + let print_log = |parser: &mut Parser, logs: &Vec<(Box<[u8]>, u64)>| {
> + for (log, line) in logs.iter() {
> + parser.write_all_ok(format!("L{:08X} ", *line as u32));
> + parser.write_all_ok(log);
> + parser.write_all_ok(b"\n");
> + }
> + };
> + if let Some(s) = se {
> + if !s.log.is_empty() {
> + parser.write_all_ok(b"SMTP:\n");
> + print_log(parser, &s.log);
> + }
> + }
> +
> + if let Some(f) = fe {
> + if let Some(f) = f.upgrade() {
> + if !f.borrow().log.is_empty() {
> + parser.write_all_ok(b"FILTER: ");
> + parser.write_all_ok(&f.borrow().logid);
> + parser.write_all_ok(b"\n");
> + print_log(parser, &f.borrow().log);
> + }
> + }
> + }
> +
> + if !self.log.is_empty() {
> + parser.write_all_ok(b"QMGR:\n");
> + print_log(parser, &self.log);
> + }
> + }
> + parser.write_all_ok(b"\n")
> + }
> +
> + fn set_client(&mut self, client: &[u8]) {
> + if self.client.is_empty() {
> + self.client = client.into();
> + }
> + }
> +}
> +
> +impl Drop for QEntry {
> + fn drop(&mut self) {
> + if let Some(se) = self.smtpd.take() {
> + if let Some(se) = se.upgrade() {
> + se.borrow_mut().delete_ref(self);
> + }
> + }
> + }
> +}
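The v3 notes say Weak<> is now used alongside Rc<> so that cross-references cannot form cycles. Below is a reduced std-only sketch of that ownership shape. Hypothetical Smtpd and Queue types stand in for SEntry and QEntry. The map holds the strong reference and the back-reference is a Weak. Once the map entry is removed, upgrade() returns None, which is the case QEntry::drop above has to handle.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::{Rc, Weak};

#[derive(Default)]
struct Smtpd {
    // Strong references to the queue entries created on this connection.
    queues: Vec<Rc<RefCell<Queue>>>,
}

#[derive(Default)]
struct Queue {
    // Weak back-reference: does not keep the Smtpd alive.
    smtpd: Option<Weak<RefCell<Smtpd>>>,
}

/// Returns (strong count of the Queue, whether the Queue can still reach
/// its Smtpd) after the Smtpd has been removed from the map.
fn demo_after_removal() -> (usize, bool) {
    let mut smtpds: HashMap<u64, Rc<RefCell<Smtpd>>> = HashMap::new();
    let se = Rc::new(RefCell::new(Smtpd::default()));
    smtpds.insert(42, Rc::clone(&se));

    let qe = Rc::new(RefCell::new(Queue::default()));
    qe.borrow_mut().smtpd = Some(Rc::downgrade(&se));
    se.borrow_mut().queues.push(Rc::clone(&qe));
    drop(se);

    // Removing the map entry drops the last strong reference to the Smtpd,
    // which in turn drops its strong reference to the Queue.
    smtpds.remove(&42);
    let reachable = qe
        .borrow()
        .smtpd
        .as_ref()
        .and_then(|w| w.upgrade())
        .is_some();
    (Rc::strong_count(&qe), reachable)
}
```

Suppose Queue held an Rc<RefCell<Smtpd>> instead. The two would then keep each other alive after every outside reference was gone, which is the kind of cycle the v3 change removes.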
> +
> +#[derive(Default, Debug)]
> +struct FEntry {
> + log: Vec<(Box<[u8]>, u64)>,
> + logid: Box<[u8]>,
> + to_entries: Vec<ToEntry>,
> + processing_time: Box<[u8]>,
> + string_match: bool,
> + finished: bool,
> +}
> +
> +impl FEntry {
> + fn set_accept(&mut self, to: &[u8], qid: &[u8], timestamp: u64) {
> + let te = ToEntry {
> + to: to.into(),
> + relay: qid.into(),
> + dstatus: DStatus::Accept,
> + timestamp,
> + };
> + self.to_entries.push(te);
> + }
> +
> + fn set_quarantine(&mut self, to: &[u8], qid: &[u8], timestamp: u64) {
> + let te = ToEntry {
> + to: to.into(),
> + relay: qid.into(),
> + dstatus: DStatus::Quarantine,
> + timestamp,
> + };
> + self.to_entries.push(te);
> + }
> +
> + fn set_block(&mut self, to: &[u8], timestamp: u64) {
> + let te = ToEntry {
> + to: to.into(),
> + relay: (&b"none"[..]).into(),
> + dstatus: DStatus::Block,
> + timestamp,
> + };
> + self.to_entries.push(te);
> + }
> +
> + fn set_processing_time(&mut self, time: &[u8]) {
> + self.processing_time = time.into();
> + self.finished = true;
> + }
> +}
> +
> +#[derive(Debug)]
> +struct Parser {
> + sentries: HashMap<u64, Rc<RefCell<SEntry>>>,
> + fentries: HashMap<Box<[u8]>, Rc<RefCell<FEntry>>>,
> + qentries: HashMap<Box<[u8]>, Rc<RefCell<QEntry>>>,
> +
> + current_record_state: RecordState,
> + rel_line_nr: u64,
> +
> + current_year: [i64; 32],
> + current_month: i64,
> + current_file_index: usize,
> +
> + count: u64,
> +
> + buffered_stdout: BufWriter<std::io::Stdout>,
> +
> + options: Options,
> +
> + start_tm: time::Tm,
> + end_tm: time::Tm,
> +
> + ctime: libc::time_t,
> + string_match: bool,
> +
> + lines: u64,
> +}
> +
> +impl Parser {
> + fn new() -> Self {
> + let mut years: [i64; 32] = [0; 32];
> + let mut tv: libc::timeval = libc::timeval {
> + tv_sec: 0,
> + tv_usec: 0,
> + };
> + let mut ltime: *mut libc::tm;
> +
> + for (i, year) in years.iter_mut().enumerate() {
> + unsafe {
> + libc::gettimeofday(&mut tv, std::ptr::null_mut());
> + }
> + tv.tv_sec -= (3600 * 24 * i) as i64;
> + ltime = unsafe { libc::localtime(&tv.tv_sec) };
> + *year = (unsafe { (*ltime).tm_year + 1900 }) as i64;
> + }
> +
> + Self {
> + sentries: HashMap::new(),
> + fentries: HashMap::new(),
> + qentries: HashMap::new(),
> + current_record_state: Default::default(),
> + rel_line_nr: 0,
> + current_year: years,
> + current_month: 0,
> + current_file_index: 0,
> + count: 0,
> + buffered_stdout: BufWriter::with_capacity(4 * 1024 * 1024, std::io::stdout()),
> + options: Options::default(),
> + start_tm: time::empty_tm(),
> + end_tm: time::empty_tm(),
> + ctime: 0,
> + string_match: false,
> + lines: 0,
> + }
> + }
> +
> + fn free_sentry(&mut self, sentry_pid: u64) {
> + self.sentries.remove(&sentry_pid);
> + }
> +
> + fn free_qentry(&mut self, qentry_qid: &[u8]) {
> + self.qentries.remove(qentry_qid);
> + }
> +
> + fn free_fentry(&mut self, fentry_logid: &[u8]) {
> + self.fentries.remove(fentry_logid);
> + }
> +
> + fn parse_files(&mut self) -> Result<(), Error> {
> + if !self.options.inputfile.is_empty() {
> + if self.options.inputfile == "-" {
> + self.current_file_index = 0;
> + let mut reader = BufReader::new(std::io::stdin());
> + self.handle_input_by_line(&mut reader)?;
> + } else if let Ok(file) = File::open(&self.options.inputfile) {
> + self.current_file_index = 0;
> + let mut reader = BufReader::with_capacity(4 * 1024 * 1024, file);
> + self.handle_input_by_line(&mut reader)?;
> + }
> + } else {
> + let filecount = self.count_files_in_time_range();
> + for i in (0..filecount).rev() {
> + self.current_month = 0;
> + if let Ok(file) = File::open(LOGFILES[i]) {
> + self.current_file_index = i;
> + if i > 1 {
> + let gzdecoder = read::GzDecoder::new(file);
> + let mut reader = BufReader::with_capacity(4 * 1024 * 1024, gzdecoder);
> + self.handle_input_by_line(&mut reader)?;
> + } else {
> + let mut reader = BufReader::with_capacity(4 * 1024 * 1024, file);
> + self.handle_input_by_line(&mut reader)?;
> + }
> + }
> + }
> + }
> +
> + Ok(())
> + }
> +
> + fn handle_input_by_line(&mut self, reader: &mut dyn BufRead) -> Result<(), Error> {
> + let mut buffer = Vec::<u8>::with_capacity(4096);
> + let mut prev_time = 0;
> + loop {
> + if self.options.limit > 0 && (self.count >= self.options.limit) {
> + self.write_all_ok("STATUS: aborted by limit (too many hits)\n");
> + self.buffered_stdout.flush()?;
> + std::process::exit(0);
> + }
> +
> + buffer.clear();
> + let size = match reader.read_until(b'\n', &mut buffer) {
> + Err(e) => return Err(e.into()),
> + Ok(0) => return Ok(()),
> + Ok(s) => s,
> + };
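> + // the last line of a file may lack the trailing newline
> + let line = if buffer.ends_with(b"\n") {
> + &buffer[..size - 1]
> + } else {
> + &buffer[..]
> + };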
> + let complete_line = line;
> +
> + let (time, line) = match parse_time(
> + line,
> + self.current_year[self.current_file_index],
> + &mut self.current_month,
> + ) {
> + Some(t) => t,
> + None => continue,
> + };
> + if time != prev_time {
> + self.rel_line_nr = 0;
> + } else {
> + self.rel_line_nr += 1;
> + }
> + prev_time = time;
> +
> + if time < self.options.start {
> + continue;
> + }
> + if time > self.options.end {
> + break;
> + }
> +
> + self.lines += 1;
> +
> + let (host, service, pid, line) = match parse_host_service_pid(line) {
> + Some((h, s, p, l)) => (h, s, p, l),
> + None => continue,
> + };
> +
> + self.ctime = time;
> +
> + self.current_record_state.host = host.into();
> + self.current_record_state.sys = service.into();
> + self.current_record_state.pid = pid;
> + self.current_record_state.timestamp = time as u64;
> +
> + self.string_match = false;
> + if !self.options.string_match.is_empty()
> + && find(complete_line, self.options.string_match.as_bytes()).is_some()
> + {
> + self.string_match = true;
> + eprintln!("{}", String::from_utf8_lossy(complete_line));
> + }
> +
> + if service == b"pmg-smtp-filter" {
> + handle_pmg_smtp_filter_message(line, self, complete_line);
> + } else if service == b"postfix/postscreen" {
> + handle_postscreen_message(line, self, complete_line);
> + } else if service == b"postfix/qmgr" {
> + handle_qmgr_message(line, self, complete_line);
> + } else if service == b"postfix/lmtp"
> + || service == b"postfix/smtp"
> + || service == b"postfix/local"
> + || service == b"postfix/error"
> + {
> + handle_lmtp_message(line, self, complete_line);
> + } else if service == b"postfix/smtpd" {
> + handle_smtpd_message(line, self, complete_line);
> + } else if service == b"postfix/cleanup" {
> + handle_cleanup_message(line, self, complete_line);
> + }
> + }
> + Ok(())
> + }
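The rel_line_nr bookkeeping in handle_input_by_line restarts at 0 whenever the timestamp changes. It counts up for each further line within the same second, so a (timestamp, rel_line_nr) pair names one log line. That pair is what Match::RelLineNr is compared against. Below is a std-only sketch over already-parsed timestamps. The function name is hypothetical.

```rust
/// Pair each timestamp with its position among consecutive lines that share
/// the same timestamp (0 for the first line of a new second).
fn relative_line_numbers(times: &[i64]) -> Vec<(i64, u64)> {
    let mut out = Vec::with_capacity(times.len());
    let mut prev_time = 0;
    let mut rel_line_nr = 0u64;
    for &time in times {
        if time != prev_time {
            rel_line_nr = 0;
        } else {
            rel_line_nr += 1;
        }
        prev_time = time;
        out.push((time, rel_line_nr));
    }
    out
}
```

In the original, the counter is updated before the --start/--end check. As a result, a line's number does not depend on the selected time range.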
> +
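> + /// Returns the number of log files (counting from /var/log/syslog towards older rotations)
> + /// that have to be read to cover the start time. Stops at the first file that cannot be
> + /// opened or read (e.g. permission denied) or is empty, instead of returning an error.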
> + fn count_files_in_time_range(&mut self) -> usize {
> + let mut count = 0;
> + let mut buffer = Vec::new();
> +
> + for (i, item) in LOGFILES.iter().enumerate() {
> + self.current_month = 0;
> +
> + count = i;
> + if let Ok(file) = File::open(item) {
> + self.current_file_index = i;
> + buffer.clear();
> + if i > 1 {
> + let gzdecoder = read::GzDecoder::new(file);
> + let mut reader = BufReader::new(gzdecoder);
> + if let Ok(size) = reader.read_until(b'\n', &mut buffer) {
> + if size == 0 {
> + return count;
> + }
> + if let Some((time, _)) = parse_time(
> + &buffer[0..size],
> + self.current_year[i],
> + &mut self.current_month,
> + ) {
> + if time < self.options.start {
> + break;
> + }
> + }
> + } else {
> + return count;
> + }
> + } else {
> + let mut reader = BufReader::new(file);
> + if let Ok(size) = reader.read_until(b'\n', &mut buffer) {
> + if size == 0 {
> + return count;
> + }
> + if let Some((time, _)) = parse_time(
> + &buffer[0..size],
> + self.current_year[i],
> + &mut self.current_month,
> + ) {
> + if time < self.options.start {
> + break;
> + }
> + }
> + } else {
> + return count;
> + }
> + }
> + } else {
> + return count;
> + }
> + }
> +
> + count + 1
> + }
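count_files_in_time_range looks only at the first line of each file, going from /var/log/syslog towards older rotations. It stops at the first file that already starts before the start time, since anything older is out of range. parse_files then reads the selected files oldest first via (0..filecount).rev(). Below is a simplified std-only sketch. The hypothetical input first_line_times holds the first timestamp of each file, or None for a file that could not be opened or was empty. The sketch leaves out one detail of the original: when a first line has no parsable timestamp, the original simply moves on to the next file.

```rust
/// Number of log files (newest first) that must be read to reach `start`.
fn files_to_read(first_line_times: &[Option<i64>], start: i64) -> usize {
    for (i, first) in first_line_times.iter().enumerate() {
        match first {
            // Unreadable or empty file: only the newer files are read.
            None => return i,
            // This file already begins before `start`, so it is the oldest one needed.
            Some(t) if *t < start => return i + 1,
            Some(_) => {}
        }
    }
    first_line_times.len()
}
```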
> +
> + fn handle_args(&mut self, args: clap::ArgMatches) -> Result<(), Error> {
> + if let Some(inputfile) = args.value_of("inputfile") {
> + self.options.inputfile = inputfile.to_string();
> + }
> +
> + if let Some(start) = args.value_of("start") {
> + if let Ok(res) = time::strptime(&start, "%F %T") {
> + self.options.start = mkgmtime(&res);
> + self.start_tm = res;
> + } else if let Ok(res) = time::strptime(&start, "%s") {
> + self.options.start = mkgmtime(&res);
> + self.start_tm = res;
> + } else {
> + failure::bail!(failure::err_msg("failed to parse start time"));
> + }
> + } else {
> + let mut ltime = time::now();
> + ltime.tm_sec = 0;
> + ltime.tm_min = 0;
> + ltime.tm_hour = 0;
> + self.options.start = mkgmtime(&ltime);
> + self.start_tm = ltime;
> + }
> +
> + if let Some(end) = args.value_of("end") {
> + if let Ok(res) = time::strptime(&end, "%F %T") {
> + self.options.end = mkgmtime(&res);
> + self.end_tm = res;
> + } else if let Ok(res) = time::strptime(&end, "%s") {
> + self.options.end = mkgmtime(&res);
> + self.end_tm = res;
> + } else {
> + failure::bail!(failure::err_msg("failed to parse end time"));
> + }
> + } else {
> + let ltime = time::now();
> + self.options.end = mkgmtime(&ltime);
> + self.end_tm = ltime;
> + }
> +
> + if self.options.end < self.options.start {
> + failure::bail!(failure::err_msg("end time before start time"));
> + }
> +
> + self.options.limit = match args.value_of("limit") {
> + Some(l) => l.parse()?,
> + None => 0,
> + };
> +
> + if let Some(qids) = args.values_of("qids") {
> + for q in qids {
> + let mut ltime: libc::time_t = 0;
> + let mut rel_line_nr: libc::c_ulong = 0;
> + let input = CString::new(q)?;
> + let bytes = concat!("T%08lXL%08lX", "\0");
> + let format =
> + unsafe { std::ffi::CStr::from_bytes_with_nul_unchecked(bytes.as_bytes()) };
> + if unsafe {
> + libc::sscanf(input.as_ptr(), format.as_ptr(), &mut ltime, &mut rel_line_nr) == 2
> + } {
> + self.options
> + .match_list
> + .push(Match::RelLineNr(ltime, rel_line_nr));
> + } else {
> + self.options
> + .match_list
> + .push(Match::Qid(q.as_bytes().into()));
> + }
> + }
> + }
> +
> + if let Some(from) = args.value_of("from") {
> + self.options.from = from.to_string();
> + }
> + if let Some(to) = args.value_of("to") {
> + self.options.to = to.to_string();
> + }
> + if let Some(host) = args.value_of("host") {
> + self.options.host = host.to_string();
> + }
> + if let Some(msgid) = args.value_of("msgid") {
> + self.options.msgid = msgid.to_string();
> + }
> +
> + self.options.exclude_greylist = args.is_present("exclude_greylist");
> + self.options.exclude_ndr = args.is_present("exclude_ndr");
> +
> + self.options.verbose = args.occurrences_of("verbose") as _;
> +
> + if let Some(string_match) = args.value_of("search") {
> + self.options.string_match = string_match.to_string();
> + }
> +
> + Ok(())
> + }
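handle_args calls libc::sscanf with "T%08lXL%08lX" to tell a T<time>L<line> identifier apart from a plain queue ID. The same split can be done without FFI or unsafe, as the sketch below shows. The helper name is hypothetical. The sketch is deliberately stricter than sscanf: it requires exactly 8 hex digits per field and rejects trailing text. sscanf, by contrast, also accepts shorter fields and ignores anything after the second number.

```rust
/// Parse "T<8 hex digits>L<8 hex digits>" into (timestamp, rel_line_nr).
fn parse_time_line_id(s: &str) -> Option<(i64, u64)> {
    let b = s.as_bytes();
    if b.len() != 18 || b[0] != b'T' || b[9] != b'L' {
        return None;
    }
    let hex = |part: &[u8]| -> Option<u64> {
        // from_str_radix would also accept a leading '+', so check the digits first.
        if !part.iter().all(|c| c.is_ascii_hexdigit()) {
            return None;
        }
        u64::from_str_radix(std::str::from_utf8(part).ok()?, 16).ok()
    };
    let time = hex(&b[1..9])? as i64;
    let line = hex(&b[10..18])?;
    Some((time, line))
}
```

Any argument for which this returns None would be pushed as Match::Qid, as in the else branch above.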
> +
> + fn write_all_ok<T: AsRef<[u8]>>(&mut self, data: T) {
> + self.buffered_stdout
> + .write_all(data.as_ref())
> + .expect("failed to write to stdout");
> + }
> +}
> +
> +impl Drop for Parser {
> + fn drop(&mut self) {
> + let mut qentries = std::mem::replace(&mut self.qentries, HashMap::new());
> + for q in qentries.values() {
> + let smtpd = q.borrow().smtpd.clone();
> + match smtpd {
> + Some(s) => {
> + if let Some(s) = s.upgrade() {
> + q.borrow_mut().print(self, Some(&*s.borrow()));
> + } else {
> + q.borrow_mut().print(self, None);
> + }
> + }
> + None => {
> + q.borrow_mut().print(self, None);
> + }
> + };
> + }
> + qentries.clear();
> + let mut sentries = std::mem::replace(&mut self.sentries, HashMap::new());
> + for s in sentries.values() {
> + s.borrow_mut().print(self);
> + }
> + sentries.clear();
> + }
> +}
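When the parser is dropped, Parser::drop prints the entries still held in its maps. It first moves each map out of self with std::mem::replace, because print() needs &mut Parser while the map is being iterated. Below is a reduced std-only sketch of that pattern using a hypothetical Printer type. Unlike the original, it sorts the keys first.

```rust
use std::collections::HashMap;

struct Printer {
    pending: HashMap<u32, String>,
    output: Vec<String>,
}

impl Printer {
    fn emit(&mut self, line: &str) {
        self.output.push(line.to_string());
    }

    /// Print and discard all pending entries.
    fn flush_pending(&mut self) {
        // Take ownership of the map so `self` can be borrowed mutably by
        // `emit` inside the loop.
        let pending = std::mem::replace(&mut self.pending, HashMap::new());
        let mut keys: Vec<_> = pending.keys().copied().collect();
        // HashMap order is unspecified; sort for deterministic output.
        keys.sort();
        for k in keys {
            self.emit(&pending[&k]);
        }
    }
}
```

HashMap iteration order is unspecified and, with the default hasher, varies between runs. The original therefore prints leftover entries in no particular order. Sorting by key, or keeping them in a BTreeMap, would make that part of the output deterministic.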
> +
> +#[derive(Debug, Default)]
> +struct Options {
> + match_list: Vec<Match>,
> + inputfile: String,
> + string_match: String,
> + host: String,
> + msgid: String,
> + from: String,
> + to: String,
> + start: libc::time_t,
> + end: libc::time_t,
> + limit: u64,
> + verbose: u32,
> + exclude_greylist: bool,
> + exclude_ndr: bool,
> +}
> +
> +#[derive(Debug)]
> +enum Match {
> + Qid(Box<[u8]>),
> + RelLineNr(libc::time_t, u64),
> +}
> +
> +#[derive(Debug, Default)]
> +struct RecordState {
> + host: Box<[u8]>,
> + sys: Box<[u8]>,
> + pid: u64,
> + timestamp: u64,
> +}
> +
> +fn get_or_create_qentry(
> + qentries: &mut HashMap<Box<[u8]>, Rc<RefCell<QEntry>>>,
> + qid: &[u8],
> +) -> Rc<RefCell<QEntry>> {
> + if let Some(qe) = qentries.get(qid) {
> + Rc::clone(qe)
> + } else {
> + let qe = Rc::new(RefCell::new(QEntry::default()));
> + qe.borrow_mut().qid = qid.into();
> + qentries.insert(qid.into(), qe.clone());
> + qe
> + }
> +}
> +
> +fn get_or_create_sentry(
> + sentries: &mut HashMap<u64, Rc<RefCell<SEntry>>>,
> + pid: u64,
> + rel_line_nr: u64,
> + timestamp: u64,
> +) -> Rc<RefCell<SEntry>> {
> + if let Some(se) = sentries.get(&pid) {
> + Rc::clone(se)
> + } else {
> + let se = Rc::new(RefCell::new(SEntry::default()));
> + se.borrow_mut().rel_line_nr = rel_line_nr;
> + se.borrow_mut().timestamp = timestamp;
> + sentries.insert(pid, se.clone());
> + se
> + }
> +}
> +
> +fn get_or_create_fentry(
> + fentries: &mut HashMap<Box<[u8]>, Rc<RefCell<FEntry>>>,
> + qid: &[u8],
> +) -> Rc<RefCell<FEntry>> {
> + if let Some(fe) = fentries.get(qid) {
> + Rc::clone(fe)
> + } else {
> + let fe = Rc::new(RefCell::new(FEntry::default()));
> + fe.borrow_mut().logid = qid.into();
> + fentries.insert(qid.into(), fe.clone());
> + fe
> + }
> +}
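The three get_or_create_* helpers each do a lookup and, on a miss, an insert. HashMap's entry API does the same in one call. Below is a sketch with a hypothetical Entry type in place of QEntry/FEntry.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

#[derive(Default, Debug)]
struct Entry {
    id: Box<[u8]>,
}

/// Return the entry for `id`, creating and inserting it if it does not exist.
fn get_or_create(
    map: &mut HashMap<Box<[u8]>, Rc<RefCell<Entry>>>,
    id: &[u8],
) -> Rc<RefCell<Entry>> {
    let entry = map
        .entry(id.into())
        .or_insert_with(|| Rc::new(RefCell::new(Entry { id: id.into() })));
    Rc::clone(entry)
}
```

There is a trade-off: entry() takes an owned key, so this version allocates a Box<[u8]> on every call, hits included. The helpers above allocate only on insert.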
> +
> +fn mkgmtime(tm: &time::Tm) -> libc::time_t {
> + let mut res: libc::time_t;
> +
> + let mut year = (tm.tm_year + 1900) as i64;
> + let mon = tm.tm_mon;
> +
> + res = (year - 1970) * 365 + CAL_MTOD[mon as usize];
> +
> + if mon <= 1 {
> + year -= 1;
> + }
> +
> + res += (year - 1968) / 4;
> + res -= (year - 1900) / 100;
> + res += (year - 1600) / 400;
> +
> + res += (tm.tm_mday - 1) as i64;
> + res = res * 24 + tm.tm_hour as i64;
> + res = res * 60 + tm.tm_min as i64;
> + res = res * 60 + tm.tm_sec as i64;
> +
> + res
> +}
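mkgmtime, and the same arithmetic in parse_time, converts a broken-down UTC date to seconds since the epoch without consulting the time zone. It counts 365 days per year since 1970 plus the days before the month (CAL_MTOD), then adds leap days with the Gregorian 4/100/400 rule. January and February use year - 1 for the leap-day terms, because that year's Feb 29 has not been reached yet. Below is a std-only restatement with plain integer arguments. As in struct tm, mon is 0-based. The function name is hypothetical.

```rust
const CAL_MTOD: [i64; 12] = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334];

/// Seconds since 1970-01-01 00:00:00 UTC for a Gregorian date (mon: 0..=11).
fn utc_seconds(year: i64, mon: usize, mday: i64, hour: i64, min: i64, sec: i64) -> i64 {
    let mut days = (year - 1970) * 365 + CAL_MTOD[mon];
    // Count leap days only up to the previous year if Feb 29 of `year`
    // has not been reached yet.
    let y = if mon <= 1 { year - 1 } else { year };
    days += (y - 1968) / 4;
    days -= (y - 1900) / 100;
    days += (y - 1600) / 400;
    days += mday - 1;
    ((days * 24 + hour) * 60 + min) * 60 + sec
}
```

The formula relies on truncating integer division and is meant for dates from 1970 onwards.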
> +
> +const LOGFILES: [&str; 32] = [
> + "/var/log/syslog",
> + "/var/log/syslog.1",
> + "/var/log/syslog.2.gz",
> + "/var/log/syslog.3.gz",
> + "/var/log/syslog.4.gz",
> + "/var/log/syslog.5.gz",
> + "/var/log/syslog.6.gz",
> + "/var/log/syslog.7.gz",
> + "/var/log/syslog.8.gz",
> + "/var/log/syslog.9.gz",
> + "/var/log/syslog.10.gz",
> + "/var/log/syslog.11.gz",
> + "/var/log/syslog.12.gz",
> + "/var/log/syslog.13.gz",
> + "/var/log/syslog.14.gz",
> + "/var/log/syslog.15.gz",
> + "/var/log/syslog.16.gz",
> + "/var/log/syslog.17.gz",
> + "/var/log/syslog.18.gz",
> + "/var/log/syslog.19.gz",
> + "/var/log/syslog.20.gz",
> + "/var/log/syslog.21.gz",
> + "/var/log/syslog.22.gz",
> + "/var/log/syslog.23.gz",
> + "/var/log/syslog.24.gz",
> + "/var/log/syslog.25.gz",
> + "/var/log/syslog.26.gz",
> + "/var/log/syslog.27.gz",
> + "/var/log/syslog.28.gz",
> + "/var/log/syslog.29.gz",
> + "/var/log/syslog.30.gz",
> + "/var/log/syslog.31.gz",
> +];
> +
> +/// Parse a QID: a run of 2 to `max` ASCII hex digits ([0-9A-Fa-f]). Returns a tuple of (qid, remaining_text) or None.
> +fn parse_qid(data: &[u8], max: usize) -> Option<(&[u8], &[u8])> {
> + let mut bytes = data.iter();
> + let mut count = 0;
> + while count < max {
> + match bytes.next() {
> + Some(c) => {
> + if c.is_ascii_hexdigit() {
> + count += 1;
> + } else {
> + break;
> + }
> + }
> + None => break,
> + }
> + }
> + if count > 1 {
> + return Some((&data[..count], &data[count..]));
> + }
> + None
> +}
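parse_qid takes a run of ASCII hex digits, at most max of them, and rejects runs shorter than two. Below is the same logic written with iterator adapters. The function name is hypothetical.

```rust
/// Split off a leading run of ASCII hex digits (at most `max`, at least 2).
fn split_hex_id(data: &[u8], max: usize) -> Option<(&[u8], &[u8])> {
    let count = data
        .iter()
        .take(max)
        .take_while(|c| c.is_ascii_hexdigit())
        .count();
    if count > 1 {
        Some(data.split_at(count))
    } else {
        None
    }
}
```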
> +
> +/// Parse a number. Returns a tuple of (parsed_number, remaining_text) or None.
> +fn parse_number(data: &[u8], max_digits: usize) -> Option<(usize, &[u8])> {
> + let mut bytes = data.iter();
> + let mut number: usize = 0;
> + let mut digits: usize = 0;
> + while digits < max_digits {
> + let c = match bytes.next() {
> + Some(c) => c,
> + None => break,
> + };
> + if (*c as char).is_digit(10) {
> + digits += 1;
> + number *= 10;
> + number += (*c as char).to_digit(10).unwrap() as usize;
> + } else {
> + break;
> + }
> + }
> + if digits == 0 {
> + None
> + } else {
> + Some((number, &data[digits..]))
> + }
> +}
> +
> +/// Parse time. Returns a tuple of (parsed_time, remaining_text) or None.
> +fn parse_time<'a>(
> + data: &'a [u8],
> + cur_year: i64,
> + cur_month: &mut i64,
> +) -> Option<(libc::time_t, &'a [u8])> {
> + if data.len() < 15 {
> + return None;
> + }
> +
> + let mon = match &data[0..3] {
> + b"Jan" => 0,
> + b"Feb" => 1,
> + b"Mar" => 2,
> + b"Apr" => 3,
> + b"May" => 4,
> + b"Jun" => 5,
> + b"Jul" => 6,
> + b"Aug" => 7,
> + b"Sep" => 8,
> + b"Oct" => 9,
> + b"Nov" => 10,
> + b"Dec" => 11,
> + _ => return None,
> + };
> + let data = &data[3..];
> +
> + let mut ltime: libc::time_t;
> + let mut year = cur_year;
> +
> + if *cur_month == 11 && mon == 0 {
> + year += 1;
> + }
> + if mon > *cur_month {
> + *cur_month = mon;
> + }
> +
> + ltime = (year - 1970) * 365 + CAL_MTOD[mon as usize];
> +
> + if mon <= 1 {
> + year -= 1;
> + }
> +
> + ltime += (year - 1968) / 4;
> + ltime -= (year - 1900) / 100;
> + ltime += (year - 1600) / 400;
> +
> + let whitespace_count = data.iter().take_while(|b| b.is_ascii_whitespace()).count();
> + let data = &data[whitespace_count..];
> +
> + let (mday, data) = match parse_number(data, 2) {
> + Some(t) => t,
> + None => {
> + eprintln!("no day matched");
> + return None;
> + }
> + };
> + if mday == 0 {
> + eprintln!("mday == 0");
> + return None;
> + }
> +
> + ltime += (mday - 1) as i64;
> +
> + let data = &data[1..];
> +
> + let (hour, data) = match parse_number(data, 2) {
> + Some(t) => t,
> + None => {
> + eprintln!("no hour matched");
> + return None;
> + }
> + };
> +
> + ltime *= 24;
> + ltime += hour as i64;
> +
> + if let Some(c) = data.iter().next() {
> + if (*c as char) != ':' {
> + eprintln!("char != ':'");
> + return None;
> + }
> + } else {
> + eprintln!("no next char");
> + return None;
> + }
> + let data = &data[1..];
> +
> + let (min, data) = match parse_number(data, 2) {
> + Some(t) => t,
> + None => {
> + eprintln!("no min matched");
> + return None;
> + }
> + };
> +
> + ltime *= 60;
> + ltime += min as i64;
> +
> + if let Some(c) = data.iter().next() {
> + if (*c as char) != ':' {
> + eprintln!("char != ':'");
> + return None;
> + }
> + } else {
> + eprintln!("no next char");
> + return None;
> + }
> + let data = &data[1..];
> +
> + let (sec, data) = match parse_number(data, 2) {
> + Some(t) => t,
> + None => {
> + eprintln!("no sec matched");
> + return None;
> + }
> + };
> +
> + ltime *= 60;
> + ltime += sec as i64;
> +
> + let data = data.get(1..)?; // skip the character after the seconds; None if the line ends there
> +
> + Some((ltime, data))
> +}
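Syslog timestamps such as "Nov 14 16:06:02" carry no year, so parse_time has to derive one. It starts from the year computed for the file in Parser::new. Once the file has reached December (cur_month == 11), any following January line is treated as belonging to the next year. Below is a std-only sketch of just that decision. The helper name is hypothetical.

```rust
/// Returns the year to use for a log line in month `mon` (0 = January),
/// updating `cur_month` to the highest month seen so far in this file.
fn year_for_line(file_year: i64, cur_month: &mut i64, mon: i64) -> i64 {
    let mut year = file_year;
    // December was already seen in this file, so January belongs to the next year.
    if *cur_month == 11 && mon == 0 {
        year += 1;
    }
    if mon > *cur_month {
        *cur_month = mon;
    }
    year
}
```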
> +
> +const CAL_MTOD: [i64; 12] = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334];
> +
> +type ByteSlice<'a> = &'a [u8];
> +/// Parse Host, Service and PID at the beginning of data. Returns a tuple of (host, service, pid, remaining_text) or None.
> +fn parse_host_service_pid(data: &[u8]) -> Option<(ByteSlice, ByteSlice, u64, ByteSlice)> {
> + let host_count = data
> + .iter()
> + .take_while(|b| !(**b as char).is_ascii_whitespace())
> + .count();
> + let host = &data[0..host_count];
> + let data = data.get(host_count + 1..)?; // whitespace after host; None if the line ends here
> +
> + let service_count = data
> + .iter()
> + .take_while(|b| {
> + (**b as char).is_ascii_alphabetic() || (**b as char) == '/' || (**b as char) == '-'
> + })
> + .count();
> + let service = &data[0..service_count];
> + let data = &data[service_count..];
> + if data.get(0) != Some(&b'[') {
> + return None;
> + }
> + let data = &data[1..];
> +
> + let pid_count = data
> + .iter()
> + .take_while(|b| (**b as char).is_ascii_digit())
> + .count();
> + let pid = match unsafe { std::str::from_utf8_unchecked(&data[0..pid_count]) }.parse() {
> + // all ascii digits so valid utf8
> + Ok(p) => p,
> + Err(e) => {
> + eprintln!("failed to parse PID: {}", e);
> + return None;
> + }
> + };
> + let data = &data[pid_count..];
> + if !data.starts_with(b"]: ") {
> + eprintln!("error after PID");
> + return None;
> + }
> + let data = &data[3..];
> +
> + Some((host, service, pid, data))
> +}
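After the timestamp, parse_host_service_pid expects "host service[pid]: message". Below is a std-only sketch of the same split. It assumes the same conventions as the original: a single whitespace character after the host, and a service name made of letters, '/' and '-'. Malformed lines return None. The function name is hypothetical.

```rust
/// Split "host service[pid]: rest" into (host, service, pid, rest).
fn split_syslog_tag(data: &[u8]) -> Option<(&[u8], &[u8], u64, &[u8])> {
    let host_len = data.iter().take_while(|b| !b.is_ascii_whitespace()).count();
    let (host, rest) = data.split_at(host_len);
    let rest = rest.get(1..)?; // the single whitespace after the host
    let service_len = rest
        .iter()
        .take_while(|b| b.is_ascii_alphabetic() || **b == b'/' || **b == b'-')
        .count();
    let (service, rest) = rest.split_at(service_len);
    if rest.first() != Some(&b'[') {
        return None;
    }
    let rest = &rest[1..];
    let pid_len = rest.iter().take_while(|b| b.is_ascii_digit()).count();
    if pid_len == 0 {
        return None;
    }
    // Only ASCII digits, so from_utf8 cannot fail; parse can still overflow.
    let pid: u64 = std::str::from_utf8(&rest[..pid_len]).ok()?.parse().ok()?;
    let rest = &rest[pid_len..];
    if !rest.starts_with(b"]: ") {
        return None;
    }
    Some((host, service, pid, &rest[3..]))
}
```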
> +
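> +/// A find implementation for slices. Returns the index of the first occurrence of `needle`
> +/// in `data`, or None. Panics if `needle` is empty (`slice::windows(0)` panics).
> +fn find<T: PartialEq>(data: &[T], needle: &[T]) -> Option<usize> {
> + data.windows(needle.len()).position(|d| d == needle)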
> +}
> +
> +/// A find implementation for [u8] that converts to lowercase before the comparison. Returns the
> +/// index or None.
> +fn find_lowercase(data: &[u8], needle: &[u8]) -> Option<usize> {
> + let data = data.to_ascii_lowercase();
> + let needle = needle.to_ascii_lowercase();
> + data.windows(needle.len()).position(|d| d == &needle[..])
> +}
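find_lowercase allocates two lowercase copies on every call. That happens for each entry checked against --host or --from, and for each recipient checked against --to. Below is a sketch of an allocation-free alternative built on slice::eq_ignore_ascii_case. The function name is hypothetical. The sketch also handles an empty needle itself, returning Some(0), instead of relying on callers to exclude it.

```rust
/// ASCII case-insensitive substring search. Returns the byte offset of the
/// first match, or None. An empty needle matches at offset 0.
fn find_ignore_ascii_case(data: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return Some(0);
    }
    data.windows(needle.len())
        .position(|w| w.eq_ignore_ascii_case(needle))
}
```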