// Query routes and execution use anyhow::{Context, Result}; use axum::{ extract::{ConnectInfo, State}, http::{HeaderMap, StatusCode}, Json, }; use chrono::Utc; use rand::Rng; use serde_json::Value; use sqlx::{Column, Row}; use std::collections::HashMap; use std::net::SocketAddr; use tracing::{error, info, warn}; use crate::logging::AuditLogger; use crate::models::{PermissionsResponse, QueryAction, QueryRequest, QueryResponse, UserInfo}; use crate::sql::{ build_filter_clause, build_legacy_where_clause, build_order_by_clause, validate_column_name, validate_column_names, validate_table_name, }; use crate::AppState; // Helper function to extract token from Authorization header fn extract_token(headers: &HeaderMap) -> Option { headers .get("Authorization") .and_then(|header| header.to_str().ok()) .and_then(|auth_str| { if auth_str.starts_with("Bearer ") { Some(auth_str[7..].to_string()) } else { None } }) } fn database_unavailable_response(request_id: &str) -> QueryResponse { QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Database temporarily unavailable, try again in a moment [request_id: {}]", request_id )), warning: None, results: None, } } fn database_unavailable_batch_response(request_id: &str) -> QueryResponse { let mut base = database_unavailable_response(request_id); base.results = Some(vec![]); base } async fn log_database_unavailable_event( logger: &AuditLogger, request_id: &str, username: Option<&str>, power: Option, detail: &str, ) { if let Err(err) = logger .log_error( request_id, Utc::now(), detail, Some("database_unavailable"), username, power, ) .await { error!("[{}] Failed to record database outage: {}", request_id, err); } } pub async fn execute_query( State(state): State, ConnectInfo(addr): ConnectInfo, headers: HeaderMap, Json(payload): Json, ) -> Result, StatusCode> { let timestamp = Utc::now(); let client_ip = addr.ip().to_string(); let request_id = AuditLogger::generate_request_id(); // Extract and validate session token let token = match extract_token(&headers) { Some(token) => token, None => { return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some( "Please stop trying to access this resource without signing in".to_string(), ), warning: None, results: None, })); } }; let session = match state.session_manager.get_session(&token) { Some(session) => session, None => { return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some("Session not found".to_string()), warning: None, results: None, })); } }; // Detect batch mode - if queries field is present, handle as batch operation if payload.queries.is_some() { // SECURITY: Check if user has permission to use batch operations let power_perms = state .config .permissions .power_levels .get(&session.power.to_string()) .ok_or(StatusCode::FORBIDDEN)?; if !power_perms.allow_batch_operations { super::log_warning_async( &state.logging, &request_id, &format!( "User {} (power {}) attempted batch operation without permission", session.username, session.power ), Some("authorization"), Some(&session.username), Some(session.power), ); return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some("Batch operations not permitted for your role".to_string()), warning: None, results: None, })); } return execute_batch_mode(state, session, request_id, timestamp, client_ip, &payload) .await; } // Validate input for very basic security vulnerabilities (null bytes, etc.) if let Err(security_error) = validate_input_security(&payload) { super::log_warning_async( &state.logging, &request_id, &format!( "Security validation failed for user {}: {}", session.username, security_error ), Some("security_validation"), Some(&session.username), Some(session.power), ); return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Invalid input detected, how did you even manage to do that? [request_id: {}]", request_id )), warning: None, results: None, })); } // Log the request if let Err(e) = state .logging .log_request( &request_id, timestamp, &client_ip, Some(&session.username), Some(session.power), "/query", &serde_json::to_value(&payload).unwrap_or_default(), ) .await { error!("[{}] Failed to log request: {}", request_id, e); } // Clone payload before extracting fields (to avoid partial move issues) let payload_clone = payload.clone(); // Single query mode - validate required fields let action = payload.action.ok_or_else(|| { let error_msg = "Missing action field in single query mode"; super::log_error_async( &state.logging, &request_id, error_msg, Some("request_validation"), Some(&session.username), Some(session.power), ); StatusCode::BAD_REQUEST })?; let table = payload.table.ok_or_else(|| { let error_msg = "Missing table field in single query mode"; super::log_error_async( &state.logging, &request_id, error_msg, Some("request_validation"), Some(&session.username), Some(session.power), ); StatusCode::BAD_REQUEST })?; // SECURITY: Validate table name before any operations if let Err(e) = validate_table_name(&table, &state.config) { let error_msg = format!("Invalid table name '{}': {}", table, e); super::log_error_async( &state.logging, &request_id, &error_msg, Some("table_validation"), Some(&session.username), Some(session.power), ); return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!("Invalid table name [request_id: {}]", request_id)), warning: None, results: None, })); } // SECURITY: Validate column names if specified if let Some(ref columns) = payload.columns { if let Err(e) = validate_column_names(columns) { let error_msg = format!("Invalid column names on table '{}': {}", table, e); super::log_error_async( &state.logging, &request_id, &error_msg, Some("column_validation"), Some(&session.username), Some(session.power), ); return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Invalid column name: {} [request_id: {}]", e, request_id )), warning: None, results: None, })); } } // Check permissions (after validation to avoid leaking table existence) if !state .rbac .check_permission(&state.config, session.power, &table, &action) { let action_str = match action { QueryAction::Select => "SELECT", QueryAction::Insert => "INSERT", QueryAction::Update => "UPDATE", QueryAction::Delete => "DELETE", QueryAction::Count => "COUNT", }; super::log_warning_async( &state.logging, &request_id, &format!( "User {} attempted unauthorized {} on table {}", session.username, action_str, table ), Some("authorization"), Some(&session.username), Some(session.power), ); // Log security violation if let Err(log_err) = state .logging .log_error( &request_id, timestamp, &format!("Permission denied: {} on table {}", action_str, table), Some("authorization"), Some(&session.username), Some(session.power), ) .await { error!( "[{}] Failed to log permission denial: {}", request_id, log_err ); } return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!("Insufficient permissions for this operation, Might as well give up [request_id: {}]", request_id)), warning: None, results: None, })); } if !state.database.is_available() { warn!( "[{}] Database marked unavailable, returning graceful error", request_id ); log_database_unavailable_event( &state.logging, &request_id, Some(&session.username), Some(session.power), "Database flagged unavailable before transaction", ) .await; return Ok(Json(database_unavailable_response(&request_id))); } // ALL database operations now use transactions with user context set let mut tx = match state.database.pool().begin().await { Ok(tx) => { state.database.mark_available(); tx } Err(e) => { state.database.mark_unavailable(); error!("[{}] Failed to begin transaction: {}", request_id, e); log_database_unavailable_event( &state.logging, &request_id, Some(&session.username), Some(session.power), &format!("Failed to begin transaction: {}", e), ) .await; return Ok(Json(database_unavailable_response(&request_id))); } }; // Set user context and request ID in transaction - ALL queries have user context now if let Err(e) = sqlx::query("SET @current_user_id = ?, @request_id = ?") .bind(session.user_id) .bind(&request_id) .execute(&mut *tx) .await { state.database.mark_unavailable(); error!( "[{}] Failed to set current user context and request ID: {}", request_id, e ); log_database_unavailable_event( &state.logging, &request_id, Some(&session.username), Some(session.power), &format!("Failed to set user context: {}", e), ) .await; return Ok(Json(database_unavailable_response(&request_id))); } // Execute the query within the transaction let result = match action { QueryAction::Select => { execute_select_with_tx( &request_id, tx, &payload_clone, &session.username, &session, &state, ) .await } QueryAction::Insert => { execute_insert_with_tx( &request_id, tx, &payload_clone, &session.username, &state, &session, ) .await } QueryAction::Update => { execute_update_with_tx( &request_id, tx, &payload_clone, &session.username, &state, &session, ) .await } QueryAction::Delete => { execute_delete_with_tx( &request_id, tx, &payload_clone, &session.username, &state, &session, ) .await } QueryAction::Count => { execute_count_with_tx( &request_id, tx, &payload_clone, &session.username, &session, &state, ) .await } }; match result { Ok(response) => { let action_str = match action { QueryAction::Select => "SELECT", QueryAction::Insert => "INSERT", QueryAction::Update => "UPDATE", QueryAction::Delete => "DELETE", QueryAction::Count => "COUNT", }; super::log_info_async( &state.logging, &request_id, &format!("Query executed successfully: {} on {}", action_str, table), Some("query_execution"), Some(&session.username), Some(session.power), ); Ok(Json(response)) } Err(e) => { error!("[{}] Query execution failed: {}", request_id, e); if let Err(log_err) = state .logging .log_error( &request_id, timestamp, &format!("Query execution error: {}", e), Some("query_execution"), Some(&session.username), Some(session.power), ) .await { error!("[{}] Failed to log error: {}", request_id, log_err); } Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Database query failed [request_id: {}]", request_id )), warning: None, results: None, })) } } } pub async fn get_permissions( State(state): State, ConnectInfo(addr): ConnectInfo, headers: HeaderMap, ) -> Result, StatusCode> { let timestamp = Utc::now(); let client_ip = addr.ip().to_string(); let request_id = AuditLogger::generate_request_id(); // Extract and validate session token let token = match extract_token(&headers) { Some(token) => token, None => { return Ok(Json(PermissionsResponse { success: false, permissions: HashMap::new(), user: UserInfo { id: 0, username: "".to_string(), name: "".to_string(), role: "".to_string(), power: 0, }, security_clearance: None, user_settings_access: "".to_string(), })); } }; let session = match state.session_manager.get_session(&token) { Some(session) => session, None => { return Ok(Json(PermissionsResponse { success: false, permissions: HashMap::new(), user: UserInfo { id: 0, username: "".to_string(), name: "".to_string(), role: "".to_string(), power: 0, }, security_clearance: None, user_settings_access: "".to_string(), })); } }; // Log the request if let Err(e) = state .logging .log_request( &request_id, timestamp, &client_ip, Some(&session.username), Some(session.power), "/permissions", &serde_json::json!({}), ) .await { error!("[{}] Failed to log request: {}", request_id, e); } let permissions = state .rbac .get_table_permissions(&state.config, session.power); // Get user settings access permission let user_settings_permission = state .config .permissions .power_levels .get(&session.power.to_string()) .map(|p| &p.user_settings_access) .unwrap_or(&state.config.security.default_user_settings_access); let user_settings_access_str = match user_settings_permission { crate::config::UserSettingsAccess::ReadOwnOnly => "read-own-only", crate::config::UserSettingsAccess::ReadWriteOwn => "read-write-own", crate::config::UserSettingsAccess::ReadWriteAll => "read-write-all", }; Ok(Json(PermissionsResponse { success: true, permissions, user: UserInfo { id: session.user_id, username: session.username, name: "".to_string(), // We don't store name in session, would need to fetch from DB role: session.role_name, power: session.power, }, security_clearance: None, user_settings_access: user_settings_access_str.to_string(), })) } // ===== SAFE SQL QUERY BUILDERS WITH VALIDATION ===== // All functions validate table/column names to prevent SQL injection /// Build WHERE clause with column name validation (legacy simple format) fn build_where_clause(where_clause: &Value) -> anyhow::Result<(String, Vec)> { // Use the new validated builder from sql module build_legacy_where_clause(where_clause) } /// Build INSERT data with column name validation fn build_insert_data( data: &Value, ) -> anyhow::Result<(Vec, Vec, Vec>)> { let mut columns = Vec::new(); let mut placeholders = Vec::new(); let mut values = Vec::new(); if let Value::Object(map) = data { for (key, value) in map { // SECURITY: Validate column name before using it validate_column_name(key) .with_context(|| format!("Invalid column name in INSERT: {}", key))?; // Handle special JSON fields like additional_fields if key == "additional_fields" && value.is_object() { columns.push(key.clone()); placeholders.push("?".to_string()); values.push(Some( serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()), )); } else { columns.push(key.clone()); placeholders.push("?".to_string()); values.push(json_value_to_option_string(value)); } } } else { anyhow::bail!("INSERT data must be a JSON object"); } if columns.is_empty() { anyhow::bail!("INSERT data cannot be empty"); } Ok((columns, placeholders, values)) } /// Build UPDATE SET clause with column name validation fn build_update_set_clause(data: &Value) -> anyhow::Result<(String, Vec>)> { let mut set_clauses = Vec::new(); let mut values = Vec::new(); if let Value::Object(map) = data { for (key, value) in map { // SECURITY: Validate column name before using it validate_column_name(key) .with_context(|| format!("Invalid column name in UPDATE: {}", key))?; set_clauses.push(format!("{} = ?", key)); // Handle special JSON fields like additional_fields if key == "additional_fields" && value.is_object() { values.push(Some( serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()), )); } else { values.push(json_value_to_option_string(value)); } } } else { anyhow::bail!("UPDATE data must be a JSON object"); } if set_clauses.is_empty() { anyhow::bail!("UPDATE data cannot be empty"); } Ok((set_clauses.join(", "), values)) } /// Convert JSON value to Option for SQL binding /// Properly handles booleans (true/false -> "1"/"0" for MySQL TINYINT/BOOLEAN) /// NULL values return None for proper SQL NULL handling fn json_value_to_option_string(value: &Value) -> Option { match value { Value::String(s) => Some(s.clone()), Value::Number(n) => Some(n.to_string()), // MySQL uses TINYINT(1) for booleans: true -> 1, false -> 0 Value::Bool(b) => Some(if *b { "1".to_string() } else { "0".to_string() }), Value::Null => None, // Return None for proper SQL NULL // Complex types (objects, arrays) get JSON serialized _ => Some(serde_json::to_string(value).unwrap_or_else(|_| "null".to_string())), } } /// Convert SQL values from MySQL to JSON /// Handles ALL MySQL types automatically with proper NULL handling /// Returns booleans as true/false (MySQL TINYINT(1) -> JSON bool) fn convert_sql_value_to_json( row: &sqlx::mysql::MySqlRow, index: usize, request_id: Option<&str>, username: Option<&str>, power: Option, state: Option<&AppState>, ) -> anyhow::Result { let column = &row.columns()[index]; use sqlx::TypeInfo; let type_name = column.type_info().name(); // Comprehensive MySQL type handling - no need to manually add types anymore! let result = match type_name { // ===== String types ===== "VARCHAR" | "TEXT" | "CHAR" | "LONGTEXT" | "MEDIUMTEXT" | "TINYTEXT" | "SET" | "ENUM" => { row.try_get::, _>(index) .map(|opt| opt.map(Value::String).unwrap_or(Value::Null)) } // ===== Integer types ===== "INT" | "BIGINT" | "MEDIUMINT" | "SMALLINT" | "INTEGER" => row .try_get::, _>(index) .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)), // Unsigned integers "INT UNSIGNED" | "BIGINT UNSIGNED" | "MEDIUMINT UNSIGNED" | "SMALLINT UNSIGNED" => row .try_get::, _>(index) .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)), // ===== Boolean type (MySQL TINYINT(1)) ===== // Returns proper JSON true/false instead of 1/0 "TINYINT" | "BOOLEAN" | "BOOL" => { // Try as bool first (for TINYINT(1)) if let Ok(opt_bool) = row.try_get::, _>(index) { return Ok(opt_bool.map(Value::Bool).unwrap_or(Value::Null)); } // Fallback to i8 for regular TINYINT row.try_get::, _>(index) .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)) } // ===== Decimal/Numeric types ===== "DECIMAL" | "NUMERIC" => { row.try_get::, _>(index) .map(|opt| { opt.map(|decimal| { // Keep precision by converting to string then parsing let decimal_str = decimal.to_string(); if let Ok(f) = decimal_str.parse::() { serde_json::json!(f) } else { Value::String(decimal_str) } }) .unwrap_or(Value::Null) }) } // ===== Floating point types ===== "FLOAT" | "DOUBLE" | "REAL" => row .try_get::, _>(index) .map(|opt| opt.map(|v| serde_json::json!(v)).unwrap_or(Value::Null)), // ===== Date/Time types ===== "DATE" => { use chrono::NaiveDate; row.try_get::, _>(index).map(|opt| { opt.map(|date| Value::String(date.format("%Y-%m-%d").to_string())) .unwrap_or(Value::Null) }) } "DATETIME" => { use chrono::NaiveDateTime; row.try_get::, _>(index).map(|opt| { opt.map(|datetime| Value::String(datetime.format("%Y-%m-%d %H:%M:%S").to_string())) .unwrap_or(Value::Null) }) } "TIMESTAMP" => { use chrono::{DateTime, Utc}; row.try_get::>, _>(index).map(|opt| { opt.map(|timestamp| Value::String(timestamp.to_rfc3339())) .unwrap_or(Value::Null) }) } "TIME" => { // TIME values come as strings in HH:MM:SS format row.try_get::, _>(index) .map(|opt| opt.map(Value::String).unwrap_or(Value::Null)) } "YEAR" => row .try_get::, _>(index) .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)), // ===== JSON type ===== "JSON" => row.try_get::, _>(index).map(|opt| { opt.and_then(|s| serde_json::from_str(&s).ok()) .unwrap_or(Value::Null) }), // ===== Binary types ===== // Return as base64-encoded strings for safe JSON transmission "BLOB" | "MEDIUMBLOB" | "LONGBLOB" | "TINYBLOB" | "BINARY" | "VARBINARY" => { row.try_get::>, _>(index).map(|opt| { opt.map(|bytes| { use base64::{engine::general_purpose, Engine as _}; Value::String(general_purpose::STANDARD.encode(&bytes)) }) .unwrap_or(Value::Null) }) } // ===== Bit type ===== "BIT" => { row.try_get::>, _>(index).map(|opt| { opt.map(|bytes| { // Convert bit value to number let mut val: i64 = 0; for &byte in &bytes { val = (val << 8) | byte as i64; } Value::Number(val.into()) }) .unwrap_or(Value::Null) }) } // ===== Spatial/Geometry types ===== "GEOMETRY" | "POINT" | "LINESTRING" | "POLYGON" | "MULTIPOINT" | "MULTILINESTRING" | "MULTIPOLYGON" | "GEOMETRYCOLLECTION" => { // Return as WKT (Well-Known Text) string row.try_get::, _>(index) .map(|opt| opt.map(Value::String).unwrap_or(Value::Null)) } // ===== Catch-all for unknown/new types ===== // This ensures forward compatibility if MySQL adds new types _ => { warn!( "Unknown MySQL type '{}' for column '{}', attempting string fallback", type_name, column.name() ); if let (Some(rid), Some(st)) = (request_id, state) { super::log_warning_async( &st.logging, rid, &format!( "Unknown MySQL type '{}' for column '{}', attempting string fallback", type_name, column.name() ), Some("data_conversion"), username, power, ); } row.try_get::, _>(index) .map(|opt| opt.map(Value::String).unwrap_or(Value::Null)) } }; // Robust error handling with fallback match result { Ok(value) => Ok(value), Err(e) => { // Final fallback: try as string match row.try_get::, _>(index) { Ok(opt) => { warn!("Primary conversion failed for column '{}' (type: {}), used string fallback", column.name(), type_name); if let (Some(rid), Some(st)) = (request_id, state) { super::log_warning_async( &st.logging, rid, &format!("Primary conversion failed for column '{}' (type: {}), used string fallback", column.name(), type_name), Some("data_conversion"), username, power, ); } Ok(opt.map(Value::String).unwrap_or(Value::Null)) } Err(_) => { error!( "Complete failure to decode column '{}' (index: {}, type: {}): {}", column.name(), index, type_name, e ); // Return NULL instead of failing the entire query Ok(Value::Null) } } } } } // Generate auto values based on configuration async fn generate_auto_value( state: &AppState, table: &str, config: &crate::config::AutoGenerationConfig, ) -> Result { match config.gen_type.as_str() { "numeric" => generate_unique_numeric_id(state, table, config).await, _ => Err(anyhow::anyhow!( "Unsupported auto-generation type: {}", config.gen_type )), } } // Generate a unique numeric ID based on configuration async fn generate_unique_numeric_id( state: &AppState, table: &str, config: &crate::config::AutoGenerationConfig, ) -> Result { let range_min = config.range_min.unwrap_or(10000000); let range_max = config.range_max.unwrap_or(99999999); let max_attempts = config.max_attempts.unwrap_or(10) as usize; let field_name = &config.field; for _attempt in 0..max_attempts { // Generate random number in specified range let id = { let mut rng = rand::rng(); rng.random_range(range_min..=range_max) }; let id_str = id.to_string(); // Check if this ID already exists let query_str = format!( "SELECT COUNT(*) as count FROM {} WHERE {} = ?", table, field_name ); let exists = sqlx::query(&query_str) .bind(&id_str) .fetch_one(state.database.pool()) .await?; let count: i64 = exists.try_get("count")?; if count == 0 { return Ok(id_str); } } Err(anyhow::anyhow!( "Failed to generate unique {} for table {} after {} attempts", field_name, table, max_attempts )) } // Security validation functions fn validate_input_security(payload: &QueryRequest) -> Result<(), String> { // Check for null bytes in table name if let Some(ref table) = payload.table { if table.contains('\0') { return Err("Null byte detected in table name".to_string()); } } // Check for null bytes in column names if let Some(columns) = &payload.columns { for column in columns { if column.contains('\0') { return Err("Null byte detected in column name".to_string()); } } } // Check for null bytes in data values if let Some(data) = &payload.data { if contains_null_bytes_in_value(data) { return Err("Null byte detected in data values".to_string()); } } // Check for null bytes in WHERE clause if let Some(where_clause) = &payload.where_clause { if contains_null_bytes_in_value(where_clause) { return Err("Null byte detected in WHERE clause".to_string()); } } Ok(()) } fn contains_null_bytes_in_value(value: &Value) -> bool { match value { Value::String(s) => s.contains('\0'), Value::Array(arr) => arr.iter().any(contains_null_bytes_in_value), Value::Object(map) => { map.keys().any(|k| k.contains('\0')) || map.values().any(contains_null_bytes_in_value) } _ => false, } } // Core execution functions that work with mutable transaction references // These are used by batch operations to execute multiple queries in a single atomic transaction async fn execute_select_core( request_id: &str, tx: &mut sqlx::Transaction<'_, sqlx::MySql>, payload: &QueryRequest, username: &str, session: &crate::models::Session, state: &AppState, ) -> anyhow::Result { let table = payload .table .as_ref() .ok_or_else(|| anyhow::anyhow!("Table is required"))?; // Helper to count conditions in filter/where fn count_conditions( filter: &Option, where_clause: &Option, ) -> usize { let mut count = 0; if let Some(f) = filter { count += count_filter_conditions(f); } if let Some(w) = where_clause { count += count_where_conditions(w); } count } fn count_filter_conditions(filter: &crate::models::FilterCondition) -> usize { use crate::models::FilterCondition; match filter { FilterCondition::Logical { and_conditions, or_conditions, } => { let mut count = 0; if let Some(conditions) = and_conditions { count += conditions .iter() .map(|c| count_filter_conditions(c)) .sum::(); } if let Some(conditions) = or_conditions { count += conditions .iter() .map(|c| count_filter_conditions(c)) .sum::(); } count } _ => 1, } } fn count_where_conditions(where_clause: &serde_json::Value) -> usize { match where_clause { serde_json::Value::Object(map) => { if map.contains_key("AND") || map.contains_key("OR") { if let Some(arr) = map.get("AND").or_else(|| map.get("OR")) { if let serde_json::Value::Array(conditions) = arr { return conditions.iter().map(|c| count_where_conditions(c)).sum(); } } } 1 } _ => 1, } } let max_limit = state.config.get_max_limit(session.power); let max_where = state.config.get_max_where_conditions(session.power); let requested_columns = if let Some(ref cols) = payload.columns { cols.clone() } else { vec!["*".to_string()] }; let filtered_columns = if requested_columns.len() == 1 && requested_columns[0] == "*" { "*".to_string() } else { let allowed_columns = state .config .filter_readable_columns(session.power, &table, &requested_columns); if allowed_columns.is_empty() { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some("No readable columns available for this request".to_string()), warning: None, results: None, }); } allowed_columns.join(", ") }; let condition_count = count_conditions(&payload.filter, &payload.where_clause); if condition_count > max_where as usize { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Too many WHERE/filter conditions ({}) > max {}", condition_count, max_where )), warning: None, results: None, }); } let mut query = format!("SELECT {} FROM {}", filtered_columns, table); let mut values = Vec::new(); if let Some(joins) = &payload.joins { for join in joins { if !state.rbac.check_permission( &state.config, session.power, &join.table, &crate::models::QueryAction::Select, ) { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Insufficient permissions to JOIN with table '{}'", join.table )), warning: None, results: None, }); } } let join_sql = crate::sql::build_join_clause(joins, &state.config)?; query.push_str(&join_sql); } if let Some(filter) = &payload.filter { let (where_sql, where_values) = build_filter_clause(filter)?; query.push_str(&format!(" WHERE {}", where_sql)); values.extend(where_values.into_iter().map(Some)); } else if let Some(where_clause) = &payload.where_clause { let (where_sql, where_values) = build_where_clause(where_clause)?; query.push_str(&format!(" WHERE {}", where_sql)); values.extend(where_values.into_iter().map(Some)); } if let Some(order_by) = &payload.order_by { let order_clause = build_order_by_clause(order_by)?; query.push_str(&order_clause); } let requested_limit = payload.limit; let limit = requested_limit.unwrap_or(max_limit); let was_capped = limit > max_limit; let limit = if limit > max_limit { max_limit } else { limit }; query.push_str(&format!(" LIMIT {}", limit)); let limit_warning = if was_capped { Some(format!( "Requested LIMIT {} exceeded maximum {} for your power level, capped to {}", requested_limit.unwrap(), max_limit, max_limit )) } else if requested_limit.is_none() { Some(format!( "No LIMIT specified, defaulted to {} (max for power level {})", max_limit, session.power )) } else { None }; if let Err(e) = state .logging .log_query( request_id, chrono::Utc::now(), username, Some(session.power), &query, None, None, ) .await { error!("[{}] Failed to log query: {}", request_id, e); } let mut sqlx_query = sqlx::query(&query); for value in values { match value { Some(v) => sqlx_query = sqlx_query.bind(v), None => sqlx_query = sqlx_query.bind(Option::::None), } } let rows = sqlx_query.fetch_all(&mut **tx).await?; let mut results = Vec::new(); for row in rows { let mut result_row = serde_json::Map::new(); for (i, column) in row.columns().iter().enumerate() { let value = convert_sql_value_to_json( &row, i, Some(request_id), Some(username), Some(session.power), Some(state), )?; result_row.insert(column.name().to_string(), value); } results.push(Value::Object(result_row)); } Ok(QueryResponse { success: true, data: Some(Value::Array(results)), rows_affected: None, error: None, warning: limit_warning, results: None, }) } async fn execute_count_core( request_id: &str, tx: &mut sqlx::Transaction<'_, sqlx::MySql>, payload: &QueryRequest, username: &str, session: &crate::models::Session, state: &AppState, ) -> anyhow::Result { let table = payload .table .as_ref() .ok_or_else(|| anyhow::anyhow!("Table is required"))?; // Helper to count conditions in filter/where (same as other functions) fn count_conditions( filter: &Option, where_clause: &Option, ) -> usize { use crate::models::FilterCondition; fn count_filter_conditions(filter: &FilterCondition) -> usize { match filter { FilterCondition::Logical { and_conditions, or_conditions, } => { let mut count = 0; if let Some(conditions) = and_conditions { count += conditions .iter() .map(|c| count_filter_conditions(c)) .sum::(); } if let Some(conditions) = or_conditions { count += conditions .iter() .map(|c| count_filter_conditions(c)) .sum::(); } count } _ => 1, } } fn count_where_conditions(where_clause: &serde_json::Value) -> usize { match where_clause { serde_json::Value::Object(map) => { if map.contains_key("AND") || map.contains_key("OR") { if let Some(arr) = map.get("AND").or_else(|| map.get("OR")) { if let serde_json::Value::Array(conditions) = arr { return conditions.iter().map(|c| count_where_conditions(c)).sum(); } } } 1 } _ => 1, } } let mut count = 0; if let Some(f) = filter { count += count_filter_conditions(f); } if let Some(w) = where_clause { count += count_where_conditions(w); } count } let max_where = state.config.get_max_where_conditions(session.power); // Enforce WHERE clause complexity let condition_count = count_conditions(&payload.filter, &payload.where_clause); if condition_count > max_where as usize { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Too many WHERE/filter conditions ({}) > max {}", condition_count, max_where )), warning: None, results: None, }); } let mut query = format!("SELECT COUNT(*) as count FROM {}", table); let mut values = Vec::new(); // Add JOIN clauses if provided - validates permissions for all joined tables if let Some(joins) = &payload.joins { for join in joins { if !state.rbac.check_permission( &state.config, session.power, &join.table, &crate::models::QueryAction::Select, ) { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Insufficient permissions to JOIN with table '{}'", join.table )), warning: None, results: None, }); } } let join_sql = crate::sql::build_join_clause(joins, &state.config)?; query.push_str(&join_sql); } // Add WHERE conditions (filter takes precedence over where_clause if both are provided) if let Some(filter) = &payload.filter { let (where_sql, where_values) = crate::sql::build_filter_clause(filter)?; query.push_str(&format!(" WHERE {}", where_sql)); values.extend(where_values.into_iter().map(Some)); } else if let Some(where_clause) = &payload.where_clause { let (where_sql, where_values) = build_where_clause(where_clause)?; query.push_str(&format!(" WHERE {}", where_sql)); values.extend(where_values.into_iter().map(Some)); } // Log the query if let Err(e) = state .logging .log_query( request_id, chrono::Utc::now(), username, Some(session.power), &query, None, None, ) .await { error!("[{}] Failed to log query: {}", request_id, e); } let mut sqlx_query = sqlx::query(&query); for value in values { match value { Some(v) => sqlx_query = sqlx_query.bind(v), None => sqlx_query = sqlx_query.bind(Option::::None), } } let result = sqlx_query.fetch_one(&mut **tx).await?; let count: i64 = result.try_get("count")?; Ok(QueryResponse { success: true, data: Some(serde_json::json!(count)), rows_affected: None, error: None, warning: None, results: None, }) } async fn execute_update_core( request_id: &str, tx: &mut sqlx::Transaction<'_, sqlx::MySql>, payload: &QueryRequest, username: &str, state: &AppState, session: &crate::models::Session, ) -> anyhow::Result { let table = payload .table .as_ref() .ok_or_else(|| anyhow::anyhow!("Table is required"))?; let mut data = payload .data .as_ref() .ok_or_else(|| anyhow::anyhow!("Data is required for UPDATE"))? .clone(); let where_clause = payload .where_clause .as_ref() .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for UPDATE"))?; let max_where = state.config.get_max_where_conditions(session.power); let condition_count = where_clause.as_object().map(|obj| obj.len()).unwrap_or(0); if condition_count > max_where as usize { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Too many WHERE conditions ({}) > max {}", condition_count, max_where )), warning: None, results: None, }); } // SECURITY: Apply column-level write filtering FIRST (before auto-generation) if let Value::Object(ref mut map) = data { let all_columns: Vec = map.keys().cloned().collect(); let writable_columns = state .config .filter_writable_columns(session.power, &table, &all_columns); // Remove columns that user cannot write to map.retain(|key, _| writable_columns.contains(key)); // Check for auto-generation (system-generated fields bypass write protection) if let Some(auto_config) = state.config.get_auto_generation_config(&table) { if auto_config.on_action == "update" || auto_config.on_action == "both" { let field_name = &auto_config.field; if !map.contains_key(field_name) || map.get(field_name).map_or(true, |v| { v.is_null() || v.as_str().map_or(true, |s| s.is_empty()) }) { let generated_value = generate_auto_value(&state, &table, auto_config).await?; map.insert(field_name.clone(), Value::String(generated_value)); } } } } let writable_columns = data .as_object() .unwrap() .keys() .cloned() .collect::>(); if writable_columns.is_empty() { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some("No writable columns in UPDATE data".to_string()), warning: None, results: None, }); } let set_clause = writable_columns .iter() .map(|col| format!("{} = ?", col)) .collect::>() .join(", "); let (where_sql, where_values) = build_where_clause(where_clause)?; let query = format!("UPDATE {} SET {} WHERE {}", table, set_clause, where_sql); let requested_limit = payload.limit; let max_limit = state.config.get_max_limit(session.power); let limit = requested_limit.unwrap_or(max_limit); let was_capped = limit > max_limit; let limit = if limit > max_limit { max_limit } else { limit }; let query_with_limit = format!("{} LIMIT {}", query, limit); let limit_warning = if was_capped { Some(format!( "Requested LIMIT {} exceeded maximum {}, capped to {}", requested_limit.unwrap(), max_limit, max_limit )) } else { None }; if let Err(e) = state .logging .log_query( request_id, chrono::Utc::now(), username, Some(session.power), &query_with_limit, None, None, ) .await { error!("[{}] Failed to log query: {}", request_id, e); } let mut sqlx_query = sqlx::query(&query_with_limit); for col in &writable_columns { if let Some(val) = data.get(col) { sqlx_query = sqlx_query.bind(val.clone()); } } for val in where_values { sqlx_query = sqlx_query.bind(val); } let result = sqlx_query.execute(&mut **tx).await?; Ok(QueryResponse { success: true, data: None, rows_affected: Some(result.rows_affected()), error: None, warning: limit_warning, results: None, }) } async fn execute_delete_core( request_id: &str, tx: &mut sqlx::Transaction<'_, sqlx::MySql>, payload: &QueryRequest, username: &str, state: &AppState, session: &crate::models::Session, ) -> anyhow::Result { let table = payload .table .as_ref() .ok_or_else(|| anyhow::anyhow!("Table is required"))?; let where_clause = payload .where_clause .as_ref() .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for DELETE"))?; let max_where = state.config.get_max_where_conditions(session.power); let condition_count = where_clause.as_object().map(|obj| obj.len()).unwrap_or(0); if condition_count > max_where as usize { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Too many WHERE conditions ({}) > max {}", condition_count, max_where )), warning: None, results: None, }); } let (where_sql, where_values) = build_where_clause(where_clause)?; let query = format!("DELETE FROM {} WHERE {}", table, where_sql); let requested_limit = payload.limit; let max_limit = state.config.get_max_limit(session.power); let limit = requested_limit.unwrap_or(max_limit); let was_capped = limit > max_limit; let limit = if limit > max_limit { max_limit } else { limit }; let query_with_limit = format!("{} LIMIT {}", query, limit); let limit_warning = if was_capped { Some(format!( "Requested LIMIT {} exceeded maximum {}, capped to {}", requested_limit.unwrap(), max_limit, max_limit )) } else { None }; if let Err(e) = state .logging .log_query( request_id, chrono::Utc::now(), username, Some(session.power), &query_with_limit, None, None, ) .await { error!("[{}] Failed to log query: {}", request_id, e); } let mut sqlx_query = sqlx::query(&query_with_limit); for val in where_values { sqlx_query = sqlx_query.bind(val); } let result = sqlx_query.execute(&mut **tx).await?; Ok(QueryResponse { success: true, data: None, rows_affected: Some(result.rows_affected()), error: None, warning: limit_warning, results: None, }) } // Transaction-based execution functions for user context operations // These create their own transactions and commit them - used for single query operations async fn execute_select_with_tx( request_id: &str, mut tx: sqlx::Transaction<'_, sqlx::MySql>, payload: &QueryRequest, username: &str, session: &crate::models::Session, state: &AppState, ) -> anyhow::Result { let table = payload .table .as_ref() .ok_or_else(|| anyhow::anyhow!("Table is required"))?; // Helper to count conditions in filter/where fn count_conditions( filter: &Option, where_clause: &Option, ) -> usize { use crate::models::FilterCondition; fn count_filter(cond: &FilterCondition) -> usize { match cond { FilterCondition::Simple { .. } => 1, FilterCondition::Logical { and_conditions, or_conditions, } => { and_conditions .as_ref() .map_or(0, |conds| conds.iter().map(count_filter).sum()) + or_conditions .as_ref() .map_or(0, |conds| conds.iter().map(count_filter).sum()) } FilterCondition::Not { not } => count_filter(not), } } let mut count = 0; if let Some(f) = filter { count += count_filter(f); } if let Some(w) = where_clause { if let serde_json::Value::Object(map) = w { count += map.len(); } } count } // Enforce query limits from config (power-level specific with fallback to defaults) let max_limit = state.config.get_max_limit(session.power); let max_where = state.config.get_max_where_conditions(session.power); // Apply granular column filtering based on user's power level let requested_columns = if let Some(cols) = &payload.columns { cols.clone() } else { vec!["*".to_string()] }; let filtered_columns = if requested_columns.len() == 1 && requested_columns[0] == "*" { "*".to_string() } else { let allowed_columns = state .config .filter_readable_columns(session.power, &table, &requested_columns); if allowed_columns.is_empty() { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some("No readable columns available for this request".to_string()), warning: None, results: None, }); } allowed_columns.join(", ") }; // Enforce WHERE clause complexity let condition_count = count_conditions(&payload.filter, &payload.where_clause); if condition_count > max_where as usize { // Log security violation let timestamp = chrono::Utc::now(); if let Err(log_err) = state .logging .log_error( &request_id, timestamp, &format!( "Too many WHERE conditions: {} exceeds maximum {} for power level {}", condition_count, max_where, session.power ), Some("query_limits"), Some(&session.username), Some(session.power), ) .await { error!( "[{}] Failed to log WHERE limit violation: {}", request_id, log_err ); } return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Too many WHERE/filter conditions ({}) > max {} [request_id: {}]", condition_count, max_where, request_id )), warning: None, results: None, }); } let mut query = format!("SELECT {} FROM {}", filtered_columns, table); let mut values = Vec::new(); // Add JOIN clauses if provided - validates permissions for all joined tables if let Some(joins) = &payload.joins { // Validate user has read permission for all joined tables for join in joins { if !state.rbac.check_permission( &state.config, session.power, &join.table, &crate::models::QueryAction::Select, ) { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Insufficient permissions to JOIN with table '{}'", join.table )), warning: None, results: None, }); } } // Build and append JOIN SQL let join_sql = crate::sql::build_join_clause(joins, &state.config)?; query.push_str(&join_sql); } // Add WHERE clause - support both legacy and new filter format if let Some(filter) = &payload.filter { let (where_sql, where_values) = build_filter_clause(filter)?; query.push_str(&format!(" WHERE {}", where_sql)); values.extend(where_values.into_iter().map(Some)); } else if let Some(where_clause) = &payload.where_clause { let (where_sql, where_values) = build_where_clause(where_clause)?; query.push_str(&format!(" WHERE {}", where_sql)); values.extend(where_values.into_iter().map(Some)); } // Add ORDER BY if provided if let Some(order_by) = &payload.order_by { let order_clause = build_order_by_clause(order_by)?; query.push_str(&order_clause); } // Enforce LIMIT and track if it was capped let requested_limit = payload.limit; let limit = requested_limit.unwrap_or(max_limit); let was_capped = limit > max_limit; let limit = if limit > max_limit { max_limit } else { limit }; query.push_str(&format!(" LIMIT {}", limit)); let limit_warning = if was_capped { Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]", requested_limit.unwrap(), max_limit, max_limit, request_id)) } else if requested_limit.is_none() { Some(format!( "No LIMIT specified, using default {} based on power level [request_id: {}]", max_limit, request_id )) } else { None }; // Add OFFSET if provided if let Some(offset) = payload.offset { query.push_str(&format!(" OFFSET {}", offset)); } // Log the query let params_json = serde_json::to_value(&values).ok(); if let Err(e) = state .logging .log_query( request_id, Utc::now(), username, Some(session.power), &query, params_json.as_ref(), None, // Row count will be known after execution ) .await { error!("[{}] Failed to log query: {}", request_id, e); } // Execute the query let mut sqlx_query = sqlx::query(&query); for value in values { match value { Some(v) => sqlx_query = sqlx_query.bind(v), None => sqlx_query = sqlx_query.bind(Option::::None), } } let rows = sqlx_query.fetch_all(&mut *tx).await?; tx.commit().await?; let mut results = Vec::new(); for row in rows { let mut result_row = serde_json::Map::new(); for (i, column) in row.columns().iter().enumerate() { let value = convert_sql_value_to_json( &row, i, Some(request_id), Some(username), Some(session.power), Some(state), )?; result_row.insert(column.name().to_string(), value); } results.push(Value::Object(result_row)); } Ok(QueryResponse { success: true, data: Some(Value::Array(results)), rows_affected: None, error: None, warning: limit_warning, results: None, }) } async fn execute_insert_with_tx( request_id: &str, mut tx: sqlx::Transaction<'_, sqlx::MySql>, payload: &QueryRequest, username: &str, state: &AppState, session: &crate::models::Session, ) -> anyhow::Result { let table = payload .table .as_ref() .ok_or_else(|| anyhow::anyhow!("Table is required"))?; let mut data = payload .data .as_ref() .ok_or_else(|| anyhow::anyhow!("Data is required for INSERT"))? .clone(); // SECURITY: Apply column-level write filtering FIRST (before auto-generation) // This validates what the user is trying to write if let Value::Object(ref mut map) = data { let all_columns: Vec = map.keys().cloned().collect(); let writable_columns = state .config .filter_writable_columns(session.power, &table, &all_columns); // Remove columns that user cannot write to map.retain(|key, _| writable_columns.contains(key)); // Check for auto-generation requirements based on config // Auto-generated fields bypass write protection since they're system-generated if let Some(auto_config) = state.config.get_auto_generation_config(&table) { // Check if auto-generation is enabled for INSERT action if auto_config.on_action == "insert" || auto_config.on_action == "both" { let field_name = &auto_config.field; if !map.contains_key(field_name) || map.get(field_name).map_or(true, |v| { v.is_null() || v.as_str().map_or(true, |s| s.is_empty()) }) { // Generate auto value based on config let generated_value = generate_auto_value(&state, &table, auto_config).await?; map.insert(field_name.clone(), Value::String(generated_value)); } } } } // Final validation: ensure we have columns to insert if let Value::Object(ref map) = data { if map.is_empty() { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some("No writable columns in INSERT data".to_string()), warning: None, results: None, }); } } let (columns_vec, placeholders_vec, values) = build_insert_data(&data)?; let columns = columns_vec.join(", "); let placeholders = placeholders_vec.join(", "); let query = format!( "INSERT INTO {} ({}) VALUES ({})", table, columns, placeholders ); // Log the query let params_json = serde_json::to_value(&values).ok(); if let Err(e) = state .logging .log_query( request_id, Utc::now(), username, Some(session.power), &query, params_json.as_ref(), None, ) .await { error!("[{}] Failed to log query: {}", request_id, e); } // Execute the query let mut sqlx_query = sqlx::query(&query); for value in values { match value { Some(v) => sqlx_query = sqlx_query.bind(v), None => sqlx_query = sqlx_query.bind(Option::::None), } } let result = sqlx_query.execute(&mut *tx).await?; let insert_id = result.last_insert_id(); tx.commit().await?; Ok(QueryResponse { success: true, data: Some(serde_json::json!(insert_id)), rows_affected: Some(result.rows_affected()), error: None, warning: None, // INSERT queries don't have LIMIT, results: None, }) } async fn execute_update_with_tx( request_id: &str, mut tx: sqlx::Transaction<'_, sqlx::MySql>, payload: &QueryRequest, username: &str, state: &AppState, session: &crate::models::Session, ) -> anyhow::Result { let table = payload .table .as_ref() .ok_or_else(|| anyhow::anyhow!("Table is required"))?; let mut data = payload .data .as_ref() .ok_or_else(|| anyhow::anyhow!("Data is required for UPDATE"))? .clone(); let where_clause = payload .where_clause .as_ref() .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for UPDATE"))?; // Enforce query limits from config (power-level specific with fallback to defaults) let max_limit = state.config.get_max_limit(session.power); let max_where = state.config.get_max_where_conditions(session.power); // Enforce WHERE clause complexity let condition_count = if let Some(w) = &payload.where_clause { if let serde_json::Value::Object(map) = w { map.len() } else { 0 } } else { 0 }; if condition_count > max_where as usize { // Log security violation let timestamp = chrono::Utc::now(); if let Err(log_err) = state .logging .log_error( &request_id, timestamp, &format!( "Too many WHERE conditions: {} exceeds maximum {} for power level {}", condition_count, max_where, session.power ), Some("query_limits"), Some(&session.username), Some(session.power), ) .await { error!( "[{}] Failed to log WHERE limit violation: {}", request_id, log_err ); } return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Too many WHERE conditions ({}) > max {} [request_id: {}]", condition_count, max_where, request_id )), warning: None, results: None, }); } // SECURITY: Apply column-level write filtering FIRST (before auto-generation) if let Value::Object(ref mut map) = data { let all_columns: Vec = map.keys().cloned().collect(); let writable_columns = state .config .filter_writable_columns(session.power, &table, &all_columns); // Remove columns that user cannot write to map.retain(|key, _| writable_columns.contains(key)); // Check for auto-generation (system-generated fields bypass write protection) if let Some(auto_config) = state.config.get_auto_generation_config(&table) { if auto_config.on_action == "update" || auto_config.on_action == "both" { let field_name = &auto_config.field; if !map.contains_key(field_name) || map.get(field_name).map_or(true, |v| { v.is_null() || v.as_str().map_or(true, |s| s.is_empty()) }) { let generated_value = generate_auto_value(&state, &table, auto_config).await?; map.insert(field_name.clone(), Value::String(generated_value)); } } } } // Final validation: ensure we have columns to update if let Value::Object(ref map) = data { if map.is_empty() { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some("No writable columns in UPDATE data".to_string()), warning: None, results: None, }); } } let (set_clause, mut values) = build_update_set_clause(&data)?; let (where_sql, where_values) = build_where_clause(where_clause)?; // Convert where_values to Option to match set values values.extend(where_values.into_iter().map(Some)); let mut query = format!("UPDATE {} SET {} WHERE {}", table, set_clause, where_sql); // Enforce LIMIT and track if it was capped let requested_limit = payload.limit; let limit = requested_limit.unwrap_or(max_limit); let was_capped = limit > max_limit; let limit = if limit > max_limit { max_limit } else { limit }; query.push_str(&format!(" LIMIT {}", limit)); let limit_warning = if was_capped { Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]", requested_limit.unwrap(), max_limit, max_limit, request_id)) } else if requested_limit.is_none() { Some(format!( "No LIMIT specified, using default {} based on power level [request_id: {}]", max_limit, request_id )) } else { None }; // Log the query let params_json = serde_json::to_value(&values).ok(); if let Err(e) = state .logging .log_query( request_id, Utc::now(), username, Some(session.power), &query, params_json.as_ref(), None, ) .await { error!("[{}] Failed to log query: {}", request_id, e); } // Execute the query let mut sqlx_query = sqlx::query(&query); for value in values { match value { Some(v) => sqlx_query = sqlx_query.bind(v), None => sqlx_query = sqlx_query.bind(Option::::None), } } let result = sqlx_query.execute(&mut *tx).await?; tx.commit().await?; Ok(QueryResponse { success: true, data: None, rows_affected: Some(result.rows_affected()), error: None, warning: limit_warning, results: None, }) } async fn execute_delete_with_tx( request_id: &str, mut tx: sqlx::Transaction<'_, sqlx::MySql>, payload: &QueryRequest, username: &str, state: &AppState, session: &crate::models::Session, ) -> anyhow::Result { let table = payload .table .as_ref() .ok_or_else(|| anyhow::anyhow!("Table is required"))?; let where_clause = payload .where_clause .as_ref() .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for DELETE"))?; // Enforce query limits from config (power-level specific with fallback to defaults) let max_limit = state.config.get_max_limit(session.power); let max_where = state.config.get_max_where_conditions(session.power); // Enforce WHERE clause complexity let condition_count = if let Some(w) = &payload.where_clause { if let serde_json::Value::Object(map) = w { map.len() } else { 0 } } else { 0 }; if condition_count > max_where as usize { // Log security violation let timestamp = chrono::Utc::now(); if let Err(log_err) = state .logging .log_error( &request_id, timestamp, &format!( "Too many WHERE conditions: {} exceeds maximum {} for power level {}", condition_count, max_where, session.power ), Some("query_limits"), Some(&session.username), Some(session.power), ) .await { error!( "[{}] Failed to log WHERE limit violation: {}", request_id, log_err ); } return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Too many WHERE conditions ({}) > max {} [request_id: {}]", condition_count, max_where, request_id )), warning: None, results: None, }); } let (where_sql, values) = build_where_clause(where_clause)?; let mut query = format!("DELETE FROM {} WHERE {}", table, where_sql); // Enforce LIMIT and track if it was capped let requested_limit = payload.limit; let limit = requested_limit.unwrap_or(max_limit); let was_capped = limit > max_limit; let limit = if limit > max_limit { max_limit } else { limit }; query.push_str(&format!(" LIMIT {}", limit)); let limit_warning = if was_capped { Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]", requested_limit.unwrap(), max_limit, max_limit, request_id)) } else if requested_limit.is_none() { Some(format!( "No LIMIT specified, using default {} based on power level [request_id: {}]", max_limit, request_id )) } else { None }; // Log the query let params_json = serde_json::to_value(&values).ok(); if let Err(e) = state .logging .log_query( request_id, Utc::now(), username, Some(session.power), &query, params_json.as_ref(), None, ) .await { error!("[{}] Failed to log query: {}", request_id, e); } // Execute the query let mut sqlx_query = sqlx::query(&query); for value in values { sqlx_query = sqlx_query.bind(value); } let result = sqlx_query.execute(&mut *tx).await?; tx.commit().await?; Ok(QueryResponse { success: true, data: None, rows_affected: Some(result.rows_affected()), error: None, warning: limit_warning, results: None, }) } async fn execute_count_with_tx( request_id: &str, mut tx: sqlx::Transaction<'_, sqlx::MySql>, payload: &QueryRequest, username: &str, session: &crate::models::Session, state: &AppState, ) -> anyhow::Result { let table = payload .table .as_ref() .ok_or_else(|| anyhow::anyhow!("Table is required"))?; // Check read permissions for the table if !state.rbac.check_permission( &state.config, session.power, table, &crate::models::QueryAction::Select, ) { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Insufficient permissions to COUNT from table '{}' [request_id: {}]", table, request_id )), warning: None, results: None, }); } // Helper to count conditions in filter/where (same as select function) fn count_conditions( filter: &Option, where_clause: &Option, ) -> usize { use crate::models::FilterCondition; fn count_filter(cond: &FilterCondition) -> usize { match cond { FilterCondition::Simple { .. } => 1, FilterCondition::Logical { and_conditions, or_conditions, } => { and_conditions .as_ref() .map_or(0, |conds| conds.iter().map(count_filter).sum()) + or_conditions .as_ref() .map_or(0, |conds| conds.iter().map(count_filter).sum()) } FilterCondition::Not { not } => count_filter(not), } } let mut count = 0; if let Some(f) = filter { count += count_filter(f); } if let Some(w) = where_clause { if let serde_json::Value::Object(map) = w { count += map.len(); } } count } // Enforce query limits from config (power-level specific with fallback to defaults) let max_where = state.config.get_max_where_conditions(session.power); // Enforce WHERE clause complexity let condition_count = count_conditions(&payload.filter, &payload.where_clause); if condition_count > max_where as usize { // Log security violation let timestamp = chrono::Utc::now(); if let Err(log_err) = state .logging .log_error( &request_id, timestamp, &format!( "Too many WHERE conditions: {} exceeds maximum {} for power level {}", condition_count, max_where, session.power ), Some("query_limits"), Some(&session.username), Some(session.power), ) .await { error!( "[{}] Failed to log WHERE limit violation: {}", request_id, log_err ); } return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Too many WHERE/filter conditions ({}) > max {} [request_id: {}]", condition_count, max_where, request_id )), warning: None, results: None, }); } let mut query = format!("SELECT COUNT(*) as count FROM {}", table); let mut values = Vec::new(); // Add JOIN clauses if provided - validates permissions for all joined tables if let Some(joins) = &payload.joins { // Validate user has read permission for all joined tables for join in joins { if !state.rbac.check_permission( &state.config, session.power, &join.table, &crate::models::QueryAction::Select, ) { return Ok(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Insufficient permissions to JOIN with table '{}'", join.table )), warning: None, results: None, }); } } let join_sql = crate::sql::build_join_clause(joins, &state.config)?; query.push_str(&join_sql); } // Add WHERE conditions (filter takes precedence over where_clause if both are provided) if let Some(filter) = &payload.filter { let (where_sql, where_values) = crate::sql::build_filter_clause(filter)?; query.push_str(&format!(" WHERE {}", where_sql)); values.extend(where_values.into_iter().map(Some)); } else if let Some(where_clause) = &payload.where_clause { let (where_sql, where_values) = build_where_clause(where_clause)?; query.push_str(&format!(" WHERE {}", where_sql)); values.extend(where_values.into_iter().map(Some)); } // Log the query let params_json = serde_json::to_value(&values).ok(); if let Err(e) = state .logging .log_query( request_id, chrono::Utc::now(), username, Some(session.power), &query, params_json.as_ref(), None, ) .await { error!("[{}] Failed to log query: {}", request_id, e); } // Execute the query let mut sqlx_query = sqlx::query(&query); for value in values { match value { Some(v) => sqlx_query = sqlx_query.bind(v), None => sqlx_query = sqlx_query.bind(Option::::None), } } let result = sqlx_query.fetch_one(&mut *tx).await?; tx.commit().await?; let count: i64 = result.try_get("count")?; Ok(QueryResponse { success: true, data: Some(serde_json::json!(count)), rows_affected: None, error: None, warning: None, results: None, }) } /// Execute multiple queries in a single transaction async fn execute_batch_mode( state: AppState, session: crate::models::Session, request_id: String, timestamp: chrono::DateTime, client_ip: String, payload: &QueryRequest, ) -> Result, StatusCode> { let queries = payload.queries.as_ref().unwrap(); // Check if batch is empty if queries.is_empty() { return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some("Batch request cannot be empty".to_string()), warning: None, results: Some(vec![]), })); } info!( "[{}] Batch query request from user {} (power {}): {} queries", request_id, session.username, session.power, queries.len() ); // Log the batch request (log full payload including action/table) if let Err(e) = state .logging .log_request( &request_id, timestamp, &client_ip, Some(&session.username), Some(session.power), "/query", &serde_json::to_value(payload).unwrap_or_default(), ) .await { error!("[{}] Failed to log batch request: {}", request_id, e); } // Get action and table from parent request (all batch queries share these) let action = payload .action .as_ref() .ok_or_else(|| StatusCode::BAD_REQUEST)?; let table = payload .table .as_ref() .ok_or_else(|| StatusCode::BAD_REQUEST)?; // Validate table name once for the entire batch if let Err(e) = validate_table_name(table, &state.config) { return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!("Invalid table name: {}", e)), warning: None, results: Some(vec![]), })); } // Check RBAC permission once for the entire batch if !state .rbac .check_permission(&state.config, session.power, table, action) { return Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!( "Insufficient permissions for {} on table '{}'", match action { QueryAction::Select => "SELECT", QueryAction::Insert => "INSERT", QueryAction::Update => "UPDATE", QueryAction::Delete => "DELETE", QueryAction::Count => "COUNT", }, table )), warning: None, results: Some(vec![]), })); } info!( "[{}] Validated batch: {} x {:?} on table '{}'", request_id, queries.len(), action, table ); super::log_info_async( &state.logging, &request_id, &format!( "Validated batch: {} x {:?} on table '{}'", queries.len(), action, table ), Some("query"), Some(&session.username), Some(session.power), ); if !state.database.is_available() { warn!( "[{}] Database marked unavailable before batch execution", request_id ); log_database_unavailable_event( &state.logging, &request_id, Some(&session.username), Some(session.power), "Database flagged unavailable before batch", ) .await; return Ok(Json(database_unavailable_batch_response(&request_id))); } // Start a SINGLE transaction for the batch - proper atomic operation let mut tx = match state.database.pool().begin().await { Ok(tx) => { state.database.mark_available(); tx } Err(e) => { state.database.mark_unavailable(); error!("[{}] Failed to begin batch transaction: {}", request_id, e); log_database_unavailable_event( &state.logging, &request_id, Some(&session.username), Some(session.power), &format!("Failed to begin batch transaction: {}", e), ) .await; return Ok(Json(database_unavailable_batch_response(&request_id))); } }; // Set user context and request ID in transaction if let Err(e) = sqlx::query("SET @current_user_id = ?, @request_id = ?") .bind(session.user_id) .bind(&request_id) .execute(&mut *tx) .await { state.database.mark_unavailable(); error!( "[{}] Failed to set current user context and request ID: {}", request_id, e ); log_database_unavailable_event( &state.logging, &request_id, Some(&session.username), Some(session.power), &format!("Failed to set batch user context: {}", e), ) .await; return Ok(Json(database_unavailable_batch_response(&request_id))); } let rollback_on_error = payload.rollback_on_error.unwrap_or(false); info!( "[{}] Executing NATIVE batch: {} x {:?} on '{}' (rollback_on_error={})", request_id, queries.len(), action, table, rollback_on_error ); super::log_info_async( &state.logging, &request_id, &format!( "Executing NATIVE batch: {} x {:?} on '{}' (rollback_on_error={})", queries.len(), action, table, rollback_on_error ), Some("query"), Some(&session.username), Some(session.power), ); // Execute as SINGLE native batch query based on action type let result = match action { QueryAction::Insert => { execute_batch_insert( &request_id, &mut tx, queries, table, action, &session.username, &state, &session, ) .await } QueryAction::Update => { execute_batch_update( &request_id, &mut tx, queries, table, action, &session.username, &state, &session, ) .await } QueryAction::Delete => { execute_batch_delete( &request_id, &mut tx, queries, table, action, &session.username, &state, &session, ) .await } QueryAction::Select => { // SELECT batches are less common but we'll execute them individually // (combining SELECTs into one query doesn't make sense as they return different results) execute_batch_selects( &request_id, &mut tx, queries, table, action, &session.username, &session, &state, ) .await } QueryAction::Count => { // COUNT batches execute individually (each returns different results) execute_batch_counts( &request_id, &mut tx, queries, table, action, &session.username, &session, &state, ) .await } }; match result { Ok(response) => { if response.success || !rollback_on_error { // Commit the transaction tx.commit().await.map_err(|e| { error!("[{}] Failed to commit batch transaction: {}", request_id, e); StatusCode::INTERNAL_SERVER_ERROR })?; info!( "[{}] Native batch committed: {} operations", request_id, queries.len() ); super::log_info_async( &state.logging, &request_id, &format!("Native batch committed: {} operations", queries.len()), Some("query"), Some(&session.username), Some(session.power), ); Ok(Json(response)) } else { // Rollback on error error!( "[{}] Rolling back batch transaction due to error", request_id ); tx.rollback().await.map_err(|e| { error!("[{}] Failed to rollback transaction: {}", request_id, e); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(response)) } } Err(e) => { error!("[{}] Batch execution failed: {}", request_id, e); tx.rollback().await.map_err(|e2| { error!("[{}] Failed to rollback after error: {}", request_id, e2); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(QueryResponse { success: false, data: None, rows_affected: None, error: Some(format!("Batch execution failed: {}", e)), warning: None, results: Some(vec![]), })) } } } // ===== NATIVE BATCH EXECUTION FUNCTIONS ===== /// Execute batch INSERT using MySQL multi-value INSERT async fn execute_batch_insert( request_id: &str, tx: &mut sqlx::Transaction<'_, sqlx::MySql>, queries: &Vec, table: &str, _action: &QueryAction, _username: &str, state: &AppState, _session: &crate::models::Session, ) -> anyhow::Result { use serde_json::Value; // Extract all data objects, apply auto-generation, and validate they have the same columns let mut all_data = Vec::new(); let mut column_set: Option> = None; // Check for auto-generation config let auto_config = state.config.get_auto_generation_config(&table); for (idx, query) in queries.iter().enumerate() { let mut data = query .data .as_ref() .ok_or_else(|| anyhow::anyhow!("Query {} missing data field for INSERT", idx + 1))? .clone(); // Apply auto-generation if configured for INSERT if let Some(ref auto_cfg) = auto_config { if auto_cfg.on_action == "insert" || auto_cfg.on_action == "both" { if let Value::Object(ref mut map) = data { let field_name = &auto_cfg.field; if !map.contains_key(field_name) || map.get(field_name).map_or(true, |v| { v.is_null() || v.as_str().map_or(true, |s| s.is_empty()) }) { // Generate auto value based on config let generated_value = generate_auto_value(&state, &table, auto_cfg).await?; map.insert(field_name.clone(), Value::String(generated_value)); } } } } if let Value::Object(map) = data { let cols: std::collections::HashSet = map.keys().cloned().collect(); if let Some(ref expected_cols) = column_set { if *expected_cols != cols { anyhow::bail!("All INSERT queries must have the same columns. Query {} has different columns", idx + 1); } } else { column_set = Some(cols); } all_data.push(map); } else { anyhow::bail!("Query {} data must be an object", idx + 1); } } let columns: Vec = column_set.unwrap().into_iter().collect(); // Validate column names for col in &columns { validate_column_name(col)?; } // Build multi-value INSERT: INSERT INTO table (col1, col2) VALUES (?, ?), (?, ?), ... let column_list = columns.join(", "); let value_placeholder = format!("({})", vec!["?"; columns.len()].join(", ")); let values_clause = vec![value_placeholder; all_data.len()].join(", "); let sql = format!( "INSERT INTO {} ({}) VALUES {}", table, column_list, values_clause ); info!( "[{}] Native batch INSERT: {} rows into {}", request_id, all_data.len(), table ); super::log_info_async( &state.logging, &request_id, &format!( "Native batch INSERT: {} rows into {}", all_data.len(), table ), Some("query"), Some(&_session.username), Some(_session.power), ); // Bind all values let mut query = sqlx::query(&sql); for data_map in &all_data { for col in &columns { let value = data_map.get(col).and_then(|v| match v { Value::String(s) => Some(s.clone()), Value::Number(n) => Some(n.to_string()), Value::Bool(b) => Some(b.to_string()), Value::Null => None, _ => Some(v.to_string()), }); query = query.bind(value); } } // Execute the batch INSERT let result = query.execute(&mut **tx).await?; let rows_affected = result.rows_affected(); info!( "[{}] Batch INSERT affected {} rows", request_id, rows_affected ); super::log_info_async( &state.logging, &request_id, &format!("Batch INSERT affected {} rows", rows_affected), Some("query"), Some(&_session.username), Some(_session.power), ); Ok(QueryResponse { success: true, data: None, rows_affected: Some(rows_affected), error: None, warning: None, results: None, }) } /// Execute batch UPDATE (executes individually for now, could be optimized with CASE statements) async fn execute_batch_update( request_id: &str, tx: &mut sqlx::Transaction<'_, sqlx::MySql>, queries: &Vec, table: &str, action: &QueryAction, username: &str, state: &AppState, session: &crate::models::Session, ) -> anyhow::Result { let mut total_rows = 0u64; for (idx, batch_query) in queries.iter().enumerate() { // Convert BatchQuery to QueryRequest by adding inherited action/table let query_req = QueryRequest { action: Some(action.clone()), table: Some(table.to_string()), columns: batch_query.columns.clone(), data: batch_query.data.clone(), where_clause: batch_query.where_clause.clone(), filter: batch_query.filter.clone(), joins: None, limit: batch_query.limit, offset: batch_query.offset, order_by: batch_query.order_by.clone(), queries: None, rollback_on_error: None, }; let result = execute_update_core( &format!("{}-{}", request_id, idx + 1), tx, &query_req, username, state, session, ) .await?; if let Some(rows) = result.rows_affected { total_rows += rows; } } info!( "[{}] Batch UPDATE affected {} total rows", request_id, total_rows ); super::log_info_async( &state.logging, &request_id, &format!("Batch UPDATE affected {} total rows", total_rows), Some("query"), Some(&session.username), Some(session.power), ); Ok(QueryResponse { success: true, data: None, rows_affected: Some(total_rows), error: None, warning: None, results: None, }) } /// Execute batch DELETE using IN clause when possible async fn execute_batch_delete( request_id: &str, tx: &mut sqlx::Transaction<'_, sqlx::MySql>, queries: &Vec, table: &str, action: &QueryAction, username: &str, state: &AppState, session: &crate::models::Session, ) -> anyhow::Result { // For now, execute individually // TODO: Optimize by detecting simple ID-based deletes and combining with IN clause let mut total_rows = 0u64; for (idx, batch_query) in queries.iter().enumerate() { // Convert BatchQuery to QueryRequest by adding inherited action/table let query_req = QueryRequest { action: Some(action.clone()), table: Some(table.to_string()), columns: batch_query.columns.clone(), data: batch_query.data.clone(), where_clause: batch_query.where_clause.clone(), filter: batch_query.filter.clone(), joins: None, limit: batch_query.limit, offset: batch_query.offset, order_by: batch_query.order_by.clone(), queries: None, rollback_on_error: None, }; let result = execute_delete_core( &format!("{}-{}", request_id, idx + 1), tx, &query_req, username, state, session, ) .await?; if let Some(rows) = result.rows_affected { total_rows += rows; } } info!( "[{}] Batch DELETE affected {} total rows", request_id, total_rows ); super::log_info_async( &state.logging, &request_id, &format!("Batch DELETE affected {} total rows", total_rows), Some("query"), Some(&session.username), Some(session.power), ); Ok(QueryResponse { success: true, data: None, rows_affected: Some(total_rows), error: None, warning: None, results: None, }) } /// Execute batch SELECT (executes individually since they return different results) async fn execute_batch_selects( request_id: &str, tx: &mut sqlx::Transaction<'_, sqlx::MySql>, queries: &Vec, table: &str, action: &QueryAction, username: &str, session: &crate::models::Session, state: &AppState, ) -> anyhow::Result { let mut results = Vec::new(); for (idx, batch_query) in queries.iter().enumerate() { // Convert BatchQuery to QueryRequest by adding inherited action/table let query_req = QueryRequest { action: Some(action.clone()), table: Some(table.to_string()), columns: batch_query.columns.clone(), data: batch_query.data.clone(), where_clause: batch_query.where_clause.clone(), filter: batch_query.filter.clone(), joins: None, limit: batch_query.limit, offset: batch_query.offset, order_by: batch_query.order_by.clone(), queries: None, rollback_on_error: None, }; let result = execute_select_core( &format!("{}-{}", request_id, idx + 1), tx, &query_req, username, session, state, ) .await?; results.push(result); } info!( "[{}] Batch SELECT executed {} queries", request_id, results.len() ); super::log_info_async( &state.logging, &request_id, &format!("Batch SELECT executed {} queries", results.len()), Some("query"), Some(&session.username), Some(session.power), ); Ok(QueryResponse { success: true, data: None, rows_affected: None, error: None, warning: None, results: Some(results), }) } /// Execute batch COUNT (executes individually since they return different results) async fn execute_batch_counts( request_id: &str, tx: &mut sqlx::Transaction<'_, sqlx::MySql>, queries: &Vec, table: &str, action: &QueryAction, username: &str, session: &crate::models::Session, state: &AppState, ) -> anyhow::Result { let mut results = Vec::new(); for (idx, batch_query) in queries.iter().enumerate() { // Convert BatchQuery to QueryRequest by adding inherited action/table let query_req = QueryRequest { action: Some(action.clone()), table: Some(table.to_string()), columns: batch_query.columns.clone(), data: batch_query.data.clone(), where_clause: batch_query.where_clause.clone(), filter: batch_query.filter.clone(), joins: None, limit: batch_query.limit, offset: batch_query.offset, order_by: batch_query.order_by.clone(), queries: None, rollback_on_error: None, }; let result = execute_count_core( &format!("{}:{}", request_id, idx), tx, &query_req, username, session, state, ) .await?; results.push(result); } info!( "[{}] Batch COUNT executed {} queries", request_id, results.len() ); super::log_info_async( &state.logging, &request_id, &format!("Batch COUNT executed {} queries", results.len()), Some("query"), Some(&session.username), Some(session.power), ); Ok(QueryResponse { success: true, data: None, rows_affected: None, error: None, warning: None, results: Some(results), }) }