mirror of
https://github.com/csehviktor/status-monitor.git
synced 2025-08-08 18:06:14 +02:00
improve mqtt
This commit is contained in:
47
agent/src/mqtt.rs
Normal file
47
agent/src/mqtt.rs
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
use common::{metrics::Metrics, mqtt::{StatusMessage, STATUS_TOPIC}};
|
||||||
|
use rumqttc::{AsyncClient, MqttOptions, QoS};
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
|
use crate::config::MqttConfig;
|
||||||
|
|
||||||
|
pub struct MqttHandle {
|
||||||
|
pub agent: &'static str,
|
||||||
|
pub client: AsyncClient,
|
||||||
|
pub eventloop_handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MqttHandle {
|
||||||
|
pub async fn create(cfg: &'static MqttConfig) -> Self {
|
||||||
|
let mut mqttoptions = MqttOptions::new(&cfg.agent, &cfg.host, cfg.port);
|
||||||
|
mqttoptions.set_keep_alive(Duration::from_secs(5));
|
||||||
|
|
||||||
|
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
|
||||||
|
|
||||||
|
let eventloop_handle = tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
if let Err(e) = eventloop.poll().await {
|
||||||
|
eprintln!("event loop error: {:?}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Self {
|
||||||
|
agent: &cfg.agent,
|
||||||
|
client,
|
||||||
|
eventloop_handle
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send_metrics(&self, metrics: Metrics) -> anyhow::Result<()> {
|
||||||
|
let message = StatusMessage::new(&self.agent, metrics).to_string()?;
|
||||||
|
|
||||||
|
self.client
|
||||||
|
.publish(STATUS_TOPIC, QoS::AtLeastOnce, false, message.as_bytes())
|
||||||
|
.await
|
||||||
|
.expect("Failed to publish metrics");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
35
common/src/mqtt.rs
Normal file
35
common/src/mqtt.rs
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::Error;
|
||||||
|
use crate::metrics::Metrics;
|
||||||
|
|
||||||
|
pub const STATUS_TOPIC: &str = "system/metrics";
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct StatusMessage<'a> {
|
||||||
|
pub agent: &'a str,
|
||||||
|
pub metrics: Metrics,
|
||||||
|
pub timestamp: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> StatusMessage<'a> {
|
||||||
|
pub fn new(agent: &'a str, metrics: Metrics) -> Self {
|
||||||
|
Self {
|
||||||
|
agent,
|
||||||
|
metrics,
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn to_string(&self) -> Result<String, Error> {
|
||||||
|
serde_json::to_string_pretty(&self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> TryFrom<&'a [u8]> for StatusMessage<'a> {
|
||||||
|
type Error = serde_json::Error;
|
||||||
|
|
||||||
|
fn try_from(value: &'a [u8]) -> Result<Self, Self::Error> {
|
||||||
|
serde_json::from_slice(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
19
server/mqtt.toml
Normal file
19
server/mqtt.toml
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
id = 0
|
||||||
|
|
||||||
|
[router]
|
||||||
|
id = 0
|
||||||
|
max_connections = 10010
|
||||||
|
max_outgoing_packet_count = 200
|
||||||
|
max_segment_size = 104857600
|
||||||
|
max_segment_count = 10
|
||||||
|
|
||||||
|
[v4.1]
|
||||||
|
name = "v4-1"
|
||||||
|
listen = "0.0.0.0:1883"
|
||||||
|
next_connection_delay_ms = 1
|
||||||
|
|
||||||
|
[v4.1.connections]
|
||||||
|
connection_timeout_ms = 60000
|
||||||
|
max_payload_size = 20480
|
||||||
|
max_inflight_count = 100
|
||||||
|
dynamic_filters = true
|
||||||
14
server/src/config.rs
Normal file
14
server/src/config.rs
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
use rumqttd::Config;
|
||||||
|
|
||||||
|
const BROKER_CONFIG_PATH: &str = if cfg!(debug_assertions) {
|
||||||
|
"server/mqtt.toml"
|
||||||
|
} else {
|
||||||
|
"mqtt.toml"
|
||||||
|
};
|
||||||
|
|
||||||
|
pub fn load_broker_config() -> anyhow::Result<Config> {
|
||||||
|
let content = std::fs::read_to_string(BROKER_CONFIG_PATH)?;
|
||||||
|
let config: Config = toml::from_str(&content)?;
|
||||||
|
|
||||||
|
Ok(config)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user