[pmg-devel] [PATCH v6 pmg-log-tracker 3/4] add before queue filter support
Mira Limbeck
m.limbeck at proxmox.com
Fri Dec 20 15:08:33 CET 2019
Add initial before-queue filter support. This requires a patch to pmg-api
that adds the pmg-smtp-filter ID to the replies sent on a reject, so that the
pmg-smtp-filter can be correctly matched to the smtpd.
Signed-off-by: Mira Limbeck <m.limbeck at proxmox.com>
---
src/main.rs | 386 +++++++++++++++++++++++++++++++++++++---------------
1 file changed, 279 insertions(+), 107 deletions(-)
diff --git a/src/main.rs b/src/main.rs
index 061f48c..73cdc6a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -194,6 +194,14 @@ fn handle_pmg_smtp_filter_message(msg: &[u8], parser: &mut Parser, complete_line
fe.borrow_mut()
.set_accept(to, qid, parser.current_record_state.timestamp);
+ if let Some(qe) = parser.qentries.get(qid) {
+ if !qe.borrow().filtered {
+ qe.borrow_mut().bq_filtered = true;
+ qe.borrow_mut().filter = Some(Rc::clone(&fe));
+ fe.borrow_mut().qentry = Some(Rc::downgrade(qe));
+ }
+ }
+
return;
}
@@ -290,6 +298,11 @@ fn handle_postscreen_message(msg: &[u8], parser: &mut Parser, complete_line: &[u
se.borrow_mut().set_connect(client);
se.borrow_mut().disconnect = true;
se.borrow_mut().print(parser);
+ if let Some(f) = &se.borrow().filter {
+ if let Some(f) = f.upgrade() {
+ parser.free_fentry(&f.borrow().logid);
+ }
+ }
parser.free_sentry(parser.current_record_state.pid);
}
@@ -412,7 +425,18 @@ fn handle_lmtp_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
qe.borrow_mut().filtered = true;
if let Some(fe) = parser.fentries.get(qid) {
- qe.borrow_mut().filter = Some(Rc::downgrade(fe));
+ qe.borrow_mut().filter = Some(Rc::clone(fe));
+ let q = fe.borrow().qentry.clone();
+ if let Some(q) = q {
+ if let Some(q) = q.upgrade() {
+ if !Rc::ptr_eq(&q, &qe) {
+ q.borrow_mut().filtered = false;
+ q.borrow_mut().bq_filtered = false;
+ q.borrow_mut().filter = None;
+ fe.borrow_mut().qentry = Some(Rc::downgrade(&qe));
+ }
+ }
+ }
}
}
}
@@ -444,6 +468,11 @@ fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
if se.borrow_mut().remove_unneeded_refs(parser) == 0 {
se.borrow_mut().print(parser);
+ if let Some(f) = &se.borrow().filter {
+ if let Some(f) = f.upgrade() {
+ parser.free_fentry(&f.borrow().logid);
+ }
+ }
parser.free_sentry(se.borrow().pid);
} else {
se.borrow_mut().finalize_refs(parser);
@@ -498,6 +527,68 @@ fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
return;
}
+ if msg.starts_with(b"proxy-accept: ") {
+ let data = &msg[14..];
+ if !data.starts_with(b"END-OF-MESSAGE: ") {
+ return;
+ }
+ let data = &data[16..];
+ if !data.starts_with(b"250 2.5.0 OK (") {
+ return;
+ }
+ let data = &data[14..];
+ if let Some((qid, data)) = parse_qid(data, 25) {
+ let fe = get_or_create_fentry(&mut parser.fentries, qid);
+ fe.borrow_mut().is_bq = true;
+ if !fe.borrow().is_accepted {
+ se.borrow_mut().filter = Some(Rc::downgrade(&fe));
+
+ if let Some(from_index) = find(data, b"from=<") {
+ let data = &data[from_index + 6..];
+ let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
+ let from = &data[..from_count];
+ se.borrow_mut().bq_from = from.into();
+ }
+ } else if let Some(qe) = &fe.borrow().qentry {
+ if let Some(qe) = qe.upgrade() {
+ qe.borrow_mut().bq_sentry = Some(Rc::clone(&se));
+ SEntry::add_ref(&se, &qe, true);
+ }
+ }
+ se.borrow_mut().is_bq_accepted = true;
+ }
+
+ return;
+ }
+
+ if msg.starts_with(b"proxy-reject: ") {
+ let data = &msg[14..];
+ if !data.starts_with(b"END-OF-MESSAGE: ") {
+ return;
+ }
+ let data = &data[16..];
+ if let Some(qid_index) = find(data, b"(") {
+ let data = &data[qid_index + 1..];
+ if let Some((qid, data)) = parse_qid(data, 25) {
+ let fe = get_or_create_fentry(&mut parser.fentries, qid);
+ fe.borrow_mut().is_bq = true;
+ if !fe.borrow().is_accepted {
+ se.borrow_mut().filter = Some(Rc::downgrade(&fe));
+ }
+ se.borrow_mut().is_bq_rejected = true;
+
+ if let Some(from_index) = find(data, b"from=<") {
+ let data = &data[from_index + 6..];
+ let from_count = data.iter().take_while(|b| (**b as char) != '>').count();
+ let from = &data[..from_count];
+ se.borrow_mut().bq_from = from.into();
+ }
+ }
+ }
+
+ return;
+ }
+
let (qid, data) = match parse_qid(msg, 15) {
Some(t) => t,
None => return,
@@ -510,7 +601,7 @@ fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) {
qe.borrow_mut().string_match = parser.string_match;
}
- SEntry::add_ref(&se, &qe);
+ SEntry::add_ref(&se, &qe, false);
if !data.starts_with(b"client=") {
return;
@@ -586,7 +677,7 @@ impl Default for ToEntry {
}
}
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Copy, Clone)]
enum DStatus {
Invalid,
Accept,
@@ -618,16 +709,6 @@ impl std::fmt::Display for DStatus {
}
}
-impl DStatus {
- fn is_dsn(&self, value: Option<u32>) -> bool {
- match (self, value) {
- (DStatus::Dsn(v), Some(val)) => *v == val,
- (DStatus::Dsn(_), None) => true,
- _ => false,
- }
- }
-}
-
#[derive(Debug, Default)]
struct SEntry {
log: Vec<(Box<[u8]>, u64)>,
@@ -636,10 +717,14 @@ struct SEntry {
pid: u64,
refs: Vec<Weak<RefCell<QEntry>>>,
nq_entries: Vec<NoqueueEntry>,
+ filter: Option<Weak<RefCell<FEntry>>>,
disconnect: bool,
string_match: bool,
timestamp: u64,
rel_line_nr: u64,
+ is_bq_accepted: bool,
+ is_bq_rejected: bool,
+ bq_from: Box<[u8]>,
}
impl SEntry {
@@ -698,22 +783,13 @@ impl SEntry {
{
let mut found = false;
for nq in self.nq_entries.iter_mut().rev() {
- if !parser.options.from.is_empty()
- && find_lowercase(&nq.from, parser.options.from.as_bytes()).is_none()
- {
- nq.dstatus = DStatus::Invalid;
- }
-
- if parser.options.exclude_greylist && nq.dstatus == DStatus::Greylist {
- nq.dstatus = DStatus::Invalid;
- }
- if parser.options.exclude_ndr && nq.from.is_empty() {
- nq.dstatus = DStatus::Invalid;
- }
-
- if !parser.options.to.is_empty()
- && !nq.to.is_empty()
- && find_lowercase(&nq.to, parser.options.to.as_bytes()).is_none()
+ if (!parser.options.from.is_empty()
+ && find_lowercase(&nq.from, parser.options.from.as_bytes()).is_none())
+ || (parser.options.exclude_greylist && nq.dstatus == DStatus::Greylist)
+ || (parser.options.exclude_ndr && nq.from.is_empty())
+ || (!parser.options.to.is_empty()
+ && !nq.to.is_empty()
+ && find_lowercase(&nq.to, parser.options.to.as_bytes()).is_none())
{
nq.dstatus = DStatus::Invalid;
}
@@ -760,13 +836,57 @@ impl SEntry {
}
}
- if parser.options.verbose > 1 {
- parser.write_all_ok(b"LOGS:\n");
- for (log, line) in self.log.iter() {
+ let print_filter_to_entries_fn =
+ |fe: &Rc<RefCell<FEntry>>,
+ parser: &mut Parser,
+ se: &SEntry,
+ dstatus: Option<DStatus>| {
+ let mut dstatus = match dstatus {
+ Some(d) => d,
+ None => DStatus::Invalid,
+ };
+ for to in fe.borrow().to_entries.iter().rev() {
+ if dstatus == DStatus::Invalid {
+ dstatus = to.dstatus;
+ }
+ parser.write_all_ok(format!(
+ "TO:{:X}:T{:08X}L{:08X}:{}: from <",
+ to.timestamp as i32, se.timestamp as i32, se.rel_line_nr, dstatus,
+ ));
+ parser.write_all_ok(&se.bq_from);
+ parser.write_all_ok(b"> to <");
+ parser.write_all_ok(&to.to);
+ parser.write_all_ok(b">\n");
+ parser.count += 1;
+ }
+ };
+
+ if let Some(fe) = &self.filter {
+ if let Some(fe) = fe.upgrade() {
+ if fe.borrow().is_bq && !fe.borrow().is_accepted && self.is_bq_accepted {
+ print_filter_to_entries_fn(&fe, parser, self, None);
+ } else if fe.borrow().is_bq && !fe.borrow().is_accepted && self.is_bq_rejected {
+ print_filter_to_entries_fn(&fe, parser, self, Some(DStatus::Noqueue));
+ }
+ }
+ }
+
+ let print_log = |parser: &mut Parser, logs: &Vec<(Box<[u8]>, u64)>| {
+ for (log, line) in logs.iter() {
parser.write_all_ok(format!("L{:08X} ", *line as u32));
parser.write_all_ok(log);
parser.write_all_ok(b"\n");
}
+ };
+ if parser.options.verbose > 1 {
+ parser.write_all_ok(b"LOGS:\n");
+ print_log(parser, &self.log);
+
+ if let Some(f) = &self.filter {
+ if let Some(f) = f.upgrade() {
+ print_log(parser, &f.borrow().log);
+ }
+ }
}
parser.write_all_ok(b"\n");
}
@@ -803,7 +923,6 @@ impl SEntry {
});
for q in to_delete.iter().rev() {
- q.borrow_mut().smtpd = None;
parser.free_qentry(&q.borrow().qid, Some(self));
}
count
@@ -823,8 +942,15 @@ impl SEntry {
let fe = &q.borrow().filter;
if let Some(f) = fe {
- if let Some(f) = f.upgrade() {
- if !f.borrow().finished {
+ if !q.borrow().bq_filtered && !f.borrow().finished {
+ continue;
+ }
+ }
+
+ if !self.is_bq_accepted && q.borrow().bq_sentry.is_some() {
+ if let Some(se) = &q.borrow().bq_sentry {
+ if !se.borrow().disconnect {
+ Self::add_ref(&se, &q, true);
continue;
}
}
@@ -835,24 +961,23 @@ impl SEntry {
for q in qentries.iter().rev() {
q.borrow_mut().print(parser, Some(self));
- q.borrow_mut().smtpd = None;
parser.free_qentry(&q.borrow().qid, Some(self));
if let Some(f) = &q.borrow().filter {
- if let Some(f) = f.upgrade() {
- parser.free_fentry(&f.borrow().logid);
- }
+ parser.free_fentry(&f.borrow().logid);
}
}
}
- fn add_ref(sentry: &Rc<RefCell<SEntry>>, qentry: &Rc<RefCell<QEntry>>) {
+ fn add_ref(sentry: &Rc<RefCell<SEntry>>, qentry: &Rc<RefCell<QEntry>>, bq: bool) {
let smtpd = qentry.borrow().smtpd.clone();
- if let Some(s) = smtpd {
- if !Rc::ptr_eq(sentry, &s) {
- eprintln!("Error: qentry ref already set");
+ if !bq {
+ if let Some(s) = smtpd {
+ if !Rc::ptr_eq(sentry, &s) {
+ eprintln!("Error: qentry ref already set");
+ }
+ return;
}
- return;
}
for q in sentry.borrow().refs.iter() {
@@ -866,7 +991,9 @@ impl SEntry {
}
sentry.borrow_mut().refs.push(Rc::downgrade(qentry));
- qentry.borrow_mut().smtpd = Some(Rc::clone(sentry));
+ if !bq {
+ qentry.borrow_mut().smtpd = Some(Rc::clone(sentry));
+ }
}
}
@@ -874,7 +1001,7 @@ impl SEntry {
struct QEntry {
log: Vec<(Box<[u8]>, u64)>,
smtpd: Option<Rc<RefCell<SEntry>>>,
- filter: Option<Weak<RefCell<FEntry>>>,
+ filter: Option<Rc<RefCell<FEntry>>>,
qid: Box<[u8]>,
from: Box<[u8]>,
client: Box<[u8]>,
@@ -884,7 +1011,9 @@ struct QEntry {
cleanup: bool,
removed: bool,
filtered: bool,
+ bq_filtered: bool,
string_match: bool,
+ bq_sentry: Option<Rc<RefCell<SEntry>>>,
}
impl QEntry {
@@ -905,12 +1034,15 @@ impl QEntry {
return;
}
}
+ if let Some(s) = &self.bq_sentry {
+ if self.bq_filtered && !s.borrow().disconnect {
+ return;
+ }
+ }
if let Some(fe) = self.filter.clone() {
- if let Some(fe) = fe.upgrade() {
- if !fe.borrow().finished {
- return;
- }
+ if !self.bq_filtered && !fe.borrow().finished {
+ return;
}
match self.smtpd.clone() {
@@ -923,7 +1055,7 @@ impl QEntry {
parser.free_qentry(&self.qid, None);
}
- if let Some(fe) = fe.upgrade() {
+ if !self.bq_filtered {
parser.free_fentry(&fe.borrow().logid);
}
} else if let Some(s) = self.smtpd.clone() {
@@ -958,11 +1090,9 @@ impl QEntry {
match m {
Match::Qid(q) => {
if let Some(f) = fe {
- if let Some(f) = f.upgrade() {
- if &f.borrow().logid == q {
- found = true;
- break;
- }
+ if &f.borrow().logid == q {
+ found = true;
+ break;
}
}
if &self.qid == q {
@@ -1050,10 +1180,8 @@ impl QEntry {
}
}
if let Some(f) = fe {
- if let Some(f) = f.upgrade() {
- if f.borrow().string_match {
- string_match = true;
- }
+ if f.borrow().string_match {
+ string_match = true;
}
}
if !string_match {
@@ -1063,53 +1191,72 @@ impl QEntry {
true
}
- fn print(&mut self, parser: &mut Parser, se: Option<&SEntry>) {
- let fe = self.filter.clone();
-
- if !self.msgid_matches(parser) {
- return;
- }
+ fn print_qentry_boilerplate(
+ &mut self,
+ parser: &mut Parser,
+ is_se_bq_sentry: bool,
+ se: Option<&SEntry>,
+ ) {
+ parser.write_all_ok(b"QENTRY: ");
+ parser.write_all_ok(&self.qid);
+ parser.write_all_ok(b"\n");
+ parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime));
+ parser.write_all_ok(format!("SIZE: {}\n", self.size));
- if !self.match_list_matches(parser, se) {
- return;
+ if !self.client.is_empty() {
+ parser.write_all_ok(b"CLIENT: ");
+ parser.write_all_ok(&self.client);
+ parser.write_all_ok(b"\n");
+ } else if !is_se_bq_sentry {
+ if let Some(s) = se {
+ if !s.connect.is_empty() {
+ parser.write_all_ok(b"CLIENT: ");
+ parser.write_all_ok(&s.connect);
+ parser.write_all_ok(b"\n");
+ }
+ }
+ } else if let Some(s) = &self.smtpd {
+ if !s.borrow().connect.is_empty() {
+ parser.write_all_ok(b"CLIENT: ");
+ parser.write_all_ok(&s.borrow().connect);
+ parser.write_all_ok(b"\n");
+ }
}
- if !self.host_matches(parser, se) {
- return;
+ if !self.msgid.is_empty() {
+ parser.write_all_ok(b"MSGID: ");
+ parser.write_all_ok(&self.msgid);
+ parser.write_all_ok(b"\n");
}
+ }
- if !self.from_to_matches(parser) {
- return;
- }
+ fn print(&mut self, parser: &mut Parser, se: Option<&SEntry>) {
+ let fe = self.filter.clone();
- if !self.string_matches(parser, se) {
+ if !self.msgid_matches(parser)
+ || !self.match_list_matches(parser, se)
+ || !self.host_matches(parser, se)
+ || !self.from_to_matches(parser)
+ || !self.string_matches(parser, se)
+ {
return;
}
- if parser.options.verbose > 0 {
- parser.write_all_ok(b"QENTRY: ");
- parser.write_all_ok(&self.qid);
- parser.write_all_ok(b"\n");
- parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime));
- parser.write_all_ok(format!("SIZE: {}\n", self.size));
+ let is_se_bq_sentry = match (&self.bq_sentry, se) {
+ (Some(s), Some(se)) => std::ptr::eq(s.as_ptr(), se),
+ _ => false,
+ };
- if !self.client.is_empty() {
- parser.write_all_ok(b"CLIENT: ");
- parser.write_all_ok(&self.client);
- parser.write_all_ok(b"\n");
- } else if let Some(s) = se {
- if !s.connect.is_empty() {
- parser.write_all_ok(b"CLIENT: ");
- parser.write_all_ok(&s.connect);
- parser.write_all_ok(b"\n");
+ if is_se_bq_sentry {
+ if let Some(s) = &se {
+ if !s.disconnect {
+ return;
}
}
+ }
- if !self.msgid.is_empty() {
- parser.write_all_ok(b"MSGID: ");
- parser.write_all_ok(&self.msgid);
- parser.write_all_ok(b"\n");
- }
+ if parser.options.verbose > 0 {
+ self.print_qentry_boilerplate(parser, is_se_bq_sentry, se);
}
for to in self.to_entries.iter().rev() {
@@ -1117,9 +1264,9 @@ impl QEntry {
let final_rc;
let final_borrow;
let mut final_to: &ToEntry = to;
- if to.dstatus.is_dsn(Some(2)) {
+ if to.dstatus == DStatus::Dsn(2) {
if let Some(f) = &fe {
- if let Some(f) = f.upgrade() {
+ if !self.bq_filtered || (f.borrow().finished && f.borrow().is_bq) {
final_rc = f;
final_borrow = final_rc.borrow();
for to2 in final_borrow.to_entries.iter().rev() {
@@ -1139,7 +1286,11 @@ impl QEntry {
parser.write_all_ok(b"> to <");
parser.write_all_ok(&final_to.to);
parser.write_all_ok(b"> (");
- parser.write_all_ok(&final_to.relay);
+ if !self.bq_filtered {
+ parser.write_all_ok(&final_to.relay);
+ } else {
+ parser.write_all_ok(&to.relay);
+ }
parser.write_all_ok(b")\n");
parser.count += 1;
}
@@ -1153,21 +1304,38 @@ impl QEntry {
parser.write_all_ok(b"\n");
}
};
- if let Some(s) = se {
- if !s.log.is_empty() {
+ if !is_se_bq_sentry {
+ if let Some(s) = se {
+ let mut logs = s.log.clone();
+ if let Some(bq_se) = &self.bq_sentry {
+ logs.append(&mut bq_se.borrow().log.clone());
+ logs.sort_by(|a, b| a.1.cmp(&b.1));
+ }
+ if !logs.is_empty() {
+ parser.write_all_ok(b"SMTP:\n");
+ print_log(parser, &logs);
+ }
+ }
+ } else if let Some(s) = &self.smtpd {
+ let mut logs = s.borrow().log.clone();
+ if let Some(se) = se {
+ logs.append(&mut se.log.clone());
+ logs.sort_by(|a, b| a.1.cmp(&b.1));
+ }
+ if !logs.is_empty() {
parser.write_all_ok(b"SMTP:\n");
- print_log(parser, &s.log);
+ print_log(parser, &logs);
}
}
if let Some(f) = fe {
- if let Some(f) = f.upgrade() {
- if !f.borrow().log.is_empty() {
- parser.write_all_ok(format!("FILTER: {}\n", unsafe {
- std::str::from_utf8_unchecked(&f.borrow().logid)
- }));
- print_log(parser, &f.borrow().log);
- }
+ if (!self.bq_filtered || (f.borrow().finished && f.borrow().is_bq))
+ && !f.borrow().log.is_empty()
+ {
+ parser.write_all_ok(format!("FILTER: {}\n", unsafe {
+ std::str::from_utf8_unchecked(&f.borrow().logid)
+ }));
+ print_log(parser, &f.borrow().log);
}
}
@@ -1194,6 +1362,9 @@ struct FEntry {
processing_time: Box<[u8]>,
string_match: bool,
finished: bool,
+ is_accepted: bool,
+ qentry: Option<Weak<RefCell<QEntry>>>,
+ is_bq: bool,
}
impl FEntry {
@@ -1205,6 +1376,7 @@ impl FEntry {
timestamp,
};
self.to_entries.push(te);
+ self.is_accepted = true;
}
fn set_quarantine(&mut self, to: &[u8], qid: &[u8], timestamp: u64) {
--
2.20.1
More information about the pmg-devel
mailing list