feat: add project and email service

This commit is contained in:
Jet Pham 2026-03-11 13:00:51 -07:00 committed by Jet
parent 99715f6105
commit f48390b15e
29 changed files with 2631 additions and 63 deletions

61
api/src/email.rs Normal file
View file

@ -0,0 +1,61 @@
use lettre::message::Mailbox;
use lettre::transport::smtp::client::Tls;
use lettre::{Message, SmtpTransport, Transport};
pub fn send_notification(
id: i64,
question: &str,
notify_email: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let truncated = if question.len() > 50 {
format!("{}...", &question[..50])
} else {
question.to_string()
};
let from: Mailbox = "Q&A <qa@extremist.software>".parse()?;
let reply_to: Mailbox = format!("qa+{id}@extremist.software").parse()?;
let to: Mailbox = notify_email.parse()?;
let email = Message::builder()
.from(from)
.reply_to(reply_to)
.to(to)
.subject(format!("Q&A #{id}: {truncated}"))
.body(question.to_string())?;
let mailer = SmtpTransport::builder_dangerous("localhost")
.tls(Tls::None)
.build();
mailer.send(&email)?;
Ok(())
}
pub fn strip_quoted_text(body: &str) -> String {
let mut result = Vec::new();
for line in body.lines() {
if line.starts_with('>') {
continue;
}
if line.starts_with("On ") && line.ends_with("wrote:") {
break;
}
result.push(line);
}
result.join("\n").trim().to_string()
}
pub fn extract_id_from_address(to: &str) -> Result<i64, Box<dyn std::error::Error>> {
let addr = to.trim();
let addr = if let Some(start) = addr.find('<') {
&addr[start + 1..addr.find('>').unwrap_or(addr.len())]
} else {
addr
};
let local = addr.split('@').next().unwrap_or("");
let id_str = local
.strip_prefix("qa+")
.ok_or("No qa+ prefix in address")?;
Ok(id_str.parse()?)
}

184
api/src/handlers.rs Normal file
View file

@ -0,0 +1,184 @@
use std::sync::Arc;
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode};
use axum::Json;
use serde::{Deserialize, Serialize};
use crate::email;
use crate::serve::AppState;
#[derive(Serialize)]
pub struct Question {
id: i64,
question: String,
answer: String,
created_at: String,
answered_at: String,
}
pub async fn get_questions(
State(state): State<Arc<AppState>>,
) -> Result<Json<Vec<Question>>, StatusCode> {
let db = state.db.lock().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut stmt = db
.prepare(
"SELECT id, question, answer, created_at, answered_at \
FROM questions WHERE answer IS NOT NULL \
ORDER BY answered_at DESC",
)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let questions = stmt
.query_map([], |row| {
Ok(Question {
id: row.get(0)?,
question: row.get(1)?,
answer: row.get(2)?,
created_at: row.get(3)?,
answered_at: row.get(4)?,
})
})
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.collect::<Result<Vec<_>, _>>()
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(questions))
}
#[derive(Deserialize)]
pub struct SubmitQuestion {
question: String,
}
pub async fn post_question(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Json(body): Json<SubmitQuestion>,
) -> Result<StatusCode, (StatusCode, String)> {
if body.question.is_empty() || body.question.len() > 200 {
return Err((
StatusCode::BAD_REQUEST,
"Question must be 1-200 characters".to_string(),
));
}
let ip = headers
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.split(',').next())
.map(|s| s.trim().to_string())
.unwrap_or_else(|| "unknown".to_string());
if !state.rate_limiter.check(&ip) {
return Err((
StatusCode::TOO_MANY_REQUESTS,
"Too many questions. Try again later.".to_string(),
));
}
let id: i64 = {
let db = state
.db
.lock()
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "db error".to_string()))?;
db.execute(
"INSERT INTO questions (question) VALUES (?1)",
rusqlite::params![body.question],
)
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "insert error".to_string()))?;
db.last_insert_rowid()
};
let notify_email = state.notify_email.clone();
let question_text = body.question.clone();
tokio::task::spawn_blocking(move || {
if let Err(e) = email::send_notification(id, &question_text, &notify_email) {
eprintln!("Failed to send notification: {e}");
}
});
Ok(StatusCode::CREATED)
}
// --- MTA Hook webhook types ---
#[derive(Deserialize)]
pub struct MtaHookPayload {
#[serde(default)]
pub messages: Vec<MtaHookMessage>,
}
#[derive(Deserialize)]
pub struct MtaHookMessage {
#[serde(default)]
pub envelope: Envelope,
#[serde(default)]
pub contents: String,
}
#[derive(Deserialize, Default)]
pub struct Envelope {
#[serde(default)]
pub to: Vec<String>,
}
#[derive(Serialize)]
pub struct MtaHookResponse {
pub action: &'static str,
}
pub async fn webhook(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Json(payload): Json<MtaHookPayload>,
) -> Result<Json<MtaHookResponse>, (StatusCode, String)> {
// Verify webhook secret
let secret = headers
.get("X-Webhook-Secret")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if secret != state.webhook_secret {
return Err((StatusCode::UNAUTHORIZED, "invalid secret".to_string()));
}
for message in &payload.messages {
// Find a qa+<id> recipient
let qa_recipient = message.envelope.to.iter().find(|addr| {
let local = addr.split('@').next().unwrap_or("");
local.starts_with("qa+")
});
let recipient = match qa_recipient {
Some(r) => r,
None => continue, // not a Q&A reply, skip
};
let id = match email::extract_id_from_address(recipient) {
Ok(id) => id,
Err(_) => continue,
};
let body = email::strip_quoted_text(&message.contents);
if body.is_empty() {
continue;
}
let db = state
.db
.lock()
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "db error".to_string()))?;
db.execute(
"UPDATE questions SET answer = ?1, answered_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
WHERE id = ?2 AND answer IS NULL",
rusqlite::params![body, id],
)
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "update error".to_string()))?;
return Ok(Json(MtaHookResponse { action: "discard" }));
}
// No Q&A recipient matched — let Stalwart deliver normally
Ok(Json(MtaHookResponse { action: "accept" }))
}

