Last active
March 22, 2019 21:11
-
-
Save pannal/ff8066e272e2ecd42621894f6c843dce to your computer and use it in GitHub Desktop.
Rspamd learn with automatic sender-whitelist
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import sys | |
import subprocess | |
import json | |
import logging | |
import argparse | |
import email | |
import mailparser | |
# Level names accepted by the --log-level / --mailparse-log-level options.
_LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']


def _log_level_string_to_int(log_level_string):
    """Convert a logging level name into the matching ``logging`` constant.

    Written to be used as an argparse ``type=`` callable. The name is matched
    case-insensitively against ``_LOG_LEVEL_STRINGS``, so ``debug`` and
    ``DEBUG`` are equivalent.

    Args:
        log_level_string: Level name, e.g. ``'INFO'``.

    Returns:
        The integer level defined by the ``logging`` module.

    Raises:
        argparse.ArgumentTypeError: If the name is not one of the accepted
            levels, or ``logging`` does not map it to an integer.
    """
    level_name = str(log_level_string).strip().upper()
    if level_name not in _LOG_LEVEL_STRINGS:
        message = 'invalid choice: {0} (choose from {1})'.format(log_level_string, _LOG_LEVEL_STRINGS)
        raise argparse.ArgumentTypeError(message)

    log_level_int = getattr(logging, level_name, logging.INFO)
    # Guard against the logging module no longer exposing the expected integer
    # constants. This is an explicit raise rather than an assert so the check
    # also runs under ``python -O``.
    if not isinstance(log_level_int, int):
        raise argparse.ArgumentTypeError(
            'logging.{0} is not an integer level'.format(level_name))
    return log_level_int
# Default locations; both can be overridden on the command line.
WL_PATH_DEF = "/etc/rspamd/local.d/whitelist.txt"
RC_PATH_DEF = "/usr/bin/rspamc"

