diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index ac52283a..99b02d93 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -66,6 +66,12 @@ pub struct DriveMonitor { _pending_kb_index: Arc>>, // Database-backed file state repository (replaces JSON file_states) file_repo: Arc, + // Queue for pending file changes (prevents reentrant processing) - reserved for future use + #[allow(dead_code)] + pending_changes: Arc>>, + // Last processed etag snapshot for quick diff - reserved for future use + #[allow(dead_code)] + last_etag_snapshot: Arc>>, } impl DriveMonitor { fn normalize_config_value(value: &str) -> String { @@ -104,6 +110,8 @@ pub fn new(state: Arc, bucket_name: String, bot_id: uuid::Uuid) -> Sel #[cfg(not(any(feature = "research", feature = "llm")))] _pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())), file_repo, + pending_changes: Arc::new(TokioRwLock::new(Vec::new())), + last_etag_snapshot: Arc::new(TokioRwLock::new(HashMap::new())), } } @@ -427,26 +435,22 @@ match result { } pub fn spawn(self: Arc) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - trace!( + info!( "Drive Monitor service started for bucket: {}", self.bucket_name ); loop { - let backoff = self.calculate_backoff(); - tokio::time::sleep(backoff).await; + tokio::time::sleep(Duration::from_secs(1)).await; if self.is_processing.load(Ordering::Acquire) { - log::warn!( - "Drive monitor is still processing previous changes, skipping this tick" - ); continue; } if !self.check_drive_health().await { let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1; if failures % 10 == 1 { - warn!("S3/MinIO unavailable for bucket {} (failures: {}), backing off to {:?}", - self.bucket_name, failures, self.calculate_backoff()); + warn!("S3/MinIO unavailable for bucket {} (failures: {})", + self.bucket_name, failures); } continue; } @@ -457,13 +461,13 @@ match result { Ok(_) => { let prev_failures = self.consecutive_failures.swap(0, Ordering::Relaxed); if prev_failures > 0 { - trace!("S3/MinIO recovered for bucket {} after {} failures", + info!("S3/MinIO recovered for bucket {} after {} failures", self.bucket_name, prev_failures); } } Err(e) => { self.consecutive_failures.fetch_add(1, Ordering::Relaxed); - log::error!("Error checking for drive changes: {}", e); + error!("Error checking for drive changes: {}", e); } } @@ -472,62 +476,22 @@ match result { }) } async fn check_for_changes(&self) -> Result<(), Box> { - trace!("check_for_changes ENTER"); - let start_mem = MemoryStats::current(); - trace!( - "check_for_changes START, RSS={}", - MemoryStats::format_bytes(start_mem.rss_bytes) - ); - let Some(client) = &self.state.drive else { warn!("No drive client available for bot {}, skipping file monitoring", self.bot_id); return Ok(()); }; - trace!("check_for_changes: calling check_gbdialog_changes..."); - trace!("Checking gbdialog..."); - self.check_gbdialog_changes(client).await?; - trace!("check_for_changes: check_gbdialog_changes done"); - let after_dialog = MemoryStats::current(); - trace!( - "After gbdialog, RSS={} (delta={})", - MemoryStats::format_bytes(after_dialog.rss_bytes), - MemoryStats::format_bytes(after_dialog.rss_bytes.saturating_sub(start_mem.rss_bytes)) - ); - - trace!("check_for_changes: calling check_gbot..."); - trace!("Checking gbot..."); - self.check_gbot(client).await?; - trace!("check_for_changes: check_gbot done"); - let after_gbot = MemoryStats::current(); - trace!( - "After gbot, RSS={} (delta={})", - MemoryStats::format_bytes(after_gbot.rss_bytes), - MemoryStats::format_bytes(after_gbot.rss_bytes.saturating_sub(after_dialog.rss_bytes)) - ); - - trace!("check_for_changes: calling check_gbkb_changes..."); - trace!("Checking gbkb..."); - self.check_gbkb_changes(client).await?; - trace!("check_for_changes: check_gbkb_changes done"); - let after_gbkb = MemoryStats::current(); - trace!( - "After gbkb, RSS={} (delta={})", - MemoryStats::format_bytes(after_gbkb.rss_bytes), - MemoryStats::format_bytes(after_gbkb.rss_bytes.saturating_sub(after_gbot.rss_bytes)) - ); - - log_jemalloc_stats(); - - let total_delta = after_gbkb.rss_bytes.saturating_sub(start_mem.rss_bytes); - if total_delta > 50 * 1024 * 1024 { - warn!( - "check_for_changes grew by {} - potential leak!", - MemoryStats::format_bytes(total_delta) - ); + // All checks run independently - one failure doesn't stop others + if let Err(e) = self.check_gbdialog_changes(client).await { + error!("gbdialog check failed: {}", e); + } + if let Err(e) = self.check_gbot(client).await { + error!("gbot check failed: {}", e); + } + if let Err(e) = self.check_gbkb_changes(client).await { + error!("gbkb check failed: {}", e); } - trace!("check_for_changes EXIT"); Ok(()) } async fn check_gbdialog_changes( @@ -557,12 +521,8 @@ match result { }; for obj in list_objects.contents.unwrap_or_default() { let path = obj.key().unwrap_or_default().to_string(); - let path_parts: Vec<&str> = path.split('/').collect(); - // Filter for paths matching *.gbdialog/*.bas pattern - if path_parts.len() < 2 || !path_parts[0].ends_with(".gbdialog") { - continue; - } - if path.ends_with('/') || !path.to_ascii_lowercase().ends_with(".bas") { + // Must end with .bas and contain .gbdialog anywhere (not just at start) + if !path.to_ascii_lowercase().ends_with(".bas") || !path.to_lowercase().contains(".gbdialog") { continue; } let etag = normalize_etag(obj.e_tag().unwrap_or_default());