Heray-Was-Here
Server : Apache
System : Linux vps37394.inmotionhosting.com 3.10.0-1160.119.1.vz7.224.4 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User : jasonp18 ( 1000)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /usr/lib/rads/venv/lib/python3.13/site-packages/exim_analytics/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/lib/rads/venv/lib/python3.13/site-packages/exim_analytics/api.py
from typing import Literal
import logging
from . import config

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def session():
    session = requests.Session()
    retries = Retry(total=5, backoff_factor=0.1)
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update(headers())
    return session


def url_for(route: str) -> str:
    """Construct full URL for given route."""
    return f"{config.data.endpoint_url.rstrip('/')}/{route.lstrip('/')}"


def headers():
    return {
        "Authorization": f"Bearer {config.data.bearer_token}",
        "Content-Type": "application/json",
    }


def send_batch(
    buffer: dict[str, list[dict]],
    platform: Literal["dedi", "vps"] = "dedi",
) -> bool:
    """Send a single batch to the endpoint. Returns True on success, False on failure."""
    sess = session()
    try:
        if platform == "dedi":
            if len(buffer) != 1:
                raise ValueError(
                    "For 'dedi' platform, buffer must contain exactly one host."
                )
            for host, records in buffer.items():
                url = url_for("/v1/ingest/dedi")
                resp = sess.post(url, json={"hostname": host, "data": records})
                if resp.status_code in (200, 201):
                    return True
                logging.warning("Received status code %s", resp.status_code)
                return False
        else:
            to_send = [
                {"hostname": host, "data": records}
                for host, records in buffer.items()
            ]
            url = url_for("/v1/ingest/vps")
            resp = sess.post(url, json=to_send)
            if resp.status_code in (200, 201):
                return True
            logging.warning("Received status code %s", resp.status_code)
            return False
    except Exception:
        logging.exception("Failed to send batch")
        return None


def send_no_mail(
    hostname: str,
) -> bool:
    """
    Notify webserver we have no data this run
    """
    sess = session()

    endpoint = url_for(f"/v1/ingest/no-mail/{hostname}")
    try:
        resp = sess.post(
            endpoint,
        )

        return resp.status_code == 200

    except requests.RequestException:
        return None


def check_status(
    hostname: str,
) -> bool:
    
    sess = session()
    endpoint = url_for(f"/status?host={hostname}")
    try:
        resp = sess.get(
            endpoint,
        )

        return resp.status_code == 200

    except requests.RequestException:
        return None

Hry