feat: add noisebell observability

This commit is contained in:
Jet 2026-05-27 20:09:44 -07:00
parent b57927a395
commit e6c1b82679
No known key found for this signature in database
24 changed files with 2289 additions and 137 deletions

View file

@ -17,6 +17,5 @@ rusqlite = { version = "0.33", features = ["bundled"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "sync", "signal", "time"] }
tower-http = { version = "0.6", features = ["trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View file

@ -12,11 +12,16 @@ If the Pi stops responding to polls (configurable threshold, default 3 misses),
|--------|------|------|-------------|
| `GET` | `/status` | — | Current door status (`status`, `since`, `last_checked`) |
| `GET` | `/badge.svg` | — | Live README badge with Noisebridge logo |
| `GET` | `/metrics` | — | Prometheus metrics, scraped locally by the DO Prometheus |
| `POST` | `/webhook` | Bearer | Inbound webhook from the Pi |
| `GET` | `/health` | — | Health check |
`since` is the Pi-reported time when the current state began. `last_checked` is when the cache most recently attempted a poll.
The public Caddy vhost returns `404` for `/metrics`; Prometheus scrapes the cache directly on localhost. Metrics include the configured Pi target, poll interval, offline threshold, last poll result, last HTTP status, last poll duration, last poll attempt/success/failure timestamps, and failure counters split into HTTP, timeout, connect, request-other, and parse failures.
Regular timer-driven poll data should be debugged from Prometheus and Grafana, not by scanning logs. The cache logs sparse events instead: state changes applied from the Pi, offline/online transitions, first or changed poll failures in a failure streak, stale events, auth/rate-limit rejections, outbound webhook deliveries, retries, and final failures. Successful unchanged polls, badge/image/status reads, and metrics scrapes are intentionally quiet at `INFO`.
## Badge
`/badge.svg` serves a classic shields.io-style SVG badge with the Noisebridge logo and the current cache status (`open`, `closed`, or `offline`).

View file

@ -89,6 +89,7 @@ in
services.caddy.virtualHosts.${cfg.domain}.extraConfig = ''
redir / https://git.extremist.software/jet/noisebell 302
respond /metrics 404
reverse_proxy localhost:${toString cfg.port}
'';

View file

@ -1,3 +1,4 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;
use axum::extract::State;
@ -6,12 +7,16 @@ use axum::response::{IntoResponse, Response};
use axum::Json;
use chrono::{DateTime, Utc};
use chrono_tz::America::Los_Angeles;
use noisebell_common::{validate_bearer, CacheStatusResponse, DoorStatus, WebhookPayload};
use noisebell_common::{
prometheus_escape_label_value, validate_bearer, CacheStatusResponse, DoorStatus,
WebhookPayload, PROMETHEUS_CONTENT_TYPE,
};
use tokio::sync::Mutex;
use tracing::{error, info};
use tracing::{error, info, warn};
use crate::db;
use crate::db::ApplyStateOutcome;
use crate::metrics::{atomic_seconds_from_millis, atomic_value, CacheMetrics, PollResultKind};
use crate::types::WebhookTarget;
use crate::webhook;
@ -108,6 +113,7 @@ pub struct AppState {
pub retry_base_delay_secs: u64,
pub webhook_last_request: std::sync::atomic::AtomicU64,
pub webhook_tokens: std::sync::atomic::AtomicU32,
pub metrics: Arc<CacheMetrics>,
}
fn unix_now() -> u64 {
@ -186,8 +192,14 @@ pub async fn post_webhook(
Json(body): Json<WebhookPayload>,
) -> StatusCode {
if !validate_bearer(&headers, &state.inbound_api_key) {
warn!(
status = %body.status,
timestamp = body.timestamp,
"unauthorized webhook rejected"
);
return StatusCode::UNAUTHORIZED;
}
state.metrics.webhook_received_total.fetch_add(1, Ordering::Relaxed);
// Simple rate limiting: reset tokens every window, reject if exhausted.
let now = unix_now();
@ -202,6 +214,14 @@ pub async fn post_webhook(
|n| if n > 0 { Some(n - 1) } else { None },
);
if remaining.is_err() {
state.metrics.webhook_rate_limited_total.fetch_add(1, Ordering::Relaxed);
warn!(
status = %body.status,
timestamp = body.timestamp,
limit = WEBHOOK_RATE_LIMIT,
window_seconds = WEBHOOK_RATE_WINDOW_SECS,
"webhook rate limit exceeded"
);
return StatusCode::TOO_MANY_REQUESTS;
}
@ -224,7 +244,7 @@ pub async fn post_webhook(
"state updated via webhook"
);
webhook::forward(
let summary = webhook::forward(
&state.client,
&state.webhooks,
&WebhookPayload { status, timestamp: body.timestamp },
@ -232,16 +252,11 @@ pub async fn post_webhook(
state.retry_base_delay_secs,
)
.await;
state.metrics.add_outbound(summary.delivered, summary.failed);
}
Ok(ApplyStateOutcome::Duplicate) => {
info!(
status = %status,
timestamp = body.timestamp,
"duplicate webhook ignored"
);
}
Ok(ApplyStateOutcome::Duplicate) => {}
Ok(ApplyStateOutcome::Stale) => {
info!(
warn!(
status = %status,
timestamp = body.timestamp,
"stale webhook ignored"
@ -281,6 +296,231 @@ pub async fn health() -> StatusCode {
StatusCode::OK
}
pub async fn get_metrics(State(state): State<Arc<AppState>>) -> Response {
let db = state.db.clone();
let snapshot = match tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::get_metrics_snapshot(&conn)
})
.await
.expect("db task panicked")
{
Ok(snapshot) => snapshot,
Err(e) => {
error!(error = %e, "failed to get metrics snapshot");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
};
let mut body = String::new();
body.push_str("# HELP noisebell_cache_status Current cached door status.\n");
body.push_str("# TYPE noisebell_cache_status gauge\n");
for status in DoorStatus::ALL {
let value = u8::from(snapshot.status == status);
let status = prometheus_escape_label_value(status.as_str());
body.push_str(&format!("noisebell_cache_status{{status=\"{status}\"}} {value}\n"));
}
body.push_str("# HELP noisebell_cache_status_since_timestamp_seconds Unix timestamp for when the current cache state began.\n");
body.push_str("# TYPE noisebell_cache_status_since_timestamp_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_status_since_timestamp_seconds {}\n",
snapshot.since.unwrap_or(0)
));
body.push_str("# HELP noisebell_cache_last_seen_timestamp_seconds Unix timestamp for the last successful Pi state update.\n");
body.push_str("# TYPE noisebell_cache_last_seen_timestamp_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_last_seen_timestamp_seconds {}\n",
snapshot.last_seen.unwrap_or(0)
));
body.push_str("# HELP noisebell_cache_last_checked_timestamp_seconds Unix timestamp for the last Pi poll attempt.\n");
body.push_str("# TYPE noisebell_cache_last_checked_timestamp_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_last_checked_timestamp_seconds {}\n",
snapshot.last_checked.unwrap_or(0)
));
body.push_str("# HELP noisebell_cache_process_start_time_seconds Unix timestamp when the cache service started.\n");
body.push_str("# TYPE noisebell_cache_process_start_time_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_process_start_time_seconds {}\n",
state.metrics.process_start_time
));
let pi_address = prometheus_escape_label_value(&state.metrics.pi_address);
body.push_str("# HELP noisebell_cache_pi_target_info Configured Pi polling target.\n");
body.push_str("# TYPE noisebell_cache_pi_target_info gauge\n");
body.push_str(&format!("noisebell_cache_pi_target_info{{address=\"{pi_address}\"}} 1\n"));
body.push_str("# HELP noisebell_cache_poll_interval_seconds Configured Pi poll interval.\n");
body.push_str("# TYPE noisebell_cache_poll_interval_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_interval_seconds {}\n",
state.metrics.poll_interval_secs
));
body.push_str("# HELP noisebell_cache_poll_offline_threshold Configured consecutive failure threshold before marking the Pi offline.\n");
body.push_str("# TYPE noisebell_cache_poll_offline_threshold gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_offline_threshold {}\n",
state.metrics.offline_threshold
));
body.push_str("# HELP noisebell_cache_retry_attempts Configured outbound retry attempts.\n");
body.push_str("# TYPE noisebell_cache_retry_attempts gauge\n");
body.push_str(&format!("noisebell_cache_retry_attempts {}\n", state.metrics.retry_attempts));
body.push_str("# HELP noisebell_cache_http_timeout_seconds Configured HTTP timeout for cache HTTP clients.\n");
body.push_str("# TYPE noisebell_cache_http_timeout_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_http_timeout_seconds {}\n",
state.metrics.http_timeout_secs
));
body.push_str(
"# HELP noisebell_cache_webhook_received_total Authenticated inbound webhooks received.\n",
);
body.push_str("# TYPE noisebell_cache_webhook_received_total counter\n");
body.push_str(&format!(
"noisebell_cache_webhook_received_total {}\n",
atomic_value(&state.metrics.webhook_received_total)
));
body.push_str("# HELP noisebell_cache_webhook_rate_limited_total Inbound webhooks rejected by rate limiting.\n");
body.push_str("# TYPE noisebell_cache_webhook_rate_limited_total counter\n");
body.push_str(&format!(
"noisebell_cache_webhook_rate_limited_total {}\n",
atomic_value(&state.metrics.webhook_rate_limited_total)
));
body.push_str("# HELP noisebell_cache_outbound_webhook_success_total Outbound webhook deliveries that succeeded.\n");
body.push_str("# TYPE noisebell_cache_outbound_webhook_success_total counter\n");
body.push_str(&format!(
"noisebell_cache_outbound_webhook_success_total {}\n",
atomic_value(&state.metrics.outbound_success_total)
));
body.push_str("# HELP noisebell_cache_outbound_webhook_failure_total Outbound webhook deliveries that failed after retries.\n");
body.push_str("# TYPE noisebell_cache_outbound_webhook_failure_total counter\n");
body.push_str(&format!(
"noisebell_cache_outbound_webhook_failure_total {}\n",
atomic_value(&state.metrics.outbound_failure_total)
));
body.push_str("# HELP noisebell_cache_poll_success_total Successful Pi status polls.\n");
body.push_str("# TYPE noisebell_cache_poll_success_total counter\n");
body.push_str(&format!(
"noisebell_cache_poll_success_total {}\n",
atomic_value(&state.metrics.poll_success_total)
));
body.push_str("# HELP noisebell_cache_poll_attempt_total Pi status poll attempts.\n");
body.push_str("# TYPE noisebell_cache_poll_attempt_total counter\n");
body.push_str(&format!(
"noisebell_cache_poll_attempt_total {}\n",
atomic_value(&state.metrics.poll_attempt_total)
));
body.push_str("# HELP noisebell_cache_poll_failure_total Failed Pi status polls.\n");
body.push_str("# TYPE noisebell_cache_poll_failure_total counter\n");
body.push_str(&format!(
"noisebell_cache_poll_failure_total {}\n",
atomic_value(&state.metrics.poll_failure_total)
));
body.push_str("# HELP noisebell_cache_poll_http_error_total Pi poll responses with non-success HTTP status.\n");
body.push_str("# TYPE noisebell_cache_poll_http_error_total counter\n");
body.push_str(&format!(
"noisebell_cache_poll_http_error_total {}\n",
atomic_value(&state.metrics.poll_http_error_total)
));
body.push_str(
"# HELP noisebell_cache_poll_request_timeout_total Pi poll request timeout failures.\n",
);
body.push_str("# TYPE noisebell_cache_poll_request_timeout_total counter\n");
body.push_str(&format!(
"noisebell_cache_poll_request_timeout_total {}\n",
atomic_value(&state.metrics.poll_request_timeout_total)
));
body.push_str(
"# HELP noisebell_cache_poll_request_connect_total Pi poll connection failures.\n",
);
body.push_str("# TYPE noisebell_cache_poll_request_connect_total counter\n");
body.push_str(&format!(
"noisebell_cache_poll_request_connect_total {}\n",
atomic_value(&state.metrics.poll_request_connect_total)
));
body.push_str("# HELP noisebell_cache_poll_request_other_total Pi poll request failures that were not timeout/connect failures.\n");
body.push_str("# TYPE noisebell_cache_poll_request_other_total counter\n");
body.push_str(&format!(
"noisebell_cache_poll_request_other_total {}\n",
atomic_value(&state.metrics.poll_request_other_total)
));
body.push_str("# HELP noisebell_cache_poll_parse_failure_total Successful Pi HTTP responses that could not be parsed.\n");
body.push_str("# TYPE noisebell_cache_poll_parse_failure_total counter\n");
body.push_str(&format!(
"noisebell_cache_poll_parse_failure_total {}\n",
atomic_value(&state.metrics.poll_parse_failure_total)
));
body.push_str("# HELP noisebell_cache_poll_offline_transition_total Times the cache marked the Pi offline.\n");
body.push_str("# TYPE noisebell_cache_poll_offline_transition_total counter\n");
body.push_str(&format!(
"noisebell_cache_poll_offline_transition_total {}\n",
atomic_value(&state.metrics.poll_offline_transition_total)
));
body.push_str(
"# HELP noisebell_cache_poll_consecutive_failures Current consecutive Pi poll failures.\n",
);
body.push_str("# TYPE noisebell_cache_poll_consecutive_failures gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_consecutive_failures {}\n",
atomic_value(&state.metrics.poll_consecutive_failures)
));
body.push_str("# HELP noisebell_cache_poll_last_attempt_timestamp_seconds Unix timestamp of the last Pi poll attempt.\n");
body.push_str("# TYPE noisebell_cache_poll_last_attempt_timestamp_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_last_attempt_timestamp_seconds {}\n",
atomic_value(&state.metrics.poll_last_attempt_timestamp)
));
body.push_str("# HELP noisebell_cache_poll_last_success_timestamp_seconds Unix timestamp of the last successful Pi poll.\n");
body.push_str("# TYPE noisebell_cache_poll_last_success_timestamp_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_last_success_timestamp_seconds {}\n",
atomic_value(&state.metrics.poll_last_success_timestamp)
));
body.push_str("# HELP noisebell_cache_poll_last_failure_timestamp_seconds Unix timestamp of the last failed Pi poll.\n");
body.push_str("# TYPE noisebell_cache_poll_last_failure_timestamp_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_last_failure_timestamp_seconds {}\n",
atomic_value(&state.metrics.poll_last_failure_timestamp)
));
body.push_str(
"# HELP noisebell_cache_poll_last_duration_seconds Duration of the most recent Pi poll.\n",
);
body.push_str("# TYPE noisebell_cache_poll_last_duration_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_last_duration_seconds {}\n",
atomic_seconds_from_millis(&state.metrics.poll_last_duration_millis)
));
body.push_str("# HELP noisebell_cache_poll_last_success_duration_seconds Duration of the most recent successful Pi poll.\n");
body.push_str("# TYPE noisebell_cache_poll_last_success_duration_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_last_success_duration_seconds {}\n",
atomic_seconds_from_millis(&state.metrics.poll_last_success_duration_millis)
));
body.push_str("# HELP noisebell_cache_poll_last_failure_duration_seconds Duration of the most recent failed Pi poll.\n");
body.push_str("# TYPE noisebell_cache_poll_last_failure_duration_seconds gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_last_failure_duration_seconds {}\n",
atomic_seconds_from_millis(&state.metrics.poll_last_failure_duration_millis)
));
body.push_str("# HELP noisebell_cache_poll_last_http_status HTTP status from the most recent Pi poll, or 0 when no HTTP response was received.\n");
body.push_str("# TYPE noisebell_cache_poll_last_http_status gauge\n");
body.push_str(&format!(
"noisebell_cache_poll_last_http_status {}\n",
atomic_value(&state.metrics.poll_last_http_status)
));
let last_result = PollResultKind::from_code(atomic_value(&state.metrics.poll_last_result));
body.push_str(
"# HELP noisebell_cache_poll_last_result Last Pi poll result as one-hot labels.\n",
);
body.push_str("# TYPE noisebell_cache_poll_last_result gauge\n");
for result in PollResultKind::ALL {
let value = u8::from(result == last_result);
body.push_str(&format!(
"noisebell_cache_poll_last_result{{result=\"{}\"}} {value}\n",
result.as_str()
));
}
([(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)], body).into_response()
}
pub async fn get_image_open() -> impl IntoResponse {
(
[(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")],

View file

@ -59,6 +59,14 @@ struct CurrentStateRow {
last_checked: Option<u64>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CacheMetricsSnapshot {
pub status: DoorStatus,
pub since: Option<u64>,
pub last_seen: Option<u64>,
pub last_checked: Option<u64>,
}
fn parse_status(status: &str, location: &str) -> Result<DoorStatus> {
status.parse().with_context(|| format!("invalid door status {status:?} in {location}"))
}
@ -183,6 +191,16 @@ pub fn get_status(conn: &Connection) -> Result<CacheStatusResponse> {
})
}
pub fn get_metrics_snapshot(conn: &Connection) -> Result<CacheMetricsSnapshot> {
let row = current_state_row(conn)?;
Ok(CacheMetricsSnapshot {
status: row.state.status_for_api(),
since: row.state.since_for_api(),
last_seen: row.last_seen,
last_checked: row.last_checked,
})
}
fn write_state_change(
conn: &Connection,
status: DoorStatus,

View file

@ -6,11 +6,11 @@ use axum::routing::{get, post};
use axum::Router;
use std::sync::atomic::AtomicU64;
use tokio::sync::Mutex;
use tower_http::trace::TraceLayer;
use tracing::{info, Level};
use tracing::info;
mod api;
mod db;
mod metrics;
mod poller;
mod types;
mod webhook;
@ -100,6 +100,14 @@ async fn main() -> Result<()> {
.build()
.context("failed to build HTTP client")?;
let metrics = Arc::new(metrics::CacheMetrics::new(
pi_address.clone(),
status_poll_interval_secs,
offline_threshold,
retry_attempts,
http_timeout_secs,
));
let poller_config = Arc::new(poller::PollerConfig {
pi_address,
pi_api_key,
@ -108,6 +116,7 @@ async fn main() -> Result<()> {
retry_attempts,
retry_base_delay_secs,
webhooks: webhooks.clone(),
metrics: metrics.clone(),
});
poller::spawn_status_poller(poller_config.clone(), db.clone(), client.clone());
@ -121,22 +130,19 @@ async fn main() -> Result<()> {
retry_base_delay_secs,
webhook_last_request: AtomicU64::new(0),
webhook_tokens: std::sync::atomic::AtomicU32::new(10),
metrics,
});
let app = Router::new()
.route("/health", get(api::health))
.route("/webhook", post(api::post_webhook))
.route("/status", get(api::get_status))
.route("/metrics", get(api::get_metrics))
.route("/badge.svg", get(api::get_badge))
.route("/image", get(api::get_image))
.route("/image/open.png", get(api::get_image_open))
.route("/image/closed.png", get(api::get_image_closed))
.route("/image/offline.png", get(api::get_image_offline))
.layer(
TraceLayer::new_for_http()
.make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO))
.on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)),
)
.with_state(app_state);
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port))

