Fix KB indexing: bot-specific embedding config, PROMPT.md sync, single-file streaming
All checks were successful
BotServer CI/CD / build (push) Successful in 4m1s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-11 13:27:48 -03:00
parent 12988b637d
commit a131120638
3 changed files with 130 additions and 11 deletions

View file

@ -82,6 +82,7 @@ pub struct KbIndexer {
embedding_generator: KbEmbeddingGenerator,
qdrant_config: QdrantConfig,
http_client: reqwest::Client,
db_pool: Option<DbPool>,
}
impl std::fmt::Debug for KbIndexer {
@ -91,6 +92,7 @@ impl std::fmt::Debug for KbIndexer {
.field("embedding_generator", &self.embedding_generator)
.field("qdrant_config", &self.qdrant_config)
.field("http_client", &"reqwest::Client")
.field("db_pool", &"DbPool")
.finish()
}
}
@ -107,9 +109,33 @@ impl KbIndexer {
embedding_generator,
qdrant_config,
http_client,
db_pool: None,
}
}
pub fn new_with_pool(embedding_config: EmbeddingConfig, qdrant_config: QdrantConfig, db_pool: DbPool) -> Self {
let document_processor = DocumentProcessor::default();
let embedding_generator = KbEmbeddingGenerator::new(embedding_config);
let http_client = create_tls_client(Some(qdrant_config.timeout_secs));
Self {
document_processor,
embedding_generator,
qdrant_config,
http_client,
db_pool: Some(db_pool),
}
}
pub fn get_db_pool(&self) -> Option<&DbPool> {
self.db_pool.as_ref()
}
pub fn get_pool(&self) -> DbPool {
self.db_pool.clone().unwrap_or_else(|| panic!("DbPool not available"))
}
pub async fn check_qdrant_health(&self) -> Result<bool> {
let health_url = format!("{}/healthz", self.qdrant_config.url);
@ -667,6 +693,53 @@ impl KbIndexer {
embedding.vector
};
self.execute_search(collection_name, search_vector, limit).await
}
pub async fn search_with_config(
&self,
collection_name: &str,
query: &str,
limit: usize,
embedding_config: &EmbeddingConfig,
) -> Result<Vec<SearchResult>> {
let collection_dimension = self.get_collection_vector_dimension(collection_name).await?;
let embedding_generator = KbEmbeddingGenerator::new(embedding_config.clone());
let embedding = embedding_generator.generate_single_embedding(query).await?;
let search_vector = if let Some(target_dim) = collection_dimension {
if embedding.vector.len() > target_dim {
debug!(
"Truncating embedding from {} to {} dimensions for collection '{}'",
embedding.vector.len(), target_dim, collection_name
);
embedding.vector[..target_dim].to_vec()
} else if embedding.vector.len() < target_dim {
warn!(
"Embedding dimension ({}) is smaller than collection dimension ({}). \
Search may return poor results for collection '{}'.",
embedding.vector.len(), target_dim, collection_name
);
let mut padded = embedding.vector.clone();
padded.resize(target_dim, 0.0);
padded
} else {
embedding.vector
}
} else {
embedding.vector
};
self.execute_search(collection_name, search_vector, limit).await
}
async fn execute_search(
&self,
collection_name: &str,
search_vector: Vec<f32>,
limit: usize,
) -> Result<Vec<SearchResult>> {
let search_request = SearchRequest {
vector: search_vector,
limit,
@ -703,21 +776,20 @@ impl KbIndexer {
{
let content = payload
.get("content")
.and_then(|v| v.as_str())
.unwrap_or("")
.and_then(|c| c.as_str())
.unwrap_or_default()
.to_string();
let document_path = payload
.get("document_path")
.and_then(|v| v.as_str())
.unwrap_or("")
.and_then(|p| p.as_str())
.unwrap_or_default()
.to_string();
results.push(SearchResult {
content,
document_path,
score: score as f32,
metadata: payload.clone(),
metadata: serde_json::Map::new(),
});
}
}

View file

@ -58,9 +58,9 @@ impl KnowledgeBaseManager {
let embedding_config = EmbeddingConfig::from_bot_config(&pool, &bot_id);
info!("KB Manager using embedding config from bot {}: url={}, model={}",
bot_id, embedding_config.embedding_url, embedding_config.embedding_model);
let qdrant_config = QdrantConfig::from_config(pool, &bot_id);
let qdrant_config = QdrantConfig::from_config(pool.clone(), &bot_id);
let indexer = Arc::new(KbIndexer::new(embedding_config.clone(), qdrant_config));
let indexer = Arc::new(KbIndexer::new_with_pool(embedding_config.clone(), qdrant_config, pool));
let processor = Arc::new(DocumentProcessor::default());
let monitor = Arc::new(RwLock::new(KbFolderMonitor::new(
work_root,
@ -140,7 +140,15 @@ impl KnowledgeBaseManager {
) -> Result<Vec<SearchResult>> {
let bot_id_short = bot_id.to_string().chars().take(8).collect::<String>();
let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name);
self.indexer.search(&collection_name, query, limit).await
// Use from_bot_config with state connection if available
if let Some(pool) = self.indexer.get_db_pool() {
let embedding_config = EmbeddingConfig::from_bot_config(pool, &bot_id);
self.indexer.search_with_config(&collection_name, query, limit, &embedding_config).await
} else {
// Fallback to default config
self.indexer.search(&collection_name, query, limit).await
}
}
pub async fn search_collection(

View file

@ -588,9 +588,48 @@ impl DriveMonitor {
|| path_lower.ends_with("/config.csv")
|| path_lower.contains(".gbot/config.csv");
debug!("check_gbot: Checking path: {} (is_config_csv: {})", path, is_config_csv);
let is_prompt_file = path_lower.ends_with("prompt.md")
|| path_lower.ends_with("prompt.txt")
|| path_lower.ends_with("PROMPT.MD")
|| path_lower.ends_with("PROMPT.TXT");
if !is_config_csv {
debug!("check_gbot: Checking path: {} (is_config_csv: {}, is_prompt: {})", path, is_config_csv, is_prompt_file);
if !is_config_csv && !is_prompt_file {
continue;
}
if is_prompt_file {
// Download prompt file to work directory
match client.get_object().bucket(&self.bucket_name).key(&path).send().await {
Ok(response) => {
let bytes = response.body.collect().await?.into_bytes();
let content = String::from_utf8(bytes.to_vec())
.map_err(|e| format!("UTF-8 error in {}: {}", path, e))?;
let bot_name = self.bucket_name.strip_suffix(".gbai").unwrap_or(&self.bucket_name);
let gbot_dir = self.work_root.join(format!("{}.gbai/{}.gbot", bot_name, bot_name));
let path_buf = PathBuf::from(&path);
let file_name = path_buf.file_name()
.and_then(|n| n.to_str()).unwrap_or("PROMPT.md");
if let Err(e) = tokio::task::spawn_blocking({
let gbot_dir_str = gbot_dir.to_string_lossy().to_string();
let file_name_owned = file_name.to_string();
let content_owned = content.clone();
move || {
std::fs::create_dir_all(&gbot_dir_str)?;
std::fs::write(format!("{}/{}", gbot_dir_str, file_name_owned), &content_owned)?;
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
}
}).await {
log::error!("Failed to save prompt file: {}", e);
} else {
log::info!("Downloaded prompt file {} to work directory", path);
}
}
Err(e) => {
log::error!("Failed to download prompt file {}: {}", path, e);
}
}
continue;
}