formatter + implement status history

This commit is contained in:
csehviktor
2025-07-15 01:08:51 +02:00
parent 61fb1b1583
commit 6a311246f6
16 changed files with 245 additions and 110 deletions
+10 -9
View File
@@ -1,4 +1,4 @@
use common::metrics::{Disk, Memory, Metrics, Network, SystemInfo, CPU}; use common::metrics::{CPU, Disk, Memory, Metrics, Network, SystemInfo};
use sysinfo::{Disks, Networks, System}; use sysinfo::{Disks, Networks, System};
use crate::cpu::CPUStatReader; use crate::cpu::CPUStatReader;
@@ -54,12 +54,16 @@ impl Collector {
CPU { CPU {
usage: self.sys.global_cpu_usage(), usage: self.sys.global_cpu_usage(),
threads: self.sys.cpus().len(), threads: self.sys.cpus().len(),
breakdown: self.cpu_reader.read_global_cpu_stats().unwrap_or_default() breakdown: self.cpu_reader.read_global_cpu_stats().unwrap_or_default(),
} }
} }
fn collect_disk(&self) -> Disk { fn collect_disk(&self) -> Disk {
let disk = self.disks.iter().max_by_key(|disk| disk.total_space()).unwrap(); let disk = self
.disks
.iter()
.max_by_key(|disk| disk.total_space())
.unwrap();
Disk { Disk {
free: disk.available_space(), free: disk.available_space(),
@@ -77,17 +81,14 @@ impl Collector {
} }
fn collect_network(&self) -> Network { fn collect_network(&self) -> Network {
let (down, up): (u64, u64) = self.networks let (down, up): (u64, u64) = self
.networks
.values() .values()
.map(|data| (data.received(), data.transmitted())) .map(|data| (data.received(), data.transmitted()))
.fold((0, 0), |(acc_down, acc_up), (down, up)| { .fold((0, 0), |(acc_down, acc_up), (down, up)| {
(acc_down + down, acc_up + up) (acc_down + down, acc_up + up)
}); });
Network { Network { down, up }
down,
up,
} }
}
} }
+23 -9
View File
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use anyhow::anyhow; use anyhow::anyhow;
use common::metrics::CPUBreakdown; use common::metrics::CPUBreakdown;
use std::collections::HashMap;
#[derive(Default, Debug, Clone, Copy)] #[derive(Default, Debug, Clone, Copy)]
struct CPUValues { struct CPUValues {
@@ -18,8 +18,16 @@ struct CPUValues {
impl CPUValues { impl CPUValues {
pub fn total(&self) -> u64 { pub fn total(&self) -> u64 {
self.user + self.nice + self.system + self.idle + self.iowait + self.user
self.irq + self.softirq + self.steal + self.guest + self.guest_nice + self.nice
+ self.system
+ self.idle
+ self.iowait
+ self.irq
+ self.softirq
+ self.steal
+ self.guest
+ self.guest_nice
} }
} }
@@ -29,7 +37,9 @@ pub struct CPUStatReader {
impl CPUStatReader { impl CPUStatReader {
pub fn new() -> Self { pub fn new() -> Self {
Self { previous_stats: HashMap::new() } Self {
previous_stats: HashMap::new(),
}
} }
pub fn read_global_cpu_stats(&mut self) -> anyhow::Result<CPUBreakdown> { pub fn read_global_cpu_stats(&mut self) -> anyhow::Result<CPUBreakdown> {
@@ -46,7 +56,8 @@ impl CPUStatReader {
fn parse_cpu_line(&mut self, line: &str) -> anyhow::Result<CPUBreakdown> { fn parse_cpu_line(&mut self, line: &str) -> anyhow::Result<CPUBreakdown> {
let mut parts = line.split_whitespace(); let mut parts = line.split_whitespace();
let cpu_name = parts.next() let cpu_name = parts
.next()
.ok_or_else(|| anyhow!("missing cpu name"))? .ok_or_else(|| anyhow!("missing cpu name"))?
.to_string(); .to_string();
@@ -80,7 +91,11 @@ impl CPUStatReader {
Ok(percentages) Ok(percentages)
} }
fn calculate_percentages(&self, current: &CPUValues, previous: Option<CPUValues>) -> CPUBreakdown { fn calculate_percentages(
&self,
current: &CPUValues,
previous: Option<CPUValues>,
) -> CPUBreakdown {
let Some(prev) = previous else { let Some(prev) = previous else {
return CPUBreakdown::default(); return CPUBreakdown::default();
}; };
@@ -90,9 +105,8 @@ impl CPUStatReader {
return CPUBreakdown::default(); return CPUBreakdown::default();
} }
let calculate_pct = |current: u64, prev: u64| { let calculate_pct =
(current.saturating_sub(prev) as f32 / total_delta) * 100.0 |current: u64, prev: u64| (current.saturating_sub(prev) as f32 / total_delta) * 100.0;
};
CPUBreakdown { CPUBreakdown {
system: calculate_pct(current.system, prev.system), system: calculate_pct(current.system, prev.system),
+6 -6
View File
@@ -1,18 +1,18 @@
use common::MQTT_SEND_INTERVAL;
use collector::Collector;
use std::time::Duration;
use mqtt::MqttHandle;
use clap::Parser; use clap::Parser;
use collector::Collector;
use common::MQTT_SEND_INTERVAL;
use mqtt::MqttHandle;
use std::thread; use std::thread;
use std::time::Duration;
pub mod collector; pub mod collector;
pub mod mqtt;
pub mod cpu; pub mod cpu;
pub mod mqtt;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(version, about, long_about = None)] #[command(version, about, long_about = None)]
struct Args { struct Args {
#[arg(long, short, default_value = "unknown-agent")] #[arg(long, short, default_value = "agent")]
name: String, name: String,
#[arg(long, default_value = "0.0.0.0")] #[arg(long, default_value = "0.0.0.0")]
+3 -3
View File
@@ -1,7 +1,7 @@
use common::{metrics::Metrics, StatusMessage, MQTT_TOPIC}; use common::{MQTT_TOPIC, StatusMessage, metrics::Metrics};
use rumqttc::{AsyncClient, MqttOptions, QoS}; use rumqttc::{AsyncClient, MqttOptions, QoS};
use tokio::task::JoinHandle;
use std::time::Duration; use std::time::Duration;
use tokio::task::JoinHandle;
pub struct MqttHandle { pub struct MqttHandle {
pub agent: String, pub agent: String,
@@ -28,7 +28,7 @@ impl MqttHandle {
Self { Self {
agent, agent,
client, client,
eventloop_handle eventloop_handle,
} }
} }
+9 -1
View File
@@ -1,7 +1,7 @@
use crate::metrics::Metrics;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Error; use serde_json::Error;
use crate::metrics::Metrics;
pub mod metrics; pub mod metrics;
@@ -37,6 +37,14 @@ impl<'a> TryFrom<&'a [u8]> for StatusMessage {
} }
} }
impl TryFrom<String> for StatusMessage {
type Error = serde_json::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
serde_json::from_str(&value)
}
}
impl From<StatusMessage> for String { impl From<StatusMessage> for String {
fn from(msg: StatusMessage) -> String { fn from(msg: StatusMessage) -> String {
serde_json::to_string(&msg).unwrap_or_else(|_| String::new()) serde_json::to_string(&msg).unwrap_or_else(|_| String::new())
+2 -4
View File
@@ -1,5 +1,5 @@
use common::StatusMessage; use common::StatusMessage;
use tokio::sync::{mpsc::Sender, Mutex}; use tokio::sync::{Mutex, mpsc::Sender};
pub struct ClientManager { pub struct ClientManager {
clients: Mutex<Vec<Sender<StatusMessage>>>, clients: Mutex<Vec<Sender<StatusMessage>>>,
@@ -20,8 +20,6 @@ impl ClientManager {
pub async fn broadcast(&self, message: &StatusMessage) { pub async fn broadcast(&self, message: &StatusMessage) {
let mut clients = self.clients.lock().await; let mut clients = self.clients.lock().await;
clients.retain(|client| { clients.retain(|client| client.try_send(message.clone()).is_ok());
client.try_send(message.clone()).is_ok()
});
} }
} }
+1 -1
View File
@@ -1,8 +1,8 @@
use rumqttd::{Broker, Config}; use rumqttd::{Broker, Config};
use std::sync::Arc; use std::sync::Arc;
use crate::{bridge::ClientManager, storage::StorageRepositoryImpl};
use super::subscriber::MqttSubscriber; use super::subscriber::MqttSubscriber;
use crate::{bridge::ClientManager, storage::StorageRepositoryImpl};
pub struct MqttBroker { pub struct MqttBroker {
broker: &'static mut Broker, broker: &'static mut Broker,
+11 -3
View File
@@ -1,5 +1,5 @@
use rumqttd::{local::LinkRx, Broker, Notification}; use common::{MQTT_TOPIC, StatusMessage};
use common::{StatusMessage, MQTT_TOPIC}; use rumqttd::{Broker, Notification, local::LinkRx};
use std::sync::Arc; use std::sync::Arc;
use crate::{bridge::ClientManager, storage::StorageRepository}; use crate::{bridge::ClientManager, storage::StorageRepository};
@@ -11,7 +11,11 @@ pub struct MqttSubscriber {
} }
impl MqttSubscriber { impl MqttSubscriber {
pub fn new(broker: &Broker, clients: Arc<ClientManager>, storage: Arc<dyn StorageRepository>) -> Self { pub fn new(
broker: &Broker,
clients: Arc<ClientManager>,
storage: Arc<dyn StorageRepository>,
) -> Self {
let (mut link_tx, link_rx) = broker.link("internal-subscriber").unwrap(); let (mut link_tx, link_rx) = broker.link("internal-subscriber").unwrap();
link_tx.subscribe(MQTT_TOPIC).unwrap(); link_tx.subscribe(MQTT_TOPIC).unwrap();
@@ -32,6 +36,10 @@ impl MqttSubscriber {
if let Err(e) = self.storage.record_uptime(&payload.agent).await { if let Err(e) = self.storage.record_uptime(&payload.agent).await {
eprintln!("failed to record uptime for {}: {}", &payload.agent, e); eprintln!("failed to record uptime for {}: {}", &payload.agent, e);
} }
if let Err(e) = self.storage.record_message(&payload).await {
eprintln!("failed to record message for {}: {}", &payload.agent, e);
}
} }
} }
+1 -1
View File
@@ -1,5 +1,5 @@
use serde::Deserialize;
use rumqttd::Config; use rumqttd::Config;
use serde::Deserialize;
const CONFIG_PATH: &str = if cfg!(debug_assertions) { const CONFIG_PATH: &str = if cfg!(debug_assertions) {
"server/config.toml" "server/config.toml"
+7 -4
View File
@@ -1,21 +1,24 @@
use storage::{StorageRepositoryImpl, StorageStrategy};
use broker::manager::MqttBroker; use broker::manager::MqttBroker;
use config::load_config; use config::load_config;
use server::Server; use server::Server;
use std::sync::Arc; use std::sync::Arc;
use storage::{StorageRepositoryImpl, StorageStrategy};
pub mod broker;
pub mod bridge; pub mod bridge;
pub mod broker;
pub mod config; pub mod config;
pub mod storage;
pub mod server; pub mod server;
pub mod storage;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
let cfg = load_config()?; let cfg = load_config()?;
let storage = Arc::new(if cfg.storage.sqlite { let storage = Arc::new(if cfg.storage.sqlite {
StorageRepositoryImpl::new(StorageStrategy::SQLite(format!("{}agents.db", cfg.storage.db_path))) StorageRepositoryImpl::new(StorageStrategy::SQLite(format!(
"{}agents.db",
cfg.storage.db_path
)))
} else { } else {
StorageRepositoryImpl::new(StorageStrategy::InMemory) StorageRepositoryImpl::new(StorageStrategy::InMemory)
}); });
+23 -4
View File
@@ -1,5 +1,5 @@
use warp::{reply::json, Filter, Reply, Rejection};
use std::sync::Arc; use std::sync::Arc;
use warp::{Filter, Rejection, Reply, reply::json};
use crate::storage::StorageRepository; use crate::storage::StorageRepository;
@@ -13,10 +13,20 @@ impl HttpRoutes {
} }
pub fn routes(self: Arc<Self>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { pub fn routes(self: Arc<Self>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path!("agents") let agents_storage = self.storage.clone();
let history_storage = self.storage.clone();
let agents_route = warp::path!("agents")
.and(warp::get()) .and(warp::get())
.and(warp::any().map(move || self.storage.clone())) .and(warp::any().map(move || agents_storage.clone()))
.and_then(Self::get_agents) .and_then(Self::get_agents);
let history_route = warp::path!("history" / String)
.and(warp::get())
.and(warp::any().map(move || history_storage.clone()))
.and_then(Self::get_history);
agents_route.or(history_route)
} }
async fn get_agents(storage: Arc<dyn StorageRepository>) -> Result<impl Reply, Rejection> { async fn get_agents(storage: Arc<dyn StorageRepository>) -> Result<impl Reply, Rejection> {
@@ -24,4 +34,13 @@ impl HttpRoutes {
Ok(json(&agents)) Ok(json(&agents))
} }
async fn get_history(
agent: String,
storage: Arc<dyn StorageRepository>,
) -> Result<impl Reply, Rejection> {
let history = storage.get_history(&agent).await.unwrap();
Ok(json(&history))
}
} }
+3 -8
View File
@@ -1,7 +1,7 @@
use http::HttpRoutes; use http::HttpRoutes;
use std::sync::Arc;
use warp::Filter; use warp::Filter;
use websocket::WebsocketRoutes; use websocket::WebsocketRoutes;
use std::sync::Arc;
use crate::{bridge::ClientManager, storage::StorageRepositoryImpl}; use crate::{bridge::ClientManager, storage::StorageRepositoryImpl};
@@ -15,19 +15,14 @@ pub struct Server {
impl Server { impl Server {
pub fn new(clients: Arc<ClientManager>, storage: Arc<StorageRepositoryImpl>) -> Self { pub fn new(clients: Arc<ClientManager>, storage: Arc<StorageRepositoryImpl>) -> Self {
Self { Self { clients, storage }
clients,
storage,
}
} }
pub async fn serve(&self) { pub async fn serve(&self) {
let http_routes = Arc::new(HttpRoutes::new(self.storage.inner())); let http_routes = Arc::new(HttpRoutes::new(self.storage.inner()));
let ws_routes = Arc::new(WebsocketRoutes::new(self.clients.clone())); let ws_routes = Arc::new(WebsocketRoutes::new(self.clients.clone()));
let cors = warp::cors() let cors = warp::cors().allow_any_origin().allow_methods(vec!["GET"]);
.allow_any_origin()
.allow_methods(vec!["GET"]);
let routes = http_routes.routes().with(cors).or(ws_routes.routes()); let routes = http_routes.routes().with(cors).or(ws_routes.routes());
+18 -9
View File
@@ -1,8 +1,11 @@
use common::StatusMessage; use common::StatusMessage;
use warp::{filters::ws::{Message, WebSocket}, Filter, Reply, Rejection}; use futures_util::{SinkExt, StreamExt};
use futures_util::{StreamExt, SinkExt};
use tokio::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::channel;
use warp::{
Filter, Rejection, Reply,
filters::ws::{Message, WebSocket},
};
use crate::bridge::ClientManager; use crate::bridge::ClientManager;
@@ -16,17 +19,24 @@ impl WebsocketRoutes {
} }
pub fn routes(self: Arc<Self>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { pub fn routes(self: Arc<Self>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("ws") warp::path!("ws" / String)
.and(warp::path::param::<String>())
.and(warp::get()) .and(warp::get())
.and(warp::ws()) .and(warp::ws())
.and(warp::any().map(move || self.clients.clone())) .and(warp::any().map(move || self.clients.clone()))
.map(|agent: String, websocket: warp::ws::Ws, clients: Arc<ClientManager>| { .map(
websocket.on_upgrade(move |websocket| Self::handle_ws_connection(agent, websocket, clients)) |agent: String, websocket: warp::ws::Ws, clients: Arc<ClientManager>| {
websocket.on_upgrade(move |websocket| {
Self::handle_ws_connection(agent, websocket, clients)
}) })
},
)
} }
async fn handle_ws_connection(agent: String, websocket: WebSocket, clients: Arc<ClientManager>) { async fn handle_ws_connection(
agent: String,
websocket: WebSocket,
clients: Arc<ClientManager>,
) {
let (mut ws_tx, _) = websocket.split(); let (mut ws_tx, _) = websocket.split();
let (tx, mut rx) = channel::<StatusMessage>(100); let (tx, mut rx) = channel::<StatusMessage>(100);
@@ -44,5 +54,4 @@ impl WebsocketRoutes {
} }
}); });
} }
} }
+21 -15
View File
@@ -1,29 +1,42 @@
use std::collections::HashMap;
use chrono::Utc;
use tokio::sync::Mutex;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::Utc;
use common::StatusMessage;
use std::collections::HashMap;
use tokio::sync::Mutex;
use super::{StorageRepository, UptimeMessage, UptimeStorageModel}; use super::{StorageRepository, UptimeMessage, UptimeStorageModel};
pub struct InMemoryRepository { pub struct InMemoryRepository {
agents: Mutex<HashMap<String, UptimeStorageModel>> agents: Mutex<HashMap<String, UptimeStorageModel>>,
messages: Mutex<HashMap<String, Vec<StatusMessage>>>,
} }
impl InMemoryRepository { impl InMemoryRepository {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
agents: Default::default(), agents: Default::default(),
messages: Default::default(),
} }
} }
} }
#[async_trait] #[async_trait]
impl StorageRepository for InMemoryRepository { impl StorageRepository for InMemoryRepository {
async fn record_message(&self, message: &StatusMessage) -> anyhow::Result<()> {
let mut messages = self.messages.lock().await;
let agent_messages = messages.entry(message.agent.clone()).or_default();
agent_messages.push(message.clone());
Ok(())
}
async fn record_uptime(&self, agent: &str) -> anyhow::Result<()> { async fn record_uptime(&self, agent: &str) -> anyhow::Result<()> {
let mut agents = self.agents.lock().await; let mut agents = self.agents.lock().await;
let now = Utc::now(); let now = Utc::now();
agents.entry(agent.to_string()) agents
.entry(agent.to_string())
.and_modify(|a| { .and_modify(|a| {
a.last_seen = now; a.last_seen = now;
a.message_count += 1; a.message_count += 1;
@@ -38,18 +51,11 @@ impl StorageRepository for InMemoryRepository {
Ok(()) Ok(())
} }
/* async fn get_history(&self, agent: &str) -> anyhow::Result<Vec<StatusMessage>> {
async fn get_uptime(&self, agent: &str) -> anyhow::Result<Option<UptimeMessage>> { let messages = self.messages.lock().await;
let agents = self.agents.lock().await;
match agents.get(agent) { Ok(messages.get(agent).cloned().unwrap_or_default())
Some(data) => {
Ok(Some(data.clone().into()))
} }
None => Ok(None),
}
}
*/
async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>> { async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>> {
let agents = self.agents.lock().await; let agents = self.agents.lock().await;
+29 -5
View File
@@ -1,10 +1,11 @@
use serde::{Deserialize, Serialize}; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use common::{MQTT_SEND_INTERVAL, StatusMessage};
use memory::InMemoryRepository;
use rusqlite::Row;
use serde::{Deserialize, Serialize};
use sqlite::SQLiteRepository; use sqlite::SQLiteRepository;
use std::sync::Arc; use std::sync::Arc;
use memory::InMemoryRepository;
use async_trait::async_trait;
use common::MQTT_SEND_INTERVAL;
pub mod memory; pub mod memory;
pub mod sqlite; pub mod sqlite;
@@ -39,9 +40,32 @@ impl Into<UptimeMessage> for UptimeStorageModel {
} }
} }
impl TryFrom<&Row<'_>> for UptimeStorageModel {
type Error = rusqlite::Error;
fn try_from(row: &Row) -> Result<Self, Self::Error> {
let first_seen: DateTime<Utc> = row.get::<_, String>(1)?.parse().map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(e))
})?;
let last_seen: DateTime<Utc> = row.get::<_, String>(2)?.parse().map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, Box::new(e))
})?;
Ok(UptimeStorageModel {
id: row.get(0)?,
first_seen,
last_seen,
message_count: row.get(1)?,
})
}
}
#[async_trait] #[async_trait]
pub trait StorageRepository: Send + Sync { pub trait StorageRepository: Send + Sync {
async fn record_message(&self, message: &StatusMessage) -> anyhow::Result<()>;
async fn record_uptime(&self, agent: &str) -> anyhow::Result<()>; async fn record_uptime(&self, agent: &str) -> anyhow::Result<()>;
async fn get_history(&self, agent: &str) -> anyhow::Result<Vec<StatusMessage>>;
async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>>; async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>>;
} }
@@ -51,7 +75,7 @@ pub enum StorageStrategy {
} }
pub struct StorageRepositoryImpl { pub struct StorageRepositoryImpl {
inner: Arc<dyn StorageRepository> inner: Arc<dyn StorageRepository>,
} }
impl StorageRepositoryImpl { impl StorageRepositoryImpl {
+77 -27
View File
@@ -1,13 +1,15 @@
use anyhow::Ok;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::Utc;
use common::StatusMessage;
use rusqlite::Connection; use rusqlite::Connection;
use tokio::sync::Mutex;
use std::path::Path; use std::path::Path;
use chrono::{DateTime, Utc}; use tokio::sync::Mutex;
use super::{StorageRepository, UptimeMessage, UptimeStorageModel}; use super::{StorageRepository, UptimeMessage, UptimeStorageModel};
pub struct SQLiteRepository { pub struct SQLiteRepository {
conn: Mutex<Connection> conn: Mutex<Connection>,
} }
impl SQLiteRepository { impl SQLiteRepository {
@@ -27,53 +29,101 @@ impl SQLiteRepository {
message_count INTEGER NOT NULL DEFAULT 0 message_count INTEGER NOT NULL DEFAULT 0
) STRICT; ) STRICT;
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY AUTO_INCREMENT,
agent_id TEXT NOT NULL,
message TEXT NOT NULL,
timestamp TEXT NOT NULL
FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
) STRICT;
CREATE INDEX IF NOT EXISTS idx_agents_id ON agents(id); CREATE INDEX IF NOT EXISTS idx_agents_id ON agents(id);
CREATE INDEX IF NOT EXISTS idx_agents_times ON agents(first_seen, last_seen); CREATE INDEX IF NOT EXISTS idx_agents_times ON agents(first_seen, last_seen);
"# CREATE INDEX IF NOT EXISTS idx_messages_agent_id ON messages(agent_id);
).unwrap(); CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp);
"#,
)
.unwrap();
Self { Self {
conn: Mutex::new(conn) conn: Mutex::new(conn),
} }
} }
} }
#[async_trait] #[async_trait]
impl StorageRepository for SQLiteRepository { impl StorageRepository for SQLiteRepository {
async fn record_uptime(&self, agent: &str) -> anyhow::Result<()> { async fn record_message(&self, message: &StatusMessage) -> anyhow::Result<()> {
let conn = self.conn.lock().await; let conn = self.conn.lock().await;
let now = Utc::now().to_rfc3339();
conn.execute(r#" let payload_str = message.to_string().unwrap();
INSERT INTO agents (id, first_seen, last_seen, message_count)
VALUES (?1, ?2, ?2, 1) conn.execute(
ON CONFLICT (id) DO UPDATE SET r#"
last_seen = excluded.last_seen, INSERT INTO messages (agent_id, message, timestamp)
message_count = message_count + 1; VALUES (?1, ?2, ?3)
"#, [agent, &now] "#,
[
&message.agent,
&payload_str,
&message.timestamp.to_rfc3339(),
],
)?; )?;
Ok(()) Ok(())
} }
async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>> { async fn record_uptime(&self, agent: &str) -> anyhow::Result<()> {
let conn = self.conn.lock().await; let conn = self.conn.lock().await;
let mut stmt = conn.prepare("SELECT id, first_seen, last_seen, message_count FROM agents")?; let now = Utc::now().to_rfc3339();
let result = stmt.query_map([], |row| { conn.execute(
let first_seen: DateTime<Utc> = row.get::<_, String>(1)?.parse().unwrap(); r#"
let last_seen: DateTime<Utc> = row.get::<_, String>(2)?.parse().unwrap(); INSERT INTO agents (id, first_seen, last_seen, message_count)
VALUES (?1, ?2, ?2, 1)
ON CONFLICT (id) DO UPDATE SET
last_seen = excluded.last_seen,
message_count = message_count + 1;
"#,
[agent, &now],
)?;
Ok(UptimeStorageModel { Ok(())
id: row.get(0)?, }
first_seen,
last_seen, async fn get_history(&self, agent: &str) -> anyhow::Result<Vec<StatusMessage>> {
message_count: row.get(3)?, let conn = self.conn.lock().await;
let mut stmt =
conn.prepare("SELECT agent_id, message, timestamp FROM messages WHERE agent_id = ?")?;
let rows = stmt.query_map([agent], |row| {
let row: String = row.get::<_, String>(1).unwrap();
StatusMessage::try_from(row).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
1,
rusqlite::types::Type::Text,
Box::new(e),
)
}) })
})?; })?;
let models: Result<Vec<UptimeStorageModel>, _> = result.collect(); let models: Vec<StatusMessage> = rows.collect::<Result<Vec<StatusMessage>, _>>()?;
Ok(models?.into_iter().map(Into::into).collect()) Ok(models.into_iter().map(Into::into).collect())
}
async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>> {
let conn = self.conn.lock().await;
let mut stmt =
conn.prepare("SELECT id, first_seen, last_seen, message_count FROM agents")?;
let rows = stmt.query_map([], |row| UptimeStorageModel::try_from(row))?;
let models: Vec<UptimeStorageModel> =
rows.collect::<Result<Vec<UptimeStorageModel>, _>>()?;
Ok(models.into_iter().map(Into::into).collect())
} }
} }