fix(drive_monitor): 1s interval, path matching, error isolation
All checks were successful
BotServer CI/CD / build (push) Successful in 4m31s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-17 08:17:43 -03:00
parent f9178e947e
commit 162515ba11

View file

@ -66,6 +66,12 @@ pub struct DriveMonitor {
_pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
// Database-backed file state repository (replaces JSON file_states)
file_repo: Arc<DriveFileRepository>,
// Queue for pending file changes (intended to prevent reentrant processing) - not used yet, reserved for future use
#[allow(dead_code)]
pending_changes: Arc<TokioRwLock<Vec<String>>>,
// Last processed etag snapshot for quick diff - reserved for future use
#[allow(dead_code)]
last_etag_snapshot: Arc<TokioRwLock<HashMap<String, String>>>,
}
impl DriveMonitor {
fn normalize_config_value(value: &str) -> String {
@ -104,6 +110,8 @@ pub fn new(state: Arc<AppState>, bucket_name: String, bot_id: uuid::Uuid) -> Sel
#[cfg(not(any(feature = "research", feature = "llm")))]
_pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())),
file_repo,
pending_changes: Arc::new(TokioRwLock::new(Vec::new())),
last_etag_snapshot: Arc::new(TokioRwLock::new(HashMap::new())),
}
}
@ -427,26 +435,22 @@ match result {
}
pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
trace!(
info!(
"Drive Monitor service started for bucket: {}",
self.bucket_name
);
loop {
let backoff = self.calculate_backoff();
tokio::time::sleep(backoff).await;
tokio::time::sleep(Duration::from_secs(1)).await;
if self.is_processing.load(Ordering::Acquire) {
log::warn!(
"Drive monitor is still processing previous changes, skipping this tick"
);
continue;
}
if !self.check_drive_health().await {
let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
if failures % 10 == 1 {
warn!("S3/MinIO unavailable for bucket {} (failures: {}), backing off to {:?}",
self.bucket_name, failures, self.calculate_backoff());
warn!("S3/MinIO unavailable for bucket {} (failures: {})",
self.bucket_name, failures);
}
continue;
}
@ -457,13 +461,13 @@ match result {
Ok(_) => {
let prev_failures = self.consecutive_failures.swap(0, Ordering::Relaxed);
if prev_failures > 0 {
trace!("S3/MinIO recovered for bucket {} after {} failures",
info!("S3/MinIO recovered for bucket {} after {} failures",
self.bucket_name, prev_failures);
}
}
Err(e) => {
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
log::error!("Error checking for drive changes: {}", e);
error!("Error checking for drive changes: {}", e);
}
}
@ -472,62 +476,22 @@ match result {
})
}
async fn check_for_changes(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
trace!("check_for_changes ENTER");
let start_mem = MemoryStats::current();
trace!(
"check_for_changes START, RSS={}",
MemoryStats::format_bytes(start_mem.rss_bytes)
);
let Some(client) = &self.state.drive else {
warn!("No drive client available for bot {}, skipping file monitoring", self.bot_id);
return Ok(());
};
trace!("check_for_changes: calling check_gbdialog_changes...");
trace!("Checking gbdialog...");
self.check_gbdialog_changes(client).await?;
trace!("check_for_changes: check_gbdialog_changes done");
let after_dialog = MemoryStats::current();
trace!(
"After gbdialog, RSS={} (delta={})",
MemoryStats::format_bytes(after_dialog.rss_bytes),
MemoryStats::format_bytes(after_dialog.rss_bytes.saturating_sub(start_mem.rss_bytes))
);
trace!("check_for_changes: calling check_gbot...");
trace!("Checking gbot...");
self.check_gbot(client).await?;
trace!("check_for_changes: check_gbot done");
let after_gbot = MemoryStats::current();
trace!(
"After gbot, RSS={} (delta={})",
MemoryStats::format_bytes(after_gbot.rss_bytes),
MemoryStats::format_bytes(after_gbot.rss_bytes.saturating_sub(after_dialog.rss_bytes))
);
trace!("check_for_changes: calling check_gbkb_changes...");
trace!("Checking gbkb...");
self.check_gbkb_changes(client).await?;
trace!("check_for_changes: check_gbkb_changes done");
let after_gbkb = MemoryStats::current();
trace!(
"After gbkb, RSS={} (delta={})",
MemoryStats::format_bytes(after_gbkb.rss_bytes),
MemoryStats::format_bytes(after_gbkb.rss_bytes.saturating_sub(after_gbot.rss_bytes))
);
log_jemalloc_stats();
let total_delta = after_gbkb.rss_bytes.saturating_sub(start_mem.rss_bytes);
if total_delta > 50 * 1024 * 1024 {
warn!(
"check_for_changes grew by {} - potential leak!",
MemoryStats::format_bytes(total_delta)
);
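// All checks run independently - one failure doesn't stop others.
// NOTE(review): failures are only logged here, so this function still returns Ok(()) - confirm the caller's failure counter is meant to ignore them.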
if let Err(e) = self.check_gbdialog_changes(client).await {
error!("gbdialog check failed: {}", e);
}
if let Err(e) = self.check_gbot(client).await {
error!("gbot check failed: {}", e);
}
if let Err(e) = self.check_gbkb_changes(client).await {
error!("gbkb check failed: {}", e);
}
trace!("check_for_changes EXIT");
Ok(())
}
async fn check_gbdialog_changes(
@ -557,12 +521,8 @@ match result {
};
for obj in list_objects.contents.unwrap_or_default() {
let path = obj.key().unwrap_or_default().to_string();
let path_parts: Vec<&str> = path.split('/').collect();
// Filter for paths matching *.gbdialog/*.bas pattern
if path_parts.len() < 2 || !path_parts[0].ends_with(".gbdialog") {
continue;
}
if path.ends_with('/') || !path.to_ascii_lowercase().ends_with(".bas") {
// Keep only keys that end with .bas and contain .gbdialog anywhere in the path (not just at the start); both tests are case-insensitive
if !path.to_ascii_lowercase().ends_with(".bas") || !path.to_lowercase().contains(".gbdialog") {
continue;
}
let etag = normalize_etag(obj.e_tag().unwrap_or_default());