From 650cb709615ceacdb8a9a47a2c3c83c353d4ce19 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Mon, 13 Apr 2026 20:46:28 -0300 Subject: [PATCH] debug: add WebSocket message tracing --- src/core/bot/mod.rs | 12 ++++++++++-- src/drive/drive_monitor/mod.rs | 21 +++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index f4662186..5b964a5a 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -10,7 +10,7 @@ use tool_executor::ToolExecutor; use crate::core::config::ConfigManager; #[cfg(feature = "drive")] -use crate::drive::drive_monitor::DriveMonitor; +use crate::drive::drive_monitor::{DriveMonitor, set_llm_streaming}; #[cfg(feature = "llm")] use crate::llm::llm_models; #[cfg(feature = "llm")] @@ -811,13 +811,20 @@ impl BotOrchestrator { // Clone messages for the async task let messages_clone = messages.clone(); + // Set flag to prevent DriveMonitor PDF downloads during LLM streaming + #[cfg(feature = "drive")] + set_llm_streaming(true); + + let stream_tx_clone = stream_tx.clone(); tokio::spawn(async move { if let Err(e) = llm - .generate_stream("", &messages_clone, stream_tx, &model_clone, &key_clone, tools_for_llm.as_ref()) + .generate_stream("", &messages_clone, stream_tx_clone, &model_clone, &key_clone, tools_for_llm.as_ref()) .await { error!("LLM streaming error: {}", e); } + #[cfg(feature = "drive")] + set_llm_streaming(false); }); let mut full_response = String::new(); @@ -1570,6 +1577,7 @@ let mut send_task = tokio::spawn(async move { while let Some(Ok(msg)) = receiver.next().await { match msg { Message::Text(text) => { + debug!("WebSocket received text: {}", text); if let Ok(user_msg) = serde_json::from_str::(&text) { let orchestrator = BotOrchestrator::new(state_clone.clone()); if let Some(tx_clone) = state_clone diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index a674274b..bde15028 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -23,6 +23,19 @@ use tokio::time::Duration; use serde::{Deserialize, Serialize}; use tokio::fs as tokio_fs; +#[cfg(any(feature = "research", feature = "llm"))] +static LLM_STREAMING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); + +#[cfg(any(feature = "research", feature = "llm"))] +pub fn set_llm_streaming(streaming: bool) { + LLM_STREAMING.store(streaming, Ordering::SeqCst); +} + +#[cfg(any(feature = "research", feature = "llm"))] +pub fn is_llm_streaming() -> bool { + LLM_STREAMING.load(Ordering::SeqCst) +} + const MAX_BACKOFF_SECS: u64 = 300; const INITIAL_BACKOFF_SECS: u64 = 30; const RETRY_BACKOFF_SECS: i64 = 3600; @@ -1672,6 +1685,14 @@ let file_state = FileState { files_processed += 1; debug!("[GBKB] Queue size: {}/10", files_to_process.len()); + // Skip downloads if LLM is actively streaming to prevent deadlock + #[cfg(any(feature = "research", feature = "llm"))] + if is_llm_streaming() { + debug!("[GBKB] Skipping download - LLM is streaming, will retry later"); + files_to_process.clear(); + break; + } + if files_to_process.len() >= 10 { debug!("[GBKB] Downloading batch of {} files", files_to_process.len()); for file_path in std::mem::take(&mut files_to_process) {