cleaup code

This commit is contained in:
csehviktor
2025-07-05 15:55:57 +02:00
parent 3bcfc67bdb
commit 81719e5574
14 changed files with 224 additions and 121 deletions

View File

@@ -11,3 +11,4 @@ rumqttd = "0.19.0"
toml = "0.8.23"
warp = "0.3.7"
futures-util = "0.3.31"
serde = { version = "1.0.219", features = ["derive"] }

View File

@@ -1,18 +1,19 @@
[mqtt]
id = 0
[router]
[mqtt.router]
id = 0
max_connections = 10010
max_outgoing_packet_count = 200
max_segment_size = 104857600
max_segment_count = 10
[v4.1]
[mqtt.v4.1]
name = "v4-1"
listen = "0.0.0.0:1883"
next_connection_delay_ms = 1
[v4.1.connections]
[mqtt.v4.1.connections]
connection_timeout_ms = 60000
max_payload_size = 20480
max_inflight_count = 100

26
server/src/bridge.rs Normal file
View File

@@ -0,0 +1,26 @@
use tokio::sync::{mpsc::Sender, Mutex};
pub struct ClientManager {
clients: Mutex<Vec<Sender<String>>>,
}
impl ClientManager {
pub fn new() -> Self {
Self {
clients: Mutex::new(Vec::new()),
}
}
pub async fn add_client(&self, client: Sender<String>) {
let mut clients = self.clients.lock().await;
clients.push(client);
}
pub async fn broadcast(&self, message: String) {
let mut clients = self.clients.lock().await;
clients.retain(|client| {
client.try_send(message.clone()).is_ok()
});
}
}

View File

@@ -0,0 +1,40 @@
use rumqttd::{Broker, Config};
use std::sync::Arc;
use crate::bridge::ClientManager;
use super::subscriber::MqttSubscriber;
pub struct MqttBroker {
broker: &'static mut Broker,
clients: Arc<ClientManager>,
}
impl MqttBroker {
pub async fn new(cfg: Config) -> Self {
let clients = Arc::new(ClientManager::new());
let broker: &'static mut Broker = Box::leak(Box::new(Broker::new(cfg)));
Self {
broker,
clients,
}
}
pub fn clients(&self) -> Arc<ClientManager> {
self.clients.clone()
}
pub async fn run(self) -> anyhow::Result<()> {
let mut subscriber = MqttSubscriber::new(&self.broker, self.clients);
println!("starting mqtt broker on specified port");
tokio::spawn(async move {
if let Err(e) = self.broker.start() {
eprintln!("broker exited with error: {}", e);
}
});
subscriber.run().await
}
}

2
server/src/broker/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod manager;
pub mod subscriber;

View File

@@ -0,0 +1,36 @@
use rumqttd::{local::LinkRx, Broker, Notification};
use common::{StatusMessage, MQTT_TOPIC};
use std::sync::Arc;
use crate::bridge::ClientManager;
pub struct MqttSubscriber {
link_rx: LinkRx,
clients: Arc<ClientManager>,
}
impl MqttSubscriber {
pub fn new(broker: &Broker, clients: Arc<ClientManager>) -> Self {
let (mut link_tx, link_rx) = broker.link("internal-subscriber").unwrap();
link_tx.subscribe(MQTT_TOPIC).unwrap();
Self {
link_rx,
clients,
}
}
pub async fn run(&mut self) -> anyhow::Result<()> {
while let Ok(notification) = self.link_rx.next().await {
if let Notification::Forward(forward) = notification.unwrap() {
let payload = StatusMessage::try_from(&forward.publish.payload[..])?;
if let Ok(payload_str) = payload.to_string() {
self.clients.broadcast(payload_str).await;
}
}
}
Ok(())
}
}

View File

