mirror of
https://github.com/csehviktor/status-monitor.git
synced 2025-08-08 18:06:14 +02:00
implement duration specifier for history
This commit is contained in:
@@ -1,6 +1,8 @@
|
|||||||
use common::{MQTT_TOPIC, StatusMessage};
|
use common::{MQTT_TOPIC, StatusMessage};
|
||||||
|
use futures_util::FutureExt;
|
||||||
use rumqttd::{Broker, Notification, local::LinkRx};
|
use rumqttd::{Broker, Notification, local::LinkRx};
|
||||||
use std::sync::Arc;
|
use std::{sync::Arc, time::Duration};
|
||||||
|
use tokio::time::interval;
|
||||||
|
|
||||||
use crate::{bridge::ClientManager, storage::StorageRepository};
|
use crate::{bridge::ClientManager, storage::StorageRepository};
|
||||||
|
|
||||||
@@ -27,6 +29,9 @@ impl MqttSubscriber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self) -> anyhow::Result<()> {
|
pub async fn run(&mut self) -> anyhow::Result<()> {
|
||||||
|
let mut interval = interval(Duration::from_secs(60));
|
||||||
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
while let Ok(notification) = self.link_rx.next().await {
|
while let Ok(notification) = self.link_rx.next().await {
|
||||||
if let Notification::Forward(forward) = notification.unwrap() {
|
if let Notification::Forward(forward) = notification.unwrap() {
|
||||||
let payload = StatusMessage::try_from(&forward.publish.payload[..])?;
|
let payload = StatusMessage::try_from(&forward.publish.payload[..])?;
|
||||||
@@ -37,8 +42,10 @@ impl MqttSubscriber {
|
|||||||
eprintln!("failed to record uptime for {}: {}", &payload.agent, e);
|
eprintln!("failed to record uptime for {}: {}", &payload.agent, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = self.storage.record_message(&payload).await {
|
if interval.tick().now_or_never().is_some() {
|
||||||
eprintln!("failed to record message for {}: {}", &payload.agent, e);
|
if let Err(e) = self.storage.record_message(&payload).await {
|
||||||
|
eprintln!("failed to record message for {}: {}", &payload.agent, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use chrono::{Duration, Utc};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use warp::{Filter, Rejection, Reply, reply::json};
|
use warp::{Filter, Rejection, Reply, reply::json};
|
||||||
|
|
||||||
@@ -21,7 +22,7 @@ impl HttpRoutes {
|
|||||||
.and(warp::any().map(move || agents_storage.clone()))
|
.and(warp::any().map(move || agents_storage.clone()))
|
||||||
.and_then(Self::get_agents);
|
.and_then(Self::get_agents);
|
||||||
|
|
||||||
let history_route = warp::path!("history" / String)
|
let history_route = warp::path!("history" / String / String)
|
||||||
.and(warp::get())
|
.and(warp::get())
|
||||||
.and(warp::any().map(move || history_storage.clone()))
|
.and(warp::any().map(move || history_storage.clone()))
|
||||||
.and_then(Self::get_history);
|
.and_then(Self::get_history);
|
||||||
@@ -37,9 +38,20 @@ impl HttpRoutes {
|
|||||||
|
|
||||||
async fn get_history(
|
async fn get_history(
|
||||||
agent: String,
|
agent: String,
|
||||||
|
dur: String,
|
||||||
storage: Arc<dyn StorageRepository>,
|
storage: Arc<dyn StorageRepository>,
|
||||||
) -> Result<impl Reply, Rejection> {
|
) -> Result<impl Reply, Rejection> {
|
||||||
let history = storage.get_history(&agent).await.unwrap();
|
let now = Utc::now();
|
||||||
|
|
||||||
|
let duration = match dur.as_str() {
|
||||||
|
"hour" => Some(now - Duration::hours(1)),
|
||||||
|
"day" => Some(now - Duration::days(1)),
|
||||||
|
"week" => Some(now - Duration::weeks(1)),
|
||||||
|
"month" => Some(now - Duration::weeks(4)),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let history = storage.get_history(&agent, duration).await.unwrap();
|
||||||
|
|
||||||
Ok(json(&history))
|
Ok(json(&history))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::Utc;
|
use chrono::{DateTime, Utc};
|
||||||
use common::StatusMessage;
|
use common::StatusMessage;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
@@ -51,10 +51,25 @@ impl StorageRepository for InMemoryRepository {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_history(&self, agent: &str) -> anyhow::Result<Vec<StatusMessage>> {
|
async fn get_history(
|
||||||
|
&self,
|
||||||
|
agent: &str,
|
||||||
|
duration: Option<DateTime<Utc>>,
|
||||||
|
) -> anyhow::Result<Vec<StatusMessage>> {
|
||||||
let messages = self.messages.lock().await;
|
let messages = self.messages.lock().await;
|
||||||
|
|
||||||
Ok(messages.get(agent).cloned().unwrap_or_default())
|
let agent_messages = messages.get(agent).cloned().unwrap_or_default();
|
||||||
|
|
||||||
|
let filtered_messages = if let Some(duration) = duration {
|
||||||
|
agent_messages
|
||||||
|
.into_iter()
|
||||||
|
.filter(|msg| msg.timestamp >= duration)
|
||||||
|
.collect()
|
||||||
|
} else {
|
||||||
|
agent_messages
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(filtered_messages)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>> {
|
async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>> {
|
||||||
|
|||||||
@@ -65,7 +65,11 @@ impl TryFrom<&Row<'_>> for UptimeStorageModel {
|
|||||||
pub trait StorageRepository: Send + Sync {
|
pub trait StorageRepository: Send + Sync {
|
||||||
async fn record_message(&self, message: &StatusMessage) -> anyhow::Result<()>;
|
async fn record_message(&self, message: &StatusMessage) -> anyhow::Result<()>;
|
||||||
async fn record_uptime(&self, agent: &str) -> anyhow::Result<()>;
|
async fn record_uptime(&self, agent: &str) -> anyhow::Result<()>;
|
||||||
async fn get_history(&self, agent: &str) -> anyhow::Result<Vec<StatusMessage>>;
|
async fn get_history(
|
||||||
|
&self,
|
||||||
|
agent: &str,
|
||||||
|
duration: Option<DateTime<Utc>>,
|
||||||
|
) -> anyhow::Result<Vec<StatusMessage>>;
|
||||||
async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>>;
|
async fn get_agents(&self) -> anyhow::Result<Vec<UptimeMessage>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use anyhow::Ok;
|
use anyhow::Ok;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::Utc;
|
use chrono::{DateTime, Utc};
|
||||||
use common::StatusMessage;
|
use common::StatusMessage;
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
@@ -92,12 +92,28 @@ impl StorageRepository for SQLiteRepository {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_history(&self, agent: &str) -> anyhow::Result<Vec<StatusMessage>> {
|
async fn get_history(
|
||||||
|
&self,
|
||||||
|
agent: &str,
|
||||||
|
duration: Option<DateTime<Utc>>,
|
||||||
|
) -> anyhow::Result<Vec<StatusMessage>> {
|
||||||
let conn = self.conn.lock().await;
|
let conn = self.conn.lock().await;
|
||||||
let mut stmt =
|
|
||||||
conn.prepare("SELECT agent_id, message, timestamp FROM messages WHERE agent_id = ?")?;
|
|
||||||
|
|
||||||
let rows = stmt.query_map([agent], |row| {
|
let (sql, params) = if let Some(duration) = duration {
|
||||||
|
(
|
||||||
|
"SELECT agent_id, message, timestamp FROM messages WHERE agent_id = ? AND timestamp <= ?",
|
||||||
|
vec![agent.to_string(), duration.to_rfc3339()],
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
(
|
||||||
|
"SELECT agent_id, message, timestamp FROM messages WHERE agent_id = ?",
|
||||||
|
vec![agent.to_string()],
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut stmt = conn.prepare(sql)?;
|
||||||
|
|
||||||
|
let rows = stmt.query_map(rusqlite::params_from_iter(params), |row| {
|
||||||
let row: String = row.get::<_, String>(1).unwrap();
|
let row: String = row.get::<_, String>(1).unwrap();
|
||||||
|
|
||||||
StatusMessage::try_from(row).map_err(|e| {
|
StatusMessage::try_from(row).map_err(|e| {
|
||||||
|
|||||||
Reference in New Issue
Block a user