123 lines
2.9 KiB
Python
123 lines
2.9 KiB
Python
import socket
|
|
import threading
|
|
import time
|
|
|
|
import httpx
|
|
import uvicorn
|
|
from fastapi import FastAPI
|
|
from fastapi.responses import HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from jinja2 import Environment, PackageLoader
|
|
|
|
from velping.config import HTTPServiceConfig, TCPServiceConfig, load_cfg
|
|
|
|
http = httpx.Client(
|
|
headers={"User-Agent": "velping"},
|
|
follow_redirects=True,
|
|
)
|
|
app = FastAPI()
|
|
app.mount("/static", StaticFiles(packages=["velping"]), name="static")
|
|
uptime: dict[str, list[str]] = {}
|
|
templates = Environment(loader=PackageLoader("velping", "templates"))
|
|
cfg = load_cfg()
|
|
|
|
|
|
def handle_service_status(
|
|
service_name: str,
|
|
down: bool = False,
|
|
error: str | None = None,
|
|
) -> None:
|
|
"""Update service status and send notifications if configured."""
|
|
last_status = uptime[service_name][-1]
|
|
new_status = "O" if down else "I"
|
|
|
|
if cfg.ntfy.enabled and last_status != new_status:
|
|
message = f"Service {service_name} is {'offline' if down else 'online'}"
|
|
http.post(
|
|
url=f"https://{cfg.ntfy.server}/{cfg.ntfy.topic}",
|
|
data=message,
|
|
)
|
|
|
|
uptime[service_name].pop(0)
|
|
uptime[service_name].append(new_status)
|
|
|
|
if down:
|
|
print(f"[E] Error pinging {service_name}: {error}")
|
|
else:
|
|
print(f"[I] Pinging {service_name} succeeded!")
|
|
|
|
|
|
def tcp_ping(service: TCPServiceConfig) -> None:
|
|
"""Continuously ping a service via TCP."""
|
|
|
|
socket_type = socket.AF_INET if service.ipv == 4 else socket.AF_INET6
|
|
|
|
while True:
|
|
print(f"[I] Pinging {service.name}")
|
|
|
|
try:
|
|
with socket.socket(family=socket_type, type=socket.SOCK_STREAM) as sock:
|
|
sock.settimeout(service.timeout)
|
|
sock.connect((service.hostname, service.port))
|
|
|
|
handle_service_status(service_name=service.name, down=False)
|
|
|
|
except socket.timeout:
|
|
handle_service_status(
|
|
service_name=service.name,
|
|
down=True,
|
|
error=f"Connection to {service.name} timed out",
|
|
)
|
|
except Exception as err:
|
|
handle_service_status(
|
|
service_name=service.name,
|
|
down=True,
|
|
error=str(err),
|
|
)
|
|
|
|
time.sleep(cfg.pinging.rate)
|
|
|
|
|
|
def http_ping(service: HTTPServiceConfig) -> None:
|
|
"""Continuously ping a service via HTTP."""
|
|
while True:
|
|
print(f"[I] Pinging {service.name}")
|
|
|
|
try:
|
|
resp = http.get(url=service.url, timeout=service.timeout)
|
|
resp.raise_for_status()
|
|
handle_service_status(service_name=service.name, down=False)
|
|
|
|
except Exception as e:
|
|
handle_service_status(
|
|
service_name=service.name,
|
|
down=True,
|
|
error=str(e),
|
|
)
|
|
|
|
time.sleep(cfg.pinging.rate)
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def root() -> str:
|
|
template = templates.get_template("index.xht")
|
|
return template.render(
|
|
cfg=cfg,
|
|
services=cfg.services,
|
|
uptime=uptime,
|
|
ping_rate=cfg.pinging.rate,
|
|
)
|
|
|
|
|
|
for service in cfg.services:
|
|
uptime[service.name] = ["?"] * cfg.web.pings
|
|
|
|
ping_func = http_ping if service.type == "http" else tcp_ping
|
|
threading.Thread(
|
|
target=ping_func,
|
|
args=(service,),
|
|
daemon=True,
|
|
).start()
|
|
|
|
uvicorn.run(app, host=cfg.web.addr, port=cfg.web.port)
|