From 1b3cf07b191ebc02cc7828b4615ac19cbe076fca Mon Sep 17 00:00:00 2001 From: rainfall Date: Tue, 19 Aug 2025 13:33:28 +0200 Subject: [PATCH] First commit --- .flake8 | 43 ++++++++++++ .gitignore | 5 ++ .vscode/settings.json | 3 + README.md | 11 +++ fail2ban_report.py | 154 ++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + templates/report.html | 32 +++++++++ 7 files changed, 249 insertions(+) create mode 100644 .flake8 create mode 100644 .gitignore create mode 100644 .vscode/settings.json create mode 100644 README.md create mode 100644 fail2ban_report.py create mode 100644 requirements.txt create mode 100644 templates/report.html diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..7c0c1c9 --- /dev/null +++ b/.flake8 @@ -0,0 +1,43 @@ +[flake8] +# Documentation for flake8 http://flake8.pycqa.org/en/3.1.1/user/index.html +ignore = + # Suppress - line too long (> 79 characters) + E501 + # Suppress - Continuation line missing indentation or outdented + E127 + # Suppress - Function is too complex + C901 + # Multiple statements on one line + E704 + # Assign a lambda expression + E731 + # Bare except + E722 + # Local variable + F841 + # May be undefined + F405 + # list comprehension redefines + F812 + # module level imports + E402 + E126 + E128 + # line break before binary operator + W503 + # line break after binary operator + W504 + # undefined file name exception + F821 + + +exclude = + # No need to traverse our git directory + .git, + # Exclude unittests + tests, + # There's no value in checking cache directories + __pycache__, + # This contains of branch that we don't want to check + # dev +max-complexity = 10 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..86d442d --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.venv/ +*.tar +*.gz +*.log +local/ \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..153d107 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.analysis.typeCheckingMode": "strict" +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..4557c88 --- /dev/null +++ b/README.md @@ -0,0 +1,11 @@ +Provide a daily report of banned IPs, recidivists and some other statistics. + +```text +usage: fail2ban_report.py [-h] [date] + +positional arguments: + date Report for the given date (dd/mm/yyyy) + +options: + -h, --help show this help message and exit +``` \ No newline at end of file diff --git a/fail2ban_report.py b/fail2ban_report.py new file mode 100644 index 0000000..92f944c --- /dev/null +++ b/fail2ban_report.py @@ -0,0 +1,154 @@ +from dataclasses import dataclass +from datetime import datetime, date +import re +import subprocess +import jinja2 +import platform +import locale +import argparse + + +@dataclass +class Ban: + ban_time: datetime + ip: str + jail: str + + +def read_fail2ban_bans(log_path: str = "/var/log/fail2ban.log") -> list[Ban]: + # Regex to capture full log line components + log_pattern = re.compile( + r"^(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P[^\s]+)\s*\[(?P\d+)\]: (?P\w+)\s*\[(?P\w+)\] Ban (?P\d+\.\d+\.\d+\.\d+)" + ) + + entries: list[Ban] = [] + + try: + with open(log_path, "r") as log_file: + for line in log_file: + match = log_pattern.match(line) + if match: + d = match.groupdict() + entries.append( + Ban( + datetime.strptime(d["datetime"], "%Y-%m-%d %H:%M:%S,%f"), + d["ip"], + d["jail"], + ) + ) + except FileNotFoundError: + print(f"Log file not found: {log_path}") + except PermissionError: + print(f"Permission denied when accessing: {log_path}") + + return entries + + +def read_fail2ban_recidivist() -> list[Ban]: + try: + result = subprocess.run( + ["fail2ban-client", "get", "recidive", "banip", "--with-time"], + capture_output=True, + text=True, + check=True, + ) + bans: list[Ban] = [] + for line in result.stdout.splitlines(): + match = re.match( + r"^(?P\d+\.\d+\.\d+\.\d+)\s+(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \+ (?P\d+) = (?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})", + line, + ) + if not match: + print(f"Cannot parse line: {line}") + continue + d = match.groupdict() + bans.append( + Ban( + datetime.strptime(d["datetime"], "%Y-%m-%d %H:%M:%S"), + d["ip"], + "recidive", + ) + ) + return bans + except subprocess.CalledProcessError as e: + print(f"Error running fail2ban-client: {e}") + return [] + + +def filter_daily_bans(entries: list[Ban], day: date) -> list[Ban]: + return [ban for ban in entries if ban.ban_time.date() == day] + + +@dataclass +class UniqueBan: + ip: str + count: int + bans: list[Ban] + new: bool + + +def filter_unique_bans( + entries: list[Ban], new_bans: list[Ban], day: date +) -> dict[str, UniqueBan]: + uniques: dict[str, UniqueBan] = dict() + new_ips = list(map(lambda b: b.ip, new_bans)) + for ban in entries: + if ban.ban_time.date() <= day: + if ban.ip not in uniques: + uniques[ban.ip] = UniqueBan(ban.ip, 0, [], ban.ip in new_ips) + uniques[ban.ip].count += 1 + uniques[ban.ip].bans.append(ban) + return uniques + + +def filter_new_bans(all_entries: list[Ban], daily_entries: list[Ban]) -> list[Ban]: + new_bans: list[Ban] = [] + for ban in daily_entries: + previous_bans = [b.ip for b in all_entries if b.ban_time < ban.ban_time] + if ban.ip not in previous_bans: + new_bans.append(ban) + return new_bans + + +def filter_recidivist_bans(entries: list[Ban]) -> list[Ban]: + return [ban for ban in entries if ban.jail == "recidive"] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "date", nargs="?", help="Report for the given date (dd/mm/yyyy)" + ) + args = parser.parse_args() + report_date = ( + datetime.today() + if args.date is None + else datetime.strptime(args.date, "%d/%m/%Y") + ) + locale.setlocale(locale.LC_TIME, "fr_FR.utf8") + ban_entries = read_fail2ban_bans() + daily_bans = filter_daily_bans(ban_entries, report_date.date()) + new_bans = filter_new_bans(ban_entries, daily_bans) + unique_ips = filter_unique_bans(daily_bans, new_bans, report_date.date()) + recidivist_bans = filter_recidivist_bans(daily_bans) + all_recidivist_bans = read_fail2ban_recidivist() + + env = jinja2.Environment( + loader=jinja2.FileSystemLoader("./templates"), + trim_blocks=True, + lstrip_blocks=True, + ) + template = env.get_template("report.html") + s = template.render( + { + "server_name": platform.node(), + "report_date": report_date.date(), + "all_bans": ban_entries, + "daily_bans": daily_bans, + "unique_ips": unique_ips, + "new_bans": new_bans, + "recidivist_bans": recidivist_bans, + "all_recidivist_bans": all_recidivist_bans, + } + ) + print(s) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7f7afbf --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +jinja2 diff --git a/templates/report.html b/templates/report.html new file mode 100644 index 0000000..602c043 --- /dev/null +++ b/templates/report.html @@ -0,0 +1,32 @@ +Fail2Ban Daily Report + +Here is the daily report for {{ server_name }} on {{ report_date.strftime("%x") }}. + +Statistics + + * {{ all_bans|length }} all-time bans + * {{ daily_bans|length }} ban action performed (all jails) + * {{ unique_ips|length }} unique IP banned + * {{ new_bans|length }} newly seen IP banned + * {{ recidivist_bans|length }} recidivist IP banned +{% if unique_ips %} + +Unique IPs banned today: +{% for ip in unique_ips %} +- {{ ip }} (banned {{ unique_ips[ip].count }} times) {{ 'NEW!' if unique_ips[ip].new }} +{% endfor %} +{%endif %} +{% if recidivist_bans %} + +Recidivist IPs banned: +{% for ban in recidivist_bans %} +- {{ ban.ip }} ({{ ban.ban_time.strftime("%X") }}) +{% endfor %} +{% endif %} +{% if all_recidivist_bans %} + +All recidivist IPs banned: +{% for ban in all_recidivist_bans %} +- {{ ban.ip }} ({{ ban.ban_time.strftime("%c") }}) +{% endfor %} +{% endif %} \ No newline at end of file