diff --git a/botserver/src/drive/drive_compiler.rs b/botserver/src/drive/drive_compiler.rs index 9b1fa9cf..e6c3500d 100644 --- a/botserver/src/drive/drive_compiler.rs +++ b/botserver/src/drive/drive_compiler.rs @@ -13,6 +13,7 @@ use crate::core::config::DriveConfig; use crate::core::shared::state::AppState; use crate::core::shared::utils::get_work_path; use crate::drive::drive_files::drive_files as drive_files_table; +use crate::drive::drive_monitor::monitor::CHECK_INTERVAL_SECS; use diesel::prelude::*; use log::{debug, error, info, warn}; use std::collections::HashMap; @@ -74,9 +75,9 @@ impl DriveCompiler { let compiler = self.clone(); - // Loop que verifica drive_files a cada 30s + // Loop que verifica drive_files a cada 1s tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(30)); + let mut interval = tokio::time::interval(Duration::from_secs(CHECK_INTERVAL_SECS)); while compiler.is_processing.load(Ordering::SeqCst) { interval.tick().await; diff --git a/botserver/src/drive/drive_monitor/monitor.rs b/botserver/src/drive/drive_monitor/monitor.rs index 7e4b1351..bca34d34 100644 --- a/botserver/src/drive/drive_monitor/monitor.rs +++ b/botserver/src/drive/drive_monitor/monitor.rs @@ -3,13 +3,16 @@ use std::time::Duration; use super::types::DriveMonitor; +/// Intervalo de verificação do DriveMonitor e DriveCompiler (em segundos) +pub const CHECK_INTERVAL_SECS: u64 = 1; + impl DriveMonitor { pub fn calculate_backoff(&self) -> Duration { let failures = self.consecutive_failures.load(Ordering::Relaxed); if failures == 0 { - return Duration::from_secs(30); + return Duration::from_secs(CHECK_INTERVAL_SECS); } - let backoff_secs = 30u64 * (1u64 << failures.min(4)); + let backoff_secs = CHECK_INTERVAL_SECS * (1u64 << failures.min(4)); Duration::from_secs(backoff_secs.min(300)) } } diff --git a/botserver/src/drive/drive_monitor/types.rs b/botserver/src/drive/drive_monitor/types.rs index cd63f901..9c875880 100644 --- a/botserver/src/drive/drive_monitor/types.rs +++ b/botserver/src/drive/drive_monitor/types.rs @@ -1,9 +1,10 @@ use crate::core::shared::state::AppState; use crate::drive::drive_files::DriveFileRepository; +use crate::drive::drive_monitor::monitor::CHECK_INTERVAL_SECS; #[cfg(any(feature = "research", feature = "llm"))] use crate::core::kb::KnowledgeBaseManager; use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, AtomicU32}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; pub fn normalize_etag(etag: &str) -> String { @@ -15,15 +16,23 @@ impl DriveMonitor { log::info!("DriveMonitor monitoring started for bucket: {}", self.bucket_name); loop { - if let Err(e) = self.scan_bucket().await { - log::error!("Failed to scan bucket {}: {}", self.bucket_name, e); + // Reentrancy protection: skip if previous scan is still running + if self.is_processing.load(Ordering::Relaxed) { + log::debug!("DriveMonitor still processing, skipping iteration"); + } else { + self.is_processing.store(true, Ordering::Relaxed); + if let Err(e) = self.scan_bucket().await { + log::error!("Failed to scan bucket {}: {}", self.bucket_name, e); + } + self.is_processing.store(false, Ordering::Relaxed); } - tokio::time::sleep(std::time::Duration::from_secs(60)).await; + tokio::time::sleep(std::time::Duration::from_secs(CHECK_INTERVAL_SECS)).await; } } async fn scan_bucket(&self) -> Result<(), Box> { - log::info!("Scanning bucket {} for files", self.bucket_name); + log::info!("DriveMonitor: Starting scan of bucket {}", self.bucket_name); + let start = std::time::Instant::now(); if let Some(s3) = &self.state.drive { match s3.list_objects_with_metadata(&self.bucket_name, None).await { @@ -92,16 +101,18 @@ impl DriveMonitor { } } - self.handle_deleted_files(bot_name, ¤t_keys); - } - Err(e) => { - log::error!("Failed to list objects in {}: {}", self.bucket_name, e); - } - } + self.handle_deleted_files(bot_name, ¤t_keys); + } + Err(e) => { + log::error!("Failed to list objects in {}: {}", self.bucket_name, e); + } + } } else { log::warn!("S3 client not available for bucket scan"); } + let elapsed = start.elapsed(); + log::info!("DriveMonitor: Completed scan of {} in {:.2?}", self.bucket_name, elapsed); Ok(()) }