View file

@ -0,0 +1,186 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PollResultKind {
Never = 0,
Success = 1,
HttpError = 2,
RequestTimeout = 3,
RequestConnect = 4,
RequestOther = 5,
ParseError = 6,
}
impl PollResultKind {
pub const ALL: [Self; 7] = [
Self::Never,
Self::Success,
Self::HttpError,
Self::RequestTimeout,
Self::RequestConnect,
Self::RequestOther,
Self::ParseError,
];
pub const fn as_str(self) -> &'static str {
match self {
Self::Never => "never",
Self::Success => "success",
Self::HttpError => "http_error",
Self::RequestTimeout => "request_timeout",
Self::RequestConnect => "request_connect",
Self::RequestOther => "request_other",
Self::ParseError => "parse_error",
}
}
pub const fn from_code(code: u64) -> Self {
match code {
1 => Self::Success,
2 => Self::HttpError,
3 => Self::RequestTimeout,
4 => Self::RequestConnect,
5 => Self::RequestOther,
6 => Self::ParseError,
_ => Self::Never,
}
}
}
#[derive(Debug)]
pub struct CacheMetrics {
pub process_start_time: u64,
pub pi_address: String,
pub poll_interval_secs: u64,
pub offline_threshold: u32,
pub retry_attempts: u32,
pub http_timeout_secs: u64,
pub webhook_received_total: AtomicU64,
pub webhook_rate_limited_total: AtomicU64,
pub outbound_success_total: AtomicU64,
pub outbound_failure_total: AtomicU64,
pub poll_attempt_total: AtomicU64,
pub poll_success_total: AtomicU64,
pub poll_failure_total: AtomicU64,
pub poll_http_error_total: AtomicU64,
pub poll_request_timeout_total: AtomicU64,
pub poll_request_connect_total: AtomicU64,
pub poll_request_other_total: AtomicU64,
pub poll_parse_failure_total: AtomicU64,
pub poll_offline_transition_total: AtomicU64,
pub poll_consecutive_failures: AtomicU64,
pub poll_last_attempt_timestamp: AtomicU64,
pub poll_last_success_timestamp: AtomicU64,
pub poll_last_failure_timestamp: AtomicU64,
pub poll_last_duration_millis: AtomicU64,
pub poll_last_success_duration_millis: AtomicU64,
pub poll_last_failure_duration_millis: AtomicU64,
pub poll_last_http_status: AtomicU64,
pub poll_last_result: AtomicU64,
}
impl CacheMetrics {
pub fn new(
pi_address: String,
poll_interval_secs: u64,
offline_threshold: u32,
retry_attempts: u32,
http_timeout_secs: u64,
) -> Self {
Self {
process_start_time: unix_now(),
pi_address,
poll_interval_secs,
offline_threshold,
retry_attempts,
http_timeout_secs,
webhook_received_total: AtomicU64::new(0),
webhook_rate_limited_total: AtomicU64::new(0),
outbound_success_total: AtomicU64::new(0),
outbound_failure_total: AtomicU64::new(0),
poll_attempt_total: AtomicU64::new(0),
poll_success_total: AtomicU64::new(0),
poll_failure_total: AtomicU64::new(0),
poll_http_error_total: AtomicU64::new(0),
poll_request_timeout_total: AtomicU64::new(0),
poll_request_connect_total: AtomicU64::new(0),
poll_request_other_total: AtomicU64::new(0),
poll_parse_failure_total: AtomicU64::new(0),
poll_offline_transition_total: AtomicU64::new(0),
poll_consecutive_failures: AtomicU64::new(0),
poll_last_attempt_timestamp: AtomicU64::new(0),
poll_last_success_timestamp: AtomicU64::new(0),
poll_last_failure_timestamp: AtomicU64::new(0),
poll_last_duration_millis: AtomicU64::new(0),
poll_last_success_duration_millis: AtomicU64::new(0),
poll_last_failure_duration_millis: AtomicU64::new(0),
poll_last_http_status: AtomicU64::new(0),
poll_last_result: AtomicU64::new(PollResultKind::Never as u64),
}
}
pub fn add_outbound(&self, delivered: u64, failed: u64) {
self.outbound_success_total.fetch_add(delivered, Ordering::Relaxed);
self.outbound_failure_total.fetch_add(failed, Ordering::Relaxed);
}
pub fn record_poll_attempt(&self, timestamp: u64) {
self.poll_attempt_total.fetch_add(1, Ordering::Relaxed);
self.poll_last_attempt_timestamp.store(timestamp, Ordering::Relaxed);
}
pub fn record_poll_success(&self, timestamp: u64, duration_millis: u64, status: u16) {
self.poll_success_total.fetch_add(1, Ordering::Relaxed);
self.poll_last_success_timestamp.store(timestamp, Ordering::Relaxed);
self.poll_last_duration_millis.store(duration_millis, Ordering::Relaxed);
self.poll_last_success_duration_millis.store(duration_millis, Ordering::Relaxed);
self.poll_last_http_status.store(u64::from(status), Ordering::Relaxed);
self.poll_last_result.store(PollResultKind::Success as u64, Ordering::Relaxed);
}
pub fn record_poll_failure(
&self,
kind: PollResultKind,
timestamp: u64,
duration_millis: u64,
status: Option<u16>,
) {
self.poll_failure_total.fetch_add(1, Ordering::Relaxed);
match kind {
PollResultKind::HttpError => {
self.poll_http_error_total.fetch_add(1, Ordering::Relaxed);
}
PollResultKind::RequestTimeout => {
self.poll_request_timeout_total.fetch_add(1, Ordering::Relaxed);
}
PollResultKind::RequestConnect => {
self.poll_request_connect_total.fetch_add(1, Ordering::Relaxed);
}
PollResultKind::RequestOther => {
self.poll_request_other_total.fetch_add(1, Ordering::Relaxed);
}
PollResultKind::ParseError => {
self.poll_parse_failure_total.fetch_add(1, Ordering::Relaxed);
}
PollResultKind::Never | PollResultKind::Success => {}
}
self.poll_last_failure_timestamp.store(timestamp, Ordering::Relaxed);
self.poll_last_duration_millis.store(duration_millis, Ordering::Relaxed);
self.poll_last_failure_duration_millis.store(duration_millis, Ordering::Relaxed);
self.poll_last_http_status.store(status.map(u64::from).unwrap_or(0), Ordering::Relaxed);
self.poll_last_result.store(kind as u64, Ordering::Relaxed);
}
}
fn unix_now() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
}
pub fn atomic_value(value: &AtomicU64) -> u64 {
value.load(Ordering::Relaxed)
}
pub fn atomic_seconds_from_millis(value: &AtomicU64) -> f64 {
value.load(Ordering::Relaxed) as f64 / 1000.0
}

