noisebell/remote/cache-service/src/api.rs

267 lines
7.9 KiB
Rust

use std::sync::Arc;
use axum::extract::State;
use axum::http::{header, HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::Json;
use chrono::{DateTime, Utc};
use noisebell_common::{validate_bearer, CacheStatusResponse, DoorStatus, WebhookPayload};
use tokio::sync::Mutex;
use tracing::{error, info};
use crate::db;
use crate::db::ApplyStateOutcome;
use crate::types::WebhookTarget;
use crate::webhook;
static OPEN_PNG: &[u8] = include_bytes!("../assets/open.png");
static CLOSED_PNG: &[u8] = include_bytes!("../assets/closed.png");
static OFFLINE_PNG: &[u8] = include_bytes!("../assets/offline.png");
pub struct AppState {
pub db: Arc<Mutex<rusqlite::Connection>>,
pub client: reqwest::Client,
pub inbound_api_key: String,
pub webhooks: Vec<WebhookTarget>,
pub retry_attempts: u32,
pub retry_base_delay_secs: u64,
pub webhook_last_request: std::sync::atomic::AtomicU64,
pub webhook_tokens: std::sync::atomic::AtomicU32,
}
fn unix_now() -> u64 {
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
}
fn format_full_timestamp(ts: u64) -> String {
DateTime::from_timestamp(ts as i64, 0)
.map(|dt: DateTime<Utc>| dt.format("%A, %B %-d, %Y at %-I:%M:%S %p UTC").to_string())
.unwrap_or_else(|| format!("unix timestamp {ts}"))
}
fn format_duration(seconds: u64) -> String {
let units = [(86_400, "day"), (3_600, "hour"), (60, "minute"), (1, "second")];
let mut remaining = seconds;
let mut parts = Vec::new();
for (unit_seconds, name) in units {
if remaining >= unit_seconds {
let count = remaining / unit_seconds;
remaining %= unit_seconds;
let suffix = if count == 1 { "" } else { "s" };
parts.push(format!("{count} {name}{suffix}"));
}
if parts.len() == 2 {
break;
}
}
if parts.is_empty() {
"0 seconds".to_string()
} else if parts.len() == 1 {
parts.remove(0)
} else {
format!("{} and {}", parts[0], parts[1])
}
}
fn status_summary(
status: DoorStatus,
since: Option<u64>,
last_checked: Option<u64>,
now: u64,
) -> String {
let since_text = since
.map(|ts| {
format!(
"We've been {} since {}, which was {} ago.",
status,
format_full_timestamp(ts),
format_duration(now.saturating_sub(ts)),
)
})
.unwrap_or_else(|| format!("We're currently {}, but the start time is unknown.", status));
match last_checked {
Some(ts) => format!(
"{since_text} Last checked {}, which was {} ago.",
format_full_timestamp(ts),
format_duration(now.saturating_sub(ts)),
),
None => format!("{since_text} Last checked time is unknown."),
}
}
const WEBHOOK_RATE_LIMIT: u32 = 10;
const WEBHOOK_RATE_WINDOW_SECS: u64 = 60;
pub async fn post_webhook(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Json(body): Json<WebhookPayload>,
) -> StatusCode {
if !validate_bearer(&headers, &state.inbound_api_key) {
return StatusCode::UNAUTHORIZED;
}
// Simple rate limiting: reset tokens every window, reject if exhausted.
let now = unix_now();
let last = state.webhook_last_request.load(std::sync::atomic::Ordering::Relaxed);
if now.saturating_sub(last) >= WEBHOOK_RATE_WINDOW_SECS {
state.webhook_tokens.store(WEBHOOK_RATE_LIMIT, std::sync::atomic::Ordering::Relaxed);
state.webhook_last_request.store(now, std::sync::atomic::Ordering::Relaxed);
}
let remaining = state.webhook_tokens.fetch_update(
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
|n| if n > 0 { Some(n - 1) } else { None },
);
if remaining.is_err() {
return StatusCode::TOO_MANY_REQUESTS;
}
let now = unix_now();
let db = state.db.clone();
let status = body.status;
let timestamp = body.timestamp;
let result = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::apply_state(&conn, status, timestamp, now)
})
.await
.expect("db task panicked");
match result {
Ok(ApplyStateOutcome::Applied) => {
info!(
status = %status,
timestamp = body.timestamp,
"state updated via webhook"
);
webhook::forward(
&state.client,
&state.webhooks,
&WebhookPayload { status, timestamp: body.timestamp },
state.retry_attempts,
state.retry_base_delay_secs,
)
.await;
}
Ok(ApplyStateOutcome::Duplicate) => {
info!(
status = %status,
timestamp = body.timestamp,
"duplicate webhook ignored"
);
}
Ok(ApplyStateOutcome::Stale) => {
info!(
status = %status,
timestamp = body.timestamp,
"stale webhook ignored"
);
}
Err(e) => {
error!(error = %e, "failed to update state from webhook");
return StatusCode::INTERNAL_SERVER_ERROR;
}
}
StatusCode::OK
}
pub async fn get_status(
State(state): State<Arc<AppState>>,
) -> Result<Json<CacheStatusResponse>, StatusCode> {
let db = state.db.clone();
let mut status = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::get_status(&conn)
})
.await
.expect("db task panicked")
.map_err(|e| {
error!(error = %e, "failed to get status");
StatusCode::INTERNAL_SERVER_ERROR
})?;
status.human_readable =
status_summary(status.status, status.since, status.last_checked, unix_now());
Ok(Json(status))
}
pub async fn health() -> StatusCode {
StatusCode::OK
}
pub async fn get_image_open() -> impl IntoResponse {
(
[(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")],
OPEN_PNG,
)
}
pub async fn get_image_closed() -> impl IntoResponse {
(
[(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")],
CLOSED_PNG,
)
}
pub async fn get_image_offline() -> impl IntoResponse {
(
[(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")],
OFFLINE_PNG,
)
}
pub async fn get_image(State(state): State<Arc<AppState>>) -> Response {
let db = state.db.clone();
let status = match tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::get_current_status(&conn)
})
.await
.expect("db task panicked")
{
Ok(status) => status,
Err(e) => {
error!(error = %e, "failed to get current status for image");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
};
let image = match status {
DoorStatus::Open => OPEN_PNG,
DoorStatus::Closed => CLOSED_PNG,
DoorStatus::Offline => OFFLINE_PNG,
};
([(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=5")], image)
.into_response()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn format_duration_uses_two_units() {
assert_eq!(format_duration(57), "57 seconds");
assert_eq!(format_duration(125), "2 minutes and 5 seconds");
assert_eq!(format_duration(3_723), "1 hour and 2 minutes");
}
#[test]
fn status_summary_includes_since_and_last_checked() {
let summary = status_summary(DoorStatus::Open, Some(1_000), Some(1_125), 1_180);
assert!(summary.contains("We've been open since"));
assert!(summary.contains("which was 3 minutes ago"));
assert!(summary.contains("Last checked"));
assert!(summary.contains("55 seconds ago"));
}
}