diff --git a/Cargo.lock b/Cargo.lock index 2e13649..8b6b43b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -150,6 +150,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -157,6 +172,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -165,6 +181,40 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + [[package]] name = "futures-task" version = "0.3.31" @@ -177,10 +227,16 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -557,6 +613,7 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "futures", "reqwest", "rppal", "serde", @@ -726,7 +783,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 12f029d..e35de53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,4 @@ reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-feature serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" chrono = "0.4" +futures = "0.3.31" diff --git a/deploy.sh b/deploy.sh index a33b1fe..a3bab98 100755 --- a/deploy.sh +++ b/deploy.sh @@ -4,10 +4,11 @@ set -e echo "Building for Raspberry Pi..." -cargo zigbuild --release --target aarch64-unknown-linux-gnu +cross build --release --target aarch64-unknown-linux-gnu echo "Copying to Raspberry Pi..." scp target/aarch64-unknown-linux-gnu/release/noisebell noisebridge@noisebell.local:~/ +scp endpoints.json noisebridge@noisebell.local:/home/noisebridge/endpoints.json echo "Setting permissions" ssh noisebridge@noisebell.local "chmod +x ~/noisebell " diff --git a/src/main.rs b/src/main.rs index ebe6595..d0093ab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,12 +13,19 @@ async fn main() -> Result<()> { tracing_subscriber::fmt::init(); const DEFAULT_GPIO_PIN: u8 = 17; + const DEFAULT_WEBHOOK_RETRIES: u32 = 3; + let gpio_pin = std::env::var("GPIO_PIN") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(DEFAULT_GPIO_PIN); - let webhook_notifier = webhook::WebhookNotifier::new()?; + let webhook_retries = std::env::var("WEBHOOK_RETRIES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_WEBHOOK_RETRIES); + + let webhook_notifier = webhook::WebhookNotifier::new(webhook_retries)?; let mut gpio_monitor = gpio::GpioMonitor::new(gpio_pin, Duration::from_millis(100))?; let callback = move |event: gpio::CircuitEvent| { diff --git a/src/webhook.rs b/src/webhook.rs index 14dae9a..0dec54e 100644 --- a/src/webhook.rs +++ b/src/webhook.rs @@ -3,6 +3,8 @@ use reqwest::Client; use serde::{Deserialize, Serialize}; use std::fs; use tracing::{error, info}; +use std::time::Duration; +use futures::future::join_all; use crate::gpio::CircuitEvent; @@ -28,19 +30,75 @@ struct WebhookPayload { pub struct WebhookNotifier { client: Client, endpoints: Vec, + max_retries: u32, } impl WebhookNotifier { - pub fn new() -> Result { + pub fn new(max_retries: u32) -> Result { let config = fs::read_to_string("endpoints.json")?; let endpoints_config: EndpointsConfig = serde_json::from_str(&config)?; Ok(Self { client: Client::new(), endpoints: endpoints_config.endpoints, + max_retries, }) } + async fn send_webhook(&self, endpoint: &Endpoint, payload: &WebhookPayload) -> Result<()> { + match self.client + .post(&endpoint.url) + .json(payload) + .send() + .await + { + Ok(response) => { + if response.status().is_success() { + info!( + "Successfully sent webhook to {}: {}", + endpoint.description, + response.status() + ); + Ok(()) + } else { + error!( + "Webhook request to {} failed with status: {}", + endpoint.description, + response.status() + ); + Err(anyhow::anyhow!("Failed with status: {}", response.status())) + } + } + Err(e) => { + error!( + "Failed to send webhook to {}: {}", + endpoint.description, + e + ); + Err(anyhow::anyhow!("Request failed: {}", e)) + } + } + } + + async fn send_webhook_with_retries(&self, endpoint: &Endpoint, payload: &WebhookPayload) { + let mut attempts = 0; + + while attempts < self.max_retries { + match self.send_webhook(endpoint, payload).await { + Ok(_) => break, + Err(e) => { + attempts += 1; + if attempts < self.max_retries { + tokio::time::sleep(Duration::from_secs(1)).await; + } else { + error!("Failed to send webhook to {} after {} attempts: {}", + endpoint.description, self.max_retries, e); + } + } + } + } + } + pub async fn notify_all(&self, event_type: &str, state: CircuitEvent) { let payload = WebhookPayload { event_type: event_type.to_string(), @@ -48,29 +106,13 @@ impl WebhookNotifier { new_state: state.to_string(), }; - for endpoint in &self.endpoints { - info!("Sending webhook to {}: {}", endpoint.description, serde_json::to_string(&payload).unwrap()); - match self.client - .post(&endpoint.url) - .json(&payload) - .send() - .await - { - Ok(response) => { - info!( - "Successfully sent webhook to {}: {}", - endpoint.description, - response.status() - ); - } - Err(e) => { - error!( - "Failed to send webhook to {}: {}", - endpoint.description, - e - ); - } - } - } + let webhook_futures: Vec<_> = self.endpoints.iter() + .map(|endpoint| { + info!("Sending webhook to {}: {}", endpoint.description, serde_json::to_string(&payload).unwrap()); + self.send_webhook_with_retries(endpoint, &payload) + }) + .collect(); + + join_all(webhook_futures).await; } } \ No newline at end of file