feat: reorganize with remote

This commit is contained in:
Jet Pham 2026-03-10 19:43:24 -07:00 committed by Jet
parent a74e5753fa
commit dc7b8cbadd
28 changed files with 622 additions and 3024 deletions

View file

@ -1,13 +1,14 @@
use std::sync::Arc;
use axum::extract::{Query, State};
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode};
use axum::Json;
use noisebell_common::{validate_bearer, HistoryEntry, WebhookPayload};
use tokio::sync::Mutex;
use tracing::{error, info};
use crate::db;
use crate::types::{DoorStatus, InboundWebhook, OutboundPayload, WebhookTarget};
use crate::types::{DoorStatus, WebhookTarget};
use crate::webhook;
pub struct AppState {
@ -26,18 +27,10 @@ fn unix_now() -> u64 {
.as_secs()
}
fn validate_bearer(headers: &HeaderMap, expected: &str) -> bool {
headers
.get("authorization")
.and_then(|v| v.to_str().ok())
.map(|v| v.strip_prefix("Bearer ").unwrap_or("") == expected)
.unwrap_or(false)
}
pub async fn post_webhook(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Json(body): Json<InboundWebhook>,
Json(body): Json<WebhookPayload>,
) -> StatusCode {
if !validate_bearer(&headers, &state.inbound_api_key) {
return StatusCode::UNAUTHORIZED;
@ -48,12 +41,18 @@ pub async fn post_webhook(
};
let now = unix_now();
{
let conn = state.db.lock().await;
if let Err(e) = db::update_state(&conn, status, body.timestamp, now) {
error!(error = %e, "failed to update state from webhook");
return StatusCode::INTERNAL_SERVER_ERROR;
}
let db = state.db.clone();
let timestamp = body.timestamp;
let result = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::update_state(&conn, status, timestamp, now)
})
.await
.expect("db task panicked");
if let Err(e) = result {
error!(error = %e, "failed to update state from webhook");
return StatusCode::INTERNAL_SERVER_ERROR;
}
info!(status = status.as_str(), timestamp = body.timestamp, "state updated via webhook");
@ -61,7 +60,7 @@ pub async fn post_webhook(
webhook::forward(
&state.client,
&state.webhooks,
&OutboundPayload {
&WebhookPayload {
status: status.as_str().to_string(),
timestamp: body.timestamp,
},
@ -74,43 +73,56 @@ pub async fn post_webhook(
}
pub async fn get_status(State(state): State<Arc<AppState>>) -> Result<Json<serde_json::Value>, StatusCode> {
let conn = state.db.lock().await;
match db::get_status(&conn) {
Ok(status) => Ok(Json(serde_json::to_value(status).unwrap())),
Err(e) => {
error!(error = %e, "failed to get status");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
let db = state.db.clone();
let status = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::get_status(&conn)
})
.await
.expect("db task panicked")
.map_err(|e| {
error!(error = %e, "failed to get status");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(serde_json::to_value(status).unwrap()))
}
pub async fn get_info(State(state): State<Arc<AppState>>) -> Result<Json<serde_json::Value>, StatusCode> {
let conn = state.db.lock().await;
match db::get_pi_info(&conn) {
Ok(info) => Ok(Json(info)),
Err(e) => {
error!(error = %e, "failed to get pi info");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
let db = state.db.clone();
let info = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::get_pi_info(&conn)
})
.await
.expect("db task panicked")
.map_err(|e| {
error!(error = %e, "failed to get pi info");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(info))
}
#[derive(serde::Deserialize)]
pub struct HistoryQuery {
pub limit: Option<u32>,
pub async fn health() -> StatusCode {
StatusCode::OK
}
pub async fn get_history(
State(state): State<Arc<AppState>>,
Query(query): Query<HistoryQuery>,
) -> Result<Json<Vec<crate::types::HistoryEntry>>, StatusCode> {
let limit = query.limit.unwrap_or(50);
let conn = state.db.lock().await;
match db::get_history(&conn, limit) {
Ok(entries) => Ok(Json(entries)),
Err(e) => {
error!(error = %e, "failed to get history");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
) -> Result<Json<Vec<HistoryEntry>>, StatusCode> {
let limit = 100u32;
let db = state.db.clone();
let entries = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::get_history(&conn, limit)
})
.await
.expect("db task panicked")
.map_err(|e| {
error!(error = %e, "failed to get history");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(entries))
}

View file

@ -1,10 +1,11 @@
use anyhow::{Context, Result};
use rusqlite::Connection;
use crate::types::{DoorStatus, HistoryEntry, StatusResponse};
use crate::types::{DoorStatus, StatusResponse};
pub fn init(path: &str) -> Result<Connection> {
let conn = Connection::open(path).context("failed to open SQLite database")?;
conn.execute_batch("PRAGMA journal_mode=WAL;")?;
conn.execute_batch(
"
CREATE TABLE IF NOT EXISTS current_state (
@ -96,16 +97,17 @@ pub fn get_current_status_str(conn: &Connection) -> Result<Option<String>> {
Ok(status)
}
pub fn get_history(conn: &Connection, limit: u32) -> Result<Vec<HistoryEntry>> {
pub fn get_history(conn: &Connection, limit: u32) -> Result<Vec<noisebell_common::HistoryEntry>> {
let mut stmt = conn.prepare(
"SELECT status, timestamp, recorded_at FROM state_log ORDER BY id DESC LIMIT ?1",
"SELECT id, status, timestamp, recorded_at FROM state_log ORDER BY id DESC LIMIT ?1",
)?;
let entries = stmt
.query_map(rusqlite::params![limit], |row| {
Ok(HistoryEntry {
status: row.get(0)?,
timestamp: row.get(1)?,
recorded_at: row.get(2)?,
Ok(noisebell_common::HistoryEntry {
id: row.get(0)?,
status: row.get(1)?,
timestamp: row.get(2)?,
recorded_at: row.get(3)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;

View file

@ -5,7 +5,8 @@ use anyhow::{Context, Result};
use axum::routing::{get, post};
use axum::Router;
use tokio::sync::Mutex;
use tracing::info;
use tower_http::trace::TraceLayer;
use tracing::{info, Level};
mod api;
mod db;
@ -27,7 +28,9 @@ async fn main() -> Result<()> {
.context("NOISEBELL_CACHE_PORT must be a valid u16")?;
let pi_address = std::env::var("NOISEBELL_CACHE_PI_ADDRESS")
.context("NOISEBELL_CACHE_PI_ADDRESS is required")?;
.context("NOISEBELL_CACHE_PI_ADDRESS is required")?
.trim_end_matches('/')
.to_string();
let pi_api_key = std::env::var("NOISEBELL_CACHE_PI_API_KEY")
.context("NOISEBELL_CACHE_PI_API_KEY is required")?;
@ -106,7 +109,6 @@ async fn main() -> Result<()> {
offline_threshold,
retry_attempts,
retry_base_delay_secs,
http_timeout_secs,
webhooks: webhooks.clone(),
});
@ -123,10 +125,16 @@ async fn main() -> Result<()> {
});
let app = Router::new()
.route("/health", get(api::health))
.route("/webhook", post(api::post_webhook))
.route("/status", get(api::get_status))
.route("/info", get(api::get_info))
.route("/history", get(api::get_history))
.layer(
TraceLayer::new_for_http()
.make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO))
.on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)),
)
.with_state(app_state);
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port))

View file

@ -1,11 +1,12 @@
use std::sync::Arc;
use std::time::Duration;
use noisebell_common::WebhookPayload;
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use crate::db;
use crate::types::{DoorStatus, OutboundPayload, WebhookTarget};
use crate::types::{DoorStatus, WebhookTarget};
use crate::webhook;
pub struct PollerConfig {
@ -16,7 +17,6 @@ pub struct PollerConfig {
pub offline_threshold: u32,
pub retry_attempts: u32,
pub retry_base_delay_secs: u64,
pub http_timeout_secs: u64,
pub webhooks: Vec<WebhookTarget>,
}
@ -37,8 +37,6 @@ pub fn spawn_status_poller(
let mut was_offline = false;
loop {
tokio::time::sleep(config.status_poll_interval).await;
let result = client
.get(format!("{}/", config.pi_address))
.bearer_auth(&config.pi_api_key)
@ -55,44 +53,53 @@ pub fn spawn_status_poller(
let now = unix_now();
if let Ok(body) = resp.json::<serde_json::Value>().await {
let conn = db.lock().await;
if let Err(e) = db::update_last_seen(&conn, now) {
error!(error = %e, "failed to update last_seen");
}
let status_str = body.get("status").and_then(|s| s.as_str()).map(String::from);
let event_timestamp = body.get("timestamp").and_then(|t| t.as_u64());
// Update state if it differs from current
if let Some(status_str) = body.get("status").and_then(|s| s.as_str()) {
if let Some(status) = DoorStatus::from_str(status_str) {
let current = db::get_current_status_str(&conn);
let changed = match &current {
Ok(Some(s)) => s != status.as_str(),
Ok(None) => true, // was offline
Err(_) => true,
};
if changed {
let timestamp = body
.get("timestamp")
.and_then(|t| t.as_u64())
.unwrap_or(now);
if let Err(e) = db::update_state(&conn, status, timestamp, now) {
error!(error = %e, "failed to update state from poll");
} else {
info!(status = status.as_str(), "state updated from poll");
drop(conn);
webhook::forward(
&client,
&config.webhooks,
&OutboundPayload {
status: status.as_str().to_string(),
timestamp,
},
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
let db = db.clone();
let update_result = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
if let Err(e) = db::update_last_seen(&conn, now) {
error!(error = %e, "failed to update last_seen");
}
if let Some(ref status_str) = status_str {
if let Some(status) = DoorStatus::from_str(status_str) {
let current = db::get_current_status_str(&conn);
let changed = match &current {
Ok(Some(s)) => s != status.as_str(),
Ok(None) => true,
Err(_) => true,
};
if changed {
let timestamp = event_timestamp.unwrap_or(now);
if let Err(e) = db::update_state(&conn, status, timestamp, now) {
error!(error = %e, "failed to update state from poll");
return None;
}
return Some((status, timestamp));
}
}
}
None
})
.await
.expect("db task panicked");
if let Some((status, timestamp)) = update_result {
info!(status = status.as_str(), "state updated from poll");
webhook::forward(
&client,
&config.webhooks,
&WebhookPayload {
status: status.as_str().to_string(),
timestamp,
},
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
}
}
}
@ -111,27 +118,38 @@ pub fn spawn_status_poller(
if consecutive_failures >= config.offline_threshold && !was_offline {
was_offline = true;
let now = unix_now();
let conn = db.lock().await;
if let Err(e) = db::mark_offline(&conn, now) {
error!(error = %e, "failed to mark Pi offline");
} else {
info!("Pi marked offline after {} consecutive failures", consecutive_failures);
drop(conn);
webhook::forward(
&client,
&config.webhooks,
&OutboundPayload {
status: "offline".to_string(),
timestamp: now,
},
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
let db = db.clone();
let marked = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::mark_offline(&conn, now)
})
.await
.expect("db task panicked");
match marked {
Ok(()) => {
info!("Pi marked offline after {} consecutive failures", consecutive_failures);
webhook::forward(
&client,
&config.webhooks,
&WebhookPayload {
status: "offline".to_string(),
timestamp: now,
},
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
}
Err(e) => {
error!(error = %e, "failed to mark Pi offline");
}
}
}
}
}
tokio::time::sleep(config.status_poll_interval).await;
}
});
}
@ -143,8 +161,6 @@ pub fn spawn_info_poller(
) {
tokio::spawn(async move {
loop {
tokio::time::sleep(config.info_poll_interval).await;
let result = client
.get(format!("{}/info", config.pi_address))
.bearer_auth(&config.pi_api_key)
@ -155,8 +171,15 @@ pub fn spawn_info_poller(
Ok(resp) if resp.status().is_success() => {
if let Ok(data) = resp.json::<serde_json::Value>().await {
let now = unix_now();
let conn = db.lock().await;
if let Err(e) = db::update_pi_info(&conn, &data, now) {
let db = db.clone();
let result = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::update_pi_info(&conn, &data, now)
})
.await
.expect("db task panicked");
if let Err(e) = result {
error!(error = %e, "failed to update pi_info");
}
}
@ -169,6 +192,8 @@ pub fn spawn_info_poller(
warn!(error = %err_msg, "info poll failed");
}
}
tokio::time::sleep(config.info_poll_interval).await;
}
});
}

View file

@ -31,25 +31,6 @@ pub struct StatusResponse {
pub last_seen: Option<u64>,
}
#[derive(Debug, Deserialize)]
pub struct InboundWebhook {
pub status: String,
pub timestamp: u64,
}
#[derive(Debug, Clone, Serialize)]
pub struct OutboundPayload {
pub status: String,
pub timestamp: u64,
}
#[derive(Debug, Clone, Serialize)]
pub struct HistoryEntry {
pub status: String,
pub timestamp: u64,
pub recorded_at: u64,
}
#[derive(Debug, Clone)]
pub struct WebhookTarget {
pub url: String,

View file

@ -1,25 +1,26 @@
use std::time::Duration;
use noisebell_common::WebhookPayload;
use tracing::{error, info, warn};
use crate::types::{OutboundPayload, WebhookTarget};
use crate::types::WebhookTarget;
pub async fn forward(
client: &reqwest::Client,
targets: &[WebhookTarget],
payload: &OutboundPayload,
payload: &WebhookPayload,
retry_attempts: u32,
retry_base_delay_secs: u64,
) {
let mut set = tokio::task::JoinSet::new();
for target in targets {
let payload = payload.clone();
let client = client.clone();
let url = target.url.clone();
let secret = target.secret.clone();
let retry_attempts = retry_attempts;
let retry_base_delay_secs = retry_base_delay_secs;
tokio::spawn(async move {
set.spawn(async move {
info!(url = %url, status = %payload.status, "forwarding to outbound webhook");
for attempt in 0..=retry_attempts {
@ -50,4 +51,10 @@ pub async fn forward(
}
});
}
while let Some(result) = set.join_next().await {
if let Err(e) = result {
error!(error = %e, "webhook task panicked");
}
}
}