From a1311206382bf36b0cdf3b296b78e88c507f9a8f Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sat, 11 Apr 2026 13:27:48 -0300 Subject: [PATCH] Fix KB indexing: bot-specific embedding config, PROMPT.md sync, single-file streaming --- src/core/kb/kb_indexer.rs | 84 +++++++++++++++++++++++++++++++--- src/core/kb/mod.rs | 14 ++++-- src/drive/drive_monitor/mod.rs | 43 ++++++++++++++++- 3 files changed, 130 insertions(+), 11 deletions(-) diff --git a/src/core/kb/kb_indexer.rs b/src/core/kb/kb_indexer.rs index 72349220..4372ac3c 100644 --- a/src/core/kb/kb_indexer.rs +++ b/src/core/kb/kb_indexer.rs @@ -82,6 +82,7 @@ pub struct KbIndexer { embedding_generator: KbEmbeddingGenerator, qdrant_config: QdrantConfig, http_client: reqwest::Client, + db_pool: Option, } impl std::fmt::Debug for KbIndexer { @@ -91,6 +92,7 @@ impl std::fmt::Debug for KbIndexer { .field("embedding_generator", &self.embedding_generator) .field("qdrant_config", &self.qdrant_config) .field("http_client", &"reqwest::Client") + .field("db_pool", &"DbPool") .finish() } } @@ -107,9 +109,33 @@ impl KbIndexer { embedding_generator, qdrant_config, http_client, + db_pool: None, } } + pub fn new_with_pool(embedding_config: EmbeddingConfig, qdrant_config: QdrantConfig, db_pool: DbPool) -> Self { + let document_processor = DocumentProcessor::default(); + let embedding_generator = KbEmbeddingGenerator::new(embedding_config); + + let http_client = create_tls_client(Some(qdrant_config.timeout_secs)); + + Self { + document_processor, + embedding_generator, + qdrant_config, + http_client, + db_pool: Some(db_pool), + } + } + + pub fn get_db_pool(&self) -> Option<&DbPool> { + self.db_pool.as_ref() + } + + pub fn get_pool(&self) -> DbPool { + self.db_pool.clone().unwrap_or_else(|| panic!("DbPool not available")) + } + pub async fn check_qdrant_health(&self) -> Result { let health_url = format!("{}/healthz", self.qdrant_config.url); @@ -667,6 +693,53 @@ impl KbIndexer { embedding.vector }; + self.execute_search(collection_name, search_vector, limit).await + } + + pub async fn search_with_config( + &self, + collection_name: &str, + query: &str, + limit: usize, + embedding_config: &EmbeddingConfig, + ) -> Result> { + let collection_dimension = self.get_collection_vector_dimension(collection_name).await?; + + let embedding_generator = KbEmbeddingGenerator::new(embedding_config.clone()); + let embedding = embedding_generator.generate_single_embedding(query).await?; + + let search_vector = if let Some(target_dim) = collection_dimension { + if embedding.vector.len() > target_dim { + debug!( + "Truncating embedding from {} to {} dimensions for collection '{}'", + embedding.vector.len(), target_dim, collection_name + ); + embedding.vector[..target_dim].to_vec() + } else if embedding.vector.len() < target_dim { + warn!( + "Embedding dimension ({}) is smaller than collection dimension ({}). \ + Search may return poor results for collection '{}'.", + embedding.vector.len(), target_dim, collection_name + ); + let mut padded = embedding.vector.clone(); + padded.resize(target_dim, 0.0); + padded + } else { + embedding.vector + } + } else { + embedding.vector + }; + + self.execute_search(collection_name, search_vector, limit).await + } + + async fn execute_search( + &self, + collection_name: &str, + search_vector: Vec, + limit: usize, + ) -> Result> { let search_request = SearchRequest { vector: search_vector, limit, @@ -703,21 +776,20 @@ impl KbIndexer { { let content = payload .get("content") - .and_then(|v| v.as_str()) - .unwrap_or("") + .and_then(|c| c.as_str()) + .unwrap_or_default() .to_string(); - let document_path = payload .get("document_path") - .and_then(|v| v.as_str()) - .unwrap_or("") + .and_then(|p| p.as_str()) + .unwrap_or_default() .to_string(); results.push(SearchResult { content, document_path, score: score as f32, - metadata: payload.clone(), + metadata: serde_json::Map::new(), }); } } diff --git a/src/core/kb/mod.rs b/src/core/kb/mod.rs index 7a9c1930..ddac2ad0 100644 --- a/src/core/kb/mod.rs +++ b/src/core/kb/mod.rs @@ -58,9 +58,9 @@ impl KnowledgeBaseManager { let embedding_config = EmbeddingConfig::from_bot_config(&pool, &bot_id); info!("KB Manager using embedding config from bot {}: url={}, model={}", bot_id, embedding_config.embedding_url, embedding_config.embedding_model); - let qdrant_config = QdrantConfig::from_config(pool, &bot_id); + let qdrant_config = QdrantConfig::from_config(pool.clone(), &bot_id); - let indexer = Arc::new(KbIndexer::new(embedding_config.clone(), qdrant_config)); + let indexer = Arc::new(KbIndexer::new_with_pool(embedding_config.clone(), qdrant_config, pool)); let processor = Arc::new(DocumentProcessor::default()); let monitor = Arc::new(RwLock::new(KbFolderMonitor::new( work_root, @@ -140,7 +140,15 @@ impl KnowledgeBaseManager { ) -> Result> { let bot_id_short = bot_id.to_string().chars().take(8).collect::(); let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name); - self.indexer.search(&collection_name, query, limit).await + + // Use from_bot_config with state connection if available + if let Some(pool) = self.indexer.get_db_pool() { + let embedding_config = EmbeddingConfig::from_bot_config(pool, &bot_id); + self.indexer.search_with_config(&collection_name, query, limit, &embedding_config).await + } else { + // Fallback to default config + self.indexer.search(&collection_name, query, limit).await + } } pub async fn search_collection( diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index e870793b..0b70d47e 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -588,9 +588,48 @@ impl DriveMonitor { || path_lower.ends_with("/config.csv") || path_lower.contains(".gbot/config.csv"); - debug!("check_gbot: Checking path: {} (is_config_csv: {})", path, is_config_csv); + let is_prompt_file = path_lower.ends_with("prompt.md") + || path_lower.ends_with("prompt.txt") + || path_lower.ends_with("PROMPT.MD") + || path_lower.ends_with("PROMPT.TXT"); - if !is_config_csv { + debug!("check_gbot: Checking path: {} (is_config_csv: {}, is_prompt: {})", path, is_config_csv, is_prompt_file); + + if !is_config_csv && !is_prompt_file { + continue; + } + + if is_prompt_file { + // Download prompt file to work directory + match client.get_object().bucket(&self.bucket_name).key(&path).send().await { + Ok(response) => { + let bytes = response.body.collect().await?.into_bytes(); + let content = String::from_utf8(bytes.to_vec()) + .map_err(|e| format!("UTF-8 error in {}: {}", path, e))?; + let bot_name = self.bucket_name.strip_suffix(".gbai").unwrap_or(&self.bucket_name); + let gbot_dir = self.work_root.join(format!("{}.gbai/{}.gbot", bot_name, bot_name)); + let path_buf = PathBuf::from(&path); + let file_name = path_buf.file_name() + .and_then(|n| n.to_str()).unwrap_or("PROMPT.md"); + if let Err(e) = tokio::task::spawn_blocking({ + let gbot_dir_str = gbot_dir.to_string_lossy().to_string(); + let file_name_owned = file_name.to_string(); + let content_owned = content.clone(); + move || { + std::fs::create_dir_all(&gbot_dir_str)?; + std::fs::write(format!("{}/{}", gbot_dir_str, file_name_owned), &content_owned)?; + Ok::<(), Box>(()) + } + }).await { + log::error!("Failed to save prompt file: {}", e); + } else { + log::info!("Downloaded prompt file {} to work directory", path); + } + } + Err(e) => { + log::error!("Failed to download prompt file {}: {}", path, e); + } + } continue; }