diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs
index 6292c054..cef79d81 100644
--- a/src/core/bot/mod.rs
+++ b/src/core/bot/mod.rs
@@ -859,7 +859,17 @@ impl BotOrchestrator {
             }
         }
 
-        while let Some(chunk) = stream_rx.recv().await {
+        let stream_timeout = tokio::time::Duration::from_secs(60);
+'stream_loop: loop {
+            let chunk_result = tokio::time::timeout(stream_timeout, stream_rx.recv()).await;
+            let chunk = match chunk_result {
+                Ok(Some(c)) => c,
+                Ok(None) => break 'stream_loop,
+                Err(_) => {
+                    error!("LLM stream timeout after {}s - aborting response", stream_timeout.as_secs());
+                    break 'stream_loop
+                }
+            };
 
             // ===== GENERIC TOOL EXECUTION =====
             // Add chunk to tool_call_buffer and try to parse
diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs
index 3ba08f26..ef41e616 100644
--- a/src/drive/drive_monitor/mod.rs
+++ b/src/drive/drive_monitor/mod.rs
@@ -431,7 +431,7 @@ match result {
         trace!("start_monitoring: calling check_for_changes...");
         trace!("Calling initial check_for_changes...");
 
-        match tokio::time::timeout(Duration::from_secs(300), self.check_for_changes()).await {
+        match tokio::time::timeout(Duration::from_secs(12), self.check_for_changes()).await {
             Ok(Ok(_)) => {
                 self.consecutive_failures.store(0, Ordering::Relaxed);
             }
@@ -514,7 +514,7 @@ match result {
                 debug!("[DRIVE_MONITOR] About to call check_for_changes for bot {}", self_clone.bot_id);
 
                 // Add timeout to prevent hanging
-                match tokio::time::timeout(Duration::from_secs(300), self_clone.check_for_changes()).await {
+                match tokio::time::timeout(Duration::from_secs(12), self_clone.check_for_changes()).await {
                     Ok(Ok(_)) => {
                         let prev_failures = self_clone.consecutive_failures.swap(0, Ordering::Relaxed);
diff --git a/src/llm/mod.rs b/src/llm/mod.rs
index e254ea01..c4f929ed 100644
--- a/src/llm/mod.rs
+++ b/src/llm/mod.rs
@@ -438,13 +438,15 @@ impl LLMProvider for OpenAIClient {
             }
         }
 
-        let response = self
-            .client
-            .post(&full_url)
-            .header("Authorization", &auth_header)
-            .json(&request_body)
-            .send()
-            .await?;
+info!("LLM: Sending request to {}", full_url);
+let response = self
+    .client
+    .post(&full_url)
+    .header("Authorization", &auth_header)
+    .json(&request_body)
+    .send()
+    .await?;
+info!("LLM: Response received with status {}", response.status());
 
         let status = response.status();
         if status != reqwest::StatusCode::OK {
@@ -460,14 +462,19 @@ impl LLMProvider for OpenAIClient {
             return Err(format!("LLM request failed with status: {}", status).into());
         }
 
-        let handler = get_handler(model);
-        let mut stream = response.bytes_stream();
+let handler = get_handler(model);
+let mut stream = response.bytes_stream();
 
-        // Accumulate tool calls here because OpenAI streams them in fragments
-        let mut active_tool_calls: Vec<serde_json::Value> = Vec::new();
+// Accumulate tool calls here because OpenAI streams them in fragments
+let mut active_tool_calls: Vec<serde_json::Value> = Vec::new();
+let mut chunk_count = 0u32;
 
-        while let Some(chunk_result) = stream.next().await {
-            let chunk = chunk_result?;
+while let Some(chunk_result) = stream.next().await {
+    chunk_count += 1;
+    if chunk_count <= 5 || chunk_count % 50 == 0 {
+        info!("LLM: Received chunk #{}", chunk_count);
+    }
+    let chunk = chunk_result?;
             let chunk_str = String::from_utf8_lossy(&chunk);
             for line in chunk_str.lines() {
                 if line.starts_with("data: ") && !line.contains("[DONE]") {
@@ -526,18 +533,19 @@ let content = data["choices"][0]["delta"]["content"].as_str();
             }
         }
 
-        // Send accumulated tool calls when stream finishes
-        for tool_call in active_tool_calls {
-            if !tool_call["function"]["name"].as_str().unwrap_or("").is_empty() {
-                let tool_call_json = serde_json::json!({
-                    "type": "tool_call",
-                    "content": tool_call
-                }).to_string();
-                let _ = tx.send(tool_call_json).await;
-            }
-        }
+// Send accumulated tool calls when stream finishes
+for tool_call in active_tool_calls {
+if !tool_call["function"]["name"].as_str().unwrap_or("").is_empty() {
+let tool_call_json = serde_json::json!({
+"type": "tool_call",
+"content": tool_call
+}).to_string();
+let _ = tx.send(tool_call_json).await;
+}
+}
 
-        Ok(())
+info!("LLM: Stream complete, sent {} chunks", chunk_count);
+Ok(())
     }
 
     async fn cancel_job(