#!/usr/bin/env python3
"""Milter that rejects mail whose From header differs from the envelope sender.

Run as a script with ``--socket`` pointing at the milter socket to listen on.
"""

import argparse
# No longer used by header(); kept so the module's import surface is unchanged.
from email.header import decode_header  # noqa: F401
from email.utils import parseaddr

import Milter


class CheckMilter(Milter.Base):
    """Compare the From header address with the envelope sender (MAIL FROM).

    Attributes:
        envelope_from: Address parsed from the MAIL FROM argument of the
            current message, or None before any message has started.
        header_from: Address parsed from a From header of the current
            message, or None when no From header has been seen yet.
    """

    def __init__(self):
        self.envelope_from = None
        self.header_from = None

    @Milter.noreply
    def connect(self, IPname, family, hostaddr):
        """Accept every connection; nothing is checked at this stage."""
        return Milter.CONTINUE

    def hello(self, heloname):
        """Accept any HELO/EHLO name."""
        return Milter.CONTINUE

    def envfrom(self, mailfrom, *args):
        """Record the envelope sender and start with a clean per-message state."""
        self.envelope_from = parseaddr(mailfrom)[1]
        # Forget the From header of any earlier message handled by this
        # instance; otherwise a message without a From header would be
        # judged against the previous message's address.
        self.header_from = None
        return Milter.CONTINUE

    @Milter.noreply
    def envrcpt(self, to, *params):
        """Accept every recipient; recipients play no part in the check."""
        return Milter.CONTINUE

    @Milter.noreply
    def header(self, name, hval):
        """Remember the address carried by a From header.

        The address is parsed from the raw header value. RFC 2047 encoded
        words may only appear in the display name, so decoding is not needed
        to obtain the address, and decode_header() yields bytes fragments for
        such headers, which parseaddr() cannot handle as text.
        """
        if name.lower() == "from":
            address = parseaddr(hval)[1]
            # Keep the first mismatching address: a later duplicate From
            # header must not be able to hide an earlier spoofed one.
            if self.header_from in (None, "", self.envelope_from):
                self.header_from = address
        return Milter.CONTINUE

    def eoh(self):
        """Reject the message if a non-empty From address differs from the envelope sender.

        Returns:
            Milter.REJECT (after setting a 553 / 5.7.1 reply) on mismatch,
            Milter.CONTINUE otherwise, including when no From address was
            recorded or it parsed to an empty string.
        """
        if self.header_from and self.header_from != self.envelope_from:
            self.setreply(
                "553",
                xcode="5.7.1",
                msg="<%s>: From header rejected: not matching envelope From %s"
                % (self.header_from, self.envelope_from),
            )
            return Milter.REJECT
        return Milter.CONTINUE

    @Milter.noreply
    def body(self, chunk):
        """Ignore body chunks; only headers are inspected."""
        return Milter.CONTINUE

    def eom(self):
        """Accept the message once it has passed the header check."""
        return Milter.ACCEPT

    def close(self):
        """Nothing to release when the connection closes."""
        return Milter.CONTINUE

    def abort(self):
        """Nothing to undo when a message is aborted."""
        return Milter.CONTINUE


def main():
    """Parse the command line and run the milter until it is stopped."""
    parser = argparse.ArgumentParser()
    # Required: without a socket, runmilter would be handed None and fail
    # with a far less helpful error than argparse's usage message.
    parser.add_argument(
        "--socket", "-s", type=str, required=True, help="socket to listen to"
    )
    config = parser.parse_args()
    Milter.factory = CheckMilter
    Milter.runmilter("check_from", config.socket, timeout=300)


if __name__ == "__main__":
    main()