mirror of
				https://github.com/csehviktor/status-monitor.git
				synced 2025-08-08 18:06:14 +02:00 
			
		
		
		
	finish primitive server logic
This commit is contained in:
		| @@ -1,3 +1,82 @@ | ||||
| fn main() { | ||||
|     println!("hello server"); | ||||
| use std::sync::{Arc, Mutex}; | ||||
|  | ||||
| use rumqttd::{local::LinkRx, Broker, Notification}; | ||||
| use common::mqtt::{StatusMessage, STATUS_TOPIC}; | ||||
| use config::load_broker_config; | ||||
| use tokio::sync::mpsc::{channel, Sender}; | ||||
| use warp::{filters::ws::{Message, WebSocket, Ws}, Filter}; | ||||
| use futures_util::{SinkExt, StreamExt}; | ||||
|  | ||||
| pub mod config; | ||||
|  | ||||
| type Clients = Arc<Mutex<Vec<Sender<String>>>>; | ||||
|  | ||||
| #[tokio::main] | ||||
| async fn main() -> anyhow::Result<()> { | ||||
|     let cfg = load_broker_config()?; | ||||
|  | ||||
|     let mut broker = Broker::new(cfg); | ||||
|  | ||||
|     let (mut link_tx, link_rx) = broker.link("internal-subscriber")?; | ||||
|     link_tx.subscribe(STATUS_TOPIC)?; | ||||
|  | ||||
|     let clients = Clients::default(); | ||||
|  | ||||
|     tokio::spawn(async move { | ||||
|         if let Err(err) = broker.start() { | ||||
|             eprintln!("broker exited with error: {:?}", err); | ||||
|         } | ||||
|     }); | ||||
|  | ||||
|     let mqtt_clients = clients.clone(); | ||||
|     tokio::spawn(async move { | ||||
|         if let Err(err) = process_mqtt_notifications(link_rx, mqtt_clients).await { | ||||
|             eprintln!("mqtt processing error: {:?}", err); | ||||
|         } | ||||
|     }); | ||||
|  | ||||
|     let ws_route = warp::path("ws") | ||||
|         .and(warp::ws()) | ||||
|         .and(warp::any().map(move || clients.clone())) | ||||
|         .map(|ws: Ws, clients: Clients| { | ||||
|             ws.on_upgrade(move |websocket| handle_ws_connection(websocket, clients)) | ||||
|         }); | ||||
|  | ||||
|     println!("starting websocket server on :8080"); | ||||
|     warp::serve(ws_route).run(([0, 0, 0, 0], 8080)).await; | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| async fn process_mqtt_notifications(mut link_rx: LinkRx, clients: Clients) -> anyhow::Result<()> { | ||||
|     while let Ok(notification) = link_rx.next().await { | ||||
|         if let Notification::Forward(forward) = notification.unwrap() { | ||||
|             let payload = StatusMessage::try_from(&forward.publish.payload[..])?; | ||||
|  | ||||
|             if let Ok(payload_str) = payload.to_string() { | ||||
|                 let mut clients_guard = clients.lock().unwrap(); | ||||
|  | ||||
|                 clients_guard.retain(|client_tx| { | ||||
|                     client_tx.try_send(payload_str.clone()).is_ok() | ||||
|                 }); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| async fn handle_ws_connection(websocket: WebSocket, clients: Clients) { | ||||
|     let (mut ws_tx, _) = websocket.split(); | ||||
|     let (tx, mut rx) = channel::<String>(100); | ||||
|  | ||||
|     clients.lock().unwrap().push(tx); | ||||
|  | ||||
|     tokio::spawn(async move { | ||||
|         while let Some(msg) = rx.recv().await { | ||||
|             if let Err(e) = ws_tx.send(Message::text(msg)).await { | ||||
|                 eprintln!("error sending message: {}", e); | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|     }); | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user