diff --git a/agent/src/main.rs b/agent/src/main.rs index abec932..070a377 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -20,6 +20,13 @@ struct Args { #[arg(long, default_value_t = 1883)] port: u16, + + #[arg(long, default_value_t = 6)] + reconnect_after: u8, +} + +async fn connect(args: &Args) -> MqttHandle { + MqttHandle::create(args.name.clone(), args.host.clone(), args.port).await } #[tokio::main] @@ -28,12 +35,24 @@ async fn main() -> anyhow::Result<()> { println!("running agent: {:?}", args); - let client = MqttHandle::create(args.name, args.host, args.port).await; + let mut client = connect(&args).await; let mut collector = Collector::new(); + let mut consecutive_failures = 0; loop { let metrics = collector.collect_all(); - client.send_metrics(metrics).await?; + + match client.send_metrics(metrics).await { + Ok(_) => consecutive_failures = 0, + Err(_) => { + consecutive_failures += 1; + + if consecutive_failures >= args.reconnect_after { + client = connect(&args).await; + consecutive_failures = 0; + } + } + } thread::sleep(Duration::from_secs(MQTT_SEND_INTERVAL)); } diff --git a/agent/src/mqtt.rs b/agent/src/mqtt.rs index fb81586..530ada5 100644 --- a/agent/src/mqtt.rs +++ b/agent/src/mqtt.rs @@ -37,8 +37,7 @@ impl MqttHandle { self.client .publish(MQTT_TOPIC, QoS::AtLeastOnce, false, message.as_bytes()) - .await - .expect("Failed to publish metrics"); + .await?; Ok(()) }