From 58a4ab9974879a793b6cc6f557575369b9372666 Mon Sep 17 00:00:00 2001 From: csehviktor Date: Thu, 3 Jul 2025 06:09:09 +0200 Subject: [PATCH] improve mqtt --- agent/src/mqtt.rs | 47 ++++++++++++++++++++++++++++++++++++++++++++ common/src/mqtt.rs | 35 +++++++++++++++++++++++++++++++++ server/mqtt.toml | 19 ++++++++++++++++++ server/src/config.rs | 14 +++++++++++++ 4 files changed, 115 insertions(+) create mode 100644 agent/src/mqtt.rs create mode 100644 common/src/mqtt.rs create mode 100644 server/mqtt.toml create mode 100644 server/src/config.rs diff --git a/agent/src/mqtt.rs b/agent/src/mqtt.rs new file mode 100644 index 0000000..4eaf22a --- /dev/null +++ b/agent/src/mqtt.rs @@ -0,0 +1,47 @@ +use common::{metrics::Metrics, mqtt::{StatusMessage, STATUS_TOPIC}}; +use rumqttc::{AsyncClient, MqttOptions, QoS}; +use std::time::Duration; +use tokio::task::JoinHandle; + +use crate::config::MqttConfig; + +pub struct MqttHandle { + pub agent: &'static str, + pub client: AsyncClient, + pub eventloop_handle: JoinHandle<()>, +} + +impl MqttHandle { + pub async fn create(cfg: &'static MqttConfig) -> Self { + let mut mqttoptions = MqttOptions::new(&cfg.agent, &cfg.host, cfg.port); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + + let eventloop_handle = tokio::spawn(async move { + loop { + if let Err(e) = eventloop.poll().await { + eprintln!("event loop error: {:?}", e); + break; + } + } + }); + + Self { + agent: &cfg.agent, + client, + eventloop_handle + } + } + + pub async fn send_metrics(&self, metrics: Metrics) -> anyhow::Result<()> { + let message = StatusMessage::new(&self.agent, metrics).to_string()?; + + self.client + .publish(STATUS_TOPIC, QoS::AtLeastOnce, false, message.as_bytes()) + .await + .expect("Failed to publish metrics"); + + Ok(()) + } +} diff --git a/common/src/mqtt.rs b/common/src/mqtt.rs new file mode 100644 index 0000000..f5fdb69 --- /dev/null +++ b/common/src/mqtt.rs @@ -0,0 +1,35 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Error; +use crate::metrics::Metrics; + +pub const STATUS_TOPIC: &str = "system/metrics"; + +#[derive(Debug, Serialize, Deserialize)] +pub struct StatusMessage<'a> { + pub agent: &'a str, + pub metrics: Metrics, + pub timestamp: DateTime, +} + +impl<'a> StatusMessage<'a> { + pub fn new(agent: &'a str, metrics: Metrics) -> Self { + Self { + agent, + metrics, + timestamp: Utc::now(), + } + } + + pub fn to_string(&self) -> Result { + serde_json::to_string_pretty(&self) + } +} + +impl<'a> TryFrom<&'a [u8]> for StatusMessage<'a> { + type Error = serde_json::Error; + + fn try_from(value: &'a [u8]) -> Result { + serde_json::from_slice(value) + } +} diff --git a/server/mqtt.toml b/server/mqtt.toml new file mode 100644 index 0000000..b1a401e --- /dev/null +++ b/server/mqtt.toml @@ -0,0 +1,19 @@ +id = 0 + +[router] +id = 0 +max_connections = 10010 +max_outgoing_packet_count = 200 +max_segment_size = 104857600 +max_segment_count = 10 + +[v4.1] +name = "v4-1" +listen = "0.0.0.0:1883" +next_connection_delay_ms = 1 + +[v4.1.connections] +connection_timeout_ms = 60000 +max_payload_size = 20480 +max_inflight_count = 100 +dynamic_filters = true diff --git a/server/src/config.rs b/server/src/config.rs new file mode 100644 index 0000000..7682caa --- /dev/null +++ b/server/src/config.rs @@ -0,0 +1,14 @@ +use rumqttd::Config; + +const BROKER_CONFIG_PATH: &str = if cfg!(debug_assertions) { + "server/mqtt.toml" +} else { + "mqtt.toml" +}; + +pub fn load_broker_config() -> anyhow::Result { + let content = std::fs::read_to_string(BROKER_CONFIG_PATH)?; + let config: Config = toml::from_str(&content)?; + + Ok(config) +}