View file

@ -1,5 +1,6 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use noisebell_common::{DoorStatus, PiStatusResponse, WebhookPayload};
use tokio::sync::Mutex;
@ -7,6 +8,7 @@ use tracing::{error, info, warn};
use crate::db;
use crate::db::ApplyStateOutcome;
use crate::metrics::{CacheMetrics, PollResultKind};
use crate::types::WebhookTarget;
use crate::webhook;
@ -18,12 +20,81 @@ pub struct PollerConfig {
pub retry_attempts: u32,
pub retry_base_delay_secs: u64,
pub webhooks: Vec<WebhookTarget>,
pub metrics: Arc<CacheMetrics>,
}
fn unix_now() -> u64 {
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
}
fn duration_millis(started_at: Instant) -> u64 {
let millis = started_at.elapsed().as_millis();
millis.try_into().unwrap_or(u64::MAX)
}
fn classify_request_error(error: &reqwest::Error) -> PollResultKind {
if error.is_timeout() {
PollResultKind::RequestTimeout
} else if error.is_connect() {
PollResultKind::RequestConnect
} else {
PollResultKind::RequestOther
}
}
fn should_log_poll_failure(
consecutive_failures: u32,
kind: PollResultKind,
status_code: Option<u16>,
last_failure_log: &mut Option<(PollResultKind, Option<u16>)>,
) -> bool {
let key = (kind, status_code);
let changed = match *last_failure_log {
Some(previous) => previous != key,
None => true,
};
*last_failure_log = Some(key);
consecutive_failures == 1 || changed
}
async fn mark_pi_offline(
config: &PollerConfig,
database: Arc<Mutex<rusqlite::Connection>>,
client: &reqwest::Client,
now: u64,
consecutive_failures: u32,
) {
let marked = tokio::task::spawn_blocking(move || {
let conn = database.blocking_lock();
db::mark_offline(&conn, now)
})
.await
.expect("db task panicked");
match marked {
Ok(()) => {
config.metrics.poll_offline_transition_total.fetch_add(1, Ordering::Relaxed);
warn!(
consecutive_failures,
threshold = config.offline_threshold,
"Pi marked offline after poll failures"
);
let summary = webhook::forward(
client,
&config.webhooks,
&WebhookPayload { status: DoorStatus::Offline, timestamp: now },
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
config.metrics.add_outbound(summary.delivered, summary.failed);
}
Err(e) => {
error!(error = %e, "failed to mark Pi offline");
}
}
}
pub fn spawn_status_poller(
config: Arc<PollerConfig>,
db: Arc<Mutex<rusqlite::Connection>>,
@ -32,10 +103,15 @@ pub fn spawn_status_poller(
tokio::spawn(async move {
let mut consecutive_failures: u32 = 0;
let mut was_offline = false;
let mut last_failure_log: Option<(PollResultKind, Option<u16>)> = None;
loop {
let poll_started_at = Instant::now();
let poll_started_timestamp = unix_now();
config.metrics.record_poll_attempt(poll_started_timestamp);
{
let now = unix_now();
let now = poll_started_timestamp;
let db = db.clone();
let _ = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
@ -53,16 +129,77 @@ pub fn spawn_status_poller(
.await;
match result {
Ok(resp) if resp.status().is_success() => {
consecutive_failures = 0;
if was_offline {
info!("Pi is back online");
was_offline = false;
Ok(resp) => {
let status_code = resp.status().as_u16();
if !resp.status().is_success() {
consecutive_failures += 1;
let kind = PollResultKind::HttpError;
config.metrics.record_poll_failure(
kind,
unix_now(),
duration_millis(poll_started_at),
Some(status_code),
);
config
.metrics
.poll_consecutive_failures
.store(consecutive_failures.into(), Ordering::Relaxed);
if should_log_poll_failure(
consecutive_failures,
kind,
Some(status_code),
&mut last_failure_log,
) {
warn!(
kind = kind.as_str(),
http_status = status_code,
consecutive_failures,
"Pi status poll failed"
);
}
if consecutive_failures >= config.offline_threshold && !was_offline {
was_offline = true;
let now = unix_now();
mark_pi_offline(
&config,
db.clone(),
&client,
now,
consecutive_failures,
)
.await;
}
tokio::time::sleep(config.status_poll_interval).await;
continue;
}
let now = unix_now();
match resp.json::<PiStatusResponse>().await {
Ok(body) => {
if was_offline {
info!(
previous_consecutive_failures = consecutive_failures,
"Pi is back online"
);
was_offline = false;
} else if consecutive_failures > 0 {
info!(
previous_consecutive_failures = consecutive_failures,
"Pi status poll recovered"
);
}
consecutive_failures = 0;
last_failure_log = None;
config.metrics.poll_consecutive_failures.store(0, Ordering::Relaxed);
config.metrics.record_poll_success(
now,
duration_millis(poll_started_at),
status_code,
);
let status = body.status;
let event_timestamp = body.timestamp;
@ -102,7 +239,7 @@ pub fn spawn_status_poller(
timestamp,
"state updated from poll"
);
webhook::forward(
let summary = webhook::forward(
&client,
&config.webhooks,
&WebhookPayload { status, timestamp },
@ -110,14 +247,11 @@ pub fn spawn_status_poller(
config.retry_base_delay_secs,
)
.await;
config
.metrics
.add_outbound(summary.delivered, summary.failed);
}
ApplyStateOutcome::Duplicate => {
info!(
status = %status,
timestamp,
"duplicate poll state ignored"
);
}
ApplyStateOutcome::Duplicate => {}
ApplyStateOutcome::Stale => {
warn!(
status = %status,
@ -129,52 +263,80 @@ pub fn spawn_status_poller(
}
}
Err(e) => {
error!(error = %e, "failed to parse status poll response");
consecutive_failures += 1;
let kind = PollResultKind::ParseError;
config.metrics.record_poll_failure(
kind,
unix_now(),
duration_millis(poll_started_at),
Some(status_code),
);
config
.metrics
.poll_consecutive_failures
.store(consecutive_failures.into(), Ordering::Relaxed);
if should_log_poll_failure(
consecutive_failures,
kind,
Some(status_code),
&mut last_failure_log,
) {
error!(
error = %e,
kind = kind.as_str(),
http_status = status_code,
consecutive_failures,
"failed to parse Pi status poll response"
);
}
if consecutive_failures >= config.offline_threshold && !was_offline {
was_offline = true;
let now = unix_now();
mark_pi_offline(
&config,
db.clone(),
&client,
now,
consecutive_failures,
)
.await;
}
}
}
}
_ => {
Err(e) => {
consecutive_failures += 1;
let err_msg = match &result {
Ok(resp) => format!("HTTP {}", resp.status()),
Err(e) => e.to_string(),
};
warn!(
error = %err_msg,
consecutive_failures,
"status poll failed"
let kind = classify_request_error(&e);
config.metrics.record_poll_failure(
kind,
unix_now(),
duration_millis(poll_started_at),
None,
);
config
.metrics
.poll_consecutive_failures
.store(consecutive_failures.into(), Ordering::Relaxed);
if should_log_poll_failure(
consecutive_failures,
kind,
None,
&mut last_failure_log,
) {
warn!(
error = %e,
kind = kind.as_str(),
consecutive_failures,
"Pi status poll failed"
);
}
if consecutive_failures >= config.offline_threshold && !was_offline {
was_offline = true;
let now = unix_now();
let db = db.clone();
let marked = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
db::mark_offline(&conn, now)
})
.await
.expect("db task panicked");
match marked {
Ok(()) => {
info!(
"Pi marked offline after {} consecutive failures",
consecutive_failures
);
webhook::forward(
&client,
&config.webhooks,
&WebhookPayload { status: DoorStatus::Offline, timestamp: now },
config.retry_attempts,
config.retry_base_delay_secs,
)
.await;
}
Err(e) => {
error!(error = %e, "failed to mark Pi offline");
}
}
mark_pi_offline(&config, db.clone(), &client, now, consecutive_failures)
.await;
}
}
}

View file

@ -1,17 +1,28 @@
use std::time::Duration;
use std::time::{Duration, Instant};
use noisebell_common::WebhookPayload;
use tracing::{error, info, warn};
use crate::types::{WebhookAuth, WebhookTarget};
#[derive(Debug, Default)]
pub struct ForwardSummary {
pub delivered: u64,
pub failed: u64,
}
fn duration_millis(started_at: Instant) -> u64 {
let millis = started_at.elapsed().as_millis();
millis.try_into().unwrap_or(u64::MAX)
}
pub async fn forward(
client: &reqwest::Client,
targets: &[WebhookTarget],
payload: &WebhookPayload,
retry_attempts: u32,
retry_base_delay_secs: u64,
) {
) -> ForwardSummary {
let mut set = tokio::task::JoinSet::new();
for target in targets {
@ -21,9 +32,8 @@ pub async fn forward(
let auth = target.auth.clone();
set.spawn(async move {
info!(url = %url, status = %payload.status, "forwarding to outbound webhook");
for attempt in 0..=retry_attempts {
let attempt_started_at = Instant::now();
let mut req = client.post(&url).json(&payload);
if let WebhookAuth::Bearer(secret) = &auth {
req = req.bearer_auth(secret);
@ -31,30 +41,75 @@ pub async fn forward(
match req.send().await {
Ok(resp) if resp.status().is_success() => {
info!(url = %url, "outbound webhook delivered");
return;
info!(
url = %url,
status = %payload.status,
timestamp = payload.timestamp,
http_status = resp.status().as_u16(),
duration_ms = duration_millis(attempt_started_at),
attempts = attempt + 1,
"outbound webhook delivered"
);
return true;
}
result => {
let err_msg = match &result {
Ok(resp) => format!("HTTP {}", resp.status()),
Err(e) => e.to_string(),
};
let http_status = result.as_ref().ok().map(|resp| resp.status().as_u16());
let kind =
if http_status.is_some() { "http_error" } else { "request_error" };
let duration_ms = duration_millis(attempt_started_at);
if attempt == retry_attempts {
error!(url = %url, error = %err_msg, "outbound webhook failed after {} attempts", retry_attempts + 1);
error!(
url = %url,
status = %payload.status,
timestamp = payload.timestamp,
error = %err_msg,
kind,
http_status = http_status.unwrap_or(0),
duration_ms,
attempts = retry_attempts + 1,
"outbound webhook failed after retries"
);
} else {
let delay = Duration::from_secs(retry_base_delay_secs * 2u64.pow(attempt));
warn!(url = %url, error = %err_msg, attempt = attempt + 1, "outbound webhook failed, retrying in {:?}", delay);
let delay =
Duration::from_secs(retry_base_delay_secs * 2u64.pow(attempt));
warn!(
url = %url,
status = %payload.status,
timestamp = payload.timestamp,
error = %err_msg,
kind,
http_status = http_status.unwrap_or(0),
duration_ms,
attempt = attempt + 1,
total_attempts = retry_attempts + 1,
delay_seconds = delay.as_secs(),
"outbound webhook failed, retrying"
);
tokio::time::sleep(delay).await;
}
}
}
}
false
});
}
let mut summary = ForwardSummary::default();
while let Some(result) = set.join_next().await {
if let Err(e) = result {
error!(error = %e, "webhook task panicked");
match result {
Ok(true) => summary.delivered += 1,
Ok(false) => summary.failed += 1,
Err(e) => {
summary.failed += 1;
error!(error = %e, "webhook task panicked");
}
}
}
summary
}

View file

@ -15,6 +15,5 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serenity = { version = "0.12", default-features = false, features = ["client", "gateway", "model", "rustls_backend"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "sync", "signal", "time"] }
tower-http = { version = "0.6", features = ["trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View file

@ -12,8 +12,7 @@ use serenity::all::{
CreateInteractionResponseMessage, CreateMessage, GatewayIntents, Interaction,
};
use serenity::async_trait;
use tower_http::trace::TraceLayer;
use tracing::{error, info, warn, Level};
use tracing::{error, info, warn};
struct AppState {
http: Arc<serenity::all::Http>,
@ -66,6 +65,11 @@ async fn post_webhook(
Json(body): Json<WebhookPayload>,
) -> StatusCode {
if !validate_bearer(&headers, &state.webhook_secret) {
warn!(
status = %body.status,
timestamp = body.timestamp,
"unauthorized Discord webhook rejected"
);
return StatusCode::UNAUTHORIZED;
}
@ -226,11 +230,6 @@ async fn main() -> Result<()> {
let app = Router::new()
.route("/health", get(|| async { StatusCode::OK }))
.route("/webhook", post(post_webhook))
.layer(
TraceLayer::new_for_http()
.make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO))
.on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)),
)
.with_state(app_state);
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port))

View file

@ -12,6 +12,21 @@ pub fn validate_bearer(headers: &HeaderMap, expected: &str) -> bool {
.unwrap_or(false)
}
pub fn prometheus_escape_label_value(value: &str) -> String {
let mut escaped = String::with_capacity(value.len());
for ch in value.chars() {
match ch {
'\\' => escaped.push_str("\\\\"),
'"' => escaped.push_str("\\\""),
'\n' => escaped.push_str("\\n"),
_ => escaped.push(ch),
}
}
escaped
}
pub const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DoorStatus {

View file

@ -10,6 +10,5 @@ chrono = "0.4"
noisebell-common = { path = "../noisebell-common" }
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "signal", "time"] }
tower-http = { version = "0.6", features = ["trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View file

@ -9,8 +9,7 @@ use axum::routing::get;
use axum::{Json, Router};
use chrono::{DateTime, Utc};
use noisebell_common::{CacheStatusResponse, DoorStatus};
use tower_http::trace::TraceLayer;
use tracing::{error, info, Level};
use tracing::{error, info};
const FEED_TTL_MINUTES: u32 = 1;
const README_URL: &str =
@ -596,11 +595,6 @@ async fn main() -> Result<()> {
.route("/open/", get(get_open_rss))
.route("/open/rss.xml", get(get_open_rss))
.route("/open/atom.xml", get(get_open_atom))
.layer(
TraceLayer::new_for_http()
.make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO))
.on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)),
)
.with_state(app_state);
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port))

View file

@ -13,6 +13,5 @@ noisebell-common = { path = "../noisebell-common" }
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "signal"] }
tower-http = { version = "0.6", features = ["trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View file

@ -7,8 +7,7 @@ use axum::routing::{get, post};
use axum::{Json, Router};
use noisebell_common::{validate_bearer, DoorStatus, WebhookPayload};
use serde::Serialize;
use tower_http::trace::TraceLayer;
use tracing::{error, info, Level};
use tracing::{error, info, warn};
struct AppState {
client: reqwest::Client,
@ -54,6 +53,11 @@ async fn post_webhook(
Json(body): Json<WebhookPayload>,
) -> StatusCode {
if !validate_bearer(&headers, &state.webhook_secret) {
warn!(
status = %body.status,
timestamp = body.timestamp,
"unauthorized Zulip webhook rejected"
);
return StatusCode::UNAUTHORIZED;
}
@ -131,11 +135,6 @@ async fn main() -> Result<()> {
let app = Router::new()
.route("/health", get(|| async { StatusCode::OK }))
.route("/webhook", post(post_webhook))
.layer(
TraceLayer::new_for_http()
.make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO))
.on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)),
)
.with_state(Arc::new(AppState {
client,
webhook_secret,