From 6eb696019ad3cd7fb06f2ffa3ffea7ac615fc074 Mon Sep 17 00:00:00 2001 From: csehviktor Date: Tue, 15 Jul 2025 01:34:27 +0200 Subject: [PATCH] implement duration specifier for history --- server/src/broker/subscriber.rs | 13 ++++++++++--- server/src/server/http.rs | 16 ++++++++++++++-- server/src/storage/memory.rs | 21 ++++++++++++++++++--- server/src/storage/mod.rs | 6 +++++- server/src/storage/sqlite.rs | 26 +++++++++++++++++++++----- 5 files changed, 68 insertions(+), 14 deletions(-) diff --git a/server/src/broker/subscriber.rs b/server/src/broker/subscriber.rs index fb327a2..54eb5a3 100644 --- a/server/src/broker/subscriber.rs +++ b/server/src/broker/subscriber.rs @@ -1,6 +1,8 @@ use common::{MQTT_TOPIC, StatusMessage}; +use futures_util::FutureExt; use rumqttd::{Broker, Notification, local::LinkRx}; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use tokio::time::interval; use crate::{bridge::ClientManager, storage::StorageRepository}; @@ -27,6 +29,9 @@ impl MqttSubscriber { } pub async fn run(&mut self) -> anyhow::Result<()> { + let mut interval = interval(Duration::from_secs(60)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + while let Ok(notification) = self.link_rx.next().await { if let Notification::Forward(forward) = notification.unwrap() { let payload = StatusMessage::try_from(&forward.publish.payload[..])?; @@ -37,8 +42,10 @@ impl MqttSubscriber { eprintln!("failed to record uptime for {}: {}", &payload.agent, e); } - if let Err(e) = self.storage.record_message(&payload).await { - eprintln!("failed to record message for {}: {}", &payload.agent, e); + if interval.tick().now_or_never().is_some() { + if let Err(e) = self.storage.record_message(&payload).await { + eprintln!("failed to record message for {}: {}", &payload.agent, e); + } } } } diff --git a/server/src/server/http.rs b/server/src/server/http.rs index a7ea5f1..f2e20eb 100644 --- a/server/src/server/http.rs +++ b/server/src/server/http.rs @@ -1,3 +1,4 @@ +use chrono::{Duration, Utc}; use std::sync::Arc; use warp::{Filter, Rejection, Reply, reply::json}; @@ -21,7 +22,7 @@ impl HttpRoutes { .and(warp::any().map(move || agents_storage.clone())) .and_then(Self::get_agents); - let history_route = warp::path!("history" / String) + let history_route = warp::path!("history" / String / String) .and(warp::get()) .and(warp::any().map(move || history_storage.clone())) .and_then(Self::get_history); @@ -37,9 +38,20 @@ impl HttpRoutes { async fn get_history( agent: String, + dur: String, storage: Arc, ) -> Result { - let history = storage.get_history(&agent).await.unwrap(); + let now = Utc::now(); + + let duration = match dur.as_str() { + "hour" => Some(now - Duration::hours(1)), + "day" => Some(now - Duration::days(1)), + "week" => Some(now - Duration::weeks(1)), + "month" => Some(now - Duration::weeks(4)), + _ => None, + }; + + let history = storage.get_history(&agent, duration).await.unwrap(); Ok(json(&history)) } diff --git a/server/src/storage/memory.rs b/server/src/storage/memory.rs index 5e4e425..a3f6359 100644 --- a/server/src/storage/memory.rs +++ b/server/src/storage/memory.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use chrono::Utc; +use chrono::{DateTime, Utc}; use common::StatusMessage; use std::collections::HashMap; use tokio::sync::Mutex; @@ -51,10 +51,25 @@ impl StorageRepository for InMemoryRepository { Ok(()) } - async fn get_history(&self, agent: &str) -> anyhow::Result> { + async fn get_history( + &self, + agent: &str, + duration: Option>, + ) -> anyhow::Result> { let messages = self.messages.lock().await; - Ok(messages.get(agent).cloned().unwrap_or_default()) + let agent_messages = messages.get(agent).cloned().unwrap_or_default(); + + let filtered_messages = if let Some(duration) = duration { + agent_messages + .into_iter() + .filter(|msg| msg.timestamp >= duration) + .collect() + } else { + agent_messages + }; + + Ok(filtered_messages) } async fn get_agents(&self) -> anyhow::Result> { diff --git a/server/src/storage/mod.rs b/server/src/storage/mod.rs index ac5fc42..5bd1dd6 100644 --- a/server/src/storage/mod.rs +++ b/server/src/storage/mod.rs @@ -65,7 +65,11 @@ impl TryFrom<&Row<'_>> for UptimeStorageModel { pub trait StorageRepository: Send + Sync { async fn record_message(&self, message: &StatusMessage) -> anyhow::Result<()>; async fn record_uptime(&self, agent: &str) -> anyhow::Result<()>; - async fn get_history(&self, agent: &str) -> anyhow::Result>; + async fn get_history( + &self, + agent: &str, + duration: Option>, + ) -> anyhow::Result>; async fn get_agents(&self) -> anyhow::Result>; } diff --git a/server/src/storage/sqlite.rs b/server/src/storage/sqlite.rs index d3ae64f..5b06dad 100644 --- a/server/src/storage/sqlite.rs +++ b/server/src/storage/sqlite.rs @@ -1,6 +1,6 @@ use anyhow::Ok; use async_trait::async_trait; -use chrono::Utc; +use chrono::{DateTime, Utc}; use common::StatusMessage; use rusqlite::Connection; use std::path::Path; @@ -92,12 +92,28 @@ impl StorageRepository for SQLiteRepository { Ok(()) } - async fn get_history(&self, agent: &str) -> anyhow::Result> { + async fn get_history( + &self, + agent: &str, + duration: Option>, + ) -> anyhow::Result> { let conn = self.conn.lock().await; - let mut stmt = - conn.prepare("SELECT agent_id, message, timestamp FROM messages WHERE agent_id = ?")?; - let rows = stmt.query_map([agent], |row| { + let (sql, params) = if let Some(duration) = duration { + ( + "SELECT agent_id, message, timestamp FROM messages WHERE agent_id = ? AND timestamp <= ?", + vec![agent.to_string(), duration.to_rfc3339()], + ) + } else { + ( + "SELECT agent_id, message, timestamp FROM messages WHERE agent_id = ?", + vec![agent.to_string()], + ) + }; + + let mut stmt = conn.prepare(sql)?; + + let rows = stmt.query_map(rusqlite::params_from_iter(params), |row| { let row: String = row.get::<_, String>(1).unwrap(); StatusMessage::try_from(row).map_err(|e| {