use std::sync::{Arc, Mutex}; use rumqttd::{local::LinkRx, Broker, Notification}; use common::mqtt::{StatusMessage, STATUS_TOPIC}; use config::load_broker_config; use tokio::sync::mpsc::{channel, Sender}; use warp::{filters::ws::{Message, WebSocket, Ws}, Filter}; use futures_util::{SinkExt, StreamExt}; pub mod config; type Clients = Arc>>>; #[tokio::main] async fn main() -> anyhow::Result<()> { let cfg = load_broker_config()?; let mut broker = Broker::new(cfg); let (mut link_tx, link_rx) = broker.link("internal-subscriber")?; link_tx.subscribe(STATUS_TOPIC)?; let clients = Clients::default(); println!("starting mqtt broker on specified port"); tokio::spawn(async move { if let Err(err) = broker.start() { eprintln!("broker exited with error: {:?}", err); } }); let mqtt_clients = clients.clone(); tokio::spawn(async move { if let Err(err) = process_mqtt_notifications(link_rx, mqtt_clients).await { eprintln!("mqtt processing error: {:?}", err); } }); let ws_route = warp::path("ws") .and(warp::ws()) .and(warp::any().map(move || clients.clone())) .map(|ws: Ws, clients: Clients| { ws.on_upgrade(move |websocket| handle_ws_connection(websocket, clients)) }); println!("starting websocket server on :3000"); warp::serve(ws_route).run(([0, 0, 0, 0], 3000)).await; Ok(()) } async fn process_mqtt_notifications(mut link_rx: LinkRx, clients: Clients) -> anyhow::Result<()> { while let Ok(notification) = link_rx.next().await { if let Notification::Forward(forward) = notification.unwrap() { let payload = StatusMessage::try_from(&forward.publish.payload[..])?; if let Ok(payload_str) = payload.to_string() { let mut clients_guard = clients.lock().unwrap(); clients_guard.retain(|client_tx| { client_tx.try_send(payload_str.clone()).is_ok() }); } } } Ok(()) } async fn handle_ws_connection(websocket: WebSocket, clients: Clients) { let (mut ws_tx, _) = websocket.split(); let (tx, mut rx) = channel::(100); clients.lock().unwrap().push(tx); tokio::spawn(async move { while let Some(msg) = rx.recv().await { if let Err(e) = ws_tx.send(Message::text(msg)).await { eprintln!("error sending message: {}", e); break; } } }); }