feat: remove rss, status, and badge features

This commit is contained in:
Jet 2026-03-19 01:33:56 -07:00
parent 553d7d1780
commit 36720e2ba5
No known key found for this signature in database
21 changed files with 904 additions and 1200 deletions

View file

@ -1,8 +1,8 @@
# Cache Service
The central hub. Sits between the Pi and everything else.
The central hub. Sits between the Pi and Discord.
It does two things: polls the Pi on a timer to keep a local copy of the door state, and receives push webhooks from the Pi when the state actually changes. Either way, updates get written to SQLite and forwarded to downstream services (Discord, etc.) via outbound webhooks.
It does two things: polls the Pi on a timer to keep a local copy of the door state, and receives push webhooks from the Pi when the state actually changes. Either way, updates get written to SQLite and forwarded to downstream services via outbound webhooks.
If the Pi stops responding to polls (configurable threshold, default 3 misses), the cache marks it as offline and notifies downstream.
@ -10,12 +10,12 @@ If the Pi stops responding to polls (configurable threshold, default 3 misses),
| Method | Path | Auth | Description |
|--------|------|------|-------------|
| `GET` | `/status` | — | Current door status (`status`, `timestamp`, `last_seen`) |
| `GET` | `/info` | — | Cached Pi system info |
| `GET` | `/history` | — | Last 100 state changes |
| `GET` | `/status` | — | Current door status (`status`, `since`, `last_checked`) |
| `POST` | `/webhook` | Bearer | Inbound webhook from the Pi |
| `GET` | `/health` | — | Health check |
`since` is the Pi-reported time when the current state began. `last_checked` is when the cache most recently attempted a poll.
## Configuration
NixOS options under `services.noisebell-cache`:
@ -28,7 +28,6 @@ NixOS options under `services.noisebell-cache`:
| `inboundApiKeyFile` | required | Key file for validating inbound webhooks |
| `port` | `3000` | Listen port |
| `statusPollIntervalSecs` | `60` | How often to poll `GET /` on the Pi |
| `infoPollIntervalSecs` | `300` | How often to poll `GET /info` on the Pi |
| `offlineThreshold` | `3` | Consecutive failed polls before marking offline |
| `retryAttempts` | `3` | Outbound webhook retry count |
| `retryBaseDelaySecs` | `1` | Exponential backoff base delay |

View file

@ -39,11 +39,6 @@ in
default = 60;
};
infoPollIntervalSecs = lib.mkOption {
type = lib.types.ints.positive;
default = 300;
};
offlineThreshold = lib.mkOption {
type = lib.types.ints.positive;
default = 3;
@ -70,16 +65,18 @@ in
};
outboundWebhooks = lib.mkOption {
type = lib.types.listOf (lib.types.submodule {
options = {
url = lib.mkOption { type = lib.types.str; };
secretFile = lib.mkOption {
type = lib.types.nullOr lib.types.path;
default = null;
type = lib.types.listOf (
lib.types.submodule {
options = {
url = lib.mkOption { type = lib.types.str; };
secretFile = lib.mkOption {
type = lib.types.nullOr lib.types.path;
default = null;
};
};
};
});
default = [];
}
);
default = [ ];
};
};
@ -88,61 +85,65 @@ in
isSystemUser = true;
group = "noisebell-cache";
};
users.groups.noisebell-cache = {};
users.groups.noisebell-cache = { };
services.caddy.virtualHosts.${cfg.domain}.extraConfig = ''
redir / https://git.extremist.software/jet/noisebell 302
reverse_proxy localhost:${toString cfg.port}
'';
systemd.services.noisebell-cache = let
webhookExports = lib.concatImapStringsSep "\n" (i: wh:
let idx = toString (i - 1); in
''export NOISEBELL_CACHE_WEBHOOK_${idx}_URL="${wh.url}"'' +
lib.optionalString (wh.secretFile != null)
systemd.services.noisebell-cache =
let
webhookExports = lib.concatImapStringsSep "\n" (
i: wh:
let
idx = toString (i - 1);
in
''export NOISEBELL_CACHE_WEBHOOK_${idx}_URL="${wh.url}"''
+ lib.optionalString (wh.secretFile != null) ''
export NOISEBELL_CACHE_WEBHOOK_${idx}_SECRET="$(cat ${wh.secretFile})"
''
export NOISEBELL_CACHE_WEBHOOK_${idx}_SECRET="$(cat ${wh.secretFile})"''
) cfg.outboundWebhooks;
in {
description = "Noisebell cache service";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
wants = [ "network-online.target" ];
environment = {
NOISEBELL_CACHE_PORT = toString cfg.port;
NOISEBELL_CACHE_PI_ADDRESS = cfg.piAddress;
NOISEBELL_CACHE_DATA_DIR = cfg.dataDir;
NOISEBELL_CACHE_STATUS_POLL_INTERVAL_SECS = toString cfg.statusPollIntervalSecs;
NOISEBELL_CACHE_INFO_POLL_INTERVAL_SECS = toString cfg.infoPollIntervalSecs;
NOISEBELL_CACHE_OFFLINE_THRESHOLD = toString cfg.offlineThreshold;
NOISEBELL_CACHE_RETRY_ATTEMPTS = toString cfg.retryAttempts;
NOISEBELL_CACHE_RETRY_BASE_DELAY_SECS = toString cfg.retryBaseDelaySecs;
NOISEBELL_CACHE_HTTP_TIMEOUT_SECS = toString cfg.httpTimeoutSecs;
RUST_LOG = "info";
) cfg.outboundWebhooks;
in
{
description = "Noisebell cache service";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
wants = [ "network-online.target" ];
environment = {
NOISEBELL_CACHE_PORT = toString cfg.port;
NOISEBELL_CACHE_PI_ADDRESS = cfg.piAddress;
NOISEBELL_CACHE_DATA_DIR = cfg.dataDir;
NOISEBELL_CACHE_STATUS_POLL_INTERVAL_SECS = toString cfg.statusPollIntervalSecs;
NOISEBELL_CACHE_OFFLINE_THRESHOLD = toString cfg.offlineThreshold;
NOISEBELL_CACHE_RETRY_ATTEMPTS = toString cfg.retryAttempts;
NOISEBELL_CACHE_RETRY_BASE_DELAY_SECS = toString cfg.retryBaseDelaySecs;
NOISEBELL_CACHE_HTTP_TIMEOUT_SECS = toString cfg.httpTimeoutSecs;
RUST_LOG = "info";
};
script = ''
export NOISEBELL_CACHE_INBOUND_API_KEY="$(cat ${cfg.inboundApiKeyFile})"
export NOISEBELL_CACHE_PI_API_KEY="$(cat ${cfg.piApiKeyFile})"
${webhookExports}
exec ${bin}
'';
serviceConfig = {
Type = "simple";
Restart = "on-failure";
RestartSec = 5;
User = "noisebell-cache";
Group = "noisebell-cache";
StateDirectory = "noisebell-cache";
NoNewPrivileges = true;
ProtectSystem = "strict";
ProtectHome = true;
PrivateTmp = true;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
RestrictSUIDSGID = true;
ReadWritePaths = [ cfg.dataDir ];
};
};
script = ''
export NOISEBELL_CACHE_INBOUND_API_KEY="$(cat ${cfg.inboundApiKeyFile})"
export NOISEBELL_CACHE_PI_API_KEY="$(cat ${cfg.piApiKeyFile})"
${webhookExports}
exec ${bin}
'';
serviceConfig = {
Type = "simple";
Restart = "on-failure";
RestartSec = 5;
User = "noisebell-cache";
Group = "noisebell-cache";
StateDirectory = "noisebell-cache";
NoNewPrivileges = true;
ProtectSystem = "strict";
ProtectHome = true;
PrivateTmp = true;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
RestrictSUIDSGID = true;
ReadWritePaths = [ cfg.dataDir ];
};
};
};
}

