From e6c1b82679ace8b50297a4562116569d0398d416 Mon Sep 17 00:00:00 2001 From: Jet Date: Wed, 27 May 2026 20:09:44 -0700 Subject: [PATCH] feat: add noisebell observability --- Cargo.lock | 5 - README.md | 12 + hosts/noisebell-do/configuration.nix | 5 +- hosts/noisebell-do/observability.nix | 674 +++++++++++++++++++++++++++ pi/README.md | 20 +- pi/pi-relay/src/main.rs | 249 +++++++++- pi/pi-service/src/main.rs | 477 ++++++++++++++++++- remote/cache-service/Cargo.toml | 1 - remote/cache-service/README.md | 5 + remote/cache-service/module.nix | 1 + remote/cache-service/src/api.rs | 262 ++++++++++- remote/cache-service/src/db.rs | 18 + remote/cache-service/src/main.rs | 20 +- remote/cache-service/src/metrics.rs | 186 ++++++++ remote/cache-service/src/poller.rs | 266 ++++++++--- remote/cache-service/src/webhook.rs | 77 ++- remote/discord-bot/Cargo.toml | 1 - remote/discord-bot/src/main.rs | 13 +- remote/noisebell-common/src/lib.rs | 15 + remote/rss-service/Cargo.toml | 1 - remote/rss-service/src/main.rs | 8 +- remote/zulip-bot/Cargo.toml | 1 - remote/zulip-bot/src/main.rs | 13 +- scripts/deploy-pios-pi.sh | 96 +++- 24 files changed, 2289 insertions(+), 137 deletions(-) create mode 100644 hosts/noisebell-do/observability.nix create mode 100644 remote/cache-service/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index e9eefcc..ffde564 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -886,7 +886,6 @@ dependencies = [ "serde", "serde_json", "tokio", - "tower-http", "tracing", "tracing-subscriber", ] @@ -912,7 +911,6 @@ dependencies = [ "serde_json", "serenity", "tokio", - "tower-http", "tracing", "tracing-subscriber", ] @@ -940,7 +938,6 @@ dependencies = [ "noisebell-common", "reqwest", "tokio", - "tower-http", "tracing", "tracing-subscriber", ] @@ -955,7 +952,6 @@ dependencies = [ "reqwest", "serde", "tokio", - "tower-http", "tracing", "tracing-subscriber", ] @@ -1808,7 +1804,6 @@ dependencies = [ "tower", "tower-layer", "tower-service", - "tracing", ] [[package]] diff --git a/README.md b/README.md index c30c505..f2e5399 100644 --- a/README.md +++ b/README.md @@ -34,3 +34,15 @@ Useful commands: - `scripts/deploy-pios-pi.sh pi@100.66.45.36` redeploys the Raspberry Pi OS machine The full Home Assistant relay workflow is documented in `pi/README.md`. + +## Observability + +The DigitalOcean host runs Prometheus, Loki, Grafana, Alloy, node_exporter, and blackbox_exporter via `hosts/noisebell-do/observability.nix`. Grafana provisions the `Noisebell DO + Pi` dashboard from code, with Prometheus panels for both hosts, detailed DO-to-Pi poll health, and Loki journal panels for both hosts. + +- Grafana: `http://noisebell-do:3030/` over Tailscale +- Prometheus: `http://noisebell-do:9090/` over Tailscale +- Loki: `http://noisebell-do:3100/` over Tailscale + +The Pi deploy script enables persistent journald, installs `prometheus-node-exporter`, and installs `noisebell-loki-journal.service` to ship Pi journal logs to Loki on the DO host. + +Prometheus is the source of truth for regular time-based data: scrape health, host CPU/memory/disk/uptime, DO-to-Pi poll counts and last results, GPIO state, Pi hardware readings, webhook counters, and retry counters. Loki/journald is reserved for sparse event logs that should be readable in chronological order: service start/stop, door state changes, cache state changes, Pi offline/online transitions, auth or rate-limit rejections, webhook retries/failures, stale events, and GPIO read errors. Routine successful polls, unchanged poll results, metrics scrapes, and badge/image/status reads are intentionally not logged at `INFO`. diff --git a/hosts/noisebell-do/configuration.nix b/hosts/noisebell-do/configuration.nix index 9a0f220..847a6d9 100644 --- a/hosts/noisebell-do/configuration.nix +++ b/hosts/noisebell-do/configuration.nix @@ -7,7 +7,10 @@ }: { - imports = [ (modulesPath + "/virtualisation/digital-ocean-config.nix") ]; + imports = [ + (modulesPath + "/virtualisation/digital-ocean-config.nix") + ./observability.nix + ]; system.stateVersion = "26.05"; diff --git a/hosts/noisebell-do/observability.nix b/hosts/noisebell-do/observability.nix new file mode 100644 index 0000000..be92a84 --- /dev/null +++ b/hosts/noisebell-do/observability.nix @@ -0,0 +1,674 @@ +{ lib, pkgs, ... }: + +let + prometheusPort = 9090; + lokiPort = 3100; + grafanaPort = 3030; + nodeExporterPort = 9100; + blackboxExporterPort = 9115; + + blackboxConfig = pkgs.writeText "noisebell-blackbox.yml" '' + modules: + http_2xx: + prober: http + timeout: 5s + http: + follow_redirects: true + preferred_ip_protocol: ip4 + tcp_connect: + prober: tcp + timeout: 5s + ''; + + prometheusDatasource = { + type = "prometheus"; + uid = "prometheus"; + }; + + lokiDatasource = { + type = "loki"; + uid = "loki"; + }; + + prometheusPanel = + { + id, + title, + type ? "timeseries", + x, + y, + w ? 12, + h ? 8, + targets, + }: + { + inherit id title type targets; + datasource = prometheusDatasource; + gridPos = { + inherit h w x y; + }; + }; + + lokiPanel = + { + id, + title, + type ? "logs", + x, + y, + w ? 12, + h ? 8, + targets, + }: + { + inherit id title type targets; + datasource = lokiDatasource; + gridPos = { + inherit h w x y; + }; + }; + + promTarget = refId: expr: legendFormat: { + inherit refId expr legendFormat; + }; + + lokiTarget = refId: expr: { + inherit refId expr; + }; + + dashboard = pkgs.writeText "noisebell-dashboard.json" (builtins.toJSON { + uid = "noisebell"; + title = "Noisebell DO + Pi"; + tags = [ + "noisebell" + "prometheus" + "loki" + ]; + timezone = "browser"; + schemaVersion = 39; + version = 1; + refresh = "30s"; + time = { + from = "now-6h"; + to = "now"; + }; + panels = [ + (prometheusPanel { + id = 1; + title = "Host Scrape Health (Prometheus)"; + type = "stat"; + x = 0; + y = 0; + w = 6; + h = 6; + targets = [ (promTarget "A" "up{job=~\"noisebell-(do|pi)-node\"}" "{{host}}") ]; + }) + (prometheusPanel { + id = 2; + title = "Noisebell Service Health (Prometheus)"; + type = "stat"; + x = 6; + y = 0; + w = 6; + h = 6; + targets = [ (promTarget "A" "up{job=~\"noisebell-(cache|pi-app|pi-relay)\"}" "{{job}}") ]; + }) + (prometheusPanel { + id = 3; + title = "Probe Health (Prometheus)"; + type = "stat"; + x = 12; + y = 0; + w = 6; + h = 6; + targets = [ (promTarget "A" "probe_success{job=~\"noisebell-.*-probes\"}" "{{instance}}") ]; + }) + (prometheusPanel { + id = 4; + title = "Door State (Prometheus)"; + type = "stat"; + x = 18; + y = 0; + w = 6; + h = 6; + targets = [ (promTarget "A" "noisebell_cache_status" "{{status}}") ]; + }) + (prometheusPanel { + id = 5; + title = "CPU Used % (DO + Pi)"; + x = 0; + y = 6; + w = 8; + targets = [ (promTarget "A" "100 - (avg by (host) (rate(node_cpu_seconds_total{job=~\"noisebell-(do|pi)-node\",mode=\"idle\"}[5m])) * 100)" "{{host}}") ]; + }) + (prometheusPanel { + id = 6; + title = "Memory Used % (DO + Pi)"; + x = 8; + y = 6; + w = 8; + targets = [ (promTarget "A" "100 * (1 - (node_memory_MemAvailable_bytes{job=~\"noisebell-(do|pi)-node\"} / node_memory_MemTotal_bytes{job=~\"noisebell-(do|pi)-node\"}))" "{{host}}") ]; + }) + (prometheusPanel { + id = 7; + title = "Root Disk Used % (DO + Pi)"; + x = 16; + y = 6; + w = 8; + targets = [ (promTarget "A" "100 * (1 - (node_filesystem_avail_bytes{job=~\"noisebell-(do|pi)-node\",mountpoint=\"/\",fstype!~\"tmpfs|overlay\"} / node_filesystem_size_bytes{job=~\"noisebell-(do|pi)-node\",mountpoint=\"/\",fstype!~\"tmpfs|overlay\"}))" "{{host}}") ]; + }) + (prometheusPanel { + id = 8; + title = "Host Uptime Hours (DO + Pi)"; + type = "stat"; + x = 0; + y = 14; + w = 6; + h = 6; + targets = [ (promTarget "A" "(time() - node_boot_time_seconds{job=~\"noisebell-(do|pi)-node\"}) / 3600" "{{host}}") ]; + }) + (prometheusPanel { + id = 9; + title = "Observability Stack Health (Prometheus)"; + type = "stat"; + x = 6; + y = 14; + w = 6; + h = 6; + targets = [ (promTarget "A" "up{job=~\"observability-.*\"}" "{{service}}") ]; + }) + (prometheusPanel { + id = 10; + title = "Pi Hardware (Prometheus)"; + x = 12; + y = 14; + w = 6; + h = 6; + targets = [ + (promTarget "A" "noisebell_pi_temperature_celsius" "temp C") + (promTarget "B" "noisebell_pi_throttled_flags" "throttled flags") + (promTarget "C" "noisebell_pi_tailscale_running" "tailscale running") + ]; + }) + (prometheusPanel { + id = 11; + title = "Pi Wi-Fi Signal (Prometheus)"; + x = 18; + y = 14; + w = 6; + h = 6; + targets = [ + (promTarget "A" "noisebell_pi_wifi_signal_dbm" "{{interface}} dBm") + (promTarget "B" "noisebell_pi_wifi_link_quality" "{{interface}} link") + ]; + }) + (prometheusPanel { + id = 12; + title = "Cache Poll Health (Prometheus)"; + x = 0; + y = 20; + targets = [ + (promTarget "A" "noisebell_cache_poll_consecutive_failures" "consecutive failures") + (promTarget "B" "rate(noisebell_cache_poll_failure_total[5m])" "failure rate") + (promTarget "C" "rate(noisebell_cache_poll_success_total[5m])" "success rate") + (promTarget "D" "noisebell_cache_poll_last_duration_seconds" "last duration") + ]; + }) + (prometheusPanel { + id = 13; + title = "Pi App Webhook Delivery (Prometheus)"; + x = 12; + y = 20; + targets = [ + (promTarget "A" "rate(noisebell_pi_notify_success_total[5m])" "success") + (promTarget "B" "rate(noisebell_pi_notify_attempt_failure_total[5m])" "attempt failures") + (promTarget "C" "rate(noisebell_pi_notify_failure_total[5m])" "final failures") + ]; + }) + (prometheusPanel { + id = 14; + title = "DO -> Pi Last Poll Details (Prometheus)"; + type = "stat"; + x = 0; + y = 28; + w = 12; + targets = [ + (promTarget "A" "noisebell_cache_poll_last_result" "result {{result}}") + (promTarget "B" "noisebell_cache_poll_last_http_status" "last HTTP status") + (promTarget "C" "noisebell_cache_poll_last_duration_seconds" "last duration sec") + (promTarget "D" "time() - noisebell_cache_poll_last_attempt_timestamp_seconds" "seconds since attempt") + (promTarget "E" "time() - noisebell_cache_poll_last_success_timestamp_seconds" "seconds since success") + (promTarget "F" "time() - noisebell_cache_poll_last_failure_timestamp_seconds" "seconds since failure") + ]; + }) + (prometheusPanel { + id = 15; + title = "DO -> Pi Poll Failure Types (Prometheus)"; + x = 12; + y = 28; + targets = [ + (promTarget "A" "rate(noisebell_cache_poll_http_error_total[5m])" "http error") + (promTarget "B" "rate(noisebell_cache_poll_request_timeout_total[5m])" "timeout") + (promTarget "C" "rate(noisebell_cache_poll_request_connect_total[5m])" "connect") + (promTarget "D" "rate(noisebell_cache_poll_request_other_total[5m])" "request other") + (promTarget "E" "rate(noisebell_cache_poll_parse_failure_total[5m])" "parse") + ]; + }) + (prometheusPanel { + id = 16; + title = "Relay Delivery (Prometheus)"; + x = 0; + y = 36; + targets = [ + (promTarget "A" "rate(noisebell_relay_forwarded_total[5m])" "forwarded") + (promTarget "B" "rate(noisebell_relay_attempt_failure_total[5m])" "attempt failures") + (promTarget "C" "rate(noisebell_relay_failed_total[5m])" "final failures") + (promTarget "D" "noisebell_relay_last_duration_seconds" "last duration") + ]; + }) + (lokiPanel { + id = 17; + title = "Journal Log Rate (Loki, DO + Pi)"; + type = "timeseries"; + x = 12; + y = 36; + targets = [ + { + refId = "A"; + expr = "sum by (host) (rate({job=\"journal\"}[5m]))"; + legendFormat = "{{host}}"; + } + ]; + }) + (lokiPanel { + id = 18; + title = "DO Journal Logs (Loki)"; + x = 0; + y = 44; + targets = [ (lokiTarget "A" "{job=\"journal\", host=\"noisebell-do\"}") ]; + }) + (lokiPanel { + id = 19; + title = "Pi Journal Logs (Loki)"; + x = 12; + y = 44; + targets = [ (lokiTarget "A" "{job=\"journal\", host=\"noisebell-pi\"}") ]; + }) + ]; + }); + + dashboardDir = pkgs.runCommand "noisebell-grafana-dashboards" { } '' + mkdir -p "$out" + cp ${dashboard} "$out/noisebell.json" + ''; + + blackboxRelabels = [ + { + source_labels = [ "__address__" ]; + target_label = "__param_target"; + } + { + source_labels = [ "__param_target" ]; + target_label = "instance"; + } + { + target_label = "__address__"; + replacement = "127.0.0.1:${toString blackboxExporterPort}"; + } + ]; +in +{ + services.prometheus = { + enable = true; + listenAddress = "0.0.0.0"; + port = prometheusPort; + retentionTime = "7d"; + globalConfig = { + scrape_interval = "15s"; + evaluation_interval = "15s"; + }; + exporters = { + node = { + enable = true; + port = nodeExporterPort; + enabledCollectors = [ "systemd" ]; + }; + blackbox = { + enable = true; + port = blackboxExporterPort; + configFile = blackboxConfig; + }; + }; + scrapeConfigs = [ + { + job_name = "noisebell-do-node"; + static_configs = [ + { + targets = [ "127.0.0.1:${toString nodeExporterPort}" ]; + labels.host = "noisebell-do"; + } + ]; + } + { + job_name = "noisebell-pi-node"; + static_configs = [ + { + targets = [ "noisebell-pi:${toString nodeExporterPort}" ]; + labels.host = "noisebell-pi"; + } + ]; + } + { + job_name = "noisebell-cache"; + metrics_path = "/metrics"; + static_configs = [ + { + targets = [ "127.0.0.1:3000" ]; + labels.host = "noisebell-do"; + } + ]; + } + { + job_name = "noisebell-pi-app"; + metrics_path = "/metrics"; + static_configs = [ + { + targets = [ "noisebell-pi:80" ]; + labels.host = "noisebell-pi"; + } + ]; + } + { + job_name = "noisebell-pi-relay"; + metrics_path = "/metrics"; + static_configs = [ + { + targets = [ "noisebell-pi:8090" ]; + labels.host = "noisebell-pi"; + } + ]; + } + { + job_name = "observability-prometheus"; + static_configs = [ + { + targets = [ "127.0.0.1:${toString prometheusPort}" ]; + labels = { + host = "noisebell-do"; + service = "prometheus"; + }; + } + ]; + } + { + job_name = "observability-loki"; + static_configs = [ + { + targets = [ "127.0.0.1:${toString lokiPort}" ]; + labels = { + host = "noisebell-do"; + service = "loki"; + }; + } + ]; + } + { + job_name = "observability-grafana"; + static_configs = [ + { + targets = [ "127.0.0.1:${toString grafanaPort}" ]; + labels = { + host = "noisebell-do"; + service = "grafana"; + }; + } + ]; + } + { + job_name = "observability-alloy"; + static_configs = [ + { + targets = [ "127.0.0.1:12345" ]; + labels = { + host = "noisebell-do"; + service = "alloy"; + }; + } + ]; + } + { + job_name = "observability-blackbox"; + static_configs = [ + { + targets = [ "127.0.0.1:${toString blackboxExporterPort}" ]; + labels = { + host = "noisebell-do"; + service = "blackbox"; + }; + } + ]; + } + { + job_name = "noisebell-http-probes"; + metrics_path = "/probe"; + params.module = [ "http_2xx" ]; + static_configs = [ + { + targets = [ + "http://noisebell-pi/metrics" + "http://noisebell-pi:8090/health" + "https://noisebell.extremist.software/status" + ]; + } + ]; + relabel_configs = blackboxRelabels; + } + { + job_name = "noisebell-tcp-probes"; + metrics_path = "/probe"; + params.module = [ "tcp_connect" ]; + static_configs = [ + { + targets = [ + "noisebell-pi:22" + "noisebell-pi:80" + "noisebell-pi:8090" + ]; + } + ]; + relabel_configs = blackboxRelabels; + } + ]; + rules = [ + '' + groups: + - name: noisebell + rules: + - alert: NoisebellPiAppDown + expr: up{job="noisebell-pi-app"} == 0 + for: 2m + labels: + severity: page + annotations: + summary: Noisebell Pi app metrics are down + - alert: NoisebellPiNodeExporterDown + expr: up{job="noisebell-pi-node"} == 0 + for: 2m + labels: + severity: page + annotations: + summary: Noisebell Pi node exporter is down + - alert: NoisebellProbeFailed + expr: probe_success{job=~"noisebell-.*-probes"} == 0 + for: 2m + labels: + severity: page + annotations: + summary: Noisebell probe failed for {{ $labels.instance }} + - alert: NoisebellCachePollFailures + expr: noisebell_cache_poll_consecutive_failures >= 3 + for: 1m + labels: + severity: page + annotations: + summary: Noisebell cache cannot poll the Pi + - alert: NoisebellPiRecentlyRebooted + expr: noisebell_pi_uptime_seconds < 300 + for: 30s + labels: + severity: info + annotations: + summary: Noisebell Pi rebooted recently + - alert: NoisebellPiThrottled + expr: noisebell_pi_throttled_flags > 0 + for: 1m + labels: + severity: warning + annotations: + summary: Noisebell Pi reports throttling flags + '' + ]; + }; + + services.loki = { + enable = true; + configuration = { + auth_enabled = false; + server = { + http_listen_address = "0.0.0.0"; + http_listen_port = lokiPort; + grpc_listen_address = "127.0.0.1"; + grpc_listen_port = 9096; + }; + common = { + path_prefix = "/var/lib/loki"; + replication_factor = 1; + ring.kvstore.store = "inmemory"; + storage.filesystem = { + chunks_directory = "/var/lib/loki/chunks"; + rules_directory = "/var/lib/loki/rules"; + }; + }; + schema_config.configs = [ + { + from = "2024-01-01"; + store = "tsdb"; + object_store = "filesystem"; + schema = "v13"; + index = { + prefix = "index_"; + period = "24h"; + }; + } + ]; + limits_config = { + retention_period = "168h"; + reject_old_samples = true; + reject_old_samples_max_age = "168h"; + }; + compactor = { + working_directory = "/var/lib/loki/compactor"; + compaction_interval = "10m"; + retention_enabled = true; + retention_delete_delay = "2h"; + retention_delete_worker_count = 1; + delete_request_store = "filesystem"; + }; + }; + }; + + services.alloy = { + enable = true; + extraFlags = [ "--server.http.listen-addr=127.0.0.1:12345" ]; + }; + + environment.etc."alloy/config.alloy".text = '' + loki.write "local" { + endpoint { + url = "http://127.0.0.1:${toString lokiPort}/loki/api/v1/push" + } + } + + loki.source.journal "system" { + max_age = "12h" + labels = { + job = "journal", + host = "noisebell-do", + } + forward_to = [loki.write.local.receiver] + } + ''; + + systemd.services.alloy = { + after = [ "loki.service" ]; + wants = [ "loki.service" ]; + }; + + services.grafana = { + enable = true; + settings = { + server = { + http_addr = "0.0.0.0"; + http_port = grafanaPort; + domain = "noisebell-do"; + root_url = "http://noisebell-do:${toString grafanaPort}/"; + }; + analytics.reporting_enabled = false; + metrics.enabled = true; + security = { + secret_key = "$__file{/var/lib/grafana/secret_key}"; + disable_initial_admin_creation = true; + }; + auth.disable_login_form = true; + users.allow_sign_up = false; + "auth.anonymous" = { + enabled = true; + org_role = "Viewer"; + }; + }; + provision = { + enable = true; + datasources.settings = { + apiVersion = 1; + prune = true; + datasources = [ + { + name = "Prometheus"; + uid = "prometheus"; + type = "prometheus"; + access = "proxy"; + url = "http://127.0.0.1:${toString prometheusPort}"; + isDefault = true; + editable = false; + } + { + name = "Loki"; + uid = "loki"; + type = "loki"; + access = "proxy"; + url = "http://127.0.0.1:${toString lokiPort}"; + editable = false; + } + ]; + }; + dashboards.settings = { + apiVersion = 1; + providers = [ + { + name = "Noisebell"; + type = "file"; + allowUiUpdates = false; + options.path = dashboardDir; + } + ]; + }; + }; + }; + + systemd.services.grafana.preStart = lib.mkBefore '' + if [ ! -s /var/lib/grafana/secret_key ]; then + umask 077 + ${pkgs.coreutils}/bin/head -c 64 /dev/urandom | ${pkgs.coreutils}/bin/base64 --wrap=0 > /var/lib/grafana/secret_key + fi + ''; +} diff --git a/pi/README.md b/pi/README.md index 76b26eb..32ab007 100644 --- a/pi/README.md +++ b/pi/README.md @@ -124,8 +124,11 @@ That script: 6. writes `/etc/noisebell/noisebell.env` 7. writes `/etc/noisebell/noisebell-relay.env` 8. installs `noisebell.service` and `noisebell-relay.service` -9. enables and starts both services -10. runs `tailscale up` with the decrypted auth key +9. enables persistent journald with a 30 day retention target +10. installs and enables `prometheus-node-exporter` +11. installs `noisebell-loki-journal.service` to ship Pi logs to Loki on `noisebell-do` +12. enables and starts the Noisebell services +13. runs `tailscale up` with the decrypted auth key ## Files written on the Pi @@ -143,6 +146,9 @@ The deploy script creates: - `/etc/noisebell/noisebell-relay.env` - `/etc/systemd/system/noisebell.service` - `/etc/systemd/system/noisebell-relay.service` +- `/etc/systemd/system/noisebell-loki-journal.service` +- `/usr/local/bin/noisebell-loki-journal` +- `/etc/systemd/journald.conf.d/noisebell-persistent.conf` All secret files are root-only. @@ -275,10 +281,18 @@ Important: Home Assistant webhook IDs are exact. If the automation shows a leadi ## API -All endpoints require `Authorization: Bearer `. +`GET /` requires `Authorization: Bearer `. **`GET /`** ```json {"status": "open", "timestamp": 1710000000} ``` + +**`GET /metrics`** + +Prometheus metrics for local door state, raw GPIO level, debounced state-change counters, webhook delivery counters, last webhook result/status/duration, boot identity, uptime, temperature, throttling flags, Wi-Fi signal, and Tailscale state. This endpoint is unauthenticated and intended for Tailscale-only scraping by the DO Prometheus. + +`noisebell-relay` also exposes unauthenticated Prometheus metrics at `GET /metrics` on port `8090`, including inbound webhook count, Home Assistant forwarding counters, and last forward result/status/duration. + +Routine sampled values belong in Prometheus, not logs: GPIO level, Wi-Fi signal, temperature, uptime, Tailscale state, scrape health, and webhook counters are graphed from `/metrics`. Journald/Loki logs are intended to stay event-oriented: startup/shutdown, initial state sync, debounced door state changes, successful state deliveries, delivery retries/failures, unauthorized requests, relay forwards, and GPIO read error/recovery events. diff --git a/pi/pi-relay/src/main.rs b/pi/pi-relay/src/main.rs index 9da54eb..71d84ee 100644 --- a/pi/pi-relay/src/main.rs +++ b/pi/pi-relay/src/main.rs @@ -1,12 +1,14 @@ +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::{Context, Result}; use axum::extract::State; -use axum::http::{HeaderMap, StatusCode}; +use axum::http::{header, HeaderMap, StatusCode}; +use axum::response::{IntoResponse, Response}; use axum::routing::{get, post}; use axum::{Json, Router}; -use noisebell_common::{validate_bearer, WebhookPayload}; +use noisebell_common::{validate_bearer, WebhookPayload, PROMETHEUS_CONTENT_TYPE}; use tracing::{error, info, warn}; #[derive(Clone)] @@ -17,6 +19,109 @@ struct AppState { target_secret: Option, retry_attempts: u32, retry_base_delay_secs: u64, + metrics: Arc, +} + +#[derive(Debug)] +struct RelayMetrics { + process_start_time: u64, + received_total: AtomicU64, + forwarded_total: AtomicU64, + attempt_failure_total: AtomicU64, + failed_total: AtomicU64, + last_attempt_timestamp: AtomicU64, + last_success_timestamp: AtomicU64, + last_failure_timestamp: AtomicU64, + last_duration_millis: AtomicU64, + last_http_status: AtomicU64, + last_result: AtomicU64, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +enum RelayResultKind { + Never = 0, + Success = 1, + HttpError = 2, + RequestError = 3, +} + +impl RelayResultKind { + const ALL: [Self; 4] = [Self::Never, Self::Success, Self::HttpError, Self::RequestError]; + + const fn as_str(self) -> &'static str { + match self { + Self::Never => "never", + Self::Success => "success", + Self::HttpError => "http_error", + Self::RequestError => "request_error", + } + } + + const fn from_code(code: u64) -> Self { + match code { + 1 => Self::Success, + 2 => Self::HttpError, + 3 => Self::RequestError, + _ => Self::Never, + } + } +} + +impl RelayMetrics { + fn new() -> Self { + Self { + process_start_time: unix_timestamp(), + received_total: AtomicU64::new(0), + forwarded_total: AtomicU64::new(0), + attempt_failure_total: AtomicU64::new(0), + failed_total: AtomicU64::new(0), + last_attempt_timestamp: AtomicU64::new(0), + last_success_timestamp: AtomicU64::new(0), + last_failure_timestamp: AtomicU64::new(0), + last_duration_millis: AtomicU64::new(0), + last_http_status: AtomicU64::new(0), + last_result: AtomicU64::new(RelayResultKind::Never as u64), + } + } + + fn record_attempt(&self, timestamp: u64) { + self.last_attempt_timestamp.store(timestamp, Ordering::Relaxed); + } + + fn record_success(&self, timestamp: u64, duration_millis: u64, status: u16) { + self.forwarded_total.fetch_add(1, Ordering::Relaxed); + self.last_success_timestamp.store(timestamp, Ordering::Relaxed); + self.last_duration_millis.store(duration_millis, Ordering::Relaxed); + self.last_http_status.store(u64::from(status), Ordering::Relaxed); + self.last_result.store(RelayResultKind::Success as u64, Ordering::Relaxed); + } + + fn record_failure( + &self, + kind: RelayResultKind, + timestamp: u64, + duration_millis: u64, + status: Option, + final_failure: bool, + ) { + self.attempt_failure_total.fetch_add(1, Ordering::Relaxed); + if final_failure { + self.failed_total.fetch_add(1, Ordering::Relaxed); + } + self.last_failure_timestamp.store(timestamp, Ordering::Relaxed); + self.last_duration_millis.store(duration_millis, Ordering::Relaxed); + self.last_http_status.store(status.map(u64::from).unwrap_or(0), Ordering::Relaxed); + self.last_result.store(kind as u64, Ordering::Relaxed); + } +} + +fn unix_timestamp() -> u64 { + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() +} + +fn duration_millis(started_at: Instant) -> u64 { + let millis = started_at.elapsed().as_millis(); + millis.try_into().unwrap_or(u64::MAX) } async fn post_webhook( @@ -25,12 +130,20 @@ async fn post_webhook( Json(payload): Json, ) -> StatusCode { if !validate_bearer(&headers, &state.inbound_api_key) { + warn!( + status = %payload.status, + timestamp = payload.timestamp, + "unauthorized relay webhook rejected" + ); return StatusCode::UNAUTHORIZED; } + state.metrics.received_total.fetch_add(1, Ordering::Relaxed); info!(status = %payload.status, timestamp = payload.timestamp, "relay received webhook"); for attempt in 0..=state.retry_attempts { + let forward_started_at = Instant::now(); + state.metrics.record_attempt(unix_timestamp()); let mut req = state.client.post(&state.target_url).json(&payload); if let Some(secret) = &state.target_secret { req = req.bearer_auth(secret); @@ -38,7 +151,17 @@ async fn post_webhook( match req.send().await { Ok(resp) if resp.status().is_success() => { - info!(status = %payload.status, "relay forwarded webhook"); + let duration_ms = duration_millis(forward_started_at); + let http_status = resp.status().as_u16(); + state.metrics.record_success(unix_timestamp(), duration_ms, http_status); + info!( + status = %payload.status, + timestamp = payload.timestamp, + http_status, + duration_ms, + attempts = attempt + 1, + "relay forwarded webhook" + ); return StatusCode::OK; } result => { @@ -46,13 +169,47 @@ async fn post_webhook( Ok(resp) => format!("HTTP {}", resp.status()), Err(err) => err.to_string(), }; + let http_status = result.as_ref().ok().map(|resp| resp.status().as_u16()); + let kind = if http_status.is_some() { + RelayResultKind::HttpError + } else { + RelayResultKind::RequestError + }; + let duration_ms = duration_millis(forward_started_at); + state.metrics.record_failure( + kind, + unix_timestamp(), + duration_ms, + http_status, + attempt == state.retry_attempts, + ); if attempt == state.retry_attempts { - error!(error = %err_msg, "relay failed to forward webhook after {} attempts", state.retry_attempts + 1); + error!( + status = %payload.status, + timestamp = payload.timestamp, + error = %err_msg, + kind = kind.as_str(), + http_status = http_status.unwrap_or(0), + duration_ms, + attempts = state.retry_attempts + 1, + "relay failed to forward webhook after retries" + ); return StatusCode::BAD_GATEWAY; } let delay = Duration::from_secs(state.retry_base_delay_secs * 2u64.pow(attempt)); - warn!(error = %err_msg, attempt = attempt + 1, "relay forward failed, retrying in {:?}", delay); + warn!( + status = %payload.status, + timestamp = payload.timestamp, + error = %err_msg, + kind = kind.as_str(), + http_status = http_status.unwrap_or(0), + duration_ms, + attempt = attempt + 1, + total_attempts = state.retry_attempts + 1, + delay_seconds = delay.as_secs(), + "relay forward failed, retrying" + ); tokio::time::sleep(delay).await; } } @@ -65,6 +222,84 @@ async fn health() -> StatusCode { StatusCode::OK } +async fn get_metrics(State(state): State>) -> Response { + let mut body = String::new(); + body.push_str("# HELP noisebell_relay_process_start_time_seconds Unix timestamp when the relay service started.\n"); + body.push_str("# TYPE noisebell_relay_process_start_time_seconds gauge\n"); + body.push_str(&format!( + "noisebell_relay_process_start_time_seconds {}\n", + state.metrics.process_start_time + )); + body.push_str( + "# HELP noisebell_relay_received_total Authenticated inbound webhooks received.\n", + ); + body.push_str("# TYPE noisebell_relay_received_total counter\n"); + body.push_str(&format!( + "noisebell_relay_received_total {}\n", + state.metrics.received_total.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_relay_forwarded_total Webhooks forwarded to Home Assistant successfully.\n"); + body.push_str("# TYPE noisebell_relay_forwarded_total counter\n"); + body.push_str(&format!( + "noisebell_relay_forwarded_total {}\n", + state.metrics.forwarded_total.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_relay_attempt_failure_total Failed forward attempts before retry or final failure.\n"); + body.push_str("# TYPE noisebell_relay_attempt_failure_total counter\n"); + body.push_str(&format!( + "noisebell_relay_attempt_failure_total {}\n", + state.metrics.attempt_failure_total.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_relay_failed_total Webhooks that failed after all retries.\n"); + body.push_str("# TYPE noisebell_relay_failed_total counter\n"); + body.push_str(&format!( + "noisebell_relay_failed_total {}\n", + state.metrics.failed_total.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_relay_last_attempt_timestamp_seconds Unix timestamp of the last Home Assistant forward attempt.\n"); + body.push_str("# TYPE noisebell_relay_last_attempt_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_relay_last_attempt_timestamp_seconds {}\n", + state.metrics.last_attempt_timestamp.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_relay_last_success_timestamp_seconds Unix timestamp of the last successful Home Assistant forward.\n"); + body.push_str("# TYPE noisebell_relay_last_success_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_relay_last_success_timestamp_seconds {}\n", + state.metrics.last_success_timestamp.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_relay_last_failure_timestamp_seconds Unix timestamp of the last failed Home Assistant forward attempt.\n"); + body.push_str("# TYPE noisebell_relay_last_failure_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_relay_last_failure_timestamp_seconds {}\n", + state.metrics.last_failure_timestamp.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_relay_last_duration_seconds Duration of the most recent Home Assistant forward attempt.\n"); + body.push_str("# TYPE noisebell_relay_last_duration_seconds gauge\n"); + body.push_str(&format!( + "noisebell_relay_last_duration_seconds {}\n", + state.metrics.last_duration_millis.load(Ordering::Relaxed) as f64 / 1000.0 + )); + body.push_str("# HELP noisebell_relay_last_http_status HTTP status from the most recent Home Assistant forward attempt, or 0 when no HTTP response was received.\n"); + body.push_str("# TYPE noisebell_relay_last_http_status gauge\n"); + body.push_str(&format!( + "noisebell_relay_last_http_status {}\n", + state.metrics.last_http_status.load(Ordering::Relaxed) + )); + let last_result = RelayResultKind::from_code(state.metrics.last_result.load(Ordering::Relaxed)); + body.push_str("# HELP noisebell_relay_last_result Last Home Assistant forward result as one-hot labels.\n"); + body.push_str("# TYPE noisebell_relay_last_result gauge\n"); + for result in RelayResultKind::ALL { + let value = u8::from(result == last_result); + body.push_str(&format!( + "noisebell_relay_last_result{{result=\"{}\"}} {value}\n", + result.as_str() + )); + } + + ([(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)], body).into_response() +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() @@ -122,10 +357,12 @@ async fn main() -> Result<()> { target_secret, retry_attempts, retry_base_delay_secs, + metrics: Arc::new(RelayMetrics::new()), }); let app = Router::new() .route("/health", get(health)) + .route("/metrics", get(get_metrics)) .route("/webhook", post(post_webhook)) .with_state(state); diff --git a/pi/pi-service/src/main.rs b/pi/pi-service/src/main.rs index 5d42274..f27f2dc 100644 --- a/pi/pi-service/src/main.rs +++ b/pi/pi-service/src/main.rs @@ -1,15 +1,18 @@ use std::sync::atomic::{AtomicU64, AtomicU8, Ordering}; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::{fs, process::Command}; use anyhow::{Context, Result}; use axum::extract::State; -use axum::http::{HeaderMap, StatusCode}; +use axum::http::{header, HeaderMap, StatusCode}; +use axum::response::{IntoResponse, Response}; use axum::routing::get; use axum::{Json, Router}; use gpiod::{Bias, Chip, Options}; use noisebell_common::{ - validate_bearer, DoorStatus, PiStatusResponse, SignalLevel, WebhookPayload, + prometheus_escape_label_value, validate_bearer, DoorStatus, PiStatusResponse, SignalLevel, + WebhookPayload, PROMETHEUS_CONTENT_TYPE, }; use tracing::{error, info, warn}; @@ -44,10 +47,126 @@ impl LocalDoorState { } } +#[derive(Clone, Copy)] +enum StateEventKind { + Startup, + StateChange, +} + +impl StateEventKind { + const fn as_str(self) -> &'static str { + match self { + Self::Startup => "startup", + Self::StateChange => "state_change", + } + } +} + struct AppState { door_state: AtomicU8, last_changed: AtomicU64, inbound_api_key: String, + metrics: AppMetrics, +} + +struct AppMetrics { + process_start_time: u64, + notify_success_total: AtomicU64, + notify_attempt_failure_total: AtomicU64, + notify_failure_total: AtomicU64, + notify_last_attempt_timestamp: AtomicU64, + notify_last_success_timestamp: AtomicU64, + notify_last_failure_timestamp: AtomicU64, + notify_last_duration_millis: AtomicU64, + notify_last_http_status: AtomicU64, + notify_last_result: AtomicU64, + state_change_open_total: AtomicU64, + state_change_closed_total: AtomicU64, + gpio_last_read_timestamp: AtomicU64, + gpio_raw_level: AtomicU8, + gpio_read_error_total: AtomicU64, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +enum NotifyResultKind { + Never = 0, + Success = 1, + HttpError = 2, + RequestError = 3, +} + +impl NotifyResultKind { + const ALL: [Self; 4] = [Self::Never, Self::Success, Self::HttpError, Self::RequestError]; + + const fn as_str(self) -> &'static str { + match self { + Self::Never => "never", + Self::Success => "success", + Self::HttpError => "http_error", + Self::RequestError => "request_error", + } + } + + const fn from_code(code: u64) -> Self { + match code { + 1 => Self::Success, + 2 => Self::HttpError, + 3 => Self::RequestError, + _ => Self::Never, + } + } +} + +impl AppMetrics { + fn new() -> Self { + Self { + process_start_time: unix_timestamp(), + notify_success_total: AtomicU64::new(0), + notify_attempt_failure_total: AtomicU64::new(0), + notify_failure_total: AtomicU64::new(0), + notify_last_attempt_timestamp: AtomicU64::new(0), + notify_last_success_timestamp: AtomicU64::new(0), + notify_last_failure_timestamp: AtomicU64::new(0), + notify_last_duration_millis: AtomicU64::new(0), + notify_last_http_status: AtomicU64::new(0), + notify_last_result: AtomicU64::new(NotifyResultKind::Never as u64), + state_change_open_total: AtomicU64::new(0), + state_change_closed_total: AtomicU64::new(0), + gpio_last_read_timestamp: AtomicU64::new(0), + gpio_raw_level: AtomicU8::new(0), + gpio_read_error_total: AtomicU64::new(0), + } + } + + fn record_notify_attempt(&self, timestamp: u64) { + self.notify_last_attempt_timestamp.store(timestamp, Ordering::Relaxed); + } + + fn record_notify_success(&self, timestamp: u64, duration_millis: u64, status: u16) { + self.notify_success_total.fetch_add(1, Ordering::Relaxed); + self.notify_last_success_timestamp.store(timestamp, Ordering::Relaxed); + self.notify_last_duration_millis.store(duration_millis, Ordering::Relaxed); + self.notify_last_http_status.store(u64::from(status), Ordering::Relaxed); + self.notify_last_result.store(NotifyResultKind::Success as u64, Ordering::Relaxed); + } + + fn record_notify_failure( + &self, + kind: NotifyResultKind, + timestamp: u64, + duration_millis: u64, + status: Option, + final_failure: bool, + ) { + self.notify_attempt_failure_total.fetch_add(1, Ordering::Relaxed); + if final_failure { + self.notify_failure_total.fetch_add(1, Ordering::Relaxed); + } + self.notify_last_failure_timestamp.store(timestamp, Ordering::Relaxed); + self.notify_last_duration_millis.store(duration_millis, Ordering::Relaxed); + self.notify_last_http_status.store(status.map(u64::from).unwrap_or(0), Ordering::Relaxed); + self.notify_last_result.store(kind as u64, Ordering::Relaxed); + } } impl AppState { @@ -60,11 +179,17 @@ fn unix_timestamp() -> u64 { SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() } +fn duration_millis(started_at: Instant) -> u64 { + let millis = started_at.elapsed().as_millis(); + millis.try_into().unwrap_or(u64::MAX) +} + async fn get_status( State(state): State>, headers: HeaderMap, ) -> Result, StatusCode> { if !validate_bearer(&headers, &state.inbound_api_key) { + warn!("unauthorized status request rejected"); return Err(StatusCode::UNAUTHORIZED); } Ok(Json(PiStatusResponse { @@ -73,6 +198,211 @@ async fn get_status( })) } +async fn get_metrics(State(state): State>) -> Response { + let mut body = String::new(); + let current_status = state.current_door_state().as_door_status(); + + body.push_str("# HELP noisebell_pi_door_status Current local Pi door status.\n"); + body.push_str("# TYPE noisebell_pi_door_status gauge\n"); + for status in [DoorStatus::Open, DoorStatus::Closed] { + let value = u8::from(current_status == status); + let status = prometheus_escape_label_value(status.as_str()); + body.push_str(&format!("noisebell_pi_door_status{{status=\"{status}\"}} {value}\n")); + } + body.push_str("# HELP noisebell_pi_last_changed_timestamp_seconds Unix timestamp for the last local door state change.\n"); + body.push_str("# TYPE noisebell_pi_last_changed_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_pi_last_changed_timestamp_seconds {}\n", + state.last_changed.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_process_start_time_seconds Unix timestamp when the Pi service started.\n"); + body.push_str("# TYPE noisebell_pi_process_start_time_seconds gauge\n"); + body.push_str(&format!( + "noisebell_pi_process_start_time_seconds {}\n", + state.metrics.process_start_time + )); + body.push_str( + "# HELP noisebell_pi_notify_success_total Successful state webhooks sent to the cache.\n", + ); + body.push_str("# TYPE noisebell_pi_notify_success_total counter\n"); + body.push_str(&format!( + "noisebell_pi_notify_success_total {}\n", + state.metrics.notify_success_total.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_notify_attempt_failure_total Failed state webhook attempts before retry or final failure.\n"); + body.push_str("# TYPE noisebell_pi_notify_attempt_failure_total counter\n"); + body.push_str(&format!( + "noisebell_pi_notify_attempt_failure_total {}\n", + state.metrics.notify_attempt_failure_total.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_notify_failure_total State changes that failed to reach the cache after all retries.\n"); + body.push_str("# TYPE noisebell_pi_notify_failure_total counter\n"); + body.push_str(&format!( + "noisebell_pi_notify_failure_total {}\n", + state.metrics.notify_failure_total.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_notify_last_attempt_timestamp_seconds Unix timestamp of the last cache webhook attempt.\n"); + body.push_str("# TYPE noisebell_pi_notify_last_attempt_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_pi_notify_last_attempt_timestamp_seconds {}\n", + state.metrics.notify_last_attempt_timestamp.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_notify_last_success_timestamp_seconds Unix timestamp of the last successful cache webhook.\n"); + body.push_str("# TYPE noisebell_pi_notify_last_success_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_pi_notify_last_success_timestamp_seconds {}\n", + state.metrics.notify_last_success_timestamp.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_notify_last_failure_timestamp_seconds Unix timestamp of the last failed cache webhook attempt.\n"); + body.push_str("# TYPE noisebell_pi_notify_last_failure_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_pi_notify_last_failure_timestamp_seconds {}\n", + state.metrics.notify_last_failure_timestamp.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_notify_last_duration_seconds Duration of the most recent cache webhook attempt.\n"); + body.push_str("# TYPE noisebell_pi_notify_last_duration_seconds gauge\n"); + body.push_str(&format!( + "noisebell_pi_notify_last_duration_seconds {}\n", + state.metrics.notify_last_duration_millis.load(Ordering::Relaxed) as f64 / 1000.0 + )); + body.push_str("# HELP noisebell_pi_notify_last_http_status HTTP status from the most recent cache webhook attempt, or 0 when no HTTP response was received.\n"); + body.push_str("# TYPE noisebell_pi_notify_last_http_status gauge\n"); + body.push_str(&format!( + "noisebell_pi_notify_last_http_status {}\n", + state.metrics.notify_last_http_status.load(Ordering::Relaxed) + )); + let last_notify = + NotifyResultKind::from_code(state.metrics.notify_last_result.load(Ordering::Relaxed)); + body.push_str( + "# HELP noisebell_pi_notify_last_result Last cache webhook result as one-hot labels.\n", + ); + body.push_str("# TYPE noisebell_pi_notify_last_result gauge\n"); + for result in NotifyResultKind::ALL { + let value = u8::from(result == last_notify); + body.push_str(&format!( + "noisebell_pi_notify_last_result{{result=\"{}\"}} {value}\n", + result.as_str() + )); + } + body.push_str("# HELP noisebell_pi_state_change_total Local debounced door state changes by resulting status.\n"); + body.push_str("# TYPE noisebell_pi_state_change_total counter\n"); + body.push_str(&format!( + "noisebell_pi_state_change_total{{status=\"open\"}} {}\n", + state.metrics.state_change_open_total.load(Ordering::Relaxed) + )); + body.push_str(&format!( + "noisebell_pi_state_change_total{{status=\"closed\"}} {}\n", + state.metrics.state_change_closed_total.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_gpio_raw_level Last GPIO raw signal level, 0 for low and 1 for high.\n"); + body.push_str("# TYPE noisebell_pi_gpio_raw_level gauge\n"); + body.push_str(&format!( + "noisebell_pi_gpio_raw_level {}\n", + state.metrics.gpio_raw_level.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_gpio_last_read_timestamp_seconds Unix timestamp of the last successful GPIO read.\n"); + body.push_str("# TYPE noisebell_pi_gpio_last_read_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_pi_gpio_last_read_timestamp_seconds {}\n", + state.metrics.gpio_last_read_timestamp.load(Ordering::Relaxed) + )); + body.push_str("# HELP noisebell_pi_gpio_read_error_total GPIO read errors.\n"); + body.push_str("# TYPE noisebell_pi_gpio_read_error_total counter\n"); + body.push_str(&format!( + "noisebell_pi_gpio_read_error_total {}\n", + state.metrics.gpio_read_error_total.load(Ordering::Relaxed) + )); + + if let Some(boot_id) = read_trimmed("/proc/sys/kernel/random/boot_id") { + let boot_id = prometheus_escape_label_value(&boot_id); + body.push_str("# HELP noisebell_pi_boot_info Pi boot identity. Changes on reboot.\n"); + body.push_str("# TYPE noisebell_pi_boot_info gauge\n"); + body.push_str(&format!("noisebell_pi_boot_info{{boot_id=\"{boot_id}\"}} 1\n")); + } + if let Some(uptime) = read_uptime_seconds() { + body.push_str("# HELP noisebell_pi_uptime_seconds Pi system uptime in seconds.\n"); + body.push_str("# TYPE noisebell_pi_uptime_seconds gauge\n"); + body.push_str(&format!("noisebell_pi_uptime_seconds {uptime}\n")); + } + if let Some(temp) = read_temperature_celsius() { + body.push_str("# HELP noisebell_pi_temperature_celsius Pi CPU temperature in Celsius.\n"); + body.push_str("# TYPE noisebell_pi_temperature_celsius gauge\n"); + body.push_str(&format!("noisebell_pi_temperature_celsius {temp}\n")); + } + if let Some(throttled) = read_throttled_flags() { + body.push_str("# HELP noisebell_pi_throttled_flags Raspberry Pi throttling bitfield from vcgencmd get_throttled.\n"); + body.push_str("# TYPE noisebell_pi_throttled_flags gauge\n"); + body.push_str(&format!("noisebell_pi_throttled_flags {throttled}\n")); + } + if let Some((interface, link, level)) = read_wifi_metrics() { + let interface = prometheus_escape_label_value(&interface); + body.push_str("# HELP noisebell_pi_wifi_link_quality Wireless link quality from /proc/net/wireless.\n"); + body.push_str("# TYPE noisebell_pi_wifi_link_quality gauge\n"); + body.push_str(&format!( + "noisebell_pi_wifi_link_quality{{interface=\"{interface}\"}} {link}\n" + )); + body.push_str("# HELP noisebell_pi_wifi_signal_dbm Wireless signal level in dBm from /proc/net/wireless.\n"); + body.push_str("# TYPE noisebell_pi_wifi_signal_dbm gauge\n"); + body.push_str(&format!( + "noisebell_pi_wifi_signal_dbm{{interface=\"{interface}\"}} {level}\n" + )); + } + + body.push_str("# HELP noisebell_pi_tailscale_running Whether tailscale status reports BackendState Running.\n"); + body.push_str("# TYPE noisebell_pi_tailscale_running gauge\n"); + body.push_str(&format!("noisebell_pi_tailscale_running {}\n", u8::from(tailscale_running()))); + + ([(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)], body).into_response() +} + +fn read_trimmed(path: &str) -> Option { + fs::read_to_string(path) + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) +} + +fn read_uptime_seconds() -> Option { + read_trimmed("/proc/uptime")?.split_whitespace().next()?.parse().ok() +} + +fn read_temperature_celsius() -> Option { + let raw: f64 = read_trimmed("/sys/class/thermal/thermal_zone0/temp")?.parse().ok()?; + Some(raw / 1000.0) +} + +fn read_throttled_flags() -> Option { + let output = Command::new("vcgencmd").arg("get_throttled").output().ok()?; + if !output.status.success() { + return None; + } + let text = String::from_utf8_lossy(&output.stdout); + let value = text.trim().strip_prefix("throttled=0x")?; + u64::from_str_radix(value, 16).ok() +} + +fn read_wifi_metrics() -> Option<(String, f64, f64)> { + let text = read_trimmed("/proc/net/wireless")?; + for line in text.lines().skip(2) { + let (interface, values) = line.split_once(':')?; + let mut parts = values.split_whitespace(); + let _status = parts.next()?; + let link: f64 = parts.next()?.trim_end_matches('.').parse().ok()?; + let level: f64 = parts.next()?.trim_end_matches('.').parse().ok()?; + return Some((interface.trim().to_string(), link, level)); + } + None +} + +fn tailscale_running() -> bool { + let output = match Command::new("tailscale").args(["status", "--json"]).output() { + Ok(output) => output, + Err(_) => return false, + }; + output.status.success() + && String::from_utf8_lossy(&output.stdout).contains("\"BackendState\":\"Running\"") +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() @@ -151,17 +481,23 @@ async fn main() -> Result<()> { door_state: AtomicU8::new(initial_state as u8), last_changed: AtomicU64::new(now), inbound_api_key, + metrics: AppMetrics::new(), }); + state + .metrics + .gpio_raw_level + .store(u8::from(initial_raw_level == SignalLevel::High), Ordering::Relaxed); + state.metrics.gpio_last_read_timestamp.store(now, Ordering::Relaxed); info!( initial_status = %initial_state.as_door_status(), "GPIO initialized" ); - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(DoorStatus, u64)>(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(DoorStatus, u64, StateEventKind)>(); // Sync initial state with the cache on startup - let _ = tx.send((initial_state.as_door_status(), now)); + let _ = tx.send((initial_state.as_door_status(), now, StateEventKind::Startup)); // Poll the input level and debounce in software. This is less elegant than // edge-triggered reads, but it is robust on Raspberry Pi OS. @@ -173,18 +509,46 @@ async fn main() -> Result<()> { let mut current_state = initial_state; let mut pending_state = current_state; let mut pending_since = std::time::Instant::now(); + let mut gpio_read_error_count = 0u64; + let mut last_gpio_error_log: Option = None; loop { let values = match inputs.get_values([false]) { Ok(values) => values, Err(e) => { - error!(error = %e, "failed to read GPIO value"); + state_for_edges.metrics.gpio_read_error_total.fetch_add(1, Ordering::Relaxed); + gpio_read_error_count = gpio_read_error_count.saturating_add(1); + let should_log = last_gpio_error_log + .map(|last| last.elapsed() >= Duration::from_secs(60)) + .unwrap_or(true); + if should_log { + error!( + error = %e, + consecutive_errors = gpio_read_error_count, + "failed to read GPIO value" + ); + last_gpio_error_log = Some(Instant::now()); + } std::thread::sleep(Duration::from_secs(1)); continue; } }; + if gpio_read_error_count > 0 { + info!(recovered_after_errors = gpio_read_error_count, "GPIO reads recovered"); + gpio_read_error_count = 0; + last_gpio_error_log = None; + } + let new_raw_level = if values[0] { SignalLevel::High } else { SignalLevel::Low }; + state_for_edges + .metrics + .gpio_raw_level + .store(u8::from(new_raw_level == SignalLevel::High), Ordering::Relaxed); + state_for_edges + .metrics + .gpio_last_read_timestamp + .store(unix_timestamp(), Ordering::Relaxed); let new_state = LocalDoorState::from_raw_level(new_raw_level, active_level); if new_state != pending_state { @@ -203,7 +567,25 @@ async fn main() -> Result<()> { let timestamp = unix_timestamp(); state_for_edges.last_changed.store(timestamp, Ordering::Relaxed); - let _ = edge_tx.send((new_state.as_door_status(), timestamp)); + match new_state { + LocalDoorState::Open => { + state_for_edges + .metrics + .state_change_open_total + .fetch_add(1, Ordering::Relaxed); + } + LocalDoorState::Closed => { + state_for_edges + .metrics + .state_change_closed_total + .fetch_add(1, Ordering::Relaxed); + } + } + let _ = edge_tx.send(( + new_state.as_door_status(), + timestamp, + StateEventKind::StateChange, + )); } std::thread::sleep(poll_interval); @@ -211,33 +593,97 @@ async fn main() -> Result<()> { }); drop(tx); // Drop original sender so rx closes when edge_handle is dropped + let state_for_notify = state.clone(); let notify_handle = tokio::spawn(async move { let client = reqwest::Client::builder() .timeout(Duration::from_secs(http_timeout_secs)) .build() .expect("failed to build HTTP client"); - while let Some((status, timestamp)) = rx.recv().await { - info!(status = %status, timestamp, "state changed"); + while let Some((status, timestamp, event_kind)) = rx.recv().await { + match event_kind { + StateEventKind::Startup => { + info!(status = %status, timestamp, event = event_kind.as_str(), "syncing initial door state"); + } + StateEventKind::StateChange => { + info!(status = %status, timestamp, event = event_kind.as_str(), "door state changed"); + } + } let payload = WebhookPayload { status, timestamp }; for attempt in 0..=retry_attempts { + let notify_started_at = Instant::now(); + state_for_notify.metrics.record_notify_attempt(unix_timestamp()); let result = client.post(&endpoint_url).bearer_auth(&api_key).json(&payload).send().await; match result { - Ok(resp) if resp.status().is_success() => break, - _ => { + Ok(resp) if resp.status().is_success() => { + let duration_ms = duration_millis(notify_started_at); + let http_status = resp.status().as_u16(); + state_for_notify.metrics.record_notify_success( + unix_timestamp(), + duration_ms, + http_status, + ); + info!( + status = %payload.status, + timestamp = payload.timestamp, + event = event_kind.as_str(), + http_status, + duration_ms, + attempts = attempt + 1, + "notified cache of door state" + ); + break; + } + result => { let err_msg = match &result { Ok(resp) => format!("HTTP {}", resp.status()), Err(e) => e.to_string(), }; + let http_status = result.as_ref().ok().map(|resp| resp.status().as_u16()); + let kind = if http_status.is_some() { + NotifyResultKind::HttpError + } else { + NotifyResultKind::RequestError + }; + let duration_ms = duration_millis(notify_started_at); + state_for_notify.metrics.record_notify_failure( + kind, + unix_timestamp(), + duration_ms, + http_status, + attempt == retry_attempts, + ); if attempt == retry_attempts { - error!(error = %err_msg, "failed to notify endpoint after {} attempts", retry_attempts + 1); + error!( + status = %payload.status, + timestamp = payload.timestamp, + event = event_kind.as_str(), + error = %err_msg, + kind = kind.as_str(), + http_status = http_status.unwrap_or(0), + duration_ms, + attempts = retry_attempts + 1, + "failed to notify cache after retries" + ); } else { let delay = Duration::from_secs(retry_base_delay_secs * 2u64.pow(attempt)); - warn!(error = %err_msg, attempt = attempt + 1, "notify failed, retrying in {:?}", delay); + warn!( + status = %payload.status, + timestamp = payload.timestamp, + event = event_kind.as_str(), + error = %err_msg, + kind = kind.as_str(), + http_status = http_status.unwrap_or(0), + duration_ms, + attempt = attempt + 1, + total_attempts = retry_attempts + 1, + delay_seconds = delay.as_secs(), + "notify cache failed, retrying" + ); tokio::time::sleep(delay).await; } } @@ -246,7 +692,10 @@ async fn main() -> Result<()> { } }); - let app = Router::new().route("/", get(get_status)).with_state(state); + let app = Router::new() + .route("/", get(get_status)) + .route("/metrics", get(get_metrics)) + .with_state(state); let listener = tokio::net::TcpListener::bind((&*bind_address, port)) .await diff --git a/remote/cache-service/Cargo.toml b/remote/cache-service/Cargo.toml index f1d481a..2f428b3 100644 --- a/remote/cache-service/Cargo.toml +++ b/remote/cache-service/Cargo.toml @@ -17,6 +17,5 @@ rusqlite = { version = "0.33", features = ["bundled"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "sync", "signal", "time"] } -tower-http = { version = "0.6", features = ["trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/remote/cache-service/README.md b/remote/cache-service/README.md index b8a7ffa..358fdd7 100644 --- a/remote/cache-service/README.md +++ b/remote/cache-service/README.md @@ -12,11 +12,16 @@ If the Pi stops responding to polls (configurable threshold, default 3 misses), |--------|------|------|-------------| | `GET` | `/status` | — | Current door status (`status`, `since`, `last_checked`) | | `GET` | `/badge.svg` | — | Live README badge with Noisebridge logo | +| `GET` | `/metrics` | — | Prometheus metrics, scraped locally by the DO Prometheus | | `POST` | `/webhook` | Bearer | Inbound webhook from the Pi | | `GET` | `/health` | — | Health check | `since` is the Pi-reported time when the current state began. `last_checked` is when the cache most recently attempted a poll. +The public Caddy vhost returns `404` for `/metrics`; Prometheus scrapes the cache directly on localhost. Metrics include the configured Pi target, poll interval, offline threshold, last poll result, last HTTP status, last poll duration, last poll attempt/success/failure timestamps, and failure counters split into HTTP, timeout, connect, request-other, and parse failures. + +Regular timer-driven poll data should be debugged from Prometheus and Grafana, not by scanning logs. The cache logs sparse events instead: state changes applied from the Pi, offline/online transitions, first or changed poll failures in a failure streak, stale events, auth/rate-limit rejections, outbound webhook deliveries, retries, and final failures. Successful unchanged polls, badge/image/status reads, and metrics scrapes are intentionally quiet at `INFO`. + ## Badge `/badge.svg` serves a classic shields.io-style SVG badge with the Noisebridge logo and the current cache status (`open`, `closed`, or `offline`). diff --git a/remote/cache-service/module.nix b/remote/cache-service/module.nix index 12bcae6..c184a4d 100644 --- a/remote/cache-service/module.nix +++ b/remote/cache-service/module.nix @@ -89,6 +89,7 @@ in services.caddy.virtualHosts.${cfg.domain}.extraConfig = '' redir / https://git.extremist.software/jet/noisebell 302 + respond /metrics 404 reverse_proxy localhost:${toString cfg.port} ''; diff --git a/remote/cache-service/src/api.rs b/remote/cache-service/src/api.rs index aee065c..689ccba 100644 --- a/remote/cache-service/src/api.rs +++ b/remote/cache-service/src/api.rs @@ -1,3 +1,4 @@ +use std::sync::atomic::Ordering; use std::sync::Arc; use axum::extract::State; @@ -6,12 +7,16 @@ use axum::response::{IntoResponse, Response}; use axum::Json; use chrono::{DateTime, Utc}; use chrono_tz::America::Los_Angeles; -use noisebell_common::{validate_bearer, CacheStatusResponse, DoorStatus, WebhookPayload}; +use noisebell_common::{ + prometheus_escape_label_value, validate_bearer, CacheStatusResponse, DoorStatus, + WebhookPayload, PROMETHEUS_CONTENT_TYPE, +}; use tokio::sync::Mutex; -use tracing::{error, info}; +use tracing::{error, info, warn}; use crate::db; use crate::db::ApplyStateOutcome; +use crate::metrics::{atomic_seconds_from_millis, atomic_value, CacheMetrics, PollResultKind}; use crate::types::WebhookTarget; use crate::webhook; @@ -108,6 +113,7 @@ pub struct AppState { pub retry_base_delay_secs: u64, pub webhook_last_request: std::sync::atomic::AtomicU64, pub webhook_tokens: std::sync::atomic::AtomicU32, + pub metrics: Arc, } fn unix_now() -> u64 { @@ -186,8 +192,14 @@ pub async fn post_webhook( Json(body): Json, ) -> StatusCode { if !validate_bearer(&headers, &state.inbound_api_key) { + warn!( + status = %body.status, + timestamp = body.timestamp, + "unauthorized webhook rejected" + ); return StatusCode::UNAUTHORIZED; } + state.metrics.webhook_received_total.fetch_add(1, Ordering::Relaxed); // Simple rate limiting: reset tokens every window, reject if exhausted. let now = unix_now(); @@ -202,6 +214,14 @@ pub async fn post_webhook( |n| if n > 0 { Some(n - 1) } else { None }, ); if remaining.is_err() { + state.metrics.webhook_rate_limited_total.fetch_add(1, Ordering::Relaxed); + warn!( + status = %body.status, + timestamp = body.timestamp, + limit = WEBHOOK_RATE_LIMIT, + window_seconds = WEBHOOK_RATE_WINDOW_SECS, + "webhook rate limit exceeded" + ); return StatusCode::TOO_MANY_REQUESTS; } @@ -224,7 +244,7 @@ pub async fn post_webhook( "state updated via webhook" ); - webhook::forward( + let summary = webhook::forward( &state.client, &state.webhooks, &WebhookPayload { status, timestamp: body.timestamp }, @@ -232,16 +252,11 @@ pub async fn post_webhook( state.retry_base_delay_secs, ) .await; + state.metrics.add_outbound(summary.delivered, summary.failed); } - Ok(ApplyStateOutcome::Duplicate) => { - info!( - status = %status, - timestamp = body.timestamp, - "duplicate webhook ignored" - ); - } + Ok(ApplyStateOutcome::Duplicate) => {} Ok(ApplyStateOutcome::Stale) => { - info!( + warn!( status = %status, timestamp = body.timestamp, "stale webhook ignored" @@ -281,6 +296,231 @@ pub async fn health() -> StatusCode { StatusCode::OK } +pub async fn get_metrics(State(state): State>) -> Response { + let db = state.db.clone(); + let snapshot = match tokio::task::spawn_blocking(move || { + let conn = db.blocking_lock(); + db::get_metrics_snapshot(&conn) + }) + .await + .expect("db task panicked") + { + Ok(snapshot) => snapshot, + Err(e) => { + error!(error = %e, "failed to get metrics snapshot"); + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + }; + + let mut body = String::new(); + body.push_str("# HELP noisebell_cache_status Current cached door status.\n"); + body.push_str("# TYPE noisebell_cache_status gauge\n"); + for status in DoorStatus::ALL { + let value = u8::from(snapshot.status == status); + let status = prometheus_escape_label_value(status.as_str()); + body.push_str(&format!("noisebell_cache_status{{status=\"{status}\"}} {value}\n")); + } + body.push_str("# HELP noisebell_cache_status_since_timestamp_seconds Unix timestamp for when the current cache state began.\n"); + body.push_str("# TYPE noisebell_cache_status_since_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_status_since_timestamp_seconds {}\n", + snapshot.since.unwrap_or(0) + )); + body.push_str("# HELP noisebell_cache_last_seen_timestamp_seconds Unix timestamp for the last successful Pi state update.\n"); + body.push_str("# TYPE noisebell_cache_last_seen_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_last_seen_timestamp_seconds {}\n", + snapshot.last_seen.unwrap_or(0) + )); + body.push_str("# HELP noisebell_cache_last_checked_timestamp_seconds Unix timestamp for the last Pi poll attempt.\n"); + body.push_str("# TYPE noisebell_cache_last_checked_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_last_checked_timestamp_seconds {}\n", + snapshot.last_checked.unwrap_or(0) + )); + body.push_str("# HELP noisebell_cache_process_start_time_seconds Unix timestamp when the cache service started.\n"); + body.push_str("# TYPE noisebell_cache_process_start_time_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_process_start_time_seconds {}\n", + state.metrics.process_start_time + )); + let pi_address = prometheus_escape_label_value(&state.metrics.pi_address); + body.push_str("# HELP noisebell_cache_pi_target_info Configured Pi polling target.\n"); + body.push_str("# TYPE noisebell_cache_pi_target_info gauge\n"); + body.push_str(&format!("noisebell_cache_pi_target_info{{address=\"{pi_address}\"}} 1\n")); + body.push_str("# HELP noisebell_cache_poll_interval_seconds Configured Pi poll interval.\n"); + body.push_str("# TYPE noisebell_cache_poll_interval_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_interval_seconds {}\n", + state.metrics.poll_interval_secs + )); + body.push_str("# HELP noisebell_cache_poll_offline_threshold Configured consecutive failure threshold before marking the Pi offline.\n"); + body.push_str("# TYPE noisebell_cache_poll_offline_threshold gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_offline_threshold {}\n", + state.metrics.offline_threshold + )); + body.push_str("# HELP noisebell_cache_retry_attempts Configured outbound retry attempts.\n"); + body.push_str("# TYPE noisebell_cache_retry_attempts gauge\n"); + body.push_str(&format!("noisebell_cache_retry_attempts {}\n", state.metrics.retry_attempts)); + body.push_str("# HELP noisebell_cache_http_timeout_seconds Configured HTTP timeout for cache HTTP clients.\n"); + body.push_str("# TYPE noisebell_cache_http_timeout_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_http_timeout_seconds {}\n", + state.metrics.http_timeout_secs + )); + body.push_str( + "# HELP noisebell_cache_webhook_received_total Authenticated inbound webhooks received.\n", + ); + body.push_str("# TYPE noisebell_cache_webhook_received_total counter\n"); + body.push_str(&format!( + "noisebell_cache_webhook_received_total {}\n", + atomic_value(&state.metrics.webhook_received_total) + )); + body.push_str("# HELP noisebell_cache_webhook_rate_limited_total Inbound webhooks rejected by rate limiting.\n"); + body.push_str("# TYPE noisebell_cache_webhook_rate_limited_total counter\n"); + body.push_str(&format!( + "noisebell_cache_webhook_rate_limited_total {}\n", + atomic_value(&state.metrics.webhook_rate_limited_total) + )); + body.push_str("# HELP noisebell_cache_outbound_webhook_success_total Outbound webhook deliveries that succeeded.\n"); + body.push_str("# TYPE noisebell_cache_outbound_webhook_success_total counter\n"); + body.push_str(&format!( + "noisebell_cache_outbound_webhook_success_total {}\n", + atomic_value(&state.metrics.outbound_success_total) + )); + body.push_str("# HELP noisebell_cache_outbound_webhook_failure_total Outbound webhook deliveries that failed after retries.\n"); + body.push_str("# TYPE noisebell_cache_outbound_webhook_failure_total counter\n"); + body.push_str(&format!( + "noisebell_cache_outbound_webhook_failure_total {}\n", + atomic_value(&state.metrics.outbound_failure_total) + )); + body.push_str("# HELP noisebell_cache_poll_success_total Successful Pi status polls.\n"); + body.push_str("# TYPE noisebell_cache_poll_success_total counter\n"); + body.push_str(&format!( + "noisebell_cache_poll_success_total {}\n", + atomic_value(&state.metrics.poll_success_total) + )); + body.push_str("# HELP noisebell_cache_poll_attempt_total Pi status poll attempts.\n"); + body.push_str("# TYPE noisebell_cache_poll_attempt_total counter\n"); + body.push_str(&format!( + "noisebell_cache_poll_attempt_total {}\n", + atomic_value(&state.metrics.poll_attempt_total) + )); + body.push_str("# HELP noisebell_cache_poll_failure_total Failed Pi status polls.\n"); + body.push_str("# TYPE noisebell_cache_poll_failure_total counter\n"); + body.push_str(&format!( + "noisebell_cache_poll_failure_total {}\n", + atomic_value(&state.metrics.poll_failure_total) + )); + body.push_str("# HELP noisebell_cache_poll_http_error_total Pi poll responses with non-success HTTP status.\n"); + body.push_str("# TYPE noisebell_cache_poll_http_error_total counter\n"); + body.push_str(&format!( + "noisebell_cache_poll_http_error_total {}\n", + atomic_value(&state.metrics.poll_http_error_total) + )); + body.push_str( + "# HELP noisebell_cache_poll_request_timeout_total Pi poll request timeout failures.\n", + ); + body.push_str("# TYPE noisebell_cache_poll_request_timeout_total counter\n"); + body.push_str(&format!( + "noisebell_cache_poll_request_timeout_total {}\n", + atomic_value(&state.metrics.poll_request_timeout_total) + )); + body.push_str( + "# HELP noisebell_cache_poll_request_connect_total Pi poll connection failures.\n", + ); + body.push_str("# TYPE noisebell_cache_poll_request_connect_total counter\n"); + body.push_str(&format!( + "noisebell_cache_poll_request_connect_total {}\n", + atomic_value(&state.metrics.poll_request_connect_total) + )); + body.push_str("# HELP noisebell_cache_poll_request_other_total Pi poll request failures that were not timeout/connect failures.\n"); + body.push_str("# TYPE noisebell_cache_poll_request_other_total counter\n"); + body.push_str(&format!( + "noisebell_cache_poll_request_other_total {}\n", + atomic_value(&state.metrics.poll_request_other_total) + )); + body.push_str("# HELP noisebell_cache_poll_parse_failure_total Successful Pi HTTP responses that could not be parsed.\n"); + body.push_str("# TYPE noisebell_cache_poll_parse_failure_total counter\n"); + body.push_str(&format!( + "noisebell_cache_poll_parse_failure_total {}\n", + atomic_value(&state.metrics.poll_parse_failure_total) + )); + body.push_str("# HELP noisebell_cache_poll_offline_transition_total Times the cache marked the Pi offline.\n"); + body.push_str("# TYPE noisebell_cache_poll_offline_transition_total counter\n"); + body.push_str(&format!( + "noisebell_cache_poll_offline_transition_total {}\n", + atomic_value(&state.metrics.poll_offline_transition_total) + )); + body.push_str( + "# HELP noisebell_cache_poll_consecutive_failures Current consecutive Pi poll failures.\n", + ); + body.push_str("# TYPE noisebell_cache_poll_consecutive_failures gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_consecutive_failures {}\n", + atomic_value(&state.metrics.poll_consecutive_failures) + )); + body.push_str("# HELP noisebell_cache_poll_last_attempt_timestamp_seconds Unix timestamp of the last Pi poll attempt.\n"); + body.push_str("# TYPE noisebell_cache_poll_last_attempt_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_last_attempt_timestamp_seconds {}\n", + atomic_value(&state.metrics.poll_last_attempt_timestamp) + )); + body.push_str("# HELP noisebell_cache_poll_last_success_timestamp_seconds Unix timestamp of the last successful Pi poll.\n"); + body.push_str("# TYPE noisebell_cache_poll_last_success_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_last_success_timestamp_seconds {}\n", + atomic_value(&state.metrics.poll_last_success_timestamp) + )); + body.push_str("# HELP noisebell_cache_poll_last_failure_timestamp_seconds Unix timestamp of the last failed Pi poll.\n"); + body.push_str("# TYPE noisebell_cache_poll_last_failure_timestamp_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_last_failure_timestamp_seconds {}\n", + atomic_value(&state.metrics.poll_last_failure_timestamp) + )); + body.push_str( + "# HELP noisebell_cache_poll_last_duration_seconds Duration of the most recent Pi poll.\n", + ); + body.push_str("# TYPE noisebell_cache_poll_last_duration_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_last_duration_seconds {}\n", + atomic_seconds_from_millis(&state.metrics.poll_last_duration_millis) + )); + body.push_str("# HELP noisebell_cache_poll_last_success_duration_seconds Duration of the most recent successful Pi poll.\n"); + body.push_str("# TYPE noisebell_cache_poll_last_success_duration_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_last_success_duration_seconds {}\n", + atomic_seconds_from_millis(&state.metrics.poll_last_success_duration_millis) + )); + body.push_str("# HELP noisebell_cache_poll_last_failure_duration_seconds Duration of the most recent failed Pi poll.\n"); + body.push_str("# TYPE noisebell_cache_poll_last_failure_duration_seconds gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_last_failure_duration_seconds {}\n", + atomic_seconds_from_millis(&state.metrics.poll_last_failure_duration_millis) + )); + body.push_str("# HELP noisebell_cache_poll_last_http_status HTTP status from the most recent Pi poll, or 0 when no HTTP response was received.\n"); + body.push_str("# TYPE noisebell_cache_poll_last_http_status gauge\n"); + body.push_str(&format!( + "noisebell_cache_poll_last_http_status {}\n", + atomic_value(&state.metrics.poll_last_http_status) + )); + let last_result = PollResultKind::from_code(atomic_value(&state.metrics.poll_last_result)); + body.push_str( + "# HELP noisebell_cache_poll_last_result Last Pi poll result as one-hot labels.\n", + ); + body.push_str("# TYPE noisebell_cache_poll_last_result gauge\n"); + for result in PollResultKind::ALL { + let value = u8::from(result == last_result); + body.push_str(&format!( + "noisebell_cache_poll_last_result{{result=\"{}\"}} {value}\n", + result.as_str() + )); + } + + ([(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)], body).into_response() +} + pub async fn get_image_open() -> impl IntoResponse { ( [(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")], diff --git a/remote/cache-service/src/db.rs b/remote/cache-service/src/db.rs index 55b6c1c..ec501af 100644 --- a/remote/cache-service/src/db.rs +++ b/remote/cache-service/src/db.rs @@ -59,6 +59,14 @@ struct CurrentStateRow { last_checked: Option, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct CacheMetricsSnapshot { + pub status: DoorStatus, + pub since: Option, + pub last_seen: Option, + pub last_checked: Option, +} + fn parse_status(status: &str, location: &str) -> Result { status.parse().with_context(|| format!("invalid door status {status:?} in {location}")) } @@ -183,6 +191,16 @@ pub fn get_status(conn: &Connection) -> Result { }) } +pub fn get_metrics_snapshot(conn: &Connection) -> Result { + let row = current_state_row(conn)?; + Ok(CacheMetricsSnapshot { + status: row.state.status_for_api(), + since: row.state.since_for_api(), + last_seen: row.last_seen, + last_checked: row.last_checked, + }) +} + fn write_state_change( conn: &Connection, status: DoorStatus, diff --git a/remote/cache-service/src/main.rs b/remote/cache-service/src/main.rs index af4e143..8e02685 100644 --- a/remote/cache-service/src/main.rs +++ b/remote/cache-service/src/main.rs @@ -6,11 +6,11 @@ use axum::routing::{get, post}; use axum::Router; use std::sync::atomic::AtomicU64; use tokio::sync::Mutex; -use tower_http::trace::TraceLayer; -use tracing::{info, Level}; +use tracing::info; mod api; mod db; +mod metrics; mod poller; mod types; mod webhook; @@ -100,6 +100,14 @@ async fn main() -> Result<()> { .build() .context("failed to build HTTP client")?; + let metrics = Arc::new(metrics::CacheMetrics::new( + pi_address.clone(), + status_poll_interval_secs, + offline_threshold, + retry_attempts, + http_timeout_secs, + )); + let poller_config = Arc::new(poller::PollerConfig { pi_address, pi_api_key, @@ -108,6 +116,7 @@ async fn main() -> Result<()> { retry_attempts, retry_base_delay_secs, webhooks: webhooks.clone(), + metrics: metrics.clone(), }); poller::spawn_status_poller(poller_config.clone(), db.clone(), client.clone()); @@ -121,22 +130,19 @@ async fn main() -> Result<()> { retry_base_delay_secs, webhook_last_request: AtomicU64::new(0), webhook_tokens: std::sync::atomic::AtomicU32::new(10), + metrics, }); let app = Router::new() .route("/health", get(api::health)) .route("/webhook", post(api::post_webhook)) .route("/status", get(api::get_status)) + .route("/metrics", get(api::get_metrics)) .route("/badge.svg", get(api::get_badge)) .route("/image", get(api::get_image)) .route("/image/open.png", get(api::get_image_open)) .route("/image/closed.png", get(api::get_image_closed)) .route("/image/offline.png", get(api::get_image_offline)) - .layer( - TraceLayer::new_for_http() - .make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO)) - .on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)), - ) .with_state(app_state); let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)) diff --git a/remote/cache-service/src/metrics.rs b/remote/cache-service/src/metrics.rs new file mode 100644 index 0000000..4235b45 --- /dev/null +++ b/remote/cache-service/src/metrics.rs @@ -0,0 +1,186 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PollResultKind { + Never = 0, + Success = 1, + HttpError = 2, + RequestTimeout = 3, + RequestConnect = 4, + RequestOther = 5, + ParseError = 6, +} + +impl PollResultKind { + pub const ALL: [Self; 7] = [ + Self::Never, + Self::Success, + Self::HttpError, + Self::RequestTimeout, + Self::RequestConnect, + Self::RequestOther, + Self::ParseError, + ]; + + pub const fn as_str(self) -> &'static str { + match self { + Self::Never => "never", + Self::Success => "success", + Self::HttpError => "http_error", + Self::RequestTimeout => "request_timeout", + Self::RequestConnect => "request_connect", + Self::RequestOther => "request_other", + Self::ParseError => "parse_error", + } + } + + pub const fn from_code(code: u64) -> Self { + match code { + 1 => Self::Success, + 2 => Self::HttpError, + 3 => Self::RequestTimeout, + 4 => Self::RequestConnect, + 5 => Self::RequestOther, + 6 => Self::ParseError, + _ => Self::Never, + } + } +} + +#[derive(Debug)] +pub struct CacheMetrics { + pub process_start_time: u64, + pub pi_address: String, + pub poll_interval_secs: u64, + pub offline_threshold: u32, + pub retry_attempts: u32, + pub http_timeout_secs: u64, + pub webhook_received_total: AtomicU64, + pub webhook_rate_limited_total: AtomicU64, + pub outbound_success_total: AtomicU64, + pub outbound_failure_total: AtomicU64, + pub poll_attempt_total: AtomicU64, + pub poll_success_total: AtomicU64, + pub poll_failure_total: AtomicU64, + pub poll_http_error_total: AtomicU64, + pub poll_request_timeout_total: AtomicU64, + pub poll_request_connect_total: AtomicU64, + pub poll_request_other_total: AtomicU64, + pub poll_parse_failure_total: AtomicU64, + pub poll_offline_transition_total: AtomicU64, + pub poll_consecutive_failures: AtomicU64, + pub poll_last_attempt_timestamp: AtomicU64, + pub poll_last_success_timestamp: AtomicU64, + pub poll_last_failure_timestamp: AtomicU64, + pub poll_last_duration_millis: AtomicU64, + pub poll_last_success_duration_millis: AtomicU64, + pub poll_last_failure_duration_millis: AtomicU64, + pub poll_last_http_status: AtomicU64, + pub poll_last_result: AtomicU64, +} + +impl CacheMetrics { + pub fn new( + pi_address: String, + poll_interval_secs: u64, + offline_threshold: u32, + retry_attempts: u32, + http_timeout_secs: u64, + ) -> Self { + Self { + process_start_time: unix_now(), + pi_address, + poll_interval_secs, + offline_threshold, + retry_attempts, + http_timeout_secs, + webhook_received_total: AtomicU64::new(0), + webhook_rate_limited_total: AtomicU64::new(0), + outbound_success_total: AtomicU64::new(0), + outbound_failure_total: AtomicU64::new(0), + poll_attempt_total: AtomicU64::new(0), + poll_success_total: AtomicU64::new(0), + poll_failure_total: AtomicU64::new(0), + poll_http_error_total: AtomicU64::new(0), + poll_request_timeout_total: AtomicU64::new(0), + poll_request_connect_total: AtomicU64::new(0), + poll_request_other_total: AtomicU64::new(0), + poll_parse_failure_total: AtomicU64::new(0), + poll_offline_transition_total: AtomicU64::new(0), + poll_consecutive_failures: AtomicU64::new(0), + poll_last_attempt_timestamp: AtomicU64::new(0), + poll_last_success_timestamp: AtomicU64::new(0), + poll_last_failure_timestamp: AtomicU64::new(0), + poll_last_duration_millis: AtomicU64::new(0), + poll_last_success_duration_millis: AtomicU64::new(0), + poll_last_failure_duration_millis: AtomicU64::new(0), + poll_last_http_status: AtomicU64::new(0), + poll_last_result: AtomicU64::new(PollResultKind::Never as u64), + } + } + + pub fn add_outbound(&self, delivered: u64, failed: u64) { + self.outbound_success_total.fetch_add(delivered, Ordering::Relaxed); + self.outbound_failure_total.fetch_add(failed, Ordering::Relaxed); + } + + pub fn record_poll_attempt(&self, timestamp: u64) { + self.poll_attempt_total.fetch_add(1, Ordering::Relaxed); + self.poll_last_attempt_timestamp.store(timestamp, Ordering::Relaxed); + } + + pub fn record_poll_success(&self, timestamp: u64, duration_millis: u64, status: u16) { + self.poll_success_total.fetch_add(1, Ordering::Relaxed); + self.poll_last_success_timestamp.store(timestamp, Ordering::Relaxed); + self.poll_last_duration_millis.store(duration_millis, Ordering::Relaxed); + self.poll_last_success_duration_millis.store(duration_millis, Ordering::Relaxed); + self.poll_last_http_status.store(u64::from(status), Ordering::Relaxed); + self.poll_last_result.store(PollResultKind::Success as u64, Ordering::Relaxed); + } + + pub fn record_poll_failure( + &self, + kind: PollResultKind, + timestamp: u64, + duration_millis: u64, + status: Option, + ) { + self.poll_failure_total.fetch_add(1, Ordering::Relaxed); + match kind { + PollResultKind::HttpError => { + self.poll_http_error_total.fetch_add(1, Ordering::Relaxed); + } + PollResultKind::RequestTimeout => { + self.poll_request_timeout_total.fetch_add(1, Ordering::Relaxed); + } + PollResultKind::RequestConnect => { + self.poll_request_connect_total.fetch_add(1, Ordering::Relaxed); + } + PollResultKind::RequestOther => { + self.poll_request_other_total.fetch_add(1, Ordering::Relaxed); + } + PollResultKind::ParseError => { + self.poll_parse_failure_total.fetch_add(1, Ordering::Relaxed); + } + PollResultKind::Never | PollResultKind::Success => {} + } + self.poll_last_failure_timestamp.store(timestamp, Ordering::Relaxed); + self.poll_last_duration_millis.store(duration_millis, Ordering::Relaxed); + self.poll_last_failure_duration_millis.store(duration_millis, Ordering::Relaxed); + self.poll_last_http_status.store(status.map(u64::from).unwrap_or(0), Ordering::Relaxed); + self.poll_last_result.store(kind as u64, Ordering::Relaxed); + } +} + +fn unix_now() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() +} + +pub fn atomic_value(value: &AtomicU64) -> u64 { + value.load(Ordering::Relaxed) +} + +pub fn atomic_seconds_from_millis(value: &AtomicU64) -> f64 { + value.load(Ordering::Relaxed) as f64 / 1000.0 +} diff --git a/remote/cache-service/src/poller.rs b/remote/cache-service/src/poller.rs index df849ae..a329b39 100644 --- a/remote/cache-service/src/poller.rs +++ b/remote/cache-service/src/poller.rs @@ -1,5 +1,6 @@ +use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use noisebell_common::{DoorStatus, PiStatusResponse, WebhookPayload}; use tokio::sync::Mutex; @@ -7,6 +8,7 @@ use tracing::{error, info, warn}; use crate::db; use crate::db::ApplyStateOutcome; +use crate::metrics::{CacheMetrics, PollResultKind}; use crate::types::WebhookTarget; use crate::webhook; @@ -18,12 +20,81 @@ pub struct PollerConfig { pub retry_attempts: u32, pub retry_base_delay_secs: u64, pub webhooks: Vec, + pub metrics: Arc, } fn unix_now() -> u64 { std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() } +fn duration_millis(started_at: Instant) -> u64 { + let millis = started_at.elapsed().as_millis(); + millis.try_into().unwrap_or(u64::MAX) +} + +fn classify_request_error(error: &reqwest::Error) -> PollResultKind { + if error.is_timeout() { + PollResultKind::RequestTimeout + } else if error.is_connect() { + PollResultKind::RequestConnect + } else { + PollResultKind::RequestOther + } +} + +fn should_log_poll_failure( + consecutive_failures: u32, + kind: PollResultKind, + status_code: Option, + last_failure_log: &mut Option<(PollResultKind, Option)>, +) -> bool { + let key = (kind, status_code); + let changed = match *last_failure_log { + Some(previous) => previous != key, + None => true, + }; + *last_failure_log = Some(key); + consecutive_failures == 1 || changed +} + +async fn mark_pi_offline( + config: &PollerConfig, + database: Arc>, + client: &reqwest::Client, + now: u64, + consecutive_failures: u32, +) { + let marked = tokio::task::spawn_blocking(move || { + let conn = database.blocking_lock(); + db::mark_offline(&conn, now) + }) + .await + .expect("db task panicked"); + + match marked { + Ok(()) => { + config.metrics.poll_offline_transition_total.fetch_add(1, Ordering::Relaxed); + warn!( + consecutive_failures, + threshold = config.offline_threshold, + "Pi marked offline after poll failures" + ); + let summary = webhook::forward( + client, + &config.webhooks, + &WebhookPayload { status: DoorStatus::Offline, timestamp: now }, + config.retry_attempts, + config.retry_base_delay_secs, + ) + .await; + config.metrics.add_outbound(summary.delivered, summary.failed); + } + Err(e) => { + error!(error = %e, "failed to mark Pi offline"); + } + } +} + pub fn spawn_status_poller( config: Arc, db: Arc>, @@ -32,10 +103,15 @@ pub fn spawn_status_poller( tokio::spawn(async move { let mut consecutive_failures: u32 = 0; let mut was_offline = false; + let mut last_failure_log: Option<(PollResultKind, Option)> = None; loop { + let poll_started_at = Instant::now(); + let poll_started_timestamp = unix_now(); + config.metrics.record_poll_attempt(poll_started_timestamp); + { - let now = unix_now(); + let now = poll_started_timestamp; let db = db.clone(); let _ = tokio::task::spawn_blocking(move || { let conn = db.blocking_lock(); @@ -53,16 +129,77 @@ pub fn spawn_status_poller( .await; match result { - Ok(resp) if resp.status().is_success() => { - consecutive_failures = 0; - if was_offline { - info!("Pi is back online"); - was_offline = false; + Ok(resp) => { + let status_code = resp.status().as_u16(); + if !resp.status().is_success() { + consecutive_failures += 1; + let kind = PollResultKind::HttpError; + config.metrics.record_poll_failure( + kind, + unix_now(), + duration_millis(poll_started_at), + Some(status_code), + ); + config + .metrics + .poll_consecutive_failures + .store(consecutive_failures.into(), Ordering::Relaxed); + + if should_log_poll_failure( + consecutive_failures, + kind, + Some(status_code), + &mut last_failure_log, + ) { + warn!( + kind = kind.as_str(), + http_status = status_code, + consecutive_failures, + "Pi status poll failed" + ); + } + + if consecutive_failures >= config.offline_threshold && !was_offline { + was_offline = true; + let now = unix_now(); + mark_pi_offline( + &config, + db.clone(), + &client, + now, + consecutive_failures, + ) + .await; + } + + tokio::time::sleep(config.status_poll_interval).await; + continue; } let now = unix_now(); match resp.json::().await { Ok(body) => { + if was_offline { + info!( + previous_consecutive_failures = consecutive_failures, + "Pi is back online" + ); + was_offline = false; + } else if consecutive_failures > 0 { + info!( + previous_consecutive_failures = consecutive_failures, + "Pi status poll recovered" + ); + } + consecutive_failures = 0; + last_failure_log = None; + config.metrics.poll_consecutive_failures.store(0, Ordering::Relaxed); + + config.metrics.record_poll_success( + now, + duration_millis(poll_started_at), + status_code, + ); let status = body.status; let event_timestamp = body.timestamp; @@ -102,7 +239,7 @@ pub fn spawn_status_poller( timestamp, "state updated from poll" ); - webhook::forward( + let summary = webhook::forward( &client, &config.webhooks, &WebhookPayload { status, timestamp }, @@ -110,14 +247,11 @@ pub fn spawn_status_poller( config.retry_base_delay_secs, ) .await; + config + .metrics + .add_outbound(summary.delivered, summary.failed); } - ApplyStateOutcome::Duplicate => { - info!( - status = %status, - timestamp, - "duplicate poll state ignored" - ); - } + ApplyStateOutcome::Duplicate => {} ApplyStateOutcome::Stale => { warn!( status = %status, @@ -129,52 +263,80 @@ pub fn spawn_status_poller( } } Err(e) => { - error!(error = %e, "failed to parse status poll response"); + consecutive_failures += 1; + let kind = PollResultKind::ParseError; + config.metrics.record_poll_failure( + kind, + unix_now(), + duration_millis(poll_started_at), + Some(status_code), + ); + config + .metrics + .poll_consecutive_failures + .store(consecutive_failures.into(), Ordering::Relaxed); + if should_log_poll_failure( + consecutive_failures, + kind, + Some(status_code), + &mut last_failure_log, + ) { + error!( + error = %e, + kind = kind.as_str(), + http_status = status_code, + consecutive_failures, + "failed to parse Pi status poll response" + ); + } + + if consecutive_failures >= config.offline_threshold && !was_offline { + was_offline = true; + let now = unix_now(); + mark_pi_offline( + &config, + db.clone(), + &client, + now, + consecutive_failures, + ) + .await; + } } } } - _ => { + Err(e) => { consecutive_failures += 1; - let err_msg = match &result { - Ok(resp) => format!("HTTP {}", resp.status()), - Err(e) => e.to_string(), - }; - warn!( - error = %err_msg, - consecutive_failures, - "status poll failed" + let kind = classify_request_error(&e); + config.metrics.record_poll_failure( + kind, + unix_now(), + duration_millis(poll_started_at), + None, ); + config + .metrics + .poll_consecutive_failures + .store(consecutive_failures.into(), Ordering::Relaxed); + if should_log_poll_failure( + consecutive_failures, + kind, + None, + &mut last_failure_log, + ) { + warn!( + error = %e, + kind = kind.as_str(), + consecutive_failures, + "Pi status poll failed" + ); + } if consecutive_failures >= config.offline_threshold && !was_offline { was_offline = true; let now = unix_now(); - let db = db.clone(); - let marked = tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - db::mark_offline(&conn, now) - }) - .await - .expect("db task panicked"); - - match marked { - Ok(()) => { - info!( - "Pi marked offline after {} consecutive failures", - consecutive_failures - ); - webhook::forward( - &client, - &config.webhooks, - &WebhookPayload { status: DoorStatus::Offline, timestamp: now }, - config.retry_attempts, - config.retry_base_delay_secs, - ) - .await; - } - Err(e) => { - error!(error = %e, "failed to mark Pi offline"); - } - } + mark_pi_offline(&config, db.clone(), &client, now, consecutive_failures) + .await; } } } diff --git a/remote/cache-service/src/webhook.rs b/remote/cache-service/src/webhook.rs index 564ff9a..14e8d75 100644 --- a/remote/cache-service/src/webhook.rs +++ b/remote/cache-service/src/webhook.rs @@ -1,17 +1,28 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; use noisebell_common::WebhookPayload; use tracing::{error, info, warn}; use crate::types::{WebhookAuth, WebhookTarget}; +#[derive(Debug, Default)] +pub struct ForwardSummary { + pub delivered: u64, + pub failed: u64, +} + +fn duration_millis(started_at: Instant) -> u64 { + let millis = started_at.elapsed().as_millis(); + millis.try_into().unwrap_or(u64::MAX) +} + pub async fn forward( client: &reqwest::Client, targets: &[WebhookTarget], payload: &WebhookPayload, retry_attempts: u32, retry_base_delay_secs: u64, -) { +) -> ForwardSummary { let mut set = tokio::task::JoinSet::new(); for target in targets { @@ -21,9 +32,8 @@ pub async fn forward( let auth = target.auth.clone(); set.spawn(async move { - info!(url = %url, status = %payload.status, "forwarding to outbound webhook"); - for attempt in 0..=retry_attempts { + let attempt_started_at = Instant::now(); let mut req = client.post(&url).json(&payload); if let WebhookAuth::Bearer(secret) = &auth { req = req.bearer_auth(secret); @@ -31,30 +41,75 @@ pub async fn forward( match req.send().await { Ok(resp) if resp.status().is_success() => { - info!(url = %url, "outbound webhook delivered"); - return; + info!( + url = %url, + status = %payload.status, + timestamp = payload.timestamp, + http_status = resp.status().as_u16(), + duration_ms = duration_millis(attempt_started_at), + attempts = attempt + 1, + "outbound webhook delivered" + ); + return true; } result => { let err_msg = match &result { Ok(resp) => format!("HTTP {}", resp.status()), Err(e) => e.to_string(), }; + let http_status = result.as_ref().ok().map(|resp| resp.status().as_u16()); + let kind = + if http_status.is_some() { "http_error" } else { "request_error" }; + let duration_ms = duration_millis(attempt_started_at); if attempt == retry_attempts { - error!(url = %url, error = %err_msg, "outbound webhook failed after {} attempts", retry_attempts + 1); + error!( + url = %url, + status = %payload.status, + timestamp = payload.timestamp, + error = %err_msg, + kind, + http_status = http_status.unwrap_or(0), + duration_ms, + attempts = retry_attempts + 1, + "outbound webhook failed after retries" + ); } else { - let delay = Duration::from_secs(retry_base_delay_secs * 2u64.pow(attempt)); - warn!(url = %url, error = %err_msg, attempt = attempt + 1, "outbound webhook failed, retrying in {:?}", delay); + let delay = + Duration::from_secs(retry_base_delay_secs * 2u64.pow(attempt)); + warn!( + url = %url, + status = %payload.status, + timestamp = payload.timestamp, + error = %err_msg, + kind, + http_status = http_status.unwrap_or(0), + duration_ms, + attempt = attempt + 1, + total_attempts = retry_attempts + 1, + delay_seconds = delay.as_secs(), + "outbound webhook failed, retrying" + ); tokio::time::sleep(delay).await; } } } } + + false }); } + let mut summary = ForwardSummary::default(); while let Some(result) = set.join_next().await { - if let Err(e) = result { - error!(error = %e, "webhook task panicked"); + match result { + Ok(true) => summary.delivered += 1, + Ok(false) => summary.failed += 1, + Err(e) => { + summary.failed += 1; + error!(error = %e, "webhook task panicked"); + } } } + + summary } diff --git a/remote/discord-bot/Cargo.toml b/remote/discord-bot/Cargo.toml index 28e08bb..1e0b782 100644 --- a/remote/discord-bot/Cargo.toml +++ b/remote/discord-bot/Cargo.toml @@ -15,6 +15,5 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serenity = { version = "0.12", default-features = false, features = ["client", "gateway", "model", "rustls_backend"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "sync", "signal", "time"] } -tower-http = { version = "0.6", features = ["trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/remote/discord-bot/src/main.rs b/remote/discord-bot/src/main.rs index 838148f..c2c5023 100644 --- a/remote/discord-bot/src/main.rs +++ b/remote/discord-bot/src/main.rs @@ -12,8 +12,7 @@ use serenity::all::{ CreateInteractionResponseMessage, CreateMessage, GatewayIntents, Interaction, }; use serenity::async_trait; -use tower_http::trace::TraceLayer; -use tracing::{error, info, warn, Level}; +use tracing::{error, info, warn}; struct AppState { http: Arc, @@ -66,6 +65,11 @@ async fn post_webhook( Json(body): Json, ) -> StatusCode { if !validate_bearer(&headers, &state.webhook_secret) { + warn!( + status = %body.status, + timestamp = body.timestamp, + "unauthorized Discord webhook rejected" + ); return StatusCode::UNAUTHORIZED; } @@ -226,11 +230,6 @@ async fn main() -> Result<()> { let app = Router::new() .route("/health", get(|| async { StatusCode::OK })) .route("/webhook", post(post_webhook)) - .layer( - TraceLayer::new_for_http() - .make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO)) - .on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)), - ) .with_state(app_state); let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)) diff --git a/remote/noisebell-common/src/lib.rs b/remote/noisebell-common/src/lib.rs index 21e381b..ff6464a 100644 --- a/remote/noisebell-common/src/lib.rs +++ b/remote/noisebell-common/src/lib.rs @@ -12,6 +12,21 @@ pub fn validate_bearer(headers: &HeaderMap, expected: &str) -> bool { .unwrap_or(false) } +pub fn prometheus_escape_label_value(value: &str) -> String { + let mut escaped = String::with_capacity(value.len()); + for ch in value.chars() { + match ch { + '\\' => escaped.push_str("\\\\"), + '"' => escaped.push_str("\\\""), + '\n' => escaped.push_str("\\n"), + _ => escaped.push(ch), + } + } + escaped +} + +pub const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8"; + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum DoorStatus { diff --git a/remote/rss-service/Cargo.toml b/remote/rss-service/Cargo.toml index dd56d5e..eff34cf 100644 --- a/remote/rss-service/Cargo.toml +++ b/remote/rss-service/Cargo.toml @@ -10,6 +10,5 @@ chrono = "0.4" noisebell-common = { path = "../noisebell-common" } reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "signal", "time"] } -tower-http = { version = "0.6", features = ["trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/remote/rss-service/src/main.rs b/remote/rss-service/src/main.rs index 0c77ab0..7f2d1f5 100644 --- a/remote/rss-service/src/main.rs +++ b/remote/rss-service/src/main.rs @@ -9,8 +9,7 @@ use axum::routing::get; use axum::{Json, Router}; use chrono::{DateTime, Utc}; use noisebell_common::{CacheStatusResponse, DoorStatus}; -use tower_http::trace::TraceLayer; -use tracing::{error, info, Level}; +use tracing::{error, info}; const FEED_TTL_MINUTES: u32 = 1; const README_URL: &str = @@ -596,11 +595,6 @@ async fn main() -> Result<()> { .route("/open/", get(get_open_rss)) .route("/open/rss.xml", get(get_open_rss)) .route("/open/atom.xml", get(get_open_atom)) - .layer( - TraceLayer::new_for_http() - .make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO)) - .on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)), - ) .with_state(app_state); let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)) diff --git a/remote/zulip-bot/Cargo.toml b/remote/zulip-bot/Cargo.toml index a51e347..ce91722 100644 --- a/remote/zulip-bot/Cargo.toml +++ b/remote/zulip-bot/Cargo.toml @@ -13,6 +13,5 @@ noisebell-common = { path = "../noisebell-common" } reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "signal"] } -tower-http = { version = "0.6", features = ["trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/remote/zulip-bot/src/main.rs b/remote/zulip-bot/src/main.rs index 6ede2fd..67cda4e 100644 --- a/remote/zulip-bot/src/main.rs +++ b/remote/zulip-bot/src/main.rs @@ -7,8 +7,7 @@ use axum::routing::{get, post}; use axum::{Json, Router}; use noisebell_common::{validate_bearer, DoorStatus, WebhookPayload}; use serde::Serialize; -use tower_http::trace::TraceLayer; -use tracing::{error, info, Level}; +use tracing::{error, info, warn}; struct AppState { client: reqwest::Client, @@ -54,6 +53,11 @@ async fn post_webhook( Json(body): Json, ) -> StatusCode { if !validate_bearer(&headers, &state.webhook_secret) { + warn!( + status = %body.status, + timestamp = body.timestamp, + "unauthorized Zulip webhook rejected" + ); return StatusCode::UNAUTHORIZED; } @@ -131,11 +135,6 @@ async fn main() -> Result<()> { let app = Router::new() .route("/health", get(|| async { StatusCode::OK })) .route("/webhook", post(post_webhook)) - .layer( - TraceLayer::new_for_http() - .make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO)) - .on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)), - ) .with_state(Arc::new(AppState { client, webhook_secret, diff --git a/scripts/deploy-pios-pi.sh b/scripts/deploy-pios-pi.sh index 21a0d21..c10a699 100755 --- a/scripts/deploy-pios-pi.sh +++ b/scripts/deploy-pios-pi.sh @@ -70,7 +70,7 @@ ssh "${SSH_OPTS[@]}" "$TARGET_HOST" "DEPLOY_HOSTNAME='$DEPLOY_HOSTNAME' HOME_ASS set -euo pipefail sudo apt-get update -sudo apt-get install -y curl rsync avahi-daemon +sudo apt-get install -y curl jq rsync avahi-daemon prometheus-node-exporter sudo hostnamectl set-hostname "$DEPLOY_HOSTNAME" sudo tee /etc/hostname >/dev/null <<<"$DEPLOY_HOSTNAME" @@ -86,7 +86,16 @@ HOSTSEOF if ! command -v tailscale >/dev/null 2>&1; then curl -fsSL https://tailscale.com/install.sh | sh fi -sudo systemctl enable --now ssh avahi-daemon tailscaled +sudo mkdir -p /etc/systemd/journald.conf.d /var/log/journal +sudo tee /etc/systemd/journald.conf.d/noisebell-persistent.conf >/dev/null <<'JOURNALCONF' +[Journal] +Storage=persistent +SystemMaxUse=200M +MaxRetentionSec=30day +JOURNALCONF +sudo systemctl restart systemd-journald + +sudo systemctl enable --now ssh avahi-daemon tailscaled prometheus-node-exporter sudo install -m 755 "$REMOTE_TMP_DIR/noisebell" "$REMOTE_RELEASE_DIR/noisebell" sudo install -m 755 "$REMOTE_TMP_DIR/noisebell-relay" "$REMOTE_RELEASE_DIR/noisebell-relay" @@ -159,12 +168,95 @@ RestartSec=5 WantedBy=multi-user.target UNITEOF +sudo tee /usr/local/bin/noisebell-loki-journal >/dev/null <<'SCRIPTEOF' +#!/usr/bin/env bash +set -uo pipefail + +LOKI_URL=${LOKI_URL:-http://noisebell-do:3100/loki/api/v1/push} +HOST_LABEL=${HOST_LABEL:-$(hostname)} +CURSOR_DIR=/var/lib/noisebell-loki-journal +CURSOR_FILE=$CURSOR_DIR/cursor + +mkdir -p "$CURSOR_DIR" + +while true; do + args=(--output=json --no-pager --lines=100) + if [ -s "$CURSOR_FILE" ]; then + args+=(--after-cursor="$(cat "$CURSOR_FILE")") + else + args+=(--since=-5min) + fi + + saw_entry=0 + hit_error=0 + + while IFS= read -r entry; do + saw_entry=1 + cursor=$(jq -r '.__CURSOR // empty' <<<"$entry") + timestamp=$(jq -r '.__REALTIME_TIMESTAMP // empty' <<<"$entry") + if [ -n "$timestamp" ] && [ "$timestamp" != "null" ]; then + timestamp="${timestamp}000" + else + timestamp=$(date +%s%N) + fi + + unit=$(jq -r '._SYSTEMD_UNIT // .SYSLOG_IDENTIFIER // "journal"' <<<"$entry") + message=$(jq -r '.MESSAGE // .' <<<"$entry") + + payload=$(jq -cn \ + --arg host "$HOST_LABEL" \ + --arg unit "$unit" \ + --arg ts "$timestamp" \ + --arg line "$message" \ + '{streams:[{stream:{job:"journal",host:$host,unit:$unit},values:[[$ts,$line]]}]}') + + if curl -fsS --max-time 5 \ + -H 'Content-Type: application/json' \ + -X POST \ + --data "$payload" \ + "$LOKI_URL" >/dev/null 2>&1; then + if [ -n "$cursor" ]; then + printf '%s\n' "$cursor" > "$CURSOR_FILE" + fi + else + hit_error=1 + break + fi + done < <(journalctl "${args[@]}" 2>/dev/null) + + if [ "$hit_error" -eq 1 ] || [ "$saw_entry" -eq 0 ]; then + sleep 5 + fi +done +SCRIPTEOF +sudo chmod 755 /usr/local/bin/noisebell-loki-journal + +sudo tee /etc/systemd/system/noisebell-loki-journal.service >/dev/null <<'UNITEOF' +[Unit] +Description=Noisebell journal shipper to Loki +After=network-online.target tailscaled.service +Wants=network-online.target + +[Service] +Type=simple +Environment=LOKI_URL=http://noisebell-do:3100/loki/api/v1/push +Environment=HOST_LABEL=noisebell-pi +ExecStart=/usr/local/bin/noisebell-loki-journal +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target +UNITEOF + sudo ln -sfn "$REMOTE_RELEASE_DIR" "$REMOTE_CURRENT_LINK" sudo systemctl daemon-reload sudo systemctl enable noisebell.service sudo systemctl enable noisebell-relay.service +sudo systemctl enable noisebell-loki-journal.service sudo systemctl restart noisebell.service sudo systemctl restart noisebell-relay.service +sudo systemctl restart noisebell-loki-journal.service sudo systemctl restart avahi-daemon sudo tailscale up --auth-key="$(sudo cat /etc/noisebell/tailscale-auth-key)" --hostname=noisebell-pi || true