diff --git a/server/src/controller/http.rs b/server/src/controller/http.rs new file mode 100644 index 0000000..f2e20eb --- /dev/null +++ b/server/src/controller/http.rs @@ -0,0 +1,58 @@ +use chrono::{Duration, Utc}; +use std::sync::Arc; +use warp::{Filter, Rejection, Reply, reply::json}; + +use crate::storage::StorageRepository; + +pub struct HttpRoutes { + storage: Arc, +} + +impl HttpRoutes { + pub fn new(storage: Arc) -> Self { + Self { storage } + } + + pub fn routes(self: Arc) -> impl Filter + Clone { + let agents_storage = self.storage.clone(); + let history_storage = self.storage.clone(); + + let agents_route = warp::path!("agents") + .and(warp::get()) + .and(warp::any().map(move || agents_storage.clone())) + .and_then(Self::get_agents); + + let history_route = warp::path!("history" / String / String) + .and(warp::get()) + .and(warp::any().map(move || history_storage.clone())) + .and_then(Self::get_history); + + agents_route.or(history_route) + } + + async fn get_agents(storage: Arc) -> Result { + let agents = storage.get_agents().await.unwrap(); + + Ok(json(&agents)) + } + + async fn get_history( + agent: String, + dur: String, + storage: Arc, + ) -> Result { + let now = Utc::now(); + + let duration = match dur.as_str() { + "hour" => Some(now - Duration::hours(1)), + "day" => Some(now - Duration::days(1)), + "week" => Some(now - Duration::weeks(1)), + "month" => Some(now - Duration::weeks(4)), + _ => None, + }; + + let history = storage.get_history(&agent, duration).await.unwrap(); + + Ok(json(&history)) + } +} diff --git a/server/src/controller/mod.rs b/server/src/controller/mod.rs new file mode 100644 index 0000000..db2c838 --- /dev/null +++ b/server/src/controller/mod.rs @@ -0,0 +1,32 @@ +use http::HttpRoutes; +use std::sync::Arc; +use warp::Filter; +use websocket::WebsocketRoutes; + +use crate::{bridge::ClientManager, storage::StorageRepositoryImpl}; + +pub mod http; +pub mod websocket; + +pub struct Server { + clients: Arc, + storage: Arc, +} + +impl Server { + pub fn new(clients: Arc, storage: Arc) -> Self { + Self { clients, storage } + } + + pub async fn serve(&self) { + let http_routes = Arc::new(HttpRoutes::new(self.storage.inner())); + let ws_routes = Arc::new(WebsocketRoutes::new(self.clients.clone())); + + let cors = warp::cors().allow_any_origin().allow_methods(vec!["GET"]); + + let routes = http_routes.routes().with(cors).or(ws_routes.routes()); + + println!("starting server on :{}", 3000); + warp::serve(routes).run(([0, 0, 0, 0], 3000)).await; + } +} diff --git a/server/src/controller/websocket.rs b/server/src/controller/websocket.rs new file mode 100644 index 0000000..a6cbc25 --- /dev/null +++ b/server/src/controller/websocket.rs @@ -0,0 +1,57 @@ +use common::StatusMessage; +use futures_util::{SinkExt, StreamExt}; +use std::sync::Arc; +use tokio::sync::mpsc::channel; +use warp::{ + Filter, Rejection, Reply, + filters::ws::{Message, WebSocket}, +}; + +use crate::bridge::ClientManager; + +pub struct WebsocketRoutes { + clients: Arc, +} + +impl WebsocketRoutes { + pub fn new(clients: Arc) -> Self { + Self { clients } + } + + pub fn routes(self: Arc) -> impl Filter + Clone { + warp::path!("ws" / String) + .and(warp::get()) + .and(warp::ws()) + .and(warp::any().map(move || self.clients.clone())) + .map( + |agent: String, websocket: warp::ws::Ws, clients: Arc| { + websocket.on_upgrade(move |websocket| { + Self::handle_ws_connection(agent, websocket, clients) + }) + }, + ) + } + + async fn handle_ws_connection( + agent: String, + websocket: WebSocket, + clients: Arc, + ) { + let (mut ws_tx, _) = websocket.split(); + let (tx, mut rx) = channel::(100); + + clients.add_client(tx).await; + + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + if msg.agent != agent { + continue; + } + + if ws_tx.send(Message::text(msg)).await.is_err() { + break; + } + } + }); + } +}