aboutsummaryrefslogtreecommitdiff
path: root/src/routes/query.rs
diff options
context:
space:
mode:
authorUMTS at Teleco <crt@teleco.ch>2025-12-13 02:48:13 +0100
committerUMTS at Teleco <crt@teleco.ch>2025-12-13 02:48:13 +0100
commite52b8e1c2e110d0feb74feb7905c2ff064b51d55 (patch)
tree3090814e422250e07e72cf1c83241ffd95cf20f7 /src/routes/query.rs
committing to insanityHEADmaster
Diffstat (limited to 'src/routes/query.rs')
-rw-r--r--src/routes/query.rs3255
1 files changed, 3255 insertions, 0 deletions
diff --git a/src/routes/query.rs b/src/routes/query.rs
new file mode 100644
index 0000000..9814215
--- /dev/null
+++ b/src/routes/query.rs
@@ -0,0 +1,3255 @@
+// Query routes and execution
+use anyhow::{Context, Result};
+use axum::{
+ extract::{ConnectInfo, State},
+ http::{HeaderMap, StatusCode},
+ Json,
+};
+use chrono::Utc;
+use rand::Rng;
+use serde_json::Value;
+use sqlx::{Column, Row};
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use tracing::{error, info, warn};
+
+use crate::logging::AuditLogger;
+use crate::models::{PermissionsResponse, QueryAction, QueryRequest, QueryResponse, UserInfo};
+use crate::sql::{
+ build_filter_clause, build_legacy_where_clause, build_order_by_clause, validate_column_name,
+ validate_column_names, validate_table_name,
+};
+use crate::AppState;
+
+// Helper function to extract token from Authorization header
+fn extract_token(headers: &HeaderMap) -> Option<String> {
+ headers
+ .get("Authorization")
+ .and_then(|header| header.to_str().ok())
+ .and_then(|auth_str| {
+ if auth_str.starts_with("Bearer ") {
+ Some(auth_str[7..].to_string())
+ } else {
+ None
+ }
+ })
+}
+
+fn database_unavailable_response(request_id: &str) -> QueryResponse {
+ QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Database temporarily unavailable, try again in a moment [request_id: {}]",
+ request_id
+ )),
+ warning: None,
+ results: None,
+ }
+}
+
+fn database_unavailable_batch_response(request_id: &str) -> QueryResponse {
+ let mut base = database_unavailable_response(request_id);
+ base.results = Some(vec![]);
+ base
+}
+
+async fn log_database_unavailable_event(
+ logger: &AuditLogger,
+ request_id: &str,
+ username: Option<&str>,
+ power: Option<i32>,
+ detail: &str,
+) {
+ if let Err(err) = logger
+ .log_error(
+ request_id,
+ Utc::now(),
+ detail,
+ Some("database_unavailable"),
+ username,
+ power,
+ )
+ .await
+ {
+ error!("[{}] Failed to record database outage: {}", request_id, err);
+ }
+}
+
+pub async fn execute_query(
+ State(state): State<AppState>,
+ ConnectInfo(addr): ConnectInfo<SocketAddr>,
+ headers: HeaderMap,
+ Json(payload): Json<QueryRequest>,
+) -> Result<Json<QueryResponse>, StatusCode> {
+ let timestamp = Utc::now();
+ let client_ip = addr.ip().to_string();
+ let request_id = AuditLogger::generate_request_id();
+
+ // Extract and validate session token
+ let token = match extract_token(&headers) {
+ Some(token) => token,
+ None => {
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(
+ "Please stop trying to access this resource without signing in".to_string(),
+ ),
+ warning: None,
+ results: None,
+ }));
+ }
+ };
+
+ let session = match state.session_manager.get_session(&token) {
+ Some(session) => session,
+ None => {
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some("Session not found".to_string()),
+ warning: None,
+ results: None,
+ }));
+ }
+ };
+
+ // Detect batch mode - if queries field is present, handle as batch operation
+ if payload.queries.is_some() {
+ // SECURITY: Check if user has permission to use batch operations
+ let power_perms = state
+ .config
+ .permissions
+ .power_levels
+ .get(&session.power.to_string())
+ .ok_or(StatusCode::FORBIDDEN)?;
+
+ if !power_perms.allow_batch_operations {
+ super::log_warning_async(
+ &state.logging,
+ &request_id,
+ &format!(
+ "User {} (power {}) attempted batch operation without permission",
+ session.username, session.power
+ ),
+ Some("authorization"),
+ Some(&session.username),
+ Some(session.power),
+ );
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some("Batch operations not permitted for your role".to_string()),
+ warning: None,
+ results: None,
+ }));
+ }
+
+ return execute_batch_mode(state, session, request_id, timestamp, client_ip, &payload)
+ .await;
+ }
+
+ // Validate input for very basic security vulnerabilities (null bytes, etc.)
+ if let Err(security_error) = validate_input_security(&payload) {
+ super::log_warning_async(
+ &state.logging,
+ &request_id,
+ &format!(
+ "Security validation failed for user {}: {}",
+ session.username, security_error
+ ),
+ Some("security_validation"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Invalid input detected, how did you even manage to do that? [request_id: {}]",
+ request_id
+ )),
+ warning: None,
+ results: None,
+ }));
+ }
+
+ // Log the request
+ if let Err(e) = state
+ .logging
+ .log_request(
+ &request_id,
+ timestamp,
+ &client_ip,
+ Some(&session.username),
+ Some(session.power),
+ "/query",
+ &serde_json::to_value(&payload).unwrap_or_default(),
+ )
+ .await
+ {
+ error!("[{}] Failed to log request: {}", request_id, e);
+ }
+
+ // Clone payload before extracting fields (to avoid partial move issues)
+ let payload_clone = payload.clone();
+
+ // Single query mode - validate required fields
+ let action = payload.action.ok_or_else(|| {
+ let error_msg = "Missing action field in single query mode";
+ super::log_error_async(
+ &state.logging,
+ &request_id,
+ error_msg,
+ Some("request_validation"),
+ Some(&session.username),
+ Some(session.power),
+ );
+ StatusCode::BAD_REQUEST
+ })?;
+
+ let table = payload.table.ok_or_else(|| {
+ let error_msg = "Missing table field in single query mode";
+ super::log_error_async(
+ &state.logging,
+ &request_id,
+ error_msg,
+ Some("request_validation"),
+ Some(&session.username),
+ Some(session.power),
+ );
+ StatusCode::BAD_REQUEST
+ })?;
+
+ // SECURITY: Validate table name before any operations
+ if let Err(e) = validate_table_name(&table, &state.config) {
+ let error_msg = format!("Invalid table name '{}': {}", table, e);
+ super::log_error_async(
+ &state.logging,
+ &request_id,
+ &error_msg,
+ Some("table_validation"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!("Invalid table name [request_id: {}]", request_id)),
+ warning: None,
+ results: None,
+ }));
+ }
+
+ // SECURITY: Validate column names if specified
+ if let Some(ref columns) = payload.columns {
+ if let Err(e) = validate_column_names(columns) {
+ let error_msg = format!("Invalid column names on table '{}': {}", table, e);
+ super::log_error_async(
+ &state.logging,
+ &request_id,
+ &error_msg,
+ Some("column_validation"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Invalid column name: {} [request_id: {}]",
+ e, request_id
+ )),
+ warning: None,
+ results: None,
+ }));
+ }
+ }
+
+ // Check permissions (after validation to avoid leaking table existence)
+ if !state
+ .rbac
+ .check_permission(&state.config, session.power, &table, &action)
+ {
+ let action_str = match action {
+ QueryAction::Select => "SELECT",
+ QueryAction::Insert => "INSERT",
+ QueryAction::Update => "UPDATE",
+ QueryAction::Delete => "DELETE",
+ QueryAction::Count => "COUNT",
+ };
+
+ super::log_warning_async(
+ &state.logging,
+ &request_id,
+ &format!(
+ "User {} attempted unauthorized {} on table {}",
+ session.username, action_str, table
+ ),
+ Some("authorization"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ // Log security violation
+ if let Err(log_err) = state
+ .logging
+ .log_error(
+ &request_id,
+ timestamp,
+ &format!("Permission denied: {} on table {}", action_str, table),
+ Some("authorization"),
+ Some(&session.username),
+ Some(session.power),
+ )
+ .await
+ {
+ error!(
+ "[{}] Failed to log permission denial: {}",
+ request_id, log_err
+ );
+ }
+
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!("Insufficient permissions for this operation, Might as well give up [request_id: {}]", request_id)),
+ warning: None,
+ results: None,
+ }));
+ }
+
+ if !state.database.is_available() {
+ warn!(
+ "[{}] Database marked unavailable, returning graceful error",
+ request_id
+ );
+ log_database_unavailable_event(
+ &state.logging,
+ &request_id,
+ Some(&session.username),
+ Some(session.power),
+ "Database flagged unavailable before transaction",
+ )
+ .await;
+ return Ok(Json(database_unavailable_response(&request_id)));
+ }
+
+ // ALL database operations now use transactions with user context set
+ let mut tx = match state.database.pool().begin().await {
+ Ok(tx) => {
+ state.database.mark_available();
+ tx
+ }
+ Err(e) => {
+ state.database.mark_unavailable();
+ error!("[{}] Failed to begin transaction: {}", request_id, e);
+ log_database_unavailable_event(
+ &state.logging,
+ &request_id,
+ Some(&session.username),
+ Some(session.power),
+ &format!("Failed to begin transaction: {}", e),
+ )
+ .await;
+ return Ok(Json(database_unavailable_response(&request_id)));
+ }
+ };
+
+ // Set user context and request ID in transaction - ALL queries have user context now
+ if let Err(e) = sqlx::query("SET @current_user_id = ?, @request_id = ?")
+ .bind(session.user_id)
+ .bind(&request_id)
+ .execute(&mut *tx)
+ .await
+ {
+ state.database.mark_unavailable();
+ error!(
+ "[{}] Failed to set current user context and request ID: {}",
+ request_id, e
+ );
+ log_database_unavailable_event(
+ &state.logging,
+ &request_id,
+ Some(&session.username),
+ Some(session.power),
+ &format!("Failed to set user context: {}", e),
+ )
+ .await;
+ return Ok(Json(database_unavailable_response(&request_id)));
+ }
+
+ // Execute the query within the transaction
+ let result = match action {
+ QueryAction::Select => {
+ execute_select_with_tx(
+ &request_id,
+ tx,
+ &payload_clone,
+ &session.username,
+ &session,
+ &state,
+ )
+ .await
+ }
+ QueryAction::Insert => {
+ execute_insert_with_tx(
+ &request_id,
+ tx,
+ &payload_clone,
+ &session.username,
+ &state,
+ &session,
+ )
+ .await
+ }
+ QueryAction::Update => {
+ execute_update_with_tx(
+ &request_id,
+ tx,
+ &payload_clone,
+ &session.username,
+ &state,
+ &session,
+ )
+ .await
+ }
+ QueryAction::Delete => {
+ execute_delete_with_tx(
+ &request_id,
+ tx,
+ &payload_clone,
+ &session.username,
+ &state,
+ &session,
+ )
+ .await
+ }
+ QueryAction::Count => {
+ execute_count_with_tx(
+ &request_id,
+ tx,
+ &payload_clone,
+ &session.username,
+ &session,
+ &state,
+ )
+ .await
+ }
+ };
+
+ match result {
+ Ok(response) => {
+ let action_str = match action {
+ QueryAction::Select => "SELECT",
+ QueryAction::Insert => "INSERT",
+ QueryAction::Update => "UPDATE",
+ QueryAction::Delete => "DELETE",
+ QueryAction::Count => "COUNT",
+ };
+
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!("Query executed successfully: {} on {}", action_str, table),
+ Some("query_execution"),
+ Some(&session.username),
+ Some(session.power),
+ );
+ Ok(Json(response))
+ }
+ Err(e) => {
+ error!("[{}] Query execution failed: {}", request_id, e);
+ if let Err(log_err) = state
+ .logging
+ .log_error(
+ &request_id,
+ timestamp,
+ &format!("Query execution error: {}", e),
+ Some("query_execution"),
+ Some(&session.username),
+ Some(session.power),
+ )
+ .await
+ {
+ error!("[{}] Failed to log error: {}", request_id, log_err);
+ }
+
+ Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Database query failed [request_id: {}]",
+ request_id
+ )),
+ warning: None,
+ results: None,
+ }))
+ }
+ }
+}
+
+pub async fn get_permissions(
+ State(state): State<AppState>,
+ ConnectInfo(addr): ConnectInfo<SocketAddr>,
+ headers: HeaderMap,
+) -> Result<Json<PermissionsResponse>, StatusCode> {
+ let timestamp = Utc::now();
+ let client_ip = addr.ip().to_string();
+ let request_id = AuditLogger::generate_request_id();
+
+ // Extract and validate session token
+ let token = match extract_token(&headers) {
+ Some(token) => token,
+ None => {
+ return Ok(Json(PermissionsResponse {
+ success: false,
+ permissions: HashMap::new(),
+ user: UserInfo {
+ id: 0,
+ username: "".to_string(),
+ name: "".to_string(),
+ role: "".to_string(),
+ power: 0,
+ },
+ security_clearance: None,
+ user_settings_access: "".to_string(),
+ }));
+ }
+ };
+
+ let session = match state.session_manager.get_session(&token) {
+ Some(session) => session,
+ None => {
+ return Ok(Json(PermissionsResponse {
+ success: false,
+ permissions: HashMap::new(),
+ user: UserInfo {
+ id: 0,
+ username: "".to_string(),
+ name: "".to_string(),
+ role: "".to_string(),
+ power: 0,
+ },
+ security_clearance: None,
+ user_settings_access: "".to_string(),
+ }));
+ }
+ };
+
+ // Log the request
+ if let Err(e) = state
+ .logging
+ .log_request(
+ &request_id,
+ timestamp,
+ &client_ip,
+ Some(&session.username),
+ Some(session.power),
+ "/permissions",
+ &serde_json::json!({}),
+ )
+ .await
+ {
+ error!("[{}] Failed to log request: {}", request_id, e);
+ }
+
+ let permissions = state
+ .rbac
+ .get_table_permissions(&state.config, session.power);
+
+ // Get user settings access permission
+ let user_settings_permission = state
+ .config
+ .permissions
+ .power_levels
+ .get(&session.power.to_string())
+ .map(|p| &p.user_settings_access)
+ .unwrap_or(&state.config.security.default_user_settings_access);
+
+ let user_settings_access_str = match user_settings_permission {
+ crate::config::UserSettingsAccess::ReadOwnOnly => "read-own-only",
+ crate::config::UserSettingsAccess::ReadWriteOwn => "read-write-own",
+ crate::config::UserSettingsAccess::ReadWriteAll => "read-write-all",
+ };
+
+ Ok(Json(PermissionsResponse {
+ success: true,
+ permissions,
+ user: UserInfo {
+ id: session.user_id,
+ username: session.username,
+ name: "".to_string(), // We don't store name in session, would need to fetch from DB
+ role: session.role_name,
+ power: session.power,
+ },
+ security_clearance: None,
+ user_settings_access: user_settings_access_str.to_string(),
+ }))
+}
+
+// ===== SAFE SQL QUERY BUILDERS WITH VALIDATION =====
+// All functions validate table/column names to prevent SQL injection
+
+/// Build WHERE clause with column name validation (legacy simple format)
+fn build_where_clause(where_clause: &Value) -> anyhow::Result<(String, Vec<String>)> {
+ // Use the new validated builder from sql module
+ build_legacy_where_clause(where_clause)
+}
+
+/// Build INSERT data with column name validation
+fn build_insert_data(
+ data: &Value,
+) -> anyhow::Result<(Vec<String>, Vec<String>, Vec<Option<String>>)> {
+ let mut columns = Vec::new();
+ let mut placeholders = Vec::new();
+ let mut values = Vec::new();
+
+ if let Value::Object(map) = data {
+ for (key, value) in map {
+ // SECURITY: Validate column name before using it
+ validate_column_name(key)
+ .with_context(|| format!("Invalid column name in INSERT: {}", key))?;
+
+ // Handle special JSON fields like additional_fields
+ if key == "additional_fields" && value.is_object() {
+ columns.push(key.clone());
+ placeholders.push("?".to_string());
+ values.push(Some(
+ serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()),
+ ));
+ } else {
+ columns.push(key.clone());
+ placeholders.push("?".to_string());
+ values.push(json_value_to_option_string(value));
+ }
+ }
+ } else {
+ anyhow::bail!("INSERT data must be a JSON object");
+ }
+
+ if columns.is_empty() {
+ anyhow::bail!("INSERT data cannot be empty");
+ }
+
+ Ok((columns, placeholders, values))
+}
+
+/// Build UPDATE SET clause with column name validation
+fn build_update_set_clause(data: &Value) -> anyhow::Result<(String, Vec<Option<String>>)> {
+ let mut set_clauses = Vec::new();
+ let mut values = Vec::new();
+
+ if let Value::Object(map) = data {
+ for (key, value) in map {
+ // SECURITY: Validate column name before using it
+ validate_column_name(key)
+ .with_context(|| format!("Invalid column name in UPDATE: {}", key))?;
+
+ set_clauses.push(format!("{} = ?", key));
+ // Handle special JSON fields like additional_fields
+ if key == "additional_fields" && value.is_object() {
+ values.push(Some(
+ serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()),
+ ));
+ } else {
+ values.push(json_value_to_option_string(value));
+ }
+ }
+ } else {
+ anyhow::bail!("UPDATE data must be a JSON object");
+ }
+
+ if set_clauses.is_empty() {
+ anyhow::bail!("UPDATE data cannot be empty");
+ }
+
+ Ok((set_clauses.join(", "), values))
+}
+
+/// Convert JSON value to Option<String> for SQL binding
+/// Properly handles booleans (true/false -> "1"/"0" for MySQL TINYINT/BOOLEAN)
+/// NULL values return None for proper SQL NULL handling
+fn json_value_to_option_string(value: &Value) -> Option<String> {
+ match value {
+ Value::String(s) => Some(s.clone()),
+ Value::Number(n) => Some(n.to_string()),
+ // MySQL uses TINYINT(1) for booleans: true -> 1, false -> 0
+ Value::Bool(b) => Some(if *b { "1".to_string() } else { "0".to_string() }),
+ Value::Null => None, // Return None for proper SQL NULL
+ // Complex types (objects, arrays) get JSON serialized
+ _ => Some(serde_json::to_string(value).unwrap_or_else(|_| "null".to_string())),
+ }
+}
+/// Convert SQL values from MySQL to JSON
+/// Handles ALL MySQL types automatically with proper NULL handling
+/// Returns booleans as true/false (MySQL TINYINT(1) -> JSON bool)
+fn convert_sql_value_to_json(
+ row: &sqlx::mysql::MySqlRow,
+ index: usize,
+ request_id: Option<&str>,
+ username: Option<&str>,
+ power: Option<i32>,
+ state: Option<&AppState>,
+) -> anyhow::Result<Value> {
+ let column = &row.columns()[index];
+
+ use sqlx::TypeInfo;
+ let type_name = column.type_info().name();
+
+ // Comprehensive MySQL type handling - no need to manually add types anymore!
+ let result = match type_name {
+ // ===== String types =====
+ "VARCHAR" | "TEXT" | "CHAR" | "LONGTEXT" | "MEDIUMTEXT" | "TINYTEXT" | "SET" | "ENUM" => {
+ row.try_get::<Option<String>, _>(index)
+ .map(|opt| opt.map(Value::String).unwrap_or(Value::Null))
+ }
+
+ // ===== Integer types =====
+ "INT" | "BIGINT" | "MEDIUMINT" | "SMALLINT" | "INTEGER" => row
+ .try_get::<Option<i64>, _>(index)
+ .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)),
+
+ // Unsigned integers
+ "INT UNSIGNED" | "BIGINT UNSIGNED" | "MEDIUMINT UNSIGNED" | "SMALLINT UNSIGNED" => row
+ .try_get::<Option<u64>, _>(index)
+ .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)),
+
+ // ===== Boolean type (MySQL TINYINT(1)) =====
+ // Returns proper JSON true/false instead of 1/0
+ "TINYINT" | "BOOLEAN" | "BOOL" => {
+ // Try as bool first (for TINYINT(1))
+ if let Ok(opt_bool) = row.try_get::<Option<bool>, _>(index) {
+ return Ok(opt_bool.map(Value::Bool).unwrap_or(Value::Null));
+ }
+ // Fallback to i8 for regular TINYINT
+ row.try_get::<Option<i8>, _>(index)
+ .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null))
+ }
+
+ // ===== Decimal/Numeric types =====
+ "DECIMAL" | "NUMERIC" => {
+ row.try_get::<Option<rust_decimal::Decimal>, _>(index)
+ .map(|opt| {
+ opt.map(|decimal| {
+ // Keep precision by converting to string then parsing
+ let decimal_str = decimal.to_string();
+ if let Ok(f) = decimal_str.parse::<f64>() {
+ serde_json::json!(f)
+ } else {
+ Value::String(decimal_str)
+ }
+ })
+ .unwrap_or(Value::Null)
+ })
+ }
+
+ // ===== Floating point types =====
+ "FLOAT" | "DOUBLE" | "REAL" => row
+ .try_get::<Option<f64>, _>(index)
+ .map(|opt| opt.map(|v| serde_json::json!(v)).unwrap_or(Value::Null)),
+
+ // ===== Date/Time types =====
+ "DATE" => {
+ use chrono::NaiveDate;
+ row.try_get::<Option<NaiveDate>, _>(index).map(|opt| {
+ opt.map(|date| Value::String(date.format("%Y-%m-%d").to_string()))
+ .unwrap_or(Value::Null)
+ })
+ }
+ "DATETIME" => {
+ use chrono::NaiveDateTime;
+ row.try_get::<Option<NaiveDateTime>, _>(index).map(|opt| {
+ opt.map(|datetime| Value::String(datetime.format("%Y-%m-%d %H:%M:%S").to_string()))
+ .unwrap_or(Value::Null)
+ })
+ }
+ "TIMESTAMP" => {
+ use chrono::{DateTime, Utc};
+ row.try_get::<Option<DateTime<Utc>>, _>(index).map(|opt| {
+ opt.map(|timestamp| Value::String(timestamp.to_rfc3339()))
+ .unwrap_or(Value::Null)
+ })
+ }
+ "TIME" => {
+ // TIME values come as strings in HH:MM:SS format
+ row.try_get::<Option<String>, _>(index)
+ .map(|opt| opt.map(Value::String).unwrap_or(Value::Null))
+ }
+ "YEAR" => row
+ .try_get::<Option<i32>, _>(index)
+ .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)),
+
+ // ===== JSON type =====
+ "JSON" => row.try_get::<Option<String>, _>(index).map(|opt| {
+ opt.and_then(|s| serde_json::from_str(&s).ok())
+ .unwrap_or(Value::Null)
+ }),
+
+ // ===== Binary types =====
+ // Return as base64-encoded strings for safe JSON transmission
+ "BLOB" | "MEDIUMBLOB" | "LONGBLOB" | "TINYBLOB" | "BINARY" | "VARBINARY" => {
+ row.try_get::<Option<Vec<u8>>, _>(index).map(|opt| {
+ opt.map(|bytes| {
+ use base64::{engine::general_purpose, Engine as _};
+ Value::String(general_purpose::STANDARD.encode(&bytes))
+ })
+ .unwrap_or(Value::Null)
+ })
+ }
+
+ // ===== Bit type =====
+ "BIT" => {
+ row.try_get::<Option<Vec<u8>>, _>(index).map(|opt| {
+ opt.map(|bytes| {
+ // Convert bit value to number
+ let mut val: i64 = 0;
+ for &byte in &bytes {
+ val = (val << 8) | byte as i64;
+ }
+ Value::Number(val.into())
+ })
+ .unwrap_or(Value::Null)
+ })
+ }
+
+ // ===== Spatial/Geometry types =====
+ "GEOMETRY" | "POINT" | "LINESTRING" | "POLYGON" | "MULTIPOINT" | "MULTILINESTRING"
+ | "MULTIPOLYGON" | "GEOMETRYCOLLECTION" => {
+ // Return as WKT (Well-Known Text) string
+ row.try_get::<Option<String>, _>(index)
+ .map(|opt| opt.map(Value::String).unwrap_or(Value::Null))
+ }
+
+ // ===== Catch-all for unknown/new types =====
+ // This ensures forward compatibility if MySQL adds new types
+ _ => {
+ warn!(
+ "Unknown MySQL type '{}' for column '{}', attempting string fallback",
+ type_name,
+ column.name()
+ );
+ if let (Some(rid), Some(st)) = (request_id, state) {
+ super::log_warning_async(
+ &st.logging,
+ rid,
+ &format!(
+ "Unknown MySQL type '{}' for column '{}', attempting string fallback",
+ type_name,
+ column.name()
+ ),
+ Some("data_conversion"),
+ username,
+ power,
+ );
+ }
+ row.try_get::<Option<String>, _>(index)
+ .map(|opt| opt.map(Value::String).unwrap_or(Value::Null))
+ }
+ };
+
+ // Robust error handling with fallback
+ match result {
+ Ok(value) => Ok(value),
+ Err(e) => {
+ // Final fallback: try as string
+ match row.try_get::<Option<String>, _>(index) {
+ Ok(opt) => {
+ warn!("Primary conversion failed for column '{}' (type: {}), used string fallback",
+ column.name(), type_name);
+ if let (Some(rid), Some(st)) = (request_id, state) {
+ super::log_warning_async(
+ &st.logging,
+ rid,
+ &format!("Primary conversion failed for column '{}' (type: {}), used string fallback", column.name(), type_name),
+ Some("data_conversion"),
+ username,
+ power,
+ );
+ }
+ Ok(opt.map(Value::String).unwrap_or(Value::Null))
+ }
+ Err(_) => {
+ error!(
+ "Complete failure to decode column '{}' (index: {}, type: {}): {}",
+ column.name(),
+ index,
+ type_name,
+ e
+ );
+ // Return NULL instead of failing the entire query
+ Ok(Value::Null)
+ }
+ }
+ }
+ }
+}
+
+// Generate auto values based on configuration
+async fn generate_auto_value(
+ state: &AppState,
+ table: &str,
+ config: &crate::config::AutoGenerationConfig,
+) -> Result<String, anyhow::Error> {
+ match config.gen_type.as_str() {
+ "numeric" => generate_unique_numeric_id(state, table, config).await,
+ _ => Err(anyhow::anyhow!(
+ "Unsupported auto-generation type: {}",
+ config.gen_type
+ )),
+ }
+}
+
+// Generate a unique numeric ID based on configuration
+async fn generate_unique_numeric_id(
+ state: &AppState,
+ table: &str,
+ config: &crate::config::AutoGenerationConfig,
+) -> Result<String, anyhow::Error> {
+ let range_min = config.range_min.unwrap_or(10000000);
+ let range_max = config.range_max.unwrap_or(99999999);
+ let max_attempts = config.max_attempts.unwrap_or(10) as usize;
+ let field_name = &config.field;
+
+ for _attempt in 0..max_attempts {
+ // Generate random number in specified range
+ let id = {
+ let mut rng = rand::rng();
+ rng.random_range(range_min..=range_max)
+ };
+ let id_str = id.to_string();
+
+ // Check if this ID already exists
+ let query_str = format!(
+ "SELECT COUNT(*) as count FROM {} WHERE {} = ?",
+ table, field_name
+ );
+ let exists = sqlx::query(&query_str)
+ .bind(&id_str)
+ .fetch_one(state.database.pool())
+ .await?;
+
+ let count: i64 = exists.try_get("count")?;
+
+ if count == 0 {
+ return Ok(id_str);
+ }
+ }
+
+ Err(anyhow::anyhow!(
+ "Failed to generate unique {} for table {} after {} attempts",
+ field_name,
+ table,
+ max_attempts
+ ))
+}
+
+// Security validation functions
+fn validate_input_security(payload: &QueryRequest) -> Result<(), String> {
+ // Check for null bytes in table name
+ if let Some(ref table) = payload.table {
+ if table.contains('\0') {
+ return Err("Null byte detected in table name".to_string());
+ }
+ }
+
+ // Check for null bytes in column names
+ if let Some(columns) = &payload.columns {
+ for column in columns {
+ if column.contains('\0') {
+ return Err("Null byte detected in column name".to_string());
+ }
+ }
+ }
+
+ // Check for null bytes in data values
+ if let Some(data) = &payload.data {
+ if contains_null_bytes_in_value(data) {
+ return Err("Null byte detected in data values".to_string());
+ }
+ }
+
+ // Check for null bytes in WHERE clause
+ if let Some(where_clause) = &payload.where_clause {
+ if contains_null_bytes_in_value(where_clause) {
+ return Err("Null byte detected in WHERE clause".to_string());
+ }
+ }
+
+ Ok(())
+}
+
+fn contains_null_bytes_in_value(value: &Value) -> bool {
+ match value {
+ Value::String(s) => s.contains('\0'),
+ Value::Array(arr) => arr.iter().any(contains_null_bytes_in_value),
+ Value::Object(map) => {
+ map.keys().any(|k| k.contains('\0')) || map.values().any(contains_null_bytes_in_value)
+ }
+ _ => false,
+ }
+}
+
+// Core execution functions that work with mutable transaction references
+// These are used by batch operations to execute multiple queries in a single atomic transaction
+
+async fn execute_select_core(
+ request_id: &str,
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
+ payload: &QueryRequest,
+ username: &str,
+ session: &crate::models::Session,
+ state: &AppState,
+) -> anyhow::Result<QueryResponse> {
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
+
+ // Helper to count conditions in filter/where
+ fn count_conditions(
+ filter: &Option<crate::models::FilterCondition>,
+ where_clause: &Option<serde_json::Value>,
+ ) -> usize {
+ let mut count = 0;
+ if let Some(f) = filter {
+ count += count_filter_conditions(f);
+ }
+ if let Some(w) = where_clause {
+ count += count_where_conditions(w);
+ }
+ count
+ }
+
+ fn count_filter_conditions(filter: &crate::models::FilterCondition) -> usize {
+ use crate::models::FilterCondition;
+ match filter {
+ FilterCondition::Logical {
+ and_conditions,
+ or_conditions,
+ } => {
+ let mut count = 0;
+ if let Some(conditions) = and_conditions {
+ count += conditions
+ .iter()
+ .map(|c| count_filter_conditions(c))
+ .sum::<usize>();
+ }
+ if let Some(conditions) = or_conditions {
+ count += conditions
+ .iter()
+ .map(|c| count_filter_conditions(c))
+ .sum::<usize>();
+ }
+ count
+ }
+ _ => 1,
+ }
+ }
+
+ fn count_where_conditions(where_clause: &serde_json::Value) -> usize {
+ match where_clause {
+ serde_json::Value::Object(map) => {
+ if map.contains_key("AND") || map.contains_key("OR") {
+ if let Some(arr) = map.get("AND").or_else(|| map.get("OR")) {
+ if let serde_json::Value::Array(conditions) = arr {
+ return conditions.iter().map(|c| count_where_conditions(c)).sum();
+ }
+ }
+ }
+ 1
+ }
+ _ => 1,
+ }
+ }
+
+ let max_limit = state.config.get_max_limit(session.power);
+ let max_where = state.config.get_max_where_conditions(session.power);
+
+ let requested_columns = if let Some(ref cols) = payload.columns {
+ cols.clone()
+ } else {
+ vec!["*".to_string()]
+ };
+
+ let filtered_columns = if requested_columns.len() == 1 && requested_columns[0] == "*" {
+ "*".to_string()
+ } else {
+ let allowed_columns =
+ state
+ .config
+ .filter_readable_columns(session.power, &table, &requested_columns);
+ if allowed_columns.is_empty() {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some("No readable columns available for this request".to_string()),
+ warning: None,
+ results: None,
+ });
+ }
+ allowed_columns.join(", ")
+ };
+
+ let condition_count = count_conditions(&payload.filter, &payload.where_clause);
+ if condition_count > max_where as usize {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Too many WHERE/filter conditions ({}) > max {}",
+ condition_count, max_where
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+
+ let mut query = format!("SELECT {} FROM {}", filtered_columns, table);
+ let mut values = Vec::new();
+
+ if let Some(joins) = &payload.joins {
+ for join in joins {
+ if !state.rbac.check_permission(
+ &state.config,
+ session.power,
+ &join.table,
+ &crate::models::QueryAction::Select,
+ ) {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Insufficient permissions to JOIN with table '{}'",
+ join.table
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+ }
+ let join_sql = crate::sql::build_join_clause(joins, &state.config)?;
+ query.push_str(&join_sql);
+ }
+
+ if let Some(filter) = &payload.filter {
+ let (where_sql, where_values) = build_filter_clause(filter)?;
+ query.push_str(&format!(" WHERE {}", where_sql));
+ values.extend(where_values.into_iter().map(Some));
+ } else if let Some(where_clause) = &payload.where_clause {
+ let (where_sql, where_values) = build_where_clause(where_clause)?;
+ query.push_str(&format!(" WHERE {}", where_sql));
+ values.extend(where_values.into_iter().map(Some));
+ }
+
+ if let Some(order_by) = &payload.order_by {
+ let order_clause = build_order_by_clause(order_by)?;
+ query.push_str(&order_clause);
+ }
+
+ let requested_limit = payload.limit;
+ let limit = requested_limit.unwrap_or(max_limit);
+ let was_capped = limit > max_limit;
+ let limit = if limit > max_limit { max_limit } else { limit };
+ query.push_str(&format!(" LIMIT {}", limit));
+
+ let limit_warning = if was_capped {
+ Some(format!(
+ "Requested LIMIT {} exceeded maximum {} for your power level, capped to {}",
+ requested_limit.unwrap(),
+ max_limit,
+ max_limit
+ ))
+ } else if requested_limit.is_none() {
+ Some(format!(
+ "No LIMIT specified, defaulted to {} (max for power level {})",
+ max_limit, session.power
+ ))
+ } else {
+ None
+ };
+
+ if let Err(e) = state
+ .logging
+ .log_query(
+ request_id,
+ chrono::Utc::now(),
+ username,
+ Some(session.power),
+ &query,
+ None,
+ None,
+ )
+ .await
+ {
+ error!("[{}] Failed to log query: {}", request_id, e);
+ }
+
+ let mut sqlx_query = sqlx::query(&query);
+ for value in values {
+ match value {
+ Some(v) => sqlx_query = sqlx_query.bind(v),
+ None => sqlx_query = sqlx_query.bind(Option::<String>::None),
+ }
+ }
+
+ let rows = sqlx_query.fetch_all(&mut **tx).await?;
+
+ let mut results = Vec::new();
+ for row in rows {
+ let mut result_row = serde_json::Map::new();
+ for (i, column) in row.columns().iter().enumerate() {
+ let value = convert_sql_value_to_json(
+ &row,
+ i,
+ Some(request_id),
+ Some(username),
+ Some(session.power),
+ Some(state),
+ )?;
+ result_row.insert(column.name().to_string(), value);
+ }
+ results.push(Value::Object(result_row));
+ }
+
+ Ok(QueryResponse {
+ success: true,
+ data: Some(Value::Array(results)),
+ rows_affected: None,
+ error: None,
+ warning: limit_warning,
+ results: None,
+ })
+}
+
+async fn execute_count_core(
+ request_id: &str,
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
+ payload: &QueryRequest,
+ username: &str,
+ session: &crate::models::Session,
+ state: &AppState,
+) -> anyhow::Result<QueryResponse> {
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
+
+ // Helper to count conditions in filter/where (same as other functions)
+ fn count_conditions(
+ filter: &Option<crate::models::FilterCondition>,
+ where_clause: &Option<serde_json::Value>,
+ ) -> usize {
+ use crate::models::FilterCondition;
+ fn count_filter_conditions(filter: &FilterCondition) -> usize {
+ match filter {
+ FilterCondition::Logical {
+ and_conditions,
+ or_conditions,
+ } => {
+ let mut count = 0;
+ if let Some(conditions) = and_conditions {
+ count += conditions
+ .iter()
+ .map(|c| count_filter_conditions(c))
+ .sum::<usize>();
+ }
+ if let Some(conditions) = or_conditions {
+ count += conditions
+ .iter()
+ .map(|c| count_filter_conditions(c))
+ .sum::<usize>();
+ }
+ count
+ }
+ _ => 1,
+ }
+ }
+
+ fn count_where_conditions(where_clause: &serde_json::Value) -> usize {
+ match where_clause {
+ serde_json::Value::Object(map) => {
+ if map.contains_key("AND") || map.contains_key("OR") {
+ if let Some(arr) = map.get("AND").or_else(|| map.get("OR")) {
+ if let serde_json::Value::Array(conditions) = arr {
+ return conditions.iter().map(|c| count_where_conditions(c)).sum();
+ }
+ }
+ }
+ 1
+ }
+ _ => 1,
+ }
+ }
+
+ let mut count = 0;
+ if let Some(f) = filter {
+ count += count_filter_conditions(f);
+ }
+ if let Some(w) = where_clause {
+ count += count_where_conditions(w);
+ }
+ count
+ }
+
+ let max_where = state.config.get_max_where_conditions(session.power);
+
+ // Enforce WHERE clause complexity
+ let condition_count = count_conditions(&payload.filter, &payload.where_clause);
+ if condition_count > max_where as usize {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Too many WHERE/filter conditions ({}) > max {}",
+ condition_count, max_where
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+
+ let mut query = format!("SELECT COUNT(*) as count FROM {}", table);
+ let mut values = Vec::new();
+
+ // Add JOIN clauses if provided - validates permissions for all joined tables
+ if let Some(joins) = &payload.joins {
+ for join in joins {
+ if !state.rbac.check_permission(
+ &state.config,
+ session.power,
+ &join.table,
+ &crate::models::QueryAction::Select,
+ ) {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Insufficient permissions to JOIN with table '{}'",
+ join.table
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+ }
+ let join_sql = crate::sql::build_join_clause(joins, &state.config)?;
+ query.push_str(&join_sql);
+ }
+
+ // Add WHERE conditions (filter takes precedence over where_clause if both are provided)
+ if let Some(filter) = &payload.filter {
+ let (where_sql, where_values) = crate::sql::build_filter_clause(filter)?;
+ query.push_str(&format!(" WHERE {}", where_sql));
+ values.extend(where_values.into_iter().map(Some));
+ } else if let Some(where_clause) = &payload.where_clause {
+ let (where_sql, where_values) = build_where_clause(where_clause)?;
+ query.push_str(&format!(" WHERE {}", where_sql));
+ values.extend(where_values.into_iter().map(Some));
+ }
+
+ // Log the query
+ if let Err(e) = state
+ .logging
+ .log_query(
+ request_id,
+ chrono::Utc::now(),
+ username,
+ Some(session.power),
+ &query,
+ None,
+ None,
+ )
+ .await
+ {
+ error!("[{}] Failed to log query: {}", request_id, e);
+ }
+
+ let mut sqlx_query = sqlx::query(&query);
+ for value in values {
+ match value {
+ Some(v) => sqlx_query = sqlx_query.bind(v),
+ None => sqlx_query = sqlx_query.bind(Option::<String>::None),
+ }
+ }
+
+ let result = sqlx_query.fetch_one(&mut **tx).await?;
+ let count: i64 = result.try_get("count")?;
+
+ Ok(QueryResponse {
+ success: true,
+ data: Some(serde_json::json!(count)),
+ rows_affected: None,
+ error: None,
+ warning: None,
+ results: None,
+ })
+}
+
+async fn execute_update_core(
+ request_id: &str,
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
+ payload: &QueryRequest,
+ username: &str,
+ state: &AppState,
+ session: &crate::models::Session,
+) -> anyhow::Result<QueryResponse> {
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
+
+ let mut data = payload
+ .data
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Data is required for UPDATE"))?
+ .clone();
+
+ let where_clause = payload
+ .where_clause
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for UPDATE"))?;
+
+ let max_where = state.config.get_max_where_conditions(session.power);
+ let condition_count = where_clause.as_object().map(|obj| obj.len()).unwrap_or(0);
+ if condition_count > max_where as usize {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Too many WHERE conditions ({}) > max {}",
+ condition_count, max_where
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+
+ // SECURITY: Apply column-level write filtering FIRST (before auto-generation)
+ if let Value::Object(ref mut map) = data {
+ let all_columns: Vec<String> = map.keys().cloned().collect();
+ let writable_columns =
+ state
+ .config
+ .filter_writable_columns(session.power, &table, &all_columns);
+
+ // Remove columns that user cannot write to
+ map.retain(|key, _| writable_columns.contains(key));
+
+ // Check for auto-generation (system-generated fields bypass write protection)
+ if let Some(auto_config) = state.config.get_auto_generation_config(&table) {
+ if auto_config.on_action == "update" || auto_config.on_action == "both" {
+ let field_name = &auto_config.field;
+ if !map.contains_key(field_name)
+ || map.get(field_name).map_or(true, |v| {
+ v.is_null() || v.as_str().map_or(true, |s| s.is_empty())
+ })
+ {
+ let generated_value = generate_auto_value(&state, &table, auto_config).await?;
+ map.insert(field_name.clone(), Value::String(generated_value));
+ }
+ }
+ }
+ }
+
+ let writable_columns = data
+ .as_object()
+ .unwrap()
+ .keys()
+ .cloned()
+ .collect::<Vec<_>>();
+ if writable_columns.is_empty() {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some("No writable columns in UPDATE data".to_string()),
+ warning: None,
+ results: None,
+ });
+ }
+
+ let set_clause = writable_columns
+ .iter()
+ .map(|col| format!("{} = ?", col))
+ .collect::<Vec<_>>()
+ .join(", ");
+ let (where_sql, where_values) = build_where_clause(where_clause)?;
+ let query = format!("UPDATE {} SET {} WHERE {}", table, set_clause, where_sql);
+
+ let requested_limit = payload.limit;
+ let max_limit = state.config.get_max_limit(session.power);
+ let limit = requested_limit.unwrap_or(max_limit);
+ let was_capped = limit > max_limit;
+ let limit = if limit > max_limit { max_limit } else { limit };
+ let query_with_limit = format!("{} LIMIT {}", query, limit);
+
+ let limit_warning = if was_capped {
+ Some(format!(
+ "Requested LIMIT {} exceeded maximum {}, capped to {}",
+ requested_limit.unwrap(),
+ max_limit,
+ max_limit
+ ))
+ } else {
+ None
+ };
+
+ if let Err(e) = state
+ .logging
+ .log_query(
+ request_id,
+ chrono::Utc::now(),
+ username,
+ Some(session.power),
+ &query_with_limit,
+ None,
+ None,
+ )
+ .await
+ {
+ error!("[{}] Failed to log query: {}", request_id, e);
+ }
+
+ let mut sqlx_query = sqlx::query(&query_with_limit);
+ for col in &writable_columns {
+ if let Some(val) = data.get(col) {
+ sqlx_query = sqlx_query.bind(val.clone());
+ }
+ }
+ for val in where_values {
+ sqlx_query = sqlx_query.bind(val);
+ }
+
+ let result = sqlx_query.execute(&mut **tx).await?;
+
+ Ok(QueryResponse {
+ success: true,
+ data: None,
+ rows_affected: Some(result.rows_affected()),
+ error: None,
+ warning: limit_warning,
+ results: None,
+ })
+}
+
+async fn execute_delete_core(
+ request_id: &str,
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
+ payload: &QueryRequest,
+ username: &str,
+ state: &AppState,
+ session: &crate::models::Session,
+) -> anyhow::Result<QueryResponse> {
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
+
+ let where_clause = payload
+ .where_clause
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for DELETE"))?;
+
+ let max_where = state.config.get_max_where_conditions(session.power);
+ let condition_count = where_clause.as_object().map(|obj| obj.len()).unwrap_or(0);
+ if condition_count > max_where as usize {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Too many WHERE conditions ({}) > max {}",
+ condition_count, max_where
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+
+ let (where_sql, where_values) = build_where_clause(where_clause)?;
+ let query = format!("DELETE FROM {} WHERE {}", table, where_sql);
+
+ let requested_limit = payload.limit;
+ let max_limit = state.config.get_max_limit(session.power);
+ let limit = requested_limit.unwrap_or(max_limit);
+ let was_capped = limit > max_limit;
+ let limit = if limit > max_limit { max_limit } else { limit };
+ let query_with_limit = format!("{} LIMIT {}", query, limit);
+
+ let limit_warning = if was_capped {
+ Some(format!(
+ "Requested LIMIT {} exceeded maximum {}, capped to {}",
+ requested_limit.unwrap(),
+ max_limit,
+ max_limit
+ ))
+ } else {
+ None
+ };
+
+ if let Err(e) = state
+ .logging
+ .log_query(
+ request_id,
+ chrono::Utc::now(),
+ username,
+ Some(session.power),
+ &query_with_limit,
+ None,
+ None,
+ )
+ .await
+ {
+ error!("[{}] Failed to log query: {}", request_id, e);
+ }
+
+ let mut sqlx_query = sqlx::query(&query_with_limit);
+ for val in where_values {
+ sqlx_query = sqlx_query.bind(val);
+ }
+
+ let result = sqlx_query.execute(&mut **tx).await?;
+
+ Ok(QueryResponse {
+ success: true,
+ data: None,
+ rows_affected: Some(result.rows_affected()),
+ error: None,
+ warning: limit_warning,
+ results: None,
+ })
+}
+
+// Transaction-based execution functions for user context operations
+// These create their own transactions and commit them - used for single query operations
+
+async fn execute_select_with_tx(
+ request_id: &str,
+ mut tx: sqlx::Transaction<'_, sqlx::MySql>,
+ payload: &QueryRequest,
+ username: &str,
+ session: &crate::models::Session,
+ state: &AppState,
+) -> anyhow::Result<QueryResponse> {
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
+
+ // Helper to count conditions in filter/where
+ fn count_conditions(
+ filter: &Option<crate::models::FilterCondition>,
+ where_clause: &Option<serde_json::Value>,
+ ) -> usize {
+ use crate::models::FilterCondition;
+ fn count_filter(cond: &FilterCondition) -> usize {
+ match cond {
+ FilterCondition::Simple { .. } => 1,
+ FilterCondition::Logical {
+ and_conditions,
+ or_conditions,
+ } => {
+ and_conditions
+ .as_ref()
+ .map_or(0, |conds| conds.iter().map(count_filter).sum())
+ + or_conditions
+ .as_ref()
+ .map_or(0, |conds| conds.iter().map(count_filter).sum())
+ }
+ FilterCondition::Not { not } => count_filter(not),
+ }
+ }
+ let mut count = 0;
+ if let Some(f) = filter {
+ count += count_filter(f);
+ }
+ if let Some(w) = where_clause {
+ if let serde_json::Value::Object(map) = w {
+ count += map.len();
+ }
+ }
+ count
+ }
+
+ // Enforce query limits from config (power-level specific with fallback to defaults)
+ let max_limit = state.config.get_max_limit(session.power);
+ let max_where = state.config.get_max_where_conditions(session.power);
+
+ // Apply granular column filtering based on user's power level
+ let requested_columns = if let Some(cols) = &payload.columns {
+ cols.clone()
+ } else {
+ vec!["*".to_string()]
+ };
+
+ let filtered_columns = if requested_columns.len() == 1 && requested_columns[0] == "*" {
+ "*".to_string()
+ } else {
+ let allowed_columns =
+ state
+ .config
+ .filter_readable_columns(session.power, &table, &requested_columns);
+ if allowed_columns.is_empty() {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some("No readable columns available for this request".to_string()),
+ warning: None,
+ results: None,
+ });
+ }
+ allowed_columns.join(", ")
+ };
+
+ // Enforce WHERE clause complexity
+ let condition_count = count_conditions(&payload.filter, &payload.where_clause);
+ if condition_count > max_where as usize {
+ // Log security violation
+ let timestamp = chrono::Utc::now();
+ if let Err(log_err) = state
+ .logging
+ .log_error(
+ &request_id,
+ timestamp,
+ &format!(
+ "Too many WHERE conditions: {} exceeds maximum {} for power level {}",
+ condition_count, max_where, session.power
+ ),
+ Some("query_limits"),
+ Some(&session.username),
+ Some(session.power),
+ )
+ .await
+ {
+ error!(
+ "[{}] Failed to log WHERE limit violation: {}",
+ request_id, log_err
+ );
+ }
+
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Too many WHERE/filter conditions ({}) > max {} [request_id: {}]",
+ condition_count, max_where, request_id
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+
+ let mut query = format!("SELECT {} FROM {}", filtered_columns, table);
+ let mut values = Vec::new();
+
+ // Add JOIN clauses if provided - validates permissions for all joined tables
+ if let Some(joins) = &payload.joins {
+ // Validate user has read permission for all joined tables
+ for join in joins {
+ if !state.rbac.check_permission(
+ &state.config,
+ session.power,
+ &join.table,
+ &crate::models::QueryAction::Select,
+ ) {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Insufficient permissions to JOIN with table '{}'",
+ join.table
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+ }
+
+ // Build and append JOIN SQL
+ let join_sql = crate::sql::build_join_clause(joins, &state.config)?;
+ query.push_str(&join_sql);
+ }
+
+ // Add WHERE clause - support both legacy and new filter format
+ if let Some(filter) = &payload.filter {
+ let (where_sql, where_values) = build_filter_clause(filter)?;
+ query.push_str(&format!(" WHERE {}", where_sql));
+ values.extend(where_values.into_iter().map(Some));
+ } else if let Some(where_clause) = &payload.where_clause {
+ let (where_sql, where_values) = build_where_clause(where_clause)?;
+ query.push_str(&format!(" WHERE {}", where_sql));
+ values.extend(where_values.into_iter().map(Some));
+ }
+
+ // Add ORDER BY if provided
+ if let Some(order_by) = &payload.order_by {
+ let order_clause = build_order_by_clause(order_by)?;
+ query.push_str(&order_clause);
+ }
+
+ // Enforce LIMIT and track if it was capped
+ let requested_limit = payload.limit;
+ let limit = requested_limit.unwrap_or(max_limit);
+ let was_capped = limit > max_limit;
+ let limit = if limit > max_limit { max_limit } else { limit };
+ query.push_str(&format!(" LIMIT {}", limit));
+
+ let limit_warning = if was_capped {
+ Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]",
+ requested_limit.unwrap(), max_limit, max_limit, request_id))
+ } else if requested_limit.is_none() {
+ Some(format!(
+ "No LIMIT specified, using default {} based on power level [request_id: {}]",
+ max_limit, request_id
+ ))
+ } else {
+ None
+ };
+
+ // Add OFFSET if provided
+ if let Some(offset) = payload.offset {
+ query.push_str(&format!(" OFFSET {}", offset));
+ }
+
+ // Log the query
+ let params_json = serde_json::to_value(&values).ok();
+ if let Err(e) = state
+ .logging
+ .log_query(
+ request_id,
+ Utc::now(),
+ username,
+ Some(session.power),
+ &query,
+ params_json.as_ref(),
+ None, // Row count will be known after execution
+ )
+ .await
+ {
+ error!("[{}] Failed to log query: {}", request_id, e);
+ }
+
+ // Execute the query
+ let mut sqlx_query = sqlx::query(&query);
+ for value in values {
+ match value {
+ Some(v) => sqlx_query = sqlx_query.bind(v),
+ None => sqlx_query = sqlx_query.bind(Option::<String>::None),
+ }
+ }
+
+ let rows = sqlx_query.fetch_all(&mut *tx).await?;
+ tx.commit().await?;
+
+ let mut results = Vec::new();
+ for row in rows {
+ let mut result_row = serde_json::Map::new();
+ for (i, column) in row.columns().iter().enumerate() {
+ let value = convert_sql_value_to_json(
+ &row,
+ i,
+ Some(request_id),
+ Some(username),
+ Some(session.power),
+ Some(state),
+ )?;
+ result_row.insert(column.name().to_string(), value);
+ }
+ results.push(Value::Object(result_row));
+ }
+
+ Ok(QueryResponse {
+ success: true,
+ data: Some(Value::Array(results)),
+ rows_affected: None,
+ error: None,
+ warning: limit_warning,
+ results: None,
+ })
+}
+
+async fn execute_insert_with_tx(
+ request_id: &str,
+ mut tx: sqlx::Transaction<'_, sqlx::MySql>,
+ payload: &QueryRequest,
+ username: &str,
+ state: &AppState,
+ session: &crate::models::Session,
+) -> anyhow::Result<QueryResponse> {
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
+
+ let mut data = payload
+ .data
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Data is required for INSERT"))?
+ .clone();
+
+ // SECURITY: Apply column-level write filtering FIRST (before auto-generation)
+ // This validates what the user is trying to write
+ if let Value::Object(ref mut map) = data {
+ let all_columns: Vec<String> = map.keys().cloned().collect();
+ let writable_columns =
+ state
+ .config
+ .filter_writable_columns(session.power, &table, &all_columns);
+
+ // Remove columns that user cannot write to
+ map.retain(|key, _| writable_columns.contains(key));
+
+ // Check for auto-generation requirements based on config
+ // Auto-generated fields bypass write protection since they're system-generated
+ if let Some(auto_config) = state.config.get_auto_generation_config(&table) {
+ // Check if auto-generation is enabled for INSERT action
+ if auto_config.on_action == "insert" || auto_config.on_action == "both" {
+ let field_name = &auto_config.field;
+
+ if !map.contains_key(field_name)
+ || map.get(field_name).map_or(true, |v| {
+ v.is_null() || v.as_str().map_or(true, |s| s.is_empty())
+ })
+ {
+ // Generate auto value based on config
+ let generated_value = generate_auto_value(&state, &table, auto_config).await?;
+ map.insert(field_name.clone(), Value::String(generated_value));
+ }
+ }
+ }
+ }
+
+ // Final validation: ensure we have columns to insert
+ if let Value::Object(ref map) = data {
+ if map.is_empty() {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some("No writable columns in INSERT data".to_string()),
+ warning: None,
+ results: None,
+ });
+ }
+ }
+
+ let (columns_vec, placeholders_vec, values) = build_insert_data(&data)?;
+ let columns = columns_vec.join(", ");
+ let placeholders = placeholders_vec.join(", ");
+
+ let query = format!(
+ "INSERT INTO {} ({}) VALUES ({})",
+ table, columns, placeholders
+ );
+
+ // Log the query
+ let params_json = serde_json::to_value(&values).ok();
+ if let Err(e) = state
+ .logging
+ .log_query(
+ request_id,
+ Utc::now(),
+ username,
+ Some(session.power),
+ &query,
+ params_json.as_ref(),
+ None,
+ )
+ .await
+ {
+ error!("[{}] Failed to log query: {}", request_id, e);
+ }
+
+ // Execute the query
+ let mut sqlx_query = sqlx::query(&query);
+ for value in values {
+ match value {
+ Some(v) => sqlx_query = sqlx_query.bind(v),
+ None => sqlx_query = sqlx_query.bind(Option::<String>::None),
+ }
+ }
+
+ let result = sqlx_query.execute(&mut *tx).await?;
+ let insert_id = result.last_insert_id();
+ tx.commit().await?;
+
+ Ok(QueryResponse {
+ success: true,
+ data: Some(serde_json::json!(insert_id)),
+ rows_affected: Some(result.rows_affected()),
+ error: None,
+ warning: None, // INSERT queries don't have LIMIT,
+ results: None,
+ })
+}
+
+async fn execute_update_with_tx(
+ request_id: &str,
+ mut tx: sqlx::Transaction<'_, sqlx::MySql>,
+ payload: &QueryRequest,
+ username: &str,
+ state: &AppState,
+ session: &crate::models::Session,
+) -> anyhow::Result<QueryResponse> {
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
+
+ let mut data = payload
+ .data
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Data is required for UPDATE"))?
+ .clone();
+
+ let where_clause = payload
+ .where_clause
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for UPDATE"))?;
+
+ // Enforce query limits from config (power-level specific with fallback to defaults)
+ let max_limit = state.config.get_max_limit(session.power);
+ let max_where = state.config.get_max_where_conditions(session.power);
+
+ // Enforce WHERE clause complexity
+ let condition_count = if let Some(w) = &payload.where_clause {
+ if let serde_json::Value::Object(map) = w {
+ map.len()
+ } else {
+ 0
+ }
+ } else {
+ 0
+ };
+ if condition_count > max_where as usize {
+ // Log security violation
+ let timestamp = chrono::Utc::now();
+ if let Err(log_err) = state
+ .logging
+ .log_error(
+ &request_id,
+ timestamp,
+ &format!(
+ "Too many WHERE conditions: {} exceeds maximum {} for power level {}",
+ condition_count, max_where, session.power
+ ),
+ Some("query_limits"),
+ Some(&session.username),
+ Some(session.power),
+ )
+ .await
+ {
+ error!(
+ "[{}] Failed to log WHERE limit violation: {}",
+ request_id, log_err
+ );
+ }
+
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Too many WHERE conditions ({}) > max {} [request_id: {}]",
+ condition_count, max_where, request_id
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+
+ // SECURITY: Apply column-level write filtering FIRST (before auto-generation)
+ if let Value::Object(ref mut map) = data {
+ let all_columns: Vec<String> = map.keys().cloned().collect();
+ let writable_columns =
+ state
+ .config
+ .filter_writable_columns(session.power, &table, &all_columns);
+
+ // Remove columns that user cannot write to
+ map.retain(|key, _| writable_columns.contains(key));
+
+ // Check for auto-generation (system-generated fields bypass write protection)
+ if let Some(auto_config) = state.config.get_auto_generation_config(&table) {
+ if auto_config.on_action == "update" || auto_config.on_action == "both" {
+ let field_name = &auto_config.field;
+ if !map.contains_key(field_name)
+ || map.get(field_name).map_or(true, |v| {
+ v.is_null() || v.as_str().map_or(true, |s| s.is_empty())
+ })
+ {
+ let generated_value = generate_auto_value(&state, &table, auto_config).await?;
+ map.insert(field_name.clone(), Value::String(generated_value));
+ }
+ }
+ }
+ }
+
+ // Final validation: ensure we have columns to update
+ if let Value::Object(ref map) = data {
+ if map.is_empty() {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some("No writable columns in UPDATE data".to_string()),
+ warning: None,
+ results: None,
+ });
+ }
+ }
+
+ let (set_clause, mut values) = build_update_set_clause(&data)?;
+ let (where_sql, where_values) = build_where_clause(where_clause)?;
+ // Convert where_values to Option<String> to match set values
+ values.extend(where_values.into_iter().map(Some));
+
+ let mut query = format!("UPDATE {} SET {} WHERE {}", table, set_clause, where_sql);
+
+ // Enforce LIMIT and track if it was capped
+ let requested_limit = payload.limit;
+ let limit = requested_limit.unwrap_or(max_limit);
+ let was_capped = limit > max_limit;
+ let limit = if limit > max_limit { max_limit } else { limit };
+ query.push_str(&format!(" LIMIT {}", limit));
+
+ let limit_warning = if was_capped {
+ Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]",
+ requested_limit.unwrap(), max_limit, max_limit, request_id))
+ } else if requested_limit.is_none() {
+ Some(format!(
+ "No LIMIT specified, using default {} based on power level [request_id: {}]",
+ max_limit, request_id
+ ))
+ } else {
+ None
+ };
+
+ // Log the query
+ let params_json = serde_json::to_value(&values).ok();
+ if let Err(e) = state
+ .logging
+ .log_query(
+ request_id,
+ Utc::now(),
+ username,
+ Some(session.power),
+ &query,
+ params_json.as_ref(),
+ None,
+ )
+ .await
+ {
+ error!("[{}] Failed to log query: {}", request_id, e);
+ }
+
+ // Execute the query
+ let mut sqlx_query = sqlx::query(&query);
+ for value in values {
+ match value {
+ Some(v) => sqlx_query = sqlx_query.bind(v),
+ None => sqlx_query = sqlx_query.bind(Option::<String>::None),
+ }
+ }
+
+ let result = sqlx_query.execute(&mut *tx).await?;
+ tx.commit().await?;
+
+ Ok(QueryResponse {
+ success: true,
+ data: None,
+ rows_affected: Some(result.rows_affected()),
+ error: None,
+ warning: limit_warning,
+ results: None,
+ })
+}
+
+async fn execute_delete_with_tx(
+ request_id: &str,
+ mut tx: sqlx::Transaction<'_, sqlx::MySql>,
+ payload: &QueryRequest,
+ username: &str,
+ state: &AppState,
+ session: &crate::models::Session,
+) -> anyhow::Result<QueryResponse> {
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
+
+ let where_clause = payload
+ .where_clause
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for DELETE"))?;
+
+ // Enforce query limits from config (power-level specific with fallback to defaults)
+ let max_limit = state.config.get_max_limit(session.power);
+ let max_where = state.config.get_max_where_conditions(session.power);
+
+ // Enforce WHERE clause complexity
+ let condition_count = if let Some(w) = &payload.where_clause {
+ if let serde_json::Value::Object(map) = w {
+ map.len()
+ } else {
+ 0
+ }
+ } else {
+ 0
+ };
+ if condition_count > max_where as usize {
+ // Log security violation
+ let timestamp = chrono::Utc::now();
+ if let Err(log_err) = state
+ .logging
+ .log_error(
+ &request_id,
+ timestamp,
+ &format!(
+ "Too many WHERE conditions: {} exceeds maximum {} for power level {}",
+ condition_count, max_where, session.power
+ ),
+ Some("query_limits"),
+ Some(&session.username),
+ Some(session.power),
+ )
+ .await
+ {
+ error!(
+ "[{}] Failed to log WHERE limit violation: {}",
+ request_id, log_err
+ );
+ }
+
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Too many WHERE conditions ({}) > max {} [request_id: {}]",
+ condition_count, max_where, request_id
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+
+ let (where_sql, values) = build_where_clause(where_clause)?;
+
+ let mut query = format!("DELETE FROM {} WHERE {}", table, where_sql);
+
+ // Enforce LIMIT and track if it was capped
+ let requested_limit = payload.limit;
+ let limit = requested_limit.unwrap_or(max_limit);
+ let was_capped = limit > max_limit;
+ let limit = if limit > max_limit { max_limit } else { limit };
+ query.push_str(&format!(" LIMIT {}", limit));
+
+ let limit_warning = if was_capped {
+ Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]",
+ requested_limit.unwrap(), max_limit, max_limit, request_id))
+ } else if requested_limit.is_none() {
+ Some(format!(
+ "No LIMIT specified, using default {} based on power level [request_id: {}]",
+ max_limit, request_id
+ ))
+ } else {
+ None
+ };
+
+ // Log the query
+ let params_json = serde_json::to_value(&values).ok();
+ if let Err(e) = state
+ .logging
+ .log_query(
+ request_id,
+ Utc::now(),
+ username,
+ Some(session.power),
+ &query,
+ params_json.as_ref(),
+ None,
+ )
+ .await
+ {
+ error!("[{}] Failed to log query: {}", request_id, e);
+ }
+
+ // Execute the query
+ let mut sqlx_query = sqlx::query(&query);
+ for value in values {
+ sqlx_query = sqlx_query.bind(value);
+ }
+
+ let result = sqlx_query.execute(&mut *tx).await?;
+ tx.commit().await?;
+
+ Ok(QueryResponse {
+ success: true,
+ data: None,
+ rows_affected: Some(result.rows_affected()),
+ error: None,
+ warning: limit_warning,
+ results: None,
+ })
+}
+
+async fn execute_count_with_tx(
+ request_id: &str,
+ mut tx: sqlx::Transaction<'_, sqlx::MySql>,
+ payload: &QueryRequest,
+ username: &str,
+ session: &crate::models::Session,
+ state: &AppState,
+) -> anyhow::Result<QueryResponse> {
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
+
+ // Check read permissions for the table
+ if !state.rbac.check_permission(
+ &state.config,
+ session.power,
+ table,
+ &crate::models::QueryAction::Select,
+ ) {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Insufficient permissions to COUNT from table '{}' [request_id: {}]",
+ table, request_id
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+
+ // Helper to count conditions in filter/where (same as select function)
+ fn count_conditions(
+ filter: &Option<crate::models::FilterCondition>,
+ where_clause: &Option<serde_json::Value>,
+ ) -> usize {
+ use crate::models::FilterCondition;
+ fn count_filter(cond: &FilterCondition) -> usize {
+ match cond {
+ FilterCondition::Simple { .. } => 1,
+ FilterCondition::Logical {
+ and_conditions,
+ or_conditions,
+ } => {
+ and_conditions
+ .as_ref()
+ .map_or(0, |conds| conds.iter().map(count_filter).sum())
+ + or_conditions
+ .as_ref()
+ .map_or(0, |conds| conds.iter().map(count_filter).sum())
+ }
+ FilterCondition::Not { not } => count_filter(not),
+ }
+ }
+ let mut count = 0;
+ if let Some(f) = filter {
+ count += count_filter(f);
+ }
+ if let Some(w) = where_clause {
+ if let serde_json::Value::Object(map) = w {
+ count += map.len();
+ }
+ }
+ count
+ }
+
+ // Enforce query limits from config (power-level specific with fallback to defaults)
+ let max_where = state.config.get_max_where_conditions(session.power);
+
+ // Enforce WHERE clause complexity
+ let condition_count = count_conditions(&payload.filter, &payload.where_clause);
+ if condition_count > max_where as usize {
+ // Log security violation
+ let timestamp = chrono::Utc::now();
+ if let Err(log_err) = state
+ .logging
+ .log_error(
+ &request_id,
+ timestamp,
+ &format!(
+ "Too many WHERE conditions: {} exceeds maximum {} for power level {}",
+ condition_count, max_where, session.power
+ ),
+ Some("query_limits"),
+ Some(&session.username),
+ Some(session.power),
+ )
+ .await
+ {
+ error!(
+ "[{}] Failed to log WHERE limit violation: {}",
+ request_id, log_err
+ );
+ }
+
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Too many WHERE/filter conditions ({}) > max {} [request_id: {}]",
+ condition_count, max_where, request_id
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+
+ let mut query = format!("SELECT COUNT(*) as count FROM {}", table);
+ let mut values = Vec::new();
+
+ // Add JOIN clauses if provided - validates permissions for all joined tables
+ if let Some(joins) = &payload.joins {
+ // Validate user has read permission for all joined tables
+ for join in joins {
+ if !state.rbac.check_permission(
+ &state.config,
+ session.power,
+ &join.table,
+ &crate::models::QueryAction::Select,
+ ) {
+ return Ok(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Insufficient permissions to JOIN with table '{}'",
+ join.table
+ )),
+ warning: None,
+ results: None,
+ });
+ }
+ }
+ let join_sql = crate::sql::build_join_clause(joins, &state.config)?;
+ query.push_str(&join_sql);
+ }
+
+ // Add WHERE conditions (filter takes precedence over where_clause if both are provided)
+ if let Some(filter) = &payload.filter {
+ let (where_sql, where_values) = crate::sql::build_filter_clause(filter)?;
+ query.push_str(&format!(" WHERE {}", where_sql));
+ values.extend(where_values.into_iter().map(Some));
+ } else if let Some(where_clause) = &payload.where_clause {
+ let (where_sql, where_values) = build_where_clause(where_clause)?;
+ query.push_str(&format!(" WHERE {}", where_sql));
+ values.extend(where_values.into_iter().map(Some));
+ }
+
+ // Log the query
+ let params_json = serde_json::to_value(&values).ok();
+ if let Err(e) = state
+ .logging
+ .log_query(
+ request_id,
+ chrono::Utc::now(),
+ username,
+ Some(session.power),
+ &query,
+ params_json.as_ref(),
+ None,
+ )
+ .await
+ {
+ error!("[{}] Failed to log query: {}", request_id, e);
+ }
+
+ // Execute the query
+ let mut sqlx_query = sqlx::query(&query);
+ for value in values {
+ match value {
+ Some(v) => sqlx_query = sqlx_query.bind(v),
+ None => sqlx_query = sqlx_query.bind(Option::<String>::None),
+ }
+ }
+
+ let result = sqlx_query.fetch_one(&mut *tx).await?;
+ tx.commit().await?;
+
+ let count: i64 = result.try_get("count")?;
+
+ Ok(QueryResponse {
+ success: true,
+ data: Some(serde_json::json!(count)),
+ rows_affected: None,
+ error: None,
+ warning: None,
+ results: None,
+ })
+}
+
+/// Execute multiple queries in a single transaction
+async fn execute_batch_mode(
+ state: AppState,
+ session: crate::models::Session,
+ request_id: String,
+ timestamp: chrono::DateTime<chrono::Utc>,
+ client_ip: String,
+ payload: &QueryRequest,
+) -> Result<Json<QueryResponse>, StatusCode> {
+ let queries = payload.queries.as_ref().unwrap();
+
+ // Check if batch is empty
+ if queries.is_empty() {
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some("Batch request cannot be empty".to_string()),
+ warning: None,
+ results: Some(vec![]),
+ }));
+ }
+
+ info!(
+ "[{}] Batch query request from user {} (power {}): {} queries",
+ request_id,
+ session.username,
+ session.power,
+ queries.len()
+ );
+
+ // Log the batch request (log full payload including action/table)
+ if let Err(e) = state
+ .logging
+ .log_request(
+ &request_id,
+ timestamp,
+ &client_ip,
+ Some(&session.username),
+ Some(session.power),
+ "/query",
+ &serde_json::to_value(payload).unwrap_or_default(),
+ )
+ .await
+ {
+ error!("[{}] Failed to log batch request: {}", request_id, e);
+ }
+
+ // Get action and table from parent request (all batch queries share these)
+ let action = payload
+ .action
+ .as_ref()
+ .ok_or_else(|| StatusCode::BAD_REQUEST)?;
+
+ let table = payload
+ .table
+ .as_ref()
+ .ok_or_else(|| StatusCode::BAD_REQUEST)?;
+
+ // Validate table name once for the entire batch
+ if let Err(e) = validate_table_name(table, &state.config) {
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!("Invalid table name: {}", e)),
+ warning: None,
+ results: Some(vec![]),
+ }));
+ }
+
+ // Check RBAC permission once for the entire batch
+ if !state
+ .rbac
+ .check_permission(&state.config, session.power, table, action)
+ {
+ return Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!(
+ "Insufficient permissions for {} on table '{}'",
+ match action {
+ QueryAction::Select => "SELECT",
+ QueryAction::Insert => "INSERT",
+ QueryAction::Update => "UPDATE",
+ QueryAction::Delete => "DELETE",
+ QueryAction::Count => "COUNT",
+ },
+ table
+ )),
+ warning: None,
+ results: Some(vec![]),
+ }));
+ }
+
+ info!(
+ "[{}] Validated batch: {} x {:?} on table '{}'",
+ request_id,
+ queries.len(),
+ action,
+ table
+ );
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!(
+ "Validated batch: {} x {:?} on table '{}'",
+ queries.len(),
+ action,
+ table
+ ),
+ Some("query"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ if !state.database.is_available() {
+ warn!(
+ "[{}] Database marked unavailable before batch execution",
+ request_id
+ );
+ log_database_unavailable_event(
+ &state.logging,
+ &request_id,
+ Some(&session.username),
+ Some(session.power),
+ "Database flagged unavailable before batch",
+ )
+ .await;
+ return Ok(Json(database_unavailable_batch_response(&request_id)));
+ }
+
+ // Start a SINGLE transaction for the batch - proper atomic operation
+ let mut tx = match state.database.pool().begin().await {
+ Ok(tx) => {
+ state.database.mark_available();
+ tx
+ }
+ Err(e) => {
+ state.database.mark_unavailable();
+ error!("[{}] Failed to begin batch transaction: {}", request_id, e);
+ log_database_unavailable_event(
+ &state.logging,
+ &request_id,
+ Some(&session.username),
+ Some(session.power),
+ &format!("Failed to begin batch transaction: {}", e),
+ )
+ .await;
+ return Ok(Json(database_unavailable_batch_response(&request_id)));
+ }
+ };
+
+ // Set user context and request ID in transaction
+ if let Err(e) = sqlx::query("SET @current_user_id = ?, @request_id = ?")
+ .bind(session.user_id)
+ .bind(&request_id)
+ .execute(&mut *tx)
+ .await
+ {
+ state.database.mark_unavailable();
+ error!(
+ "[{}] Failed to set current user context and request ID: {}",
+ request_id, e
+ );
+ log_database_unavailable_event(
+ &state.logging,
+ &request_id,
+ Some(&session.username),
+ Some(session.power),
+ &format!("Failed to set batch user context: {}", e),
+ )
+ .await;
+ return Ok(Json(database_unavailable_batch_response(&request_id)));
+ }
+
+ let rollback_on_error = payload.rollback_on_error.unwrap_or(false);
+
+ info!(
+ "[{}] Executing NATIVE batch: {} x {:?} on '{}' (rollback_on_error={})",
+ request_id,
+ queries.len(),
+ action,
+ table,
+ rollback_on_error
+ );
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!(
+ "Executing NATIVE batch: {} x {:?} on '{}' (rollback_on_error={})",
+ queries.len(),
+ action,
+ table,
+ rollback_on_error
+ ),
+ Some("query"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ // Execute as SINGLE native batch query based on action type
+ let result = match action {
+ QueryAction::Insert => {
+ execute_batch_insert(
+ &request_id,
+ &mut tx,
+ queries,
+ table,
+ action,
+ &session.username,
+ &state,
+ &session,
+ )
+ .await
+ }
+ QueryAction::Update => {
+ execute_batch_update(
+ &request_id,
+ &mut tx,
+ queries,
+ table,
+ action,
+ &session.username,
+ &state,
+ &session,
+ )
+ .await
+ }
+ QueryAction::Delete => {
+ execute_batch_delete(
+ &request_id,
+ &mut tx,
+ queries,
+ table,
+ action,
+ &session.username,
+ &state,
+ &session,
+ )
+ .await
+ }
+ QueryAction::Select => {
+ // SELECT batches are less common but we'll execute them individually
+ // (combining SELECTs into one query doesn't make sense as they return different results)
+ execute_batch_selects(
+ &request_id,
+ &mut tx,
+ queries,
+ table,
+ action,
+ &session.username,
+ &session,
+ &state,
+ )
+ .await
+ }
+ QueryAction::Count => {
+ // COUNT batches execute individually (each returns different results)
+ execute_batch_counts(
+ &request_id,
+ &mut tx,
+ queries,
+ table,
+ action,
+ &session.username,
+ &session,
+ &state,
+ )
+ .await
+ }
+ };
+
+ match result {
+ Ok(response) => {
+ if response.success || !rollback_on_error {
+ // Commit the transaction
+ tx.commit().await.map_err(|e| {
+ error!("[{}] Failed to commit batch transaction: {}", request_id, e);
+ StatusCode::INTERNAL_SERVER_ERROR
+ })?;
+
+ info!(
+ "[{}] Native batch committed: {} operations",
+ request_id,
+ queries.len()
+ );
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!("Native batch committed: {} operations", queries.len()),
+ Some("query"),
+ Some(&session.username),
+ Some(session.power),
+ );
+ Ok(Json(response))
+ } else {
+ // Rollback on error
+ error!(
+ "[{}] Rolling back batch transaction due to error",
+ request_id
+ );
+ tx.rollback().await.map_err(|e| {
+ error!("[{}] Failed to rollback transaction: {}", request_id, e);
+ StatusCode::INTERNAL_SERVER_ERROR
+ })?;
+ Ok(Json(response))
+ }
+ }
+ Err(e) => {
+ error!("[{}] Batch execution failed: {}", request_id, e);
+ tx.rollback().await.map_err(|e2| {
+ error!("[{}] Failed to rollback after error: {}", request_id, e2);
+ StatusCode::INTERNAL_SERVER_ERROR
+ })?;
+
+ Ok(Json(QueryResponse {
+ success: false,
+ data: None,
+ rows_affected: None,
+ error: Some(format!("Batch execution failed: {}", e)),
+ warning: None,
+ results: Some(vec![]),
+ }))
+ }
+ }
+}
+
+// ===== NATIVE BATCH EXECUTION FUNCTIONS =====
+
+/// Execute batch INSERT using MySQL multi-value INSERT
+async fn execute_batch_insert(
+ request_id: &str,
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
+ queries: &Vec<crate::models::BatchQuery>,
+ table: &str,
+ _action: &QueryAction,
+ _username: &str,
+ state: &AppState,
+ _session: &crate::models::Session,
+) -> anyhow::Result<QueryResponse> {
+ use serde_json::Value;
+
+ // Extract all data objects, apply auto-generation, and validate they have the same columns
+ let mut all_data = Vec::new();
+ let mut column_set: Option<std::collections::HashSet<String>> = None;
+
+ // Check for auto-generation config
+ let auto_config = state.config.get_auto_generation_config(&table);
+
+ for (idx, query) in queries.iter().enumerate() {
+ let mut data = query
+ .data
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("Query {} missing data field for INSERT", idx + 1))?
+ .clone();
+
+ // Apply auto-generation if configured for INSERT
+ if let Some(ref auto_cfg) = auto_config {
+ if auto_cfg.on_action == "insert" || auto_cfg.on_action == "both" {
+ if let Value::Object(ref mut map) = data {
+ let field_name = &auto_cfg.field;
+
+ if !map.contains_key(field_name)
+ || map.get(field_name).map_or(true, |v| {
+ v.is_null() || v.as_str().map_or(true, |s| s.is_empty())
+ })
+ {
+ // Generate auto value based on config
+ let generated_value = generate_auto_value(&state, &table, auto_cfg).await?;
+ map.insert(field_name.clone(), Value::String(generated_value));
+ }
+ }
+ }
+ }
+
+ if let Value::Object(map) = data {
+ let cols: std::collections::HashSet<String> = map.keys().cloned().collect();
+
+ if let Some(ref expected_cols) = column_set {
+ if *expected_cols != cols {
+ anyhow::bail!("All INSERT queries must have the same columns. Query {} has different columns", idx + 1);
+ }
+ } else {
+ column_set = Some(cols);
+ }
+
+ all_data.push(map);
+ } else {
+ anyhow::bail!("Query {} data must be an object", idx + 1);
+ }
+ }
+
+ let columns: Vec<String> = column_set.unwrap().into_iter().collect();
+
+ // Validate column names
+ for col in &columns {
+ validate_column_name(col)?;
+ }
+
+ // Build multi-value INSERT: INSERT INTO table (col1, col2) VALUES (?, ?), (?, ?), ...
+ let column_list = columns.join(", ");
+ let value_placeholder = format!("({})", vec!["?"; columns.len()].join(", "));
+ let values_clause = vec![value_placeholder; all_data.len()].join(", ");
+
+ let sql = format!(
+ "INSERT INTO {} ({}) VALUES {}",
+ table, column_list, values_clause
+ );
+
+ info!(
+ "[{}] Native batch INSERT: {} rows into {}",
+ request_id,
+ all_data.len(),
+ table
+ );
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!(
+ "Native batch INSERT: {} rows into {}",
+ all_data.len(),
+ table
+ ),
+ Some("query"),
+ Some(&_session.username),
+ Some(_session.power),
+ );
+
+ // Bind all values
+ let mut query = sqlx::query(&sql);
+ for data_map in &all_data {
+ for col in &columns {
+ let value = data_map.get(col).and_then(|v| match v {
+ Value::String(s) => Some(s.clone()),
+ Value::Number(n) => Some(n.to_string()),
+ Value::Bool(b) => Some(b.to_string()),
+ Value::Null => None,
+ _ => Some(v.to_string()),
+ });
+ query = query.bind(value);
+ }
+ }
+
+ // Execute the batch INSERT
+ let result = query.execute(&mut **tx).await?;
+ let rows_affected = result.rows_affected();
+
+ info!(
+ "[{}] Batch INSERT affected {} rows",
+ request_id, rows_affected
+ );
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!("Batch INSERT affected {} rows", rows_affected),
+ Some("query"),
+ Some(&_session.username),
+ Some(_session.power),
+ );
+
+ Ok(QueryResponse {
+ success: true,
+ data: None,
+ rows_affected: Some(rows_affected),
+ error: None,
+ warning: None,
+ results: None,
+ })
+}
+
+/// Execute batch UPDATE (executes individually for now, could be optimized with CASE statements)
+async fn execute_batch_update(
+ request_id: &str,
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
+ queries: &Vec<crate::models::BatchQuery>,
+ table: &str,
+ action: &QueryAction,
+ username: &str,
+ state: &AppState,
+ session: &crate::models::Session,
+) -> anyhow::Result<QueryResponse> {
+ let mut total_rows = 0u64;
+
+ for (idx, batch_query) in queries.iter().enumerate() {
+ // Convert BatchQuery to QueryRequest by adding inherited action/table
+ let query_req = QueryRequest {
+ action: Some(action.clone()),
+ table: Some(table.to_string()),
+ columns: batch_query.columns.clone(),
+ data: batch_query.data.clone(),
+ where_clause: batch_query.where_clause.clone(),
+ filter: batch_query.filter.clone(),
+ joins: None,
+ limit: batch_query.limit,
+ offset: batch_query.offset,
+ order_by: batch_query.order_by.clone(),
+ queries: None,
+ rollback_on_error: None,
+ };
+
+ let result = execute_update_core(
+ &format!("{}-{}", request_id, idx + 1),
+ tx,
+ &query_req,
+ username,
+ state,
+ session,
+ )
+ .await?;
+
+ if let Some(rows) = result.rows_affected {
+ total_rows += rows;
+ }
+ }
+
+ info!(
+ "[{}] Batch UPDATE affected {} total rows",
+ request_id, total_rows
+ );
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!("Batch UPDATE affected {} total rows", total_rows),
+ Some("query"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ Ok(QueryResponse {
+ success: true,
+ data: None,
+ rows_affected: Some(total_rows),
+ error: None,
+ warning: None,
+ results: None,
+ })
+}
+
+/// Execute batch DELETE using IN clause when possible
+async fn execute_batch_delete(
+ request_id: &str,
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
+ queries: &Vec<crate::models::BatchQuery>,
+ table: &str,
+ action: &QueryAction,
+ username: &str,
+ state: &AppState,
+ session: &crate::models::Session,
+) -> anyhow::Result<QueryResponse> {
+ // For now, execute individually
+ // TODO: Optimize by detecting simple ID-based deletes and combining with IN clause
+ let mut total_rows = 0u64;
+
+ for (idx, batch_query) in queries.iter().enumerate() {
+ // Convert BatchQuery to QueryRequest by adding inherited action/table
+ let query_req = QueryRequest {
+ action: Some(action.clone()),
+ table: Some(table.to_string()),
+ columns: batch_query.columns.clone(),
+ data: batch_query.data.clone(),
+ where_clause: batch_query.where_clause.clone(),
+ filter: batch_query.filter.clone(),
+ joins: None,
+ limit: batch_query.limit,
+ offset: batch_query.offset,
+ order_by: batch_query.order_by.clone(),
+ queries: None,
+ rollback_on_error: None,
+ };
+
+ let result = execute_delete_core(
+ &format!("{}-{}", request_id, idx + 1),
+ tx,
+ &query_req,
+ username,
+ state,
+ session,
+ )
+ .await?;
+
+ if let Some(rows) = result.rows_affected {
+ total_rows += rows;
+ }
+ }
+
+ info!(
+ "[{}] Batch DELETE affected {} total rows",
+ request_id, total_rows
+ );
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!("Batch DELETE affected {} total rows", total_rows),
+ Some("query"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ Ok(QueryResponse {
+ success: true,
+ data: None,
+ rows_affected: Some(total_rows),
+ error: None,
+ warning: None,
+ results: None,
+ })
+}
+
+/// Execute batch SELECT (executes individually since they return different results)
+async fn execute_batch_selects(
+ request_id: &str,
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
+ queries: &Vec<crate::models::BatchQuery>,
+ table: &str,
+ action: &QueryAction,
+ username: &str,
+ session: &crate::models::Session,
+ state: &AppState,
+) -> anyhow::Result<QueryResponse> {
+ let mut results = Vec::new();
+
+ for (idx, batch_query) in queries.iter().enumerate() {
+ // Convert BatchQuery to QueryRequest by adding inherited action/table
+ let query_req = QueryRequest {
+ action: Some(action.clone()),
+ table: Some(table.to_string()),
+ columns: batch_query.columns.clone(),
+ data: batch_query.data.clone(),
+ where_clause: batch_query.where_clause.clone(),
+ filter: batch_query.filter.clone(),
+ joins: None,
+ limit: batch_query.limit,
+ offset: batch_query.offset,
+ order_by: batch_query.order_by.clone(),
+ queries: None,
+ rollback_on_error: None,
+ };
+
+ let result = execute_select_core(
+ &format!("{}-{}", request_id, idx + 1),
+ tx,
+ &query_req,
+ username,
+ session,
+ state,
+ )
+ .await?;
+
+ results.push(result);
+ }
+
+ info!(
+ "[{}] Batch SELECT executed {} queries",
+ request_id,
+ results.len()
+ );
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!("Batch SELECT executed {} queries", results.len()),
+ Some("query"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ Ok(QueryResponse {
+ success: true,
+ data: None,
+ rows_affected: None,
+ error: None,
+ warning: None,
+ results: Some(results),
+ })
+}
+
+/// Execute batch COUNT (executes individually since they return different results)
+async fn execute_batch_counts(
+ request_id: &str,
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
+ queries: &Vec<crate::models::BatchQuery>,
+ table: &str,
+ action: &QueryAction,
+ username: &str,
+ session: &crate::models::Session,
+ state: &AppState,
+) -> anyhow::Result<QueryResponse> {
+ let mut results = Vec::new();
+
+ for (idx, batch_query) in queries.iter().enumerate() {
+ // Convert BatchQuery to QueryRequest by adding inherited action/table
+ let query_req = QueryRequest {
+ action: Some(action.clone()),
+ table: Some(table.to_string()),
+ columns: batch_query.columns.clone(),
+ data: batch_query.data.clone(),
+ where_clause: batch_query.where_clause.clone(),
+ filter: batch_query.filter.clone(),
+ joins: None,
+ limit: batch_query.limit,
+ offset: batch_query.offset,
+ order_by: batch_query.order_by.clone(),
+ queries: None,
+ rollback_on_error: None,
+ };
+
+ let result = execute_count_core(
+ &format!("{}:{}", request_id, idx),
+ tx,
+ &query_req,
+ username,
+ session,
+ state,
+ )
+ .await?;
+
+ results.push(result);
+ }
+
+ info!(
+ "[{}] Batch COUNT executed {} queries",
+ request_id,
+ results.len()
+ );
+ super::log_info_async(
+ &state.logging,
+ &request_id,
+ &format!("Batch COUNT executed {} queries", results.len()),
+ Some("query"),
+ Some(&session.username),
+ Some(session.power),
+ );
+
+ Ok(QueryResponse {
+ success: true,
+ data: None,
+ rows_affected: None,
+ error: None,
+ warning: None,
+ results: Some(results),
+ })
+}