diff options
Diffstat (limited to 'src/routes/query.rs')
| -rw-r--r-- | src/routes/query.rs | 3255 |
1 files changed, 3255 insertions, 0 deletions
diff --git a/src/routes/query.rs b/src/routes/query.rs new file mode 100644 index 0000000..9814215 --- /dev/null +++ b/src/routes/query.rs @@ -0,0 +1,3255 @@ +// Query routes and execution +use anyhow::{Context, Result}; +use axum::{ + extract::{ConnectInfo, State}, + http::{HeaderMap, StatusCode}, + Json, +}; +use chrono::Utc; +use rand::Rng; +use serde_json::Value; +use sqlx::{Column, Row}; +use std::collections::HashMap; +use std::net::SocketAddr; +use tracing::{error, info, warn}; + +use crate::logging::AuditLogger; +use crate::models::{PermissionsResponse, QueryAction, QueryRequest, QueryResponse, UserInfo}; +use crate::sql::{ + build_filter_clause, build_legacy_where_clause, build_order_by_clause, validate_column_name, + validate_column_names, validate_table_name, +}; +use crate::AppState; + +// Helper function to extract token from Authorization header +fn extract_token(headers: &HeaderMap) -> Option<String> { + headers + .get("Authorization") + .and_then(|header| header.to_str().ok()) + .and_then(|auth_str| { + if auth_str.starts_with("Bearer ") { + Some(auth_str[7..].to_string()) + } else { + None + } + }) +} + +fn database_unavailable_response(request_id: &str) -> QueryResponse { + QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Database temporarily unavailable, try again in a moment [request_id: {}]", + request_id + )), + warning: None, + results: None, + } +} + +fn database_unavailable_batch_response(request_id: &str) -> QueryResponse { + let mut base = database_unavailable_response(request_id); + base.results = Some(vec![]); + base +} + +async fn log_database_unavailable_event( + logger: &AuditLogger, + request_id: &str, + username: Option<&str>, + power: Option<i32>, + detail: &str, +) { + if let Err(err) = logger + .log_error( + request_id, + Utc::now(), + detail, + Some("database_unavailable"), + username, + power, + ) + .await + { + error!("[{}] Failed to record database outage: {}", request_id, err); + } +} + +pub async fn execute_query( + State(state): State<AppState>, + ConnectInfo(addr): ConnectInfo<SocketAddr>, + headers: HeaderMap, + Json(payload): Json<QueryRequest>, +) -> Result<Json<QueryResponse>, StatusCode> { + let timestamp = Utc::now(); + let client_ip = addr.ip().to_string(); + let request_id = AuditLogger::generate_request_id(); + + // Extract and validate session token + let token = match extract_token(&headers) { + Some(token) => token, + None => { + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some( + "Please stop trying to access this resource without signing in".to_string(), + ), + warning: None, + results: None, + })); + } + }; + + let session = match state.session_manager.get_session(&token) { + Some(session) => session, + None => { + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some("Session not found".to_string()), + warning: None, + results: None, + })); + } + }; + + // Detect batch mode - if queries field is present, handle as batch operation + if payload.queries.is_some() { + // SECURITY: Check if user has permission to use batch operations + let power_perms = state + .config + .permissions + .power_levels + .get(&session.power.to_string()) + .ok_or(StatusCode::FORBIDDEN)?; + + if !power_perms.allow_batch_operations { + super::log_warning_async( + &state.logging, + &request_id, + &format!( + "User {} (power {}) attempted batch operation without permission", + session.username, session.power + ), + Some("authorization"), + Some(&session.username), + Some(session.power), + ); + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some("Batch operations not permitted for your role".to_string()), + warning: None, + results: None, + })); + } + + return execute_batch_mode(state, session, request_id, timestamp, client_ip, &payload) + .await; + } + + // Validate input for very basic security vulnerabilities (null bytes, etc.) + if let Err(security_error) = validate_input_security(&payload) { + super::log_warning_async( + &state.logging, + &request_id, + &format!( + "Security validation failed for user {}: {}", + session.username, security_error + ), + Some("security_validation"), + Some(&session.username), + Some(session.power), + ); + + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Invalid input detected, how did you even manage to do that? [request_id: {}]", + request_id + )), + warning: None, + results: None, + })); + } + + // Log the request + if let Err(e) = state + .logging + .log_request( + &request_id, + timestamp, + &client_ip, + Some(&session.username), + Some(session.power), + "/query", + &serde_json::to_value(&payload).unwrap_or_default(), + ) + .await + { + error!("[{}] Failed to log request: {}", request_id, e); + } + + // Clone payload before extracting fields (to avoid partial move issues) + let payload_clone = payload.clone(); + + // Single query mode - validate required fields + let action = payload.action.ok_or_else(|| { + let error_msg = "Missing action field in single query mode"; + super::log_error_async( + &state.logging, + &request_id, + error_msg, + Some("request_validation"), + Some(&session.username), + Some(session.power), + ); + StatusCode::BAD_REQUEST + })?; + + let table = payload.table.ok_or_else(|| { + let error_msg = "Missing table field in single query mode"; + super::log_error_async( + &state.logging, + &request_id, + error_msg, + Some("request_validation"), + Some(&session.username), + Some(session.power), + ); + StatusCode::BAD_REQUEST + })?; + + // SECURITY: Validate table name before any operations + if let Err(e) = validate_table_name(&table, &state.config) { + let error_msg = format!("Invalid table name '{}': {}", table, e); + super::log_error_async( + &state.logging, + &request_id, + &error_msg, + Some("table_validation"), + Some(&session.username), + Some(session.power), + ); + + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!("Invalid table name [request_id: {}]", request_id)), + warning: None, + results: None, + })); + } + + // SECURITY: Validate column names if specified + if let Some(ref columns) = payload.columns { + if let Err(e) = validate_column_names(columns) { + let error_msg = format!("Invalid column names on table '{}': {}", table, e); + super::log_error_async( + &state.logging, + &request_id, + &error_msg, + Some("column_validation"), + Some(&session.username), + Some(session.power), + ); + + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Invalid column name: {} [request_id: {}]", + e, request_id + )), + warning: None, + results: None, + })); + } + } + + // Check permissions (after validation to avoid leaking table existence) + if !state + .rbac + .check_permission(&state.config, session.power, &table, &action) + { + let action_str = match action { + QueryAction::Select => "SELECT", + QueryAction::Insert => "INSERT", + QueryAction::Update => "UPDATE", + QueryAction::Delete => "DELETE", + QueryAction::Count => "COUNT", + }; + + super::log_warning_async( + &state.logging, + &request_id, + &format!( + "User {} attempted unauthorized {} on table {}", + session.username, action_str, table + ), + Some("authorization"), + Some(&session.username), + Some(session.power), + ); + + // Log security violation + if let Err(log_err) = state + .logging + .log_error( + &request_id, + timestamp, + &format!("Permission denied: {} on table {}", action_str, table), + Some("authorization"), + Some(&session.username), + Some(session.power), + ) + .await + { + error!( + "[{}] Failed to log permission denial: {}", + request_id, log_err + ); + } + + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!("Insufficient permissions for this operation, Might as well give up [request_id: {}]", request_id)), + warning: None, + results: None, + })); + } + + if !state.database.is_available() { + warn!( + "[{}] Database marked unavailable, returning graceful error", + request_id + ); + log_database_unavailable_event( + &state.logging, + &request_id, + Some(&session.username), + Some(session.power), + "Database flagged unavailable before transaction", + ) + .await; + return Ok(Json(database_unavailable_response(&request_id))); + } + + // ALL database operations now use transactions with user context set + let mut tx = match state.database.pool().begin().await { + Ok(tx) => { + state.database.mark_available(); + tx + } + Err(e) => { + state.database.mark_unavailable(); + error!("[{}] Failed to begin transaction: {}", request_id, e); + log_database_unavailable_event( + &state.logging, + &request_id, + Some(&session.username), + Some(session.power), + &format!("Failed to begin transaction: {}", e), + ) + .await; + return Ok(Json(database_unavailable_response(&request_id))); + } + }; + + // Set user context and request ID in transaction - ALL queries have user context now + if let Err(e) = sqlx::query("SET @current_user_id = ?, @request_id = ?") + .bind(session.user_id) + .bind(&request_id) + .execute(&mut *tx) + .await + { + state.database.mark_unavailable(); + error!( + "[{}] Failed to set current user context and request ID: {}", + request_id, e + ); + log_database_unavailable_event( + &state.logging, + &request_id, + Some(&session.username), + Some(session.power), + &format!("Failed to set user context: {}", e), + ) + .await; + return Ok(Json(database_unavailable_response(&request_id))); + } + + // Execute the query within the transaction + let result = match action { + QueryAction::Select => { + execute_select_with_tx( + &request_id, + tx, + &payload_clone, + &session.username, + &session, + &state, + ) + .await + } + QueryAction::Insert => { + execute_insert_with_tx( + &request_id, + tx, + &payload_clone, + &session.username, + &state, + &session, + ) + .await + } + QueryAction::Update => { + execute_update_with_tx( + &request_id, + tx, + &payload_clone, + &session.username, + &state, + &session, + ) + .await + } + QueryAction::Delete => { + execute_delete_with_tx( + &request_id, + tx, + &payload_clone, + &session.username, + &state, + &session, + ) + .await + } + QueryAction::Count => { + execute_count_with_tx( + &request_id, + tx, + &payload_clone, + &session.username, + &session, + &state, + ) + .await + } + }; + + match result { + Ok(response) => { + let action_str = match action { + QueryAction::Select => "SELECT", + QueryAction::Insert => "INSERT", + QueryAction::Update => "UPDATE", + QueryAction::Delete => "DELETE", + QueryAction::Count => "COUNT", + }; + + super::log_info_async( + &state.logging, + &request_id, + &format!("Query executed successfully: {} on {}", action_str, table), + Some("query_execution"), + Some(&session.username), + Some(session.power), + ); + Ok(Json(response)) + } + Err(e) => { + error!("[{}] Query execution failed: {}", request_id, e); + if let Err(log_err) = state + .logging + .log_error( + &request_id, + timestamp, + &format!("Query execution error: {}", e), + Some("query_execution"), + Some(&session.username), + Some(session.power), + ) + .await + { + error!("[{}] Failed to log error: {}", request_id, log_err); + } + + Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Database query failed [request_id: {}]", + request_id + )), + warning: None, + results: None, + })) + } + } +} + +pub async fn get_permissions( + State(state): State<AppState>, + ConnectInfo(addr): ConnectInfo<SocketAddr>, + headers: HeaderMap, +) -> Result<Json<PermissionsResponse>, StatusCode> { + let timestamp = Utc::now(); + let client_ip = addr.ip().to_string(); + let request_id = AuditLogger::generate_request_id(); + + // Extract and validate session token + let token = match extract_token(&headers) { + Some(token) => token, + None => { + return Ok(Json(PermissionsResponse { + success: false, + permissions: HashMap::new(), + user: UserInfo { + id: 0, + username: "".to_string(), + name: "".to_string(), + role: "".to_string(), + power: 0, + }, + security_clearance: None, + user_settings_access: "".to_string(), + })); + } + }; + + let session = match state.session_manager.get_session(&token) { + Some(session) => session, + None => { + return Ok(Json(PermissionsResponse { + success: false, + permissions: HashMap::new(), + user: UserInfo { + id: 0, + username: "".to_string(), + name: "".to_string(), + role: "".to_string(), + power: 0, + }, + security_clearance: None, + user_settings_access: "".to_string(), + })); + } + }; + + // Log the request + if let Err(e) = state + .logging + .log_request( + &request_id, + timestamp, + &client_ip, + Some(&session.username), + Some(session.power), + "/permissions", + &serde_json::json!({}), + ) + .await + { + error!("[{}] Failed to log request: {}", request_id, e); + } + + let permissions = state + .rbac + .get_table_permissions(&state.config, session.power); + + // Get user settings access permission + let user_settings_permission = state + .config + .permissions + .power_levels + .get(&session.power.to_string()) + .map(|p| &p.user_settings_access) + .unwrap_or(&state.config.security.default_user_settings_access); + + let user_settings_access_str = match user_settings_permission { + crate::config::UserSettingsAccess::ReadOwnOnly => "read-own-only", + crate::config::UserSettingsAccess::ReadWriteOwn => "read-write-own", + crate::config::UserSettingsAccess::ReadWriteAll => "read-write-all", + }; + + Ok(Json(PermissionsResponse { + success: true, + permissions, + user: UserInfo { + id: session.user_id, + username: session.username, + name: "".to_string(), // We don't store name in session, would need to fetch from DB + role: session.role_name, + power: session.power, + }, + security_clearance: None, + user_settings_access: user_settings_access_str.to_string(), + })) +} + +// ===== SAFE SQL QUERY BUILDERS WITH VALIDATION ===== +// All functions validate table/column names to prevent SQL injection + +/// Build WHERE clause with column name validation (legacy simple format) +fn build_where_clause(where_clause: &Value) -> anyhow::Result<(String, Vec<String>)> { + // Use the new validated builder from sql module + build_legacy_where_clause(where_clause) +} + +/// Build INSERT data with column name validation +fn build_insert_data( + data: &Value, +) -> anyhow::Result<(Vec<String>, Vec<String>, Vec<Option<String>>)> { + let mut columns = Vec::new(); + let mut placeholders = Vec::new(); + let mut values = Vec::new(); + + if let Value::Object(map) = data { + for (key, value) in map { + // SECURITY: Validate column name before using it + validate_column_name(key) + .with_context(|| format!("Invalid column name in INSERT: {}", key))?; + + // Handle special JSON fields like additional_fields + if key == "additional_fields" && value.is_object() { + columns.push(key.clone()); + placeholders.push("?".to_string()); + values.push(Some( + serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()), + )); + } else { + columns.push(key.clone()); + placeholders.push("?".to_string()); + values.push(json_value_to_option_string(value)); + } + } + } else { + anyhow::bail!("INSERT data must be a JSON object"); + } + + if columns.is_empty() { + anyhow::bail!("INSERT data cannot be empty"); + } + + Ok((columns, placeholders, values)) +} + +/// Build UPDATE SET clause with column name validation +fn build_update_set_clause(data: &Value) -> anyhow::Result<(String, Vec<Option<String>>)> { + let mut set_clauses = Vec::new(); + let mut values = Vec::new(); + + if let Value::Object(map) = data { + for (key, value) in map { + // SECURITY: Validate column name before using it + validate_column_name(key) + .with_context(|| format!("Invalid column name in UPDATE: {}", key))?; + + set_clauses.push(format!("{} = ?", key)); + // Handle special JSON fields like additional_fields + if key == "additional_fields" && value.is_object() { + values.push(Some( + serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()), + )); + } else { + values.push(json_value_to_option_string(value)); + } + } + } else { + anyhow::bail!("UPDATE data must be a JSON object"); + } + + if set_clauses.is_empty() { + anyhow::bail!("UPDATE data cannot be empty"); + } + + Ok((set_clauses.join(", "), values)) +} + +/// Convert JSON value to Option<String> for SQL binding +/// Properly handles booleans (true/false -> "1"/"0" for MySQL TINYINT/BOOLEAN) +/// NULL values return None for proper SQL NULL handling +fn json_value_to_option_string(value: &Value) -> Option<String> { + match value { + Value::String(s) => Some(s.clone()), + Value::Number(n) => Some(n.to_string()), + // MySQL uses TINYINT(1) for booleans: true -> 1, false -> 0 + Value::Bool(b) => Some(if *b { "1".to_string() } else { "0".to_string() }), + Value::Null => None, // Return None for proper SQL NULL + // Complex types (objects, arrays) get JSON serialized + _ => Some(serde_json::to_string(value).unwrap_or_else(|_| "null".to_string())), + } +} +/// Convert SQL values from MySQL to JSON +/// Handles ALL MySQL types automatically with proper NULL handling +/// Returns booleans as true/false (MySQL TINYINT(1) -> JSON bool) +fn convert_sql_value_to_json( + row: &sqlx::mysql::MySqlRow, + index: usize, + request_id: Option<&str>, + username: Option<&str>, + power: Option<i32>, + state: Option<&AppState>, +) -> anyhow::Result<Value> { + let column = &row.columns()[index]; + + use sqlx::TypeInfo; + let type_name = column.type_info().name(); + + // Comprehensive MySQL type handling - no need to manually add types anymore! + let result = match type_name { + // ===== String types ===== + "VARCHAR" | "TEXT" | "CHAR" | "LONGTEXT" | "MEDIUMTEXT" | "TINYTEXT" | "SET" | "ENUM" => { + row.try_get::<Option<String>, _>(index) + .map(|opt| opt.map(Value::String).unwrap_or(Value::Null)) + } + + // ===== Integer types ===== + "INT" | "BIGINT" | "MEDIUMINT" | "SMALLINT" | "INTEGER" => row + .try_get::<Option<i64>, _>(index) + .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)), + + // Unsigned integers + "INT UNSIGNED" | "BIGINT UNSIGNED" | "MEDIUMINT UNSIGNED" | "SMALLINT UNSIGNED" => row + .try_get::<Option<u64>, _>(index) + .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)), + + // ===== Boolean type (MySQL TINYINT(1)) ===== + // Returns proper JSON true/false instead of 1/0 + "TINYINT" | "BOOLEAN" | "BOOL" => { + // Try as bool first (for TINYINT(1)) + if let Ok(opt_bool) = row.try_get::<Option<bool>, _>(index) { + return Ok(opt_bool.map(Value::Bool).unwrap_or(Value::Null)); + } + // Fallback to i8 for regular TINYINT + row.try_get::<Option<i8>, _>(index) + .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)) + } + + // ===== Decimal/Numeric types ===== + "DECIMAL" | "NUMERIC" => { + row.try_get::<Option<rust_decimal::Decimal>, _>(index) + .map(|opt| { + opt.map(|decimal| { + // Keep precision by converting to string then parsing + let decimal_str = decimal.to_string(); + if let Ok(f) = decimal_str.parse::<f64>() { + serde_json::json!(f) + } else { + Value::String(decimal_str) + } + }) + .unwrap_or(Value::Null) + }) + } + + // ===== Floating point types ===== + "FLOAT" | "DOUBLE" | "REAL" => row + .try_get::<Option<f64>, _>(index) + .map(|opt| opt.map(|v| serde_json::json!(v)).unwrap_or(Value::Null)), + + // ===== Date/Time types ===== + "DATE" => { + use chrono::NaiveDate; + row.try_get::<Option<NaiveDate>, _>(index).map(|opt| { + opt.map(|date| Value::String(date.format("%Y-%m-%d").to_string())) + .unwrap_or(Value::Null) + }) + } + "DATETIME" => { + use chrono::NaiveDateTime; + row.try_get::<Option<NaiveDateTime>, _>(index).map(|opt| { + opt.map(|datetime| Value::String(datetime.format("%Y-%m-%d %H:%M:%S").to_string())) + .unwrap_or(Value::Null) + }) + } + "TIMESTAMP" => { + use chrono::{DateTime, Utc}; + row.try_get::<Option<DateTime<Utc>>, _>(index).map(|opt| { + opt.map(|timestamp| Value::String(timestamp.to_rfc3339())) + .unwrap_or(Value::Null) + }) + } + "TIME" => { + // TIME values come as strings in HH:MM:SS format + row.try_get::<Option<String>, _>(index) + .map(|opt| opt.map(Value::String).unwrap_or(Value::Null)) + } + "YEAR" => row + .try_get::<Option<i32>, _>(index) + .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)), + + // ===== JSON type ===== + "JSON" => row.try_get::<Option<String>, _>(index).map(|opt| { + opt.and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or(Value::Null) + }), + + // ===== Binary types ===== + // Return as base64-encoded strings for safe JSON transmission + "BLOB" | "MEDIUMBLOB" | "LONGBLOB" | "TINYBLOB" | "BINARY" | "VARBINARY" => { + row.try_get::<Option<Vec<u8>>, _>(index).map(|opt| { + opt.map(|bytes| { + use base64::{engine::general_purpose, Engine as _}; + Value::String(general_purpose::STANDARD.encode(&bytes)) + }) + .unwrap_or(Value::Null) + }) + } + + // ===== Bit type ===== + "BIT" => { + row.try_get::<Option<Vec<u8>>, _>(index).map(|opt| { + opt.map(|bytes| { + // Convert bit value to number + let mut val: i64 = 0; + for &byte in &bytes { + val = (val << 8) | byte as i64; + } + Value::Number(val.into()) + }) + .unwrap_or(Value::Null) + }) + } + + // ===== Spatial/Geometry types ===== + "GEOMETRY" | "POINT" | "LINESTRING" | "POLYGON" | "MULTIPOINT" | "MULTILINESTRING" + | "MULTIPOLYGON" | "GEOMETRYCOLLECTION" => { + // Return as WKT (Well-Known Text) string + row.try_get::<Option<String>, _>(index) + .map(|opt| opt.map(Value::String).unwrap_or(Value::Null)) + } + + // ===== Catch-all for unknown/new types ===== + // This ensures forward compatibility if MySQL adds new types + _ => { + warn!( + "Unknown MySQL type '{}' for column '{}', attempting string fallback", + type_name, + column.name() + ); + if let (Some(rid), Some(st)) = (request_id, state) { + super::log_warning_async( + &st.logging, + rid, + &format!( + "Unknown MySQL type '{}' for column '{}', attempting string fallback", + type_name, + column.name() + ), + Some("data_conversion"), + username, + power, + ); + } + row.try_get::<Option<String>, _>(index) + .map(|opt| opt.map(Value::String).unwrap_or(Value::Null)) + } + }; + + // Robust error handling with fallback + match result { + Ok(value) => Ok(value), + Err(e) => { + // Final fallback: try as string + match row.try_get::<Option<String>, _>(index) { + Ok(opt) => { + warn!("Primary conversion failed for column '{}' (type: {}), used string fallback", + column.name(), type_name); + if let (Some(rid), Some(st)) = (request_id, state) { + super::log_warning_async( + &st.logging, + rid, + &format!("Primary conversion failed for column '{}' (type: {}), used string fallback", column.name(), type_name), + Some("data_conversion"), + username, + power, + ); + } + Ok(opt.map(Value::String).unwrap_or(Value::Null)) + } + Err(_) => { + error!( + "Complete failure to decode column '{}' (index: {}, type: {}): {}", + column.name(), + index, + type_name, + e + ); + // Return NULL instead of failing the entire query + Ok(Value::Null) + } + } + } + } +} + +// Generate auto values based on configuration +async fn generate_auto_value( + state: &AppState, + table: &str, + config: &crate::config::AutoGenerationConfig, +) -> Result<String, anyhow::Error> { + match config.gen_type.as_str() { + "numeric" => generate_unique_numeric_id(state, table, config).await, + _ => Err(anyhow::anyhow!( + "Unsupported auto-generation type: {}", + config.gen_type + )), + } +} + +// Generate a unique numeric ID based on configuration +async fn generate_unique_numeric_id( + state: &AppState, + table: &str, + config: &crate::config::AutoGenerationConfig, +) -> Result<String, anyhow::Error> { + let range_min = config.range_min.unwrap_or(10000000); + let range_max = config.range_max.unwrap_or(99999999); + let max_attempts = config.max_attempts.unwrap_or(10) as usize; + let field_name = &config.field; + + for _attempt in 0..max_attempts { + // Generate random number in specified range + let id = { + let mut rng = rand::rng(); + rng.random_range(range_min..=range_max) + }; + let id_str = id.to_string(); + + // Check if this ID already exists + let query_str = format!( + "SELECT COUNT(*) as count FROM {} WHERE {} = ?", + table, field_name + ); + let exists = sqlx::query(&query_str) + .bind(&id_str) + .fetch_one(state.database.pool()) + .await?; + + let count: i64 = exists.try_get("count")?; + + if count == 0 { + return Ok(id_str); + } + } + + Err(anyhow::anyhow!( + "Failed to generate unique {} for table {} after {} attempts", + field_name, + table, + max_attempts + )) +} + +// Security validation functions +fn validate_input_security(payload: &QueryRequest) -> Result<(), String> { + // Check for null bytes in table name + if let Some(ref table) = payload.table { + if table.contains('\0') { + return Err("Null byte detected in table name".to_string()); + } + } + + // Check for null bytes in column names + if let Some(columns) = &payload.columns { + for column in columns { + if column.contains('\0') { + return Err("Null byte detected in column name".to_string()); + } + } + } + + // Check for null bytes in data values + if let Some(data) = &payload.data { + if contains_null_bytes_in_value(data) { + return Err("Null byte detected in data values".to_string()); + } + } + + // Check for null bytes in WHERE clause + if let Some(where_clause) = &payload.where_clause { + if contains_null_bytes_in_value(where_clause) { + return Err("Null byte detected in WHERE clause".to_string()); + } + } + + Ok(()) +} + +fn contains_null_bytes_in_value(value: &Value) -> bool { + match value { + Value::String(s) => s.contains('\0'), + Value::Array(arr) => arr.iter().any(contains_null_bytes_in_value), + Value::Object(map) => { + map.keys().any(|k| k.contains('\0')) || map.values().any(contains_null_bytes_in_value) + } + _ => false, + } +} + +// Core execution functions that work with mutable transaction references +// These are used by batch operations to execute multiple queries in a single atomic transaction + +async fn execute_select_core( + request_id: &str, + tx: &mut sqlx::Transaction<'_, sqlx::MySql>, + payload: &QueryRequest, + username: &str, + session: &crate::models::Session, + state: &AppState, +) -> anyhow::Result<QueryResponse> { + let table = payload + .table + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Table is required"))?; + + // Helper to count conditions in filter/where + fn count_conditions( + filter: &Option<crate::models::FilterCondition>, + where_clause: &Option<serde_json::Value>, + ) -> usize { + let mut count = 0; + if let Some(f) = filter { + count += count_filter_conditions(f); + } + if let Some(w) = where_clause { + count += count_where_conditions(w); + } + count + } + + fn count_filter_conditions(filter: &crate::models::FilterCondition) -> usize { + use crate::models::FilterCondition; + match filter { + FilterCondition::Logical { + and_conditions, + or_conditions, + } => { + let mut count = 0; + if let Some(conditions) = and_conditions { + count += conditions + .iter() + .map(|c| count_filter_conditions(c)) + .sum::<usize>(); + } + if let Some(conditions) = or_conditions { + count += conditions + .iter() + .map(|c| count_filter_conditions(c)) + .sum::<usize>(); + } + count + } + _ => 1, + } + } + + fn count_where_conditions(where_clause: &serde_json::Value) -> usize { + match where_clause { + serde_json::Value::Object(map) => { + if map.contains_key("AND") || map.contains_key("OR") { + if let Some(arr) = map.get("AND").or_else(|| map.get("OR")) { + if let serde_json::Value::Array(conditions) = arr { + return conditions.iter().map(|c| count_where_conditions(c)).sum(); + } + } + } + 1 + } + _ => 1, + } + } + + let max_limit = state.config.get_max_limit(session.power); + let max_where = state.config.get_max_where_conditions(session.power); + + let requested_columns = if let Some(ref cols) = payload.columns { + cols.clone() + } else { + vec!["*".to_string()] + }; + + let filtered_columns = if requested_columns.len() == 1 && requested_columns[0] == "*" { + "*".to_string() + } else { + let allowed_columns = + state + .config + .filter_readable_columns(session.power, &table, &requested_columns); + if allowed_columns.is_empty() { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some("No readable columns available for this request".to_string()), + warning: None, + results: None, + }); + } + allowed_columns.join(", ") + }; + + let condition_count = count_conditions(&payload.filter, &payload.where_clause); + if condition_count > max_where as usize { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Too many WHERE/filter conditions ({}) > max {}", + condition_count, max_where + )), + warning: None, + results: None, + }); + } + + let mut query = format!("SELECT {} FROM {}", filtered_columns, table); + let mut values = Vec::new(); + + if let Some(joins) = &payload.joins { + for join in joins { + if !state.rbac.check_permission( + &state.config, + session.power, + &join.table, + &crate::models::QueryAction::Select, + ) { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Insufficient permissions to JOIN with table '{}'", + join.table + )), + warning: None, + results: None, + }); + } + } + let join_sql = crate::sql::build_join_clause(joins, &state.config)?; + query.push_str(&join_sql); + } + + if let Some(filter) = &payload.filter { + let (where_sql, where_values) = build_filter_clause(filter)?; + query.push_str(&format!(" WHERE {}", where_sql)); + values.extend(where_values.into_iter().map(Some)); + } else if let Some(where_clause) = &payload.where_clause { + let (where_sql, where_values) = build_where_clause(where_clause)?; + query.push_str(&format!(" WHERE {}", where_sql)); + values.extend(where_values.into_iter().map(Some)); + } + + if let Some(order_by) = &payload.order_by { + let order_clause = build_order_by_clause(order_by)?; + query.push_str(&order_clause); + } + + let requested_limit = payload.limit; + let limit = requested_limit.unwrap_or(max_limit); + let was_capped = limit > max_limit; + let limit = if limit > max_limit { max_limit } else { limit }; + query.push_str(&format!(" LIMIT {}", limit)); + + let limit_warning = if was_capped { + Some(format!( + "Requested LIMIT {} exceeded maximum {} for your power level, capped to {}", + requested_limit.unwrap(), + max_limit, + max_limit + )) + } else if requested_limit.is_none() { + Some(format!( + "No LIMIT specified, defaulted to {} (max for power level {})", + max_limit, session.power + )) + } else { + None + }; + + if let Err(e) = state + .logging + .log_query( + request_id, + chrono::Utc::now(), + username, + Some(session.power), + &query, + None, + None, + ) + .await + { + error!("[{}] Failed to log query: {}", request_id, e); + } + + let mut sqlx_query = sqlx::query(&query); + for value in values { + match value { + Some(v) => sqlx_query = sqlx_query.bind(v), + None => sqlx_query = sqlx_query.bind(Option::<String>::None), + } + } + + let rows = sqlx_query.fetch_all(&mut **tx).await?; + + let mut results = Vec::new(); + for row in rows { + let mut result_row = serde_json::Map::new(); + for (i, column) in row.columns().iter().enumerate() { + let value = convert_sql_value_to_json( + &row, + i, + Some(request_id), + Some(username), + Some(session.power), + Some(state), + )?; + result_row.insert(column.name().to_string(), value); + } + results.push(Value::Object(result_row)); + } + + Ok(QueryResponse { + success: true, + data: Some(Value::Array(results)), + rows_affected: None, + error: None, + warning: limit_warning, + results: None, + }) +} + +async fn execute_count_core( + request_id: &str, + tx: &mut sqlx::Transaction<'_, sqlx::MySql>, + payload: &QueryRequest, + username: &str, + session: &crate::models::Session, + state: &AppState, +) -> anyhow::Result<QueryResponse> { + let table = payload + .table + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Table is required"))?; + + // Helper to count conditions in filter/where (same as other functions) + fn count_conditions( + filter: &Option<crate::models::FilterCondition>, + where_clause: &Option<serde_json::Value>, + ) -> usize { + use crate::models::FilterCondition; + fn count_filter_conditions(filter: &FilterCondition) -> usize { + match filter { + FilterCondition::Logical { + and_conditions, + or_conditions, + } => { + let mut count = 0; + if let Some(conditions) = and_conditions { + count += conditions + .iter() + .map(|c| count_filter_conditions(c)) + .sum::<usize>(); + } + if let Some(conditions) = or_conditions { + count += conditions + .iter() + .map(|c| count_filter_conditions(c)) + .sum::<usize>(); + } + count + } + _ => 1, + } + } + + fn count_where_conditions(where_clause: &serde_json::Value) -> usize { + match where_clause { + serde_json::Value::Object(map) => { + if map.contains_key("AND") || map.contains_key("OR") { + if let Some(arr) = map.get("AND").or_else(|| map.get("OR")) { + if let serde_json::Value::Array(conditions) = arr { + return conditions.iter().map(|c| count_where_conditions(c)).sum(); + } + } + } + 1 + } + _ => 1, + } + } + + let mut count = 0; + if let Some(f) = filter { + count += count_filter_conditions(f); + } + if let Some(w) = where_clause { + count += count_where_conditions(w); + } + count + } + + let max_where = state.config.get_max_where_conditions(session.power); + + // Enforce WHERE clause complexity + let condition_count = count_conditions(&payload.filter, &payload.where_clause); + if condition_count > max_where as usize { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Too many WHERE/filter conditions ({}) > max {}", + condition_count, max_where + )), + warning: None, + results: None, + }); + } + + let mut query = format!("SELECT COUNT(*) as count FROM {}", table); + let mut values = Vec::new(); + + // Add JOIN clauses if provided - validates permissions for all joined tables + if let Some(joins) = &payload.joins { + for join in joins { + if !state.rbac.check_permission( + &state.config, + session.power, + &join.table, + &crate::models::QueryAction::Select, + ) { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Insufficient permissions to JOIN with table '{}'", + join.table + )), + warning: None, + results: None, + }); + } + } + let join_sql = crate::sql::build_join_clause(joins, &state.config)?; + query.push_str(&join_sql); + } + + // Add WHERE conditions (filter takes precedence over where_clause if both are provided) + if let Some(filter) = &payload.filter { + let (where_sql, where_values) = crate::sql::build_filter_clause(filter)?; + query.push_str(&format!(" WHERE {}", where_sql)); + values.extend(where_values.into_iter().map(Some)); + } else if let Some(where_clause) = &payload.where_clause { + let (where_sql, where_values) = build_where_clause(where_clause)?; + query.push_str(&format!(" WHERE {}", where_sql)); + values.extend(where_values.into_iter().map(Some)); + } + + // Log the query + if let Err(e) = state + .logging + .log_query( + request_id, + chrono::Utc::now(), + username, + Some(session.power), + &query, + None, + None, + ) + .await + { + error!("[{}] Failed to log query: {}", request_id, e); + } + + let mut sqlx_query = sqlx::query(&query); + for value in values { + match value { + Some(v) => sqlx_query = sqlx_query.bind(v), + None => sqlx_query = sqlx_query.bind(Option::<String>::None), + } + } + + let result = sqlx_query.fetch_one(&mut **tx).await?; + let count: i64 = result.try_get("count")?; + + Ok(QueryResponse { + success: true, + data: Some(serde_json::json!(count)), + rows_affected: None, + error: None, + warning: None, + results: None, + }) +} + +async fn execute_update_core( + request_id: &str, + tx: &mut sqlx::Transaction<'_, sqlx::MySql>, + payload: &QueryRequest, + username: &str, + state: &AppState, + session: &crate::models::Session, +) -> anyhow::Result<QueryResponse> { + let table = payload + .table + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Table is required"))?; + + let mut data = payload + .data + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Data is required for UPDATE"))? + .clone(); + + let where_clause = payload + .where_clause + .as_ref() + .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for UPDATE"))?; + + let max_where = state.config.get_max_where_conditions(session.power); + let condition_count = where_clause.as_object().map(|obj| obj.len()).unwrap_or(0); + if condition_count > max_where as usize { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Too many WHERE conditions ({}) > max {}", + condition_count, max_where + )), + warning: None, + results: None, + }); + } + + // SECURITY: Apply column-level write filtering FIRST (before auto-generation) + if let Value::Object(ref mut map) = data { + let all_columns: Vec<String> = map.keys().cloned().collect(); + let writable_columns = + state + .config + .filter_writable_columns(session.power, &table, &all_columns); + + // Remove columns that user cannot write to + map.retain(|key, _| writable_columns.contains(key)); + + // Check for auto-generation (system-generated fields bypass write protection) + if let Some(auto_config) = state.config.get_auto_generation_config(&table) { + if auto_config.on_action == "update" || auto_config.on_action == "both" { + let field_name = &auto_config.field; + if !map.contains_key(field_name) + || map.get(field_name).map_or(true, |v| { + v.is_null() || v.as_str().map_or(true, |s| s.is_empty()) + }) + { + let generated_value = generate_auto_value(&state, &table, auto_config).await?; + map.insert(field_name.clone(), Value::String(generated_value)); + } + } + } + } + + let writable_columns = data + .as_object() + .unwrap() + .keys() + .cloned() + .collect::<Vec<_>>(); + if writable_columns.is_empty() { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some("No writable columns in UPDATE data".to_string()), + warning: None, + results: None, + }); + } + + let set_clause = writable_columns + .iter() + .map(|col| format!("{} = ?", col)) + .collect::<Vec<_>>() + .join(", "); + let (where_sql, where_values) = build_where_clause(where_clause)?; + let query = format!("UPDATE {} SET {} WHERE {}", table, set_clause, where_sql); + + let requested_limit = payload.limit; + let max_limit = state.config.get_max_limit(session.power); + let limit = requested_limit.unwrap_or(max_limit); + let was_capped = limit > max_limit; + let limit = if limit > max_limit { max_limit } else { limit }; + let query_with_limit = format!("{} LIMIT {}", query, limit); + + let limit_warning = if was_capped { + Some(format!( + "Requested LIMIT {} exceeded maximum {}, capped to {}", + requested_limit.unwrap(), + max_limit, + max_limit + )) + } else { + None + }; + + if let Err(e) = state + .logging + .log_query( + request_id, + chrono::Utc::now(), + username, + Some(session.power), + &query_with_limit, + None, + None, + ) + .await + { + error!("[{}] Failed to log query: {}", request_id, e); + } + + let mut sqlx_query = sqlx::query(&query_with_limit); + for col in &writable_columns { + if let Some(val) = data.get(col) { + sqlx_query = sqlx_query.bind(val.clone()); + } + } + for val in where_values { + sqlx_query = sqlx_query.bind(val); + } + + let result = sqlx_query.execute(&mut **tx).await?; + + Ok(QueryResponse { + success: true, + data: None, + rows_affected: Some(result.rows_affected()), + error: None, + warning: limit_warning, + results: None, + }) +} + +async fn execute_delete_core( + request_id: &str, + tx: &mut sqlx::Transaction<'_, sqlx::MySql>, + payload: &QueryRequest, + username: &str, + state: &AppState, + session: &crate::models::Session, +) -> anyhow::Result<QueryResponse> { + let table = payload + .table + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Table is required"))?; + + let where_clause = payload + .where_clause + .as_ref() + .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for DELETE"))?; + + let max_where = state.config.get_max_where_conditions(session.power); + let condition_count = where_clause.as_object().map(|obj| obj.len()).unwrap_or(0); + if condition_count > max_where as usize { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Too many WHERE conditions ({}) > max {}", + condition_count, max_where + )), + warning: None, + results: None, + }); + } + + let (where_sql, where_values) = build_where_clause(where_clause)?; + let query = format!("DELETE FROM {} WHERE {}", table, where_sql); + + let requested_limit = payload.limit; + let max_limit = state.config.get_max_limit(session.power); + let limit = requested_limit.unwrap_or(max_limit); + let was_capped = limit > max_limit; + let limit = if limit > max_limit { max_limit } else { limit }; + let query_with_limit = format!("{} LIMIT {}", query, limit); + + let limit_warning = if was_capped { + Some(format!( + "Requested LIMIT {} exceeded maximum {}, capped to {}", + requested_limit.unwrap(), + max_limit, + max_limit + )) + } else { + None + }; + + if let Err(e) = state + .logging + .log_query( + request_id, + chrono::Utc::now(), + username, + Some(session.power), + &query_with_limit, + None, + None, + ) + .await + { + error!("[{}] Failed to log query: {}", request_id, e); + } + + let mut sqlx_query = sqlx::query(&query_with_limit); + for val in where_values { + sqlx_query = sqlx_query.bind(val); + } + + let result = sqlx_query.execute(&mut **tx).await?; + + Ok(QueryResponse { + success: true, + data: None, + rows_affected: Some(result.rows_affected()), + error: None, + warning: limit_warning, + results: None, + }) +} + +// Transaction-based execution functions for user context operations +// These create their own transactions and commit them - used for single query operations + +async fn execute_select_with_tx( + request_id: &str, + mut tx: sqlx::Transaction<'_, sqlx::MySql>, + payload: &QueryRequest, + username: &str, + session: &crate::models::Session, + state: &AppState, +) -> anyhow::Result<QueryResponse> { + let table = payload + .table + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Table is required"))?; + + // Helper to count conditions in filter/where + fn count_conditions( + filter: &Option<crate::models::FilterCondition>, + where_clause: &Option<serde_json::Value>, + ) -> usize { + use crate::models::FilterCondition; + fn count_filter(cond: &FilterCondition) -> usize { + match cond { + FilterCondition::Simple { .. } => 1, + FilterCondition::Logical { + and_conditions, + or_conditions, + } => { + and_conditions + .as_ref() + .map_or(0, |conds| conds.iter().map(count_filter).sum()) + + or_conditions + .as_ref() + .map_or(0, |conds| conds.iter().map(count_filter).sum()) + } + FilterCondition::Not { not } => count_filter(not), + } + } + let mut count = 0; + if let Some(f) = filter { + count += count_filter(f); + } + if let Some(w) = where_clause { + if let serde_json::Value::Object(map) = w { + count += map.len(); + } + } + count + } + + // Enforce query limits from config (power-level specific with fallback to defaults) + let max_limit = state.config.get_max_limit(session.power); + let max_where = state.config.get_max_where_conditions(session.power); + + // Apply granular column filtering based on user's power level + let requested_columns = if let Some(cols) = &payload.columns { + cols.clone() + } else { + vec!["*".to_string()] + }; + + let filtered_columns = if requested_columns.len() == 1 && requested_columns[0] == "*" { + "*".to_string() + } else { + let allowed_columns = + state + .config + .filter_readable_columns(session.power, &table, &requested_columns); + if allowed_columns.is_empty() { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some("No readable columns available for this request".to_string()), + warning: None, + results: None, + }); + } + allowed_columns.join(", ") + }; + + // Enforce WHERE clause complexity + let condition_count = count_conditions(&payload.filter, &payload.where_clause); + if condition_count > max_where as usize { + // Log security violation + let timestamp = chrono::Utc::now(); + if let Err(log_err) = state + .logging + .log_error( + &request_id, + timestamp, + &format!( + "Too many WHERE conditions: {} exceeds maximum {} for power level {}", + condition_count, max_where, session.power + ), + Some("query_limits"), + Some(&session.username), + Some(session.power), + ) + .await + { + error!( + "[{}] Failed to log WHERE limit violation: {}", + request_id, log_err + ); + } + + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Too many WHERE/filter conditions ({}) > max {} [request_id: {}]", + condition_count, max_where, request_id + )), + warning: None, + results: None, + }); + } + + let mut query = format!("SELECT {} FROM {}", filtered_columns, table); + let mut values = Vec::new(); + + // Add JOIN clauses if provided - validates permissions for all joined tables + if let Some(joins) = &payload.joins { + // Validate user has read permission for all joined tables + for join in joins { + if !state.rbac.check_permission( + &state.config, + session.power, + &join.table, + &crate::models::QueryAction::Select, + ) { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Insufficient permissions to JOIN with table '{}'", + join.table + )), + warning: None, + results: None, + }); + } + } + + // Build and append JOIN SQL + let join_sql = crate::sql::build_join_clause(joins, &state.config)?; + query.push_str(&join_sql); + } + + // Add WHERE clause - support both legacy and new filter format + if let Some(filter) = &payload.filter { + let (where_sql, where_values) = build_filter_clause(filter)?; + query.push_str(&format!(" WHERE {}", where_sql)); + values.extend(where_values.into_iter().map(Some)); + } else if let Some(where_clause) = &payload.where_clause { + let (where_sql, where_values) = build_where_clause(where_clause)?; + query.push_str(&format!(" WHERE {}", where_sql)); + values.extend(where_values.into_iter().map(Some)); + } + + // Add ORDER BY if provided + if let Some(order_by) = &payload.order_by { + let order_clause = build_order_by_clause(order_by)?; + query.push_str(&order_clause); + } + + // Enforce LIMIT and track if it was capped + let requested_limit = payload.limit; + let limit = requested_limit.unwrap_or(max_limit); + let was_capped = limit > max_limit; + let limit = if limit > max_limit { max_limit } else { limit }; + query.push_str(&format!(" LIMIT {}", limit)); + + let limit_warning = if was_capped { + Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]", + requested_limit.unwrap(), max_limit, max_limit, request_id)) + } else if requested_limit.is_none() { + Some(format!( + "No LIMIT specified, using default {} based on power level [request_id: {}]", + max_limit, request_id + )) + } else { + None + }; + + // Add OFFSET if provided + if let Some(offset) = payload.offset { + query.push_str(&format!(" OFFSET {}", offset)); + } + + // Log the query + let params_json = serde_json::to_value(&values).ok(); + if let Err(e) = state + .logging + .log_query( + request_id, + Utc::now(), + username, + Some(session.power), + &query, + params_json.as_ref(), + None, // Row count will be known after execution + ) + .await + { + error!("[{}] Failed to log query: {}", request_id, e); + } + + // Execute the query + let mut sqlx_query = sqlx::query(&query); + for value in values { + match value { + Some(v) => sqlx_query = sqlx_query.bind(v), + None => sqlx_query = sqlx_query.bind(Option::<String>::None), + } + } + + let rows = sqlx_query.fetch_all(&mut *tx).await?; + tx.commit().await?; + + let mut results = Vec::new(); + for row in rows { + let mut result_row = serde_json::Map::new(); + for (i, column) in row.columns().iter().enumerate() { + let value = convert_sql_value_to_json( + &row, + i, + Some(request_id), + Some(username), + Some(session.power), + Some(state), + )?; + result_row.insert(column.name().to_string(), value); + } + results.push(Value::Object(result_row)); + } + + Ok(QueryResponse { + success: true, + data: Some(Value::Array(results)), + rows_affected: None, + error: None, + warning: limit_warning, + results: None, + }) +} + +async fn execute_insert_with_tx( + request_id: &str, + mut tx: sqlx::Transaction<'_, sqlx::MySql>, + payload: &QueryRequest, + username: &str, + state: &AppState, + session: &crate::models::Session, +) -> anyhow::Result<QueryResponse> { + let table = payload + .table + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Table is required"))?; + + let mut data = payload + .data + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Data is required for INSERT"))? + .clone(); + + // SECURITY: Apply column-level write filtering FIRST (before auto-generation) + // This validates what the user is trying to write + if let Value::Object(ref mut map) = data { + let all_columns: Vec<String> = map.keys().cloned().collect(); + let writable_columns = + state + .config + .filter_writable_columns(session.power, &table, &all_columns); + + // Remove columns that user cannot write to + map.retain(|key, _| writable_columns.contains(key)); + + // Check for auto-generation requirements based on config + // Auto-generated fields bypass write protection since they're system-generated + if let Some(auto_config) = state.config.get_auto_generation_config(&table) { + // Check if auto-generation is enabled for INSERT action + if auto_config.on_action == "insert" || auto_config.on_action == "both" { + let field_name = &auto_config.field; + + if !map.contains_key(field_name) + || map.get(field_name).map_or(true, |v| { + v.is_null() || v.as_str().map_or(true, |s| s.is_empty()) + }) + { + // Generate auto value based on config + let generated_value = generate_auto_value(&state, &table, auto_config).await?; + map.insert(field_name.clone(), Value::String(generated_value)); + } + } + } + } + + // Final validation: ensure we have columns to insert + if let Value::Object(ref map) = data { + if map.is_empty() { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some("No writable columns in INSERT data".to_string()), + warning: None, + results: None, + }); + } + } + + let (columns_vec, placeholders_vec, values) = build_insert_data(&data)?; + let columns = columns_vec.join(", "); + let placeholders = placeholders_vec.join(", "); + + let query = format!( + "INSERT INTO {} ({}) VALUES ({})", + table, columns, placeholders + ); + + // Log the query + let params_json = serde_json::to_value(&values).ok(); + if let Err(e) = state + .logging + .log_query( + request_id, + Utc::now(), + username, + Some(session.power), + &query, + params_json.as_ref(), + None, + ) + .await + { + error!("[{}] Failed to log query: {}", request_id, e); + } + + // Execute the query + let mut sqlx_query = sqlx::query(&query); + for value in values { + match value { + Some(v) => sqlx_query = sqlx_query.bind(v), + None => sqlx_query = sqlx_query.bind(Option::<String>::None), + } + } + + let result = sqlx_query.execute(&mut *tx).await?; + let insert_id = result.last_insert_id(); + tx.commit().await?; + + Ok(QueryResponse { + success: true, + data: Some(serde_json::json!(insert_id)), + rows_affected: Some(result.rows_affected()), + error: None, + warning: None, // INSERT queries don't have LIMIT, + results: None, + }) +} + +async fn execute_update_with_tx( + request_id: &str, + mut tx: sqlx::Transaction<'_, sqlx::MySql>, + payload: &QueryRequest, + username: &str, + state: &AppState, + session: &crate::models::Session, +) -> anyhow::Result<QueryResponse> { + let table = payload + .table + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Table is required"))?; + + let mut data = payload + .data + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Data is required for UPDATE"))? + .clone(); + + let where_clause = payload + .where_clause + .as_ref() + .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for UPDATE"))?; + + // Enforce query limits from config (power-level specific with fallback to defaults) + let max_limit = state.config.get_max_limit(session.power); + let max_where = state.config.get_max_where_conditions(session.power); + + // Enforce WHERE clause complexity + let condition_count = if let Some(w) = &payload.where_clause { + if let serde_json::Value::Object(map) = w { + map.len() + } else { + 0 + } + } else { + 0 + }; + if condition_count > max_where as usize { + // Log security violation + let timestamp = chrono::Utc::now(); + if let Err(log_err) = state + .logging + .log_error( + &request_id, + timestamp, + &format!( + "Too many WHERE conditions: {} exceeds maximum {} for power level {}", + condition_count, max_where, session.power + ), + Some("query_limits"), + Some(&session.username), + Some(session.power), + ) + .await + { + error!( + "[{}] Failed to log WHERE limit violation: {}", + request_id, log_err + ); + } + + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Too many WHERE conditions ({}) > max {} [request_id: {}]", + condition_count, max_where, request_id + )), + warning: None, + results: None, + }); + } + + // SECURITY: Apply column-level write filtering FIRST (before auto-generation) + if let Value::Object(ref mut map) = data { + let all_columns: Vec<String> = map.keys().cloned().collect(); + let writable_columns = + state + .config + .filter_writable_columns(session.power, &table, &all_columns); + + // Remove columns that user cannot write to + map.retain(|key, _| writable_columns.contains(key)); + + // Check for auto-generation (system-generated fields bypass write protection) + if let Some(auto_config) = state.config.get_auto_generation_config(&table) { + if auto_config.on_action == "update" || auto_config.on_action == "both" { + let field_name = &auto_config.field; + if !map.contains_key(field_name) + || map.get(field_name).map_or(true, |v| { + v.is_null() || v.as_str().map_or(true, |s| s.is_empty()) + }) + { + let generated_value = generate_auto_value(&state, &table, auto_config).await?; + map.insert(field_name.clone(), Value::String(generated_value)); + } + } + } + } + + // Final validation: ensure we have columns to update + if let Value::Object(ref map) = data { + if map.is_empty() { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some("No writable columns in UPDATE data".to_string()), + warning: None, + results: None, + }); + } + } + + let (set_clause, mut values) = build_update_set_clause(&data)?; + let (where_sql, where_values) = build_where_clause(where_clause)?; + // Convert where_values to Option<String> to match set values + values.extend(where_values.into_iter().map(Some)); + + let mut query = format!("UPDATE {} SET {} WHERE {}", table, set_clause, where_sql); + + // Enforce LIMIT and track if it was capped + let requested_limit = payload.limit; + let limit = requested_limit.unwrap_or(max_limit); + let was_capped = limit > max_limit; + let limit = if limit > max_limit { max_limit } else { limit }; + query.push_str(&format!(" LIMIT {}", limit)); + + let limit_warning = if was_capped { + Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]", + requested_limit.unwrap(), max_limit, max_limit, request_id)) + } else if requested_limit.is_none() { + Some(format!( + "No LIMIT specified, using default {} based on power level [request_id: {}]", + max_limit, request_id + )) + } else { + None + }; + + // Log the query + let params_json = serde_json::to_value(&values).ok(); + if let Err(e) = state + .logging + .log_query( + request_id, + Utc::now(), + username, + Some(session.power), + &query, + params_json.as_ref(), + None, + ) + .await + { + error!("[{}] Failed to log query: {}", request_id, e); + } + + // Execute the query + let mut sqlx_query = sqlx::query(&query); + for value in values { + match value { + Some(v) => sqlx_query = sqlx_query.bind(v), + None => sqlx_query = sqlx_query.bind(Option::<String>::None), + } + } + + let result = sqlx_query.execute(&mut *tx).await?; + tx.commit().await?; + + Ok(QueryResponse { + success: true, + data: None, + rows_affected: Some(result.rows_affected()), + error: None, + warning: limit_warning, + results: None, + }) +} + +async fn execute_delete_with_tx( + request_id: &str, + mut tx: sqlx::Transaction<'_, sqlx::MySql>, + payload: &QueryRequest, + username: &str, + state: &AppState, + session: &crate::models::Session, +) -> anyhow::Result<QueryResponse> { + let table = payload + .table + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Table is required"))?; + + let where_clause = payload + .where_clause + .as_ref() + .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for DELETE"))?; + + // Enforce query limits from config (power-level specific with fallback to defaults) + let max_limit = state.config.get_max_limit(session.power); + let max_where = state.config.get_max_where_conditions(session.power); + + // Enforce WHERE clause complexity + let condition_count = if let Some(w) = &payload.where_clause { + if let serde_json::Value::Object(map) = w { + map.len() + } else { + 0 + } + } else { + 0 + }; + if condition_count > max_where as usize { + // Log security violation + let timestamp = chrono::Utc::now(); + if let Err(log_err) = state + .logging + .log_error( + &request_id, + timestamp, + &format!( + "Too many WHERE conditions: {} exceeds maximum {} for power level {}", + condition_count, max_where, session.power + ), + Some("query_limits"), + Some(&session.username), + Some(session.power), + ) + .await + { + error!( + "[{}] Failed to log WHERE limit violation: {}", + request_id, log_err + ); + } + + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Too many WHERE conditions ({}) > max {} [request_id: {}]", + condition_count, max_where, request_id + )), + warning: None, + results: None, + }); + } + + let (where_sql, values) = build_where_clause(where_clause)?; + + let mut query = format!("DELETE FROM {} WHERE {}", table, where_sql); + + // Enforce LIMIT and track if it was capped + let requested_limit = payload.limit; + let limit = requested_limit.unwrap_or(max_limit); + let was_capped = limit > max_limit; + let limit = if limit > max_limit { max_limit } else { limit }; + query.push_str(&format!(" LIMIT {}", limit)); + + let limit_warning = if was_capped { + Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]", + requested_limit.unwrap(), max_limit, max_limit, request_id)) + } else if requested_limit.is_none() { + Some(format!( + "No LIMIT specified, using default {} based on power level [request_id: {}]", + max_limit, request_id + )) + } else { + None + }; + + // Log the query + let params_json = serde_json::to_value(&values).ok(); + if let Err(e) = state + .logging + .log_query( + request_id, + Utc::now(), + username, + Some(session.power), + &query, + params_json.as_ref(), + None, + ) + .await + { + error!("[{}] Failed to log query: {}", request_id, e); + } + + // Execute the query + let mut sqlx_query = sqlx::query(&query); + for value in values { + sqlx_query = sqlx_query.bind(value); + } + + let result = sqlx_query.execute(&mut *tx).await?; + tx.commit().await?; + + Ok(QueryResponse { + success: true, + data: None, + rows_affected: Some(result.rows_affected()), + error: None, + warning: limit_warning, + results: None, + }) +} + +async fn execute_count_with_tx( + request_id: &str, + mut tx: sqlx::Transaction<'_, sqlx::MySql>, + payload: &QueryRequest, + username: &str, + session: &crate::models::Session, + state: &AppState, +) -> anyhow::Result<QueryResponse> { + let table = payload + .table + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Table is required"))?; + + // Check read permissions for the table + if !state.rbac.check_permission( + &state.config, + session.power, + table, + &crate::models::QueryAction::Select, + ) { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Insufficient permissions to COUNT from table '{}' [request_id: {}]", + table, request_id + )), + warning: None, + results: None, + }); + } + + // Helper to count conditions in filter/where (same as select function) + fn count_conditions( + filter: &Option<crate::models::FilterCondition>, + where_clause: &Option<serde_json::Value>, + ) -> usize { + use crate::models::FilterCondition; + fn count_filter(cond: &FilterCondition) -> usize { + match cond { + FilterCondition::Simple { .. } => 1, + FilterCondition::Logical { + and_conditions, + or_conditions, + } => { + and_conditions + .as_ref() + .map_or(0, |conds| conds.iter().map(count_filter).sum()) + + or_conditions + .as_ref() + .map_or(0, |conds| conds.iter().map(count_filter).sum()) + } + FilterCondition::Not { not } => count_filter(not), + } + } + let mut count = 0; + if let Some(f) = filter { + count += count_filter(f); + } + if let Some(w) = where_clause { + if let serde_json::Value::Object(map) = w { + count += map.len(); + } + } + count + } + + // Enforce query limits from config (power-level specific with fallback to defaults) + let max_where = state.config.get_max_where_conditions(session.power); + + // Enforce WHERE clause complexity + let condition_count = count_conditions(&payload.filter, &payload.where_clause); + if condition_count > max_where as usize { + // Log security violation + let timestamp = chrono::Utc::now(); + if let Err(log_err) = state + .logging + .log_error( + &request_id, + timestamp, + &format!( + "Too many WHERE conditions: {} exceeds maximum {} for power level {}", + condition_count, max_where, session.power + ), + Some("query_limits"), + Some(&session.username), + Some(session.power), + ) + .await + { + error!( + "[{}] Failed to log WHERE limit violation: {}", + request_id, log_err + ); + } + + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Too many WHERE/filter conditions ({}) > max {} [request_id: {}]", + condition_count, max_where, request_id + )), + warning: None, + results: None, + }); + } + + let mut query = format!("SELECT COUNT(*) as count FROM {}", table); + let mut values = Vec::new(); + + // Add JOIN clauses if provided - validates permissions for all joined tables + if let Some(joins) = &payload.joins { + // Validate user has read permission for all joined tables + for join in joins { + if !state.rbac.check_permission( + &state.config, + session.power, + &join.table, + &crate::models::QueryAction::Select, + ) { + return Ok(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Insufficient permissions to JOIN with table '{}'", + join.table + )), + warning: None, + results: None, + }); + } + } + let join_sql = crate::sql::build_join_clause(joins, &state.config)?; + query.push_str(&join_sql); + } + + // Add WHERE conditions (filter takes precedence over where_clause if both are provided) + if let Some(filter) = &payload.filter { + let (where_sql, where_values) = crate::sql::build_filter_clause(filter)?; + query.push_str(&format!(" WHERE {}", where_sql)); + values.extend(where_values.into_iter().map(Some)); + } else if let Some(where_clause) = &payload.where_clause { + let (where_sql, where_values) = build_where_clause(where_clause)?; + query.push_str(&format!(" WHERE {}", where_sql)); + values.extend(where_values.into_iter().map(Some)); + } + + // Log the query + let params_json = serde_json::to_value(&values).ok(); + if let Err(e) = state + .logging + .log_query( + request_id, + chrono::Utc::now(), + username, + Some(session.power), + &query, + params_json.as_ref(), + None, + ) + .await + { + error!("[{}] Failed to log query: {}", request_id, e); + } + + // Execute the query + let mut sqlx_query = sqlx::query(&query); + for value in values { + match value { + Some(v) => sqlx_query = sqlx_query.bind(v), + None => sqlx_query = sqlx_query.bind(Option::<String>::None), + } + } + + let result = sqlx_query.fetch_one(&mut *tx).await?; + tx.commit().await?; + + let count: i64 = result.try_get("count")?; + + Ok(QueryResponse { + success: true, + data: Some(serde_json::json!(count)), + rows_affected: None, + error: None, + warning: None, + results: None, + }) +} + +/// Execute multiple queries in a single transaction +async fn execute_batch_mode( + state: AppState, + session: crate::models::Session, + request_id: String, + timestamp: chrono::DateTime<chrono::Utc>, + client_ip: String, + payload: &QueryRequest, +) -> Result<Json<QueryResponse>, StatusCode> { + let queries = payload.queries.as_ref().unwrap(); + + // Check if batch is empty + if queries.is_empty() { + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some("Batch request cannot be empty".to_string()), + warning: None, + results: Some(vec![]), + })); + } + + info!( + "[{}] Batch query request from user {} (power {}): {} queries", + request_id, + session.username, + session.power, + queries.len() + ); + + // Log the batch request (log full payload including action/table) + if let Err(e) = state + .logging + .log_request( + &request_id, + timestamp, + &client_ip, + Some(&session.username), + Some(session.power), + "/query", + &serde_json::to_value(payload).unwrap_or_default(), + ) + .await + { + error!("[{}] Failed to log batch request: {}", request_id, e); + } + + // Get action and table from parent request (all batch queries share these) + let action = payload + .action + .as_ref() + .ok_or_else(|| StatusCode::BAD_REQUEST)?; + + let table = payload + .table + .as_ref() + .ok_or_else(|| StatusCode::BAD_REQUEST)?; + + // Validate table name once for the entire batch + if let Err(e) = validate_table_name(table, &state.config) { + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!("Invalid table name: {}", e)), + warning: None, + results: Some(vec![]), + })); + } + + // Check RBAC permission once for the entire batch + if !state + .rbac + .check_permission(&state.config, session.power, table, action) + { + return Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!( + "Insufficient permissions for {} on table '{}'", + match action { + QueryAction::Select => "SELECT", + QueryAction::Insert => "INSERT", + QueryAction::Update => "UPDATE", + QueryAction::Delete => "DELETE", + QueryAction::Count => "COUNT", + }, + table + )), + warning: None, + results: Some(vec![]), + })); + } + + info!( + "[{}] Validated batch: {} x {:?} on table '{}'", + request_id, + queries.len(), + action, + table + ); + super::log_info_async( + &state.logging, + &request_id, + &format!( + "Validated batch: {} x {:?} on table '{}'", + queries.len(), + action, + table + ), + Some("query"), + Some(&session.username), + Some(session.power), + ); + + if !state.database.is_available() { + warn!( + "[{}] Database marked unavailable before batch execution", + request_id + ); + log_database_unavailable_event( + &state.logging, + &request_id, + Some(&session.username), + Some(session.power), + "Database flagged unavailable before batch", + ) + .await; + return Ok(Json(database_unavailable_batch_response(&request_id))); + } + + // Start a SINGLE transaction for the batch - proper atomic operation + let mut tx = match state.database.pool().begin().await { + Ok(tx) => { + state.database.mark_available(); + tx + } + Err(e) => { + state.database.mark_unavailable(); + error!("[{}] Failed to begin batch transaction: {}", request_id, e); + log_database_unavailable_event( + &state.logging, + &request_id, + Some(&session.username), + Some(session.power), + &format!("Failed to begin batch transaction: {}", e), + ) + .await; + return Ok(Json(database_unavailable_batch_response(&request_id))); + } + }; + + // Set user context and request ID in transaction + if let Err(e) = sqlx::query("SET @current_user_id = ?, @request_id = ?") + .bind(session.user_id) + .bind(&request_id) + .execute(&mut *tx) + .await + { + state.database.mark_unavailable(); + error!( + "[{}] Failed to set current user context and request ID: {}", + request_id, e + ); + log_database_unavailable_event( + &state.logging, + &request_id, + Some(&session.username), + Some(session.power), + &format!("Failed to set batch user context: {}", e), + ) + .await; + return Ok(Json(database_unavailable_batch_response(&request_id))); + } + + let rollback_on_error = payload.rollback_on_error.unwrap_or(false); + + info!( + "[{}] Executing NATIVE batch: {} x {:?} on '{}' (rollback_on_error={})", + request_id, + queries.len(), + action, + table, + rollback_on_error + ); + super::log_info_async( + &state.logging, + &request_id, + &format!( + "Executing NATIVE batch: {} x {:?} on '{}' (rollback_on_error={})", + queries.len(), + action, + table, + rollback_on_error + ), + Some("query"), + Some(&session.username), + Some(session.power), + ); + + // Execute as SINGLE native batch query based on action type + let result = match action { + QueryAction::Insert => { + execute_batch_insert( + &request_id, + &mut tx, + queries, + table, + action, + &session.username, + &state, + &session, + ) + .await + } + QueryAction::Update => { + execute_batch_update( + &request_id, + &mut tx, + queries, + table, + action, + &session.username, + &state, + &session, + ) + .await + } + QueryAction::Delete => { + execute_batch_delete( + &request_id, + &mut tx, + queries, + table, + action, + &session.username, + &state, + &session, + ) + .await + } + QueryAction::Select => { + // SELECT batches are less common but we'll execute them individually + // (combining SELECTs into one query doesn't make sense as they return different results) + execute_batch_selects( + &request_id, + &mut tx, + queries, + table, + action, + &session.username, + &session, + &state, + ) + .await + } + QueryAction::Count => { + // COUNT batches execute individually (each returns different results) + execute_batch_counts( + &request_id, + &mut tx, + queries, + table, + action, + &session.username, + &session, + &state, + ) + .await + } + }; + + match result { + Ok(response) => { + if response.success || !rollback_on_error { + // Commit the transaction + tx.commit().await.map_err(|e| { + error!("[{}] Failed to commit batch transaction: {}", request_id, e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + info!( + "[{}] Native batch committed: {} operations", + request_id, + queries.len() + ); + super::log_info_async( + &state.logging, + &request_id, + &format!("Native batch committed: {} operations", queries.len()), + Some("query"), + Some(&session.username), + Some(session.power), + ); + Ok(Json(response)) + } else { + // Rollback on error + error!( + "[{}] Rolling back batch transaction due to error", + request_id + ); + tx.rollback().await.map_err(|e| { + error!("[{}] Failed to rollback transaction: {}", request_id, e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + Ok(Json(response)) + } + } + Err(e) => { + error!("[{}] Batch execution failed: {}", request_id, e); + tx.rollback().await.map_err(|e2| { + error!("[{}] Failed to rollback after error: {}", request_id, e2); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + Ok(Json(QueryResponse { + success: false, + data: None, + rows_affected: None, + error: Some(format!("Batch execution failed: {}", e)), + warning: None, + results: Some(vec![]), + })) + } + } +} + +// ===== NATIVE BATCH EXECUTION FUNCTIONS ===== + +/// Execute batch INSERT using MySQL multi-value INSERT +async fn execute_batch_insert( + request_id: &str, + tx: &mut sqlx::Transaction<'_, sqlx::MySql>, + queries: &Vec<crate::models::BatchQuery>, + table: &str, + _action: &QueryAction, + _username: &str, + state: &AppState, + _session: &crate::models::Session, +) -> anyhow::Result<QueryResponse> { + use serde_json::Value; + + // Extract all data objects, apply auto-generation, and validate they have the same columns + let mut all_data = Vec::new(); + let mut column_set: Option<std::collections::HashSet<String>> = None; + + // Check for auto-generation config + let auto_config = state.config.get_auto_generation_config(&table); + + for (idx, query) in queries.iter().enumerate() { + let mut data = query + .data + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Query {} missing data field for INSERT", idx + 1))? + .clone(); + + // Apply auto-generation if configured for INSERT + if let Some(ref auto_cfg) = auto_config { + if auto_cfg.on_action == "insert" || auto_cfg.on_action == "both" { + if let Value::Object(ref mut map) = data { + let field_name = &auto_cfg.field; + + if !map.contains_key(field_name) + || map.get(field_name).map_or(true, |v| { + v.is_null() || v.as_str().map_or(true, |s| s.is_empty()) + }) + { + // Generate auto value based on config + let generated_value = generate_auto_value(&state, &table, auto_cfg).await?; + map.insert(field_name.clone(), Value::String(generated_value)); + } + } + } + } + + if let Value::Object(map) = data { + let cols: std::collections::HashSet<String> = map.keys().cloned().collect(); + + if let Some(ref expected_cols) = column_set { + if *expected_cols != cols { + anyhow::bail!("All INSERT queries must have the same columns. Query {} has different columns", idx + 1); + } + } else { + column_set = Some(cols); + } + + all_data.push(map); + } else { + anyhow::bail!("Query {} data must be an object", idx + 1); + } + } + + let columns: Vec<String> = column_set.unwrap().into_iter().collect(); + + // Validate column names + for col in &columns { + validate_column_name(col)?; + } + + // Build multi-value INSERT: INSERT INTO table (col1, col2) VALUES (?, ?), (?, ?), ... + let column_list = columns.join(", "); + let value_placeholder = format!("({})", vec!["?"; columns.len()].join(", ")); + let values_clause = vec![value_placeholder; all_data.len()].join(", "); + + let sql = format!( + "INSERT INTO {} ({}) VALUES {}", + table, column_list, values_clause + ); + + info!( + "[{}] Native batch INSERT: {} rows into {}", + request_id, + all_data.len(), + table + ); + super::log_info_async( + &state.logging, + &request_id, + &format!( + "Native batch INSERT: {} rows into {}", + all_data.len(), + table + ), + Some("query"), + Some(&_session.username), + Some(_session.power), + ); + + // Bind all values + let mut query = sqlx::query(&sql); + for data_map in &all_data { + for col in &columns { + let value = data_map.get(col).and_then(|v| match v { + Value::String(s) => Some(s.clone()), + Value::Number(n) => Some(n.to_string()), + Value::Bool(b) => Some(b.to_string()), + Value::Null => None, + _ => Some(v.to_string()), + }); + query = query.bind(value); + } + } + + // Execute the batch INSERT + let result = query.execute(&mut **tx).await?; + let rows_affected = result.rows_affected(); + + info!( + "[{}] Batch INSERT affected {} rows", + request_id, rows_affected + ); + super::log_info_async( + &state.logging, + &request_id, + &format!("Batch INSERT affected {} rows", rows_affected), + Some("query"), + Some(&_session.username), + Some(_session.power), + ); + + Ok(QueryResponse { + success: true, + data: None, + rows_affected: Some(rows_affected), + error: None, + warning: None, + results: None, + }) +} + +/// Execute batch UPDATE (executes individually for now, could be optimized with CASE statements) +async fn execute_batch_update( + request_id: &str, + tx: &mut sqlx::Transaction<'_, sqlx::MySql>, + queries: &Vec<crate::models::BatchQuery>, + table: &str, + action: &QueryAction, + username: &str, + state: &AppState, + session: &crate::models::Session, +) -> anyhow::Result<QueryResponse> { + let mut total_rows = 0u64; + + for (idx, batch_query) in queries.iter().enumerate() { + // Convert BatchQuery to QueryRequest by adding inherited action/table + let query_req = QueryRequest { + action: Some(action.clone()), + table: Some(table.to_string()), + columns: batch_query.columns.clone(), + data: batch_query.data.clone(), + where_clause: batch_query.where_clause.clone(), + filter: batch_query.filter.clone(), + joins: None, + limit: batch_query.limit, + offset: batch_query.offset, + order_by: batch_query.order_by.clone(), + queries: None, + rollback_on_error: None, + }; + + let result = execute_update_core( + &format!("{}-{}", request_id, idx + 1), + tx, + &query_req, + username, + state, + session, + ) + .await?; + + if let Some(rows) = result.rows_affected { + total_rows += rows; + } + } + + info!( + "[{}] Batch UPDATE affected {} total rows", + request_id, total_rows + ); + super::log_info_async( + &state.logging, + &request_id, + &format!("Batch UPDATE affected {} total rows", total_rows), + Some("query"), + Some(&session.username), + Some(session.power), + ); + + Ok(QueryResponse { + success: true, + data: None, + rows_affected: Some(total_rows), + error: None, + warning: None, + results: None, + }) +} + +/// Execute batch DELETE using IN clause when possible +async fn execute_batch_delete( + request_id: &str, + tx: &mut sqlx::Transaction<'_, sqlx::MySql>, + queries: &Vec<crate::models::BatchQuery>, + table: &str, + action: &QueryAction, + username: &str, + state: &AppState, + session: &crate::models::Session, +) -> anyhow::Result<QueryResponse> { + // For now, execute individually + // TODO: Optimize by detecting simple ID-based deletes and combining with IN clause + let mut total_rows = 0u64; + + for (idx, batch_query) in queries.iter().enumerate() { + // Convert BatchQuery to QueryRequest by adding inherited action/table + let query_req = QueryRequest { + action: Some(action.clone()), + table: Some(table.to_string()), + columns: batch_query.columns.clone(), + data: batch_query.data.clone(), + where_clause: batch_query.where_clause.clone(), + filter: batch_query.filter.clone(), + joins: None, + limit: batch_query.limit, + offset: batch_query.offset, + order_by: batch_query.order_by.clone(), + queries: None, + rollback_on_error: None, + }; + + let result = execute_delete_core( + &format!("{}-{}", request_id, idx + 1), + tx, + &query_req, + username, + state, + session, + ) + .await?; + + if let Some(rows) = result.rows_affected { + total_rows += rows; + } + } + + info!( + "[{}] Batch DELETE affected {} total rows", + request_id, total_rows + ); + super::log_info_async( + &state.logging, + &request_id, + &format!("Batch DELETE affected {} total rows", total_rows), + Some("query"), + Some(&session.username), + Some(session.power), + ); + + Ok(QueryResponse { + success: true, + data: None, + rows_affected: Some(total_rows), + error: None, + warning: None, + results: None, + }) +} + +/// Execute batch SELECT (executes individually since they return different results) +async fn execute_batch_selects( + request_id: &str, + tx: &mut sqlx::Transaction<'_, sqlx::MySql>, + queries: &Vec<crate::models::BatchQuery>, + table: &str, + action: &QueryAction, + username: &str, + session: &crate::models::Session, + state: &AppState, +) -> anyhow::Result<QueryResponse> { + let mut results = Vec::new(); + + for (idx, batch_query) in queries.iter().enumerate() { + // Convert BatchQuery to QueryRequest by adding inherited action/table + let query_req = QueryRequest { + action: Some(action.clone()), + table: Some(table.to_string()), + columns: batch_query.columns.clone(), + data: batch_query.data.clone(), + where_clause: batch_query.where_clause.clone(), + filter: batch_query.filter.clone(), + joins: None, + limit: batch_query.limit, + offset: batch_query.offset, + order_by: batch_query.order_by.clone(), + queries: None, + rollback_on_error: None, + }; + + let result = execute_select_core( + &format!("{}-{}", request_id, idx + 1), + tx, + &query_req, + username, + session, + state, + ) + .await?; + + results.push(result); + } + + info!( + "[{}] Batch SELECT executed {} queries", + request_id, + results.len() + ); + super::log_info_async( + &state.logging, + &request_id, + &format!("Batch SELECT executed {} queries", results.len()), + Some("query"), + Some(&session.username), + Some(session.power), + ); + + Ok(QueryResponse { + success: true, + data: None, + rows_affected: None, + error: None, + warning: None, + results: Some(results), + }) +} + +/// Execute batch COUNT (executes individually since they return different results) +async fn execute_batch_counts( + request_id: &str, + tx: &mut sqlx::Transaction<'_, sqlx::MySql>, + queries: &Vec<crate::models::BatchQuery>, + table: &str, + action: &QueryAction, + username: &str, + session: &crate::models::Session, + state: &AppState, +) -> anyhow::Result<QueryResponse> { + let mut results = Vec::new(); + + for (idx, batch_query) in queries.iter().enumerate() { + // Convert BatchQuery to QueryRequest by adding inherited action/table + let query_req = QueryRequest { + action: Some(action.clone()), + table: Some(table.to_string()), + columns: batch_query.columns.clone(), + data: batch_query.data.clone(), + where_clause: batch_query.where_clause.clone(), + filter: batch_query.filter.clone(), + joins: None, + limit: batch_query.limit, + offset: batch_query.offset, + order_by: batch_query.order_by.clone(), + queries: None, + rollback_on_error: None, + }; + + let result = execute_count_core( + &format!("{}:{}", request_id, idx), + tx, + &query_req, + username, + session, + state, + ) + .await?; + + results.push(result); + } + + info!( + "[{}] Batch COUNT executed {} queries", + request_id, + results.len() + ); + super::log_info_async( + &state.logging, + &request_id, + &format!("Batch COUNT executed {} queries", results.len()), + Some("query"), + Some(&session.username), + Some(session.power), + ); + + Ok(QueryResponse { + success: true, + data: None, + rows_affected: None, + error: None, + warning: None, + results: Some(results), + }) +} |
