Add LLM stream timeout and debug logs
All checks were successful
BotServer CI/CD / build (push) Successful in 4m8s
All checks were successful
BotServer CI/CD / build (push) Successful in 4m8s
This commit is contained in:
parent
da9facf036
commit
301a7dda33
3 changed files with 45 additions and 27 deletions
|
|
@@ -859,7 +859,17 @@ impl BotOrchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Some(chunk) = stream_rx.recv().await {
|
let stream_timeout = tokio::time::Duration::from_secs(60);
|
||||||
|
'stream_loop: loop {
|
||||||
|
let chunk_result = tokio::time::timeout(stream_timeout, stream_rx.recv()).await;
|
||||||
|
let chunk = match chunk_result {
|
||||||
|
Ok(Some(c)) => c,
|
||||||
|
Ok(None) => break 'stream_loop,
|
||||||
|
Err(_) => {
|
||||||
|
error!("LLM stream timeout after {}s - aborting response", stream_timeout.as_secs());
|
||||||
|
break 'stream_loop
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// ===== GENERIC TOOL EXECUTION =====
|
// ===== GENERIC TOOL EXECUTION =====
|
||||||
// Add chunk to tool_call_buffer and try to parse
|
// Add chunk to tool_call_buffer and try to parse
|
||||||
|
|
|
||||||
|
|
@@ -431,7 +431,7 @@ match result {
|
||||||
trace!("start_monitoring: calling check_for_changes...");
|
trace!("start_monitoring: calling check_for_changes...");
|
||||||
trace!("Calling initial check_for_changes...");
|
trace!("Calling initial check_for_changes...");
|
||||||
|
|
||||||
match tokio::time::timeout(Duration::from_secs(300), self.check_for_changes()).await {
|
match tokio::time::timeout(Duration::from_secs(12), self.check_for_changes()).await {
|
||||||
Ok(Ok(_)) => {
|
Ok(Ok(_)) => {
|
||||||
self.consecutive_failures.store(0, Ordering::Relaxed);
|
self.consecutive_failures.store(0, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
@@ -514,7 +514,7 @@ match result {
|
||||||
|
|
||||||
debug!("[DRIVE_MONITOR] About to call check_for_changes for bot {}", self_clone.bot_id);
|
debug!("[DRIVE_MONITOR] About to call check_for_changes for bot {}", self_clone.bot_id);
|
||||||
// Add timeout to prevent hanging
|
// Add timeout to prevent hanging
|
||||||
match tokio::time::timeout(Duration::from_secs(300), self_clone.check_for_changes()).await {
|
match tokio::time::timeout(Duration::from_secs(12), self_clone.check_for_changes()).await {
|
||||||
Ok(Ok(_)) => {
|
Ok(Ok(_)) => {
|
||||||
let prev_failures =
|
let prev_failures =
|
||||||
self_clone.consecutive_failures.swap(0, Ordering::Relaxed);
|
self_clone.consecutive_failures.swap(0, Ordering::Relaxed);
|
||||||
|
|
|
||||||
|
|
@@ -438,13 +438,15 @@ impl LLMProvider for OpenAIClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let response = self
|
info!("LLM: Sending request to {}", full_url);
|
||||||
.client
|
let response = self
|
||||||
.post(&full_url)
|
.client
|
||||||
.header("Authorization", &auth_header)
|
.post(&full_url)
|
||||||
.json(&request_body)
|
.header("Authorization", &auth_header)
|
||||||
.send()
|
.json(&request_body)
|
||||||
.await?;
|
.send()
|
||||||
|
.await?;
|
||||||
|
info!("LLM: Response received with status {}", response.status());
|
||||||
|
|
||||||
let status = response.status();
|
let status = response.status();
|
||||||
if status != reqwest::StatusCode::OK {
|
if status != reqwest::StatusCode::OK {
|
||||||
|
|
@@ -460,14 +462,19 @@ impl LLMProvider for OpenAIClient {
|
||||||
return Err(format!("LLM request failed with status: {}", status).into());
|
return Err(format!("LLM request failed with status: {}", status).into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let handler = get_handler(model);
|
let handler = get_handler(model);
|
||||||
let mut stream = response.bytes_stream();
|
let mut stream = response.bytes_stream();
|
||||||
|
|
||||||
// Accumulate tool calls here because OpenAI streams them in fragments
|
// Accumulate tool calls here because OpenAI streams them in fragments
|
||||||
let mut active_tool_calls: Vec<serde_json::Value> = Vec::new();
|
let mut active_tool_calls: Vec<serde_json::Value> = Vec::new();
|
||||||
|
let mut chunk_count = 0u32;
|
||||||
|
|
||||||
while let Some(chunk_result) = stream.next().await {
|
while let Some(chunk_result) = stream.next().await {
|
||||||
let chunk = chunk_result?;
|
chunk_count += 1;
|
||||||
|
if chunk_count <= 5 || chunk_count % 50 == 0 {
|
||||||
|
info!("LLM: Received chunk #{}", chunk_count);
|
||||||
|
}
|
||||||
|
let chunk = chunk_result?;
|
||||||
let chunk_str = String::from_utf8_lossy(&chunk);
|
let chunk_str = String::from_utf8_lossy(&chunk);
|
||||||
for line in chunk_str.lines() {
|
for line in chunk_str.lines() {
|
||||||
if line.starts_with("data: ") && !line.contains("[DONE]") {
|
if line.starts_with("data: ") && !line.contains("[DONE]") {
|
||||||
|
|
@@ -526,18 +533,19 @@ let content = data["choices"][0]["delta"]["content"].as_str();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send accumulated tool calls when stream finishes
|
// Send accumulated tool calls when stream finishes
|
||||||
for tool_call in active_tool_calls {
|
for tool_call in active_tool_calls {
|
||||||
if !tool_call["function"]["name"].as_str().unwrap_or("").is_empty() {
|
if !tool_call["function"]["name"].as_str().unwrap_or("").is_empty() {
|
||||||
let tool_call_json = serde_json::json!({
|
let tool_call_json = serde_json::json!({
|
||||||
"type": "tool_call",
|
"type": "tool_call",
|
||||||
"content": tool_call
|
"content": tool_call
|
||||||
}).to_string();
|
}).to_string();
|
||||||
let _ = tx.send(tool_call_json).await;
|
let _ = tx.send(tool_call_json).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
info!("LLM: Stream complete, sent {} chunks", chunk_count);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn cancel_job(
|
async fn cancel_job(
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue