Fix KB indexing: single file streaming, dedup tracking, .ast cache
All checks were successful
BotServer CI/CD / build (push) Successful in 12m31s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-11 13:10:09 -03:00
parent 821dd1d7ab
commit 12988b637d
9 changed files with 360 additions and 203 deletions

View file

@ -25,7 +25,6 @@ external_sync = ["automation", "drive", "cache"]
scripting = ["dep:rhai"]
automation = ["scripting", "dep:cron"]
drive = ["dep:aws-config", "dep:aws-sdk-s3", "dep:aws-smithy-async", "dep:pdf-extract"]
local-files = ["dep:notify"]
cache = ["dep:redis"]
directory = ["rbac"]
rbac = []
@ -33,7 +32,7 @@ crawler = ["drive", "cache"]
# ===== APPS (Each includes what it needs from core) =====
# Communication
chat = ["automation", "local-files", "cache"]
chat = ["automation", "cache"]
people = ["automation", "drive", "cache"]
mail = ["automation", "drive", "cache", "dep:lettre", "dep:mailparse", "dep:imap"]
meet = ["automation", "drive", "cache"]
@ -84,7 +83,7 @@ telegram = ["automation", "drive", "cache"]
instagram = ["automation", "drive", "cache"]
msteams = ["automation", "drive", "cache"]
# Core Tech
llm = ["automation", "local-files", "cache"]
llm = ["automation", "cache"]
vectordb = ["automation", "drive", "cache", "dep:qdrant-client"]
nvidia = ["automation", "drive", "cache"]
compliance = ["automation", "drive", "cache", "dep:csv"]
@ -97,7 +96,7 @@ console = ["automation", "drive", "cache", "dep:crossterm", "dep:ratatui"]
# ===== BUNDLES (Optional - for convenience) =====
minimal = ["chat"]
minimal-chat = ["chat", "automation", "local-files", "cache"] # No security at all
minimal-chat = ["chat", "automation", "cache"] # No security at all
lightweight = ["chat", "tasks", "people"]
full = ["chat", "people", "mail", "tasks", "calendar", "drive", "docs", "llm", "cache", "compliance"]
embed-ui = ["dep:rust-embed"]
@ -230,9 +229,6 @@ rss = { workspace = true }
scraper = { workspace = true }
walkdir = { workspace = true }
# File system monitoring (for watching local .gbai folders)
notify = { workspace = true, optional = true }
# Embedded static files
rust-embed = { workspace = true, optional = true }

View file

