Files
f2b-report/fail2ban_report.py
2025-08-19 13:33:28 +02:00

155 lines
4.7 KiB
Python

from dataclasses import dataclass
from datetime import datetime, date
import re
import subprocess
import jinja2
import platform
import locale
import argparse
@dataclass
class Ban:
ban_time: datetime
ip: str
jail: str
def read_fail2ban_bans(log_path: str = "/var/log/fail2ban.log") -> list[Ban]:
# Regex to capture full log line components
log_pattern = re.compile(
r"^(?P<datetime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<process>[^\s]+)\s*\[(?P<processid>\d+)\]: (?P<level>\w+)\s*\[(?P<jail>\w+)\] Ban (?P<ip>\d+\.\d+\.\d+\.\d+)"
)
entries: list[Ban] = []
try:
with open(log_path, "r") as log_file:
for line in log_file:
match = log_pattern.match(line)
if match:
d = match.groupdict()
entries.append(
Ban(
datetime.strptime(d["datetime"], "%Y-%m-%d %H:%M:%S,%f"),
d["ip"],
d["jail"],
)
)
except FileNotFoundError:
print(f"Log file not found: {log_path}")
except PermissionError:
print(f"Permission denied when accessing: {log_path}")
return entries
def read_fail2ban_recidivist() -> list[Ban]:
try:
result = subprocess.run(
["fail2ban-client", "get", "recidive", "banip", "--with-time"],
capture_output=True,
text=True,
check=True,
)
bans: list[Ban] = []
for line in result.stdout.splitlines():
match = re.match(
r"^(?P<ip>\d+\.\d+\.\d+\.\d+)\s+(?P<datetime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \+ (?P<bantime>\d+) = (?P<endban>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})",
line,
)
if not match:
print(f"Cannot parse line: {line}")
continue
d = match.groupdict()
bans.append(
Ban(
datetime.strptime(d["datetime"], "%Y-%m-%d %H:%M:%S"),
d["ip"],
"recidive",
)
)
return bans
except subprocess.CalledProcessError as e:
print(f"Error running fail2ban-client: {e}")
return []
def filter_daily_bans(entries: list[Ban], day: date) -> list[Ban]:
return [ban for ban in entries if ban.ban_time.date() == day]
@dataclass
class UniqueBan:
ip: str
count: int
bans: list[Ban]
new: bool
def filter_unique_bans(
entries: list[Ban], new_bans: list[Ban], day: date
) -> dict[str, UniqueBan]:
uniques: dict[str, UniqueBan] = dict()
new_ips = list(map(lambda b: b.ip, new_bans))
for ban in entries:
if ban.ban_time.date() <= day:
if ban.ip not in uniques:
uniques[ban.ip] = UniqueBan(ban.ip, 0, [], ban.ip in new_ips)
uniques[ban.ip].count += 1
uniques[ban.ip].bans.append(ban)
return uniques
def filter_new_bans(all_entries: list[Ban], daily_entries: list[Ban]) -> list[Ban]:
new_bans: list[Ban] = []
for ban in daily_entries:
previous_bans = [b.ip for b in all_entries if b.ban_time < ban.ban_time]
if ban.ip not in previous_bans:
new_bans.append(ban)
return new_bans
def filter_recidivist_bans(entries: list[Ban]) -> list[Ban]:
return [ban for ban in entries if ban.jail == "recidive"]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"date", nargs="?", help="Report for the given date (dd/mm/yyyy)"
)
args = parser.parse_args()
report_date = (
datetime.today()
if args.date is None
else datetime.strptime(args.date, "%d/%m/%Y")
)
locale.setlocale(locale.LC_TIME, "fr_FR.utf8")
ban_entries = read_fail2ban_bans()
daily_bans = filter_daily_bans(ban_entries, report_date.date())
new_bans = filter_new_bans(ban_entries, daily_bans)
unique_ips = filter_unique_bans(daily_bans, new_bans, report_date.date())
recidivist_bans = filter_recidivist_bans(daily_bans)
all_recidivist_bans = read_fail2ban_recidivist()
env = jinja2.Environment(
loader=jinja2.FileSystemLoader("./templates"),
trim_blocks=True,
lstrip_blocks=True,
)
template = env.get_template("report.html")
s = template.render(
{
"server_name": platform.node(),
"report_date": report_date.date(),
"all_bans": ban_entries,
"daily_bans": daily_bans,
"unique_ips": unique_ips,
"new_bans": new_bans,
"recidivist_bans": recidivist_bans,
"all_recidivist_bans": all_recidivist_bans,
}
)
print(s)