python Tools March 15, 2025

Security Log Analyzer

A Python script that parses auth.log and syslog to detect brute-force attempts, suspicious logins, and privilege escalation events.

Tags: python, logs, monitoring, forensics, automation

Description

Security Log Analyzer is a lightweight Python utility designed for rapid triage of Linux authentication logs. It parses /var/log/auth.log (or any compatible syslog-format file) and surfaces the events that matter most during an incident investigation: brute-force SSH attempts, successful logins from unexpected sources, and privilege escalation via sudo.

The tool groups failed authentication attempts by source IP, flags successful logins that fall outside a configurable allowlist, and tracks every sudo invocation with the associated user and command. Results are presented as a color-coded terminal summary or exported as structured JSON for integration with SIEMs and automation pipelines.
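
To make the grouping step concrete, here is a minimal sketch of per-IP failure counting. It is not the script itself: the sample lines are hypothetical (documentation-range IPs), the pattern is a simplified stand-in for the one in the source below, and the names count_failures and flag_sources are assumptions introduced for illustration.

```python
import re
from collections import Counter

# Hypothetical sample lines in Debian/Ubuntu auth.log format.
SAMPLE_LINES = [
    "Mar 15 10:01:02 host sshd[1201]: Failed password for root from 203.0.113.7 port 40112 ssh2",
    "Mar 15 10:01:05 host sshd[1201]: Failed password for invalid user admin from 203.0.113.7 port 40113 ssh2",
    "Mar 15 10:01:09 host sshd[1202]: Failed password for root from 198.51.100.4 port 51900 ssh2",
    "Mar 15 10:02:00 host sshd[1203]: Accepted password for alice from 192.168.1.10 port 50022 ssh2",
]

# Simplified pattern: captures only the IPv4 source of a failed attempt.
FAILED = re.compile(r"Failed password for (?:invalid user )?\S+ from ([\d.]+)")


def count_failures(lines):
    """Return a Counter mapping source IP -> number of failed attempts."""
    counts = Counter()
    for line in lines:
        match = FAILED.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


def flag_sources(counts, threshold):
    """Return the IPs whose failure count meets or exceeds the threshold."""
    return sorted(ip for ip, n in counts.items() if n >= threshold)


counts = count_failures(SAMPLE_LINES)
flagged = flag_sources(counts, threshold=2)
print(counts.most_common())
print(flagged)
```

With a threshold of 2, only the address that failed twice is flagged; the accepted login is ignored by this pattern entirely.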

Features

  • Brute-force detection — aggregates failed SSH login attempts per IP and flags sources that meet or exceed a configurable threshold.
  • Suspicious login detection — identifies successful authentications originating from IPs not present in a trusted allowlist.
  • Sudo monitoring — captures all sudo commands, including the invoking user and the executed command string.
  • Top offenders summary — ranks source IPs by number of failed attempts for rapid prioritization.
  • JSON output — pass --json to emit machine-readable results suitable for log forwarding or downstream processing.
  • Colored terminal output — uses ANSI colors for quick visual scanning (auto-disabled when piped).
  • Custom log path — analyze any auth.log-compatible file with --file.
  • Configurable threshold — adjust the failed-attempt threshold with --threshold.

Usage

Basic analysis with default settings

python3 log_analyzer.py

Analyze a specific log file with a custom threshold

python3 log_analyzer.py --file /var/log/auth.log.1 --threshold 3

Export results as JSON

python3 log_analyzer.py --json

Pipe JSON into jq for further processing

python3 log_analyzer.py --json | jq '.brute_force_candidates'
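
The same filtering can be done in Python when jq is not available. The sketch below assumes the report keys emitted by the source further down (brute_force_candidates, count, users); the sample report is hypothetical and trimmed, and worst_offenders is a helper name invented for this example.

```python
import json

# Hypothetical, trimmed example of the report produced by --json.
SAMPLE_REPORT = """
{
  "threshold": 5,
  "brute_force_candidates": {
    "203.0.113.7": {"count": 42, "users": ["root", "admin"]},
    "198.51.100.4": {"count": 7, "users": ["ubuntu"]}
  },
  "suspicious_logins": [
    {"timestamp": "Mar 15 10:02:00", "user": "alice", "ip": "198.51.100.9"}
  ]
}
"""


def worst_offenders(report, min_count):
    """Return (ip, count) pairs at or above min_count, highest count first."""
    candidates = report.get("brute_force_candidates", {})
    pairs = [
        (ip, info["count"])
        for ip, info in candidates.items()
        if info["count"] >= min_count
    ]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)


report = json.loads(SAMPLE_REPORT)
offenders = worst_offenders(report, min_count=10)
for ip, count in offenders:
    print(f"{ip}\t{count}")
```

In a real pipeline, SAMPLE_REPORT would be replaced by json.load(sys.stdin), with the analyzer's --json output piped into the consumer script.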

Combine with a trusted IP allowlist

python3 log_analyzer.py --trusted 192.168.1.10,10.0.0.5
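
For longer allowlists, the comma-separated value can be generated from a file instead of typed by hand. This is a sketch under assumptions: the one-IP-per-line format with # comments is a convention chosen for this example, not something the script reads itself, and parse_trusted and to_trusted_arg are hypothetical helper names. Entries are validated with the standard ipaddress module so that typos are caught before they silently fail to match.

```python
import ipaddress


def parse_trusted(text):
    """Parse allowlist text: one IP per line, '#' starts a comment."""
    trusted = []
    for raw in text.splitlines():
        entry = raw.split("#", 1)[0].strip()
        if not entry:
            continue
        # ip_address raises ValueError on malformed entries.
        trusted.append(str(ipaddress.ip_address(entry)))
    return trusted


def to_trusted_arg(trusted):
    """Join addresses into the comma-separated form --trusted expects."""
    return ",".join(trusted)


# Hypothetical allowlist contents; in practice this would be read from a file.
SAMPLE = """
# office gateway
192.168.1.10
10.0.0.5   # jump host
"""

arg = to_trusted_arg(parse_trusted(SAMPLE))
print(arg)
```

The printed string can then be passed to the analyzer through shell command substitution as the value of --trusted.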

Source Code

#!/usr/bin/env python3
"""Security Log Analyzer — parse auth.log to detect brute-force attempts,
suspicious logins, and privilege escalation events."""

import argparse
import json
import re
import sys
from collections import defaultdict
from datetime import datetime

# ---------------------------------------------------------------------------
# ANSI helpers
# ---------------------------------------------------------------------------

USE_COLOR = sys.stdout.isatty()

def _c(code: str, text: str) -> str:
    return f"\033[{code}m{text}\033[0m" if USE_COLOR else text

def red(t: str) -> str:
    return _c("1;31", t)

def yellow(t: str) -> str:
    return _c("1;33", t)

def green(t: str) -> str:
    return _c("1;32", t)

def cyan(t: str) -> str:
    return _c("1;36", t)

def bold(t: str) -> str:
    return _c("1", t)

# ---------------------------------------------------------------------------
# Regex patterns for auth.log
# ---------------------------------------------------------------------------

RE_FAILED_SSH = re.compile(
    r"(\w+\s+\d+\s+[\d:]+)\s+\S+\s+sshd\[\d+\]:\s+Failed password for"
    r"(?: invalid user)?\s+(\S+)\s+from\s+([\d.]+)"
)
RE_ACCEPTED_SSH = re.compile(
    r"(\w+\s+\d+\s+[\d:]+)\s+\S+\s+sshd\[\d+\]:\s+Accepted\s+\S+\s+for"
    r"\s+(\S+)\s+from\s+([\d.]+)"
)
RE_SUDO = re.compile(
    r"(\w+\s+\d+\s+[\d:]+)\s+\S+\s+sudo:\s+(\S+)\s+:.*COMMAND=(.*)"
)

# ---------------------------------------------------------------------------
# Parsing
# ---------------------------------------------------------------------------

def parse_log(path: str):
    failed = defaultdict(list)   # ip -> [(timestamp, user)]
    accepted = []                # [(timestamp, user, ip)]
    sudo_events = []             # [(timestamp, user, command)]

    try:
        with open(path, "r", errors="replace") as fh:
            for line in fh:
                m = RE_FAILED_SSH.search(line)
                if m:
                    ts, user, ip = m.group(1), m.group(2), m.group(3)
                    failed[ip].append((ts, user))
                    continue

                m = RE_ACCEPTED_SSH.search(line)
                if m:
                    ts, user, ip = m.group(1), m.group(2), m.group(3)
                    accepted.append((ts, user, ip))
                    continue

                m = RE_SUDO.search(line)
                if m:
                    ts, user, cmd = m.group(1), m.group(2), m.group(3).strip()
                    sudo_events.append((ts, user, cmd))
    except FileNotFoundError:
        print(red(f"[!] File not found: {path}"), file=sys.stderr)
        sys.exit(1)
    except PermissionError:
        print(red(f"[!] Permission denied: {path} (try with sudo)"), file=sys.stderr)
        sys.exit(1)

    return failed, accepted, sudo_events

# ---------------------------------------------------------------------------
# Analysis helpers
# ---------------------------------------------------------------------------

def get_brute_force(failed, threshold):
    return {ip: evts for ip, evts in failed.items() if len(evts) >= threshold}

def get_suspicious_logins(accepted, trusted_ips):
    return [(ts, user, ip) for ts, user, ip in accepted if ip not in trusted_ips]

def top_offenders(failed, n=10):
    ranked = sorted(failed.items(), key=lambda x: len(x[1]), reverse=True)
    return ranked[:n]

# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------

def print_report(failed, accepted, sudo_events, threshold, trusted_ips):
    brute = get_brute_force(failed, threshold)
    suspicious = get_suspicious_logins(accepted, trusted_ips)
    top = top_offenders(failed)

    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(bold(f"\n{'=' * 60}"))
    print(bold(f"  Security Log Analyzer — Report {now}"))
    print(bold(f"{'=' * 60}\n"))

    # Brute-force candidates
    print(cyan(f"[*] Brute-force candidates (>= {threshold} failures):\n"))
    if brute:
        for ip, evts in sorted(brute.items(), key=lambda x: -len(x[1])):
            # Sort so the user list is printed in a stable order.
            users = sorted({u for _, u in evts})
            print(red(f"    {ip:>15}  {len(evts)} failures  "
                      f"(users: {', '.join(users)})"))
    else:
        print(green("    None detected."))

    # Suspicious logins
    print(cyan(f"\n[*] Successful logins from untrusted IPs:\n"))
    if suspicious:
        for ts, user, ip in suspicious:
            print(yellow(f"    {ts}  {user:>12}  from  {ip}"))
    else:
        print(green("    None detected."))

    # Sudo events
    print(cyan(f"\n[*] Sudo commands ({len(sudo_events)} total):\n"))
    for ts, user, cmd in sudo_events[-20:]:
        # Pad the user name before adding ANSI codes so the columns stay aligned.
        print(f"    {ts}  {bold(f'{user:>12}')}  {cmd}")
    if len(sudo_events) > 20:
        print(f"    ... and {len(sudo_events) - 20} more")

    # Top offenders
    print(cyan("\n[*] Top offending IPs by failed attempts:\n"))
    for i, (ip, evts) in enumerate(top, 1):
        bar = "█" * min(len(evts), 40)  # one block per failure, capped at 40
        print(f"    {i:>2}. {ip:>15}  [{len(evts):>5}]  {red(bar)}")

    print(bold(f"\n{'=' * 60}\n"))

def emit_json(failed, accepted, sudo_events, threshold, trusted_ips):
    brute = get_brute_force(failed, threshold)
    suspicious = get_suspicious_logins(accepted, trusted_ips)
    top = top_offenders(failed)

    report = {
        "generated_at": datetime.now().isoformat(),
        "threshold": threshold,
        "total_failed_ips": len(failed),
        "total_accepted": len(accepted),
        "total_sudo": len(sudo_events),
        "brute_force_candidates": {
            ip: {"count": len(evts), "users": list(set(u for _, u in evts))}
            for ip, evts in brute.items()
        },
        "suspicious_logins": [
            {"timestamp": ts, "user": user, "ip": ip}
            for ts, user, ip in suspicious
        ],
        "sudo_events": [
            {"timestamp": ts, "user": user, "command": cmd}
            for ts, user, cmd in sudo_events
        ],
        "top_offenders": [
            {"ip": ip, "failures": len(evts)} for ip, evts in top
        ],
    }
    print(json.dumps(report, indent=2))

# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    parser = argparse.ArgumentParser(
        description="Analyze auth.log for brute-force, suspicious logins, and sudo usage."
    )
    parser.add_argument(
        "-f", "--file", default="/var/log/auth.log",
        help="Path to the log file (default: /var/log/auth.log)"
    )
    parser.add_argument(
        "-t", "--threshold", type=int, default=5,
        help="Minimum failed attempts to flag as brute-force (default: 5)"
    )
    parser.add_argument(
        "--trusted", default="",
        help="Comma-separated list of trusted IPs to exclude from suspicious logins"
    )
    parser.add_argument(
        "--json", action="store_true", dest="json_out",
        help="Output results as JSON instead of colored text"
    )
    args = parser.parse_args()

    trusted_ips = set(ip.strip() for ip in args.trusted.split(",") if ip.strip())
    failed, accepted, sudo_events = parse_log(args.file)

    if args.json_out:
        emit_json(failed, accepted, sudo_events, args.threshold, trusted_ips)
    else:
        print_report(failed, accepted, sudo_events, args.threshold, trusted_ips)

if __name__ == "__main__":
    main()

Notes

  • Permissions — reading /var/log/auth.log typically requires root or membership in the adm group. Run the script with sudo if you receive a permission error.
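  • Log format — the regex patterns expect the traditional syslog line format used by Debian/Ubuntu auth.log (Mon DD HH:MM:SS hostname service[pid]: message) and match IPv4 source addresses only. Red Hat and derivatives write authentication events to /var/log/secure, so pass that path with --file. On systems that keep these events only in the systemd journal, export them first, for example with journalctl -u sshd --no-pager > /tmp/auth.log; note that filtering by the sshd unit will typically leave out sudo events.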
  • Trusted IPs — for recurring use, consider wrapping the --trusted flag in a shell alias or a small config file to avoid retyping known-good addresses.
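  • Performance — the script reads the log line by line in a single pass, so memory use grows with the number of matched events rather than with file size. Rotated logs that have been compressed (for example auth.log.2.gz) must be decompressed first, because the script opens files as plain text.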
  • Extending detection — additional regex patterns can be added to the parsing section to capture events like PAM failures, su usage, or firewall drops from kern.log.
  • JSON pipeline — combine --json output with tools like jq, mlr, or direct ingestion into Elasticsearch for historical trending.
  • No external dependencies — the script uses only the Python 3 standard library and runs on any modern Linux distribution without installing additional packages.
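
As an illustration of the "Extending detection" note, the sketch below adds a pattern for pam_unix authentication failures, written in the same style as the patterns in the source. It is an assumption-laden example rather than part of the script: RE_PAM_FAILURE and parse_pam_failures are hypothetical names, the sample lines are illustrative, and the exact PAM message layout can vary between distributions, so the pattern should be checked against real logs before use.

```python
import re

# Hypothetical pattern: timestamp, service, remote host (may be empty), target user.
RE_PAM_FAILURE = re.compile(
    r"(\w+\s+\d+\s+[\d:]+)\s+\S+\s+\S+\[\d+\]:\s+pam_unix\((\S+?):auth\):"
    r"\s+authentication failure;.*?rhost=(\S*)(?:\s+user=(\S+))?"
)

# Illustrative sample lines; real messages may differ in detail.
SAMPLE_LINES = [
    "Mar 15 10:05:11 host sshd[1310]: pam_unix(sshd:auth): authentication failure; "
    "logname= uid=0 euid=0 tty=ssh ruser= rhost=203.0.113.7  user=root",
    "Mar 15 10:06:00 host su[1400]: pam_unix(su:auth): authentication failure; "
    "logname=alice uid=1000 euid=0 tty=/dev/pts/0 ruser=alice rhost=  user=root",
    "Mar 15 10:07:00 host sshd[1311]: Accepted password for alice from 192.168.1.10 port 50022 ssh2",
]


def parse_pam_failures(lines):
    """Return one dict per pam_unix authentication failure found in lines."""
    events = []
    for line in lines:
        m = RE_PAM_FAILURE.search(line)
        if m:
            events.append({
                "timestamp": m.group(1),
                "service": m.group(2),
                "rhost": m.group(3) or None,  # empty for local failures such as su
                "user": m.group(4),           # None when the line has no user= field
            })
    return events


pam_events = parse_pam_failures(SAMPLE_LINES)
for event in pam_events:
    print(event)
```

To wire a pattern like this into the script, it would need its own branch in the parsing loop and its own section in both the text and JSON reports.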