# Command-line interface. The description doubles as a usage hint and embeds
# the default whitelist path into a sample multimap.conf snippet; the doubled
# braces are literal braces escaped for str.format().
parser = argparse.ArgumentParser(description="""Learn messages via rspamc and manage a sender whitelist. Depends on https://github.com/SpamScope/mail-parser.\n\nUse with local.d/multimap.conf:\nSENDER_FROM_WHITELIST {{
type = "from";
map = "file://{}"; # default; set this via -w/--whitelist-path
prefilter = true;
action = "accept";
filter = "email"; # use "email:domain" for --use-domains mode
}}""".format(WL_PATH_DEF), formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("message_class", help="What to classify the message as: 'ham' or 'spam'")
# const=sys.stdin: a bare "-i" (no file name) falls back to stdin instead of
# leaving the attribute set to None.
parser.add_argument("--input", "-i", nargs='?', type=argparse.FileType('r'), const=sys.stdin, default=sys.stdin,
                    help="Mail message input; read from stdin by default")
parser.add_argument("-d", "--use-domains", help="Use domains instead of the full e-mail addresses inside the whitelist",
                    default=False, action="store_true")
parser.add_argument("-w", "--whitelist-path",
                    help="Path to whitelist file *NEEDS TO BE WRITABLE BY THE CURRENT USER*; Default: {}".format(WL_PATH_DEF),
                    default=WL_PATH_DEF)
parser.add_argument("-r", "--rspamc-path", help="Path to rspamc binary; Default: {}".format(RC_PATH_DEF),
                    default=RC_PATH_DEF)
# Without --log-file the script logs through a plain logging.StreamHandler(),
# which writes to stderr.
parser.add_argument("--log-file", help="Path to log file; Default: stderr", default=None)
# The string defaults below are converted by the ``type`` callable. The
# ``const`` values are already integers and are used when the option is given
# without a value (nargs='?'), so the level is never None.
parser.add_argument('--log-level',
                    default='INFO',
                    const=logging.INFO,
                    dest='log_level',
                    type=_log_level_string_to_int,
                    nargs='?',
                    help='Set the logging output level. {0}; Default: INFO'.format(_LOG_LEVEL_STRINGS))
parser.add_argument('--mailparse-log-level',
                    default='WARNING',
                    const=logging.WARNING,
                    dest='mp_log_level',
                    type=_log_level_string_to_int,
                    nargs='?',
                    help='Set the mailparse logging output level. {0}; Default: WARNING'.format(_LOG_LEVEL_STRINGS))

# Root logger; handlers and levels are attached once arguments are parsed.
log = logging.getLogger()
def _configure_logging(args):
    """Attach a single handler to the root logger and apply the chosen levels.

    Logs to ``args.log_file`` when it is set, otherwise through a default
    ``logging.StreamHandler``. A level of ``None`` (option given without a
    value) falls back to the documented default.
    """
    formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s')
    handler = logging.FileHandler(args.log_file) if args.log_file else logging.StreamHandler()
    handler.setFormatter(formatter)
    log.addHandler(handler)
    log.setLevel(args.log_level if args.log_level is not None else logging.INFO)
    logging.getLogger("mailparser").setLevel(
        args.mp_log_level if args.mp_log_level is not None else logging.WARNING)


def _collect_sender_addresses(mail):
    """Return ``(name, address)`` pairs from From and the sender-like headers.

    Starts with ``mail.from_`` and adds whatever ``email.utils.getaddresses``
    extracts from the return_path, envelope_from, sender and x_mail_from
    attributes of the parsed message.
    """
    # Imported explicitly: a plain ``import email`` does not guarantee that the
    # ``email.utils`` submodule is loaded.
    import email.utils

    addresses = list(mail.from_ or [])
    for header in ("return_path", "envelope_from", "sender", "x_mail_from"):
        value = getattr(mail, header)
        if not value:
            continue
        if not isinstance(value, list):
            value = [value]
        addresses += email.utils.getaddresses(value)
    return addresses


def _sender_entries(addresses, use_domains):
    """Map ``(name, address)`` pairs to unique whitelist entries.

    Returns the addresses themselves, or only their domains when
    ``use_domains`` is true, without duplicates and in first-seen order.
    Empty addresses (e.g. a null return path) are skipped, as are addresses
    without a domain part in domain mode.
    """
    entries = []
    seen = set()
    for _name, addr in addresses:
        if not addr:
            continue
        if use_domains:
            # The domain is whatever follows the last "@".
            _local, sep, entry = addr.rpartition("@")
            if not sep or not entry:
                continue
        else:
            entry = addr
        if entry not in seen:
            seen.add(entry)
            entries.append(entry)
    return entries


def _update_whitelist(whitelist, entries, cls):
    """Compute the whitelist that results from learning ``entries`` as ``cls``.

    ``"ham"`` adds missing entries; ``"spam"`` removes present entries,
    including every duplicate of them. The input list is not modified.

    Returns:
        ``(updated, decisions)``: ``updated`` is the de-duplicated whitelist
        in its original order with new entries appended; ``decisions`` holds
        one ``(entry, action)`` pair per input entry, where ``action`` is
        ``"append"``, ``"remove"`` or ``None`` (nothing to do).
    """
    ordered = list(dict.fromkeys(whitelist))
    present = set(ordered)
    decisions = []
    for entry in entries:
        if cls == "spam" and entry in present:
            present.discard(entry)
            decisions.append((entry, "remove"))
        elif cls == "ham" and entry not in present:
            present.add(entry)
            ordered.append(entry)
            decisions.append((entry, "append"))
        else:
            decisions.append((entry, None))
    updated = [entry for entry in ordered if entry in present]
    return updated, decisions


def _sync_whitelist(path, entries, cls):
    """Apply ``entries`` to the whitelist file at ``path``; rewrite it on change."""
    with open(path, "r+", encoding="utf-8") as f:
        # NOTE: the file is treated as whitespace-separated tokens, so anything
        # else in it (such as comment text) is handled as entries as well.
        current = f.read().split()
        updated, decisions = _update_whitelist(current, entries, cls)
        for entry, action in decisions:
            if action is None:
                log.debug("whitelist: {} already marked as {}".format(entry, cls))
            else:
                log.info("{}: {}".format(cls, entry))
        if any(action is not None for _entry, action in decisions):
            f.seek(0)
            f.truncate()
            f.write("".join(entry + "\n" for entry in updated))


def main():
    """Update the sender whitelist for one message, then train rspamd with it.

    Returns:
        ``0`` on success, ``1`` if any step failed (the error is logged).
    """
    # With no arguments at all, print the help text.
    args = parser.parse_args(args=None if sys.argv[1:] else ['--help'])
    _configure_logging(args)
    log.debug("Called rspamd_learn.py")

    try:
        cls = args.message_class
        if cls not in ("spam", "ham"):
            raise ValueError("First argument must be 'ham' or 'spam'")

        source = args.input if args.input is not None else sys.stdin
        what = source.read()
        if not what:
            raise ValueError("Either pass the to-be-processed message as stdin or via -i/--input")

        # use mailparser to get sender addresses
        mail = mailparser.parse_from_string(what)
        entries = _sender_entries(_collect_sender_addresses(mail), args.use_domains)

        # The whitelist is updated first; rspamc is only run if that succeeded.
        _sync_whitelist(args.whitelist_path, entries, cls)

        # rspamc learn message
        ret = subprocess.check_output([args.rspamc_path, "learn_{}".format(cls)], input=what,
                                      universal_newlines=True)
        log.debug("Rspamc result: {}".format(ret))
    except Exception as e:
        # Top-level boundary: log the traceback and signal failure to the caller.
        log.exception(e)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment