feat: add retires and errors on uncessessful attempts
This commit is contained in:
parent
0c2e129bbd
commit
cfc5f01221
5 changed files with 136 additions and 28 deletions
59
Cargo.lock
generated
59
Cargo.lock
generated
|
|
@ -150,6 +150,21 @@ dependencies = [
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures"
|
||||||
|
version = "0.3.31"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
|
||||||
|
dependencies = [
|
||||||
|
"futures-channel",
|
||||||
|
"futures-core",
|
||||||
|
"futures-executor",
|
||||||
|
"futures-io",
|
||||||
|
"futures-sink",
|
||||||
|
"futures-task",
|
||||||
|
"futures-util",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-channel"
|
name = "futures-channel"
|
||||||
version = "0.3.31"
|
version = "0.3.31"
|
||||||
|
|
@ -157,6 +172,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
|
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-sink",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
@ -165,6 +181,40 @@ version = "0.3.31"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
|
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-executor"
|
||||||
|
version = "0.3.31"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"futures-task",
|
||||||
|
"futures-util",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-io"
|
||||||
|
version = "0.3.31"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-macro"
|
||||||
|
version = "0.3.31"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-sink"
|
||||||
|
version = "0.3.31"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-task"
|
name = "futures-task"
|
||||||
version = "0.3.31"
|
version = "0.3.31"
|
||||||
|
|
@ -177,10 +227,16 @@ version = "0.3.31"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
|
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"futures-channel",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
|
"futures-macro",
|
||||||
|
"futures-sink",
|
||||||
"futures-task",
|
"futures-task",
|
||||||
|
"memchr",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"pin-utils",
|
"pin-utils",
|
||||||
|
"slab",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
@ -557,6 +613,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
"futures",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"rppal",
|
"rppal",
|
||||||
"serde",
|
"serde",
|
||||||
|
|
@ -726,7 +783,7 @@ dependencies = [
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"socket2",
|
"socket2",
|
||||||
"tracing",
|
"tracing",
|
||||||
"windows-sys 0.52.0",
|
"windows-sys 0.59.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
||||||
|
|
@ -13,3 +13,4 @@ reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-feature
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
|
futures = "0.3.31"
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,11 @@
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
echo "Building for Raspberry Pi..."
|
echo "Building for Raspberry Pi..."
|
||||||
cargo zigbuild --release --target aarch64-unknown-linux-gnu
|
cross build --release --target aarch64-unknown-linux-gnu
|
||||||
|
|
||||||
echo "Copying to Raspberry Pi..."
|
echo "Copying to Raspberry Pi..."
|
||||||
scp target/aarch64-unknown-linux-gnu/release/noisebell noisebridge@noisebell.local:~/
|
scp target/aarch64-unknown-linux-gnu/release/noisebell noisebridge@noisebell.local:~/
|
||||||
|
scp endpoints.json noisebridge@noisebell.local:/home/noisebridge/endpoints.json
|
||||||
|
|
||||||
echo "Setting permissions"
|
echo "Setting permissions"
|
||||||
ssh noisebridge@noisebell.local "chmod +x ~/noisebell "
|
ssh noisebridge@noisebell.local "chmod +x ~/noisebell "
|
||||||
|
|
|
||||||
|
|
@ -13,12 +13,19 @@ async fn main() -> Result<()> {
|
||||||
tracing_subscriber::fmt::init();
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
const DEFAULT_GPIO_PIN: u8 = 17;
|
const DEFAULT_GPIO_PIN: u8 = 17;
|
||||||
|
const DEFAULT_WEBHOOK_RETRIES: u32 = 3;
|
||||||
|
|
||||||
let gpio_pin = std::env::var("GPIO_PIN")
|
let gpio_pin = std::env::var("GPIO_PIN")
|
||||||
.ok()
|
.ok()
|
||||||
.and_then(|v| v.parse().ok())
|
.and_then(|v| v.parse().ok())
|
||||||
.unwrap_or(DEFAULT_GPIO_PIN);
|
.unwrap_or(DEFAULT_GPIO_PIN);
|
||||||
|
|
||||||
let webhook_notifier = webhook::WebhookNotifier::new()?;
|
let webhook_retries = std::env::var("WEBHOOK_RETRIES")
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.parse().ok())
|
||||||
|
.unwrap_or(DEFAULT_WEBHOOK_RETRIES);
|
||||||
|
|
||||||
|
let webhook_notifier = webhook::WebhookNotifier::new(webhook_retries)?;
|
||||||
let mut gpio_monitor = gpio::GpioMonitor::new(gpio_pin, Duration::from_millis(100))?;
|
let mut gpio_monitor = gpio::GpioMonitor::new(gpio_pin, Duration::from_millis(100))?;
|
||||||
|
|
||||||
let callback = move |event: gpio::CircuitEvent| {
|
let callback = move |event: gpio::CircuitEvent| {
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,8 @@ use reqwest::Client;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
use std::time::Duration;
|
||||||
|
use futures::future::join_all;
|
||||||
|
|
||||||
use crate::gpio::CircuitEvent;
|
use crate::gpio::CircuitEvent;
|
||||||
|
|
||||||
|
|
@ -28,19 +30,75 @@ struct WebhookPayload {
|
||||||
pub struct WebhookNotifier {
|
pub struct WebhookNotifier {
|
||||||
client: Client,
|
client: Client,
|
||||||
endpoints: Vec<Endpoint>,
|
endpoints: Vec<Endpoint>,
|
||||||
|
max_retries: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WebhookNotifier {
|
impl WebhookNotifier {
|
||||||
pub fn new() -> Result<Self> {
|
pub fn new(max_retries: u32) -> Result<Self> {
|
||||||
let config = fs::read_to_string("endpoints.json")?;
|
let config = fs::read_to_string("endpoints.json")?;
|
||||||
let endpoints_config: EndpointsConfig = serde_json::from_str(&config)?;
|
let endpoints_config: EndpointsConfig = serde_json::from_str(&config)?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
client: Client::new(),
|
client: Client::new(),
|
||||||
endpoints: endpoints_config.endpoints,
|
endpoints: endpoints_config.endpoints,
|
||||||
|
max_retries,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn send_webhook(&self, endpoint: &Endpoint, payload: &WebhookPayload) -> Result<()> {
|
||||||
|
match self.client
|
||||||
|
.post(&endpoint.url)
|
||||||
|
.json(payload)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(response) => {
|
||||||
|
if response.status().is_success() {
|
||||||
|
info!(
|
||||||
|
"Successfully sent webhook to {}: {}",
|
||||||
|
endpoint.description,
|
||||||
|
response.status()
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
error!(
|
||||||
|
"Webhook request to {} failed with status: {}",
|
||||||
|
endpoint.description,
|
||||||
|
response.status()
|
||||||
|
);
|
||||||
|
Err(anyhow::anyhow!("Failed with status: {}", response.status()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
"Failed to send webhook to {}: {}",
|
||||||
|
endpoint.description,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
Err(anyhow::anyhow!("Request failed: {}", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_webhook_with_retries(&self, endpoint: &Endpoint, payload: &WebhookPayload) {
|
||||||
|
let mut attempts = 0;
|
||||||
|
|
||||||
|
while attempts < self.max_retries {
|
||||||
|
match self.send_webhook(endpoint, payload).await {
|
||||||
|
Ok(_) => break,
|
||||||
|
Err(e) => {
|
||||||
|
attempts += 1;
|
||||||
|
if attempts < self.max_retries {
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
} else {
|
||||||
|
error!("Failed to send webhook to {} after {} attempts: {}",
|
||||||
|
endpoint.description, self.max_retries, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn notify_all(&self, event_type: &str, state: CircuitEvent) {
|
pub async fn notify_all(&self, event_type: &str, state: CircuitEvent) {
|
||||||
let payload = WebhookPayload {
|
let payload = WebhookPayload {
|
||||||
event_type: event_type.to_string(),
|
event_type: event_type.to_string(),
|
||||||
|
|
@ -48,29 +106,13 @@ impl WebhookNotifier {
|
||||||
new_state: state.to_string(),
|
new_state: state.to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
for endpoint in &self.endpoints {
|
let webhook_futures: Vec<_> = self.endpoints.iter()
|
||||||
info!("Sending webhook to {}: {}", endpoint.description, serde_json::to_string(&payload).unwrap());
|
.map(|endpoint| {
|
||||||
match self.client
|
info!("Sending webhook to {}: {}", endpoint.description, serde_json::to_string(&payload).unwrap());
|
||||||
.post(&endpoint.url)
|
self.send_webhook_with_retries(endpoint, &payload)
|
||||||
.json(&payload)
|
})
|
||||||
.send()
|
.collect();
|
||||||
.await
|
|
||||||
{
|
join_all(webhook_futures).await;
|
||||||
Ok(response) => {
|
|
||||||
info!(
|
|
||||||
"Successfully sent webhook to {}: {}",
|
|
||||||
endpoint.description,
|
|
||||||
response.status()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!(
|
|
||||||
"Failed to send webhook to {}: {}",
|
|
||||||
endpoint.description,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue