feat: expose configurations, add retry, make stable
This commit is contained in:
parent
c6e726c430
commit
50ec63a474
11 changed files with 494 additions and 221 deletions
|
|
@ -1,16 +1,29 @@
|
|||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use axum::{extract::State, routing::get, Json, Router};
|
||||
use rppal::gpio::{Gpio, Level, Trigger};
|
||||
use serde::Serialize;
|
||||
use tracing::{error, info};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
struct AppState {
|
||||
is_open: AtomicBool,
|
||||
last_changed: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct StatusResponse {
|
||||
status: &'static str,
|
||||
timestamp: u64,
|
||||
}
|
||||
|
||||
fn unix_timestamp() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
fn status_str(is_open: bool) -> &'static str {
|
||||
|
|
@ -21,15 +34,18 @@ fn status_str(is_open: bool) -> &'static str {
|
|||
}
|
||||
}
|
||||
|
||||
async fn get_status(State(is_open): State<Arc<AtomicBool>>) -> Json<StatusResponse> {
|
||||
async fn get_status(State(state): State<Arc<AppState>>) -> Json<StatusResponse> {
|
||||
Json(StatusResponse {
|
||||
status: status_str(is_open.load(Ordering::Relaxed)),
|
||||
status: status_str(state.is_open.load(Ordering::Relaxed)),
|
||||
timestamp: state.last_changed.load(Ordering::Relaxed),
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt::init();
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let gpio_pin: u8 = std::env::var("NOISEBELL_GPIO_PIN")
|
||||
.unwrap_or_else(|_| "17".into())
|
||||
|
|
@ -49,68 +65,136 @@ async fn main() -> Result<()> {
|
|||
let endpoint_url =
|
||||
std::env::var("NOISEBELL_ENDPOINT_URL").context("NOISEBELL_ENDPOINT_URL is required")?;
|
||||
|
||||
let retry_attempts: u32 = std::env::var("NOISEBELL_RETRY_ATTEMPTS")
|
||||
.unwrap_or_else(|_| "3".into())
|
||||
.parse()
|
||||
.context("NOISEBELL_RETRY_ATTEMPTS must be a valid u32")?;
|
||||
|
||||
let retry_base_delay_secs: u64 = std::env::var("NOISEBELL_RETRY_BASE_DELAY_SECS")
|
||||
.unwrap_or_else(|_| "1".into())
|
||||
.parse()
|
||||
.context("NOISEBELL_RETRY_BASE_DELAY_SECS must be a valid u64")?;
|
||||
|
||||
let http_timeout_secs: u64 = std::env::var("NOISEBELL_HTTP_TIMEOUT_SECS")
|
||||
.unwrap_or_else(|_| "10".into())
|
||||
.parse()
|
||||
.context("NOISEBELL_HTTP_TIMEOUT_SECS must be a valid u64")?;
|
||||
|
||||
let bind_address = std::env::var("NOISEBELL_BIND_ADDRESS").unwrap_or_else(|_| "0.0.0.0".into());
|
||||
|
||||
let active_low: bool = std::env::var("NOISEBELL_ACTIVE_LOW")
|
||||
.unwrap_or_else(|_| "true".into())
|
||||
.parse()
|
||||
.context("NOISEBELL_ACTIVE_LOW must be true or false")?;
|
||||
|
||||
info!(gpio_pin, debounce_secs, port, %endpoint_url, "starting noisebell");
|
||||
|
||||
let gpio = Gpio::new().context("failed to initialize GPIO")?;
|
||||
let pin = gpio
|
||||
.get(gpio_pin)
|
||||
.context(format!("failed to get GPIO pin {gpio_pin}"))?
|
||||
.into_input_pullup();
|
||||
.context(format!("failed to get GPIO pin {gpio_pin}"))?;
|
||||
let pin = if active_low {
|
||||
pin.into_input_pullup()
|
||||
} else {
|
||||
pin.into_input_pulldown()
|
||||
};
|
||||
|
||||
let is_open = Arc::new(AtomicBool::new(pin.read() == Level::Low));
|
||||
let open_level = if active_low { Level::Low } else { Level::High };
|
||||
let initial_open = pin.read() == open_level;
|
||||
let state = Arc::new(AppState {
|
||||
is_open: AtomicBool::new(initial_open),
|
||||
last_changed: AtomicU64::new(unix_timestamp()),
|
||||
});
|
||||
|
||||
info!(initial_status = status_str(is_open.load(Ordering::Relaxed)), "GPIO initialized");
|
||||
info!(initial_status = status_str(initial_open), "GPIO initialized");
|
||||
|
||||
// Channel to bridge sync GPIO callback -> async notification task
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<bool>();
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(bool, u64)>();
|
||||
|
||||
// Set up async interrupt for state changes
|
||||
let state_for_interrupt = is_open.clone();
|
||||
pin.set_async_interrupt(
|
||||
// Sync initial state with the cache on startup
|
||||
let _ = tx.send((initial_open, unix_timestamp()));
|
||||
|
||||
let state_for_interrupt = state.clone();
|
||||
// pin must live for the entire program — rppal runs interrupts on a background
|
||||
// thread tied to the InputPin. If pin drops, the interrupt thread is joined and
|
||||
// monitoring stops. We move it into a binding that lives until main() returns.
|
||||
let _pin = pin;
|
||||
_pin.set_async_interrupt(
|
||||
Trigger::Both,
|
||||
Some(Duration::from_secs(debounce_secs)),
|
||||
move |event| {
|
||||
let new_open = match event.trigger {
|
||||
Trigger::FallingEdge => true,
|
||||
Trigger::RisingEdge => false,
|
||||
Trigger::FallingEdge => active_low,
|
||||
Trigger::RisingEdge => !active_low,
|
||||
_ => return,
|
||||
};
|
||||
let was_open = state_for_interrupt.swap(new_open, Ordering::Relaxed);
|
||||
let was_open = state_for_interrupt.is_open.swap(new_open, Ordering::Relaxed);
|
||||
if was_open != new_open {
|
||||
let _ = tx.send(new_open);
|
||||
let timestamp = unix_timestamp();
|
||||
state_for_interrupt.last_changed.store(timestamp, Ordering::Relaxed);
|
||||
let _ = tx.send((new_open, timestamp));
|
||||
}
|
||||
},
|
||||
)
|
||||
.context("failed to set GPIO interrupt")?;
|
||||
|
||||
// Task that POSTs state changes to the endpoint
|
||||
tokio::spawn(async move {
|
||||
let client = reqwest::Client::new();
|
||||
while let Some(new_open) = rx.recv().await {
|
||||
let status = status_str(new_open);
|
||||
info!(status, "state changed");
|
||||
let notify_handle = tokio::spawn(async move {
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(http_timeout_secs))
|
||||
.build()
|
||||
.expect("failed to build HTTP client");
|
||||
|
||||
if let Err(e) = client
|
||||
.post(&endpoint_url)
|
||||
.json(&serde_json::json!({ "status": status }))
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
error!(%e, "failed to notify endpoint");
|
||||
while let Some((new_open, timestamp)) = rx.recv().await {
|
||||
let status = status_str(new_open);
|
||||
info!(status, timestamp, "state changed");
|
||||
|
||||
let payload = serde_json::json!({ "status": status, "timestamp": timestamp });
|
||||
|
||||
for attempt in 0..=retry_attempts {
|
||||
let result = client.post(&endpoint_url).json(&payload).send().await;
|
||||
match result {
|
||||
Ok(resp) if resp.status().is_success() => break,
|
||||
_ => {
|
||||
let err_msg = match &result {
|
||||
Ok(resp) => format!("HTTP {}", resp.status()),
|
||||
Err(e) => e.to_string(),
|
||||
};
|
||||
if attempt == retry_attempts {
|
||||
error!(error = %err_msg, "failed to notify endpoint after {} attempts", retry_attempts + 1);
|
||||
} else {
|
||||
let delay = Duration::from_secs(
|
||||
retry_base_delay_secs * 2u64.pow(attempt),
|
||||
);
|
||||
warn!(error = %err_msg, attempt = attempt + 1, "notify failed, retrying in {:?}", delay);
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let app = Router::new()
|
||||
.route("/status", get(get_status))
|
||||
.with_state(is_open);
|
||||
.route("/", get(get_status))
|
||||
.with_state(state);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port))
|
||||
let listener = tokio::net::TcpListener::bind((&*bind_address, port))
|
||||
.await
|
||||
.context(format!("failed to bind to port {port}"))?;
|
||||
.context(format!("failed to bind to {bind_address}:{port}"))?;
|
||||
|
||||
info!(port, "listening");
|
||||
axum::serve(listener, app).await.context("server error")?;
|
||||
|
||||
let shutdown = tokio::signal::ctrl_c();
|
||||
axum::serve(listener, app)
|
||||
.with_graceful_shutdown(async { shutdown.await.ok(); })
|
||||
.await
|
||||
.context("server error")?;
|
||||
|
||||
info!("shutting down, draining notification queue");
|
||||
// Drop the interrupt to stop producing new messages, then wait
|
||||
// for the notification task to drain remaining messages.
|
||||
drop(_pin);
|
||||
let _ = notify_handle.await;
|
||||
|
||||
info!("shutdown complete");
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue