Heray-Was-Here
Server : Apache
System : Linux vps37394.inmotionhosting.com 3.10.0-1160.119.1.vz7.224.4 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User : jasonp18 ( 1000)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /lib/rads/venv/lib/python3.13/site-packages/exim_analytics/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //lib/rads/venv/lib/python3.13/site-packages/exim_analytics/scraper.py
"""
The purpose of this script is to preemptively detect abusive email behavior
via analytics from exim logs

We will never use this data for marketing or advertising purposes.
We only use this data to help protect our customers and our network.

"""

import json
import logging
import os
import sys
from pathlib import Path
from typing import Literal
from rads import setup_logging
from .api import send_batch, send_no_mail
from . import config
from . import Host
from . import parser

MAX_BATCH_SIZE_BYTES = 5 * 1024 * 1024  # 5MB


def init_logging():
    """Configure logging using the settings in the package config.

    The log path and log level are read from ``config.data``; the log file
    mode passed to ``setup_logging`` is 0o640. When the ``DEBUG`` environment
    variable is exactly ``'1'``, ``print_out='stdout'`` is passed as well
    (presumably mirroring log output to stdout -- see ``rads.setup_logging``).

    Returns:
        True when ``setup_logging`` completed, False when it raised
        ``OSError`` (the error is reported on stderr).
    """
    if os.getenv('DEBUG') == '1':
        print_target = 'stdout'
    else:
        print_target = None

    try:
        setup_logging(
            config.data.log_path,
            chmod=0o640,
            loglevel=config.data.log_level,
            print_out=print_target,
        )
    except OSError as err:
        # Logging is not available yet, so stderr is the only place to report.
        print(f"Failed to setup logging: {err}", file=sys.stderr)
        return False
    return True


def read_and_delete_logfile(logfile: str, dry_run: bool = False):
    """Yield every JSON record in ``logfile``, then delete it (unless dry_run).

    The file is read in binary mode, one line at a time, so the size reported
    for each record is an exact byte count. (Text-mode ``tell()`` returns an
    opaque cookie rather than a byte offset and is costly to call per line.)
    Each line is decoded as UTF-8 with undecodable bytes replaced, then parsed
    as JSON. Lines that are not valid JSON are logged at debug level, counted,
    and skipped; a single warning with the total is emitted at the end.

    The file is removed only after the generator has been run to exhaustion;
    a consumer that stops iterating early leaves the file in place.

    Args:
        logfile: Path of the line-delimited JSON file to read.
        dry_run: When true, the file is left on disk after reading.

    Yields:
        ``(record, size)`` tuples, where ``record`` is the decoded JSON value
        and ``size`` is the length in bytes of the line it was read from,
        including the line terminator.
    """
    if not os.path.exists(logfile):
        return

    skipped = 0
    with open(logfile, "rb") as f:
        for raw in f:
            line = raw.decode("utf-8", errors="replace")
            # Only the parse is guarded; the yield stays outside the try so
            # nothing raised by the consumer is mistaken for a parse failure.
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                logging.debug(
                    "Skipping invalid JSON line in %s: %s", logfile, line
                )
                skipped += 1
                continue
            yield rec, len(raw)
    if skipped:
        logging.warning(
            "Skipping %s invalid JSON lines in %s", skipped, logfile
        )
    if not dry_run:
        os.remove(logfile)


