diff --git a/src/basic/keywords/table_migration.rs b/src/basic/keywords/table_migration.rs index 2eafb8da..22eb0608 100644 --- a/src/basic/keywords/table_migration.rs +++ b/src/basic/keywords/table_migration.rs @@ -13,7 +13,9 @@ use std::error::Error; use std::sync::Arc; use uuid::Uuid; -use super::table_definition::{FieldDefinition, TableDefinition, map_type_to_sql, parse_table_definition}; +use super::table_definition::{ + map_type_to_sql, parse_table_definition, FieldDefinition, TableDefinition, +}; /// Schema migration result #[derive(Debug, Default)] @@ -21,6 +23,7 @@ pub struct MigrationResult { pub tables_created: usize, pub tables_altered: usize, pub columns_added: usize, + pub columns_dropped: usize, pub errors: Vec, } @@ -32,6 +35,20 @@ pub struct DbColumn { pub is_nullable: bool, } +/// Columns that should never be dropped automatically +const PROTECTED_COLUMNS: &[&str] = &[ + "id", + "bot_id", + "org_id", + "user_id", + "created_at", + "updated_at", + "deleted_at", + "is_deleted", + "version", + "tenant_id", +]; + /// Compare and sync table schema with definition pub fn sync_table_schema( table: &TableDefinition, @@ -44,16 +61,20 @@ pub fn sync_table_schema( // If no columns exist, create the table if existing_columns.is_empty() { info!("Creating new table: {}", table.name); - sql_query(create_sql).execute(conn) + sql_query(create_sql) + .execute(conn) .map_err(|e| format!("Failed to create table {}: {}", table.name, e))?; result.tables_created += 1; return Ok(result); } - // Check for schema drift + let table_name = sanitize_identifier(&table.name); + let defined_col_names: std::collections::HashSet = + table.fields.iter().map(|f| f.name.clone()).collect(); let existing_col_names: std::collections::HashSet = existing_columns.iter().map(|c| c.name.clone()).collect(); + // Add missing columns let mut missing_columns: Vec<&FieldDefinition> = Vec::new(); for field in &table.fields { if !existing_col_names.contains(&field.name) { @@ -61,40 +82,44 @@ pub fn sync_table_schema( } } - // Add missing columns if !missing_columns.is_empty() { - info!("Table {} is missing {} columns, adding them", table.name, missing_columns.len()); + info!( + "Table {} is missing {} columns, adding them", + table.name, + missing_columns.len() + ); for field in &missing_columns { let sql_type = map_type_to_sql(field, "postgres"); - let column_sql = if field.is_nullable { - format!("ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}", - sanitize_identifier(&table.name), - sanitize_identifier(&field.name), - sql_type) - } else { - // For NOT NULL columns, add as nullable first then set default - format!("ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}", - sanitize_identifier(&table.name), - sanitize_identifier(&field.name), - sql_type) - }; + let column_sql = format!( + "ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}", + &table_name, + sanitize_identifier(&field.name), + sql_type + ); - info!("Adding column: {}.{} ({})", table.name, field.name, sql_type); + info!( + "Adding column: {}.{} ({})", + table.name, field.name, sql_type + ); match sql_query(&column_sql).execute(conn) { Ok(_) => { result.columns_added += 1; info!("Successfully added column {}.{}", table.name, field.name); } Err(e) => { - // Check if column already exists (ignore error) let err_str = e.to_string(); - if !err_str.contains("already exists") && !err_str.contains("duplicate column") { - let error_msg = format!("Failed to add column {}.{}: {}", table.name, field.name, e); + if !err_str.contains("already exists") && !err_str.contains("duplicate column") + { + let error_msg = + format!("Failed to add column {}.{}: {}", table.name, field.name, e); error!("{}", error_msg); result.errors.push(error_msg); } else { - info!("Column {}.{} already exists, skipping", table.name, field.name); + info!( + "Column {}.{} already exists, skipping", + table.name, field.name + ); } } } @@ -102,6 +127,193 @@ pub fn sync_table_schema( result.tables_altered += 1; } + // Drop columns that were removed from definition (with protection) + let mut orphaned_columns: Vec<&DbColumn> = Vec::new(); + for col in existing_columns { + if !defined_col_names.contains(&col.name) && !PROTECTED_COLUMNS.contains(&col.name.as_str()) + { + orphaned_columns.push(col); + } + } + + if !orphaned_columns.is_empty() { + warn!( + "Table {} has {} orphaned columns not in definition:", + table.name, + orphaned_columns.len() + ); + + for col in &orphaned_columns { + info!("Dropping orphaned column: {}.{}", table.name, col.name); + let drop_sql = format!( + "ALTER TABLE {} DROP COLUMN IF EXISTS {}", + &table_name, + sanitize_identifier(&col.name) + ); + + match sql_query(&drop_sql).execute(conn) { + Ok(_) => { + result.columns_dropped += 1; + info!("Successfully dropped column {}.{}", table.name, col.name); + } + Err(e) => { + let error_msg = + format!("Failed to drop column {}.{}: {}", table.name, col.name, e); + error!("{}", error_msg); + result.errors.push(error_msg); + } + } + } + if result.columns_dropped > 0 { + result.tables_altered += 1; + } + } + + Ok(result) +} + +/// Column metadata from database +#[derive(Debug, Clone)] +pub struct DbColumn { + pub name: String, + pub data_type: String, + pub is_nullable: bool, +} + +/// Columns that should never be dropped automatically +const PROTECTED_COLUMNS: &[&str] = &[ + "id", + "bot_id", + "org_id", + "user_id", + "created_at", + "updated_at", + "deleted_at", + "is_deleted", + "version", + "tenant_id", +]; + +/// Compare and sync table schema with definition +pub fn sync_table_schema( + table: &TableDefinition, + existing_columns: &[DbColumn], + create_sql: &str, + conn: &mut diesel::PgConnection, +) -> Result> { + let mut result = MigrationResult::default(); + + // If no columns exist, create the table + if existing_columns.is_empty() { + info!("Creating new table: {}", table.name); + sql_query(create_sql) + .execute(conn) + .map_err(|e| format!("Failed to create table {}: {}", table.name, e))?; + result.tables_created += 1; + return Ok(result); + } + + let table_name = sanitize_identifier(&table.name); + let defined_col_names: std::collections::HashSet = + table.fields.iter().map(|f| f.name.clone()).collect(); + let existing_col_names: std::collections::HashSet = + existing_columns.iter().map(|c| c.name.clone()).collect(); + + // Add missing columns + let mut missing_columns: Vec<&FieldDefinition> = Vec::new(); + for field in &table.fields { + if !existing_col_names.contains(&field.name) { + missing_columns.push(field); + } + } + + if !missing_columns.is_empty() { + info!( + "Table {} is missing {} columns, adding them", + table.name, + missing_columns.len() + ); + + for field in &missing_columns { + let sql_type = map_type_to_sql(field, "postgres"); + let column_sql = format!( + "ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}", + &table_name, + sanitize_identifier(&field.name), + sql_type + ); + + info!( + "Adding column: {}.{} ({})", + table.name, field.name, sql_type + ); + match sql_query(&column_sql).execute(conn) { + Ok(_) => { + result.columns_added += 1; + info!("Successfully added column {}.{}", table.name, field.name); + } + Err(e) => { + let err_str = e.to_string(); + if !err_str.contains("already exists") && !err_str.contains("duplicate column") + { + let error_msg = + format!("Failed to add column {}.{}: {}", table.name, field.name, e); + error!("{}", error_msg); + result.errors.push(error_msg); + } else { + info!( + "Column {}.{} already exists, skipping", + table.name, field.name + ); + } + } + } + } + result.tables_altered += 1; + } + + // Drop columns that were removed from definition (with protection) + let mut orphaned_columns: Vec<&DbColumn> = Vec::new(); + for col in existing_columns { + if !defined_col_names.contains(&col.name) && !PROTECTED_COLUMNS.contains(&col.name.as_str()) + { + orphaned_columns.push(col); + } + } + + if !orphaned_columns.is_empty() { + warn!( + "Table {} has {} orphaned columns not in definition:", + table.name, + orphaned_columns.len() + ); + + for col in &orphaned_columns { + info!("Dropping orphaned column: {}.{}", table.name, col.name); + let drop_sql = format!( + "ALTER TABLE {} DROP COLUMN IF EXISTS {}", + &table_name, + sanitize_identifier(&col.name) + ); + + match sql_query(&drop_sql).execute(conn) { + Ok(_) => { + result.columns_dropped += 1; + info!("Successfully dropped column {}.{}", table.name, col.name); + } + Err(e) => { + let error_msg = + format!("Failed to drop column {}.{}: {}", table.name, col.name, e); + error!("{}", error_msg); + result.errors.push(error_msg); + } + } + } + if result.columns_dropped > 0 { + result.tables_altered += 1; + } + } + Ok(result) } @@ -136,11 +348,14 @@ pub fn get_table_columns( } }; - Ok(rows.into_iter().map(|row| DbColumn { - name: row.column_name, - data_type: row.data_type, - is_nullable: row.is_nullable == "YES", - }).collect()) + Ok(rows + .into_iter() + .map(|row| DbColumn { + name: row.column_name, + data_type: row.data_type, + is_nullable: row.is_nullable == "YES", + }) + .collect()) } /// Process table definitions with schema sync for a specific bot @@ -152,7 +367,11 @@ pub fn sync_bot_tables( let tables = parse_table_definition(source)?; let mut result = MigrationResult::default(); - info!("Processing {} table definitions with schema sync for bot {}", tables.len(), bot_id); + info!( + "Processing {} table definitions with schema sync for bot {}", + tables.len(), + bot_id + ); // Get bot's database connection let pool = state.bot_database_manager.get_bot_pool(bot_id)?; @@ -177,6 +396,7 @@ pub fn sync_bot_tables( result.tables_created += sync_result.tables_created; result.tables_altered += sync_result.tables_altered; result.columns_added += sync_result.columns_added; + result.columns_dropped += sync_result.columns_dropped; result.errors.extend(sync_result.errors); } Err(e) => { @@ -188,8 +408,8 @@ pub fn sync_bot_tables( } // Log summary - info!("Schema sync summary for bot {}: {} tables created, {} altered, {} columns added, {} errors", - bot_id, result.tables_created, result.tables_altered, result.columns_added, result.errors.len()); + info!("Schema sync summary for bot {}: {} tables created, {} altered, {} columns added, {} columns dropped, {} errors", + bot_id, result.tables_created, result.tables_altered, result.columns_added, result.columns_dropped, result.errors.len()); if !result.errors.is_empty() { warn!("Schema sync completed with {} errors:", result.errors.len());