View file

@ -1,15 +1,16 @@
use std::sync::Arc;
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode, header};
use axum::response::IntoResponse;
use axum::http::{header, HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::Json;
use noisebell_common::{validate_bearer, HistoryEntry, WebhookPayload};
use noisebell_common::{validate_bearer, CacheStatusResponse, DoorStatus, WebhookPayload};
use tokio::sync::Mutex;
use tracing::{error, info};
use crate::db;
use crate::types::{DoorStatus, WebhookTarget};
use crate::db::ApplyStateOutcome;
use crate::types::WebhookTarget;
use crate::webhook;
static OPEN_PNG: &[u8] = include_bytes!("../assets/open.png");
@ -46,12 +47,18 @@ pub async fn post_webhook(
return StatusCode::UNAUTHORIZED;
}
// Simple rate limiting: reset tokens every window, reject if exhausted
// Simple rate limiting: reset tokens every window, reject if exhausted.
let now = unix_now();
let last = state.webhook_last_request.load(std::sync::atomic::Ordering::Relaxed);
let last = state
.webhook_last_request
.load(std::sync::atomic::Ordering::Relaxed);
if now.saturating_sub(last) >= WEBHOOK_RATE_WINDOW_SECS {
state.webhook_tokens.store(WEBHOOK_RATE_LIMIT, std::sync::atomic::Ordering::Relaxed);
state.webhook_last_request.store(now, std::sync::atomic::Ordering::Relaxed);
state
.webhook_tokens
.store(WEBHOOK_RATE_LIMIT, std::sync::atomic::Ordering::Relaxed);
state
.webhook_last_request
.store(now, std::sync::atomic::Ordering::Relaxed);
}
let remaining = state.webhook_tokens.fetch_update(
std::sync::atomic::Ordering::Relaxed,
@ -62,43 +69,63 @@ pub async fn post_webhook(
return StatusCode::TOO_MANY_REQUESTS;
}
let Some(status) = DoorStatus::from_str(&body.status) else {
return StatusCode::BAD_REQUEST;
};
let now = unix_now();
let db = state.db.clone();
let status = body.status;
let timestamp = body.timestamp;
let result = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::update_state(&conn, status, timestamp, now)
db::apply_state(&conn, status, timestamp, now)
})
.await
.expect("db task panicked");
if let Err(e) = result {
error!(error = %e, "failed to update state from webhook");
return StatusCode::INTERNAL_SERVER_ERROR;
match result {
Ok(ApplyStateOutcome::Applied) => {
info!(
status = %status,
timestamp = body.timestamp,
"state updated via webhook"
);
webhook::forward(
&state.client,
&state.webhooks,
&WebhookPayload {
status,
timestamp: body.timestamp,
},
state.retry_attempts,
state.retry_base_delay_secs,
)
.await;
}
Ok(ApplyStateOutcome::Duplicate) => {
info!(
status = %status,
timestamp = body.timestamp,
"duplicate webhook ignored"
);
}
Ok(ApplyStateOutcome::Stale) => {
info!(
status = %status,
timestamp = body.timestamp,
"stale webhook ignored"
);
}
Err(e) => {
error!(error = %e, "failed to update state from webhook");
return StatusCode::INTERNAL_SERVER_ERROR;
}
}
info!(status = status.as_str(), timestamp = body.timestamp, "state updated via webhook");
webhook::forward(
&state.client,
&state.webhooks,
&WebhookPayload {
status: status.as_str().to_string(),
timestamp: body.timestamp,
},
state.retry_attempts,
state.retry_base_delay_secs,
)
.await;
StatusCode::OK
}
pub async fn get_status(State(state): State<Arc<AppState>>) -> Result<Json<serde_json::Value>, StatusCode> {
pub async fn get_status(
State(state): State<Arc<AppState>>,
) -> Result<Json<CacheStatusResponse>, StatusCode> {
let db = state.db.clone();
let status = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
@ -111,125 +138,70 @@ pub async fn get_status(State(state): State<Arc<AppState>>) -> Result<Json<serde
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(serde_json::to_value(status).unwrap()))
}
pub async fn get_info(State(state): State<Arc<AppState>>) -> Result<Json<serde_json::Value>, StatusCode> {
let db = state.db.clone();
let info = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::get_pi_info(&conn)
})
.await
.expect("db task panicked")
.map_err(|e| {
error!(error = %e, "failed to get pi info");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(info))
Ok(Json(status))
}
/// GET /health — liveness probe; unconditionally returns 200 while the
/// process is serving requests.
pub async fn health() -> StatusCode {
    StatusCode::OK
}
/// GET /history — the most recent 100 state-change log entries.
///
/// SQLite access is synchronous, so the query is moved onto the blocking
/// thread pool; any DB error maps to 500.
pub async fn get_history(
    State(state): State<Arc<AppState>>,
) -> Result<Json<Vec<HistoryEntry>>, StatusCode> {
    // Fixed page size; the handler exposes no pagination parameters.
    let limit = 100u32;
    let db = state.db.clone();
    let entries = tokio::task::spawn_blocking(move || {
        let conn = db.blocking_lock();
        db::get_history(&conn, limit)
    })
    .await
    .expect("db task panicked")
    .map_err(|e| {
        error!(error = %e, "failed to get history");
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
    Ok(Json(entries))
}
pub async fn get_image_open() -> impl IntoResponse {
([(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")], OPEN_PNG)
(
[
(header::CONTENT_TYPE, "image/png"),
(header::CACHE_CONTROL, "public, max-age=86400"),
],
OPEN_PNG,
)
}
pub async fn get_image_closed() -> impl IntoResponse {
([(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")], CLOSED_PNG)
(
[
(header::CONTENT_TYPE, "image/png"),
(header::CACHE_CONTROL, "public, max-age=86400"),
],
CLOSED_PNG,
)
}
pub async fn get_image_offline() -> impl IntoResponse {
([(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")], OFFLINE_PNG)
(
[
(header::CONTENT_TYPE, "image/png"),
(header::CACHE_CONTROL, "public, max-age=86400"),
],
OFFLINE_PNG,
)
}
pub async fn get_image(State(state): State<Arc<AppState>>) -> impl IntoResponse {
pub async fn get_image(State(state): State<Arc<AppState>>) -> Response {
let db = state.db.clone();
let status = tokio::task::spawn_blocking(move || {
let status = match tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::get_current_status(&conn)
})
.await
.expect("db task panicked")
.unwrap_or(DoorStatus::Offline);
{
Ok(status) => status,
Err(e) => {
error!(error = %e, "failed to get current status for image");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
};
let image = match status {
DoorStatus::Open => OPEN_PNG,
DoorStatus::Closed => CLOSED_PNG,
DoorStatus::Offline => OFFLINE_PNG,
};
([(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=5")], image)
}
pub async fn get_badge(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let db = state.db.clone();
let status = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::get_current_status(&conn)
})
.await
.expect("db task panicked")
.unwrap_or(DoorStatus::Offline);
let (label, color) = match status {
DoorStatus::Open => ("open", "#57f287"),
DoorStatus::Closed => ("closed", "#ed4245"),
DoorStatus::Offline => ("offline", "#99aab5"),
};
let label_width = 70u32;
let value_width = 10 + label.len() as u32 * 7;
let total_width = label_width + value_width;
let label_x = label_width as f32 / 2.0;
let value_x = label_width as f32 + value_width as f32 / 2.0;
let svg = format!(
"<svg xmlns=\"http://www.w3.org/2000/svg\" width=\"{total_width}\" height=\"20\">\
<linearGradient id=\"s\" x2=\"0\" y2=\"100%\">\
<stop offset=\"0\" stop-color=\"#bbb\" stop-opacity=\".1\"/>\
<stop offset=\"1\" stop-opacity=\".1\"/>\
</linearGradient>\
<clipPath id=\"r\"><rect width=\"{total_width}\" height=\"20\" rx=\"3\" fill=\"#fff\"/></clipPath>\
<g clip-path=\"url(#r)\">\
<rect width=\"{label_width}\" height=\"20\" fill=\"#555\"/>\
<rect x=\"{label_width}\" width=\"{value_width}\" height=\"20\" fill=\"{color}\"/>\
<rect width=\"{total_width}\" height=\"20\" fill=\"url(#s)\"/>\
</g>\
<g fill=\"#fff\" text-anchor=\"middle\" font-family=\"Verdana,Geneva,sans-serif\" font-size=\"11\">\
<text x=\"{label_x}\" y=\"15\" fill=\"#010101\" fill-opacity=\".3\">noisebell</text>\
<text x=\"{label_x}\" y=\"14\">noisebell</text>\
<text x=\"{value_x}\" y=\"15\" fill=\"#010101\" fill-opacity=\".3\">{label}</text>\
<text x=\"{value_x}\" y=\"14\">{label}</text>\
</g></svg>"
);
(
[
(header::CONTENT_TYPE, "image/svg+xml"),
(header::CACHE_CONTROL, "no-cache, max-age=0"),
(header::CONTENT_TYPE, "image/png"),
(header::CACHE_CONTROL, "public, max-age=5"),
],
svg,
image,
)
.into_response()
}

View file

@ -1,7 +1,72 @@
use anyhow::{Context, Result};
use rusqlite::Connection;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::types::{DoorStatus, StatusResponse};
use anyhow::{Context, Result};
use noisebell_common::{CacheStatusResponse, DoorStatus};
use rusqlite::{Connection, OptionalExtension};
/// A door state actually reported by the Pi: open or closed.
///
/// `DoorStatus::Offline` is deliberately unrepresentable here, so code that
/// handles "the Pi said something" can't accidentally treat offline as a
/// reported state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LiveDoorStatus {
    Open,
    Closed,
}
impl LiveDoorStatus {
    /// Widen back to the shared API enum (infallible).
    const fn into_door_status(self) -> DoorStatus {
        match self {
            Self::Open => DoorStatus::Open,
            Self::Closed => DoorStatus::Closed,
        }
    }
}
impl TryFrom<DoorStatus> for LiveDoorStatus {
    type Error = &'static str;
    /// Narrowing conversion: fails on `Offline`, which is a cache-derived
    /// condition rather than something the Pi reports.
    fn try_from(value: DoorStatus) -> std::result::Result<Self, Self::Error> {
        match value {
            DoorStatus::Open => Ok(Self::Open),
            DoorStatus::Closed => Ok(Self::Closed),
            DoorStatus::Offline => Err("offline is not a live door state"),
        }
    }
}
/// What the cache currently believes about the door.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CachedState {
    /// Fresh database: nothing ever recorded (stored as 'offline' with a
    /// NULL timestamp — see `current_state_row`).
    Unknown,
    /// The Pi reported a real door state that began at `since`.
    Live { status: LiveDoorStatus, since: u64 },
    /// The cache marked the Pi offline at `since`.
    Offline { since: u64 },
}
impl CachedState {
    /// Status surfaced through the API: `Unknown` collapses to `Offline`.
    const fn status_for_api(self) -> DoorStatus {
        match self {
            Self::Unknown | Self::Offline { .. } => DoorStatus::Offline,
            Self::Live { status, .. } => status.into_door_status(),
        }
    }
    /// Start time of the current state; `None` only when no state was ever
    /// recorded.
    const fn since_for_api(self) -> Option<u64> {
        match self {
            Self::Unknown => None,
            Self::Live { since, .. } | Self::Offline { since } => Some(since),
        }
    }
}
/// Decoded form of the singleton `current_state` database row (id = 1).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CurrentStateRow {
    /// Believed door state, decoded from the (status, timestamp) columns.
    state: CachedState,
    /// Most recent time a Pi update touched this row (set by
    /// `write_state_change` / `update_last_seen`).
    last_seen: Option<u64>,
    /// Most recent time the cache attempted a poll.
    last_checked: Option<u64>,
}
/// Parse a stored status string, tagging failures with the column it came
/// from so database corruption is easy to localize.
fn parse_status(status: &str, location: &str) -> Result<DoorStatus> {
    let parsed = status
        .parse()
        .with_context(|| format!("invalid door status {status:?} in {location}"))?;
    Ok(parsed)
}
pub fn init(path: &str) -> Result<Connection> {
let conn = Connection::open(path).context("failed to open SQLite database")?;
@ -10,69 +75,173 @@ pub fn init(path: &str) -> Result<Connection> {
"
CREATE TABLE IF NOT EXISTS current_state (
id INTEGER PRIMARY KEY CHECK (id = 1),
status TEXT NOT NULL DEFAULT 'offline',
status TEXT NOT NULL DEFAULT 'offline' CHECK (status IN ('open', 'closed', 'offline')),
timestamp INTEGER,
last_seen INTEGER,
last_checked INTEGER
);
CREATE TABLE IF NOT EXISTS state_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
status TEXT NOT NULL,
timestamp INTEGER NOT NULL,
recorded_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS pi_info (
id INTEGER PRIMARY KEY CHECK (id = 1),
data TEXT NOT NULL,
fetched_at INTEGER NOT NULL
);
INSERT OR IGNORE INTO current_state (id) VALUES (1);
INSERT OR IGNORE INTO pi_info (id, data, fetched_at) VALUES (1, '{}', 0);
",
)
.context("failed to initialize database schema")?;
migrate_current_state(&conn)?;
Ok(conn)
}
pub fn get_status(conn: &Connection) -> Result<StatusResponse> {
let (status_str, timestamp, last_checked) = conn.query_row(
"SELECT status, timestamp, last_checked FROM current_state WHERE id = 1",
/// Whether the `current_state` table already has the named column.
///
/// Used by the migration path to detect legacy schemas; column names live at
/// index 1 of each `PRAGMA table_info` row.
fn current_state_has_column(conn: &Connection, column: &str) -> Result<bool> {
    let mut stmt = conn.prepare("PRAGMA table_info(current_state)")?;
    let names = stmt.query_map([], |row| row.get::<_, String>(1))?;
    for name in names {
        if name? == column {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Upgrade a pre-existing database to the current schema in place.
///
/// Legacy databases lack `current_state.last_checked` and may hold a NULL
/// `status`; both are repaired here, then all stored status values are
/// validated up front so later row parsing cannot fail mid-request.
fn migrate_current_state(conn: &Connection) -> Result<()> {
    if !current_state_has_column(conn, "last_checked")? {
        conn.execute(
            "ALTER TABLE current_state ADD COLUMN last_checked INTEGER",
            [],
        )
        .context("failed to add current_state.last_checked")?;
    }
    // NULL status predates the CHECK-constrained schema; treat it as offline.
    conn.execute(
        "UPDATE current_state SET status = 'offline' WHERE status IS NULL",
        [],
    )
    .context("failed to backfill NULL current_state.status")?;
    validate_status_values(conn)?;
    Ok(())
}
/// Result of feeding one inbound state event through `apply_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyStateOutcome {
    /// Event was new and was written to the database.
    Applied,
    /// Same status and event timestamp as the cached state; ignored.
    Duplicate,
    /// Event timestamp predates the cached state's `since`; ignored.
    Stale,
}
/// Fail fast if `table` contains a status value outside the known set.
///
/// `table` is only ever an internal, hard-coded table name (identifiers
/// cannot be bound as SQL parameters), so interpolating it is safe here.
fn validate_status_column(conn: &Connection, table: &str) -> Result<()> {
    let query = format!(
        "SELECT status FROM {table} WHERE status IS NULL OR status NOT IN ('open', 'closed', 'offline') LIMIT 1"
    );
    // Outer Option: whether an offending row exists; inner Option: NULL vs
    // an unexpected text value.
    let invalid: Option<Option<String>> = conn
        .query_row(&query, [], |row| row.get(0))
        .optional()
        // with_context (not context) so the message string is only built on
        // the error path instead of allocating on every call.
        .with_context(|| format!("failed to validate {table}.status"))?;
    if let Some(status) = invalid {
        match status {
            Some(status) => anyhow::bail!("invalid door status {status:?} in {table}.status"),
            None => anyhow::bail!("invalid NULL door status in {table}.status"),
        }
    }
    Ok(())
}
/// Validate every table that persists a door status; currently only
/// `current_state` does.
fn validate_status_values(conn: &Connection) -> Result<()> {
    validate_status_column(conn, "current_state")
}
fn current_state_row(conn: &Connection) -> Result<CurrentStateRow> {
let (status_str, since, last_seen, last_checked) = conn.query_row(
"SELECT status, timestamp, last_seen, last_checked FROM current_state WHERE id = 1",
[],
|row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, String>(0)?,
row.get::<_, Option<u64>>(1)?,
row.get::<_, Option<u64>>(2)?,
row.get::<_, Option<u64>>(3)?,
))
},
)?;
let status = status_str
.as_deref()
.and_then(DoorStatus::from_str)
.unwrap_or(DoorStatus::Offline);
Ok(StatusResponse {
status,
since: timestamp,
let status = parse_status(&status_str, "current_state.status")?;
let state = match (status, since) {
(DoorStatus::Open, Some(since)) => CachedState::Live {
status: LiveDoorStatus::Open,
since,
},
(DoorStatus::Closed, Some(since)) => CachedState::Live {
status: LiveDoorStatus::Closed,
since,
},
(DoorStatus::Offline, Some(since)) => CachedState::Offline { since },
(DoorStatus::Offline, None) => CachedState::Unknown,
(DoorStatus::Open | DoorStatus::Closed, None) => {
anyhow::bail!("live current_state.status must have a timestamp")
}
};
Ok(CurrentStateRow {
state,
last_seen,
last_checked,
})
}
pub fn update_state(conn: &Connection, status: DoorStatus, timestamp: u64, now: u64) -> Result<()> {
pub fn get_status(conn: &Connection) -> Result<CacheStatusResponse> {
let row = current_state_row(conn)?;
Ok(CacheStatusResponse {
status: row.state.status_for_api(),
since: row.state.since_for_api(),
last_checked: row.last_checked,
})
}
/// Persist a confirmed state change: update the cached row and append to the
/// state log.
///
/// `timestamp` is the Pi-reported moment the state began (stored in
/// `current_state.timestamp`, surfaced as `since`); `now` is the cache-local
/// receive time (stored as `last_seen` / `recorded_at`).
fn write_state_change(
    conn: &Connection,
    status: DoorStatus,
    timestamp: u64,
    now: u64,
) -> Result<()> {
    let status_str = status.as_str();
    // Bind the event timestamp — not `now` — so `since` reflects when the
    // door actually changed (the tests assert since == event timestamp).
    conn.execute(
        "UPDATE current_state SET status = ?1, timestamp = ?2, last_seen = ?3 WHERE id = 1",
        rusqlite::params![status_str, timestamp, now],
    )?;
    conn.execute(
        "INSERT INTO state_log (status, timestamp, recorded_at) VALUES (?1, ?2, ?3)",
        rusqlite::params![status_str, timestamp, now],
    )?;
    Ok(())
}
/// Apply one inbound state event, deduplicating and dropping out-of-order
/// data.
///
/// Decision table against the cached state:
/// - no prior state (`Unknown`): apply;
/// - event older than the cached `since`: `Stale`, ignored;
/// - same `since` and same live status: `Duplicate` (a resend), ignored;
/// - otherwise: `Applied`, written through via `write_state_change`.
///
/// Ignored events still bump `last_seen`: hearing anything from the Pi
/// proves it is online.
pub fn apply_state(
    conn: &Connection,
    status: DoorStatus,
    timestamp: u64,
    now: u64,
) -> Result<ApplyStateOutcome> {
    let current = current_state_row(conn)?;
    // Inbound events must be a live state; 'offline' is cache-derived only.
    let live_status = LiveDoorStatus::try_from(status).map_err(anyhow::Error::msg)?;
    let outcome = match current.state {
        CachedState::Unknown => ApplyStateOutcome::Applied,
        CachedState::Offline { since } if timestamp < since => ApplyStateOutcome::Stale,
        CachedState::Offline { .. } => ApplyStateOutcome::Applied,
        CachedState::Live { status: _, since } if timestamp < since => ApplyStateOutcome::Stale,
        CachedState::Live {
            status: current_status,
            since,
        } if timestamp == since && live_status == current_status => ApplyStateOutcome::Duplicate,
        CachedState::Live { .. } => ApplyStateOutcome::Applied,
    };
    match outcome {
        ApplyStateOutcome::Applied => write_state_change(conn, status, timestamp, now)?,
        ApplyStateOutcome::Duplicate | ApplyStateOutcome::Stale => update_last_seen(conn, now)?,
    }
    Ok(outcome)
}
pub fn update_last_seen(conn: &Connection, now: u64) -> Result<()> {
conn.execute(
"UPDATE current_state SET last_seen = ?1 WHERE id = 1",
@ -90,62 +259,16 @@ pub fn update_last_checked(conn: &Connection, now: u64) -> Result<()> {
}
pub fn mark_offline(conn: &Connection, now: u64) -> Result<()> {
let offline = DoorStatus::Offline.as_str();
conn.execute(
"UPDATE current_state SET status = 'offline', timestamp = ?1 WHERE id = 1",
rusqlite::params![now],
)?;
conn.execute(
"INSERT INTO state_log (status, timestamp, recorded_at) VALUES ('offline', ?1, ?1)",
rusqlite::params![now],
"UPDATE current_state SET status = ?1, timestamp = ?2 WHERE id = 1",
rusqlite::params![offline, now],
)?;
Ok(())
}
pub fn get_current_status(conn: &Connection) -> Result<DoorStatus> {
let status_str: Option<String> = conn.query_row(
"SELECT status FROM current_state WHERE id = 1",
[],
|row| row.get(0),
)?;
Ok(status_str
.as_deref()
.and_then(DoorStatus::from_str)
.unwrap_or(DoorStatus::Offline))
}
pub fn get_history(conn: &Connection, limit: u32) -> Result<Vec<noisebell_common::HistoryEntry>> {
let mut stmt = conn.prepare(
"SELECT id, status, timestamp, recorded_at FROM state_log ORDER BY id DESC LIMIT ?1",
)?;
let entries = stmt
.query_map(rusqlite::params![limit], |row| {
Ok(noisebell_common::HistoryEntry {
id: row.get(0)?,
status: row.get(1)?,
timestamp: row.get(2)?,
recorded_at: row.get(3)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(entries)
}
pub fn get_pi_info(conn: &Connection) -> Result<serde_json::Value> {
let data: String = conn.query_row(
"SELECT data FROM pi_info WHERE id = 1",
[],
|row| row.get(0),
)?;
Ok(serde_json::from_str(&data).unwrap_or(serde_json::json!({})))
}
pub fn update_pi_info(conn: &Connection, data: &serde_json::Value, now: u64) -> Result<()> {
let json = serde_json::to_string(data)?;
conn.execute(
"INSERT OR REPLACE INTO pi_info (id, data, fetched_at) VALUES (1, ?1, ?2)",
rusqlite::params![json, now],
)?;
Ok(())
Ok(current_state_row(conn)?.state.status_for_api())
}
#[cfg(test)]
@ -156,6 +279,31 @@ mod tests {
init(":memory:").expect("failed to init test db")
}
/// Build a unique on-disk SQLite path for a test; the nanosecond timestamp
/// keeps concurrent test runs from colliding.
fn temp_db_path(label: &str) -> std::path::PathBuf {
    let stamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let file_name = format!("noisebell-{label}-{stamp}.sqlite");
    std::env::temp_dir().join(file_name)
}
/// Create an on-disk database with the pre-migration schema: no
/// `last_checked` column and a nullable `status`, seeded with an all-NULL
/// singleton row.
fn create_legacy_db(path: &Path) {
    let conn = Connection::open(path).unwrap();
    conn.execute_batch(
        "
        CREATE TABLE current_state (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            status TEXT,
            timestamp INTEGER,
            last_seen INTEGER
        );
        INSERT INTO current_state (id, status, timestamp, last_seen) VALUES (1, NULL, NULL, NULL);
        ",
    )
    .unwrap();
}
#[test]
fn initial_status_is_offline() {
let conn = test_db();
@ -166,19 +314,20 @@ mod tests {
}
#[test]
fn update_state_changes_status() {
fn apply_state_changes_status_and_preserves_event_timestamp() {
let conn = test_db();
update_state(&conn, DoorStatus::Open, 1000, 1001).unwrap();
let outcome = apply_state(&conn, DoorStatus::Open, 1000, 1001).unwrap();
assert_eq!(outcome, ApplyStateOutcome::Applied);
let status = get_status(&conn).unwrap();
assert_eq!(status.status, DoorStatus::Open);
assert_eq!(status.since, Some(1001));
assert_eq!(status.since, Some(1000));
}
#[test]
fn mark_offline_sets_offline_status() {
let conn = test_db();
update_state(&conn, DoorStatus::Open, 1000, 1001).unwrap();
apply_state(&conn, DoorStatus::Open, 1000, 1001).unwrap();
mark_offline(&conn, 2000).unwrap();
let status = get_status(&conn).unwrap();
@ -191,7 +340,7 @@ mod tests {
let conn = test_db();
assert_eq!(get_current_status(&conn).unwrap(), DoorStatus::Offline);
update_state(&conn, DoorStatus::Closed, 1000, 1001).unwrap();
apply_state(&conn, DoorStatus::Closed, 1000, 1001).unwrap();
assert_eq!(get_current_status(&conn).unwrap(), DoorStatus::Closed);
}
@ -204,23 +353,9 @@ mod tests {
assert_eq!(status.last_checked, Some(5000));
}
#[test]
fn history_records_state_changes() {
let conn = test_db();
update_state(&conn, DoorStatus::Open, 1000, 1001).unwrap();
update_state(&conn, DoorStatus::Closed, 2000, 2001).unwrap();
mark_offline(&conn, 3000).unwrap();
let history = get_history(&conn, 10).unwrap();
assert_eq!(history.len(), 3);
assert_eq!(history[0].status, "offline");
assert_eq!(history[1].status, "closed");
assert_eq!(history[2].status, "open");
}
#[test]
fn status_response_serializes_correctly() {
let resp = StatusResponse {
let resp = CacheStatusResponse {
status: DoorStatus::Open,
since: Some(1234),
last_checked: Some(5678),
@ -232,20 +367,110 @@ mod tests {
}
#[test]
fn apply_state_deduplicates_same_event() {
    let conn = test_db();
    assert_eq!(
        apply_state(&conn, DoorStatus::Open, 1000, 1001).unwrap(),
        ApplyStateOutcome::Applied
    );
    // Identical status + event timestamp with a later receive time is a
    // resend and must be reported as a duplicate, not re-applied.
    assert_eq!(
        apply_state(&conn, DoorStatus::Open, 1000, 1002).unwrap(),
        ApplyStateOutcome::Duplicate
    );
    let status = get_status(&conn).unwrap();
    assert_eq!(status.status, DoorStatus::Open);
    assert_eq!(status.since, Some(1000));
}
#[test]
fn apply_state_ignores_stale_events() {
    let conn = test_db();
    assert_eq!(
        apply_state(&conn, DoorStatus::Open, 2000, 2001).unwrap(),
        ApplyStateOutcome::Applied
    );
    // An event timestamped before the cached `since` is out of order.
    assert_eq!(
        apply_state(&conn, DoorStatus::Closed, 1999, 2002).unwrap(),
        ApplyStateOutcome::Stale
    );
    // The stale 'closed' event must not displace the applied 'open' state.
    let status = get_current_status(&conn).unwrap();
    assert_eq!(status, DoorStatus::Open);
}
#[test]
fn apply_state_accepts_newer_same_status_event() {
    let conn = test_db();
    assert_eq!(
        apply_state(&conn, DoorStatus::Open, 1000, 1001).unwrap(),
        ApplyStateOutcome::Applied
    );
    // Same status but a strictly newer event timestamp is a distinct event,
    // not a duplicate: `since` must advance.
    assert_eq!(
        apply_state(&conn, DoorStatus::Open, 2000, 2001).unwrap(),
        ApplyStateOutcome::Applied
    );
    let status = get_status(&conn).unwrap();
    assert_eq!(status.status, DoorStatus::Open);
    assert_eq!(status.since, Some(2000));
}
#[test]
fn apply_state_after_offline_recovers_with_event_timestamp() {
    let conn = test_db();
    mark_offline(&conn, 3000).unwrap();
    // Events predating the offline transition are stale and ignored.
    assert_eq!(
        apply_state(&conn, DoorStatus::Open, 2500, 3100).unwrap(),
        ApplyStateOutcome::Stale
    );
    // A genuinely newer event brings the cache back to a live state.
    assert_eq!(
        apply_state(&conn, DoorStatus::Open, 3200, 3201).unwrap(),
        ApplyStateOutcome::Applied
    );
    let status = get_status(&conn).unwrap();
    assert_eq!(status.status, DoorStatus::Open);
    // `since` comes from the event, not the receive time.
    assert_eq!(status.since, Some(3200));
}
#[test]
fn legacy_db_is_migrated_in_place() {
    let path = temp_db_path("legacy-migration");
    create_legacy_db(&path);
    // init() must add the missing last_checked column and backfill the
    // NULL status to 'offline'.
    let conn = init(path.to_str().unwrap()).unwrap();
    assert!(current_state_has_column(&conn, "last_checked").unwrap());
    let status = get_status(&conn).unwrap();
    assert_eq!(status.status, DoorStatus::Offline);
    assert_eq!(status.since, None);
    assert_eq!(status.last_checked, None);
    drop(conn);
    std::fs::remove_file(path).unwrap();
}
#[test]
fn invalid_legacy_status_is_rejected() {
    let path = temp_db_path("legacy-invalid-status");
    create_legacy_db(&path);
    let conn = Connection::open(&path).unwrap();
    conn.execute(
        "UPDATE current_state SET status = 'mystery' WHERE id = 1",
        [],
    )
    .unwrap();
    drop(conn);
    // Startup validation must refuse unrecognized status values rather than
    // carrying corruption forward.
    let err = init(path.to_str().unwrap()).unwrap_err().to_string();
    assert!(err.contains("invalid door status"));
    std::fs::remove_file(path).unwrap();
}
}

View file

@ -4,8 +4,8 @@ use std::time::Duration;
use anyhow::{Context, Result};
use axum::routing::{get, post};
use axum::Router;
use tokio::sync::Mutex;
use std::sync::atomic::AtomicU64;
use tokio::sync::Mutex;
use tower_http::trace::TraceLayer;
use tracing::{info, Level};
@ -15,7 +15,7 @@ mod poller;
mod types;
mod webhook;
use types::WebhookTarget;
use types::{WebhookAuth, WebhookTarget};
#[tokio::main]
async fn main() -> Result<()> {
@ -39,19 +39,14 @@ async fn main() -> Result<()> {
let inbound_api_key = std::env::var("NOISEBELL_CACHE_INBOUND_API_KEY")
.context("NOISEBELL_CACHE_INBOUND_API_KEY is required")?;
let data_dir =
std::env::var("NOISEBELL_CACHE_DATA_DIR").unwrap_or_else(|_| "/var/lib/noisebell-cache".into());
let data_dir = std::env::var("NOISEBELL_CACHE_DATA_DIR")
.unwrap_or_else(|_| "/var/lib/noisebell-cache".into());
let status_poll_interval_secs: u64 = std::env::var("NOISEBELL_CACHE_STATUS_POLL_INTERVAL_SECS")
.unwrap_or_else(|_| "60".into())
.parse()
.context("NOISEBELL_CACHE_STATUS_POLL_INTERVAL_SECS must be a valid u64")?;
let info_poll_interval_secs: u64 = std::env::var("NOISEBELL_CACHE_INFO_POLL_INTERVAL_SECS")
.unwrap_or_else(|_| "300".into())
.parse()
.context("NOISEBELL_CACHE_INFO_POLL_INTERVAL_SECS must be a valid u64")?;
let offline_threshold: u32 = std::env::var("NOISEBELL_CACHE_OFFLINE_THRESHOLD")
.unwrap_or_else(|_| "3".into())
.parse()
@ -79,8 +74,11 @@ async fn main() -> Result<()> {
match std::env::var(&url_key) {
Ok(url) => {
let secret_key = format!("NOISEBELL_CACHE_WEBHOOK_{i}_SECRET");
let secret = std::env::var(&secret_key).ok();
webhooks.push(WebhookTarget { url, secret });
let auth = match std::env::var(&secret_key) {
Ok(secret) => WebhookAuth::Bearer(secret),
Err(_) => WebhookAuth::None,
};
webhooks.push(WebhookTarget { url, auth });
}
Err(_) => break,
}
@ -106,7 +104,6 @@ async fn main() -> Result<()> {
pi_address,
pi_api_key,
status_poll_interval: Duration::from_secs(status_poll_interval_secs),
info_poll_interval: Duration::from_secs(info_poll_interval_secs),
offline_threshold,
retry_attempts,
retry_base_delay_secs,
@ -114,7 +111,6 @@ async fn main() -> Result<()> {
});
poller::spawn_status_poller(poller_config.clone(), db.clone(), client.clone());
poller::spawn_info_poller(poller_config, db.clone(), client.clone());
let app_state = Arc::new(api::AppState {
db,
@ -131,13 +127,10 @@ async fn main() -> Result<()> {
.route("/health", get(api::health))
.route("/webhook", post(api::post_webhook))
.route("/status", get(api::get_status))
.route("/info", get(api::get_info))
.route("/history", get(api::get_history))
.route("/image", get(api::get_image))
.route("/image/open.png", get(api::get_image_open))
.route("/image/closed.png", get(api::get_image_closed))
.route("/image/offline.png", get(api::get_image_offline))
.route("/badge.svg", get(api::get_badge))
.layer(
TraceLayer::new_for_http()
.make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO))

View file

@ -1,19 +1,19 @@
use std::sync::Arc;
use std::time::Duration;
use noisebell_common::WebhookPayload;
use noisebell_common::{DoorStatus, PiStatusResponse, WebhookPayload};
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use crate::db;
use crate::types::{DoorStatus, WebhookTarget};
use crate::db::ApplyStateOutcome;
use crate::types::WebhookTarget;
use crate::webhook;
pub struct PollerConfig {
pub pi_address: String,
pub pi_api_key: String,
pub status_poll_interval: Duration,
pub info_poll_interval: Duration,
pub offline_threshold: u32,
pub retry_attempts: u32,
pub retry_base_delay_secs: u64,
@ -64,53 +64,75 @@ pub fn spawn_status_poller(
}
let now = unix_now();
if let Ok(body) = resp.json::<serde_json::Value>().await {
let status_str = body.get("status").and_then(|s| s.as_str()).map(String::from);
let event_timestamp = body.get("timestamp").and_then(|t| t.as_u64());
match resp.json::<PiStatusResponse>().await {
Ok(body) => {
let status = body.status;
let event_timestamp = body.timestamp;
let db = db.clone();
let update_result = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
if let Err(e) = db::update_last_seen(&conn, now) {
error!(error = %e, "failed to update last_seen");
}
let db = db.clone();
let update_result = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
if let Err(e) = db::update_last_seen(&conn, now) {
error!(error = %e, "failed to update last_seen");
}
if let Some(ref status_str) = status_str {
if let Some(status) = DoorStatus::from_str(status_str) {
let current = db::get_current_status(&conn);
let changed = match &current {
Ok(current) => *current != status,
Err(_) => true,
};
if changed {
let timestamp = event_timestamp.unwrap_or(now);
if let Err(e) = db::update_state(&conn, status, timestamp, now) {
error!(error = %e, "failed to update state from poll");
return None;
}
return Some((status, timestamp));
match db::apply_state(&conn, status, event_timestamp, now) {
Ok(ApplyStateOutcome::Applied) => {
Some((status, event_timestamp, ApplyStateOutcome::Applied))
}
Ok(ApplyStateOutcome::Duplicate) => Some((
status,
event_timestamp,
ApplyStateOutcome::Duplicate,
)),
Ok(ApplyStateOutcome::Stale) => {
Some((status, event_timestamp, ApplyStateOutcome::Stale))
}
Err(e) => {
error!(error = %e, "failed to update state from poll");
None
}
}
})
.await
.expect("db task panicked");
if let Some((status, timestamp, outcome)) = update_result {
match outcome {
ApplyStateOutcome::Applied => {
info!(
status = %status,
timestamp,
"state updated from poll"
);
webhook::forward(
&client,
&config.webhooks,
&WebhookPayload { status, timestamp },
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
}
ApplyStateOutcome::Duplicate => {
info!(
status = %status,
timestamp,
"duplicate poll state ignored"
);
}
ApplyStateOutcome::Stale => {
warn!(
status = %status,
timestamp,
"stale poll state ignored"
);
}
}
}
None
})
.await
.expect("db task panicked");
if let Some((status, timestamp)) = update_result {
info!(status = status.as_str(), "state updated from poll");
webhook::forward(
&client,
&config.webhooks,
&WebhookPayload {
status: status.as_str().to_string(),
timestamp,
},
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
}
Err(e) => {
error!(error = %e, "failed to parse status poll response");
}
}
}
@ -139,12 +161,15 @@ pub fn spawn_status_poller(
match marked {
Ok(()) => {
info!("Pi marked offline after {} consecutive failures", consecutive_failures);
info!(
"Pi marked offline after {} consecutive failures",
consecutive_failures
);
webhook::forward(
&client,
&config.webhooks,
&WebhookPayload {
status: "offline".to_string(),
status: DoorStatus::Offline,
timestamp: now,
},
config.retry_attempts,
@ -164,47 +189,3 @@ pub fn spawn_status_poller(
}
});
}
pub fn spawn_info_poller(
config: Arc<PollerConfig>,
db: Arc<Mutex<rusqlite::Connection>>,
client: reqwest::Client,
) {
tokio::spawn(async move {
loop {
let result = client
.get(format!("{}/info", config.pi_address))
.bearer_auth(&config.pi_api_key)
.send()
.await;
match result {
Ok(resp) if resp.status().is_success() => {
if let Ok(data) = resp.json::<serde_json::Value>().await {
let now = unix_now();
let db = db.clone();
let result = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::update_pi_info(&conn, &data, now)
})
.await
.expect("db task panicked");
if let Err(e) = result {
error!(error = %e, "failed to update pi_info");
}
}
}
_ => {
let err_msg = match &result {
Ok(resp) => format!("HTTP {}", resp.status()),
Err(e) => e.to_string(),
};
warn!(error = %err_msg, "info poll failed");
}
}
tokio::time::sleep(config.info_poll_interval).await;
}
});
}

View file

@ -1,72 +1,11 @@
use serde::{Deserialize, Serialize};
/// Door state tracked by the cache. Serialized as the lowercase variant
/// name (`"open"` / `"closed"` / `"offline"`) via `rename_all`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DoorStatus {
    Open,
    Closed,
    // Set by the cache after the configured number of consecutive failed
    // polls, then forwarded to downstream webhooks.
    Offline,
}
impl DoorStatus {
pub fn as_str(self) -> &'static str {
match self {
DoorStatus::Open => "open",
DoorStatus::Closed => "closed",
DoorStatus::Offline => "offline",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"open" => Some(DoorStatus::Open),
"closed" => Some(DoorStatus::Closed),
"offline" => Some(DoorStatus::Offline),
_ => None,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct StatusResponse {
pub status: DoorStatus,
pub since: Option<u64>, // when the current status was set
pub last_checked: Option<u64>, // when the cache last attempted to poll
/// Authentication attached when delivering an outbound webhook.
#[derive(Debug, Clone)]
pub enum WebhookAuth {
    // No Authorization header is added to the request.
    None,
    // Secret sent as `Authorization: Bearer <secret>`.
    Bearer(String),
}
/// One downstream webhook destination, loaded from the environment at startup.
#[derive(Debug, Clone)]
pub struct WebhookTarget {
    // Absolute URL the state-change payload is POSTed to.
    pub url: String,
    // Optional bearer token; when present it is sent as
    // `Authorization: Bearer <secret>` on delivery.
    pub secret: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
/// Every variant must survive an `as_str` → `from_str` round trip unchanged.
#[test]
fn door_status_round_trip() {
    let variants = [DoorStatus::Open, DoorStatus::Closed, DoorStatus::Offline];
    for variant in variants {
        let name = variant.as_str();
        assert_eq!(DoorStatus::from_str(name), Some(variant));
    }
}
/// Strings outside the canonical set (including empty) must not parse.
#[test]
fn door_status_from_str_rejects_unknown() {
    for bad in ["unknown", ""] {
        assert_eq!(DoorStatus::from_str(bad), None);
    }
}
/// Serde must use lowercase variant names in both directions.
#[test]
fn door_status_serde_lowercase() {
    assert_eq!(
        serde_json::to_string(&DoorStatus::Open).unwrap(),
        "\"open\""
    );
    let cases = [
        ("\"closed\"", DoorStatus::Closed),
        ("\"offline\"", DoorStatus::Offline),
    ];
    for (json, expected) in cases {
        let parsed: DoorStatus = serde_json::from_str(json).unwrap();
        assert_eq!(parsed, expected);
    }
}
pub auth: WebhookAuth,
}

View file

@ -3,7 +3,7 @@ use std::time::Duration;
use noisebell_common::WebhookPayload;
use tracing::{error, info, warn};
use crate::types::WebhookTarget;
use crate::types::{WebhookAuth, WebhookTarget};
pub async fn forward(
client: &reqwest::Client,
@ -18,14 +18,14 @@ pub async fn forward(
let payload = payload.clone();
let client = client.clone();
let url = target.url.clone();
let secret = target.secret.clone();
let auth = target.auth.clone();
set.spawn(async move {
info!(url = %url, status = %payload.status, "forwarding to outbound webhook");
for attempt in 0..=retry_attempts {
let mut req = client.post(&url).json(&payload);
if let Some(ref secret) = secret {
if let WebhookAuth::Bearer(secret) = &auth {
req = req.bearer_auth(secret);
}