9
api/src/main.rs Normal file
View file

@ -0,0 +1,9 @@
mod email;
mod handlers;
mod rate_limit;
mod serve;
#[tokio::main]
async fn main() {
serve::run().await.expect("server error");
}

35
api/src/rate_limit.rs Normal file
View file

@ -0,0 +1,35 @@
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Instant;
pub struct RateLimiter {
max_requests: u32,
window_secs: u64,
clients: Mutex<HashMap<String, (u32, Instant)>>,
}
impl RateLimiter {
pub fn new(max_requests: u32, window_secs: u64) -> Self {
Self {
max_requests,
window_secs,
clients: Mutex::new(HashMap::new()),
}
}
pub fn check(&self, ip: &str) -> bool {
let mut clients = self.clients.lock().unwrap();
let now = Instant::now();
let entry = clients.entry(ip.to_string()).or_insert((0, now));
if now.duration_since(entry.1).as_secs() >= self.window_secs {
*entry = (1, now);
return true;
}
if entry.0 >= self.max_requests {
return false;
}
entry.0 += 1;
true
}
}

55
api/src/serve.rs Normal file
View file

@ -0,0 +1,55 @@
use std::sync::{Arc, Mutex};
use axum::routing::{get, post};
use axum::Router;
use rusqlite::Connection;
use tower_http::cors::CorsLayer;
use crate::handlers;
use crate::rate_limit::RateLimiter;
pub struct AppState {
pub db: Mutex<Connection>,
pub notify_email: String,
pub rate_limiter: RateLimiter,
pub webhook_secret: String,
}
pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
let db_path = std::env::var("QA_DB_PATH").unwrap_or_else(|_| "qa.db".to_string());
let notify_email = std::env::var("QA_NOTIFY_EMAIL").expect("QA_NOTIFY_EMAIL must be set");
let webhook_secret = std::env::var("WEBHOOK_SECRET").expect("WEBHOOK_SECRET must be set");
let conn = Connection::open(&db_path)?;
conn.execute_batch("PRAGMA journal_mode=WAL;")?;
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS questions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
question TEXT NOT NULL,
answer TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
answered_at TEXT
);",
)?;
let state = Arc::new(AppState {
db: Mutex::new(conn),
notify_email,
rate_limiter: RateLimiter::new(5, 3600),
webhook_secret,
});
let app = Router::new()
.route(
"/api/questions",
get(handlers::get_questions).post(handlers::post_question),
)
.route("/api/webhook", post(handlers::webhook))
.layer(CorsLayer::permissive())
.with_state(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:3001").await?;
println!("Listening on 127.0.0.1:3001");
axum::serve(listener, app).await?;
Ok(())
}