@@ -1,14 +1,20 @@
use rumqttd::Config;
use serde::Deserialize;
const BROKER_CONFIG_PATH: &str = if cfg!(debug_assertions) {
"server/mqtt.toml"
const CONFIG_PATH: &str = if cfg!(debug_assertions) {
"server/config.toml"
} else {
"mqtt.toml"
"config.toml"
};
pub fn load_broker_config() -> anyhow::Result<Config> {
let content = std::fs::read_to_string(BROKER_CONFIG_PATH)?;
let config: Config = toml::from_str(&content)?;
#[derive(Debug, Deserialize)]
pub struct Configuration {
pub mqtt: Config,
}
pub fn load_config() -> anyhow::Result<Configuration> {
let content = std::fs::read_to_string(CONFIG_PATH)?;
let config: Configuration = toml::from_str(&content)?;
Ok(config)
}

View File

@@ -1,83 +1,23 @@
use std::sync::{Arc, Mutex};
use rumqttd::{local::LinkRx, Broker, Notification};
use common::mqtt::{StatusMessage, STATUS_TOPIC};
use config::load_broker_config;
use tokio::sync::mpsc::{channel, Sender};
use warp::{filters::ws::{Message, WebSocket, Ws}, Filter};
use futures_util::{SinkExt, StreamExt};
use broker::manager::MqttBroker;
use config::load_config;
use websocket::server::Websocket;
pub mod broker;
pub mod bridge;
pub mod config;
type Clients = Arc<Mutex<Vec<Sender<String>>>>;
pub mod websocket;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cfg = load_broker_config()?;
let cfg = load_config()?;
let mut broker = Broker::new(cfg);
let broker = MqttBroker::new(cfg.mqtt).await;
let ws = Websocket::new(broker.clients());
let (mut link_tx, link_rx) = broker.link("internal-subscriber")?;
link_tx.subscribe(STATUS_TOPIC)?;
let clients = Clients::default();
println!("starting mqtt broker on specified port");
tokio::spawn(async move {
if let Err(err) = broker.start() {
eprintln!("broker exited with error: {:?}", err);
}
});
let mqtt_clients = clients.clone();
tokio::spawn(async move {
if let Err(err) = process_mqtt_notifications(link_rx, mqtt_clients).await {
eprintln!("mqtt processing error: {:?}", err);
}
});
let ws_route = warp::path("ws")
.and(warp::ws())
.and(warp::any().map(move || clients.clone()))
.map(|ws: Ws, clients: Clients| {
ws.on_upgrade(move |websocket| handle_ws_connection(websocket, clients))
});
println!("starting websocket server on :3000");
warp::serve(ws_route).run(([0, 0, 0, 0], 3000)).await;
Ok(())
}
async fn process_mqtt_notifications(mut link_rx: LinkRx, clients: Clients) -> anyhow::Result<()> {
while let Ok(notification) = link_rx.next().await {
if let Notification::Forward(forward) = notification.unwrap() {
let payload = StatusMessage::try_from(&forward.publish.payload[..])?;
if let Ok(payload_str) = payload.to_string() {
let mut clients_guard = clients.lock().unwrap();
clients_guard.retain(|client_tx| {
client_tx.try_send(payload_str.clone()).is_ok()
});
}
}
tokio::select! {
res = broker.run() => res?,
res = ws.run() => res,
}
Ok(())
}
async fn handle_ws_connection(websocket: WebSocket, clients: Clients) {
let (mut ws_tx, _) = websocket.split();
let (tx, mut rx) = channel::<String>(100);
clients.lock().unwrap().push(tx);
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if let Err(e) = ws_tx.send(Message::text(msg)).await {
eprintln!("error sending message: {}", e);
break;
}
}
});
}

View File

@@ -0,0 +1,23 @@
use warp::filters::ws::{Message, WebSocket};
use futures_util::{SinkExt, StreamExt};
use tokio::sync::mpsc::channel;
use std::sync::Arc;
use crate::bridge::ClientManager;
pub mod server;
pub async fn handle_ws_connection(websocket: WebSocket, clients: Arc<ClientManager>) {
let (mut ws_tx, _) = websocket.split();
let (tx, mut rx) = channel::<String>(100);
clients.add_client(tx).await;
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if ws_tx.send(Message::text(msg)).await.is_err() {
break;
}
}
});
}

View File

@@ -0,0 +1,27 @@
use warp::Filter;
use std::sync::Arc;
use crate::bridge::ClientManager;
use super::handle_ws_connection;
pub struct Websocket {
clients: Arc<ClientManager>,
}
impl Websocket {
pub fn new(clients: Arc<ClientManager>) -> Self {
Self { clients }
}
pub async fn run(self) {
let route = warp::path("ws")
.and(warp::ws())
.and(warp::any().map(move || self.clients.clone()))
.map(|ws: warp::ws::Ws, clients| {
ws.on_upgrade(move |websocket| handle_ws_connection(websocket, clients))
});
println!("starting websocket server on :3000");
warp::serve(route).run(([0, 0, 0, 0], 3000)).await;
}
}