debug: add logging to track check_gbkb_changes hang
All checks were successful
BotServer CI/CD / build (push) Successful in 3m40s

Added debug logging at key points in check_gbkb_changes:
- ENTER with bot ID and prefix
- Object listing results
- File states lock acquisition
- New/modified file detection
- PDF detection
- File download batches
- Final remaining files download
- EXIT confirmation

This will help identify exactly where the 5-minute timeout occurs.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-12 22:09:38 -03:00
parent 8e539206d4
commit 3322234712

View file

@ -1432,13 +1432,14 @@ impl DriveMonitor {
&self, &self,
client: &Client, client: &Client,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
trace!("check_gbkb_changes ENTER"); debug!("[GBKB] check_gbkb_changes ENTER for bot {} (prefix: {})", self.bot_id, self.bucket_name);
let bot_name = self let bot_name = self
.bucket_name .bucket_name
.strip_suffix(".gbai") .strip_suffix(".gbai")
.unwrap_or(&self.bucket_name); .unwrap_or(&self.bucket_name);
let gbkb_prefix = format!("{}.gbkb/", bot_name); let gbkb_prefix = format!("{}.gbkb/", bot_name);
debug!("[GBKB] Listing objects with prefix: {}", gbkb_prefix);
let mut current_files = HashMap::new(); let mut current_files = HashMap::new();
let mut continuation_token = None; let mut continuation_token = None;
@ -1459,7 +1460,10 @@ impl DriveMonitor {
.await .await
{ {
Ok(Ok(list)) => list, Ok(Ok(list)) => list,
Ok(Err(e)) => return Err(e.into()), Ok(Err(e)) => {
debug!("[GBKB] Error listing objects: {}", e);
return Err(e.into());
}
Err(_) => { Err(_) => {
log::error!( log::error!(
"Timeout listing .gbkb objects in bucket {}", "Timeout listing .gbkb objects in bucket {}",
@ -1468,6 +1472,7 @@ impl DriveMonitor {
return Ok(()); return Ok(());
} }
}; };
debug!("[GBKB] Listed {} objects in this page", list_objects.contents.as_ref().map(|c| c.len()).unwrap_or(0));
for obj in list_objects.contents.unwrap_or_default() { for obj in list_objects.contents.unwrap_or_default() {
let path = obj.key().unwrap_or_default().to_string(); let path = obj.key().unwrap_or_default().to_string();
@ -1497,7 +1502,9 @@ impl DriveMonitor {
continuation_token = list_objects.next_continuation_token; continuation_token = list_objects.next_continuation_token;
} }
debug!("[GBKB] Found {} files total, acquiring file_states lock...", current_files.len());
let mut file_states = self.file_states.write().await; let mut file_states = self.file_states.write().await;
debug!("[GBKB] file_states lock acquired, processing {} files", current_files.len());
for (path, current_state) in current_files.iter() { for (path, current_state) in current_files.iter() {
let is_new = !file_states.contains_key(path); let is_new = !file_states.contains_key(path);
@ -1507,6 +1514,7 @@ impl DriveMonitor {
.unwrap_or(false); .unwrap_or(false);
if is_new || is_modified { if is_new || is_modified {
debug!("[GBKB] New/modified file: {} (new={}, modified={})", path, is_new, is_modified);
if let Some(prev_state) = file_states.get(path) { if let Some(prev_state) = file_states.get(path) {
if prev_state.fail_count >= MAX_FAIL_COUNT { if prev_state.fail_count >= MAX_FAIL_COUNT {
let elapsed = Utc::now() let elapsed = Utc::now()
@ -1523,11 +1531,7 @@ impl DriveMonitor {
if path.to_lowercase().ends_with(".pdf") { if path.to_lowercase().ends_with(".pdf") {
pdf_files_found += 1; pdf_files_found += 1;
trace!( debug!("[GBKB] Detected PDF: {}", path);
"Detected {} PDF in .gbkb: {} (will extract text for vectordb)",
if is_new { "new" } else { "changed" },
path
);
} else { } else {
trace!( trace!(
"Detected {} in .gbkb: {}", "Detected {} in .gbkb: {}",
@ -1540,12 +1544,13 @@ impl DriveMonitor {
files_processed += 1; files_processed += 1;
if files_to_process.len() >= 10 { if files_to_process.len() >= 10 {
debug!("[GBKB] Downloading batch of {} files", files_to_process.len());
for file_path in std::mem::take(&mut files_to_process) { for file_path in std::mem::take(&mut files_to_process) {
debug!("[GBKB] Downloading: {}", file_path);
if let Err(e) = self.download_gbkb_file(client, &file_path).await { if let Err(e) = self.download_gbkb_file(client, &file_path).await {
log::error!("Failed to download .gbkb file {}: {}", file_path, e); log::error!("Failed to download .gbkb file {}: {}", file_path, e);
} }
} }
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} }
@ -1615,6 +1620,19 @@ impl DriveMonitor {
} }
} }
// Download remaining files (less than 10)
if !files_to_process.is_empty() {
debug!("[GBKB] Downloading final batch of {} files", files_to_process.len());
for file_path in std::mem::take(&mut files_to_process) {
debug!("[GBKB] Downloading: {}", file_path);
if let Err(e) = self.download_gbkb_file(client, &file_path).await {
log::error!("Failed to download .gbkb file {}: {}", file_path, e);
}
}
}
debug!("[GBKB] Processed {} files, {} PDFs found", files_processed, pdf_files_found);
let paths_to_remove: Vec<String> = file_states let paths_to_remove: Vec<String> = file_states
.keys() .keys()
.filter(|path| path.starts_with(&gbkb_prefix) && !current_files.contains_key(*path)) .filter(|path| path.starts_with(&gbkb_prefix) && !current_files.contains_key(*path))
@ -1699,6 +1717,7 @@ impl DriveMonitor {
} }
} }
debug!("[GBKB] check_gbkb_changes EXIT for bot {}", self.bot_id);
trace!("check_gbkb_changes EXIT"); trace!("check_gbkb_changes EXIT");
Ok(()) Ok(())
} }