feat: add remote, with rss, cache, discord, and zulip

This commit is contained in:
Jet Pham 2026-03-09 23:08:01 -07:00
parent 50ec63a474
commit 83baab68e0
No known key found for this signature in database
32 changed files with 6615 additions and 40 deletions

1688
remote/cache-service/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,15 @@
[package]
name = "noisebell-cache"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
axum = "0.8"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
rusqlite = { version = "0.33", features = ["bundled"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "sync", "signal", "time"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

64
remote/cache-service/flake.lock generated Normal file
View file

@ -0,0 +1,64 @@
{
"nodes": {
"crane": {
"locked": {
"lastModified": 1773115265,
"narHash": "sha256-5fDkKTYEgue2klksd52WvcXfZdY1EIlbk0QggAwpFog=",
"owner": "ipetkov",
"repo": "crane",
"rev": "27711550d109bf6236478dc9f53b9e29c1a374c5",
"type": "github"
},
"original": {
"owner": "ipetkov",
"repo": "crane",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1772963539,
"narHash": "sha256-9jVDGZnvCckTGdYT53d/EfznygLskyLQXYwJLKMPsZs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "9dcb002ca1690658be4a04645215baea8b95f31d",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"crane": "crane",
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay"
}
},
"rust-overlay": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1773115373,
"narHash": "sha256-bfK9FJFcQth6f3ydYggS5m0z2NRGF/PY6Y2XgZDJ6pg=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "1924b4672a2b8e4aee6e6652ec2e59a8d3c5648e",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

View file

@ -0,0 +1,45 @@
{
description = "Noisebell - cache service";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
crane.url = "github:ipetkov/crane";
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = { self, nixpkgs, crane, rust-overlay }:
let
system = "x86_64-linux";
pkgs = import nixpkgs {
inherit system;
overlays = [ rust-overlay.overlays.default ];
};
rustToolchain = pkgs.rust-bin.stable.latest.default;
craneLib = (crane.mkLib pkgs).overrideToolchain rustToolchain;
src = craneLib.cleanCargoSource ./.;
commonArgs = {
inherit src;
strictDeps = true;
doCheck = false;
};
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
noisebell-cache = craneLib.buildPackage (commonArgs // {
inherit cargoArtifacts;
});
in
{
packages.${system}.default = noisebell-cache;
devShells.${system}.default = craneLib.devShell {
packages = [ pkgs.rust-analyzer ];
};
};
}

1
remote/cache-service/result Symbolic link
View file

@ -0,0 +1 @@
/nix/store/w4n48lrfqm5vhpyzjfcc9yxhmjr801xh-noisebell-cache-0.1.0

View file

@ -0,0 +1,116 @@
use std::sync::Arc;
use axum::extract::{Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::Json;
use tokio::sync::Mutex;
use tracing::{error, info};
use crate::db;
use crate::types::{DoorStatus, InboundWebhook, OutboundPayload, WebhookTarget};
use crate::webhook;
pub struct AppState {
pub db: Arc<Mutex<rusqlite::Connection>>,
pub client: reqwest::Client,
pub inbound_api_key: String,
pub webhooks: Vec<WebhookTarget>,
pub retry_attempts: u32,
pub retry_base_delay_secs: u64,
}
fn unix_now() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
}
fn validate_bearer(headers: &HeaderMap, expected: &str) -> bool {
headers
.get("authorization")
.and_then(|v| v.to_str().ok())
.map(|v| v.strip_prefix("Bearer ").unwrap_or("") == expected)
.unwrap_or(false)
}
pub async fn post_webhook(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Json(body): Json<InboundWebhook>,
) -> StatusCode {
if !validate_bearer(&headers, &state.inbound_api_key) {
return StatusCode::UNAUTHORIZED;
}
let Some(status) = DoorStatus::from_str(&body.status) else {
return StatusCode::BAD_REQUEST;
};
let now = unix_now();
{
let conn = state.db.lock().await;
if let Err(e) = db::update_state(&conn, status, body.timestamp, now) {
error!(error = %e, "failed to update state from webhook");
return StatusCode::INTERNAL_SERVER_ERROR;
}
}
info!(status = status.as_str(), timestamp = body.timestamp, "state updated via webhook");
webhook::forward(
&state.client,
&state.webhooks,
&OutboundPayload {
status: status.as_str().to_string(),
timestamp: body.timestamp,
},
state.retry_attempts,
state.retry_base_delay_secs,
)
.await;
StatusCode::OK
}
pub async fn get_status(State(state): State<Arc<AppState>>) -> Result<Json<serde_json::Value>, StatusCode> {
let conn = state.db.lock().await;
match db::get_status(&conn) {
Ok(status) => Ok(Json(serde_json::to_value(status).unwrap())),
Err(e) => {
error!(error = %e, "failed to get status");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
pub async fn get_info(State(state): State<Arc<AppState>>) -> Result<Json<serde_json::Value>, StatusCode> {
let conn = state.db.lock().await;
match db::get_pi_info(&conn) {
Ok(info) => Ok(Json(info)),
Err(e) => {
error!(error = %e, "failed to get pi info");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
#[derive(serde::Deserialize)]
pub struct HistoryQuery {
pub limit: Option<u32>,
}
pub async fn get_history(
State(state): State<Arc<AppState>>,
Query(query): Query<HistoryQuery>,
) -> Result<Json<Vec<crate::types::HistoryEntry>>, StatusCode> {
let limit = query.limit.unwrap_or(50);
let conn = state.db.lock().await;
match db::get_history(&conn, limit) {
Ok(entries) => Ok(Json(entries)),
Err(e) => {
error!(error = %e, "failed to get history");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

View file

@ -0,0 +1,131 @@
use anyhow::{Context, Result};
use rusqlite::Connection;
use crate::types::{DoorStatus, HistoryEntry, StatusResponse};
pub fn init(path: &str) -> Result<Connection> {
let conn = Connection::open(path).context("failed to open SQLite database")?;
conn.execute_batch(
"
CREATE TABLE IF NOT EXISTS current_state (
id INTEGER PRIMARY KEY CHECK (id = 1),
status TEXT,
timestamp INTEGER,
last_seen INTEGER
);
CREATE TABLE IF NOT EXISTS state_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
status TEXT NOT NULL,
timestamp INTEGER NOT NULL,
recorded_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS pi_info (
id INTEGER PRIMARY KEY CHECK (id = 1),
data TEXT NOT NULL,
fetched_at INTEGER NOT NULL
);
INSERT OR IGNORE INTO current_state (id, status, timestamp, last_seen) VALUES (1, NULL, NULL, NULL);
INSERT OR IGNORE INTO pi_info (id, data, fetched_at) VALUES (1, '{}', 0);
",
)
.context("failed to initialize database schema")?;
Ok(conn)
}
pub fn get_status(conn: &Connection) -> Result<StatusResponse> {
let (status, timestamp, last_seen) = conn.query_row(
"SELECT status, timestamp, last_seen FROM current_state WHERE id = 1",
[],
|row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<u64>>(1)?,
row.get::<_, Option<u64>>(2)?,
))
},
)?;
Ok(StatusResponse {
status: status.unwrap_or_else(|| "offline".to_string()),
timestamp,
last_seen,
})
}
pub fn update_state(conn: &Connection, status: DoorStatus, timestamp: u64, now: u64) -> Result<()> {
let status_str = status.as_str();
conn.execute(
"UPDATE current_state SET status = ?1, timestamp = ?2, last_seen = ?3 WHERE id = 1",
rusqlite::params![status_str, timestamp, now],
)?;
conn.execute(
"INSERT INTO state_log (status, timestamp, recorded_at) VALUES (?1, ?2, ?3)",
rusqlite::params![status_str, timestamp, now],
)?;
Ok(())
}
pub fn update_last_seen(conn: &Connection, now: u64) -> Result<()> {
conn.execute(
"UPDATE current_state SET last_seen = ?1 WHERE id = 1",
rusqlite::params![now],
)?;
Ok(())
}
pub fn mark_offline(conn: &Connection, now: u64) -> Result<()> {
conn.execute(
"UPDATE current_state SET status = NULL WHERE id = 1",
[],
)?;
conn.execute(
"INSERT INTO state_log (status, timestamp, recorded_at) VALUES ('offline', ?1, ?1)",
rusqlite::params![now],
)?;
Ok(())
}
pub fn get_current_status_str(conn: &Connection) -> Result<Option<String>> {
let status = conn.query_row(
"SELECT status FROM current_state WHERE id = 1",
[],
|row| row.get::<_, Option<String>>(0),
)?;
Ok(status)
}
pub fn get_history(conn: &Connection, limit: u32) -> Result<Vec<HistoryEntry>> {
let mut stmt = conn.prepare(
"SELECT status, timestamp, recorded_at FROM state_log ORDER BY id DESC LIMIT ?1",
)?;
let entries = stmt
.query_map(rusqlite::params![limit], |row| {
Ok(HistoryEntry {
status: row.get(0)?,
timestamp: row.get(1)?,
recorded_at: row.get(2)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(entries)
}
pub fn get_pi_info(conn: &Connection) -> Result<serde_json::Value> {
let data: String = conn.query_row(
"SELECT data FROM pi_info WHERE id = 1",
[],
|row| row.get(0),
)?;
Ok(serde_json::from_str(&data).unwrap_or(serde_json::json!({})))
}
pub fn update_pi_info(conn: &Connection, data: &serde_json::Value, now: u64) -> Result<()> {
let json = serde_json::to_string(data)?;
conn.execute(
"INSERT OR REPLACE INTO pi_info (id, data, fetched_at) VALUES (1, ?1, ?2)",
rusqlite::params![json, now],
)?;
Ok(())
}

View file

@ -0,0 +1,149 @@
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use axum::routing::{get, post};
use axum::Router;
use tokio::sync::Mutex;
use tracing::info;
mod api;
mod db;
mod poller;
mod types;
mod webhook;
use types::WebhookTarget;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let port: u16 = std::env::var("NOISEBELL_CACHE_PORT")
.unwrap_or_else(|_| "3000".into())
.parse()
.context("NOISEBELL_CACHE_PORT must be a valid u16")?;
let pi_address = std::env::var("NOISEBELL_CACHE_PI_ADDRESS")
.context("NOISEBELL_CACHE_PI_ADDRESS is required")?;
let pi_api_key = std::env::var("NOISEBELL_CACHE_PI_API_KEY")
.context("NOISEBELL_CACHE_PI_API_KEY is required")?;
let inbound_api_key = std::env::var("NOISEBELL_CACHE_INBOUND_API_KEY")
.context("NOISEBELL_CACHE_INBOUND_API_KEY is required")?;
let data_dir =
std::env::var("NOISEBELL_CACHE_DATA_DIR").unwrap_or_else(|_| "/var/lib/noisebell-cache".into());
let status_poll_interval_secs: u64 = std::env::var("NOISEBELL_CACHE_STATUS_POLL_INTERVAL_SECS")
.unwrap_or_else(|_| "60".into())
.parse()
.context("NOISEBELL_CACHE_STATUS_POLL_INTERVAL_SECS must be a valid u64")?;
let info_poll_interval_secs: u64 = std::env::var("NOISEBELL_CACHE_INFO_POLL_INTERVAL_SECS")
.unwrap_or_else(|_| "300".into())
.parse()
.context("NOISEBELL_CACHE_INFO_POLL_INTERVAL_SECS must be a valid u64")?;
let offline_threshold: u32 = std::env::var("NOISEBELL_CACHE_OFFLINE_THRESHOLD")
.unwrap_or_else(|_| "3".into())
.parse()
.context("NOISEBELL_CACHE_OFFLINE_THRESHOLD must be a valid u32")?;
let retry_attempts: u32 = std::env::var("NOISEBELL_CACHE_RETRY_ATTEMPTS")
.unwrap_or_else(|_| "3".into())
.parse()
.context("NOISEBELL_CACHE_RETRY_ATTEMPTS must be a valid u32")?;
let retry_base_delay_secs: u64 = std::env::var("NOISEBELL_CACHE_RETRY_BASE_DELAY_SECS")
.unwrap_or_else(|_| "1".into())
.parse()
.context("NOISEBELL_CACHE_RETRY_BASE_DELAY_SECS must be a valid u64")?;
let http_timeout_secs: u64 = std::env::var("NOISEBELL_CACHE_HTTP_TIMEOUT_SECS")
.unwrap_or_else(|_| "10".into())
.parse()
.context("NOISEBELL_CACHE_HTTP_TIMEOUT_SECS must be a valid u64")?;
// Parse outbound webhooks from NOISEBELL_CACHE_WEBHOOK_<n>_URL and _SECRET env vars
let mut webhooks = Vec::new();
for i in 0.. {
let url_key = format!("NOISEBELL_CACHE_WEBHOOK_{i}_URL");
match std::env::var(&url_key) {
Ok(url) => {
let secret_key = format!("NOISEBELL_CACHE_WEBHOOK_{i}_SECRET");
let secret = std::env::var(&secret_key).ok();
webhooks.push(WebhookTarget { url, secret });
}
Err(_) => break,
}
}
info!(
port,
%pi_address,
webhook_count = webhooks.len(),
"starting noisebell-cache"
);
let db_path = format!("{data_dir}/noisebell.db");
let conn = db::init(&db_path)?;
let db = Arc::new(Mutex::new(conn));
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(http_timeout_secs))
.build()
.context("failed to build HTTP client")?;
let poller_config = Arc::new(poller::PollerConfig {
pi_address,
pi_api_key,
status_poll_interval: Duration::from_secs(status_poll_interval_secs),
info_poll_interval: Duration::from_secs(info_poll_interval_secs),
offline_threshold,
retry_attempts,
retry_base_delay_secs,
http_timeout_secs,
webhooks: webhooks.clone(),
});
poller::spawn_status_poller(poller_config.clone(), db.clone(), client.clone());
poller::spawn_info_poller(poller_config, db.clone(), client.clone());
let app_state = Arc::new(api::AppState {
db,
client,
inbound_api_key,
webhooks,
retry_attempts,
retry_base_delay_secs,
});
let app = Router::new()
.route("/webhook", post(api::post_webhook))
.route("/status", get(api::get_status))
.route("/info", get(api::get_info))
.route("/history", get(api::get_history))
.with_state(app_state);
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port))
.await
.context(format!("failed to bind to 0.0.0.0:{port}"))?;
info!(port, "listening");
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.context("failed to register SIGTERM handler")?;
axum::serve(listener, app)
.with_graceful_shutdown(async move {
sigterm.recv().await;
})
.await
.context("server error")?;
info!("shutdown complete");
Ok(())
}

View file

@ -0,0 +1,174 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use crate::db;
use crate::types::{DoorStatus, OutboundPayload, WebhookTarget};
use crate::webhook;
pub struct PollerConfig {
pub pi_address: String,
pub pi_api_key: String,
pub status_poll_interval: Duration,
pub info_poll_interval: Duration,
pub offline_threshold: u32,
pub retry_attempts: u32,
pub retry_base_delay_secs: u64,
pub http_timeout_secs: u64,
pub webhooks: Vec<WebhookTarget>,
}
fn unix_now() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
}
pub fn spawn_status_poller(
config: Arc<PollerConfig>,
db: Arc<Mutex<rusqlite::Connection>>,
client: reqwest::Client,
) {
tokio::spawn(async move {
let mut consecutive_failures: u32 = 0;
let mut was_offline = false;
loop {
tokio::time::sleep(config.status_poll_interval).await;
let result = client
.get(format!("{}/", config.pi_address))
.bearer_auth(&config.pi_api_key)
.send()
.await;
match result {
Ok(resp) if resp.status().is_success() => {
consecutive_failures = 0;
if was_offline {
info!("Pi is back online");
was_offline = false;
}
let now = unix_now();
if let Ok(body) = resp.json::<serde_json::Value>().await {
let conn = db.lock().await;
if let Err(e) = db::update_last_seen(&conn, now) {
error!(error = %e, "failed to update last_seen");
}
// Update state if it differs from current
if let Some(status_str) = body.get("status").and_then(|s| s.as_str()) {
if let Some(status) = DoorStatus::from_str(status_str) {
let current = db::get_current_status_str(&conn);
let changed = match &current {
Ok(Some(s)) => s != status.as_str(),
Ok(None) => true, // was offline
Err(_) => true,
};
if changed {
let timestamp = body
.get("timestamp")
.and_then(|t| t.as_u64())
.unwrap_or(now);
if let Err(e) = db::update_state(&conn, status, timestamp, now) {
error!(error = %e, "failed to update state from poll");
} else {
info!(status = status.as_str(), "state updated from poll");
drop(conn);
webhook::forward(
&client,
&config.webhooks,
&OutboundPayload {
status: status.as_str().to_string(),
timestamp,
},
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
}
}
}
}
}
}
_ => {
consecutive_failures += 1;
let err_msg = match &result {
Ok(resp) => format!("HTTP {}", resp.status()),
Err(e) => e.to_string(),
};
warn!(
error = %err_msg,
consecutive_failures,
"status poll failed"
);
if consecutive_failures >= config.offline_threshold && !was_offline {
was_offline = true;
let now = unix_now();
let conn = db.lock().await;
if let Err(e) = db::mark_offline(&conn, now) {
error!(error = %e, "failed to mark Pi offline");
} else {
info!("Pi marked offline after {} consecutive failures", consecutive_failures);
drop(conn);
webhook::forward(
&client,
&config.webhooks,
&OutboundPayload {
status: "offline".to_string(),
timestamp: now,
},
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
}
}
}
}
}
});
}
pub fn spawn_info_poller(
config: Arc<PollerConfig>,
db: Arc<Mutex<rusqlite::Connection>>,
client: reqwest::Client,
) {
tokio::spawn(async move {
loop {
tokio::time::sleep(config.info_poll_interval).await;
let result = client
.get(format!("{}/info", config.pi_address))
.bearer_auth(&config.pi_api_key)
.send()
.await;
match result {
Ok(resp) if resp.status().is_success() => {
if let Ok(data) = resp.json::<serde_json::Value>().await {
let now = unix_now();
let conn = db.lock().await;
if let Err(e) = db::update_pi_info(&conn, &data, now) {
error!(error = %e, "failed to update pi_info");
}
}
}
_ => {
let err_msg = match &result {
Ok(resp) => format!("HTTP {}", resp.status()),
Err(e) => e.to_string(),
};
warn!(error = %err_msg, "info poll failed");
}
}
}
});
}

View file

@ -0,0 +1,57 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DoorStatus {
Open,
Closed,
}
impl DoorStatus {
pub fn as_str(self) -> &'static str {
match self {
DoorStatus::Open => "open",
DoorStatus::Closed => "closed",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"open" => Some(DoorStatus::Open),
"closed" => Some(DoorStatus::Closed),
_ => None,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct StatusResponse {
pub status: String, // "open", "closed", or "offline"
pub timestamp: Option<u64>,
pub last_seen: Option<u64>,
}
#[derive(Debug, Deserialize)]
pub struct InboundWebhook {
pub status: String,
pub timestamp: u64,
}
#[derive(Debug, Clone, Serialize)]
pub struct OutboundPayload {
pub status: String,
pub timestamp: u64,
}
#[derive(Debug, Clone, Serialize)]
pub struct HistoryEntry {
pub status: String,
pub timestamp: u64,
pub recorded_at: u64,
}
#[derive(Debug, Clone)]
pub struct WebhookTarget {
pub url: String,
pub secret: Option<String>,
}

View file

@ -0,0 +1,53 @@
use std::time::Duration;
use tracing::{error, info, warn};
use crate::types::{OutboundPayload, WebhookTarget};
pub async fn forward(
client: &reqwest::Client,
targets: &[WebhookTarget],
payload: &OutboundPayload,
retry_attempts: u32,
retry_base_delay_secs: u64,
) {
for target in targets {
let payload = payload.clone();
let client = client.clone();
let url = target.url.clone();
let secret = target.secret.clone();
let retry_attempts = retry_attempts;
let retry_base_delay_secs = retry_base_delay_secs;
tokio::spawn(async move {
info!(url = %url, status = %payload.status, "forwarding to outbound webhook");
for attempt in 0..=retry_attempts {
let mut req = client.post(&url).json(&payload);
if let Some(ref secret) = secret {
req = req.bearer_auth(secret);
}
match req.send().await {
Ok(resp) if resp.status().is_success() => {
info!(url = %url, "outbound webhook delivered");
return;
}
result => {
let err_msg = match &result {
Ok(resp) => format!("HTTP {}", resp.status()),
Err(e) => e.to_string(),
};
if attempt == retry_attempts {
error!(url = %url, error = %err_msg, "outbound webhook failed after {} attempts", retry_attempts + 1);
} else {
let delay = Duration::from_secs(retry_base_delay_secs * 2u64.pow(attempt));
warn!(url = %url, error = %err_msg, attempt = attempt + 1, "outbound webhook failed, retrying in {:?}", delay);
tokio::time::sleep(delay).await;
}
}
}
}
});
}
}