Refactor basically everything
This commit is contained in:
@@ -0,0 +1,120 @@
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
import httpx
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from jinja2 import Environment, PackageLoader
|
||||
|
||||
from velping.config import Config, ServiceConfig, load_cfg
|
||||
|
||||
http = httpx.Client(
|
||||
headers={"User-Agent": "velping"},
|
||||
follow_redirects=True,
|
||||
)
|
||||
app = FastAPI()
|
||||
app.mount("/static", StaticFiles(packages=["velping"]), name="static")
|
||||
uptime: dict[str, list[str]] = {}
|
||||
templates = Environment(loader=PackageLoader("velping", "templates"))
|
||||
cfg = load_cfg()
|
||||
|
||||
|
||||
def handle_service_status(
|
||||
service_name: str,
|
||||
down: bool = False,
|
||||
error: str | None = None,
|
||||
) -> None:
|
||||
"""Update service status and send notifications if configured."""
|
||||
last_status = uptime[service_name][-1]
|
||||
new_status = "O" if down else "I"
|
||||
|
||||
if cfg.ntfy.enabled and last_status != new_status:
|
||||
message = f"Service {service_name} is {'offline' if down else 'online'}"
|
||||
http.post(
|
||||
url=f"https://{cfg.ntfy.server}/{cfg.ntfy.topic}",
|
||||
data=message,
|
||||
)
|
||||
|
||||
uptime[service_name].pop(0)
|
||||
uptime[service_name].append(new_status)
|
||||
|
||||
if down:
|
||||
print(f"[E] Error pinging {service_name}: {error}")
|
||||
else:
|
||||
print(f"[I] Pinging {service_name} succeeded!")
|
||||
|
||||
|
||||
def tcp_ping(service: ServiceConfig) -> None:
|
||||
"""Continuously ping a service via TCP."""
|
||||
|
||||
socket_type = socket.AF_INET if service.ipv == 4 else socket.AF_INET6
|
||||
|
||||
while True:
|
||||
print(f"[I] Pinging {service.name}")
|
||||
|
||||
try:
|
||||
with socket.socket(family=socket_type, type=socket.SOCK_STREAM) as sock:
|
||||
sock.settimeout(service.timeout)
|
||||
sock.connect((service.hostname, service.port))
|
||||
|
||||
handle_service_status(service_name=service.name, down=False)
|
||||
|
||||
except socket.timeout:
|
||||
handle_service_status(
|
||||
service_name=service.name,
|
||||
down=True,
|
||||
error=f"Connection to {service.name} timed out",
|
||||
)
|
||||
except Exception as err:
|
||||
handle_service_status(
|
||||
service_name=service.name,
|
||||
down=True,
|
||||
error=str(err),
|
||||
)
|
||||
|
||||
time.sleep(cfg.pinging.rate)
|
||||
|
||||
|
||||
def http_ping(service: ServiceConfig) -> None:
|
||||
"""Continuously ping a service via HTTP."""
|
||||
while True:
|
||||
print(f"[I] Pinging {service.name}")
|
||||
|
||||
try:
|
||||
resp = http.get(url=service.url, timeout=service.timeout)
|
||||
resp.raise_for_status()
|
||||
handle_service_status(service_name=service.name, down=False)
|
||||
|
||||
except Exception as e:
|
||||
handle_service_status(
|
||||
service_name=service.name,
|
||||
down=True,
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
time.sleep(cfg.pinging.rate)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root(cfg: Config) -> str:
|
||||
template = templates.get_template("index.xht")
|
||||
return template.render(
|
||||
cfg=cfg,
|
||||
services=cfg.services,
|
||||
uptime=uptime,
|
||||
)
|
||||
|
||||
|
||||
for service in cfg.services:
|
||||
uptime[service.name] = ["?"] * cfg.web.pings
|
||||
|
||||
ping_func = http_ping if service.type == "http" else tcp_ping
|
||||
threading.Thread(
|
||||
target=ping_func,
|
||||
args=(service,),
|
||||
daemon=True,
|
||||
).start()
|
||||
|
||||
uvicorn.run(app, host=cfg.web.addr, port=cfg.web.port)
|
||||
Reference in New Issue
Block a user