use rumqttd::{local::LinkRx, Broker, Notification}; use common::{StatusMessage, MQTT_TOPIC}; use std::sync::Arc; use crate::{bridge::ClientManager, storage::StorageRepository}; pub struct MqttSubscriber { link_rx: LinkRx, clients: Arc, storage: Arc, } impl MqttSubscriber { pub fn new(broker: &Broker, clients: Arc, storage: Arc) -> Self { let (mut link_tx, link_rx) = broker.link("internal-subscriber").unwrap(); link_tx.subscribe(MQTT_TOPIC).unwrap(); Self { link_rx, clients, storage, } } pub async fn run(&mut self) -> anyhow::Result<()> { while let Ok(notification) = self.link_rx.next().await { if let Notification::Forward(forward) = notification.unwrap() { let payload = StatusMessage::try_from(&forward.publish.payload[..])?; self.clients.broadcast(&payload).await; if let Err(e) = self.storage.record_uptime(&payload.agent).await { eprintln!("failed to record uptime for {}: {}", &payload.agent, e); } } } Ok(()) } }