feat: Add column drop protection in table schema sync
Some checks failed
BotServer CI/CD / build (push) Failing after 2m30s
Some checks failed
BotServer CI/CD / build (push) Failing after 2m30s
- Add columns_dropped counter to MigrationResult - Add PROTECTED_COLUMNS list (id, bot_id, org_id, user_id, created_at, etc.) - Detect orphaned columns (in DB but not in tables.bas) and drop them - Protected columns are never dropped automatically - Uses DROP COLUMN IF EXISTS for safety - Logs warnings for orphaned columns before dropping
This commit is contained in:
parent
44669c3825
commit
d785d255c6
1 changed files with 250 additions and 30 deletions
|
|
@ -13,7 +13,9 @@ use std::error::Error;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::table_definition::{FieldDefinition, TableDefinition, map_type_to_sql, parse_table_definition};
|
use super::table_definition::{
|
||||||
|
map_type_to_sql, parse_table_definition, FieldDefinition, TableDefinition,
|
||||||
|
};
|
||||||
|
|
||||||
/// Schema migration result
|
/// Schema migration result
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
|
|
@ -21,6 +23,7 @@ pub struct MigrationResult {
|
||||||
pub tables_created: usize,
|
pub tables_created: usize,
|
||||||
pub tables_altered: usize,
|
pub tables_altered: usize,
|
||||||
pub columns_added: usize,
|
pub columns_added: usize,
|
||||||
|
pub columns_dropped: usize,
|
||||||
pub errors: Vec<String>,
|
pub errors: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -32,6 +35,20 @@ pub struct DbColumn {
|
||||||
pub is_nullable: bool,
|
pub is_nullable: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Columns that should never be dropped automatically
|
||||||
|
const PROTECTED_COLUMNS: &[&str] = &[
|
||||||
|
"id",
|
||||||
|
"bot_id",
|
||||||
|
"org_id",
|
||||||
|
"user_id",
|
||||||
|
"created_at",
|
||||||
|
"updated_at",
|
||||||
|
"deleted_at",
|
||||||
|
"is_deleted",
|
||||||
|
"version",
|
||||||
|
"tenant_id",
|
||||||
|
];
|
||||||
|
|
||||||
/// Compare and sync table schema with definition
|
/// Compare and sync table schema with definition
|
||||||
pub fn sync_table_schema(
|
pub fn sync_table_schema(
|
||||||
table: &TableDefinition,
|
table: &TableDefinition,
|
||||||
|
|
@ -44,16 +61,20 @@ pub fn sync_table_schema(
|
||||||
// If no columns exist, create the table
|
// If no columns exist, create the table
|
||||||
if existing_columns.is_empty() {
|
if existing_columns.is_empty() {
|
||||||
info!("Creating new table: {}", table.name);
|
info!("Creating new table: {}", table.name);
|
||||||
sql_query(create_sql).execute(conn)
|
sql_query(create_sql)
|
||||||
|
.execute(conn)
|
||||||
.map_err(|e| format!("Failed to create table {}: {}", table.name, e))?;
|
.map_err(|e| format!("Failed to create table {}: {}", table.name, e))?;
|
||||||
result.tables_created += 1;
|
result.tables_created += 1;
|
||||||
return Ok(result);
|
return Ok(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for schema drift
|
let table_name = sanitize_identifier(&table.name);
|
||||||
|
let defined_col_names: std::collections::HashSet<String> =
|
||||||
|
table.fields.iter().map(|f| f.name.clone()).collect();
|
||||||
let existing_col_names: std::collections::HashSet<String> =
|
let existing_col_names: std::collections::HashSet<String> =
|
||||||
existing_columns.iter().map(|c| c.name.clone()).collect();
|
existing_columns.iter().map(|c| c.name.clone()).collect();
|
||||||
|
|
||||||
|
// Add missing columns
|
||||||
let mut missing_columns: Vec<&FieldDefinition> = Vec::new();
|
let mut missing_columns: Vec<&FieldDefinition> = Vec::new();
|
||||||
for field in &table.fields {
|
for field in &table.fields {
|
||||||
if !existing_col_names.contains(&field.name) {
|
if !existing_col_names.contains(&field.name) {
|
||||||
|
|
@ -61,40 +82,44 @@ pub fn sync_table_schema(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add missing columns
|
|
||||||
if !missing_columns.is_empty() {
|
if !missing_columns.is_empty() {
|
||||||
info!("Table {} is missing {} columns, adding them", table.name, missing_columns.len());
|
info!(
|
||||||
|
"Table {} is missing {} columns, adding them",
|
||||||
|
table.name,
|
||||||
|
missing_columns.len()
|
||||||
|
);
|
||||||
|
|
||||||
for field in &missing_columns {
|
for field in &missing_columns {
|
||||||
let sql_type = map_type_to_sql(field, "postgres");
|
let sql_type = map_type_to_sql(field, "postgres");
|
||||||
let column_sql = if field.is_nullable {
|
let column_sql = format!(
|
||||||
format!("ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}",
|
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}",
|
||||||
sanitize_identifier(&table.name),
|
&table_name,
|
||||||
sanitize_identifier(&field.name),
|
sanitize_identifier(&field.name),
|
||||||
sql_type)
|
sql_type
|
||||||
} else {
|
);
|
||||||
// For NOT NULL columns, add as nullable first then set default
|
|
||||||
format!("ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}",
|
|
||||||
sanitize_identifier(&table.name),
|
|
||||||
sanitize_identifier(&field.name),
|
|
||||||
sql_type)
|
|
||||||
};
|
|
||||||
|
|
||||||
info!("Adding column: {}.{} ({})", table.name, field.name, sql_type);
|
info!(
|
||||||
|
"Adding column: {}.{} ({})",
|
||||||
|
table.name, field.name, sql_type
|
||||||
|
);
|
||||||
match sql_query(&column_sql).execute(conn) {
|
match sql_query(&column_sql).execute(conn) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
result.columns_added += 1;
|
result.columns_added += 1;
|
||||||
info!("Successfully added column {}.{}", table.name, field.name);
|
info!("Successfully added column {}.{}", table.name, field.name);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Check if column already exists (ignore error)
|
|
||||||
let err_str = e.to_string();
|
let err_str = e.to_string();
|
||||||
if !err_str.contains("already exists") && !err_str.contains("duplicate column") {
|
if !err_str.contains("already exists") && !err_str.contains("duplicate column")
|
||||||
let error_msg = format!("Failed to add column {}.{}: {}", table.name, field.name, e);
|
{
|
||||||
|
let error_msg =
|
||||||
|
format!("Failed to add column {}.{}: {}", table.name, field.name, e);
|
||||||
error!("{}", error_msg);
|
error!("{}", error_msg);
|
||||||
result.errors.push(error_msg);
|
result.errors.push(error_msg);
|
||||||
} else {
|
} else {
|
||||||
info!("Column {}.{} already exists, skipping", table.name, field.name);
|
info!(
|
||||||
|
"Column {}.{} already exists, skipping",
|
||||||
|
table.name, field.name
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -102,6 +127,193 @@ pub fn sync_table_schema(
|
||||||
result.tables_altered += 1;
|
result.tables_altered += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Drop columns that were removed from definition (with protection)
|
||||||
|
let mut orphaned_columns: Vec<&DbColumn> = Vec::new();
|
||||||
|
for col in existing_columns {
|
||||||
|
if !defined_col_names.contains(&col.name) && !PROTECTED_COLUMNS.contains(&col.name.as_str())
|
||||||
|
{
|
||||||
|
orphaned_columns.push(col);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !orphaned_columns.is_empty() {
|
||||||
|
warn!(
|
||||||
|
"Table {} has {} orphaned columns not in definition:",
|
||||||
|
table.name,
|
||||||
|
orphaned_columns.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
for col in &orphaned_columns {
|
||||||
|
info!("Dropping orphaned column: {}.{}", table.name, col.name);
|
||||||
|
let drop_sql = format!(
|
||||||
|
"ALTER TABLE {} DROP COLUMN IF EXISTS {}",
|
||||||
|
&table_name,
|
||||||
|
sanitize_identifier(&col.name)
|
||||||
|
);
|
||||||
|
|
||||||
|
match sql_query(&drop_sql).execute(conn) {
|
||||||
|
Ok(_) => {
|
||||||
|
result.columns_dropped += 1;
|
||||||
|
info!("Successfully dropped column {}.{}", table.name, col.name);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let error_msg =
|
||||||
|
format!("Failed to drop column {}.{}: {}", table.name, col.name, e);
|
||||||
|
error!("{}", error_msg);
|
||||||
|
result.errors.push(error_msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if result.columns_dropped > 0 {
|
||||||
|
result.tables_altered += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Column metadata from database
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct DbColumn {
|
||||||
|
pub name: String,
|
||||||
|
pub data_type: String,
|
||||||
|
pub is_nullable: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Columns that should never be dropped automatically
|
||||||
|
const PROTECTED_COLUMNS: &[&str] = &[
|
||||||
|
"id",
|
||||||
|
"bot_id",
|
||||||
|
"org_id",
|
||||||
|
"user_id",
|
||||||
|
"created_at",
|
||||||
|
"updated_at",
|
||||||
|
"deleted_at",
|
||||||
|
"is_deleted",
|
||||||
|
"version",
|
||||||
|
"tenant_id",
|
||||||
|
];
|
||||||
|
|
||||||
|
/// Compare and sync table schema with definition
|
||||||
|
pub fn sync_table_schema(
|
||||||
|
table: &TableDefinition,
|
||||||
|
existing_columns: &[DbColumn],
|
||||||
|
create_sql: &str,
|
||||||
|
conn: &mut diesel::PgConnection,
|
||||||
|
) -> Result<MigrationResult, Box<dyn Error + Send + Sync>> {
|
||||||
|
let mut result = MigrationResult::default();
|
||||||
|
|
||||||
|
// If no columns exist, create the table
|
||||||
|
if existing_columns.is_empty() {
|
||||||
|
info!("Creating new table: {}", table.name);
|
||||||
|
sql_query(create_sql)
|
||||||
|
.execute(conn)
|
||||||
|
.map_err(|e| format!("Failed to create table {}: {}", table.name, e))?;
|
||||||
|
result.tables_created += 1;
|
||||||
|
return Ok(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
let table_name = sanitize_identifier(&table.name);
|
||||||
|
let defined_col_names: std::collections::HashSet<String> =
|
||||||
|
table.fields.iter().map(|f| f.name.clone()).collect();
|
||||||
|
let existing_col_names: std::collections::HashSet<String> =
|
||||||
|
existing_columns.iter().map(|c| c.name.clone()).collect();
|
||||||
|
|
||||||
|
// Add missing columns
|
||||||
|
let mut missing_columns: Vec<&FieldDefinition> = Vec::new();
|
||||||
|
for field in &table.fields {
|
||||||
|
if !existing_col_names.contains(&field.name) {
|
||||||
|
missing_columns.push(field);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !missing_columns.is_empty() {
|
||||||
|
info!(
|
||||||
|
"Table {} is missing {} columns, adding them",
|
||||||
|
table.name,
|
||||||
|
missing_columns.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
for field in &missing_columns {
|
||||||
|
let sql_type = map_type_to_sql(field, "postgres");
|
||||||
|
let column_sql = format!(
|
||||||
|
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}",
|
||||||
|
&table_name,
|
||||||
|
sanitize_identifier(&field.name),
|
||||||
|
sql_type
|
||||||
|
);
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Adding column: {}.{} ({})",
|
||||||
|
table.name, field.name, sql_type
|
||||||
|
);
|
||||||
|
match sql_query(&column_sql).execute(conn) {
|
||||||
|
Ok(_) => {
|
||||||
|
result.columns_added += 1;
|
||||||
|
info!("Successfully added column {}.{}", table.name, field.name);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let err_str = e.to_string();
|
||||||
|
if !err_str.contains("already exists") && !err_str.contains("duplicate column")
|
||||||
|
{
|
||||||
|
let error_msg =
|
||||||
|
format!("Failed to add column {}.{}: {}", table.name, field.name, e);
|
||||||
|
error!("{}", error_msg);
|
||||||
|
result.errors.push(error_msg);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"Column {}.{} already exists, skipping",
|
||||||
|
table.name, field.name
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result.tables_altered += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drop columns that were removed from definition (with protection)
|
||||||
|
let mut orphaned_columns: Vec<&DbColumn> = Vec::new();
|
||||||
|
for col in existing_columns {
|
||||||
|
if !defined_col_names.contains(&col.name) && !PROTECTED_COLUMNS.contains(&col.name.as_str())
|
||||||
|
{
|
||||||
|
orphaned_columns.push(col);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !orphaned_columns.is_empty() {
|
||||||
|
warn!(
|
||||||
|
"Table {} has {} orphaned columns not in definition:",
|
||||||
|
table.name,
|
||||||
|
orphaned_columns.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
for col in &orphaned_columns {
|
||||||
|
info!("Dropping orphaned column: {}.{}", table.name, col.name);
|
||||||
|
let drop_sql = format!(
|
||||||
|
"ALTER TABLE {} DROP COLUMN IF EXISTS {}",
|
||||||
|
&table_name,
|
||||||
|
sanitize_identifier(&col.name)
|
||||||
|
);
|
||||||
|
|
||||||
|
match sql_query(&drop_sql).execute(conn) {
|
||||||
|
Ok(_) => {
|
||||||
|
result.columns_dropped += 1;
|
||||||
|
info!("Successfully dropped column {}.{}", table.name, col.name);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let error_msg =
|
||||||
|
format!("Failed to drop column {}.{}: {}", table.name, col.name, e);
|
||||||
|
error!("{}", error_msg);
|
||||||
|
result.errors.push(error_msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if result.columns_dropped > 0 {
|
||||||
|
result.tables_altered += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -136,11 +348,14 @@ pub fn get_table_columns(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(rows.into_iter().map(|row| DbColumn {
|
Ok(rows
|
||||||
name: row.column_name,
|
.into_iter()
|
||||||
data_type: row.data_type,
|
.map(|row| DbColumn {
|
||||||
is_nullable: row.is_nullable == "YES",
|
name: row.column_name,
|
||||||
}).collect())
|
data_type: row.data_type,
|
||||||
|
is_nullable: row.is_nullable == "YES",
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process table definitions with schema sync for a specific bot
|
/// Process table definitions with schema sync for a specific bot
|
||||||
|
|
@ -152,7 +367,11 @@ pub fn sync_bot_tables(
|
||||||
let tables = parse_table_definition(source)?;
|
let tables = parse_table_definition(source)?;
|
||||||
let mut result = MigrationResult::default();
|
let mut result = MigrationResult::default();
|
||||||
|
|
||||||
info!("Processing {} table definitions with schema sync for bot {}", tables.len(), bot_id);
|
info!(
|
||||||
|
"Processing {} table definitions with schema sync for bot {}",
|
||||||
|
tables.len(),
|
||||||
|
bot_id
|
||||||
|
);
|
||||||
|
|
||||||
// Get bot's database connection
|
// Get bot's database connection
|
||||||
let pool = state.bot_database_manager.get_bot_pool(bot_id)?;
|
let pool = state.bot_database_manager.get_bot_pool(bot_id)?;
|
||||||
|
|
@ -177,6 +396,7 @@ pub fn sync_bot_tables(
|
||||||
result.tables_created += sync_result.tables_created;
|
result.tables_created += sync_result.tables_created;
|
||||||
result.tables_altered += sync_result.tables_altered;
|
result.tables_altered += sync_result.tables_altered;
|
||||||
result.columns_added += sync_result.columns_added;
|
result.columns_added += sync_result.columns_added;
|
||||||
|
result.columns_dropped += sync_result.columns_dropped;
|
||||||
result.errors.extend(sync_result.errors);
|
result.errors.extend(sync_result.errors);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
@ -188,8 +408,8 @@ pub fn sync_bot_tables(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log summary
|
// Log summary
|
||||||
info!("Schema sync summary for bot {}: {} tables created, {} altered, {} columns added, {} errors",
|
info!("Schema sync summary for bot {}: {} tables created, {} altered, {} columns added, {} columns dropped, {} errors",
|
||||||
bot_id, result.tables_created, result.tables_altered, result.columns_added, result.errors.len());
|
bot_id, result.tables_created, result.tables_altered, result.columns_added, result.columns_dropped, result.errors.len());
|
||||||
|
|
||||||
if !result.errors.is_empty() {
|
if !result.errors.is_empty() {
|
||||||
warn!("Schema sync completed with {} errors:", result.errors.len());
|
warn!("Schema sync completed with {} errors:", result.errors.len());
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue