feat: DriveMonitor com intervalo de 1s e proteção de reentrância
- CHECK_INTERVAL_SECS: constante compartilhada (1 segundo)
- Proteção contra reentrância usando is_processing
- Logging de tempo de scan para debugging
- DriveCompiler agora usa a mesma constante
- Ideal para PDFs longos e .bas grandes
This commit is contained in:
parent
6bf879a78a
commit
a86238b132
3 changed files with 30 additions and 15 deletions
|
|
@ -13,6 +13,7 @@ use crate::core::config::DriveConfig;
|
|||
use crate::core::shared::state::AppState;
|
||||
use crate::core::shared::utils::get_work_path;
|
||||
use crate::drive::drive_files::drive_files as drive_files_table;
|
||||
use crate::drive::drive_monitor::monitor::CHECK_INTERVAL_SECS;
|
||||
use diesel::prelude::*;
|
||||
use log::{debug, error, info, warn};
|
||||
use std::collections::HashMap;
|
||||
|
|
@ -74,9 +75,9 @@ impl DriveCompiler {
|
|||
|
||||
let compiler = self.clone();
|
||||
|
||||
// Loop que verifica drive_files a cada 30s
|
||||
// Loop que verifica drive_files a cada 1s
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(CHECK_INTERVAL_SECS));
|
||||
|
||||
while compiler.is_processing.load(Ordering::SeqCst) {
|
||||
interval.tick().await;
|
||||
|
|
|
|||
|
|
@ -3,13 +3,16 @@ use std::time::Duration;
|
|||
|
||||
use super::types::DriveMonitor;
|
||||
|
||||
/// Intervalo de verificação do DriveMonitor e DriveCompiler (em segundos)
|
||||
pub const CHECK_INTERVAL_SECS: u64 = 1;
|
||||
|
||||
impl DriveMonitor {
|
||||
pub fn calculate_backoff(&self) -> Duration {
|
||||
let failures = self.consecutive_failures.load(Ordering::Relaxed);
|
||||
if failures == 0 {
|
||||
return Duration::from_secs(30);
|
||||
return Duration::from_secs(CHECK_INTERVAL_SECS);
|
||||
}
|
||||
let backoff_secs = 30u64 * (1u64 << failures.min(4));
|
||||
let backoff_secs = CHECK_INTERVAL_SECS * (1u64 << failures.min(4));
|
||||
Duration::from_secs(backoff_secs.min(300))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
use crate::core::shared::state::AppState;
|
||||
use crate::drive::drive_files::DriveFileRepository;
|
||||
use crate::drive::drive_monitor::monitor::CHECK_INTERVAL_SECS;
|
||||
#[cfg(any(feature = "research", feature = "llm"))]
|
||||
use crate::core::kb::KnowledgeBaseManager;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub fn normalize_etag(etag: &str) -> String {
|
||||
|
|
@ -15,15 +16,23 @@ impl DriveMonitor {
|
|||
log::info!("DriveMonitor monitoring started for bucket: {}", self.bucket_name);
|
||||
|
||||
loop {
|
||||
if let Err(e) = self.scan_bucket().await {
|
||||
log::error!("Failed to scan bucket {}: {}", self.bucket_name, e);
|
||||
// Reentrancy protection: skip if previous scan is still running
|
||||
if self.is_processing.load(Ordering::Relaxed) {
|
||||
log::debug!("DriveMonitor still processing, skipping iteration");
|
||||
} else {
|
||||
self.is_processing.store(true, Ordering::Relaxed);
|
||||
if let Err(e) = self.scan_bucket().await {
|
||||
log::error!("Failed to scan bucket {}: {}", self.bucket_name, e);
|
||||
}
|
||||
self.is_processing.store(false, Ordering::Relaxed);
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
|
||||
tokio::time::sleep(std::time::Duration::from_secs(CHECK_INTERVAL_SECS)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn scan_bucket(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
log::info!("Scanning bucket {} for files", self.bucket_name);
|
||||
log::info!("DriveMonitor: Starting scan of bucket {}", self.bucket_name);
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
if let Some(s3) = &self.state.drive {
|
||||
match s3.list_objects_with_metadata(&self.bucket_name, None).await {
|
||||
|
|
@ -92,16 +101,18 @@ impl DriveMonitor {
|
|||
}
|
||||
}
|
||||
|
||||
self.handle_deleted_files(bot_name, &current_keys);
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to list objects in {}: {}", self.bucket_name, e);
|
||||
}
|
||||
}
|
||||
self.handle_deleted_files(bot_name, &current_keys);
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to list objects in {}: {}", self.bucket_name, e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log::warn!("S3 client not available for bucket scan");
|
||||
}
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
log::info!("DriveMonitor: Completed scan of {} in {:.2?}", self.bucket_name, elapsed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue