diff --git a/agent/src/collector.rs b/agent/src/collector.rs index d418e64..1b7240b 100644 --- a/agent/src/collector.rs +++ b/agent/src/collector.rs @@ -1,4 +1,4 @@ -use common::metrics::{Disk, Memory, Metrics, Network, SystemInfo, CPU}; +use common::metrics::{CPU, Disk, Memory, Metrics, Network, SystemInfo}; use sysinfo::{Disks, Networks, System}; use crate::cpu::CPUStatReader; @@ -54,12 +54,16 @@ impl Collector { CPU { usage: self.sys.global_cpu_usage(), threads: self.sys.cpus().len(), - breakdown: self.cpu_reader.read_global_cpu_stats().unwrap_or_default() + breakdown: self.cpu_reader.read_global_cpu_stats().unwrap_or_default(), } } fn collect_disk(&self) -> Disk { - let disk = self.disks.iter().max_by_key(|disk| disk.total_space()).unwrap(); + let disk = self + .disks + .iter() + .max_by_key(|disk| disk.total_space()) + .unwrap(); Disk { free: disk.available_space(), @@ -77,17 +81,14 @@ impl Collector { } fn collect_network(&self) -> Network { - let (down, up): (u64, u64) = self.networks + let (down, up): (u64, u64) = self + .networks .values() .map(|data| (data.received(), data.transmitted())) .fold((0, 0), |(acc_down, acc_up), (down, up)| { (acc_down + down, acc_up + up) }); - Network { - down, - up, - } + Network { down, up } } - } diff --git a/agent/src/cpu.rs b/agent/src/cpu.rs index 71293f9..3fdf810 100644 --- a/agent/src/cpu.rs +++ b/agent/src/cpu.rs @@ -1,6 +1,6 @@ -use std::collections::HashMap; use anyhow::anyhow; use common::metrics::CPUBreakdown; +use std::collections::HashMap; #[derive(Default, Debug, Clone, Copy)] struct CPUValues { @@ -18,8 +18,16 @@ struct CPUValues { impl CPUValues { pub fn total(&self) -> u64 { - self.user + self.nice + self.system + self.idle + self.iowait + - self.irq + self.softirq + self.steal + self.guest + self.guest_nice + self.user + + self.nice + + self.system + + self.idle + + self.iowait + + self.irq + + self.softirq + + self.steal + + self.guest + + self.guest_nice } } @@ -29,7 +37,9 @@ pub struct CPUStatReader { 
impl CPUStatReader { pub fn new() -> Self { - Self { previous_stats: HashMap::new() } + Self { + previous_stats: HashMap::new(), + } } pub fn read_global_cpu_stats(&mut self) -> anyhow::Result { @@ -46,7 +56,8 @@ impl CPUStatReader { fn parse_cpu_line(&mut self, line: &str) -> anyhow::Result { let mut parts = line.split_whitespace(); - let cpu_name = parts.next() + let cpu_name = parts + .next() .ok_or_else(|| anyhow!("missing cpu name"))? .to_string(); @@ -80,7 +91,11 @@ impl CPUStatReader { Ok(percentages) } - fn calculate_percentages(&self, current: &CPUValues, previous: Option) -> CPUBreakdown { + fn calculate_percentages( + &self, + current: &CPUValues, + previous: Option, + ) -> CPUBreakdown { let Some(prev) = previous else { return CPUBreakdown::default(); }; @@ -90,9 +105,8 @@ impl CPUStatReader { return CPUBreakdown::default(); } - let calculate_pct = |current: u64, prev: u64| { - (current.saturating_sub(prev) as f32 / total_delta) * 100.0 - }; + let calculate_pct = + |current: u64, prev: u64| (current.saturating_sub(prev) as f32 / total_delta) * 100.0; CPUBreakdown { system: calculate_pct(current.system, prev.system), diff --git a/agent/src/main.rs b/agent/src/main.rs index 3022eb8..abec932 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,18 +1,18 @@ -use common::MQTT_SEND_INTERVAL; -use collector::Collector; -use std::time::Duration; -use mqtt::MqttHandle; use clap::Parser; +use collector::Collector; +use common::MQTT_SEND_INTERVAL; +use mqtt::MqttHandle; use std::thread; +use std::time::Duration; pub mod collector; -pub mod mqtt; pub mod cpu; +pub mod mqtt; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] struct Args { - #[arg(long, short, default_value = "unknown-agent")] + #[arg(long, short, default_value = "agent")] name: String, #[arg(long, default_value = "0.0.0.0")] diff --git a/agent/src/mqtt.rs b/agent/src/mqtt.rs index eea340f..fb81586 100644 --- a/agent/src/mqtt.rs +++ b/agent/src/mqtt.rs @@ -1,7 +1,7 @@ -use 
common::{metrics::Metrics, StatusMessage, MQTT_TOPIC}; +use common::{MQTT_TOPIC, StatusMessage, metrics::Metrics}; use rumqttc::{AsyncClient, MqttOptions, QoS}; -use tokio::task::JoinHandle; use std::time::Duration; +use tokio::task::JoinHandle; pub struct MqttHandle { pub agent: String, @@ -28,7 +28,7 @@ impl MqttHandle { Self { agent, client, - eventloop_handle + eventloop_handle, } } diff --git a/common/src/lib.rs b/common/src/lib.rs index abbad5e..c39b729 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,7 +1,7 @@ +use crate::metrics::Metrics; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Error; -use crate::metrics::Metrics; pub mod metrics; @@ -37,6 +37,14 @@ impl<'a> TryFrom<&'a [u8]> for StatusMessage { } } +impl TryFrom for StatusMessage { + type Error = serde_json::Error; + + fn try_from(value: String) -> Result { + serde_json::from_str(&value) + } +} + impl From for String { fn from(msg: StatusMessage) -> String { serde_json::to_string(&msg).unwrap_or_else(|_| String::new()) diff --git a/server/src/bridge.rs b/server/src/bridge.rs index de41c61..ab5e399 100644 --- a/server/src/bridge.rs +++ b/server/src/bridge.rs @@ -1,5 +1,5 @@ use common::StatusMessage; -use tokio::sync::{mpsc::Sender, Mutex}; +use tokio::sync::{Mutex, mpsc::Sender}; pub struct ClientManager { clients: Mutex>>, @@ -20,8 +20,6 @@ impl ClientManager { pub async fn broadcast(&self, message: &StatusMessage) { let mut clients = self.clients.lock().await; - clients.retain(|client| { - client.try_send(message.clone()).is_ok() - }); + clients.retain(|client| client.try_send(message.clone()).is_ok()); } } diff --git a/server/src/broker/manager.rs b/server/src/broker/manager.rs index 0ba9d64..1677344 100644 --- a/server/src/broker/manager.rs +++ b/server/src/broker/manager.rs @@ -1,8 +1,8 @@ use rumqttd::{Broker, Config}; use std::sync::Arc; -use crate::{bridge::ClientManager, storage::StorageRepositoryImpl}; use super::subscriber::MqttSubscriber; 
+use crate::{bridge::ClientManager, storage::StorageRepositoryImpl}; pub struct MqttBroker { broker: &'static mut Broker, diff --git a/server/src/broker/subscriber.rs b/server/src/broker/subscriber.rs index 3831cd3..fb327a2 100644 --- a/server/src/broker/subscriber.rs +++ b/server/src/broker/subscriber.rs @@ -1,5 +1,5 @@ -use rumqttd::{local::LinkRx, Broker, Notification}; -use common::{StatusMessage, MQTT_TOPIC}; +use common::{MQTT_TOPIC, StatusMessage}; +use rumqttd::{Broker, Notification, local::LinkRx}; use std::sync::Arc; use crate::{bridge::ClientManager, storage::StorageRepository}; @@ -11,7 +11,11 @@ pub struct MqttSubscriber { } impl MqttSubscriber { - pub fn new(broker: &Broker, clients: Arc, storage: Arc) -> Self { + pub fn new( + broker: &Broker, + clients: Arc, + storage: Arc, + ) -> Self { let (mut link_tx, link_rx) = broker.link("internal-subscriber").unwrap(); link_tx.subscribe(MQTT_TOPIC).unwrap(); @@ -32,6 +36,10 @@ impl MqttSubscriber { if let Err(e) = self.storage.record_uptime(&payload.agent).await { eprintln!("failed to record uptime for {}: {}", &payload.agent, e); } + + if let Err(e) = self.storage.record_message(&payload).await { + eprintln!("failed to record message for {}: {}", &payload.agent, e); + } } } diff --git a/server/src/config.rs b/server/src/config.rs index 41709e5..b86ae81 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -1,5 +1,5 @@ -use serde::Deserialize; use rumqttd::Config; +use serde::Deserialize; const CONFIG_PATH: &str = if cfg!(debug_assertions) { "server/config.toml" diff --git a/server/src/main.rs b/server/src/main.rs index 413a0ed..a19d362 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,21 +1,24 @@ -use storage::{StorageRepositoryImpl, StorageStrategy}; use broker::manager::MqttBroker; use config::load_config; use server::Server; use std::sync::Arc; +use storage::{StorageRepositoryImpl, StorageStrategy}; -pub mod broker; pub mod bridge; +pub mod broker; pub mod config; -pub mod storage; 
pub mod server; +pub mod storage; #[tokio::main] async fn main() -> anyhow::Result<()> { let cfg = load_config()?; let storage = Arc::new(if cfg.storage.sqlite { - StorageRepositoryImpl::new(StorageStrategy::SQLite(format!("{}agents.db", cfg.storage.db_path))) + StorageRepositoryImpl::new(StorageStrategy::SQLite(format!( + "{}agents.db", + cfg.storage.db_path + ))) } else { StorageRepositoryImpl::new(StorageStrategy::InMemory) }); diff --git a/server/src/server/http.rs b/server/src/server/http.rs index a1f83f6..a7ea5f1 100644 --- a/server/src/server/http.rs +++ b/server/src/server/http.rs @@ -1,5 +1,5 @@ -use warp::{reply::json, Filter, Reply, Rejection}; use std::sync::Arc; +use warp::{Filter, Rejection, Reply, reply::json}; use crate::storage::StorageRepository; @@ -13,10 +13,20 @@ impl HttpRoutes { } pub fn routes(self: Arc) -> impl Filter + Clone { - warp::path!("agents") + let agents_storage = self.storage.clone(); + let history_storage = self.storage.clone(); + + let agents_route = warp::path!("agents") .and(warp::get()) - .and(warp::any().map(move || self.storage.clone())) - .and_then(Self::get_agents) + .and(warp::any().map(move || agents_storage.clone())) + .and_then(Self::get_agents); + + let history_route = warp::path!("history" / String) + .and(warp::get()) + .and(warp::any().map(move || history_storage.clone())) + .and_then(Self::get_history); + + agents_route.or(history_route) } async fn get_agents(storage: Arc) -> Result { @@ -24,4 +34,13 @@ impl HttpRoutes { Ok(json(&agents)) } + + async fn get_history( + agent: String, + storage: Arc, + ) -> Result { + let history = storage.get_history(&agent).await.unwrap(); + + Ok(json(&history)) + } } diff --git a/server/src/server/mod.rs b/server/src/server/mod.rs index c004364..84f0e67 100644 --- a/server/src/server/mod.rs +++ b/server/src/server/mod.rs @@ -1,7 +1,7 @@ use http::HttpRoutes; +use std::sync::Arc; use warp::Filter; use websocket::WebsocketRoutes; -use std::sync::Arc; use 
crate::{bridge::ClientManager, storage::StorageRepositoryImpl}; @@ -15,19 +15,14 @@ pub struct Server { impl Server { pub fn new(clients: Arc, storage: Arc) -> Self { - Self { - clients, - storage, - } + Self { clients, storage } } pub async fn serve(&self) { let http_routes = Arc::new(HttpRoutes::new(self.storage.inner())); let ws_routes = Arc::new(WebsocketRoutes::new(self.clients.clone())); - let cors = warp::cors() - .allow_any_origin() - .allow_methods(vec!["GET"]); + let cors = warp::cors().allow_any_origin().allow_methods(vec!["GET"]); let routes = http_routes.routes().with(cors).or(ws_routes.routes()); diff --git a/server/src/server/websocket.rs b/server/src/server/websocket.rs index 3f0adfa..a6cbc25 100644 --- a/server/src/server/websocket.rs +++ b/server/src/server/websocket.rs @@ -1,8 +1,11 @@ use common::StatusMessage; -use warp::{filters::ws::{Message, WebSocket}, Filter, Reply, Rejection}; -use futures_util::{StreamExt, SinkExt}; -use tokio::sync::mpsc::channel; +use futures_util::{SinkExt, StreamExt}; use std::sync::Arc; +use tokio::sync::mpsc::channel; +use warp::{ + Filter, Rejection, Reply, + filters::ws::{Message, WebSocket}, +}; use crate::bridge::ClientManager; @@ -16,17 +19,24 @@ impl WebsocketRoutes { } pub fn routes(self: Arc) -> impl Filter + Clone { - warp::path("ws") - .and(warp::path::param::()) + warp::path!("ws" / String) .and(warp::get()) .and(warp::ws()) .and(warp::any().map(move || self.clients.clone())) - .map(|agent: String, websocket: warp::ws::Ws, clients: Arc| { - websocket.on_upgrade(move |websocket| Self::handle_ws_connection(agent, websocket, clients)) - }) + .map( + |agent: String, websocket: warp::ws::Ws, clients: Arc| { + websocket.on_upgrade(move |websocket| { + Self::handle_ws_connection(agent, websocket, clients) + }) + }, + ) } - async fn handle_ws_connection(agent: String, websocket: WebSocket, clients: Arc) { + async fn handle_ws_connection( + agent: String, + websocket: WebSocket, + clients: Arc, + ) { let (mut 
ws_tx, _) = websocket.split(); let (tx, mut rx) = channel::(100); @@ -44,5 +54,4 @@ impl WebsocketRoutes { } }); } - } diff --git a/server/src/storage/memory.rs b/server/src/storage/memory.rs index 30bd035..5e4e425 100644 --- a/server/src/storage/memory.rs +++ b/server/src/storage/memory.rs @@ -1,29 +1,42 @@ -use std::collections::HashMap; -use chrono::Utc; -use tokio::sync::Mutex; use async_trait::async_trait; +use chrono::Utc; +use common::StatusMessage; +use std::collections::HashMap; +use tokio::sync::Mutex; use super::{StorageRepository, UptimeMessage, UptimeStorageModel}; pub struct InMemoryRepository { - agents: Mutex> + agents: Mutex>, + messages: Mutex>>, } impl InMemoryRepository { pub fn new() -> Self { Self { agents: Default::default(), + messages: Default::default(), } } } #[async_trait] impl StorageRepository for InMemoryRepository { + async fn record_message(&self, message: &StatusMessage) -> anyhow::Result<()> { + let mut messages = self.messages.lock().await; + + let agent_messages = messages.entry(message.agent.clone()).or_default(); + agent_messages.push(message.clone()); + + Ok(()) + } + async fn record_uptime(&self, agent: &str) -> anyhow::Result<()> { let mut agents = self.agents.lock().await; let now = Utc::now(); - agents.entry(agent.to_string()) + agents + .entry(agent.to_string()) .and_modify(|a| { a.last_seen = now; a.message_count += 1; @@ -38,18 +51,11 @@ impl StorageRepository for InMemoryRepository { Ok(()) } - /* - async fn get_uptime(&self, agent: &str) -> anyhow::Result> { - let agents = self.agents.lock().await; + async fn get_history(&self, agent: &str) -> anyhow::Result> { + let messages = self.messages.lock().await; - match agents.get(agent) { - Some(data) => { - Ok(Some(data.clone().into())) - } - None => Ok(None), - } + Ok(messages.get(agent).cloned().unwrap_or_default()) } - */ async fn get_agents(&self) -> anyhow::Result> { let agents = self.agents.lock().await; diff --git a/server/src/storage/mod.rs 
b/server/src/storage/mod.rs index 07a8a72..ac5fc42 100644 --- a/server/src/storage/mod.rs +++ b/server/src/storage/mod.rs @@ -1,10 +1,11 @@ -use serde::{Deserialize, Serialize}; +use async_trait::async_trait; use chrono::{DateTime, Utc}; +use common::{MQTT_SEND_INTERVAL, StatusMessage}; +use memory::InMemoryRepository; +use rusqlite::Row; +use serde::{Deserialize, Serialize}; use sqlite::SQLiteRepository; use std::sync::Arc; -use memory::InMemoryRepository; -use async_trait::async_trait; -use common::MQTT_SEND_INTERVAL; pub mod memory; pub mod sqlite; @@ -39,9 +40,32 @@ impl Into<UptimeMessage> for UptimeStorageModel { } } +impl TryFrom<&Row<'_>> for UptimeStorageModel { + type Error = rusqlite::Error; + + fn try_from(row: &Row) -> Result<Self, Self::Error> { + let first_seen: DateTime<Utc> = row.get::<_, String>(1)?.parse().map_err(|e| { + rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(e)) + })?; + + let last_seen: DateTime<Utc> = row.get::<_, String>(2)?.parse().map_err(|e| { + rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, Box::new(e)) + })?; + + Ok(UptimeStorageModel { + id: row.get(0)?, + first_seen, + last_seen, + message_count: row.get(3)?, + }) + } +} + #[async_trait] pub trait StorageRepository: Send + Sync { + async fn record_message(&self, message: &StatusMessage) -> anyhow::Result<()>; async fn record_uptime(&self, agent: &str) -> anyhow::Result<()>; + async fn get_history(&self, agent: &str) -> anyhow::Result<Vec<StatusMessage>>; async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>>; } @@ -51,7 +75,7 @@ pub enum StorageStrategy { } pub struct StorageRepositoryImpl { - inner: Arc<dyn StorageRepository> + inner: Arc<dyn StorageRepository>, } impl StorageRepositoryImpl { diff --git a/server/src/storage/sqlite.rs b/server/src/storage/sqlite.rs index 2785660..d3ae64f 100644 --- a/server/src/storage/sqlite.rs +++ b/server/src/storage/sqlite.rs @@ -1,13 +1,15 @@ +use anyhow::Ok; use async_trait::async_trait; +use chrono::Utc; +use common::StatusMessage; use rusqlite::Connection; -use tokio::sync::Mutex; use 
std::path::Path; -chrono::{DateTime, Utc}; +use tokio::sync::Mutex; use super::{StorageRepository, UptimeMessage, UptimeStorageModel}; pub struct SQLiteRepository { - conn: Mutex<Connection> + conn: Mutex<Connection>, } impl SQLiteRepository { @@ -27,53 +29,101 @@ impl SQLiteRepository { message_count INTEGER NOT NULL DEFAULT 0 ) STRICT; + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id TEXT NOT NULL, + message TEXT NOT NULL, + timestamp TEXT NOT NULL, + + FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE + ) STRICT; + CREATE INDEX IF NOT EXISTS idx_agents_id ON agents(id); CREATE INDEX IF NOT EXISTS idx_agents_times ON agents(first_seen, last_seen); - "# - ).unwrap(); + CREATE INDEX IF NOT EXISTS idx_messages_agent_id ON messages(agent_id); + CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp); + "#, + ) + .unwrap(); Self { - conn: Mutex::new(conn) + conn: Mutex::new(conn), } } } #[async_trait] impl StorageRepository for SQLiteRepository { - async fn record_uptime(&self, agent: &str) -> anyhow::Result<()> { + async fn record_message(&self, message: &StatusMessage) -> anyhow::Result<()> { let conn = self.conn.lock().await; - let now = Utc::now().to_rfc3339(); - conn.execute(r#" - INSERT INTO agents (id, first_seen, last_seen, message_count) - VALUES (?1, ?2, ?2, 1) - ON CONFLICT (id) DO UPDATE SET - last_seen = excluded.last_seen, - message_count = message_count + 1; - "#, [agent, &now] + let payload_str = message.to_string().unwrap(); + + conn.execute( + r#" + INSERT INTO messages (agent_id, message, timestamp) + VALUES (?1, ?2, ?3) + "#, + [ + &message.agent, + &payload_str, + &message.timestamp.to_rfc3339(), + ], )?; Ok(()) } - async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>> { + async fn record_uptime(&self, agent: &str) -> anyhow::Result<()> { let conn = self.conn.lock().await; - let mut stmt = conn.prepare("SELECT id, first_seen, last_seen, message_count FROM agents")?; + let now = Utc::now().to_rfc3339(); - let 
result = stmt.query_map([], |row| { - let first_seen: DateTime<Utc> = row.get::<_, String>(1)?.parse().unwrap(); - let last_seen: DateTime<Utc> = row.get::<_, String>(2)?.parse().unwrap(); + conn.execute( + r#" + INSERT INTO agents (id, first_seen, last_seen, message_count) + VALUES (?1, ?2, ?2, 1) + ON CONFLICT (id) DO UPDATE SET + last_seen = excluded.last_seen, + message_count = message_count + 1; + "#, + [agent, &now], + )?; - Ok(UptimeStorageModel { - id: row.get(0)?, - first_seen, - last_seen, - message_count: row.get(3), + Ok(()) + } + + async fn get_history(&self, agent: &str) -> anyhow::Result<Vec<StatusMessage>> { + let conn = self.conn.lock().await; + let mut stmt = + conn.prepare("SELECT agent_id, message, timestamp FROM messages WHERE agent_id = ?")?; + + let rows = stmt.query_map([agent], |row| { + let row: String = row.get::<_, String>(1)?; + + StatusMessage::try_from(row).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure( + 1, + rusqlite::types::Type::Text, + Box::new(e), + ) + }) + })?; - let models: Result<Vec<UptimeStorageModel>, _> = result.collect(); + let models: Vec<StatusMessage> = rows.collect::<Result<Vec<_>, _>>()?; - Ok(models?.into_iter().map(Into::into).collect()) + Ok(models.into_iter().map(Into::into).collect()) + } + + async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>> { + let conn = self.conn.lock().await; + let mut stmt = + conn.prepare("SELECT id, first_seen, last_seen, message_count FROM agents")?; + + let rows = stmt.query_map([], |row| UptimeStorageModel::try_from(row))?; + + let models: Vec<UptimeStorageModel> = + rows.collect::<Result<Vec<_>, _>>()?; + + Ok(models.into_iter().map(Into::into).collect()) + } }