fix: Drop stream_tx after LLM spawn + ADD_SUGGESTION single-arg + lowercase fix + sync_bas_to_work

- drop(stream_tx) after spawning LLM task so stream_rx.recv() loop ends
  when LLM finishes. Without this, the streaming loop hung forever and
  is_complete:true + suggestions were never sent to WebSocket clients.
- Add single-arg ADD_SUGGESTION "text" syntax (registered LAST for
  highest Rhai priority so it matches before 2-arg form).
- convert_keywords_to_lowercase() now only lowercases Rhai built-in
  keywords (IF, ELSE, WHILE, etc.), not custom syntax keywords (TALK,
  HEAR, ADD_SUGGESTION) which are case-sensitive in Rhai.
- sync_bas_to_work() downloads changed .bas files from S3 to work dir
  when etag changes, preventing stale local copies used by compiler.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-21 22:01:17 +00:00
parent 1ae0ad7051
commit e6cd0ff02b
4 changed files with 112 additions and 39 deletions

View file

@ -69,13 +69,15 @@ pub fn add_suggestion_keyword(
) {
// Each closure needs its own Arc<redis::Client> and UserSession clone
let cache = state.cache.clone();
let cache2 = state.cache.clone();
let cache3 = state.cache.clone();
let cache4 = state.cache.clone();
let user_session = user_session.clone();
let user_session2 = user_session.clone();
let user_session3 = user_session.clone();
let user_session4 = user_session.clone();
// ADD_SUGGESTION_TOOL "tool_name" as "button text"
// Note: compiler converts AS -> as (lowercase keywords), so we use lowercase here
engine
.register_custom_syntax(
["ADD_SUGGESTION_TOOL", "$expr$", "as", "$expr$"],
@ -106,14 +108,14 @@ pub fn add_suggestion_keyword(
let text_value = context.eval_expression_tree(&inputs[0])?.to_string();
let button_text = context.eval_expression_tree(&inputs[1])?.to_string();
add_text_suggestion(cache3.as_ref(), &user_session3, &text_value, &button_text)?;
add_text_suggestion(cache2.as_ref(), &user_session2, &text_value, &button_text)?;
Ok(Dynamic::UNIT)
},
)
.expect("valid syntax registration");
// ADD_SUGGESTION "context_name" as "button text"
// ADD_SUGGESTION "context_name" as "button text" (register BEFORE simple form so simple form has higher priority)
engine
.register_custom_syntax(
["ADD_SUGGESTION", "$expr$", "as", "$expr$"],
@ -123,9 +125,30 @@ pub fn add_suggestion_keyword(
let button_text = context.eval_expression_tree(&inputs[1])?.to_string();
add_context_suggestion(
cache3.as_ref(),
&user_session3,
&context_name,
&button_text,
)?;
Ok(Dynamic::UNIT)
},
)
.expect("valid syntax registration");
// ADD_SUGGESTION "button text" (simple form - sends message on click)
// Registered LAST so it has HIGHEST priority — Rhai tries this first, falls back to 2-arg form
engine
.register_custom_syntax(
["ADD_SUGGESTION", "$expr$"],
true,
move |context, inputs| {
let button_text = context.eval_expression_tree(&inputs[0])?.to_string();
add_text_suggestion(
cache4.as_ref(),
&user_session4,
&context_name,
&button_text,
&button_text,
)?;

View file

@ -1260,27 +1260,29 @@ impl ScriptService {
}
/// Convert BASIC keywords to lowercase without touching variables
/// Uses the centralized keyword list from get_all_keywords()
pub fn convert_keywords_to_lowercase(script: &str) -> String {
use crate::basic::keywords::get_all_keywords;
let keywords = get_all_keywords();
/// Only lowercases Rhai built-in keywords (if, while, for, etc.)
/// Custom syntax keywords (TALK, HEAR, ADD_SUGGESTION, etc.) must remain uppercase
pub fn convert_keywords_to_lowercase(script: &str) -> String {
let rhai_builtins = [
"IF", "ELSE", "WHILE", "FOR", "IN", "LOOP", "RETURN", "LET",
"CONST", "IMPORT", "EXPORT", "FN", "PRIVATE", "SWITCH", "MATCH",
"TRUE", "FALSE", "BREAK", "CONTINUE", "DO", "TRY", "CATCH", "THROW",
];
let mut result = String::new();
for line in script.lines() {
let mut processed_line = line.to_string();
for keyword in &keywords {
// Use word boundaries to avoid replacing parts of variable names
let pattern = format!(r"\b{}\b", regex::escape(keyword));
if let Ok(re) = regex::Regex::new(&pattern) {
processed_line = re.replace_all(&processed_line, keyword.to_lowercase()).to_string();
}
let mut result = String::new();
for line in script.lines() {
let mut processed_line = line.to_string();
for keyword in &rhai_builtins {
let pattern = format!(r"\b{}\b", regex::escape(keyword));
if let Ok(re) = regex::Regex::new(&pattern) {
processed_line = re.replace_all(&processed_line, keyword.to_lowercase()).to_string();
}
result.push_str(&processed_line);
result.push('\n');
}
result
result.push_str(&processed_line);
result.push('\n');
}
result
}
/// Convert ALL multi-word keywords to underscore versions (function calls)

View file

@ -828,28 +828,32 @@ impl BotOrchestrator {
// #[cfg(feature = "drive")]
// set_llm_streaming(true);
let stream_tx_clone = stream_tx.clone();
let stream_tx_clone = stream_tx.clone();
// Create cancellation channel for this streaming session
let (cancel_tx, mut cancel_rx) = broadcast::channel::<()>(1);
let session_id_str = session.id.to_string();
// Create cancellation channel for this streaming session
let (cancel_tx, mut cancel_rx) = broadcast::channel::<()>(1);
let session_id_str = session.id.to_string();
// Register this streaming session for potential cancellation
{
let mut active_streams = self.state.active_streams.lock().await;
active_streams.insert(session_id_str.clone(), cancel_tx);
}
// Wrap the LLM task in a JoinHandle so we can abort it
let mut cancel_rx_for_abort = cancel_rx.resubscribe();
let llm_task = tokio::spawn(async move {
if let Err(e) = llm
.generate_stream("", &messages_clone, stream_tx_clone, &model_clone, &key_clone, tools_for_llm.as_ref())
.await
// Register this streaming session for potential cancellation
{
error!("LLM streaming error: {}", e);
let mut active_streams = self.state.active_streams.lock().await;
active_streams.insert(session_id_str.clone(), cancel_tx);
}
});
// Wrap the LLM task in a JoinHandle so we can abort it
let mut cancel_rx_for_abort = cancel_rx.resubscribe();
let llm_task = tokio::spawn(async move {
if let Err(e) = llm
.generate_stream("", &messages_clone, stream_tx_clone, &model_clone, &key_clone, tools_for_llm.as_ref())
.await
{
error!("LLM streaming error: {}", e);
}
});
// Drop the original stream_tx so stream_rx.recv() loop ends
// when the LLM task finishes and drops its clone.
drop(stream_tx);
// Wait for cancellation to abort LLM task
tokio::spawn(async move {

View file

@ -72,6 +72,10 @@ impl DriveMonitor {
Ok(_) => log::info!("Added/updated drive_files for: {} ({})", full_key, file_type),
Err(e) => log::error!("Failed to upsert {}: {}", full_key, e),
}
if file_type == "bas" {
self.sync_bas_to_work(bot_name, &obj.key).await;
}
} else {
log::debug!("{} unchanged, skipping upsert", full_key);
}
@ -249,6 +253,46 @@ impl DriveMonitor {
let _ = self.file_repo.mark_indexed(self.bot_id, &full_key);
}
async fn sync_bas_to_work(&self, bot_name: &str, s3_key: &str) {
let s3 = match &self.state.drive {
Some(s3) => s3,
None => {
log::error!("S3 client not available for .bas sync");
return;
}
};
let data = match s3.get_object_direct(&self.bucket_name, s3_key).await {
Ok(d) => d,
Err(e) => {
log::error!("Failed to download .bas from {}/{}: {}", self.bucket_name, s3_key, e);
return;
}
};
let work_dir = self.work_root.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name));
if let Err(e) = std::fs::create_dir_all(&work_dir) {
log::error!("Failed to create work dir {}: {}", work_dir.display(), e);
return;
}
let file_name = s3_key.split('/').next_back().unwrap_or(s3_key);
let work_path = work_dir.join(file_name);
match String::from_utf8(data) {
Ok(content) => {
if let Err(e) = std::fs::write(&work_path, &content) {
log::error!("Failed to write {} to work dir: {}", work_path.display(), e);
} else {
log::info!("Synced {} to work dir {}", s3_key, work_path.display());
}
}
Err(e) => {
log::error!("Failed to parse .bas as UTF-8: {}", e);
}
}
}
#[cfg(any(feature = "research", feature = "llm"))]
fn delete_kb_file_vectors(&self, bot_name: &str, _full_key: &str, s3_key: &str) {
let parsed = match parse_kb_path(s3_key) {