from dataclasses import dataclass from datetime import datetime, date import re import subprocess import jinja2 import platform import locale import argparse @dataclass class Ban: ban_time: datetime ip: str jail: str def read_fail2ban_bans(log_path: str = "/var/log/fail2ban.log") -> list[Ban]: # Regex to capture full log line components log_pattern = re.compile( r"^(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P[^\s]+)\s*\[(?P\d+)\]: (?P\w+)\s*\[(?P\w+)\] Ban (?P\d+\.\d+\.\d+\.\d+)" ) entries: list[Ban] = [] try: with open(log_path, "r") as log_file: for line in log_file: match = log_pattern.match(line) if match: d = match.groupdict() entries.append( Ban( datetime.strptime(d["datetime"], "%Y-%m-%d %H:%M:%S,%f"), d["ip"], d["jail"], ) ) except FileNotFoundError: print(f"Log file not found: {log_path}") except PermissionError: print(f"Permission denied when accessing: {log_path}") return entries def read_fail2ban_recidivist() -> list[Ban]: try: result = subprocess.run( ["fail2ban-client", "get", "recidive", "banip", "--with-time"], capture_output=True, text=True, check=True, ) bans: list[Ban] = [] for line in result.stdout.splitlines(): match = re.match( r"^(?P\d+\.\d+\.\d+\.\d+)\s+(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \+ (?P\d+) = (?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})", line, ) if not match: print(f"Cannot parse line: {line}") continue d = match.groupdict() bans.append( Ban( datetime.strptime(d["datetime"], "%Y-%m-%d %H:%M:%S"), d["ip"], "recidive", ) ) return bans except subprocess.CalledProcessError as e: print(f"Error running fail2ban-client: {e}") return [] def filter_daily_bans(entries: list[Ban], day: date) -> list[Ban]: return [ban for ban in entries if ban.ban_time.date() == day] @dataclass class UniqueBan: ip: str count: int bans: list[Ban] new: bool def filter_unique_bans( entries: list[Ban], new_bans: list[Ban], day: date ) -> dict[str, UniqueBan]: uniques: dict[str, UniqueBan] = dict() new_ips = list(map(lambda b: b.ip, new_bans)) for ban in entries: if ban.ban_time.date() <= day: if ban.ip not in uniques: uniques[ban.ip] = UniqueBan(ban.ip, 0, [], ban.ip in new_ips) uniques[ban.ip].count += 1 uniques[ban.ip].bans.append(ban) return uniques def filter_new_bans(all_entries: list[Ban], daily_entries: list[Ban]) -> list[Ban]: new_bans: list[Ban] = [] for ban in daily_entries: previous_bans = [b.ip for b in all_entries if b.ban_time < ban.ban_time] if ban.ip not in previous_bans: new_bans.append(ban) return new_bans def filter_recidivist_bans(entries: list[Ban]) -> list[Ban]: return [ban for ban in entries if ban.jail == "recidive"] if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "date", nargs="?", help="Report for the given date (dd/mm/yyyy)" ) args = parser.parse_args() report_date = ( datetime.today() if args.date is None else datetime.strptime(args.date, "%d/%m/%Y") ) locale.setlocale(locale.LC_TIME, "fr_FR.utf8") ban_entries = read_fail2ban_bans() daily_bans = filter_daily_bans(ban_entries, report_date.date()) new_bans = filter_new_bans(ban_entries, daily_bans) unique_ips = filter_unique_bans(daily_bans, new_bans, report_date.date()) recidivist_bans = filter_recidivist_bans(daily_bans) all_recidivist_bans = read_fail2ban_recidivist() env = jinja2.Environment( loader=jinja2.FileSystemLoader("./templates"), trim_blocks=True, lstrip_blocks=True, ) template = env.get_template("report.html") s = template.render( { "server_name": platform.node(), "report_date": report_date.date(), "all_bans": ban_entries, "daily_bans": daily_bans, "unique_ips": unique_ips, "new_bans": new_bans, "recidivist_bans": recidivist_bans, "all_recidivist_bans": all_recidivist_bans, } ) print(s)