"""Uptime monitor: ping configured services and expose their status over HTTP.

Each configured service gets a daemon thread that probes it (TCP connect or
HTTP GET) every ``cfg.pinging.rate`` seconds. Results are kept in the
module-level ``uptime`` mapping and served through a FastAPI application
(JSON at ``/api/v1/status``, HTML at ``/``).
"""

import socket
import threading
import time

import httpx
import uvicorn
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from jinja2 import Environment, PackageLoader

from velping.config import HTTPServiceConfig, TCPServiceConfig, load_cfg

# Shared HTTP client used both for HTTP probes and for ntfy notifications.
http = httpx.Client(
    headers={"User-Agent": "velping"},
    follow_redirects=True,
)

app = FastAPI()
app.mount("/static", StaticFiles(packages=["velping"]), name="static")

# Per-service status history, oldest entry first. Entries are "I" (last ping
# succeeded), "O" (last ping failed) or "?" (no ping recorded yet).
uptime: dict[str, list[str]] = {}

templates = Environment(loader=PackageLoader("velping", "templates"))
cfg = load_cfg()


def handle_service_status(
    service_name: str,
    down: bool = False,
    error: str | None = None,
) -> None:
    """Record a ping result and send a notification on a status change.

    Args:
        service_name: Key of the service in ``uptime``; the entry must
            already exist.
        down: ``True`` when the ping failed, ``False`` when it succeeded.
        error: Description of the failure, printed when ``down`` is true.

    A failure to deliver the notification is logged and otherwise ignored, so
    that an unreachable notification server can neither flip the recorded
    status nor terminate the calling ping thread.
    """
    history = uptime[service_name]
    last_status = history[-1]
    new_status = "O" if down else "I"

    if cfg.ntfy.enabled and last_status != new_status:
        message = f"Service {service_name} is {'offline' if down else 'online'}"
        try:
            # ``content=`` is httpx's parameter for a raw text body; passing
            # a str through ``data=`` is deprecated.
            http.post(
                url=f"https://{cfg.ntfy.server}/{cfg.ntfy.topic}",
                content=message,
            )
        except Exception as err:
            # Notifications are best-effort: monitoring must keep running
            # even when the notification cannot be delivered.
            print(f"[E] Failed to send notification for {service_name}: {err}")

    # Slide the fixed-size window: drop the oldest entry, append the newest.
    history.pop(0)
    history.append(new_status)

    if down:
        print(f"[E] Error pinging {service_name}: {error}")
    else:
        print(f"[I] Pinging {service_name} succeeded!")


def tcp_ping(service: TCPServiceConfig) -> None:
    """Continuously ping a service via TCP.

    Opens a TCP connection to ``service.hostname:service.port`` (IPv4 when
    ``service.ipv == 4``, IPv6 otherwise), records the outcome, sleeps for
    ``cfg.pinging.rate`` seconds and repeats forever.
    """
    socket_type = socket.AF_INET if service.ipv == 4 else socket.AF_INET6

    while True:
        print(f"[I] Pinging {service.name}")
        try:
            with socket.socket(family=socket_type, type=socket.SOCK_STREAM) as sock:
                sock.settimeout(service.timeout)
                sock.connect((service.hostname, service.port))
        except socket.timeout:
            handle_service_status(
                service_name=service.name,
                down=True,
                error=f"Connection to {service.name} timed out",
            )
        except Exception as err:
            handle_service_status(
                service_name=service.name,
                down=True,
                error=str(err),
            )
        else:
            # Success is reported outside the ``try`` body so that only
            # errors raised by the probe itself mark the service as down.
            handle_service_status(service_name=service.name, down=False)

        time.sleep(cfg.pinging.rate)


def http_ping(service: HTTPServiceConfig) -> None:
    """Continuously ping a service via HTTP.

    Issues a GET request to ``service.url``; any exception, including the one
    raised by ``raise_for_status()``, counts as a failed ping. Sleeps for
    ``cfg.pinging.rate`` seconds between attempts and repeats forever.
    """
    while True:
        print(f"[I] Pinging {service.name}")
        try:
            resp = http.get(url=service.url, timeout=service.timeout)
            resp.raise_for_status()
        except Exception as e:
            handle_service_status(
                service_name=service.name,
                down=True,
                error=str(e),
            )
        else:
            # Success is reported outside the ``try`` body so that only
            # errors raised by the probe itself mark the service as down.
            handle_service_status(service_name=service.name, down=False)

        time.sleep(cfg.pinging.rate)


@app.get("/api/v1/status")
async def api_v1_up() -> dict:
    """Return the latest health of every service as JSON.

    A service is healthy only when its most recent entry is "I". The
    top-level ``healthy`` flag is false as soon as any service is not healthy,
    which includes services that have not been pinged yet ("?").
    """
    rsp = {"healthy": True, "services": {}}

    for name, record in uptime.items():
        if record[-1] == "I":
            rsp["services"][name] = {"healthy": True}
        else:
            rsp["services"][name] = {"healthy": False}
            rsp["healthy"] = False

    return rsp


@app.get("/", response_class=HTMLResponse)
async def root() -> str:
    """Render the HTML status page from the ``index.xht`` template."""
    template = templates.get_template("index.xht")
    return template.render(
        cfg=cfg,
        services=cfg.services,
        uptime=uptime,
        ping_rate=cfg.pinging.rate,
    )


# Start one daemon ping thread per configured service. The history entry is
# created before the thread starts so the ping loop always finds it.
for service in cfg.services:
    uptime[service.name] = ["?"] * cfg.web.pings
    ping_func = http_ping if service.type == "http" else tcp_ping
    threading.Thread(
        target=ping_func,
        args=(service,),
        daemon=True,
    ).start()

# NOTE(review): the threads above and this server start whenever the module
# is executed or imported; confirm that is intended before adding a
# ``__main__`` guard.
uvicorn.run(app, host=cfg.web.addr, port=cfg.web.port)