@ -625,42 +625,48 @@ impl BotOrchestrator {
trace!("Executing start.bas for session {} at: {}", actual_session_id, start_script_path);
if let Ok(metadata) = tokio::fs::metadata(&start_script_path).await {
if metadata.is_file() {
if let Ok(start_script) = tokio::fs::read_to_string(&start_script_path).await {
let state_clone = self.state.clone();
let actual_session_id_for_task = session.id;
let bot_id_clone = session.bot_id;
// Use pre-compiled .ast if available (avoids recompilation)
let ast_path = start_script_path.replace(".bas", ".ast");
let script_content = if std::path::Path::new(&ast_path).exists() {
tokio::fs::read_to_string(&ast_path).await.unwrap_or_default()
} else {
tokio::fs::read_to_string(&start_script_path).await.unwrap_or_default()
};
// Execute start.bas synchronously (blocking)
let result = tokio::task::spawn_blocking(move || {
let session_result = {
let mut sm = state_clone.session_manager.blocking_lock();
sm.get_session_by_id(actual_session_id_for_task)
};
if !script_content.is_empty() {
let state_clone = self.state.clone();
let actual_session_id_for_task = session.id;
let bot_id_clone = session.bot_id;
let sess = match session_result {
Ok(Some(s)) => s,
Ok(None) => {
return Err(format!("Session {} not found during start.bas execution", actual_session_id_for_task));
}
Err(e) => return Err(format!("Failed to get session: {}", e)),
};
// Execute start.bas synchronously (blocking)
let result = tokio::task::spawn_blocking(move || {
let session_result = {
let mut sm = state_clone.session_manager.blocking_lock();
sm.get_session_by_id(actual_session_id_for_task)
};
let mut script_service = crate::basic::ScriptService::new(
state_clone.clone(),
sess
);
script_service.load_bot_config_params(&state_clone, bot_id_clone);
let sess = match session_result {
Ok(Some(s)) => s,
Ok(None) => {
return Err(format!("Session {} not found during start.bas execution", actual_session_id_for_task));
}
Err(e) => return Err(format!("Failed to get session: {}", e)),
};
match script_service.compile(&start_script) {
Ok(ast) => match script_service.run(&ast) {
Ok(_) => Ok(()),
Err(e) => Err(format!("Script execution error: {}", e)),
},
Err(e) => Err(format!("Script compilation error: {}", e)),
}
}).await;
let mut script_service = crate::basic::ScriptService::new(
state_clone.clone(),
sess
);
script_service.load_bot_config_params(&state_clone, bot_id_clone);
match script_service.compile(&script_content) {
Ok(ast) => match script_service.run(&ast) {
Ok(_) => Ok(()),
Err(e) => Err(format!("Script execution error: {}", e)),
},
Err(e) => Err(format!("Script compilation error: {}", e)),
}
}).await;
match result {
Ok(Ok(())) => {
@ -682,13 +688,11 @@ impl BotOrchestrator {
Ok(Err(e)) => {
error!("start.bas error for session {}: {}", actual_session_id, e);
}
Err(e) => {
error!("start.bas task error for session {}: {}", actual_session_id, e);
}
}
}
Err(e) => {
error!("start.bas task error for session {}: {}", actual_session_id, e);
}
}
}
} // End of if should_execute_start_bas
// If message content is empty, we stop here after potentially running start.bas.
@ -1404,64 +1408,82 @@ async fn handle_websocket(
info!("Looking for start.bas at: {}", start_script_path);
if let Ok(metadata) = tokio::fs::metadata(&start_script_path).await {
if metadata.is_file() {
info!("Found start.bas file, reading contents...");
if let Ok(start_script) = tokio::fs::read_to_string(&start_script_path).await {
info!(
"Executing start.bas for bot {} on session {}",
bot_name, session_id
);
// Check for pre-compiled .ast file first (avoids recompilation overhead)
let ast_path = start_script_path.replace(".bas", ".ast");
let (script_content, _using_ast) = if tokio::fs::metadata(&ast_path).await.is_ok() {
if let Ok(content) = tokio::fs::read_to_string(&ast_path).await {
info!("Using pre-compiled start.ast for {}", bot_name);
(content, true)
} else {
(String::new(), false)
}
} else if tokio::fs::metadata(&start_script_path).await.is_ok() {
if let Ok(content) = tokio::fs::read_to_string(&start_script_path).await {
info!("Compiling start.bas for {}", bot_name);
(content, false)
} else {
(String::new(), false)
}
} else {
(String::new(), false)
};
let state_for_start = state.clone();
let tx_for_start = tx.clone();
let bot_id_str = bot_id.to_string();
let session_id_str = session_id.to_string();
let mut send_ready_rx = send_ready_rx;
if !script_content.is_empty() {
info!(
"Executing start.bas for bot {} on session {}",
bot_name, session_id
);
tokio::spawn(async move {
let _ = send_ready_rx.recv().await;
let session_result = {
let mut sm = state_for_start.session_manager.lock().await;
let by_id = sm.get_session_by_id(session_id);
match by_id {
Ok(Some(s)) => Ok(Some(s)),
_ => sm.get_or_create_user_session(user_id, bot_id, "Chat Session"),
let state_for_start = state.clone();
let tx_for_start = tx.clone();
let bot_id_str = bot_id.to_string();
let session_id_str = session_id.to_string();
let mut send_ready_rx = send_ready_rx;
let script_content_owned = script_content.clone();
tokio::spawn(async move {
let _ = send_ready_rx.recv().await;
let session_result = {
let mut sm = state_for_start.session_manager.lock().await;
let by_id = sm.get_session_by_id(session_id);
match by_id {
Ok(Some(s)) => Ok(Some(s)),
_ => sm.get_or_create_user_session(user_id, bot_id, "Chat Session"),
}
};
if let Ok(Some(mut session)) = session_result {
info!("start.bas: Found session {} for websocket session {}", session.id, session_id);
// Store WebSocket session_id in context so TALK can route messages correctly
if let serde_json::Value::Object(ref mut map) = session.context_data {
map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string()));
} else {
let mut map = serde_json::Map::new();
map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string()));
session.context_data = serde_json::Value::Object(map);
}
// Clone state_for_start for use in Redis SET after execution
let state_for_redis = state_for_start.clone();
let result = tokio::task::spawn_blocking(move || {
info!("start.bas: Creating ScriptService with session.id={}", session.id);
let mut script_service = crate::basic::ScriptService::new(
state_for_start.clone(),
session.clone()
);
script_service.load_bot_config_params(&state_for_start, bot_id);
match script_service.compile(&script_content_owned) {
Ok(ast) => match script_service.run(&ast) {
Ok(_) => Ok(()),
Err(e) => Err(format!("Script execution error: {}", e)),
},
Err(e) => Err(format!("Script compilation error: {}", e)),
}
};
if let Ok(Some(mut session)) = session_result {
info!("start.bas: Found session {} for websocket session {}", session.id, session_id);
// Store WebSocket session_id in context so TALK can route messages correctly
if let serde_json::Value::Object(ref mut map) = session.context_data {
map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string()));
} else {
let mut map = serde_json::Map::new();
map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string()));
session.context_data = serde_json::Value::Object(map);
}
// Clone state_for_start for use in Redis SET after execution
let state_for_redis = state_for_start.clone();
let result = tokio::task::spawn_blocking(move || {
info!("start.bas: Creating ScriptService with session.id={}", session.id);
let mut script_service = crate::basic::ScriptService::new(
state_for_start.clone(),
session.clone()
);
script_service.load_bot_config_params(&state_for_start, bot_id);
match script_service.compile(&start_script) {
Ok(ast) => match script_service.run(&ast) {
Ok(_) => Ok(()),
Err(e) => Err(format!("Script execution error: {}", e)),
},
Err(e) => Err(format!("Script compilation error: {}", e)),
}
}).await;
}).await;
match result {
Ok(Ok(())) => {
@ -1509,19 +1531,17 @@ async fn handle_websocket(
error!("start.bas error for bot {}: {}", bot_name, e);
}
Err(e) => {
error!("start.bas task error for bot {}: {}", bot_name, e);
}
}
}
});
}
}
error!("start.bas task error for bot {}: {}", bot_name, e);
}
}
} // End of if should_execute_start_bas
}
});
}
}
} // End of if should_execute_start_bas
}
let mut send_task = tokio::spawn(async move {
let mut send_task = tokio::spawn(async move {
while let Some(response) = rx.recv().await {
if let Ok(json_str) = serde_json::to_string(&response) {
if sender.send(Message::Text(json_str)).await.is_err() {

View file

@ -2,7 +2,7 @@ pub mod model_routing_config;
pub mod sse_config;
pub mod user_memory_config;
#[cfg(any(feature = "drive", feature = "local-files"))]
#[cfg(feature = "drive")]
pub mod watcher;
pub use model_routing_config::{ModelRoutingConfig, RoutingStrategy, TaskType};

View file

@ -505,6 +505,113 @@ impl KbIndexer {
Ok(())
}
/// Index one document into the bot's KB vector collection.
///
/// Preconditions: the embedding server and Qdrant must both be reachable;
/// either being down is reported as an error before any work is done.
/// An empty chunk set (nothing extractable from the file) is treated as a
/// successful no-op, not an error.
///
/// Returns an `IndexingResult` describing the target collection and how
/// many chunks were stored.
pub async fn index_single_file(
    &self,
    bot_id: Uuid,
    bot_name: &str,
    kb_name: &str,
    file_path: &Path,
) -> Result<IndexingResult> {
    // Fail fast when either backend is unavailable.
    if !is_embedding_server_ready() {
        return Err(anyhow::anyhow!(
            "Embedding server not available. Cannot index file."
        ));
    }
    if !self.check_qdrant_health().await.unwrap_or(false) {
        return Err(anyhow::anyhow!(
            "Qdrant vector database is not available."
        ));
    }

    // Collection names embed a shortened bot id: "<bot>_<id8>_<kb>".
    let short_id: String = bot_id.to_string().chars().take(8).collect();
    let collection_name = format!("{}_{}_{}", bot_name, short_id, kb_name);
    self.ensure_collection_exists(&collection_name).await?;

    info!(
        "Indexing single file: {} into collection {}",
        file_path.display(),
        collection_name
    );

    // Split the document into chunks; nothing extracted is a clean no-op.
    let chunks = self.document_processor.process_document(file_path).await?;
    if chunks.is_empty() {
        warn!("No chunks extracted from file: {}", file_path.display());
        return Ok(IndexingResult {
            collection_name,
            documents_processed: 0,
            chunks_indexed: 0,
        });
    }

    // Embed every chunk, build Qdrant points keyed by the document path
    // (so stale points can later be deleted per-file), then upsert.
    let doc_path = file_path.to_string_lossy().to_string();
    let embeddings = self
        .embedding_generator
        .generate_embeddings(&chunks)
        .await?;
    let points = Self::create_qdrant_points(&doc_path, embeddings)?;
    self.upsert_points(&collection_name, points).await?;
    self.update_collection_metadata(&collection_name, bot_name, kb_name, chunks.len())?;

    info!(
        "Indexed {} chunks from {} into collection {}",
        chunks.len(),
        file_path.display(),
        collection_name
    );

    Ok(IndexingResult {
        collection_name,
        documents_processed: 1,
        chunks_indexed: chunks.len(),
    })
}
/// Remove every vector point belonging to `document_path` from a collection.
///
/// Issues a filtered delete against the Qdrant HTTP API (`?wait=true`, so
/// the call returns only after the deletion is applied). A non-success HTTP
/// status is surfaced as an error carrying the response body text.
pub async fn delete_file_points(
    &self,
    collection_name: &str,
    document_path: &str,
) -> Result<()> {
    // Match exactly the points whose payload "document_path" equals ours.
    let match_filter = serde_json::json!({
        "must": [
            {
                "key": "document_path",
                "match": {
                    "value": document_path
                }
            }
        ]
    });

    let url = format!(
        "{}/collections/{}/points/delete?wait=true",
        self.qdrant_config.url, collection_name
    );

    let resp = self
        .http_client
        .post(&url)
        .json(&serde_json::json!({ "filter": match_filter }))
        .send()
        .await?;

    if !resp.status().is_success() {
        // Include whatever body Qdrant returned to aid debugging.
        let error_text = resp.text().await.unwrap_or_default();
        return Err(anyhow::anyhow!("Failed to delete points: {}", error_text));
    }

    info!(
        "Deleted points for document {} from collection {}",
        document_path, collection_name
    );
    Ok(())
}
fn update_collection_metadata(
&self,
collection_name: &str,

View file

@ -101,6 +101,35 @@ impl KnowledgeBaseManager {
Ok(())
}
/// Thin logging wrapper around [`KbIndexer::index_single_file`]: indexes a
/// single file into the named KB for the given bot and reports the outcome.
///
/// Errors from the underlying indexer (backends down, extraction or upsert
/// failures) propagate unchanged to the caller.
pub async fn index_single_file(
    &self,
    bot_id: Uuid,
    bot_name: &str,
    kb_name: &str,
    file_path: &Path,
) -> Result<kb_indexer::IndexingResult> {
    info!(
        "Indexing single file: {} into KB {} for bot {}",
        file_path.display(),
        kb_name,
        bot_name
    );

    // Delegate the actual work; bail out early on any failure.
    let outcome = self
        .indexer
        .index_single_file(bot_id, bot_name, kb_name, file_path)
        .await?;

    info!(
        "Successfully indexed {} chunks from {} into collection {}",
        outcome.chunks_indexed,
        file_path.display(),
        outcome.collection_name
    );
    Ok(outcome)
}
pub async fn search(
&self,
bot_id: Uuid,

View file

@ -24,14 +24,16 @@ use tokio::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::fs as tokio_fs;
#[cfg(any(feature = "research", feature = "llm"))]
const KB_INDEXING_TIMEOUT_SECS: u64 = 60;
const MAX_BACKOFF_SECS: u64 = 300;
const INITIAL_BACKOFF_SECS: u64 = 30;
/// Per-file sync state tracked by `DriveMonitor` across polling cycles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileState {
// Storage ETag of the object, used to detect content changes between polls.
pub etag: String,
// Whether this file has already been indexed into the KB.
// `#[serde(default)]` makes older persisted state (written before this
// field existed) deserialize with `indexed = false`.
#[serde(default)]
pub indexed: bool,
}
#[derive(Debug, Clone)]
pub struct DriveMonitor {
state: Arc<AppState>,
@ -44,7 +46,7 @@ pub struct DriveMonitor {
is_processing: Arc<AtomicBool>,
consecutive_failures: Arc<AtomicU32>,
#[cfg(any(feature = "research", feature = "llm"))]
kb_indexing_in_progress: Arc<TokioRwLock<HashSet<String>>>,
files_being_indexed: Arc<TokioRwLock<HashSet<String>>>,
}
impl DriveMonitor {
fn normalize_config_value(value: &str) -> String {
@ -72,7 +74,7 @@ impl DriveMonitor {
is_processing: Arc::new(AtomicBool::new(false)),
consecutive_failures: Arc::new(AtomicU32::new(0)),
#[cfg(any(feature = "research", feature = "llm"))]
kb_indexing_in_progress: Arc::new(TokioRwLock::new(HashSet::new())),
files_being_indexed: Arc::new(TokioRwLock::new(HashSet::new())),
}
}
@ -499,9 +501,10 @@ impl DriveMonitor {
if path.ends_with('/') || !path.to_ascii_lowercase().ends_with(".bas") {
continue;
}
let file_state = FileState {
etag: obj.e_tag().unwrap_or_default().to_string(),
};
let file_state = FileState {
etag: obj.e_tag().unwrap_or_default().to_string(),
indexed: false,
};
current_files.insert(path, file_state);
}
if !list_objects.is_truncated.unwrap_or(false) {
@ -1127,9 +1130,10 @@ impl DriveMonitor {
continue;
}
let file_state = FileState {
etag: obj.e_tag().unwrap_or_default().to_string(),
};
let file_state = FileState {
etag: obj.e_tag().unwrap_or_default().to_string(),
indexed: false,
};
current_files.insert(path.clone(), file_state);
}
@ -1206,72 +1210,71 @@ impl DriveMonitor {
// Create a unique key for this KB folder to track indexing state
let kb_key = format!("{}_{}", bot_name, kb_name);
// Check if this KB folder is already being indexed
{
let indexing_set = self.kb_indexing_in_progress.read().await;
if indexing_set.contains(&kb_key) {
debug!("[DRIVE_MONITOR] KB folder {} already being indexed, skipping duplicate task", kb_key);
continue;
}
}
// Check if this KB folder is already being indexed
{
let indexing_set = self.files_being_indexed.read().await;
if indexing_set.contains(&kb_key) {
debug!("[DRIVE_MONITOR] KB folder {} already being indexed, skipping duplicate task", kb_key);
continue;
}
}
// Mark this KB folder as being indexed
{
let mut indexing_set = self.kb_indexing_in_progress.write().await;
indexing_set.insert(kb_key.clone());
}
// Mark this KB folder as being indexed
{
let mut indexing_set = self.files_being_indexed.write().await;
indexing_set.insert(kb_key.clone());
}
let kb_manager = Arc::clone(&self.kb_manager);
let bot_id = self.bot_id;
let bot_name_owned = bot_name.to_string();
let kb_name_owned = kb_name.to_string();
let kb_folder_owned = kb_folder_path.clone();
let indexing_tracker = Arc::clone(&self.kb_indexing_in_progress);
let kb_key_owned = kb_key.clone();
let _files_being_indexed = Arc::clone(&self.files_being_indexed);
let file_key = Arc::clone(&self.files_being_indexed);
let kb_key_owned = kb_key.clone();
tokio::spawn(async move {
trace!(
"Triggering KB indexing for folder: {} (PDF text extraction enabled)",
kb_folder_owned.display()
);
tokio::spawn(async move {
trace!(
"Triggering KB indexing for folder: {} (PDF text extraction enabled)",
kb_folder_owned.display()
);
let result = tokio::time::timeout(
Duration::from_secs(KB_INDEXING_TIMEOUT_SECS),
kb_manager.handle_gbkb_change(bot_id, &bot_name_owned, &kb_folder_owned),
)
.await;
let result = tokio::time::timeout(
Duration::from_secs(60),
kb_manager.handle_gbkb_change(bot_id, &bot_name_owned, &kb_folder_owned),
)
.await;
// Always remove from tracking set when done, regardless of outcome
{
let mut indexing_set = indexing_tracker.write().await;
indexing_set.remove(&kb_key_owned);
}
// Always remove from tracking set when done, regardless of outcome
{
let mut indexing_set = file_key.write().await;
indexing_set.remove(&kb_key_owned);
}
match result {
Ok(Ok(_)) => {
debug!(
"Successfully processed KB change for {}/{}",
bot_name_owned, kb_name_owned
);
}
Ok(Err(e)) => {
log::error!(
"Failed to process .gbkb change for {}/{}: {}",
bot_name_owned,
kb_name_owned,
e
);
}
Err(_) => {
log::error!(
"KB indexing timed out after {}s for {}/{}",
KB_INDEXING_TIMEOUT_SECS,
bot_name_owned,
kb_name_owned
);
}
}
});
match result {
Ok(Ok(_)) => {
debug!(
"Successfully processed KB change for {}/{}",
bot_name_owned, kb_name_owned
);
}
Ok(Err(e)) => {
log::error!(
"Failed to process .gbkb change for {}/{}: {}",
bot_name_owned,
kb_name_owned,
e
);
}
Err(_) => {
log::error!(
"KB indexing timed out after 60s for {}/{}",
bot_name_owned, kb_name_owned
);
}
}
});
}
#[cfg(not(any(feature = "research", feature = "llm")))]

View file

@ -27,8 +27,9 @@ pub mod document_processing;
#[cfg(feature = "drive")]
pub mod drive_monitor;
#[cfg(feature = "local-files")]
pub mod local_file_monitor;
// Local file monitoring removed - Drive (MinIO) is the only source now
// #[cfg(feature = "local-files")]
// pub mod local_file_monitor;
#[cfg(feature = "drive")]
pub mod vectordb;

View file

@ -95,7 +95,7 @@ pub mod console;
#[cfg(feature = "directory")]
pub mod directory;
#[cfg(any(feature = "drive", feature = "local-files"))]
#[cfg(feature = "drive")]
pub mod drive;
#[cfg(feature = "mail")]

View file

@ -893,11 +893,12 @@ pub async fn start_background_services(
#[cfg(feature = "drive")]
start_drive_monitors(app_state.clone(), pool).await;
#[cfg(feature = "local-files")]
start_local_file_monitor(app_state.clone()).await;
// Local file monitoring removed - Drive (MinIO) is the only source now
// #[cfg(feature = "local-files")]
// start_local_file_monitor(app_state.clone()).await;
#[cfg(feature = "local-files")]
start_config_watcher(app_state.clone()).await;
// #[cfg(feature = "local-files")]
// start_config_watcher(app_state.clone()).await;
}
#[cfg(feature = "drive")]
@ -1098,25 +1099,25 @@ fn create_bot_from_drive(
}
// LocalFileMonitor and ConfigWatcher disabled - drive (MinIO) is the only source now
async fn start_local_file_monitor(app_state: Arc<AppState>) {
use crate::drive::local_file_monitor::LocalFileMonitor;
let monitor = LocalFileMonitor::new(app_state.clone());
if let Err(e) = monitor.start_monitoring().await {
error!("Failed to start LocalFileMonitor: {}", e);
} else {
trace!("LocalFileMonitor started - monitoring /opt/gbo/data for bot changes");
}
}
async fn start_config_watcher(app_state: Arc<AppState>) {
use crate::core::config::watcher::ConfigWatcher;
use crate::core::shared::utils::get_work_path;
use std::sync::Arc as StdArc;
let data_dir = std::path::PathBuf::from(get_work_path());
let watcher = ConfigWatcher::new(data_dir, app_state.clone());
let _handle = StdArc::new(watcher).spawn();
trace!("ConfigWatcher started - monitoring config.csv changes");
}
// // LocalFileMonitor and ConfigWatcher disabled - drive (MinIO) is the only source now
// async fn start_local_file_monitor(app_state: Arc<AppState>) {
// use crate::drive::local_file_monitor::LocalFileMonitor;
//
// let monitor = LocalFileMonitor::new(app_state.clone());
// if let Err(e) = monitor.start_monitoring().await {
// error!("Failed to start LocalFileMonitor: {}", e);
// } else {
// trace!("LocalFileMonitor started - monitoring /opt/gbo/data for bot changes");
// }
// }
//
// async fn start_config_watcher(app_state: Arc<AppState>) {
// use crate::core::config::watcher::ConfigWatcher;
// use crate::core::shared::utils::get_work_path;
// use std::sync::Arc as StdArc;
//
// let data_dir = std::path::PathBuf::from(get_work_path());
// let watcher = ConfigWatcher::new(data_dir, app_state.clone());
// let _handle = StdArc::new(watcher).spawn();
// trace!("ConfigWatcher started - monitoring config.csv changes");
// }