diff --git a/src/drive/drive_files.rs b/src/drive/drive_files.rs
index 5ad0bb1c..405ba5f2 100644
--- a/src/drive/drive_files.rs
+++ b/src/drive/drive_files.rs
@@ -180,4 +180,176 @@ impl DriveFileRepository {
         .load(&mut conn)
         .unwrap_or_default()
     }
+
+    pub fn delete_file(&self, bot_id: Uuid, file_path: &str) -> Result<(), String> {
+        let mut conn = self.pool.get().map_err(|e| e.to_string())?;
+
+        diesel::delete(drive_files::table)
+            .filter(
+                drive_files::bot_id
+                    .eq(bot_id)
+                    .and(drive_files::file_path.eq(file_path)),
+            )
+            .execute(&mut conn)
+            .map_err(|e| e.to_string())?;
+
+        Ok(())
+    }
+
+    pub fn get_all_files_for_bot(&self, bot_id: Uuid) -> Vec<DriveFile> {
+        let mut conn = match self.pool.get() {
+            Ok(c) => c,
+            Err(_) => return vec![],
+        };
+
+        drive_files::table
+            .filter(drive_files::bot_id.eq(bot_id))
+            .load(&mut conn)
+            .unwrap_or_default()
+    }
+
+    pub fn get_files_by_type(&self, bot_id: Uuid, file_type: &str) -> Vec<DriveFile> {
+        let mut conn = match self.pool.get() {
+            Ok(c) => c,
+            Err(_) => return vec![],
+        };
+
+        drive_files::table
+            .filter(
+                drive_files::bot_id
+                    .eq(bot_id)
+                    .and(drive_files::file_type.eq(file_type)),
+            )
+            .load(&mut conn)
+            .unwrap_or_default()
+    }
+
+    /// Check if a file exists for the given bot and path
+    pub fn has_file(&self, bot_id: Uuid, file_path: &str) -> bool {
+        self.get_file_state(bot_id, file_path).is_some()
+    }
+
+    /// Upsert a file with full state (including indexed and fail_count)
+    pub fn upsert_file_full(
+        &self,
+        bot_id: Uuid,
+        file_path: &str,
+        file_type: &str,
+        etag: Option<String>,
+        last_modified: Option<DateTime<Utc>>,
+        indexed: bool,
+        fail_count: i32,
+        last_failed_at: Option<DateTime<Utc>>,
+    ) -> Result<(), String> {
+        let mut conn = self.pool.get().map_err(|e| e.to_string())?;
+
+        let now = Utc::now();
+
+        diesel::insert_into(drive_files::table)
+            .values((
+                drive_files::bot_id.eq(bot_id),
+                drive_files::file_path.eq(file_path),
+                drive_files::file_type.eq(file_type),
+                drive_files::etag.eq(&etag),
+                drive_files::last_modified.eq(last_modified),
+                drive_files::indexed.eq(indexed),
+                drive_files::fail_count.eq(fail_count),
+                drive_files::last_failed_at.eq(last_failed_at),
+                drive_files::created_at.eq(now),
+                drive_files::updated_at.eq(now),
+            ))
+            .on_conflict((drive_files::bot_id, drive_files::file_path))
+            .do_update()
+            .set((
+                drive_files::etag.eq(&etag),
+                drive_files::last_modified.eq(last_modified),
+                drive_files::indexed.eq(indexed),
+                drive_files::fail_count.eq(fail_count),
+                drive_files::last_failed_at.eq(last_failed_at),
+                drive_files::updated_at.eq(now),
+            ))
+            .execute(&mut conn)
+            .map_err(|e| e.to_string())?;
+
+        Ok(())
+    }
+
+    /// Mark all files matching a path pattern as indexed (for KB folder indexing)
+    pub fn mark_indexed_by_pattern(&self, bot_id: Uuid, pattern: &str) -> Result<(), String> {
+        let mut conn = self.pool.get().map_err(|e| e.to_string())?;
+
+        diesel::update(drive_files::table)
+            .filter(
+                drive_files::bot_id
+                    .eq(bot_id)
+                    .and(drive_files::file_path.like(format!("%{pattern}%"))),
+            )
+            .set((
+                drive_files::indexed.eq(true),
+                drive_files::fail_count.eq(0),
+                drive_files::last_failed_at.eq(None::<DateTime<Utc>>),
+                drive_files::updated_at.eq(Utc::now()),
+            ))
+            .execute(&mut conn)
+            .map_err(|e| e.to_string())?;
+
+        Ok(())
+    }
+
+    /// Mark all files matching a path pattern as failed (increment fail_count)
+    pub fn mark_failed_by_pattern(&self, bot_id: Uuid, pattern: &str) -> Result<(), String> {
+        let mut conn = self.pool.get().map_err(|e| e.to_string())?;
+
+        diesel::update(drive_files::table)
+            .filter(
+                drive_files::bot_id
+                    .eq(bot_id)
+                    .and(drive_files::file_path.like(format!("%{pattern}%"))),
+            )
+            .set((
+                drive_files::fail_count.eq(sql("fail_count + 1")),
+                drive_files::last_failed_at.eq(Some(Utc::now())),
+                drive_files::updated_at.eq(Utc::now()),
+            ))
+            .execute(&mut conn)
+            .map_err(|e| e.to_string())?;
+
+        Ok(())
+    }
+
+    /// Get all files for a bot whose path starts with the given prefix
+    pub fn get_files_by_prefix(&self, 
bot_id: Uuid, prefix: &str) -> Vec { + let mut conn = match self.pool.get() { + Ok(c) => c, + Err(_) => return vec![], + }; + + drive_files::table + .filter( + drive_files::bot_id + .eq(bot_id) + .and(drive_files::file_path.like(format!("{prefix}%"))), + ) + .load(&mut conn) + .unwrap_or_default() + } + + /// Delete all files for a bot whose path starts with the given prefix + pub fn delete_by_prefix(&self, bot_id: Uuid, prefix: &str) -> Result { + let mut conn = self.pool.get().map_err(|e| e.to_string())?; + + diesel::delete(drive_files::table) + .filter( + drive_files::bot_id + .eq(bot_id) + .and(drive_files::file_path.like(format!("{prefix}%"))), + ) + .execute(&mut conn) + .map_err(|e| e.to_string()) + } + + /// Check if any files exist with the given prefix + pub fn has_files_with_prefix(&self, bot_id: Uuid, prefix: &str) -> bool { + !self.get_files_by_prefix(bot_id, prefix).is_empty() + } } diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index 30903396..36b4129f 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -20,10 +20,7 @@ use std::sync::Arc; #[cfg(any(feature = "research", feature = "llm"))] use tokio::sync::RwLock as TokioRwLock; use tokio::time::Duration; -use serde::{Deserialize, Serialize}; -use tokio::fs as tokio_fs; -#[cfg(any(feature = "research", feature = "llm"))] use crate::drive::drive_files::DriveFileRepository; #[cfg(any(feature = "research", feature = "llm"))] @@ -42,30 +39,16 @@ pub fn is_llm_streaming() -> bool { const MAX_BACKOFF_SECS: u64 = 300; const INITIAL_BACKOFF_SECS: u64 = 30; const RETRY_BACKOFF_SECS: i64 = 3600; -const MAX_FAIL_COUNT: u32 = 3; +const MAX_FAIL_COUNT: i32 = 3; fn normalize_etag(etag: &str) -> String { etag.trim_matches('"').to_string() } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct FileState { - pub etag: String, - #[serde(default)] - pub indexed: bool, - #[serde(default)] - pub last_failed_at: Option>, - #[serde(default)] - pub 
fail_count: u32, - #[serde(default)] - pub last_modified: Option, -} - #[derive(Debug, Clone)] pub struct DriveMonitor { state: Arc, bucket_name: String, - file_states: Arc>>, bot_id: uuid::Uuid, #[cfg(any(feature = "research", feature = "llm"))] kb_manager: Arc, @@ -81,7 +64,7 @@ pub struct DriveMonitor { kb_indexed_folders: Arc>>, #[cfg(not(any(feature = "research", feature = "llm")))] _pending_kb_index: Arc>>, - // Database-backed file state repository + // Database-backed file state repository (replaces JSON file_states) file_repo: Arc, } impl DriveMonitor { @@ -94,7 +77,7 @@ impl DriveMonitor { } } - pub fn new(state: Arc, bucket_name: String, bot_id: uuid::Uuid) -> Self { +pub fn new(state: Arc, bucket_name: String, bot_id: uuid::Uuid) -> Self { let work_root = PathBuf::from(crate::core::shared::utils::get_work_path()); #[cfg(any(feature = "research", feature = "llm"))] let kb_manager = Arc::new(KnowledgeBaseManager::with_bot_config(work_root.clone(), state.conn.clone(), bot_id)); @@ -105,7 +88,6 @@ impl DriveMonitor { Self { state, bucket_name, - file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())), bot_id, #[cfg(any(feature = "research", feature = "llm"))] kb_manager, @@ -125,98 +107,6 @@ impl DriveMonitor { } } - /// Get the path to the file states JSON file for this bot - fn file_state_path(&self) -> PathBuf { - self.work_root - .join(&self.bucket_name) - .join("file_states.json") - } - - /// Load file states from disk to avoid reprocessing unchanged files - async fn load_file_states(&self) -> Result<(), Box> { - let path = self.file_state_path(); - if path.exists() { - match tokio_fs::read_to_string(&path).await { - Ok(content) => { - match serde_json::from_str::>(&content) { - Ok(states) => { - let mut file_states = self.file_states.write().await; - let count = states.len(); - *file_states = states; - info!( - "Loaded {} file states from disk for bot {}", - count, - self.bot_id - ); - } - Err(e) => { - warn!( - "Failed to parse file states 
from {}: {}. Starting with empty state.", - path.display(), - e - ); - } - } - } - Err(e) => { - warn!( - "Failed to read file states from {}: {}. Starting with empty state.", - path.display(), - e - ); - } - } - } - Ok(()) - } - - /// Static helper to save file states (used by background tasks) - async fn save_file_states_static( - file_states: &Arc>>, - work_root: &PathBuf, - bucket_name: &str, - ) -> Result<(), Box> { - let path = work_root - .join(bucket_name) - .join("file_states.json"); - - if let Some(parent) = path.parent() { - if let Err(e) = tokio_fs::create_dir_all(parent).await { - warn!( - "Failed to create directory for file states: {} - {}", - parent.display(), - e - ); - } - } - - let states = file_states.read().await; - match serde_json::to_string_pretty(&*states) { - Ok(content) => { - if let Err(e) = tokio_fs::write(&path, content).await { - warn!( - "Failed to save file states to {}: {}", - path.display(), - e - ); - } else { - debug!( - "Saved {} file states to disk for bucket {}", - states.len(), - bucket_name - ); - } - } - Err(e) => { - warn!( - "Failed to serialize file states: {}", - e - ); - } - } - Ok(()) - } - async fn check_drive_health(&self) -> bool { let Some(client) = &self.state.drive else { return false; @@ -261,7 +151,7 @@ impl DriveMonitor { Arc::clone(&self.pending_kb_index), Arc::clone(&self.files_being_indexed), Arc::clone(&self.kb_indexed_folders), - Arc::clone(&self.file_states), + Arc::clone(&self.file_repo), Arc::clone(&self.is_processing), ); } @@ -275,7 +165,7 @@ impl DriveMonitor { pending_kb_index: Arc>>, files_being_indexed: Arc>>, kb_indexed_folders: Arc>>, - file_states: Arc>>, + file_repo: Arc, is_processing: Arc, ) { tokio::spawn(async move { @@ -348,34 +238,23 @@ match result { let mut indexed = kb_indexed_folders.write().await; indexed.insert(kb_key.clone()); } - let mut states = file_states.write().await; - for (path, state) in states.iter_mut() { - if path.contains(&format!("{}/", kb_folder_name)) { - 
state.indexed = true; - state.fail_count = 0; - state.last_failed_at = None; - } + let pattern = format!("{}/", kb_folder_name); + if let Err(e) = file_repo.mark_indexed_by_pattern(bot_id, &pattern) { + warn!("Failed to mark files indexed for {}: {}", kb_key, e); } } Ok(Err(e)) => { warn!("Failed to index KB {}: {}", kb_key, e); - // Update fail count - let mut states = file_states.write().await; - for (path, state) in states.iter_mut() { - if path.contains(&format!("{}/", kb_folder_name)) { - state.fail_count = state.fail_count.saturating_add(1); - state.last_failed_at = Some(chrono::Utc::now()); - } + let pattern = format!("{}/", kb_folder_name); + if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) { + warn!("Failed to mark files failed for {}: {}", kb_key, e); } } Err(_) => { error!("KB indexing timed out after 120s for {}", kb_key); - let mut states = file_states.write().await; - for (path, state) in states.iter_mut() { - if path.contains(&format!("{}/", kb_folder_name)) { - state.fail_count = state.fail_count.saturating_add(1); - state.last_failed_at = Some(chrono::Utc::now()); - } + let pattern = format!("{}/", kb_folder_name); + if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) { + warn!("Failed to mark files failed for {}: {}", kb_key, e); } } } @@ -404,13 +283,7 @@ match result { return Ok(()); } - // Load file states from disk to avoid reprocessing unchanged files - if let Err(e) = self.load_file_states().await { - warn!( - "Failed to load file states for bot {}: {}", - self.bot_id, e - ); - } + // File states are now loaded from DB on demand - no need to load from disk if !self.check_drive_health().await { warn!( @@ -466,11 +339,7 @@ match result { // Smart sleep based on fail_count - prevent excessive retries { - let states = self_clone.file_states.read().await; - let max_fail_count = states.values() - .map(|s| s.fail_count) - .max() - .unwrap_or(0); + let max_fail_count = 
self_clone.file_repo.get_max_fail_count(self_clone.bot_id); let base_sleep = if max_fail_count >= 3 { 3600 @@ -486,8 +355,7 @@ match result { debug!("Sleep {}s based on fail_count={}", base_sleep, max_fail_count); } - drop(states); - tokio::time::sleep(Duration::from_secs(base_sleep)).await; + tokio::time::sleep(Duration::from_secs(base_sleep as u64)).await; } // Skip drive health check - just proceed with monitoring @@ -553,7 +421,6 @@ match result { self.is_processing .store(false, std::sync::atomic::Ordering::SeqCst); - self.file_states.write().await.clear(); self.consecutive_failures.store(0, Ordering::Relaxed); Ok(()) @@ -698,14 +565,11 @@ match result { if path.ends_with('/') || !path.to_ascii_lowercase().ends_with(".bas") { continue; } -let file_state = FileState { -etag: normalize_etag(obj.e_tag().unwrap_or_default()), - indexed: false, - last_failed_at: None, - fail_count: 0, - last_modified: obj.last_modified().map(|dt| dt.to_string()), - }; - current_files.insert(path, file_state); + let etag = normalize_etag(obj.e_tag().unwrap_or_default()); + let last_modified = obj.last_modified().and_then(|dt| { + DateTime::parse_from_rfc3339(&dt.to_string()).ok().map(|d| d.with_timezone(&Utc)) + }); + current_files.insert(path, (etag, last_modified)); } if !list_objects.is_truncated.unwrap_or(false) { break; @@ -713,14 +577,12 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), continuation_token = list_objects.next_continuation_token; } // First pass: identify which files need compilation - // We must do this BEFORE acquiring the write lock to avoid deadlock let files_to_compile: Vec = { - let file_states = self.file_states.read().await; current_files .iter() - .filter_map(|(path, current_state)| { - if let Some(previous_state) = file_states.get(path) { - if current_state.etag != previous_state.etag { + .filter_map(|(path, (current_etag, _))| { + if let Some(prev) = self.file_repo.get_file_state(self.bot_id, path) { + if prev.etag.as_deref() != 
Some(current_etag.as_str()) { Some(path.clone()) } else { None @@ -746,16 +608,10 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), } } - // Now acquire write lock to merge current_files into file_states - let mut file_states = self.file_states.write().await; - // Remove files that no longer exist (deleted from MinIO) - let previous_paths: Vec = file_states - .keys() - .cloned() - .collect(); - for path in previous_paths { - if !current_files.contains_key(&path) { + let previous_files = self.file_repo.get_files_by_type(self.bot_id, "gbdialog"); + for prev_file in &previous_files { + if !current_files.contains_key(&prev_file.file_path) { // Delete the compiled .ast file from disk let bot_name = self .bucket_name @@ -763,7 +619,7 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), .unwrap_or(&self.bucket_name); let ast_path = self.work_root .join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name)) - .join(PathBuf::from(&path).file_name().unwrap_or_default().to_str().unwrap_or("")) + .join(PathBuf::from(&prev_file.file_path).file_name().unwrap_or_default().to_str().unwrap_or("")) .with_extension("ast"); if ast_path.exists() { @@ -785,49 +641,53 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), } } - file_states.remove(&path); + if let Err(e) = self.file_repo.delete_file(self.bot_id, &prev_file.file_path) { + warn!("Failed to delete file state for {}: {}", prev_file.file_path, e); + } } } - // Merge current_files into file_states + // Merge current_files into DB via file_repo // For each file in current_files: // - If compilation succeeded: set indexed=true // - If compilation failed: preserve previous indexed status, increment fail_count // - If unchanged: preserve existing state (including indexed status) // - If new and not compiled: add with default state (indexed=false) - for (path, mut new_state) in current_files { - if successful_compilations.contains(&path) { + for (path, (etag, last_modified)) in ¤t_files { + if 
successful_compilations.contains(path) { // Compilation succeeded - mark as indexed - new_state.indexed = true; - new_state.fail_count = 0; - new_state.last_failed_at = None; - } else if let Some(prev_state) = file_states.get(&path) { - if prev_state.etag == new_state.etag { - // File unchanged - preserve all previous state - new_state.indexed = prev_state.indexed; - new_state.fail_count = prev_state.fail_count; - new_state.last_failed_at = prev_state.last_failed_at; + if let Err(e) = self.file_repo.upsert_file_full( + self.bot_id, path, "gbdialog", + Some(etag.clone()), *last_modified, + true, 0, None, + ) { + warn!("Failed to upsert file {}: {}", path, e); + } + } else if let Some(prev) = self.file_repo.get_file_state(self.bot_id, path) { + let etag_unchanged = prev.etag.as_deref() == Some(etag.as_str()); + if etag_unchanged { + // File unchanged - preserve all previous state (already in DB) } else { - // File changed but compilation failed - preserve previous state - // Keep previous indexed status, increment fail_count - new_state.indexed = prev_state.indexed; - new_state.fail_count = prev_state.fail_count + 1; - new_state.last_failed_at = Some(chrono::Utc::now()); + // File changed but compilation failed - increment fail_count + if let Err(e) = self.file_repo.upsert_file_full( + self.bot_id, path, "gbdialog", + Some(etag.clone()), *last_modified, + prev.indexed, prev.fail_count + 1, Some(Utc::now()), + ) { + warn!("Failed to upsert file {}: {}", path, e); + } + } + } else { + // New file where compilation failed: indexed=false + if let Err(e) = self.file_repo.upsert_file_full( + self.bot_id, path, "gbdialog", + Some(etag.clone()), *last_modified, + false, 0, None, + ) { + warn!("Failed to upsert file {}: {}", path, e); } } - // For new files where compilation failed: indexed remains false - file_states.insert(path, new_state); } - // Save file states to disk in background to avoid blocking - // Use static helper to avoid double Arc (fixes "dispatch failure" 
error) - let file_states_clone = Arc::clone(&self.file_states); - let work_root_clone = self.work_root.clone(); - let bucket_name_clone = self.bucket_name.clone(); - tokio::spawn(async move { - if let Err(e) = Self::save_file_states_static(&file_states_clone, &work_root_clone, &bucket_name_clone).await { - warn!("Failed to save file states: {}", e); - } - }); Ok(()) } async fn check_gbot(&self, client: &Client) -> Result<(), Box> { @@ -885,12 +745,9 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), // Check etag to avoid re-downloading unchanged prompt files let etag = normalize_etag(obj.e_tag().unwrap_or_default()); let prompt_state_key = format!("__prompt__{}", path); - let should_download = { - let states = self.file_states.read().await; - match states.get(&prompt_state_key) { - Some(prev) => prev.etag != etag, - None => true, - } + let should_download = match self.file_repo.get_file_state(self.bot_id, &prompt_state_key) { + Some(prev) => prev.etag.as_deref() != Some(&etag), + None => true, }; if should_download { match client.get_object().bucket(&self.bucket_name).key(&path).send().await { @@ -922,17 +779,12 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), log::error!("Failed to download prompt file {}: {}", path, e); } } - let mut states = self.file_states.write().await; - states.insert(prompt_state_key, FileState { etag, indexed: false, last_failed_at: None, fail_count: 0, last_modified: None }); - drop(states); - let file_states_clone = Arc::clone(&self.file_states); - let work_root_clone = self.work_root.clone(); - let bucket_name_clone = self.bucket_name.clone(); - tokio::spawn(async move { - if let Err(e) = Self::save_file_states_static(&file_states_clone, &work_root_clone, &bucket_name_clone).await { - warn!("Failed to save file states after prompt update: {}", e); - } - }); + if let Err(e) = self.file_repo.upsert_file_full( + self.bot_id, &prompt_state_key, "gbot-prompt", + Some(etag), None, false, 0, None, + ) { + warn!("Failed to save 
prompt file state: {}", e); + } } else { trace!("Prompt file {} unchanged (etag match), skipping download", path); } @@ -943,20 +795,17 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), let etag = normalize_etag(obj.e_tag().unwrap_or_default()); let last_modified = obj.last_modified().map(|dt| dt.to_string()); let config_state_key = format!("__config__{}", path); - let should_sync = { - let states = self.file_states.read().await; - match states.get(&config_state_key) { - Some(prev) => { - let etag_changed = prev.etag != etag; - let mod_changed = match (&prev.last_modified, &last_modified) { - (Some(prev_dt), Some(new_dt)) => prev_dt != new_dt, - (None, Some(_)) => true, - _ => false, - }; - etag_changed || mod_changed - } - None => true, + let should_sync = match self.file_repo.get_file_state(self.bot_id, &config_state_key) { + Some(prev) => { + let etag_changed = prev.etag.as_deref() != Some(&etag); + let mod_changed = match (&prev.last_modified, &last_modified) { + (Some(prev_dt), Some(new_dt)) => prev_dt.to_string() != new_dt.to_string(), + (None, Some(_)) => true, + _ => false, + }; + etag_changed || mod_changed } + None => true, }; debug!("check_gbot: config.csv should_sync={} (etag={}, last_modified={:?})", should_sync, etag, last_modified); if should_sync { @@ -1090,18 +939,16 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), self.broadcast_theme_change(&csv_content).await?; } - // Update file_states with config.csv ETag and last_modified - let mut states = self.file_states.write().await; - states.insert(config_state_key, FileState { etag, indexed: false, last_failed_at: None, fail_count: 0, last_modified }); - drop(states); - let file_states_clone = Arc::clone(&self.file_states); - let work_root_clone = self.work_root.clone(); - let bucket_name_clone = self.bucket_name.clone(); - tokio::spawn(async move { - if let Err(e) = Self::save_file_states_static(&file_states_clone, &work_root_clone, &bucket_name_clone).await { - warn!("Failed to save 
file states after config update: {}", e); - } + // Update file state in DB for config.csv + let last_mod_dt = last_modified.as_ref().and_then(|s| { + DateTime::parse_from_rfc3339(s).ok().map(|d| d.with_timezone(&Utc)) }); + if let Err(e) = self.file_repo.upsert_file_full( + self.bot_id, &config_state_key, "gbot-config", + Some(etag), last_mod_dt, false, 0, None, + ) { + warn!("Failed to save config file state: {}", e); + } // Check for system-prompt-file and download it let prompt_file_line = csv_content @@ -1525,14 +1372,11 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), continue; } -let file_state = FileState { - etag: normalize_etag(obj.e_tag().unwrap_or_default()), - indexed: false, - last_failed_at: None, - fail_count: 0, - last_modified: obj.last_modified().map(|dt| dt.to_string()), - }; - current_files.insert(path.clone(), file_state); + let etag = normalize_etag(obj.e_tag().unwrap_or_default()); + let last_modified = obj.last_modified().and_then(|dt| { + DateTime::parse_from_rfc3339(&dt.to_string()).ok().map(|d| d.with_timezone(&Utc)) + }); + current_files.insert(path.clone(), (etag, last_modified)); } if !list_objects.is_truncated.unwrap_or(false) { @@ -1566,8 +1410,6 @@ let file_state = FileState { } } - let mut file_states = self.file_states.write().await; - // Build set of already-indexed KB folder names for quick lookup let indexed_kb_names: HashSet = { let indexed = self.kb_indexed_folders.read().await; @@ -1577,29 +1419,30 @@ let file_state = FileState { .collect() }; - for (path, current_state) in current_files.iter() { - let is_new = !file_states.contains_key(path); + for (path, (_, current_last_modified)) in current_files.iter() { + let prev_state = self.file_repo.get_file_state(self.bot_id, path); + let is_new = prev_state.is_none(); // Skip files from already-indexed KB folders that are not new - // This prevents re-download loop when file_states fails to load + // This prevents re-download loop when DB is loaded fresh let 
kb_name_from_path = path.split('/').nth(1).map(|s| s.to_string()); if all_indexed && !is_new { trace!("Skipping already indexed file: {}", path); continue; } - // Extra safety: if file_states is empty but KB is indexed, skip non-new files - if file_states.is_empty() && all_indexed { + // Extra safety: if the KB folder is indexed, skip non-new files + if all_indexed { if let Some(kb) = &kb_name_from_path { if indexed_kb_names.contains(kb) { - trace!("Skipping file from indexed KB (empty file_states): {}", path); + trace!("Skipping file from indexed KB: {}", path); continue; } } } // Use only last_modified for change detection - more reliable than ETag - let is_modified = if let Some(prev) = file_states.get(path) { - prev.last_modified != current_state.last_modified + let is_modified = if let Some(prev) = &prev_state { + prev.last_modified != *current_last_modified } else { false }; @@ -1626,10 +1469,10 @@ let file_state = FileState { } } } - if let Some(prev_state) = file_states.get(path) { - if prev_state.fail_count >= MAX_FAIL_COUNT { + if let Some(prev) = &prev_state { + if prev.fail_count >= MAX_FAIL_COUNT { let elapsed = Utc::now() - .signed_duration_since(prev_state.last_failed_at.unwrap_or(Utc::now())); + .signed_duration_since(prev.last_failed_at.unwrap_or(Utc::now())); if elapsed.num_seconds() < RETRY_BACKOFF_SECS { continue; } @@ -1703,10 +1546,11 @@ let file_state = FileState { } } - let paths_to_remove: Vec = file_states - .keys() - .filter(|path| path.starts_with(&gbkb_prefix) && !current_files.contains_key(*path)) - .cloned() + // Find files deleted from MinIO + let previous_gbkb = self.file_repo.get_files_by_prefix(self.bot_id, &gbkb_prefix); + let paths_to_remove: Vec = previous_gbkb.iter() + .filter(|f| !current_files.contains_key(&f.file_path)) + .map(|f| f.file_path.clone()) .collect(); if files_processed > 0 { @@ -1715,36 +1559,34 @@ let file_state = FileState { files_processed, pdf_files_found ); } - for (path, mut state) in current_files { - // 
Preserve indexed status and fail history when file hasn't actually changed - // Use last_modified as the primary check (more stable than ETag) - if let Some(previous) = file_states.get(&path) { - let content_unchanged = previous.last_modified == state.last_modified - || (previous.etag == state.etag && previous.last_modified.is_some() && state.last_modified.is_some()); - + // Persist each current file to the DB, preserving state when unchanged + for (path, (etag, last_modified)) in ¤t_files { + if let Some(previous) = self.file_repo.get_file_state(self.bot_id, path) { + let content_unchanged = previous.last_modified == *last_modified + || (previous.etag.as_deref() == Some(etag.as_str()) + && previous.last_modified.is_some() + && last_modified.is_some()); + if content_unchanged { - state.indexed = previous.indexed; - state.fail_count = previous.fail_count; - state.last_failed_at = previous.last_failed_at; + // Unchanged - leave existing DB row as-is + continue; } } - file_states.insert(path, state); - } - - // Save file states to disk in background to avoid blocking - // Use static helper to avoid double Arc (fixes "dispatch failure" error) - let file_states_clone = Arc::clone(&self.file_states); - let work_root_clone = self.work_root.clone(); - let bucket_name_clone = self.bucket_name.clone(); - tokio::spawn(async move { - if let Err(e) = Self::save_file_states_static(&file_states_clone, &work_root_clone, &bucket_name_clone).await { - warn!("Failed to save file states: {}", e); + // New or changed file — upsert with default state (indexed=false) + if let Err(e) = self.file_repo.upsert_file_full( + self.bot_id, path, "gbkb", + Some(etag.clone()), *last_modified, + false, 0, None, + ) { + warn!("Failed to upsert gbkb file {}: {}", path, e); } - }); + } for path in paths_to_remove { trace!("Detected deletion in .gbkb: {}", path); - file_states.remove(&path); + if let Err(e) = self.file_repo.delete_file(self.bot_id, &path) { + warn!("Failed to delete gbkb file state {}: 
{}", path, e); + } // Delete the downloaded file from disk let bot_name = self @@ -1766,7 +1608,7 @@ let file_state = FileState { let kb_name = path_parts[1]; let kb_prefix = format!("{}{}/", gbkb_prefix, kb_name); - if !file_states.keys().any(|k| k.starts_with(&kb_prefix)) { + if !self.file_repo.has_files_with_prefix(self.bot_id, &kb_prefix) { #[cfg(any(feature = "research", feature = "llm"))] { if let Err(e) = self.kb_manager.clear_kb(self.bot_id, bot_name, kb_name).await {