aboutsummaryrefslogtreecommitdiff
path: root/src/scheduler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/scheduler.rs')
-rw-r--r--src/scheduler.rs185
1 files changed, 185 insertions, 0 deletions
diff --git a/src/scheduler.rs b/src/scheduler.rs
new file mode 100644
index 0000000..ec2710b
--- /dev/null
+++ b/src/scheduler.rs
@@ -0,0 +1,185 @@
+// Scheduled query execution module
+use crate::config::{Config, ScheduledQueryTask};
+use crate::db::Database;
+use crate::logging::AuditLogger;
+use chrono::Utc;
+use std::sync::Arc;
+use tokio::time::{interval, Duration};
+use tracing::{error, info, warn};
+
+pub struct QueryScheduler {
+ config: Arc<Config>,
+ database: Database,
+ logging: AuditLogger,
+}
+
+impl QueryScheduler {
+ pub fn new(config: Arc<Config>, database: Database, logging: AuditLogger) -> Self {
+ Self {
+ config,
+ database,
+ logging,
+ }
+ }
+
+ /// Spawn background tasks for all enabled scheduled queries
+ pub fn spawn_tasks(&self) {
+ let Some(scheduled_config) = &self.config.scheduled_queries else {
+ info!("No scheduled queries configured");
+ return;
+ };
+
+ if scheduled_config.tasks.is_empty() {
+ info!("No scheduled query tasks defined");
+ return;
+ }
+
+ let enabled_tasks: Vec<&ScheduledQueryTask> = scheduled_config
+ .tasks
+ .iter()
+ .filter(|task| task.enabled)
+ .collect();
+
+ if enabled_tasks.is_empty() {
+ info!("No enabled scheduled query tasks");
+ return;
+ }
+
+ info!(
+ "Spawning {} enabled scheduled query task(s)",
+ enabled_tasks.len()
+ );
+
+ for task in enabled_tasks {
+ let task_clone = task.clone();
+ let database = self.database.clone();
+ let logging = self.logging.clone();
+
+ tokio::spawn(async move {
+ Self::run_scheduled_task(task_clone, database, logging).await;
+ });
+ }
+ }
+
+ async fn run_scheduled_task(
+ task: ScheduledQueryTask,
+ database: Database,
+ logging: AuditLogger,
+ ) {
+ let mut interval_timer = interval(Duration::from_secs(task.interval_minutes * 60));
+ let mut first_run = true;
+
+ info!(
+ "Scheduled task '{}' started (interval: {}min, run_on_startup: {}): {}",
+ task.name, task.interval_minutes, task.run_on_startup, task.description
+ );
+
+ loop {
+ interval_timer.tick().await;
+
+ if !database.is_available() {
+ warn!(
+ "Skipping scheduled task '{}' because database is unavailable",
+ task.name
+ );
+ log_task_event(
+ &logging,
+ "scheduler_skip",
+ &task.name,
+ "Database unavailable, task skipped",
+ false,
+ )
+ .await;
+ continue;
+ }
+
+ // Skip first execution if run_on_startup is false
+ if first_run && !task.run_on_startup {
+ first_run = false;
+ info!(
+ "Scheduled task '{}' skipping initial run (run_on_startup=false)",
+ task.name
+ );
+ continue;
+ }
+ first_run = false;
+
+ match sqlx::query(&task.query).execute(database.pool()).await {
+ Ok(result) => {
+ database.mark_available();
+ info!(
+ "Scheduled task '{}' executed successfully (rows affected: {})",
+ task.name,
+ result.rows_affected()
+ );
+ log_task_event(
+ &logging,
+ "scheduler_success",
+ &task.name,
+ &format!(
+ "Task executed successfully (rows affected: {})",
+ result.rows_affected()
+ ),
+ false,
+ )
+ .await;
+ }
+ Err(e) => {
+ database.mark_unavailable();
+ error!(
+ "Scheduled task '{}' failed: {} (query: {})",
+ task.name, e, task.query
+ );
+ log_task_event(
+ &logging,
+ "scheduler_failure",
+ &task.name,
+ &format!("Task failed: {}", e),
+ true,
+ )
+ .await;
+ }
+ }
+ }
+ }
+}
+
+async fn log_task_event(
+ logging: &AuditLogger,
+ context: &str,
+ task_name: &str,
+ message: &str,
+ as_error: bool,
+) {
+ let request_id = AuditLogger::generate_request_id();
+ let timestamp = Utc::now();
+ let full_message = format!("{}: {}", task_name, message);
+
+ let result = if as_error {
+ logging
+ .log_error(
+ &request_id,
+ timestamp,
+ &full_message,
+ Some(context),
+ Some("system"),
+ None,
+ )
+ .await
+ } else {
+ logging
+ .log_info(
+ &request_id,
+ timestamp,
+ &full_message,
+ Some(context),
+ Some("system"),
+ None,
+ )
+ .await
+ };
+
+ if let Err(err) = result {
+ error!("Failed to record scheduler event ({}): {}", context, err);
+ }
+}