def process_hosts(
    hosts: list[Host],
    platform: Literal["dedi", "vps"] = "dedi",
    dry_run: bool = False,
) -> int:
    """Parse each host's exim log and send the parsed records in batches.

    Works in two passes over ``hosts``:

    1. ``parser.parse_exim`` is run for every host. A failure is logged and
       does not stop the remaining hosts; the host is still visited in the
       second pass, so any parsed log already on disk is picked up.
    2. Each host's parsed log is read and its records are grouped by hostname
       in a buffer. Once the accumulated line sizes reach
       ``MAX_BATCH_SIZE_BYTES`` the buffer is sent with ``send_batch``;
       whatever is left after the last host is sent as a final batch.

    A parsed log is removed only after every record read from it has been
    sent successfully, so a failed send never discards unsent records.
    Nothing is sent or removed when ``dry_run`` is true.

    Args:
        hosts: Hosts to process; each supplies ``hostname``,
            ``offset_file_path``, ``exim_log_path`` and ``parsed_log_path``.
        platform: Platform label passed through to ``send_batch``.
        dry_run: Log what would be sent instead of sending; leave files alone.

    Returns:
        Number of records sent (or, for a dry run, that would have been sent).

    Raises:
        RuntimeError: If the final batch cannot be sent. A failed send while
            a host is still being read is caught by that host's error handler
            and logged instead.
    """
    total_sent = 0
    batch_bytes = 0
    batch_count = 0
    buffer = {}
    # Parsed logs that were read to the end but whose records may still be
    # waiting in ``buffer``; they are removed after the next successful send.
    consumed_logs = []

    for h in hosts:
        offset_dir = Path(h.offset_file_path).parent
        if not offset_dir.exists():
            offset_dir.mkdir(parents=True, exist_ok=True)
        try:
            parsed_ok = parser.parse_exim(
                OFFSET_FILE=h.offset_file_path,
                EXIM_LOG=h.exim_log_path,
                OUTPUT_LOG=h.parsed_log_path,
            )
        except Exception:
            logging.exception(
                "Error parsing exim log for host: %s", h.hostname
            )
        else:
            if not parsed_ok:
                logging.error(
                    "Failed to parse exim log for host: %s", h.hostname
                )

    def remove_consumed_logs():
        """Delete parsed logs whose records have all been sent."""
        while consumed_logs:
            path = consumed_logs.pop()
            try:
                Path(path).unlink(missing_ok=True)
            except OSError:
                logging.exception("Failed to remove parsed log: %s", path)

    def flush():
        """Send (or, on a dry run, just report) the buffered records."""
        nonlocal batch_count, total_sent, batch_bytes
        batch_count += 1
        human_mb = f"{batch_bytes / (1024 * 1024):.2f}"
        count = sum(len(records) for records in buffer.values())

        if dry_run:
            logging.info(
                "DRY RUN: Batch #%d: %d items, ~%s MB",
                batch_count,
                count,
                human_mb,
            )
        else:
            logging.info(
                "Processing batch #%d: %d items, ~%s MB",
                batch_count,
                count,
                human_mb,
            )
            if not send_batch(buffer, platform):
                # Buffer, byte counter and consumed logs are deliberately left
                # untouched so the records can go out with a later batch.
                logging.error("Failed to send batch #%d", batch_count)
                raise RuntimeError(f"Failed to send batch #{batch_count}")
            logging.info("Successfully sent batch #%d", batch_count)

        total_sent += count
        buffer.clear()
        batch_bytes = 0
        remove_consumed_logs()

    for host in hosts:
        logging.info(
            "Processing host: %s with log: %s",
            host.hostname,
            host.parsed_log_path,
        )
        try:
            has_mail = False
            # Always read in dry-run mode: deletion is handled here, after the
            # records have actually been sent, not when the read finishes.
            for rec, byte_count in read_and_delete_logfile(
                host.parsed_log_path, dry_run=True
            ):
                buffer.setdefault(host.hostname, []).append(rec)
                batch_bytes += byte_count
                has_mail = True
                if batch_bytes >= MAX_BATCH_SIZE_BYTES:
                    flush()

            if not dry_run:
                consumed_logs.append(host.parsed_log_path)
            if not has_mail:
                logging.info("No mail records for host: %s", host.hostname)
        except Exception:
            logging.exception(
                "Error processing host %s", host.hostname
            )

    if buffer:
        flush()
    # Covers logs that finished exactly on a batch boundary or held no records.
    remove_consumed_logs()

    return total_sent

Hry