diff --git a/Cargo.lock b/Cargo.lock index 3821c24..0a64956 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1983,6 +1983,7 @@ dependencies = [ "common", "futures-util", "rumqttd", + "serde", "tokio", "toml 0.8.23", "warp", diff --git a/agent/src/mqtt.rs b/agent/src/mqtt.rs index be765e9..eea340f 100644 --- a/agent/src/mqtt.rs +++ b/agent/src/mqtt.rs @@ -1,7 +1,7 @@ -use common::{metrics::Metrics, mqtt::{StatusMessage, STATUS_TOPIC}}; +use common::{metrics::Metrics, StatusMessage, MQTT_TOPIC}; use rumqttc::{AsyncClient, MqttOptions, QoS}; -use std::time::Duration; use tokio::task::JoinHandle; +use std::time::Duration; pub struct MqttHandle { pub agent: String, @@ -36,7 +36,7 @@ impl MqttHandle { let message = StatusMessage::new(&self.agent, metrics).to_string()?; self.client - .publish(STATUS_TOPIC, QoS::AtLeastOnce, false, message.as_bytes()) + .publish(MQTT_TOPIC, QoS::AtLeastOnce, false, message.as_bytes()) .await .expect("Failed to publish metrics"); diff --git a/common/src/lib.rs b/common/src/lib.rs index c2df196..9ce5c00 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,2 +1,37 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Error; +use crate::metrics::Metrics; + pub mod metrics; -pub mod mqtt; + +pub const MQTT_TOPIC: &str = "system/metrics"; + +#[derive(Debug, Serialize, Deserialize)] +pub struct StatusMessage<'a> { + pub agent: &'a str, + pub metrics: Metrics, + pub timestamp: DateTime, +} + +impl<'a> StatusMessage<'a> { + pub fn new(agent: &'a str, metrics: Metrics) -> Self { + Self { + agent, + metrics, + timestamp: Utc::now(), + } + } + + pub fn to_string(&self) -> Result { + serde_json::to_string_pretty(&self) + } +} + +impl<'a> TryFrom<&'a [u8]> for StatusMessage<'a> { + type Error = serde_json::Error; + + fn try_from(value: &'a [u8]) -> Result { + serde_json::from_slice(value) + } +} diff --git a/common/src/mqtt.rs b/common/src/mqtt.rs deleted file mode 100644 index f5fdb69..0000000 --- a/common/src/mqtt.rs +++ /dev/null @@ -1,35 +0,0 @@ -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use serde_json::Error; -use crate::metrics::Metrics; - -pub const STATUS_TOPIC: &str = "system/metrics"; - -#[derive(Debug, Serialize, Deserialize)] -pub struct StatusMessage<'a> { - pub agent: &'a str, - pub metrics: Metrics, - pub timestamp: DateTime, -} - -impl<'a> StatusMessage<'a> { - pub fn new(agent: &'a str, metrics: Metrics) -> Self { - Self { - agent, - metrics, - timestamp: Utc::now(), - } - } - - pub fn to_string(&self) -> Result { - serde_json::to_string_pretty(&self) - } -} - -impl<'a> TryFrom<&'a [u8]> for StatusMessage<'a> { - type Error = serde_json::Error; - - fn try_from(value: &'a [u8]) -> Result { - serde_json::from_slice(value) - } -} diff --git a/server/Cargo.toml b/server/Cargo.toml index e29dbe7..3a45fe1 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,3 +11,4 @@ rumqttd = "0.19.0" toml = "0.8.23" warp = "0.3.7" futures-util = "0.3.31" +serde = { version = "1.0.219", features = ["derive"] } diff --git a/server/mqtt.toml b/server/config.toml similarity index 83% rename from server/mqtt.toml rename to server/config.toml index b1a401e..cfb8a53 100644 --- a/server/mqtt.toml +++ b/server/config.toml @@ -1,18 +1,19 @@ +[mqtt] id = 0 -[router] +[mqtt.router] id = 0 max_connections = 10010 max_outgoing_packet_count = 200 max_segment_size = 104857600 max_segment_count = 10 -[v4.1] +[mqtt.v4.1] name = "v4-1" listen = "0.0.0.0:1883" next_connection_delay_ms = 1 -[v4.1.connections] +[mqtt.v4.1.connections] connection_timeout_ms = 60000 max_payload_size = 20480 max_inflight_count = 100 diff --git a/server/src/bridge.rs b/server/src/bridge.rs new file mode 100644 index 0000000..32955c5 --- /dev/null +++ b/server/src/bridge.rs @@ -0,0 +1,26 @@ +use tokio::sync::{mpsc::Sender, Mutex}; + +pub struct ClientManager { + clients: Mutex>>, +} + +impl ClientManager { + pub fn new() -> Self { + Self { + clients: Mutex::new(Vec::new()), + } + } + + pub async fn add_client(&self, client: Sender) { + let mut clients = self.clients.lock().await; + clients.push(client); + } + + pub async fn broadcast(&self, message: String) { + let mut clients = self.clients.lock().await; + + clients.retain(|client| { + client.try_send(message.clone()).is_ok() + }); + } +} diff --git a/server/src/broker/manager.rs b/server/src/broker/manager.rs new file mode 100644 index 0000000..34162ff --- /dev/null +++ b/server/src/broker/manager.rs @@ -0,0 +1,40 @@ +use rumqttd::{Broker, Config}; +use std::sync::Arc; + +use crate::bridge::ClientManager; +use super::subscriber::MqttSubscriber; + +pub struct MqttBroker { + broker: &'static mut Broker, + clients: Arc, +} + +impl MqttBroker { + pub async fn new(cfg: Config) -> Self { + let clients = Arc::new(ClientManager::new()); + let broker: &'static mut Broker = Box::leak(Box::new(Broker::new(cfg))); + + Self { + broker, + clients, + } + } + + pub fn clients(&self) -> Arc { + self.clients.clone() + } + + pub async fn run(self) -> anyhow::Result<()> { + let mut subscriber = MqttSubscriber::new(&self.broker, self.clients); + + println!("starting mqtt broker on specified port"); + + tokio::spawn(async move { + if let Err(e) = self.broker.start() { + eprintln!("broker exited with error: {}", e); + } + }); + + subscriber.run().await + } +} diff --git a/server/src/broker/mod.rs b/server/src/broker/mod.rs new file mode 100644 index 0000000..5565356 --- /dev/null +++ b/server/src/broker/mod.rs @@ -0,0 +1,2 @@ +pub mod manager; +pub mod subscriber; diff --git a/server/src/broker/subscriber.rs b/server/src/broker/subscriber.rs new file mode 100644 index 0000000..7044de5 --- /dev/null +++ b/server/src/broker/subscriber.rs @@ -0,0 +1,36 @@ +use rumqttd::{local::LinkRx, Broker, Notification}; +use common::{StatusMessage, MQTT_TOPIC}; +use std::sync::Arc; + +use crate::bridge::ClientManager; + +pub struct MqttSubscriber { + link_rx: LinkRx, + clients: Arc, +} + +impl MqttSubscriber { + pub fn new(broker: &Broker, clients: Arc) -> Self { + let (mut link_tx, link_rx) = broker.link("internal-subscriber").unwrap(); + link_tx.subscribe(MQTT_TOPIC).unwrap(); + + Self { + link_rx, + clients, + } + } + + pub async fn run(&mut self) -> anyhow::Result<()> { + while let Ok(notification) = self.link_rx.next().await { + if let Notification::Forward(forward) = notification.unwrap() { + let payload = StatusMessage::try_from(&forward.publish.payload[..])?; + + if let Ok(payload_str) = payload.to_string() { + self.clients.broadcast(payload_str).await; + } + } + } + + Ok(()) + } +} diff --git a/server/src/config.rs b/server/src/config.rs index 7682caa..34d9244 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -1,14 +1,20 @@ use rumqttd::Config; +use serde::Deserialize; -const BROKER_CONFIG_PATH: &str = if cfg!(debug_assertions) { - "server/mqtt.toml" +const CONFIG_PATH: &str = if cfg!(debug_assertions) { + "server/config.toml" } else { - "mqtt.toml" + "config.toml" }; -pub fn load_broker_config() -> anyhow::Result { - let content = std::fs::read_to_string(BROKER_CONFIG_PATH)?; - let config: Config = toml::from_str(&content)?; +#[derive(Debug, Deserialize)] +pub struct Configuration { + pub mqtt: Config, +} + +pub fn load_config() -> anyhow::Result { + let content = std::fs::read_to_string(CONFIG_PATH)?; + let config: Configuration = toml::from_str(&content)?; Ok(config) } diff --git a/server/src/main.rs b/server/src/main.rs index b8902b3..2530b69 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,83 +1,23 @@ -use std::sync::{Arc, Mutex}; - -use rumqttd::{local::LinkRx, Broker, Notification}; -use common::mqtt::{StatusMessage, STATUS_TOPIC}; -use config::load_broker_config; -use tokio::sync::mpsc::{channel, Sender}; -use warp::{filters::ws::{Message, WebSocket, Ws}, Filter}; -use futures_util::{SinkExt, StreamExt}; +use broker::manager::MqttBroker; +use config::load_config; +use websocket::server::Websocket; +pub mod broker; +pub mod bridge; pub mod config; - -type Clients = Arc>>>; +pub mod websocket; #[tokio::main] async fn main() -> anyhow::Result<()> { - let cfg = load_broker_config()?; + let cfg = load_config()?; - let mut broker = Broker::new(cfg); + let broker = MqttBroker::new(cfg.mqtt).await; + let ws = Websocket::new(broker.clients()); - let (mut link_tx, link_rx) = broker.link("internal-subscriber")?; - link_tx.subscribe(STATUS_TOPIC)?; - - let clients = Clients::default(); - - println!("starting mqtt broker on specified port"); - tokio::spawn(async move { - if let Err(err) = broker.start() { - eprintln!("broker exited with error: {:?}", err); - } - }); - - let mqtt_clients = clients.clone(); - tokio::spawn(async move { - if let Err(err) = process_mqtt_notifications(link_rx, mqtt_clients).await { - eprintln!("mqtt processing error: {:?}", err); - } - }); - - let ws_route = warp::path("ws") - .and(warp::ws()) - .and(warp::any().map(move || clients.clone())) - .map(|ws: Ws, clients: Clients| { - ws.on_upgrade(move |websocket| handle_ws_connection(websocket, clients)) - }); - - println!("starting websocket server on :3000"); - warp::serve(ws_route).run(([0, 0, 0, 0], 3000)).await; - - Ok(()) -} - -async fn process_mqtt_notifications(mut link_rx: LinkRx, clients: Clients) -> anyhow::Result<()> { - while let Ok(notification) = link_rx.next().await { - if let Notification::Forward(forward) = notification.unwrap() { - let payload = StatusMessage::try_from(&forward.publish.payload[..])?; - - if let Ok(payload_str) = payload.to_string() { - let mut clients_guard = clients.lock().unwrap(); - - clients_guard.retain(|client_tx| { - client_tx.try_send(payload_str.clone()).is_ok() - }); - } - } + tokio::select! { + res = broker.run() => res?, + res = ws.run() => res, } + Ok(()) } - -async fn handle_ws_connection(websocket: WebSocket, clients: Clients) { - let (mut ws_tx, _) = websocket.split(); - let (tx, mut rx) = channel::(100); - - clients.lock().unwrap().push(tx); - - tokio::spawn(async move { - while let Some(msg) = rx.recv().await { - if let Err(e) = ws_tx.send(Message::text(msg)).await { - eprintln!("error sending message: {}", e); - break; - } - } - }); -} diff --git a/server/src/websocket/mod.rs b/server/src/websocket/mod.rs new file mode 100644 index 0000000..af1b101 --- /dev/null +++ b/server/src/websocket/mod.rs @@ -0,0 +1,23 @@ +use warp::filters::ws::{Message, WebSocket}; +use futures_util::{SinkExt, StreamExt}; +use tokio::sync::mpsc::channel; +use std::sync::Arc; + +use crate::bridge::ClientManager; + +pub mod server; + +pub async fn handle_ws_connection(websocket: WebSocket, clients: Arc) { + let (mut ws_tx, _) = websocket.split(); + let (tx, mut rx) = channel::(100); + + clients.add_client(tx).await; + + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + if ws_tx.send(Message::text(msg)).await.is_err() { + break; + } + } + }); +} diff --git a/server/src/websocket/server.rs b/server/src/websocket/server.rs new file mode 100644 index 0000000..e3cc05b --- /dev/null +++ b/server/src/websocket/server.rs @@ -0,0 +1,27 @@ +use warp::Filter; +use std::sync::Arc; + +use crate::bridge::ClientManager; +use super::handle_ws_connection; + +pub struct Websocket { + clients: Arc, +} + +impl Websocket { + pub fn new(clients: Arc) -> Self { + Self { clients } + } + + pub async fn run(self) { + let route = warp::path("ws") + .and(warp::ws()) + .and(warp::any().map(move || self.clients.clone())) + .map(|ws: warp::ws::Ws, clients| { + ws.on_upgrade(move |websocket| handle_ws_connection(websocket, clients)) + }); + + println!("starting websocket server on :3000"); + warp::serve(route).run(([0, 0, 0, 0], 3000)).await; + } +}