diff options
Diffstat (limited to 'src/scheduler.rs')
| -rw-r--r-- | src/scheduler.rs | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/src/scheduler.rs b/src/scheduler.rs new file mode 100644 index 0000000..ec2710b --- /dev/null +++ b/src/scheduler.rs @@ -0,0 +1,185 @@ +// Scheduled query execution module +use crate::config::{Config, ScheduledQueryTask}; +use crate::db::Database; +use crate::logging::AuditLogger; +use chrono::Utc; +use std::sync::Arc; +use tokio::time::{interval, Duration}; +use tracing::{error, info, warn}; + +pub struct QueryScheduler { + config: Arc<Config>, + database: Database, + logging: AuditLogger, +} + +impl QueryScheduler { + pub fn new(config: Arc<Config>, database: Database, logging: AuditLogger) -> Self { + Self { + config, + database, + logging, + } + } + + /// Spawn background tasks for all enabled scheduled queries + pub fn spawn_tasks(&self) { + let Some(scheduled_config) = &self.config.scheduled_queries else { + info!("No scheduled queries configured"); + return; + }; + + if scheduled_config.tasks.is_empty() { + info!("No scheduled query tasks defined"); + return; + } + + let enabled_tasks: Vec<&ScheduledQueryTask> = scheduled_config + .tasks + .iter() + .filter(|task| task.enabled) + .collect(); + + if enabled_tasks.is_empty() { + info!("No enabled scheduled query tasks"); + return; + } + + info!( + "Spawning {} enabled scheduled query task(s)", + enabled_tasks.len() + ); + + for task in enabled_tasks { + let task_clone = task.clone(); + let database = self.database.clone(); + let logging = self.logging.clone(); + + tokio::spawn(async move { + Self::run_scheduled_task(task_clone, database, logging).await; + }); + } + } + + async fn run_scheduled_task( + task: ScheduledQueryTask, + database: Database, + logging: AuditLogger, + ) { + let mut interval_timer = interval(Duration::from_secs(task.interval_minutes * 60)); + let mut first_run = true; + + info!( + "Scheduled task '{}' started (interval: {}min, run_on_startup: {}): {}", + task.name, task.interval_minutes, task.run_on_startup, task.description + ); + + loop { + interval_timer.tick().await; + + if !database.is_available() { + warn!( + "Skipping scheduled task '{}' because database is unavailable", + task.name + ); + log_task_event( + &logging, + "scheduler_skip", + &task.name, + "Database unavailable, task skipped", + false, + ) + .await; + continue; + } + + // Skip first execution if run_on_startup is false + if first_run && !task.run_on_startup { + first_run = false; + info!( + "Scheduled task '{}' skipping initial run (run_on_startup=false)", + task.name + ); + continue; + } + first_run = false; + + match sqlx::query(&task.query).execute(database.pool()).await { + Ok(result) => { + database.mark_available(); + info!( + "Scheduled task '{}' executed successfully (rows affected: {})", + task.name, + result.rows_affected() + ); + log_task_event( + &logging, + "scheduler_success", + &task.name, + &format!( + "Task executed successfully (rows affected: {})", + result.rows_affected() + ), + false, + ) + .await; + } + Err(e) => { + database.mark_unavailable(); + error!( + "Scheduled task '{}' failed: {} (query: {})", + task.name, e, task.query + ); + log_task_event( + &logging, + "scheduler_failure", + &task.name, + &format!("Task failed: {}", e), + true, + ) + .await; + } + } + } + } +} + +async fn log_task_event( + logging: &AuditLogger, + context: &str, + task_name: &str, + message: &str, + as_error: bool, +) { + let request_id = AuditLogger::generate_request_id(); + let timestamp = Utc::now(); + let full_message = format!("{}: {}", task_name, message); + + let result = if as_error { + logging + .log_error( + &request_id, + timestamp, + &full_message, + Some(context), + Some("system"), + None, + ) + .await + } else { + logging + .log_info( + &request_id, + timestamp, + &full_message, + Some(context), + Some("system"), + None, + ) + .await + }; + + if let Err(err) = result { + error!("Failed to record scheduler event ({}): {}", context, err); + } +} |
