mirror of
https://github.com/csehviktor/status-monitor.git
synced 2025-08-08 18:06:14 +02:00
implement dynamic path for ws connections
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
use common::StatusMessage;
|
||||||
use warp::{filters::ws::{Message, WebSocket}, Filter, Reply, Rejection};
|
use warp::{filters::ws::{Message, WebSocket}, Filter, Reply, Rejection};
|
||||||
use futures_util::{StreamExt, SinkExt};
|
use futures_util::{StreamExt, SinkExt};
|
||||||
use tokio::sync::mpsc::channel;
|
use tokio::sync::mpsc::channel;
|
||||||
@@ -15,16 +16,17 @@ impl WebsocketRoutes {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn routes(self: Arc<Self>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
|
pub fn routes(self: Arc<Self>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
|
||||||
warp::path("ws")
|
warp::path("agent")
|
||||||
|
.and(warp::path::param::<String>())
|
||||||
.and(warp::get())
|
.and(warp::get())
|
||||||
.and(warp::ws())
|
.and(warp::ws())
|
||||||
.and(warp::any().map(move || self.clients.clone()))
|
.and(warp::any().map(move || self.clients.clone()))
|
||||||
.map(|websocket: warp::ws::Ws, clients: Arc<ClientManager>| {
|
.map(|agent: String, websocket: warp::ws::Ws, clients: Arc<ClientManager>| {
|
||||||
websocket.on_upgrade(move |websocket| Self::handle_ws_connection(websocket, clients))
|
websocket.on_upgrade(move |websocket| Self::handle_ws_connection(agent, websocket, clients))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_ws_connection(websocket: WebSocket, clients: Arc<ClientManager>) {
|
async fn handle_ws_connection(agent: String, websocket: WebSocket, clients: Arc<ClientManager>) {
|
||||||
let (mut ws_tx, _) = websocket.split();
|
let (mut ws_tx, _) = websocket.split();
|
||||||
let (tx, mut rx) = channel::<StatusMessage>(100);
|
let (tx, mut rx) = channel::<StatusMessage>(100);
|
||||||
|
|
||||||
@@ -32,6 +34,10 @@ impl WebsocketRoutes {
|
|||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Some(msg) = rx.recv().await {
|
while let Some(msg) = rx.recv().await {
|
||||||
|
if msg.agent != agent {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if ws_tx.send(Message::text(msg)).await.is_err() {
|
if ws_tx.send(Message::text(msg)).await.is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user