fix: spawn LLM response in separate task to prevent recv_task blocking
All checks were successful
BotServer CI/CD / build (push) Successful in 5m3s
All checks were successful
BotServer CI/CD / build (push) Successful in 5m3s
Previously, recv_task awaited stream_response() directly, which blocked the entire WebSocket message receiver while the LLM ran (30s+). A second user message therefore could not be processed until the first LLM call finished — head-of-line blocking that effectively locked the session. Now stream_response() runs in its own tokio::spawn task, keeping recv_task free to handle new messages immediately. Also fixed a borrow/lifetime issue by cloning the response channel sender out of the lock scope.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
parent
dc97813614
commit
3159d04414
1 changed file with 39 additions and 36 deletions
|
|
@ -1604,39 +1604,37 @@ let mut send_task = tokio::spawn(async move {
|
||||||
info!("Processing message for session {}", session_id);
|
info!("Processing message for session {}", session_id);
|
||||||
|
|
||||||
if let Ok(user_msg) = serde_json::from_str::<UserMessage>(&text) {
|
if let Ok(user_msg) = serde_json::from_str::<UserMessage>(&text) {
|
||||||
let orchestrator = BotOrchestrator::new(state_clone.clone());
|
// Get session first, outside any lock scope
|
||||||
if let Some(tx_clone) = state_clone
|
let session_result = {
|
||||||
.response_channels
|
let mut sm = state_clone.session_manager.lock().await;
|
||||||
.lock()
|
sm.get_session_by_id(session_id)
|
||||||
.await
|
};
|
||||||
.get(&session_id.to_string())
|
|
||||||
{
|
|
||||||
// Ensure session exists - create if not
|
|
||||||
let session_result = {
|
|
||||||
let mut sm = state_clone.session_manager.lock().await;
|
|
||||||
sm.get_session_by_id(session_id)
|
|
||||||
};
|
|
||||||
|
|
||||||
let session = match session_result {
|
let session = match session_result {
|
||||||
Ok(Some(sess)) => sess,
|
Ok(Some(sess)) => sess,
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
// Use session manager to create session (will generate new UUID)
|
let mut sm = state_clone.session_manager.lock().await;
|
||||||
let mut sm = state_clone.session_manager.lock().await;
|
match sm.create_session(user_id, bot_id, "WebSocket Chat") {
|
||||||
match sm.create_session(user_id, bot_id, "WebSocket Chat") {
|
Ok(new_session) => new_session,
|
||||||
Ok(new_session) => new_session,
|
Err(e) => {
|
||||||
Err(e) => {
|
error!("Failed to create session: {}", e);
|
||||||
error!("Failed to create session: {}", e);
|
continue;
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
}
|
||||||
error!("Error getting session: {}", e);
|
Err(e) => {
|
||||||
continue;
|
error!("Error getting session: {}", e);
|
||||||
}
|
continue;
|
||||||
};
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Use bot_id from WebSocket connection instead of from message
|
// Get response channel sender out of lock scope
|
||||||
|
let tx_opt = {
|
||||||
|
let channels = state_clone.response_channels.lock().await;
|
||||||
|
channels.get(&session_id.to_string()).cloned()
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(tx_clone) = tx_opt {
|
||||||
let corrected_msg = UserMessage {
|
let corrected_msg = UserMessage {
|
||||||
bot_id: bot_id.to_string(),
|
bot_id: bot_id.to_string(),
|
||||||
user_id: session.user_id.to_string(),
|
user_id: session.user_id.to_string(),
|
||||||
|
|
@ -1645,12 +1643,17 @@ let mut send_task = tokio::spawn(async move {
|
||||||
};
|
};
|
||||||
info!("Calling orchestrator for session {}", session_id);
|
info!("Calling orchestrator for session {}", session_id);
|
||||||
|
|
||||||
if let Err(e) = orchestrator
|
// Spawn LLM in its own task so recv_task stays free to handle
|
||||||
.stream_response(corrected_msg, tx_clone.clone())
|
// new messages — prevents one hung LLM from locking the session.
|
||||||
.await
|
let orch = BotOrchestrator::new(state_clone.clone());
|
||||||
{
|
tokio::spawn(async move {
|
||||||
error!("Failed to stream response: {}", e);
|
if let Err(e) = orch
|
||||||
}
|
.stream_response(corrected_msg, tx_clone)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
error!("Failed to stream response: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
warn!("Response channel NOT found for session: {}", session_id);
|
warn!("Response channel NOT found for session: {}", session_id);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue