From 3a260a5703aa5475b2db37c801dbd9a8c22db0b3 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Wed, 24 Dec 2025 09:42:16 -0300 Subject: [PATCH] Fix more clippy warnings: unused imports, raw string hashes, conditional imports --- src/email/vectordb.rs | 1 + src/vector-db/hybrid_search.rs | 94 ++------------------------- src/vector-db/vectordb_indexer.rs | 104 +++++++++--------------------- 3 files changed, 37 insertions(+), 162 deletions(-) diff --git a/src/email/vectordb.rs b/src/email/vectordb.rs index 258b4b73..3f666330 100644 --- a/src/email/vectordb.rs +++ b/src/email/vectordb.rs @@ -3,6 +3,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::sync::Arc; +#[cfg(not(feature = "vectordb"))] use tokio::fs; use uuid::Uuid; diff --git a/src/vector-db/hybrid_search.rs b/src/vector-db/hybrid_search.rs index 8ad8dff9..786b0598 100644 --- a/src/vector-db/hybrid_search.rs +++ b/src/vector-db/hybrid_search.rs @@ -1,54 +1,12 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -use log::{debug, error, info, trace, warn}; +use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::sync::Arc; use uuid::Uuid; use crate::shared::state::AppState; - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HybridSearchConfig { - pub dense_weight: f32, pub sparse_weight: f32, @@ -82,7 +40,6 @@ impl Default for HybridSearchConfig { } impl HybridSearchConfig { - pub fn from_bot_config(state: &AppState, bot_id: Uuid) -> Self { use diesel::prelude::*; @@ -136,7 +93,6 @@ impl HybridSearchConfig { } } - let total = config.dense_weight + config.sparse_weight; if total > 0.0 { config.dense_weight /= total; @@ -151,21 +107,17 @@ impl HybridSearchConfig { config } - pub fn use_sparse_search(&self) -> bool { self.bm25_enabled && self.sparse_weight > 0.0 } - pub fn use_dense_search(&self) -> bool { self.dense_weight > 0.0 } } - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SearchResult { - pub doc_id: String, pub content: String, @@ -179,7 +131,6 @@ pub struct SearchResult { pub search_method: SearchMethod, } - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum SearchMethod { Dense, @@ -188,8 +139,6 @@ pub enum SearchMethod { Reranked, } - - pub struct BM25Index { doc_freq: HashMap, doc_count: usize, @@ -341,7 +290,6 @@ impl Default for BM25Index { } } - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BM25Stats { pub doc_count: usize, @@ -350,7 +298,6 @@ pub struct BM25Stats { pub enabled: bool, } - #[derive(Debug, Clone)] struct DocumentEntry { pub content: String, @@ -358,9 +305,7 @@ struct DocumentEntry { pub metadata: HashMap, } - pub struct HybridSearchEngine { - bm25_index: BM25Index, documents: HashMap, @@ -391,7 +336,6 @@ impl HybridSearchEngine { } } - pub async fn index_document( &mut self, doc_id: &str, @@ -400,10 +344,8 @@ impl HybridSearchEngine { metadata: HashMap, embedding: Option>, ) -> Result<(), String> { - self.bm25_index.add_document(doc_id, content, source); - self.documents.insert( doc_id.to_string(), DocumentEntry { @@ -413,7 +355,6 @@ impl HybridSearchEngine { }, ); - if let Some(emb) = embedding { self.upsert_to_qdrant(doc_id, &emb).await?; } @@ -421,12 +362,10 @@ impl HybridSearchEngine { Ok(()) } - pub fn commit(&mut self) -> Result<(), String> { Ok(()) } - pub async fn remove_document(&mut self, doc_id: &str) -> Result<(), String> { self.bm25_index.remove_document(doc_id); self.documents.remove(doc_id); @@ -434,7 +373,6 @@ impl HybridSearchEngine { Ok(()) } - pub async fn search( &self, query: &str, @@ -442,7 +380,6 @@ impl HybridSearchEngine { ) -> Result, String> { let fetch_count = self.config.max_results * 3; - let sparse_results: Vec<(String, f32)> = if self.config.use_sparse_search() { self.bm25_index .search(query, fetch_count) @@ -453,7 +390,6 @@ impl HybridSearchEngine { Vec::new() }; - let dense_results = if self.config.use_dense_search() { if let Some(embedding) = query_embedding { self.search_qdrant(&embedding, fetch_count).await? @@ -464,7 +400,6 @@ impl HybridSearchEngine { Vec::new() }; - let (results, method) = if sparse_results.is_empty() && dense_results.is_empty() { (Vec::new(), SearchMethod::Hybrid) } else if sparse_results.is_empty() { @@ -478,7 +413,6 @@ impl HybridSearchEngine { ) }; - let mut search_results: Vec = results .into_iter() .filter_map(|(doc_id, score)| { @@ -495,7 +429,6 @@ impl HybridSearchEngine { .take(self.config.max_results) .collect(); - if self.config.reranker_enabled && !search_results.is_empty() { search_results = self.rerank(query, search_results).await?; } @@ -503,7 +436,6 @@ impl HybridSearchEngine { Ok(search_results) } - pub fn sparse_search(&self, query: &str) -> Vec { let results = self.bm25_index.search(query, self.config.max_results); @@ -522,7 +454,6 @@ impl HybridSearchEngine { .collect() } - pub async fn dense_search( &self, query_embedding: Vec, @@ -548,7 +479,6 @@ impl HybridSearchEngine { Ok(search_results) } - fn reciprocal_rank_fusion( &self, sparse: &[(String, f32)], @@ -557,23 +487,19 @@ impl HybridSearchEngine { let k = self.config.rrf_k as f32; let mut scores: HashMap = HashMap::new(); - for (rank, (doc_id, _)) in sparse.iter().enumerate() { let rrf_score = self.config.sparse_weight / (k + rank as f32 + 1.0); *scores.entry(doc_id.clone()).or_insert(0.0) += rrf_score; } - for (rank, (doc_id, _)) in dense.iter().enumerate() { let rrf_score = self.config.dense_weight / (k + rank as f32 + 1.0); *scores.entry(doc_id.clone()).or_insert(0.0) += rrf_score; } - let mut results: Vec<(String, f32)> = scores.into_iter().collect(); results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); - let max_score = results.first().map(|(_, s)| *s).unwrap_or(0.0); if max_score > 0.0 { for (_, score) in &mut results { @@ -584,19 +510,18 @@ impl HybridSearchEngine { results } - async fn rerank( &self, query: &str, results: Vec, ) -> Result, String> { - - let mut reranked = results; let query_lower = query.to_lowercase(); - let query_terms: std::collections::HashSet = - query_lower.split_whitespace().map(|s| s.to_string()).collect(); + let query_terms: std::collections::HashSet = query_lower + .split_whitespace() + .map(|s| s.to_string()) + .collect(); let query_terms_len = query_terms.len(); for result in &mut reranked { @@ -623,7 +548,6 @@ impl HybridSearchEngine { Ok(reranked) } - async fn search_qdrant( &self, embedding: &[f32], @@ -673,7 +597,6 @@ impl HybridSearchEngine { Ok(results) } - async fn upsert_to_qdrant(&self, doc_id: &str, embedding: &[f32]) -> Result<(), String> { let client = reqwest::Client::new(); @@ -702,7 +625,6 @@ impl HybridSearchEngine { Ok(()) } - async fn delete_from_qdrant(&self, doc_id: &str) -> Result<(), String> { let client = reqwest::Client::new(); @@ -731,7 +653,6 @@ impl HybridSearchEngine { Ok(()) } - pub fn stats(&self) -> HybridSearchStats { let bm25_stats = self.bm25_index.stats(); @@ -746,7 +667,6 @@ impl HybridSearchEngine { } } - #[derive(Debug, Clone)] pub struct HybridSearchStats { pub total_documents: usize, @@ -757,7 +677,6 @@ pub struct HybridSearchStats { pub config: HybridSearchConfig, } - pub struct QueryDecomposer { llm_endpoint: String, api_key: String, @@ -771,11 +690,9 @@ impl QueryDecomposer { } } - pub async fn decompose(&self, query: &str) -> Result, String> { let mut sub_queries = Vec::new(); - let conjunctions = ["and", "also", "as well as", "in addition to"]; let mut parts: Vec<&str> = vec![query]; @@ -821,7 +738,6 @@ impl QueryDecomposer { Ok(sub_queries) } - pub fn synthesize(&self, query: &str, sub_answers: &[String]) -> String { if sub_answers.len() == 1 { return sub_answers[0].clone(); diff --git a/src/vector-db/vectordb_indexer.rs b/src/vector-db/vectordb_indexer.rs index bdca9fd9..257d6de4 100644 --- a/src/vector-db/vectordb_indexer.rs +++ b/src/vector-db/vectordb_indexer.rs @@ -1,6 +1,5 @@ use anyhow::Result; use chrono::{DateTime, Utc}; -use diesel::RunQueryDsl; use log::{error, info, warn}; use std::collections::HashMap; use std::path::PathBuf; @@ -50,7 +49,6 @@ impl UserWorkspace { } } - #[derive(Debug, Clone, PartialEq)] pub enum IndexingStatus { Idle, @@ -59,7 +57,6 @@ pub enum IndexingStatus { Failed(String), } - #[derive(Debug, Clone)] pub struct IndexingStats { pub emails_indexed: u64, @@ -70,7 +67,6 @@ pub struct IndexingStats { pub errors: u64, } - struct UserIndexingJob { user_id: Uuid, bot_id: Uuid, @@ -83,7 +79,6 @@ struct UserIndexingJob { status: IndexingStatus, } - pub struct VectorDBIndexer { db_pool: DbPool, work_root: PathBuf, @@ -96,7 +91,6 @@ pub struct VectorDBIndexer { } impl VectorDBIndexer { - pub fn new( db_pool: DbPool, work_root: PathBuf, @@ -115,7 +109,6 @@ impl VectorDBIndexer { } } - pub async fn start(self: Arc) -> Result<()> { let mut running = self.running.write().await; if *running { @@ -135,17 +128,14 @@ impl VectorDBIndexer { Ok(()) } - pub async fn stop(&self) { let mut running = self.running.write().await; *running = false; info!("🛑 Stopping Vector DB Indexer"); } - async fn run_indexing_loop(self: Arc) { loop { - { let running = self.running.read().await; if !*running { @@ -155,7 +145,6 @@ impl VectorDBIndexer { info!(" Running vector DB indexing cycle..."); - match self.get_active_users().await { Ok(users) => { info!("Found {} active users to index", users.len()); @@ -173,14 +162,12 @@ impl VectorDBIndexer { info!(" Indexing cycle complete"); - sleep(Duration::from_secs(self.interval_seconds)).await; } info!("Vector DB Indexer stopped"); } - async fn get_active_users(&self) -> Result> { let conn = self.db_pool.clone(); @@ -190,7 +177,6 @@ impl VectorDBIndexer { let mut db_conn = conn.get()?; - let results: Vec<(Uuid, Uuid)> = user_sessions .select((user_id, bot_id)) .distinct() @@ -201,11 +187,9 @@ impl VectorDBIndexer { .await? } - async fn index_user_data(&self, user_id: Uuid, bot_id: Uuid) -> Result<()> { info!("Indexing user: {} (bot: {})", user_id, bot_id); - let mut jobs = self.jobs.write().await; let job = jobs.entry(user_id).or_insert_with(|| { let workspace = UserWorkspace::new(self.work_root.clone(), &bot_id, &user_id); @@ -235,7 +219,6 @@ impl VectorDBIndexer { job.status = IndexingStatus::Running; - if job.email_db.is_none() { let mut email_db = UserEmailVectorDB::new(user_id, bot_id, job.workspace.email_vectordb().into()); @@ -264,17 +247,14 @@ impl VectorDBIndexer { drop(jobs); - if let Err(e) = self.index_user_emails(user_id).await { error!("Failed to index emails for user {}: {}", user_id, e); } - if let Err(e) = self.index_user_files(user_id).await { error!("Failed to index files for user {}: {}", user_id, e); } - let mut jobs = self.jobs.write().await; if let Some(job) = jobs.get_mut(&user_id) { job.status = IndexingStatus::Idle; @@ -284,7 +264,6 @@ impl VectorDBIndexer { Ok(()) } - async fn index_user_emails(&self, user_id: Uuid) -> Result<()> { let jobs = self.jobs.read().await; let job = jobs @@ -299,7 +278,6 @@ impl VectorDBIndexer { } }; - let accounts = self.get_user_email_accounts(user_id).await?; info!( @@ -309,7 +287,6 @@ impl VectorDBIndexer { ); for account_id in accounts { - match self.get_unindexed_emails(user_id, &account_id).await { Ok(emails) => { if emails.is_empty() { @@ -322,7 +299,6 @@ impl VectorDBIndexer { account_id ); - for chunk in emails.chunks(self.batch_size) { for email in chunk { match self.embedding_generator.generate_embedding(&email).await { @@ -342,7 +318,6 @@ impl VectorDBIndexer { } } - sleep(Duration::from_millis(100)).await; } } @@ -358,7 +333,6 @@ impl VectorDBIndexer { Ok(()) } - async fn index_user_files(&self, user_id: Uuid) -> Result<()> { let jobs = self.jobs.read().await; let job = jobs @@ -373,7 +347,6 @@ impl VectorDBIndexer { } }; - match self.get_unindexed_files(user_id).await { Ok(files) => { if files.is_empty() { @@ -382,16 +355,13 @@ impl VectorDBIndexer { info!("Indexing {} files for user {}", files.len(), user_id); - for chunk in files.chunks(self.batch_size) { for file in chunk { - let mime_type = file.mime_type.as_ref().map(|s| s.as_str()).unwrap_or(""); if !FileContentExtractor::should_index(&mime_type, file.file_size) { continue; } - let text = format!( "File: {}\nType: {}\n\n{}", file.file_name, file.file_type, file.content_text @@ -415,7 +385,6 @@ impl VectorDBIndexer { } } - sleep(Duration::from_millis(100)).await; } } @@ -427,7 +396,6 @@ impl VectorDBIndexer { Ok(()) } - async fn get_user_email_accounts(&self, user_id: Uuid) -> Result> { let conn = self.db_pool.clone(); @@ -490,7 +458,7 @@ impl VectorDBIndexer { folder: String, } - let query = r#" + let query = r" SELECT e.id, e.message_id, e.subject, e.from_address, e.to_addresses, e.body_text, e.body_html, e.received_at, e.folder FROM emails e @@ -500,7 +468,7 @@ impl VectorDBIndexer { AND (eis.indexed_at IS NULL OR eis.needs_reindex = true) ORDER BY e.received_at DESC LIMIT 100 - "#; + "; let rows: Vec = diesel::sql_query(query) .bind::(user_id) @@ -510,20 +478,18 @@ impl VectorDBIndexer { let emails: Vec = rows .into_iter() - .map(|row| { - EmailDocument { - id: row.id.to_string(), - account_id: account_id.clone(), - from_email: row.from_address.clone(), - from_name: row.from_address, - to_email: row.to_addresses, - subject: row.subject, - body_text: row.body_text.unwrap_or_default(), - date: row.received_at, - folder: row.folder, - has_attachments: false, - thread_id: None, - } + .map(|row| EmailDocument { + id: row.id.to_string(), + account_id: account_id.clone(), + from_email: row.from_address.clone(), + from_name: row.from_address, + to_email: row.to_addresses, + subject: row.subject, + body_text: row.body_text.unwrap_or_default(), + date: row.received_at, + folder: row.folder, + has_attachments: false, + thread_id: None, }) .collect(); @@ -566,17 +532,16 @@ impl VectorDBIndexer { modified_at: DateTime, } - let query = r#" + let query = r" SELECT f.id, f.file_path, f.file_name, f.file_type, f.file_size, f.bucket, f.mime_type, f.created_at, f.modified_at - FROM files f + FROM user_files f LEFT JOIN file_index_status fis ON f.id = fis.file_id WHERE f.user_id = $1 AND (fis.indexed_at IS NULL OR fis.needs_reindex = true) - AND f.file_size < 10485760 ORDER BY f.modified_at DESC LIMIT 100 - "#; + "; let rows: Vec = diesel::sql_query(query) .bind::(user_id) @@ -585,22 +550,20 @@ impl VectorDBIndexer { let files: Vec = rows .into_iter() - .map(|row| { - FileDocument { - id: row.id.to_string(), - file_path: row.file_path, - file_name: row.file_name, - file_type: row.file_type, - file_size: row.file_size as u64, - bucket: row.bucket, - content_text: String::new(), - content_summary: None, - created_at: row.created_at, - modified_at: row.modified_at, - indexed_at: Utc::now(), - mime_type: row.mime_type, - tags: Vec::new(), - } + .map(|row| FileDocument { + id: row.id.to_string(), + file_path: row.file_path, + file_name: row.file_name, + file_type: row.file_type, + file_size: row.file_size as u64, + bucket: row.bucket, + content_text: String::new(), + content_summary: None, + created_at: row.created_at, + modified_at: row.modified_at, + indexed_at: Utc::now(), + mime_type: row.mime_type, + tags: Vec::new(), }) .collect(); @@ -611,13 +574,11 @@ impl VectorDBIndexer { Ok(results) } - pub async fn get_user_stats(&self, user_id: Uuid) -> Option { let jobs = self.jobs.read().await; jobs.get(&user_id).map(|job| job.stats.clone()) } - pub async fn get_overall_stats(&self) -> IndexingStats { let jobs = self.jobs.read().await; @@ -647,7 +608,6 @@ impl VectorDBIndexer { total_stats } - pub async fn pause_user_indexing(&self, user_id: Uuid) -> Result<()> { let mut jobs = self.jobs.write().await; if let Some(job) = jobs.get_mut(&user_id) { @@ -657,7 +617,6 @@ impl VectorDBIndexer { Ok(()) } - pub async fn resume_user_indexing(&self, user_id: Uuid) -> Result<()> { let mut jobs = self.jobs.write().await; if let Some(job) = jobs.get_mut(&user_id) { @@ -667,7 +626,6 @@ impl VectorDBIndexer { Ok(()) } - pub async fn trigger_user_indexing(&self, user_id: Uuid, bot_id: Uuid) -> Result<()> { info!(" Triggering immediate indexing for user {}", user_id); self.index_user_data(user_id, bot_id).await