// Scheduled query execution module use crate::config::{Config, ScheduledQueryTask}; use crate::db::Database; use crate::logging::AuditLogger; use chrono::Utc; use std::sync::Arc; use tokio::time::{interval, Duration}; use tracing::{error, info, warn}; pub struct QueryScheduler { config: Arc, database: Database, logging: AuditLogger, } impl QueryScheduler { pub fn new(config: Arc, database: Database, logging: AuditLogger) -> Self { Self { config, database, logging, } } /// Spawn background tasks for all enabled scheduled queries pub fn spawn_tasks(&self) { let Some(scheduled_config) = &self.config.scheduled_queries else { info!("No scheduled queries configured"); return; }; if scheduled_config.tasks.is_empty() { info!("No scheduled query tasks defined"); return; } let enabled_tasks: Vec<&ScheduledQueryTask> = scheduled_config .tasks .iter() .filter(|task| task.enabled) .collect(); if enabled_tasks.is_empty() { info!("No enabled scheduled query tasks"); return; } info!( "Spawning {} enabled scheduled query task(s)", enabled_tasks.len() ); for task in enabled_tasks { let task_clone = task.clone(); let database = self.database.clone(); let logging = self.logging.clone(); tokio::spawn(async move { Self::run_scheduled_task(task_clone, database, logging).await; }); } } async fn run_scheduled_task( task: ScheduledQueryTask, database: Database, logging: AuditLogger, ) { let mut interval_timer = interval(Duration::from_secs(task.interval_minutes * 60)); let mut first_run = true; info!( "Scheduled task '{}' started (interval: {}min, run_on_startup: {}): {}", task.name, task.interval_minutes, task.run_on_startup, task.description ); loop { interval_timer.tick().await; if !database.is_available() { warn!( "Skipping scheduled task '{}' because database is unavailable", task.name ); log_task_event( &logging, "scheduler_skip", &task.name, "Database unavailable, task skipped", false, ) .await; continue; } // Skip first execution if run_on_startup is false if first_run && !task.run_on_startup { first_run = false; info!( "Scheduled task '{}' skipping initial run (run_on_startup=false)", task.name ); continue; } first_run = false; match sqlx::query(&task.query).execute(database.pool()).await { Ok(result) => { database.mark_available(); info!( "Scheduled task '{}' executed successfully (rows affected: {})", task.name, result.rows_affected() ); log_task_event( &logging, "scheduler_success", &task.name, &format!( "Task executed successfully (rows affected: {})", result.rows_affected() ), false, ) .await; } Err(e) => { database.mark_unavailable(); error!( "Scheduled task '{}' failed: {} (query: {})", task.name, e, task.query ); log_task_event( &logging, "scheduler_failure", &task.name, &format!("Task failed: {}", e), true, ) .await; } } } } } async fn log_task_event( logging: &AuditLogger, context: &str, task_name: &str, message: &str, as_error: bool, ) { let request_id = AuditLogger::generate_request_id(); let timestamp = Utc::now(); let full_message = format!("{}: {}", task_name, message); let result = if as_error { logging .log_error( &request_id, timestamp, &full_message, Some(context), Some("system"), None, ) .await } else { logging .log_info( &request_id, timestamp, &full_message, Some(context), Some("system"), None, ) .await }; if let Err(err) = result { error!("Failed to record scheduler event ({}): {}", context, err); } }