use axum::{ extract::State, http::{HeaderMap, StatusCode}, response::{Html, IntoResponse}, routing::{get, post}, Json, Router, }; use chrono::{DateTime, Utc}; use sha2::{Digest, Sha256}; use sqlx::postgres::PgPoolOptions; use sqlx::{FromRow, PgPool}; use std::env; #[derive(Clone)] struct AppState { db: PgPool, webhook_secret: String, } #[derive(serde::Serialize)] struct WebhookResponse { received: bool, } #[derive(serde::Serialize)] struct ErrorResponse { error: String, } #[derive(serde::Serialize)] struct HealthResponse { status: String, #[serde(skip_serializing_if = "Option::is_none")] error: Option, } #[derive(FromRow)] struct EmailRow { body: String, subject: Option, from_address: Option, received_at: DateTime, } fn map_webhook_error(e: mymx_sdk::Error) -> (StatusCode, Json) { match &e { mymx_sdk::Error::InvalidSignatureHeader => { tracing::warn!("Webhook rejected: invalid signature header format"); ( StatusCode::UNAUTHORIZED, Json(ErrorResponse { error: "invalid signature header".into(), }), ) } mymx_sdk::Error::SignatureVerificationFailed => { tracing::warn!("Webhook rejected: signature verification failed"); ( StatusCode::UNAUTHORIZED, Json(ErrorResponse { error: "signature verification failed".into(), }), ) } mymx_sdk::Error::TimestampTooOld => { tracing::warn!("Webhook rejected: timestamp too old (replay protection)"); ( StatusCode::UNAUTHORIZED, Json(ErrorResponse { error: "timestamp too old".into(), }), ) } mymx_sdk::Error::HmacError(msg) => { tracing::warn!("Webhook rejected: HMAC error: {}", msg); ( StatusCode::UNAUTHORIZED, Json(ErrorResponse { error: "hmac error".into(), }), ) } mymx_sdk::Error::ParseError(parse_err) => { tracing::warn!("Webhook body parse failed: {}", parse_err); ( StatusCode::UNPROCESSABLE_ENTITY, Json(ErrorResponse { error: format!("failed to parse webhook body: {}", parse_err), }), ) } } } #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); let webhook_secret = env::var("MYMX_WEBHOOK_SECRET").expect("MYMX_WEBHOOK_SECRET must be set"); let listen_addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "127.0.0.1:4002".to_string()); let pool = PgPoolOptions::new() .max_connections(5) .connect(&database_url) .await .expect("Failed to connect to database"); sqlx::migrate!() .run(&pool) .await .expect("Failed to run migrations"); let state = AppState { db: pool, webhook_secret, }; let app = Router::new() .route("/webhook", post(webhook_handler)) .route("/health", get(health_handler)) .route("/", get(index_handler)) .with_state(state); let listener = tokio::net::TcpListener::bind(&listen_addr) .await .expect("Failed to bind listener"); tracing::info!("Listening on {}", listen_addr); axum::serve(listener, app).await.unwrap(); } async fn webhook_handler( State(state): State, headers: HeaderMap, body: String, ) -> Result, (StatusCode, Json)> { let signature = headers .get("MyMX-Signature") .and_then(|v| v.to_str().ok()) .ok_or(( StatusCode::BAD_REQUEST, Json(ErrorResponse { error: "missing MyMX-Signature header".into(), }), ))?; mymx_sdk::verify_signature(&body, signature, &state.webhook_secret) .map_err(map_webhook_error)?; let event = mymx_sdk::parse_webhook(&body).map_err(map_webhook_error)?; let email_body = event .email .parsed .body_text .or(event.email.parsed.body_html) .unwrap_or_default(); let subject = event.email.headers.subject; let from_address = event.email.headers.from; let mymx_email_id = event.email.id; let received_at = chrono::DateTime::parse_from_rfc3339(&event.email.received_at) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|_| Utc::now()); sqlx::query( "INSERT INTO emails (body, subject, from_address, mymx_email_id, received_at) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (mymx_email_id) DO NOTHING", ) .bind(&email_body) .bind(&subject) .bind(&from_address) .bind(&mymx_email_id) .bind(received_at) .execute(&state.db) .await .map_err(|e| { tracing::error!("Database insert failed: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse { error: "internal server error".into(), }), ) })?; tracing::info!( subject = subject.as_deref().unwrap_or("(none)"), from = %from_address, email_id = %mymx_email_id, "Received email" ); Ok(Json(WebhookResponse { received: true })) } async fn health_handler(State(state): State) -> impl IntoResponse { match sqlx::query_scalar::<_, i32>("SELECT 1").fetch_one(&state.db).await { Ok(_) => ( StatusCode::OK, Json(HealthResponse { status: "ok".into(), error: None, }), ), Err(e) => ( StatusCode::SERVICE_UNAVAILABLE, Json(HealthResponse { status: "unhealthy".into(), error: Some(e.to_string()), }), ), } } async fn index_handler(State(state): State) -> Result, StatusCode> { let emails: Vec = sqlx::query_as::<_, EmailRow>( "SELECT body, subject, from_address, received_at FROM emails", ) .fetch_all(&state.db) .await .map_err(|e| { tracing::error!("Database query failed: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; let mut rows: Vec<(String, String, String, DateTime)> = emails .into_iter() .map(|email| { let mut hasher = Sha256::new(); hasher.update(email.body.as_bytes()); let hash = hex::encode(hasher.finalize()); let subject = email.subject.unwrap_or_default(); let from = email.from_address.unwrap_or_default(); (hash, subject, from, email.received_at) }) .collect(); // Sort alphabetically by hash rows.sort_by(|a, b| a.0.cmp(&b.0)); let table_rows: String = rows .iter() .map(|(hash, subject, from, received_at)| { format!( "{}{}{}{}", hash, html_escape(from), html_escape(subject), received_at.format("%Y-%m-%d %H:%M:%S UTC") ) }) .collect::>() .join("\n"); let html = format!( r#" MyMX Emails

MyMX Emails

{}
SHA-256 HashFromSubjectReceived
"#, table_rows ); Ok(Html(html)) } fn html_escape(s: &str) -> String { s.replace('&', "&") .replace('<', "<") .replace('>', ">") .replace('"', """) }