From ba3e2675efb2800d0337e26a2b5275c32f085b3c Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Tue, 14 Apr 2026 16:15:31 -0300 Subject: [PATCH] feat: stateful thinking tag stripping for Kimi, Minimax and DeepSeek stream --- src/llm/kimi.rs | 13 +++++++--- src/llm/llm_models/deepseek_r3.rs | 36 ++++++++++++++++++++++++++ src/llm/llm_models/minimax.rs | 42 +++++++++++++++++++++++++++++++ src/llm/llm_models/mod.rs | 3 +++ src/llm/mod.rs | 3 ++- 5 files changed, 92 insertions(+), 5 deletions(-) diff --git a/src/llm/kimi.rs b/src/llm/kimi.rs index 0eef4564..2b7c01bb 100644 --- a/src/llm/kimi.rs +++ b/src/llm/kimi.rs @@ -282,6 +282,8 @@ impl LLMProvider for KimiClient { info!("[Kimi] Connection established, starting stream"); + let handler = crate::llm::llm_models::get_handler(model); + let mut stream_state = String::new(); let mut stream = response.bytes_stream(); let mut total_content_chars: usize = 0; let mut chunk_count: usize = 0; @@ -327,10 +329,13 @@ impl LLMProvider for KimiClient { // Kimi K2.5: content has the answer, reasoning/reasoning_content is thinking if let Some(text) = delta.get("content").and_then(|c| c.as_str()) { if !text.is_empty() { - total_content_chars += text.len(); - if tx.send(text.to_string()).await.is_err() { - info!("[Kimi] Channel closed, stopping stream after {} content chars", total_content_chars); - return Ok(()); + let processed = handler.process_content_streaming(text, &mut stream_state); + if !processed.is_empty() { + total_content_chars += processed.len(); + if tx.send(processed).await.is_err() { + info!("[Kimi] Channel closed, stopping stream after {} content chars", total_content_chars); + return Ok(()); + } } } } diff --git a/src/llm/llm_models/deepseek_r3.rs b/src/llm/llm_models/deepseek_r3.rs index ca04b31a..d5a9dd8d 100644 --- a/src/llm/llm_models/deepseek_r3.rs +++ b/src/llm/llm_models/deepseek_r3.rs @@ -38,6 +38,42 @@ impl ModelHandler for DeepseekR3Handler { strip_think_tags(content) } + fn 
process_content_streaming(&self, chunk: &str, state: &mut String) -> String { + let old_len = state.len(); + state.push_str(chunk); + + let mut clean_current = String::new(); + let mut in_think = false; + + let mut current_pos = 0; + let full_text = state.as_str(); + + while current_pos < full_text.len() { + if !in_think { + if full_text[current_pos..].starts_with("<think>") { + in_think = true; + current_pos += 7; + } else { + let c = full_text[current_pos..].chars().next().unwrap(); + if current_pos >= old_len { + clean_current.push(c); + } + current_pos += c.len_utf8(); + } + } else { + if full_text[current_pos..].starts_with("</think>") { + in_think = false; + current_pos += 8; + } else { + let c = full_text[current_pos..].chars().next().unwrap(); + current_pos += c.len_utf8(); + } + } + } + + clean_current + } + fn has_analysis_markers(&self, buffer: &str) -> bool { buffer.contains("<think>") } diff --git a/src/llm/llm_models/minimax.rs b/src/llm/llm_models/minimax.rs index 761bdec9..c95262e9 100644 --- a/src/llm/llm_models/minimax.rs +++ b/src/llm/llm_models/minimax.rs @@ -69,6 +69,48 @@ impl ModelHandler for MinimaxHandler { strip_think_tags(content) } + fn process_content_streaming(&self, chunk: &str, state: &mut String) -> String { + let old_len = state.len(); + state.push_str(chunk); + + let mut clean_current = String::new(); + let mut in_think = false; + + let mut current_pos = 0; + let full_text = state.as_str(); + + while current_pos < full_text.len() { + if !in_think { + if full_text[current_pos..].starts_with("<think>") { + in_think = true; + current_pos += 7; + } else if full_text[current_pos..].starts_with("(分析)") || full_text[current_pos..].starts_with("【分析】") { + in_think = true; + current_pos += 12; // UTF-8 for these 3-char Chinese tags + } else { + let c = full_text[current_pos..].chars().next().unwrap(); + if current_pos >= old_len { + clean_current.push(c); + } + current_pos += c.len_utf8(); + } + } else { + if full_text[current_pos..].starts_with("</think>") { + in_think = 
false; + current_pos += 8; + } else if full_text[current_pos..].starts_with("(/分析)") || full_text[current_pos..].starts_with("【/分析】") { + in_think = false; + current_pos += 13; // UTF-8 for these 4-char Chinese tags + } else { + let c = full_text[current_pos..].chars().next().unwrap(); + current_pos += c.len_utf8(); + } + } + } + + clean_current + } + fn has_analysis_markers(&self, buffer: &str) -> bool { buffer.contains("(分析)") || buffer.contains("<think>") || buffer.contains("【分析】") } diff --git a/src/llm/llm_models/mod.rs b/src/llm/llm_models/mod.rs index 965adb99..7e405942 100644 --- a/src/llm/llm_models/mod.rs +++ b/src/llm/llm_models/mod.rs @@ -6,6 +6,9 @@ pub mod minimax; pub trait ModelHandler: Send + Sync { fn is_analysis_complete(&self, buffer: &str) -> bool; fn process_content(&self, content: &str) -> String; + fn process_content_streaming(&self, content: &str, _state_buffer: &mut String) -> String { + self.process_content(content) + } fn has_analysis_markers(&self, buffer: &str) -> bool; } diff --git a/src/llm/mod.rs b/src/llm/mod.rs index 6d83eb9b..a220f3ac 100644 --- a/src/llm/mod.rs +++ b/src/llm/mod.rs @@ -432,6 +432,7 @@ impl LLMProvider for OpenAIClient { } let handler = get_handler(model); + let mut stream_state = String::new(); // State for the handler to track thinking tags across chunks let mut stream = response.bytes_stream(); while let Some(chunk_result) = stream.next().await { @@ -441,7 +442,7 @@ if line.starts_with("data: ") && !line.contains("[DONE]") { if let Ok(data) = serde_json::from_str::<serde_json::Value>(&line[6..]) { if let Some(content) = data["choices"][0]["delta"]["content"].as_str() { - let processed = handler.process_content(content); + let processed = handler.process_content_streaming(content, &mut stream_state); if !processed.is_empty() { let _ = tx.send(processed).await; }