Compare commits
No commits in common. "3991d25293c48d405f1a5730ba50de47930691e6" and "183b2c2c886162e343a272e01a8c0bb995e41d12" have entirely different histories.
3991d25293
...
183b2c2c88
20 changed files with 166 additions and 964 deletions
15
Cargo.lock
generated
15
Cargo.lock
generated
|
|
@ -906,21 +906,6 @@ dependencies = [
|
|||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "noisebell-rss"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"chrono",
|
||||
"noisebell-common",
|
||||
"reqwest",
|
||||
"tokio",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.50.3"
|
||||
|
|
|
|||
11
Cargo.toml
11
Cargo.toml
|
|
@ -3,17 +3,6 @@ members = [
|
|||
"pi/pi-service",
|
||||
"remote/noisebell-common",
|
||||
"remote/cache-service",
|
||||
"remote/rss-service",
|
||||
"remote/discord-bot",
|
||||
]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.lints.rust]
|
||||
unsafe_code = "forbid"
|
||||
|
||||
[workspace.lints.clippy]
|
||||
dbg_macro = "warn"
|
||||
print_stderr = "warn"
|
||||
print_stdout = "warn"
|
||||
todo = "warn"
|
||||
unimplemented = "warn"
|
||||
|
|
|
|||
|
|
@ -66,7 +66,6 @@
|
|||
);
|
||||
|
||||
noisebell-cache = buildRemoteMember "noisebell-cache";
|
||||
noisebell-rss = buildRemoteMember "noisebell-rss";
|
||||
noisebell-discord = buildRemoteMember "noisebell-discord";
|
||||
|
||||
crossPkgs = import nixpkgs {
|
||||
|
|
@ -347,7 +346,6 @@
|
|||
packages.${system} = {
|
||||
inherit
|
||||
noisebell-cache
|
||||
noisebell-rss
|
||||
noisebell-discord
|
||||
flash-pi-sd
|
||||
pi-serial
|
||||
|
|
@ -363,12 +361,10 @@
|
|||
|
||||
nixosModules = {
|
||||
cache = import ./remote/cache-service/module.nix noisebell-cache;
|
||||
rss = import ./remote/rss-service/module.nix noisebell-rss;
|
||||
discord = import ./remote/discord-bot/module.nix noisebell-discord;
|
||||
default = {
|
||||
imports = [
|
||||
(import ./remote/cache-service/module.nix noisebell-cache)
|
||||
(import ./remote/rss-service/module.nix noisebell-rss)
|
||||
(import ./remote/discord-bot/module.nix noisebell-discord)
|
||||
(import ./remote/hosted-module.nix {
|
||||
inherit self agenix;
|
||||
|
|
|
|||
|
|
@ -3,9 +3,6 @@ name = "noisebell"
|
|||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
axum = "0.8"
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
|
||||
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
|
|
@ -42,6 +42,7 @@ impl LocalDoorState {
|
|||
Self::Closed => DoorStatus::Closed,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
struct AppState {
|
||||
|
|
@ -57,7 +58,10 @@ impl AppState {
|
|||
}
|
||||
|
||||
fn unix_timestamp() -> u64 {
|
||||
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
async fn get_status(
|
||||
|
|
@ -120,7 +124,11 @@ async fn main() -> Result<()> {
|
|||
.unwrap_or_else(|_| "true".into())
|
||||
.parse()
|
||||
.context("NOISEBELL_ACTIVE_LOW must be true or false")?;
|
||||
let active_level = if active_low { SignalLevel::Low } else { SignalLevel::High };
|
||||
let active_level = if active_low {
|
||||
SignalLevel::Low
|
||||
} else {
|
||||
SignalLevel::High
|
||||
};
|
||||
|
||||
let inbound_api_key = std::env::var("NOISEBELL_INBOUND_API_KEY")
|
||||
.context("NOISEBELL_INBOUND_API_KEY is required")?;
|
||||
|
|
@ -129,20 +137,33 @@ async fn main() -> Result<()> {
|
|||
|
||||
let chip = Chip::new("gpiochip0").context("failed to open gpiochip0")?;
|
||||
|
||||
let bias = if active_level == SignalLevel::Low { Bias::PullUp } else { Bias::PullDown };
|
||||
let bias = if active_level == SignalLevel::Low {
|
||||
Bias::PullUp
|
||||
} else {
|
||||
Bias::PullDown
|
||||
};
|
||||
|
||||
// Keep the line requested and poll its value. Edge-triggered reads have
|
||||
// proven unreliable on Raspberry Pi OS even though the raw line level is
|
||||
// correct, so we debounce from sampled levels instead.
|
||||
let opts = Options::input([gpio_pin]).bias(bias).consumer("noisebell");
|
||||
let inputs =
|
||||
chip.request_lines(opts).context(format!("failed to request GPIO line {gpio_pin}"))?;
|
||||
let opts = Options::input([gpio_pin])
|
||||
.bias(bias)
|
||||
.consumer("noisebell");
|
||||
let inputs = chip
|
||||
.request_lines(opts)
|
||||
.context(format!("failed to request GPIO line {gpio_pin}"))?;
|
||||
|
||||
// Read initial value
|
||||
let initial_values = inputs.get_values([false]).context("failed to read initial GPIO value")?;
|
||||
let initial_values = inputs
|
||||
.get_values([false])
|
||||
.context("failed to read initial GPIO value")?;
|
||||
// Value is true when line is active. With Active::High (default),
|
||||
// true means the physical level is high.
|
||||
let initial_raw_level = if initial_values[0] { SignalLevel::High } else { SignalLevel::Low };
|
||||
let initial_raw_level = if initial_values[0] {
|
||||
SignalLevel::High
|
||||
} else {
|
||||
SignalLevel::Low
|
||||
};
|
||||
let initial_state = LocalDoorState::from_raw_level(initial_raw_level, active_level);
|
||||
|
||||
let now = unix_timestamp();
|
||||
|
|
@ -184,7 +205,11 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
};
|
||||
|
||||
let new_raw_level = if values[0] { SignalLevel::High } else { SignalLevel::Low };
|
||||
let new_raw_level = if values[0] {
|
||||
SignalLevel::High
|
||||
} else {
|
||||
SignalLevel::Low
|
||||
};
|
||||
let new_state = LocalDoorState::from_raw_level(new_raw_level, active_level);
|
||||
|
||||
if new_state != pending_state {
|
||||
|
|
@ -193,7 +218,9 @@ async fn main() -> Result<()> {
|
|||
} else if new_state != current_state && pending_since.elapsed() >= debounce {
|
||||
current_state = new_state;
|
||||
let previous_state = LocalDoorState::from_atomic(
|
||||
state_for_edges.door_state.swap(new_state as u8, Ordering::Relaxed),
|
||||
state_for_edges
|
||||
.door_state
|
||||
.swap(new_state as u8, Ordering::Relaxed),
|
||||
);
|
||||
|
||||
if previous_state == new_state {
|
||||
|
|
@ -202,7 +229,9 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
|
||||
let timestamp = unix_timestamp();
|
||||
state_for_edges.last_changed.store(timestamp, Ordering::Relaxed);
|
||||
state_for_edges
|
||||
.last_changed
|
||||
.store(timestamp, Ordering::Relaxed);
|
||||
let _ = edge_tx.send((new_state.as_door_status(), timestamp));
|
||||
}
|
||||
|
||||
|
|
@ -223,8 +252,12 @@ async fn main() -> Result<()> {
|
|||
let payload = WebhookPayload { status, timestamp };
|
||||
|
||||
for attempt in 0..=retry_attempts {
|
||||
let result =
|
||||
client.post(&endpoint_url).bearer_auth(&api_key).json(&payload).send().await;
|
||||
let result = client
|
||||
.post(&endpoint_url)
|
||||
.bearer_auth(&api_key)
|
||||
.json(&payload)
|
||||
.send()
|
||||
.await;
|
||||
match result {
|
||||
Ok(resp) if resp.status().is_success() => break,
|
||||
_ => {
|
||||
|
|
@ -246,7 +279,9 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
});
|
||||
|
||||
let app = Router::new().route("/", get(get_status)).with_state(state);
|
||||
let app = Router::new()
|
||||
.route("/", get(get_status))
|
||||
.with_state(state);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind((&*bind_address, port))
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ Cargo workspace with the server-side pieces of Noisebell. Runs on any Linux box.
|
|||
| Service | Port | What it does |
|
||||
|---------|------|--------------|
|
||||
| [`cache-service/`](cache-service/) | 3000 | Polls the Pi, stores the latest state in SQLite, fans out webhooks |
|
||||
| [`rss-service/`](rss-service/) | 3002 | Fetches current status from cache and serves RSS/Atom feeds |
|
||||
| [`discord-bot/`](discord-bot/) | 3001 | Posts door status to a Discord channel |
|
||||
| [`noisebell-common/`](noisebell-common/) | — | Shared types and helpers |
|
||||
|
||||
|
|
@ -21,7 +20,6 @@ Or with Nix:
|
|||
|
||||
```sh
|
||||
nix build .#noisebell-cache
|
||||
nix build .#noisebell-rss
|
||||
nix build .#noisebell-discord
|
||||
```
|
||||
|
||||
|
|
@ -44,10 +42,6 @@ The flake exports a NixOS module for the hosted remote machine. It imports `agen
|
|||
domain = "cache.noisebell.example.com";
|
||||
piAddress = "http://noisebell-pi:80";
|
||||
};
|
||||
services.noisebell-rss = {
|
||||
enable = true;
|
||||
domain = "rss.noisebell.example.com";
|
||||
};
|
||||
services.noisebell-discord = {
|
||||
enable = true;
|
||||
domain = "discord.noisebell.example.com";
|
||||
|
|
|
|||
|
|
@ -3,9 +3,6 @@ name = "noisebell-cache"
|
|||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
axum = "0.8"
|
||||
|
|
|
|||
|
|
@ -30,7 +30,10 @@ pub struct AppState {
|
|||
}
|
||||
|
||||
fn unix_now() -> u64 {
|
||||
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
fn format_full_timestamp(ts: u64) -> String {
|
||||
|
|
@ -40,7 +43,12 @@ fn format_full_timestamp(ts: u64) -> String {
|
|||
}
|
||||
|
||||
fn format_duration(seconds: u64) -> String {
|
||||
let units = [(86_400, "day"), (3_600, "hour"), (60, "minute"), (1, "second")];
|
||||
let units = [
|
||||
(86_400, "day"),
|
||||
(3_600, "hour"),
|
||||
(60, "minute"),
|
||||
(1, "second"),
|
||||
];
|
||||
|
||||
let mut remaining = seconds;
|
||||
let mut parts = Vec::new();
|
||||
|
|
@ -67,12 +75,7 @@ fn format_duration(seconds: u64) -> String {
|
|||
}
|
||||
}
|
||||
|
||||
fn status_summary(
|
||||
status: DoorStatus,
|
||||
since: Option<u64>,
|
||||
last_checked: Option<u64>,
|
||||
now: u64,
|
||||
) -> String {
|
||||
fn status_summary(status: DoorStatus, since: Option<u64>, last_checked: Option<u64>, now: u64) -> String {
|
||||
let since_text = since
|
||||
.map(|ts| {
|
||||
format!(
|
||||
|
|
@ -108,10 +111,16 @@ pub async fn post_webhook(
|
|||
|
||||
// Simple rate limiting: reset tokens every window, reject if exhausted.
|
||||
let now = unix_now();
|
||||
let last = state.webhook_last_request.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let last = state
|
||||
.webhook_last_request
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if now.saturating_sub(last) >= WEBHOOK_RATE_WINDOW_SECS {
|
||||
state.webhook_tokens.store(WEBHOOK_RATE_LIMIT, std::sync::atomic::Ordering::Relaxed);
|
||||
state.webhook_last_request.store(now, std::sync::atomic::Ordering::Relaxed);
|
||||
state
|
||||
.webhook_tokens
|
||||
.store(WEBHOOK_RATE_LIMIT, std::sync::atomic::Ordering::Relaxed);
|
||||
state
|
||||
.webhook_last_request
|
||||
.store(now, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
let remaining = state.webhook_tokens.fetch_update(
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
|
|
@ -144,7 +153,10 @@ pub async fn post_webhook(
|
|||
webhook::forward(
|
||||
&state.client,
|
||||
&state.webhooks,
|
||||
&WebhookPayload { status, timestamp: body.timestamp },
|
||||
&WebhookPayload {
|
||||
status,
|
||||
timestamp: body.timestamp,
|
||||
},
|
||||
state.retry_attempts,
|
||||
state.retry_base_delay_secs,
|
||||
)
|
||||
|
|
@ -188,8 +200,7 @@ pub async fn get_status(
|
|||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})?;
|
||||
|
||||
status.human_readable =
|
||||
status_summary(status.status, status.since, status.last_checked, unix_now());
|
||||
status.human_readable = status_summary(status.status, status.since, status.last_checked, unix_now());
|
||||
|
||||
Ok(Json(status))
|
||||
}
|
||||
|
|
@ -200,21 +211,30 @@ pub async fn health() -> StatusCode {
|
|||
|
||||
pub async fn get_image_open() -> impl IntoResponse {
|
||||
(
|
||||
[(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")],
|
||||
[
|
||||
(header::CONTENT_TYPE, "image/png"),
|
||||
(header::CACHE_CONTROL, "public, max-age=86400"),
|
||||
],
|
||||
OPEN_PNG,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_image_closed() -> impl IntoResponse {
|
||||
(
|
||||
[(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")],
|
||||
[
|
||||
(header::CONTENT_TYPE, "image/png"),
|
||||
(header::CACHE_CONTROL, "public, max-age=86400"),
|
||||
],
|
||||
CLOSED_PNG,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_image_offline() -> impl IntoResponse {
|
||||
(
|
||||
[(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=86400")],
|
||||
[
|
||||
(header::CONTENT_TYPE, "image/png"),
|
||||
(header::CACHE_CONTROL, "public, max-age=86400"),
|
||||
],
|
||||
OFFLINE_PNG,
|
||||
)
|
||||
}
|
||||
|
|
@ -240,7 +260,13 @@ pub async fn get_image(State(state): State<Arc<AppState>>) -> Response {
|
|||
DoorStatus::Closed => CLOSED_PNG,
|
||||
DoorStatus::Offline => OFFLINE_PNG,
|
||||
};
|
||||
([(header::CONTENT_TYPE, "image/png"), (header::CACHE_CONTROL, "public, max-age=5")], image)
|
||||
(
|
||||
[
|
||||
(header::CONTENT_TYPE, "image/png"),
|
||||
(header::CACHE_CONTROL, "public, max-age=5"),
|
||||
],
|
||||
image,
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,6 @@
|
|||
use std::path::Path;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use noisebell_common::{CacheStatusResponse, DoorStatus};
|
||||
use rusqlite::{Connection, OptionalExtension};
|
||||
|
|
@ -60,7 +63,9 @@ struct CurrentStateRow {
|
|||
}
|
||||
|
||||
fn parse_status(status: &str, location: &str) -> Result<DoorStatus> {
|
||||
status.parse().with_context(|| format!("invalid door status {status:?} in {location}"))
|
||||
status
|
||||
.parse()
|
||||
.with_context(|| format!("invalid door status {status:?} in {location}"))
|
||||
}
|
||||
|
||||
pub fn init(path: &str) -> Result<Connection> {
|
||||
|
|
@ -98,12 +103,18 @@ fn current_state_has_column(conn: &Connection, column: &str) -> Result<bool> {
|
|||
|
||||
fn migrate_current_state(conn: &Connection) -> Result<()> {
|
||||
if !current_state_has_column(conn, "last_checked")? {
|
||||
conn.execute("ALTER TABLE current_state ADD COLUMN last_checked INTEGER", [])
|
||||
.context("failed to add current_state.last_checked")?;
|
||||
conn.execute(
|
||||
"ALTER TABLE current_state ADD COLUMN last_checked INTEGER",
|
||||
[],
|
||||
)
|
||||
.context("failed to add current_state.last_checked")?;
|
||||
}
|
||||
|
||||
conn.execute("UPDATE current_state SET status = 'offline' WHERE status IS NULL", [])
|
||||
.context("failed to backfill NULL current_state.status")?;
|
||||
conn.execute(
|
||||
"UPDATE current_state SET status = 'offline' WHERE status IS NULL",
|
||||
[],
|
||||
)
|
||||
.context("failed to backfill NULL current_state.status")?;
|
||||
|
||||
validate_status_values(conn)?;
|
||||
Ok(())
|
||||
|
|
@ -157,12 +168,14 @@ fn current_state_row(conn: &Connection) -> Result<CurrentStateRow> {
|
|||
let status = parse_status(&status_str, "current_state.status")?;
|
||||
|
||||
let state = match (status, since) {
|
||||
(DoorStatus::Open, Some(since)) => {
|
||||
CachedState::Live { status: LiveDoorStatus::Open, since }
|
||||
}
|
||||
(DoorStatus::Closed, Some(since)) => {
|
||||
CachedState::Live { status: LiveDoorStatus::Closed, since }
|
||||
}
|
||||
(DoorStatus::Open, Some(since)) => CachedState::Live {
|
||||
status: LiveDoorStatus::Open,
|
||||
since,
|
||||
},
|
||||
(DoorStatus::Closed, Some(since)) => CachedState::Live {
|
||||
status: LiveDoorStatus::Closed,
|
||||
since,
|
||||
},
|
||||
(DoorStatus::Offline, Some(since)) => CachedState::Offline { since },
|
||||
(DoorStatus::Offline, None) => CachedState::Unknown,
|
||||
(DoorStatus::Open | DoorStatus::Closed, None) => {
|
||||
|
|
@ -170,7 +183,11 @@ fn current_state_row(conn: &Connection) -> Result<CurrentStateRow> {
|
|||
}
|
||||
};
|
||||
|
||||
Ok(CurrentStateRow { state, last_seen, last_checked })
|
||||
Ok(CurrentStateRow {
|
||||
state,
|
||||
last_seen,
|
||||
last_checked,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_status(conn: &Connection) -> Result<CacheStatusResponse> {
|
||||
|
|
@ -211,11 +228,10 @@ pub fn apply_state(
|
|||
CachedState::Offline { since } if timestamp < since => ApplyStateOutcome::Stale,
|
||||
CachedState::Offline { .. } => ApplyStateOutcome::Applied,
|
||||
CachedState::Live { status: _, since } if timestamp < since => ApplyStateOutcome::Stale,
|
||||
CachedState::Live { status: current_status, since }
|
||||
if timestamp == since && live_status == current_status =>
|
||||
{
|
||||
ApplyStateOutcome::Duplicate
|
||||
}
|
||||
CachedState::Live {
|
||||
status: current_status,
|
||||
since,
|
||||
} if timestamp == since && live_status == current_status => ApplyStateOutcome::Duplicate,
|
||||
CachedState::Live { .. } => ApplyStateOutcome::Applied,
|
||||
};
|
||||
|
||||
|
|
@ -228,7 +244,10 @@ pub fn apply_state(
|
|||
}
|
||||
|
||||
pub fn update_last_seen(conn: &Connection, now: u64) -> Result<()> {
|
||||
conn.execute("UPDATE current_state SET last_seen = ?1 WHERE id = 1", rusqlite::params![now])?;
|
||||
conn.execute(
|
||||
"UPDATE current_state SET last_seen = ?1 WHERE id = 1",
|
||||
rusqlite::params![now],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -256,15 +275,16 @@ pub fn get_current_status(conn: &Connection) -> Result<DoorStatus> {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::path::Path;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
fn test_db() -> Connection {
|
||||
init(":memory:").expect("failed to init test db")
|
||||
}
|
||||
|
||||
fn temp_db_path(label: &str) -> std::path::PathBuf {
|
||||
let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
|
||||
let nanos = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_nanos();
|
||||
std::env::temp_dir().join(format!("noisebell-{label}-{nanos}.sqlite"))
|
||||
}
|
||||
|
||||
|
|
@ -444,7 +464,11 @@ mod tests {
|
|||
create_legacy_db(&path);
|
||||
|
||||
let conn = Connection::open(&path).unwrap();
|
||||
conn.execute("UPDATE current_state SET status = 'mystery' WHERE id = 1", []).unwrap();
|
||||
conn.execute(
|
||||
"UPDATE current_state SET status = 'mystery' WHERE id = 1",
|
||||
[],
|
||||
)
|
||||
.unwrap();
|
||||
drop(conn);
|
||||
|
||||
let err = init(path.to_str().unwrap()).unwrap_err().to_string();
|
||||
|
|
|
|||
|
|
@ -21,7 +21,10 @@ pub struct PollerConfig {
|
|||
}
|
||||
|
||||
fn unix_now() -> u64 {
|
||||
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
pub fn spawn_status_poller(
|
||||
|
|
@ -165,7 +168,10 @@ pub fn spawn_status_poller(
|
|||
webhook::forward(
|
||||
&client,
|
||||
&config.webhooks,
|
||||
&WebhookPayload { status: DoorStatus::Offline, timestamp: now },
|
||||
&WebhookPayload {
|
||||
status: DoorStatus::Offline,
|
||||
timestamp: now,
|
||||
},
|
||||
config.retry_attempts,
|
||||
config.retry_base_delay_secs,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -3,9 +3,6 @@ name = "noisebell-discord"
|
|||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
axum = "0.8"
|
||||
|
|
|
|||
|
|
@ -87,7 +87,10 @@ async fn post_webhook(
|
|||
}
|
||||
|
||||
fn unix_now() -> u64 {
|
||||
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
fn format_timestamp(ts: u64) -> String {
|
||||
|
|
@ -103,9 +106,11 @@ async fn handle_status(
|
|||
|
||||
let embed = match resp {
|
||||
Ok(resp) if resp.status().is_success() => match resp.json::<CacheStatusResponse>().await {
|
||||
Ok(data) => {
|
||||
build_embed(data.status, data.since.unwrap_or(unix_now()), &state.image_base_url)
|
||||
}
|
||||
Ok(data) => build_embed(
|
||||
data.status,
|
||||
data.since.unwrap_or(unix_now()),
|
||||
&state.image_base_url,
|
||||
),
|
||||
Err(e) => {
|
||||
error!(error = %e, "failed to parse status response");
|
||||
CreateEmbed::new()
|
||||
|
|
@ -132,8 +137,7 @@ impl serenity::all::EventHandler for Handler {
|
|||
async fn ready(&self, ctx: serenity::all::Context, ready: serenity::model::gateway::Ready) {
|
||||
info!(user = %ready.user.name, "Discord bot connected");
|
||||
|
||||
let commands =
|
||||
vec![CreateCommand::new("status").description("Show the current door status")];
|
||||
let commands = vec![CreateCommand::new("status").description("Show the current door status")];
|
||||
|
||||
if let Err(e) = serenity::all::Command::set_global_commands(&ctx.http, commands).await {
|
||||
error!(error = %e, "failed to register slash commands");
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@
|
|||
|
||||
let
|
||||
cfgCache = config.services.noisebell-cache;
|
||||
cfgRss = config.services.noisebell-rss;
|
||||
cfgDiscord = config.services.noisebell-discord;
|
||||
in
|
||||
{
|
||||
|
|
@ -12,7 +11,6 @@ in
|
|||
users.groups.noisebell = { };
|
||||
|
||||
users.users.noisebell-cache.extraGroups = lib.mkIf cfgCache.enable [ "noisebell" ];
|
||||
users.users.noisebell-rss.extraGroups = lib.mkIf cfgRss.enable [ "noisebell" ];
|
||||
users.users.noisebell-discord.extraGroups = lib.mkIf cfgDiscord.enable [ "noisebell" ];
|
||||
|
||||
age.secrets.noisebell-pi-to-cache-key = {
|
||||
|
|
@ -50,12 +48,6 @@ in
|
|||
);
|
||||
};
|
||||
|
||||
services.noisebell-rss = lib.mkIf cfgRss.enable (
|
||||
lib.optionalAttrs cfgCache.enable {
|
||||
cacheUrl = lib.mkDefault "http://127.0.0.1:${toString cfgCache.port}";
|
||||
}
|
||||
);
|
||||
|
||||
services.noisebell-discord = lib.mkIf cfgDiscord.enable (
|
||||
{
|
||||
discordTokenFile = lib.mkDefault config.age.secrets.noisebell-discord-token.path;
|
||||
|
|
|
|||
|
|
@ -3,9 +3,6 @@ name = "noisebell-common"
|
|||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
axum = "0.8"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
|
|
|||
|
|
@ -148,7 +148,10 @@ mod tests {
|
|||
fn door_status_round_trips() {
|
||||
for status in DoorStatus::ALL {
|
||||
assert_eq!(status.as_str().parse::<DoorStatus>().unwrap(), status);
|
||||
assert_eq!(serde_json::to_string(&status).unwrap(), format!("\"{status}\""));
|
||||
assert_eq!(
|
||||
serde_json::to_string(&status).unwrap(),
|
||||
format!("\"{status}\"")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -160,7 +163,10 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn webhook_payload_round_trips() {
|
||||
let payload = WebhookPayload { status: DoorStatus::Open, timestamp: 1234567890 };
|
||||
let payload = WebhookPayload {
|
||||
status: DoorStatus::Open,
|
||||
timestamp: 1234567890,
|
||||
};
|
||||
let json = serde_json::to_string(&payload).unwrap();
|
||||
let deserialized: WebhookPayload = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(deserialized.status, DoorStatus::Open);
|
||||
|
|
|
|||
|
|
@ -1,15 +0,0 @@
|
|||
[package]
|
||||
name = "noisebell-rss"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
axum = "0.8"
|
||||
chrono = "0.4"
|
||||
noisebell-common = { path = "../noisebell-common" }
|
||||
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "signal", "time"] }
|
||||
tower-http = { version = "0.6", features = ["trace"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
|
@ -1,74 +0,0 @@
|
|||
# RSS Service
|
||||
|
||||
Serves public feed endpoints for Noisebell.
|
||||
|
||||
This service does not store history. On each request it fetches the current state from `cache-service` and renders that one current state as RSS or Atom.
|
||||
|
||||
## States
|
||||
|
||||
- `open`: Noisebridge is open right now.
|
||||
- `closed`: Noisebridge is closed right now.
|
||||
- `offline`: the cache cannot currently confirm the Pi state.
|
||||
|
||||
## Feed behavior
|
||||
|
||||
- Feeds are current-state only, not historical archives.
|
||||
- Each feed emits either one item or zero items.
|
||||
- The item `guid` changes when the current status event changes, so feed readers can treat it as new.
|
||||
- Filtered feeds omit the item when the current state does not match that feed.
|
||||
|
||||
## Endpoints
|
||||
|
||||
| Format | Purpose | Path |
|
||||
|--------|---------|------|
|
||||
| JSON | Current status passthrough | `/status` |
|
||||
| RSS | All states | `/all/rss.xml` |
|
||||
| Atom | All states | `/all/atom.xml` |
|
||||
| RSS | Open + closed only | `/door/rss.xml` |
|
||||
| Atom | Open + closed only | `/door/atom.xml` |
|
||||
| RSS | Open only | `/open/rss.xml` |
|
||||
| Atom | Open only | `/open/atom.xml` |
|
||||
| HTTP | Health check | `/health` |
|
||||
| HTTP | Redirect to docs | `/` |
|
||||
|
||||
Current aliases:
|
||||
|
||||
- `/rss.xml` currently serves the same feed as `/all/rss.xml`.
|
||||
- `/atom.xml` currently serves the same feed as `/all/atom.xml`.
|
||||
- `/` redirects to the repo README for this module.
|
||||
|
||||
## Feed text
|
||||
|
||||
Items use custom titles:
|
||||
|
||||
- `Noisebridge opened`
|
||||
- `Noisebridge closed`
|
||||
- `Noisebridge sensor went offline`
|
||||
|
||||
Descriptions include:
|
||||
|
||||
- the current state
|
||||
- when that state began, if known
|
||||
- when the cache last confirmed the state
|
||||
|
||||
## Caching and polling hints
|
||||
|
||||
- RSS includes `<ttl>1`, which suggests a 1-minute polling interval.
|
||||
- HTTP responses include `Cache-Control: public, max-age=60`.
|
||||
- HTTP responses also include `ETag` and `Last-Modified` so clients can revalidate cheaply.
|
||||
|
||||
## Atom vs RSS
|
||||
|
||||
RSS and Atom are both XML feed formats.
|
||||
|
||||
- RSS is older and very widely supported.
|
||||
- Atom is newer and has a more regular structure.
|
||||
- For this service they contain the same information in two different standard formats.
|
||||
|
||||
## Configuration
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `NOISEBELL_RSS_CACHE_URL` | required | Base URL of `cache-service` |
|
||||
| `NOISEBELL_RSS_PORT` | `3002` | Listen port |
|
||||
| `NOISEBELL_RSS_HTTP_TIMEOUT_SECS` | `10` | Timeout when requesting cache status |
|
||||
|
|
@ -1,75 +0,0 @@
|
|||
pkg:
|
||||
{ config, lib, ... }:
|
||||
|
||||
let
|
||||
cfg = config.services.noisebell-rss;
|
||||
bin = "${pkg}/bin/noisebell-rss";
|
||||
in
|
||||
{
|
||||
options.services.noisebell-rss = {
|
||||
enable = lib.mkEnableOption "noisebell RSS service";
|
||||
|
||||
domain = lib.mkOption {
|
||||
type = lib.types.str;
|
||||
description = "Domain for the Caddy virtual host.";
|
||||
};
|
||||
|
||||
port = lib.mkOption {
|
||||
type = lib.types.port;
|
||||
default = 3002;
|
||||
};
|
||||
|
||||
cacheUrl = lib.mkOption {
|
||||
type = lib.types.str;
|
||||
description = "URL of the cache service (e.g. http://localhost:3000).";
|
||||
};
|
||||
|
||||
httpTimeoutSecs = lib.mkOption {
|
||||
type = lib.types.ints.positive;
|
||||
default = 10;
|
||||
};
|
||||
};
|
||||
|
||||
config = lib.mkIf cfg.enable {
|
||||
users.users.noisebell-rss = {
|
||||
isSystemUser = true;
|
||||
group = "noisebell-rss";
|
||||
};
|
||||
users.groups.noisebell-rss = { };
|
||||
|
||||
services.caddy.virtualHosts.${cfg.domain}.extraConfig = ''
|
||||
reverse_proxy localhost:${toString cfg.port}
|
||||
'';
|
||||
|
||||
systemd.services.noisebell-rss = {
|
||||
description = "Noisebell RSS service";
|
||||
wantedBy = [ "multi-user.target" ];
|
||||
after = [ "network-online.target" ];
|
||||
wants = [ "network-online.target" ];
|
||||
environment = {
|
||||
NOISEBELL_RSS_PORT = toString cfg.port;
|
||||
NOISEBELL_RSS_CACHE_URL = cfg.cacheUrl;
|
||||
NOISEBELL_RSS_HTTP_TIMEOUT_SECS = toString cfg.httpTimeoutSecs;
|
||||
RUST_LOG = "info";
|
||||
};
|
||||
script = ''
|
||||
exec ${bin}
|
||||
'';
|
||||
serviceConfig = {
|
||||
Type = "simple";
|
||||
Restart = "on-failure";
|
||||
RestartSec = 5;
|
||||
User = "noisebell-rss";
|
||||
Group = "noisebell-rss";
|
||||
NoNewPrivileges = true;
|
||||
ProtectSystem = "strict";
|
||||
ProtectHome = true;
|
||||
PrivateTmp = true;
|
||||
ProtectKernelTunables = true;
|
||||
ProtectKernelModules = true;
|
||||
ProtectControlGroups = true;
|
||||
RestrictSUIDSGID = true;
|
||||
};
|
||||
};
|
||||
};
|
||||
}
|
||||
|
|
@ -1,677 +0,0 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use axum::extract::State;
|
||||
use axum::http::{header, HeaderMap, HeaderValue, StatusCode};
|
||||
use axum::response::{IntoResponse, Redirect, Response};
|
||||
use axum::routing::get;
|
||||
use axum::{Json, Router};
|
||||
use chrono::{DateTime, Utc};
|
||||
use noisebell_common::{CacheStatusResponse, DoorStatus};
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing::{error, info, Level};
|
||||
|
||||
const FEED_TTL_MINUTES: u32 = 1;
|
||||
const README_URL: &str =
|
||||
"https://git.extremist.software/jet/noisebell/src/branch/main/remote/rss-service/README.md";
|
||||
|
||||
struct AppState {
|
||||
cache_url: String,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum FeedFormat {
|
||||
Rss,
|
||||
Atom,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum FeedKind {
|
||||
All,
|
||||
Door,
|
||||
Open,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct FeedSpec {
|
||||
kind: FeedKind,
|
||||
format: FeedFormat,
|
||||
path: &'static str,
|
||||
}
|
||||
|
||||
struct FeedDocument {
|
||||
body: String,
|
||||
etag: String,
|
||||
last_modified: String,
|
||||
content_type: &'static str,
|
||||
}
|
||||
|
||||
fn unix_now() -> u64 {
|
||||
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
|
||||
}
|
||||
|
||||
fn format_full_timestamp(ts: u64) -> String {
|
||||
DateTime::from_timestamp(ts as i64, 0)
|
||||
.map(|dt: DateTime<Utc>| dt.format("%A, %B %-d, %Y at %-I:%M:%S %p UTC").to_string())
|
||||
.unwrap_or_else(|| format!("unix timestamp {ts}"))
|
||||
}
|
||||
|
||||
fn format_rfc2822_timestamp(ts: u64) -> String {
|
||||
DateTime::from_timestamp(ts as i64, 0)
|
||||
.map(|dt: DateTime<Utc>| dt.to_rfc2822())
|
||||
.unwrap_or_else(|| "Thu, 01 Jan 1970 00:00:00 +0000".to_string())
|
||||
}
|
||||
|
||||
fn format_rfc3339_timestamp(ts: u64) -> String {
|
||||
DateTime::from_timestamp(ts as i64, 0)
|
||||
.map(|dt: DateTime<Utc>| dt.to_rfc3339())
|
||||
.unwrap_or_else(|| "1970-01-01T00:00:00+00:00".to_string())
|
||||
}
|
||||
|
||||
fn format_duration(seconds: u64) -> String {
|
||||
let units = [(86_400, "day"), (3_600, "hour"), (60, "minute"), (1, "second")];
|
||||
|
||||
let mut remaining = seconds;
|
||||
let mut parts = Vec::new();
|
||||
|
||||
for (unit_seconds, name) in units {
|
||||
if remaining >= unit_seconds {
|
||||
let count = remaining / unit_seconds;
|
||||
remaining %= unit_seconds;
|
||||
let suffix = if count == 1 { "" } else { "s" };
|
||||
parts.push(format!("{count} {name}{suffix}"));
|
||||
}
|
||||
|
||||
if parts.len() == 2 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if parts.is_empty() {
|
||||
"0 seconds".to_string()
|
||||
} else if parts.len() == 1 {
|
||||
parts.remove(0)
|
||||
} else {
|
||||
format!("{} and {}", parts[0], parts[1])
|
||||
}
|
||||
}
|
||||
|
||||
fn xml_escape(text: &str) -> String {
|
||||
let mut escaped = String::with_capacity(text.len());
|
||||
for ch in text.chars() {
|
||||
match ch {
|
||||
'&' => escaped.push_str("&"),
|
||||
'<' => escaped.push_str("<"),
|
||||
'>' => escaped.push_str(">"),
|
||||
'"' => escaped.push_str("""),
|
||||
'\'' => escaped.push_str("'"),
|
||||
_ => escaped.push(ch),
|
||||
}
|
||||
}
|
||||
escaped
|
||||
}
|
||||
|
||||
fn header_value(headers: &HeaderMap, name: &'static str) -> Option<String> {
|
||||
headers
|
||||
.get(name)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(ToOwned::to_owned)
|
||||
}
|
||||
|
||||
fn public_base_url(headers: &HeaderMap) -> String {
|
||||
let host = header_value(headers, "x-forwarded-host")
|
||||
.or_else(|| header_value(headers, "host"))
|
||||
.unwrap_or_else(|| "localhost:3002".to_string());
|
||||
let scheme = header_value(headers, "x-forwarded-proto").unwrap_or_else(|| "http".to_string());
|
||||
format!("{scheme}://{host}")
|
||||
}
|
||||
|
||||
fn item_timestamp(status: &CacheStatusResponse) -> u64 {
|
||||
status.since.or(status.last_checked).unwrap_or(0)
|
||||
}
|
||||
|
||||
fn item_guid(status: &CacheStatusResponse) -> String {
|
||||
let ts = item_timestamp(status);
|
||||
format!("urn:noisebell:status:{}:{ts}", status.status.as_str())
|
||||
}
|
||||
|
||||
fn feed_kind_slug(kind: FeedKind) -> &'static str {
|
||||
match kind {
|
||||
FeedKind::All => "all",
|
||||
FeedKind::Door => "door",
|
||||
FeedKind::Open => "open",
|
||||
}
|
||||
}
|
||||
|
||||
fn feed_title(kind: FeedKind) -> &'static str {
|
||||
match kind {
|
||||
FeedKind::All => "Noisebell all status feed",
|
||||
FeedKind::Door => "Noisebell door state feed",
|
||||
FeedKind::Open => "Noisebell open-only feed",
|
||||
}
|
||||
}
|
||||
|
||||
fn feed_description(kind: FeedKind) -> &'static str {
|
||||
match kind {
|
||||
FeedKind::All => "Current Noisebell state including open, closed, and offline.",
|
||||
FeedKind::Door => "Current Noisebell door state, limited to open and closed.",
|
||||
FeedKind::Open => "Current Noisebell state when the space is open.",
|
||||
}
|
||||
}
|
||||
|
||||
fn feed_self_path(spec: FeedSpec) -> &'static str {
|
||||
spec.path
|
||||
}
|
||||
|
||||
fn feed_alt_path(kind: FeedKind, format: FeedFormat) -> &'static str {
|
||||
match (kind, format) {
|
||||
(FeedKind::All, FeedFormat::Rss) => "/all/rss.xml",
|
||||
(FeedKind::All, FeedFormat::Atom) => "/all/atom.xml",
|
||||
(FeedKind::Door, FeedFormat::Rss) => "/door/",
|
||||
(FeedKind::Door, FeedFormat::Atom) => "/door/atom.xml",
|
||||
(FeedKind::Open, FeedFormat::Rss) => "/open/",
|
||||
(FeedKind::Open, FeedFormat::Atom) => "/open/atom.xml",
|
||||
}
|
||||
}
|
||||
|
||||
fn matches_feed(kind: FeedKind, status: DoorStatus) -> bool {
|
||||
match kind {
|
||||
FeedKind::All => true,
|
||||
FeedKind::Door => matches!(status, DoorStatus::Open | DoorStatus::Closed),
|
||||
FeedKind::Open => status == DoorStatus::Open,
|
||||
}
|
||||
}
|
||||
|
||||
fn custom_item_title(status: DoorStatus) -> &'static str {
|
||||
match status {
|
||||
DoorStatus::Open => "Noisebridge opened",
|
||||
DoorStatus::Closed => "Noisebridge closed",
|
||||
DoorStatus::Offline => "Noisebridge sensor went offline",
|
||||
}
|
||||
}
|
||||
|
||||
fn richer_item_text(status: &CacheStatusResponse, now: u64) -> String {
|
||||
let since_sentence = status
|
||||
.since
|
||||
.map(|ts| {
|
||||
format!(
|
||||
"The current state became {} at {}, about {} ago.",
|
||||
status.status,
|
||||
format_full_timestamp(ts),
|
||||
format_duration(now.saturating_sub(ts)),
|
||||
)
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
format!("The current state is {}, but the start time is unknown.", status.status)
|
||||
});
|
||||
|
||||
let checked_sentence = status
|
||||
.last_checked
|
||||
.map(|ts| {
|
||||
format!(
|
||||
"The cache last confirmed this at {}, about {} ago.",
|
||||
format_full_timestamp(ts),
|
||||
format_duration(now.saturating_sub(ts)),
|
||||
)
|
||||
})
|
||||
.unwrap_or_else(|| "The cache has no recorded last-checked time yet.".to_string());
|
||||
|
||||
let lead = match status.status {
|
||||
DoorStatus::Open => "Noisebridge is open right now.",
|
||||
DoorStatus::Closed => "Noisebridge is closed right now.",
|
||||
DoorStatus::Offline => "Noisebridge is currently offline right now.",
|
||||
};
|
||||
|
||||
format!("{lead} {since_sentence} {checked_sentence}")
|
||||
}
|
||||
|
||||
fn effective_human_readable(status: &CacheStatusResponse, now: u64) -> String {
|
||||
if status.human_readable.is_empty() {
|
||||
richer_item_text(status, now)
|
||||
} else {
|
||||
status.human_readable.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn build_rss_item(base_url: &str, status: &CacheStatusResponse, now: u64) -> String {
|
||||
let ts = item_timestamp(status);
|
||||
let pub_date = format_rfc2822_timestamp(ts);
|
||||
let link = format!("{base_url}/status");
|
||||
let guid = item_guid(status);
|
||||
let title = custom_item_title(status.status);
|
||||
let description = richer_item_text(status, now);
|
||||
|
||||
format!(
|
||||
concat!(
|
||||
" <item>\n",
|
||||
" <title>{title}</title>\n",
|
||||
" <link>{link}</link>\n",
|
||||
" <guid isPermaLink=\"false\">{guid}</guid>\n",
|
||||
" <pubDate>{pub_date}</pubDate>\n",
|
||||
" <description>{description}</description>\n",
|
||||
" </item>\n"
|
||||
),
|
||||
title = xml_escape(title),
|
||||
link = xml_escape(&link),
|
||||
guid = xml_escape(&guid),
|
||||
pub_date = xml_escape(&pub_date),
|
||||
description = xml_escape(&description),
|
||||
)
|
||||
}
|
||||
|
||||
fn build_rss_feed(
|
||||
base_url: &str,
|
||||
spec: FeedSpec,
|
||||
status: &CacheStatusResponse,
|
||||
now: u64,
|
||||
) -> FeedDocument {
|
||||
let ts = item_timestamp(status);
|
||||
let pub_date = format_rfc2822_timestamp(ts);
|
||||
let self_url = format!("{base_url}{}", feed_self_path(spec));
|
||||
let atom_alt_url = format!("{base_url}{}", feed_alt_path(spec.kind, FeedFormat::Atom));
|
||||
let item = if matches_feed(spec.kind, status.status) {
|
||||
build_rss_item(base_url, status, now)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
let etag = format!("\"rss:{}:{}:{}\"", feed_kind_slug(spec.kind), status.status.as_str(), ts);
|
||||
let body = format!(
|
||||
concat!(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n",
|
||||
"<rss version=\"2.0\" xmlns:atom=\"http://www.w3.org/2005/Atom\">\n",
|
||||
" <channel>\n",
|
||||
" <title>{title}</title>\n",
|
||||
" <link>{self_url}</link>\n",
|
||||
" <description>{description}</description>\n",
|
||||
" <lastBuildDate>{pub_date}</lastBuildDate>\n",
|
||||
" <ttl>{ttl}</ttl>\n",
|
||||
" <atom:link href=\"{self_url_attr}\" rel=\"self\" type=\"application/rss+xml\" />\n",
|
||||
" <atom:link href=\"{atom_url_attr}\" rel=\"alternate\" type=\"application/atom+xml\" />\n",
|
||||
"{item}",
|
||||
" </channel>\n",
|
||||
"</rss>\n"
|
||||
),
|
||||
title = xml_escape(feed_title(spec.kind)),
|
||||
self_url = xml_escape(&self_url),
|
||||
description = xml_escape(feed_description(spec.kind)),
|
||||
pub_date = xml_escape(&pub_date),
|
||||
ttl = FEED_TTL_MINUTES,
|
||||
self_url_attr = xml_escape(&self_url),
|
||||
atom_url_attr = xml_escape(&atom_alt_url),
|
||||
item = item,
|
||||
);
|
||||
|
||||
FeedDocument {
|
||||
body,
|
||||
etag,
|
||||
last_modified: pub_date,
|
||||
content_type: "application/rss+xml; charset=utf-8",
|
||||
}
|
||||
}
|
||||
|
||||
fn build_atom_entry(base_url: &str, status: &CacheStatusResponse, now: u64) -> String {
|
||||
let ts = item_timestamp(status);
|
||||
let updated = format_rfc3339_timestamp(ts);
|
||||
let link = format!("{base_url}/status");
|
||||
let guid = item_guid(status);
|
||||
let title = custom_item_title(status.status);
|
||||
let description = richer_item_text(status, now);
|
||||
|
||||
format!(
|
||||
concat!(
|
||||
" <entry>\n",
|
||||
" <title>{title}</title>\n",
|
||||
" <id>{guid}</id>\n",
|
||||
" <updated>{updated}</updated>\n",
|
||||
" <link href=\"{link_attr}\" />\n",
|
||||
" <summary>{description}</summary>\n",
|
||||
" </entry>\n"
|
||||
),
|
||||
title = xml_escape(title),
|
||||
guid = xml_escape(&guid),
|
||||
updated = xml_escape(&updated),
|
||||
link_attr = xml_escape(&link),
|
||||
description = xml_escape(&description),
|
||||
)
|
||||
}
|
||||
|
||||
fn build_atom_feed(
|
||||
base_url: &str,
|
||||
spec: FeedSpec,
|
||||
status: &CacheStatusResponse,
|
||||
now: u64,
|
||||
) -> FeedDocument {
|
||||
let ts = item_timestamp(status);
|
||||
let updated = format_rfc3339_timestamp(ts);
|
||||
let self_url = format!("{base_url}{}", feed_self_path(spec));
|
||||
let rss_alt_url = format!("{base_url}{}", feed_alt_path(spec.kind, FeedFormat::Rss));
|
||||
let id = format!("urn:noisebell:feed:{}", feed_kind_slug(spec.kind));
|
||||
let entry = if matches_feed(spec.kind, status.status) {
|
||||
build_atom_entry(base_url, status, now)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
let etag = format!("\"atom:{}:{}:{}\"", feed_kind_slug(spec.kind), status.status.as_str(), ts);
|
||||
let body = format!(
|
||||
concat!(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n",
|
||||
"<feed xmlns=\"http://www.w3.org/2005/Atom\">\n",
|
||||
" <title>{title}</title>\n",
|
||||
" <id>{id}</id>\n",
|
||||
" <updated>{updated}</updated>\n",
|
||||
" <link href=\"{self_url_attr}\" rel=\"self\" />\n",
|
||||
" <link href=\"{rss_url_attr}\" rel=\"alternate\" type=\"application/rss+xml\" />\n",
|
||||
" <subtitle>{description}</subtitle>\n",
|
||||
"{entry}",
|
||||
"</feed>\n"
|
||||
),
|
||||
title = xml_escape(feed_title(spec.kind)),
|
||||
id = xml_escape(&id),
|
||||
updated = xml_escape(&updated),
|
||||
self_url_attr = xml_escape(&self_url),
|
||||
rss_url_attr = xml_escape(&rss_alt_url),
|
||||
description = xml_escape(feed_description(spec.kind)),
|
||||
entry = entry,
|
||||
);
|
||||
|
||||
FeedDocument {
|
||||
body,
|
||||
etag,
|
||||
last_modified: format_rfc2822_timestamp(ts),
|
||||
content_type: "application/atom+xml; charset=utf-8",
|
||||
}
|
||||
}
|
||||
|
||||
fn build_feed_document(
|
||||
base_url: &str,
|
||||
spec: FeedSpec,
|
||||
status: &CacheStatusResponse,
|
||||
now: u64,
|
||||
) -> FeedDocument {
|
||||
match spec.format {
|
||||
FeedFormat::Rss => build_rss_feed(base_url, spec, status, now),
|
||||
FeedFormat::Atom => build_atom_feed(base_url, spec, status, now),
|
||||
}
|
||||
}
|
||||
|
||||
fn header_matches(headers: &HeaderMap, name: &'static str, expected: &str) -> bool {
|
||||
headers
|
||||
.get(name)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(str::trim)
|
||||
.map(|value| value == expected)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn response_with_cache(headers: &HeaderMap, document: FeedDocument) -> Response {
|
||||
if header_matches(headers, "if-none-match", &document.etag) {
|
||||
return (
|
||||
StatusCode::NOT_MODIFIED,
|
||||
[
|
||||
(header::ETAG, document.etag),
|
||||
(header::LAST_MODIFIED, document.last_modified),
|
||||
(header::CACHE_CONTROL, "public, max-age=60".to_string()),
|
||||
],
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
let mut response = document.body.into_response();
|
||||
response
|
||||
.headers_mut()
|
||||
.insert(header::CONTENT_TYPE, HeaderValue::from_static(document.content_type));
|
||||
response
|
||||
.headers_mut()
|
||||
.insert(header::CACHE_CONTROL, HeaderValue::from_static("public, max-age=60"));
|
||||
response
|
||||
.headers_mut()
|
||||
.insert(header::ETAG, HeaderValue::from_str(&document.etag).expect("etag is always valid"));
|
||||
response.headers_mut().insert(
|
||||
header::LAST_MODIFIED,
|
||||
HeaderValue::from_str(&document.last_modified).expect("last-modified is always valid"),
|
||||
);
|
||||
response
|
||||
}
|
||||
|
||||
async fn fetch_status(state: &AppState) -> Result<CacheStatusResponse, StatusCode> {
|
||||
let url = format!("{}/status", state.cache_url);
|
||||
let response = state.client.get(&url).send().await.map_err(|e| {
|
||||
error!(error = %e, %url, "failed to reach cache service");
|
||||
StatusCode::BAD_GATEWAY
|
||||
})?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
error!(status = %response.status(), %url, "cache service returned non-success status");
|
||||
return Err(StatusCode::BAD_GATEWAY);
|
||||
}
|
||||
|
||||
response.json::<CacheStatusResponse>().await.map_err(|e| {
|
||||
error!(error = %e, %url, "failed to parse cache status response");
|
||||
StatusCode::BAD_GATEWAY
|
||||
})
|
||||
}
|
||||
|
||||
async fn health() -> StatusCode {
|
||||
StatusCode::OK
|
||||
}
|
||||
|
||||
async fn root_redirect() -> Redirect {
|
||||
Redirect::temporary(README_URL)
|
||||
}
|
||||
|
||||
async fn get_status(
|
||||
State(state): State<Arc<AppState>>,
|
||||
) -> Result<Json<CacheStatusResponse>, StatusCode> {
|
||||
let mut status = fetch_status(&state).await?;
|
||||
status.human_readable = effective_human_readable(&status, unix_now());
|
||||
Ok(Json(status))
|
||||
}
|
||||
|
||||
async fn serve_feed(
|
||||
State(state): State<Arc<AppState>>,
|
||||
headers: HeaderMap,
|
||||
spec: FeedSpec,
|
||||
) -> Result<Response, StatusCode> {
|
||||
let mut status = fetch_status(&state).await?;
|
||||
let now = unix_now();
|
||||
status.human_readable = effective_human_readable(&status, now);
|
||||
let base_url = public_base_url(&headers);
|
||||
let document = build_feed_document(&base_url, spec, &status, now);
|
||||
Ok(response_with_cache(&headers, document))
|
||||
}
|
||||
|
||||
async fn get_all_rss(
|
||||
state: State<Arc<AppState>>,
|
||||
headers: HeaderMap,
|
||||
) -> Result<Response, StatusCode> {
|
||||
serve_feed(state, headers, FeedSpec { kind: FeedKind::All, format: FeedFormat::Rss, path: "/" })
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_all_atom(
|
||||
state: State<Arc<AppState>>,
|
||||
headers: HeaderMap,
|
||||
) -> Result<Response, StatusCode> {
|
||||
serve_feed(
|
||||
state,
|
||||
headers,
|
||||
FeedSpec { kind: FeedKind::All, format: FeedFormat::Atom, path: "/atom.xml" },
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_door_rss(
|
||||
state: State<Arc<AppState>>,
|
||||
headers: HeaderMap,
|
||||
) -> Result<Response, StatusCode> {
|
||||
serve_feed(
|
||||
state,
|
||||
headers,
|
||||
FeedSpec { kind: FeedKind::Door, format: FeedFormat::Rss, path: "/door/" },
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_door_atom(
|
||||
state: State<Arc<AppState>>,
|
||||
headers: HeaderMap,
|
||||
) -> Result<Response, StatusCode> {
|
||||
serve_feed(
|
||||
state,
|
||||
headers,
|
||||
FeedSpec { kind: FeedKind::Door, format: FeedFormat::Atom, path: "/door/atom.xml" },
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_open_rss(
|
||||
state: State<Arc<AppState>>,
|
||||
headers: HeaderMap,
|
||||
) -> Result<Response, StatusCode> {
|
||||
serve_feed(
|
||||
state,
|
||||
headers,
|
||||
FeedSpec { kind: FeedKind::Open, format: FeedFormat::Rss, path: "/open/" },
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_open_atom(
|
||||
state: State<Arc<AppState>>,
|
||||
headers: HeaderMap,
|
||||
) -> Result<Response, StatusCode> {
|
||||
serve_feed(
|
||||
state,
|
||||
headers,
|
||||
FeedSpec { kind: FeedKind::Open, format: FeedFormat::Atom, path: "/open/atom.xml" },
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let port: u16 = std::env::var("NOISEBELL_RSS_PORT")
|
||||
.unwrap_or_else(|_| "3002".into())
|
||||
.parse()
|
||||
.context("NOISEBELL_RSS_PORT must be a valid u16")?;
|
||||
|
||||
let cache_url = std::env::var("NOISEBELL_RSS_CACHE_URL")
|
||||
.context("NOISEBELL_RSS_CACHE_URL is required")?
|
||||
.trim_end_matches('/')
|
||||
.to_string();
|
||||
|
||||
let http_timeout_secs: u64 = std::env::var("NOISEBELL_RSS_HTTP_TIMEOUT_SECS")
|
||||
.unwrap_or_else(|_| "10".into())
|
||||
.parse()
|
||||
.context("NOISEBELL_RSS_HTTP_TIMEOUT_SECS must be a valid u64")?;
|
||||
|
||||
info!(port, %cache_url, ttl_minutes = FEED_TTL_MINUTES, "starting noisebell-rss");
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(http_timeout_secs))
|
||||
.build()
|
||||
.context("failed to build HTTP client")?;
|
||||
|
||||
let app_state = Arc::new(AppState { cache_url, client });
|
||||
|
||||
let app = Router::new()
|
||||
.route("/health", get(health))
|
||||
.route("/status", get(get_status))
|
||||
.route("/", get(root_redirect))
|
||||
.route("/rss.xml", get(get_all_rss))
|
||||
.route("/atom.xml", get(get_all_atom))
|
||||
.route("/all/rss.xml", get(get_all_rss))
|
||||
.route("/all/atom.xml", get(get_all_atom))
|
||||
.route("/door/", get(get_door_rss))
|
||||
.route("/door/rss.xml", get(get_door_rss))
|
||||
.route("/door/atom.xml", get(get_door_atom))
|
||||
.route("/open/", get(get_open_rss))
|
||||
.route("/open/rss.xml", get(get_open_rss))
|
||||
.route("/open/atom.xml", get(get_open_atom))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(tower_http::trace::DefaultMakeSpan::new().level(Level::INFO))
|
||||
.on_response(tower_http::trace::DefaultOnResponse::new().level(Level::INFO)),
|
||||
)
|
||||
.with_state(app_state);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port))
|
||||
.await
|
||||
.context(format!("failed to bind to 0.0.0.0:{port}"))?;
|
||||
|
||||
info!(port, "listening");
|
||||
|
||||
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
|
||||
.context("failed to register SIGTERM handler")?;
|
||||
axum::serve(listener, app)
|
||||
.with_graceful_shutdown(async move {
|
||||
sigterm.recv().await;
|
||||
})
|
||||
.await
|
||||
.context("server error")?;
|
||||
|
||||
info!("shutdown complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn sample_status(status: DoorStatus) -> CacheStatusResponse {
|
||||
CacheStatusResponse {
|
||||
status,
|
||||
since: Some(1_700_000_000),
|
||||
last_checked: Some(1_700_000_120),
|
||||
human_readable: String::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_feed_omits_closed_item() {
|
||||
let feed = build_rss_feed(
|
||||
"https://rss.example.com",
|
||||
FeedSpec { kind: FeedKind::Open, format: FeedFormat::Rss, path: "/open/" },
|
||||
&sample_status(DoorStatus::Closed),
|
||||
1_700_000_200,
|
||||
);
|
||||
|
||||
assert_eq!(feed.body.matches("<item>").count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn all_rss_feed_contains_custom_title() {
|
||||
let feed = build_rss_feed(
|
||||
"https://rss.example.com",
|
||||
FeedSpec { kind: FeedKind::All, format: FeedFormat::Rss, path: "/" },
|
||||
&sample_status(DoorStatus::Open),
|
||||
1_700_000_200,
|
||||
);
|
||||
|
||||
assert!(feed.body.contains("<title>Noisebridge opened</title>"));
|
||||
assert!(feed.body.contains("<ttl>1</ttl>"));
|
||||
assert!(feed.body.contains("/atom.xml"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn atom_feed_contains_entry() {
|
||||
let feed = build_atom_feed(
|
||||
"https://rss.example.com",
|
||||
FeedSpec { kind: FeedKind::Door, format: FeedFormat::Atom, path: "/door/atom.xml" },
|
||||
&sample_status(DoorStatus::Closed),
|
||||
1_700_000_200,
|
||||
);
|
||||
|
||||
assert!(feed.body.contains("<feed xmlns=\"http://www.w3.org/2005/Atom\">"));
|
||||
assert!(feed.body.contains("<entry>"));
|
||||
assert!(feed.body.contains("Noisebridge closed"));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,2 +0,0 @@
|
|||
max_width = 100
|
||||
use_small_heuristics = "Max"
|
||||
Loading…
Add table
Add a link
Reference in a new issue