revert: restore llm/mod.rs to stable April 9 version
All checks were successful
BotServer CI/CD / build (push) Successful in 3m26s

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-13 15:07:19 -03:00
parent 765bd624f4
commit c5d30adebe

View file

@@ -1,6 +1,6 @@
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
use log::{error, info, trace}; use log::{error, info};
use serde_json::Value; use serde_json::Value;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, RwLock};
@@ -11,7 +11,6 @@ pub mod episodic_memory;
pub mod glm; pub mod glm;
pub mod hallucination_detector; pub mod hallucination_detector;
pub mod llm_models; pub mod llm_models;
#[cfg(feature = "llm")]
pub mod local; pub mod local;
pub mod observability; pub mod observability;
pub mod rate_limiter; pub mod rate_limiter;
@@ -290,7 +289,7 @@ impl LLMProvider for OpenAIClient {
128000 // Cerebras gpt-oss models and GPT-4 variants 128000 // Cerebras gpt-oss models and GPT-4 variants
} else if model.contains("gpt-3.5") { } else if model.contains("gpt-3.5") {
16385 16385
} else if model == "local" || model.is_empty() { } else if model.starts_with("http://localhost:808") || model == "local" {
768 // Local llama.cpp server context limit 768 // Local llama.cpp server context limit
} else { } else {
32768 // Default conservative limit for modern models 32768 // Default conservative limit for modern models
@@ -379,7 +378,7 @@ impl LLMProvider for OpenAIClient {
128000 // Cerebras gpt-oss models and GPT-4 variants 128000 // Cerebras gpt-oss models and GPT-4 variants
} else if model.contains("gpt-3.5") { } else if model.contains("gpt-3.5") {
16385 16385
} else if model == "local" || model.is_empty() { } else if model.starts_with("http://localhost:808") || model == "local" {
768 // Local llama.cpp server context limit 768 // Local llama.cpp server context limit
} else { } else {
32768 // Default conservative limit for modern models 32768 // Default conservative limit for modern models
@@ -413,8 +412,7 @@ impl LLMProvider for OpenAIClient {
let mut request_body = serde_json::json!({ let mut request_body = serde_json::json!({
"model": model, "model": model,
"messages": messages, "messages": messages,
"stream": true, "stream": true
"max_tokens": 16384
}); });
// Add tools to the request if provided // Add tools to the request if provided
@@ -449,86 +447,54 @@ impl LLMProvider for OpenAIClient {
let handler = get_handler(model); let handler = get_handler(model);
let mut stream = response.bytes_stream(); let mut stream = response.bytes_stream();
// Accumulate tool calls here because OpenAI streams them in fragments // Accumulate tool calls here because OpenAI streams them in fragments
let mut active_tool_calls: Vec<serde_json::Value> = Vec::new(); let mut active_tool_calls: Vec<serde_json::Value> = Vec::new();
// Add timeout to stream reads - if Kimi/Nvidia stops responding, fail gracefully while let Some(chunk_result) = stream.next().await {
const STREAM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); let chunk = chunk_result?;
let chunk_str = String::from_utf8_lossy(&chunk);
for line in chunk_str.lines() {
if line.starts_with("data: ") && !line.contains("[DONE]") {
if let Ok(data) = serde_json::from_str::<Value>(&line[6..]) {
if let Some(content) = data["choices"][0]["delta"]["content"].as_str() {
let processed = handler.process_content(content);
if !processed.is_empty() {
let _ = tx.send(processed).await;
}
}
loop { // Handle standard OpenAI tool_calls
let chunk_opt = match tokio::time::timeout( if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() {
STREAM_TIMEOUT, for tool_delta in tool_calls {
stream.next(), if let Some(index) = tool_delta["index"].as_u64() {
).await { let idx = index as usize;
Ok(opt) => opt, if active_tool_calls.len() <= idx {
Err(_) => { active_tool_calls.resize(idx + 1, serde_json::json!({
// Timeout - LLM stopped sending data "id": "",
log::warn!("[LLM] Stream timed out after {}s for model {}", "type": "function",
STREAM_TIMEOUT.as_secs(), model); "function": {
let _ = tx.send(format!("[ERROR] LLM response timed out after {} seconds.", "name": "",
STREAM_TIMEOUT.as_secs())).await; "arguments": ""
break; }
} }));
};
match chunk_opt {
Some(Ok(chunk)) => {
let chunk_str = String::from_utf8_lossy(&chunk);
for line in chunk_str.lines() {
if line.starts_with("data: ") && !line.contains("[DONE]") {
if let Ok(data) = serde_json::from_str::<Value>(&line[6..]) {
// Kimi K2.5 and other reasoning models send thinking in "reasoning" field
// Only process "content" (actual response), ignore "reasoning" (thinking)
let content = data["choices"][0]["delta"]["content"].as_str();
let reasoning = data["choices"][0]["delta"]["reasoning"].as_str();
// Log first chunk to help debug reasoning models
if reasoning.is_some() && content.is_none() {
trace!("[LLM] Kimi reasoning chunk (no content yet): {} chars",
reasoning.unwrap_or("").len());
}
if let Some(content) = content {
let processed = handler.process_content(content);
if !processed.is_empty() {
let _ = tx.send(processed).await;
} }
}
let current = &mut active_tool_calls[idx];
// Handle standard OpenAI tool_calls
if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() { if let Some(id) = tool_delta["id"].as_str() {
for tool_delta in tool_calls { current["id"] = serde_json::Value::String(id.to_string());
if let Some(index) = tool_delta["index"].as_u64() { }
let idx = index as usize;
if active_tool_calls.len() <= idx { if let Some(func) = tool_delta.get("function") {
active_tool_calls.resize(idx + 1, serde_json::json!({ if let Some(name) = func.get("name").and_then(|n| n.as_str()) {
"id": "", current["function"]["name"] = serde_json::Value::String(name.to_string());
"type": "function", }
"function": { if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) {
"name": "", if let Some(existing_args) = current["function"]["arguments"].as_str() {
"arguments": "" let mut new_args = existing_args.to_string();
} new_args.push_str(args);
})); current["function"]["arguments"] = serde_json::Value::String(new_args);
}
let current = &mut active_tool_calls[idx];
if let Some(id) = tool_delta["id"].as_str() {
current["id"] = serde_json::Value::String(id.to_string());
}
if let Some(func) = tool_delta.get("function") {
if let Some(name) = func.get("name").and_then(|n| n.as_str()) {
current["function"]["name"] = serde_json::Value::String(name.to_string());
}
if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) {
if let Some(existing_args) = current["function"]["arguments"].as_str() {
let mut new_args = existing_args.to_string();
new_args.push_str(args);
current["function"]["arguments"] = serde_json::Value::String(new_args);
}
}
} }
} }
} }
@@ -537,14 +503,6 @@ impl LLMProvider for OpenAIClient {
} }
} }
} }
Some(Err(e)) => {
log::error!("[LLM] Stream error: {}", e);
break;
}
None => {
// Stream ended
break;
}
} }
} }
@@ -927,10 +885,10 @@ mod tests {
fn test_openai_client_new_custom_url() { fn test_openai_client_new_custom_url() {
let client = OpenAIClient::new( let client = OpenAIClient::new(
"test_key".to_string(), "test_key".to_string(),
Some("".to_string()), Some("http://localhost:9000".to_string()),
None, None,
); );
assert_eq!(client.base_url, ""); assert_eq!(client.base_url, "http://localhost:9000");
} }
#[test] #[test]