From 73002b36cc3def17546085574cbafe0f42c7b04f Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Tue, 7 Apr 2026 13:33:50 -0300 Subject: [PATCH] Update botserver: various fixes and improvements --- Cargo.toml | 9 +- src/basic/keywords/app_server.rs | 2 + src/basic/keywords/detect.rs | 2 +- src/basic/keywords/get.rs | 18 +- src/basic/keywords/llm_keyword.rs | 5 +- src/basic/keywords/universal_messaging.rs | 4 + src/basic/mod.rs | 4 + src/core/bot/mod.rs | 4 + src/core/bot/multimedia.rs | 69 +++- src/core/config/mod.rs | 2 +- src/core/kb/document_processor.rs | 29 +- src/core/session/mod.rs | 27 +- src/core/shared/state.rs | 18 ++ src/core/shared/utils.rs | 3 +- src/drive/drive_handlers.rs | 318 ++++-------------- src/drive/drive_monitor/mod.rs | 3 + src/drive/mod.rs | 372 ++++------------------ src/drive/vectordb.rs | 47 ++- src/main.rs | 5 +- src/main_module/bootstrap.rs | 31 +- src/main_module/drive_utils.rs | 3 +- src/main_module/mod.rs | 1 + src/main_module/server.rs | 24 +- 23 files changed, 370 insertions(+), 630 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index caa8ca3d..996a65b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,8 @@ external_sync = ["automation", "drive", "cache"] # ===== CORE INFRASTRUCTURE (Can be used standalone) ===== scripting = ["dep:rhai"] automation = ["scripting", "dep:cron"] -drive = ["dep:aws-config", "dep:aws-sdk-s3", "dep:aws-smithy-async", "dep:pdf-extract", "dep:notify"] +drive = ["dep:aws-config", "dep:aws-sdk-s3", "dep:aws-smithy-async", "dep:pdf-extract"] +local-files = ["dep:notify"] cache = ["dep:redis"] directory = ["rbac"] rbac = [] @@ -32,7 +33,7 @@ crawler = ["drive", "cache"] # ===== APPS (Each includes what it needs from core) ===== # Communication -chat = ["automation", "drive", "cache"] +chat = ["automation", "local-files", "cache"] people = ["automation", "drive", "cache"] mail = ["automation", "drive", "cache", "dep:lettre", "dep:mailparse", "dep:imap"] meet = ["automation", "drive", "cache"] @@ -83,7 +84,7 @@ telegram = ["automation", "drive", "cache"] instagram = ["automation", "drive", "cache"] msteams = ["automation", "drive", "cache"] # Core Tech -llm = ["automation", "drive", "cache"] +llm = ["automation", "local-files", "cache"] vectordb = ["automation", "drive", "cache", "dep:qdrant-client"] nvidia = ["automation", "drive", "cache"] compliance = ["automation", "drive", "cache", "dep:csv"] @@ -96,7 +97,7 @@ console = ["automation", "drive", "cache", "dep:crossterm", "dep:ratatui"] # ===== BUNDLES (Optional - for convenience) ===== minimal = ["chat"] -minimal-chat = ["chat", "automation", "drive", "cache"] # No security at all +minimal-chat = ["chat", "automation", "local-files", "cache"] # No security at all lightweight = ["chat", "tasks", "people"] full = ["chat", "people", "mail", "tasks", "calendar", "drive", "docs", "llm", "cache", "compliance"] embed-ui = ["dep:rust-embed"] diff --git a/src/basic/keywords/app_server.rs b/src/basic/keywords/app_server.rs index 26508975..9d5f52bd 100644 --- a/src/basic/keywords/app_server.rs +++ b/src/basic/keywords/app_server.rs @@ -135,6 +135,7 @@ pub async fn serve_vendor_file( key ); + #[cfg(feature = "drive")] if let Some(ref drive) = state.drive { match drive.get_object().bucket(&bucket).key(&key).send().await { Ok(response) => match response.body.collect().await { @@ -306,6 +307,7 @@ async fn serve_app_file_internal(state: &AppState, app_name: &str, file_path: &s ); // Try to serve from MinIO + #[cfg(feature = "drive")] if let Some(ref drive) = state.drive { match drive.get_object().bucket(&bucket).key(&key).send().await { Ok(response) => { diff --git a/src/basic/keywords/detect.rs b/src/basic/keywords/detect.rs index 68b899d8..2e052b8b 100644 --- a/src/basic/keywords/detect.rs +++ b/src/basic/keywords/detect.rs @@ -91,7 +91,7 @@ async fn detect_anomalies_in_table( let column_list = columns.join(", "); let query = format!( - "SELECT row_to_json(t)::text as data FROM (SELECT {} FROM {} LIMIT 500) t", + "SELECT row_to_json(t)::text as data FROM (SELECT {} FROM {} LIMIT 1500) t", column_list, table_name ); diff --git a/src/basic/keywords/get.rs b/src/basic/keywords/get.rs index 6062f4e7..52267a90 100644 --- a/src/basic/keywords/get.rs +++ b/src/basic/keywords/get.rs @@ -1,7 +1,5 @@ -use crate::core::shared::models::schema::bots::dsl::*; use crate::core::shared::models::UserSession; use crate::core::shared::state::AppState; -use diesel::prelude::*; use log::{error, trace}; use reqwest::{self, Client}; use rhai::{Dynamic, Engine}; @@ -149,6 +147,8 @@ pub async fn execute_get(url: &str) -> Result text, Err(e) => { @@ -209,6 +210,10 @@ pub async fn get_from_bucket( return Err(format!("PDF extraction failed: {}", e).into()); } } + #[cfg(not(feature = "drive"))] + { + return Err("PDF extraction requires drive feature".into()); + } } else { match String::from_utf8(bytes) { Ok(text) => text, @@ -225,3 +230,12 @@ pub async fn get_from_bucket( ); Ok(content) } + +#[cfg(not(feature = "drive"))] +pub async fn get_from_bucket( + _state: &AppState, + _file_path: &str, + _bot_id: uuid::Uuid, +) -> Result> { + Err("S3 drive is not enabled. Configure MinIO to use this feature.".into()) +} diff --git a/src/basic/keywords/llm_keyword.rs b/src/basic/keywords/llm_keyword.rs index 5cdfb497..4204c4e9 100644 --- a/src/basic/keywords/llm_keyword.rs +++ b/src/basic/keywords/llm_keyword.rs @@ -59,7 +59,10 @@ pub fn llm_keyword(state: Arc, _user: UserSession, engine: &mut Engine .expect("valid syntax registration"); } fn build_llm_prompt(user_text: &str) -> String { - user_text.trim().to_string() + format!( + "Você é um assistente virtual em português brasileiro. Responda sempre em português do Brasil, de forma clara e amigável.\n\nPedido do usuário: {}", + user_text.trim() + ) } pub async fn execute_llm_generation( state: Arc, diff --git a/src/basic/keywords/universal_messaging.rs b/src/basic/keywords/universal_messaging.rs index e00dcbba..2673ff07 100644 --- a/src/basic/keywords/universal_messaging.rs +++ b/src/basic/keywords/universal_messaging.rs @@ -337,7 +337,10 @@ async fn send_file_with_caption_to_recipient( send_whatsapp_file(state, user, &recipient_id, file_data, caption).await?; } "instagram" => { + #[cfg(feature = "drive")] send_instagram_file(state, user, &recipient_id, file_data, caption).await?; + #[cfg(not(feature = "drive"))] + return Err("Drive feature not enabled".into()); } "teams" => { send_teams_file(state, user, &recipient_id, file_data, caption).await?; @@ -500,6 +503,7 @@ async fn send_whatsapp_file( Ok(()) } +#[cfg(feature = "drive")] async fn send_instagram_file( state: Arc, user: &UserSession, diff --git a/src/basic/mod.rs b/src/basic/mod.rs index a83d3022..685790f6 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -1353,6 +1353,7 @@ impl ScriptService { // - Lines that are part of a continuation block (in_line_continuation is true) if !trimmed.ends_with(';') && !trimmed.ends_with('{') && !trimmed.ends_with('}') && !upper.starts_with("SELECT ") && !upper.starts_with("CASE ") && upper != "END SELECT" + && !upper.starts_with("WHILE ") && !upper.starts_with("WEND") && !ends_with_comma && !in_line_continuation { result.push(';'); } @@ -1793,6 +1794,9 @@ impl ScriptService { "REQUIRED", "WEBSITE", "MODEL", + "DETECT", + "LLM", + "TALK", ]; let _identifier_re = Regex::new(r"([a-zA-Z_][a-zA-Z0-9_]*)").expect("valid regex"); diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index f0dadc0c..83749aa7 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -40,6 +40,8 @@ use serde_json; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc; +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] use tokio::sync::Mutex as AsyncMutex; use uuid::Uuid; use serde::{Deserialize, Serialize}; @@ -102,6 +104,7 @@ pub fn get_bot_id_by_name(conn: &mut PgConnection, bot_name: &str) -> Result, + #[cfg(feature = "drive")] pub mounted_bots: Arc>>>, } @@ -232,6 +235,7 @@ impl BotOrchestrator { pub fn new(state: Arc) -> Self { Self { state, + #[cfg(feature = "drive")] mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())), } } diff --git a/src/core/bot/multimedia.rs b/src/core/bot/multimedia.rs index bff8f806..351a4d01 100644 --- a/src/core/bot/multimedia.rs +++ b/src/core/bot/multimedia.rs @@ -115,12 +115,14 @@ pub trait MultimediaHandler: Send + Sync { } +#[cfg(feature = "drive")] #[derive(Debug)] pub struct DefaultMultimediaHandler { storage_client: Option, search_api_key: Option, } +#[cfg(feature = "drive")] impl DefaultMultimediaHandler { pub fn new(storage_client: Option, search_api_key: Option) -> Self { Self { @@ -138,6 +140,29 @@ impl DefaultMultimediaHandler { } } +#[cfg(not(feature = "drive"))] +#[derive(Debug)] +pub struct DefaultMultimediaHandler { + search_api_key: Option, +} + +#[cfg(not(feature = "drive"))] +impl DefaultMultimediaHandler { + pub fn new(_storage_client: Option<()>, search_api_key: Option) -> Self { + Self { + search_api_key, + } + } + + pub fn storage_client(&self) -> &Option<()> { + &None + } + + pub fn search_api_key(&self) -> &Option { + &self.search_api_key + } +} + #[async_trait] impl MultimediaHandler for DefaultMultimediaHandler { async fn process_multimedia( @@ -307,6 +332,7 @@ impl MultimediaHandler for DefaultMultimediaHandler { } } + #[cfg(feature = "drive")] async fn upload_media(&self, request: MediaUploadRequest) -> Result { let media_id = Uuid::new_v4().to_string(); let key = format!( @@ -348,6 +374,27 @@ impl MultimediaHandler for DefaultMultimediaHandler { } } + #[cfg(not(feature = "drive"))] + async fn upload_media(&self, request: MediaUploadRequest) -> Result { + let media_id = Uuid::new_v4().to_string(); + let key = format!( + "media/{}/{}/{}", + request.user_id, request.session_id, request.file_name + ); + + let local_path = format!("./media/{}", key); + if let Some(parent) = std::path::Path::new(&local_path).parent() { + std::fs::create_dir_all(parent)?; + } + std::fs::write(&local_path, request.data)?; + + Ok(MediaUploadResponse { + media_id, + url: format!("file://{}", local_path), + thumbnail_url: None, + }) + } + async fn download_media(&self, url: &str) -> Result> { if url.starts_with("http://") || url.starts_with("https://") { let response = reqwest::get(url).await?; @@ -452,11 +499,15 @@ use std::sync::Arc; pub async fn upload_media_handler( - State(state): State>, + State(_state): State>, Json(request): Json, ) -> impl IntoResponse { + #[cfg(feature = "drive")] let handler = DefaultMultimediaHandler::new(state.drive.clone(), None); + #[cfg(not(feature = "drive"))] + let handler = DefaultMultimediaHandler::new(None, None); + match handler.upload_media(request).await { Ok(response) => (StatusCode::OK, Json(serde_json::json!(response))), Err(e) => ( @@ -468,11 +519,14 @@ pub async fn upload_media_handler( pub async fn download_media_handler( - State(state): State>, + State(_state): State>, Path(media_id): Path, ) -> impl IntoResponse { + #[cfg(feature = "drive")] let handler = DefaultMultimediaHandler::new(state.drive.clone(), None); + #[cfg(not(feature = "drive"))] + let handler = DefaultMultimediaHandler::new(None, None); let url = format!("https://storage.botserver.com/media/{}", media_id); @@ -494,11 +548,14 @@ pub async fn download_media_handler( pub async fn generate_thumbnail_handler( - State(state): State>, + State(_state): State>, Path(media_id): Path, ) -> impl IntoResponse { + #[cfg(feature = "drive")] let handler = DefaultMultimediaHandler::new(state.drive.clone(), None); + #[cfg(not(feature = "drive"))] + let handler = DefaultMultimediaHandler::new(None, None); let url = format!("https://storage.botserver.com/media/{}", media_id); @@ -519,7 +576,7 @@ pub async fn generate_thumbnail_handler( pub async fn web_search_handler( - State(state): State>, + State(_state): State>, Json(payload): Json, ) -> impl IntoResponse { let query = payload.get("query").and_then(|q| q.as_str()).unwrap_or(""); @@ -528,8 +585,12 @@ pub async fn web_search_handler( .and_then(|m| m.as_u64()) .unwrap_or(10) as usize; + #[cfg(feature = "drive")] let handler = DefaultMultimediaHandler::new(state.drive.clone(), None); + #[cfg(not(feature = "drive"))] + let handler = DefaultMultimediaHandler::new(None, None); + match handler.web_search(query, max_results).await { Ok(results) => ( StatusCode::OK, diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 232e3531..5f3efca2 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2,7 +2,7 @@ pub mod model_routing_config; pub mod sse_config; pub mod user_memory_config; -#[cfg(feature = "drive")] +#[cfg(any(feature = "drive", feature = "local-files"))] pub mod watcher; pub use model_routing_config::{ModelRoutingConfig, RoutingStrategy, TaskType}; diff --git a/src/core/kb/document_processor.rs b/src/core/kb/document_processor.rs index 380e6380..8b41cc25 100644 --- a/src/core/kb/document_processor.rs +++ b/src/core/kb/document_processor.rs @@ -266,17 +266,13 @@ impl DocumentProcessor { } fn extract_pdf_with_library(&self, file_path: &Path) -> Result { - let _ = self; // Suppress unused self warning + let _ = self; #[cfg(feature = "drive")] { use pdf_extract::extract_text; - match extract_text(file_path) { Ok(text) => { - info!( - "Successfully extracted PDF with library: {}", - file_path.display() - ); + info!("Successfully extracted PDF with library: {}", file_path.display()); return Ok(text); } Err(e) => { @@ -284,23 +280,24 @@ impl DocumentProcessor { } } } - + #[cfg(not(feature = "drive"))] + let _ = file_path; Self::extract_pdf_basic_sync(file_path) } + #[cfg(feature = "drive")] fn extract_pdf_basic_sync(file_path: &Path) -> Result { - #[cfg(feature = "drive")] - { - if let Ok(text) = pdf_extract::extract_text(file_path) { - if !text.is_empty() { - return Ok(text); - } + if let Ok(text) = pdf_extract::extract_text(file_path) { + if !text.is_empty() { + return Ok(text); } } + Err(anyhow::anyhow!("Could not extract text from PDF")) + } - Err(anyhow::anyhow!( - "Could not extract text from PDF. Please ensure pdftotext is installed." - )) + #[cfg(not(feature = "drive"))] + fn extract_pdf_basic_sync(_file_path: &Path) -> Result { + Err(anyhow::anyhow!("PDF extraction requires 'drive' feature")) } async fn extract_docx_text(&self, file_path: &Path) -> Result { diff --git a/src/core/session/mod.rs b/src/core/session/mod.rs index dc0b5c3a..e70a890a 100644 --- a/src/core/session/mod.rs +++ b/src/core/session/mod.rs @@ -167,13 +167,23 @@ impl SessionManager { uid: Uuid, bid: Uuid, session_title: &str, + ) -> Result> { + self.create_session_with_id(Uuid::new_v4(), uid, bid, session_title) + } + + pub fn create_session_with_id( + &mut self, + session_id: Uuid, + uid: Uuid, + bid: Uuid, + session_title: &str, ) -> Result> { use crate::core::shared::models::user_sessions::dsl::*; let verified_uid = self.get_or_create_anonymous_user(Some(uid))?; let now = Utc::now(); let inserted: UserSession = diesel::insert_into(user_sessions) .values(( - id.eq(Uuid::new_v4()), + id.eq(session_id), user_id.eq(verified_uid), bot_id.eq(bid), title.eq(session_title), @@ -194,6 +204,21 @@ impl SessionManager { Ok(inserted) } + pub fn get_or_create_session_by_id( + &mut self, + session_id: Uuid, + uid: Uuid, + bid: Uuid, + session_title: &str, + ) -> Result> { + // Check if session already exists + if let Ok(Some(existing)) = self.get_session_by_id(session_id) { + return Ok(existing); + } + // Create new session with specified ID + self.create_session_with_id(session_id, uid, bid, session_title) + } + fn _clear_messages(&mut self, _session_id: Uuid) -> Result<(), Box> { use crate::core::shared::models::message_history::dsl::*; diesel::delete(message_history.filter(session_id.eq(session_id))) diff --git a/src/core/shared/state.rs b/src/core/shared/state.rs index 20a7d35f..6aa95df9 100644 --- a/src/core/shared/state.rs +++ b/src/core/shared/state.rs @@ -38,6 +38,17 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{broadcast, mpsc, RwLock}; +#[cfg(not(feature = "drive"))] +#[derive(Debug, Clone, Default)] +pub struct NoDrive; + +#[cfg(not(feature = "drive"))] +impl NoDrive { + pub fn new() -> Self { + NoDrive + } +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AttendantNotification { #[serde(rename = "type")] @@ -365,6 +376,9 @@ pub struct BillingAlertNotification { pub struct AppState { #[cfg(feature = "drive")] pub drive: Option, + #[cfg(not(feature = "drive"))] + #[allow(non_snake_case)] + pub drive: Option, #[cfg(feature = "cache")] pub cache: Option>, pub bucket_name: String, @@ -413,6 +427,8 @@ impl Clone for AppState { Self { #[cfg(feature = "drive")] drive: self.drive.clone(), + #[cfg(not(feature = "drive"))] + drive: None, bucket_name: self.bucket_name.clone(), config: self.config.clone(), conn: self.conn.clone(), @@ -628,6 +644,8 @@ impl Default for AppState { Self { #[cfg(feature = "drive")] drive: None, + #[cfg(not(feature = "drive"))] + drive: None, #[cfg(feature = "cache")] cache: None, bucket_name: "test-bucket".to_string(), diff --git a/src/core/shared/utils.rs b/src/core/shared/utils.rs index bdd50e3b..96cd7f45 100644 --- a/src/core/shared/utils.rs +++ b/src/core/shared/utils.rs @@ -1,7 +1,8 @@ -use crate::core::config::DriveConfig; use crate::core::secrets::SecretsManager; use anyhow::{Context, Result}; #[cfg(feature = "drive")] +use crate::core::config::DriveConfig; +#[cfg(feature = "drive")] use aws_config::retry::RetryConfig; #[cfg(feature = "drive")] use aws_config::timeout::TimeoutConfig; diff --git a/src/drive/drive_handlers.rs b/src/drive/drive_handlers.rs index 84e20fbc..7f364c92 100644 --- a/src/drive/drive_handlers.rs +++ b/src/drive/drive_handlers.rs @@ -1,304 +1,122 @@ -// Drive HTTP handlers implementation using S3 -// Extracted from drive/mod.rs and using aws-sdk-s3 +// Drive HTTP handlers - stub for when drive feature is disabled +#[cfg(not(feature = "drive"))] use crate::core::shared::state::AppState; use crate::drive::drive_types::*; use axum::{ extract::{Path, State}, - http::{header, StatusCode}, - response::{IntoResponse, Json, Response}, - body::Body, + http::StatusCode, + response::Json, }; -use aws_sdk_s3::primitives::ByteStream; -use chrono::Utc; -use std::collections::HashMap; use std::sync::Arc; - -// Import ReadResponse from parent mod if not in drive_types -use super::ReadResponse; - -/// Open a file (get metadata) pub async fn open_file( - State(state): State>, - Path(file_id): Path, + State(_): State>, + Path(_file_id): Path, ) -> Result, (StatusCode, Json)> { - read_metadata(state, file_id).await -} - -/// Helper to get file metadata -async fn read_metadata( - state: Arc, - file_id: String, -) -> Result, (StatusCode, Json)> { - let client = state.drive.as_ref().ok_or(( + Err(( StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({"error": "Drive not configured"})), - ))?; - let bucket = &state.bucket_name; - - let resp = client.head_object() - .bucket(bucket) - .key(&file_id) - .send() - .await - .map_err(|e| (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()}))))?; - - let item = FileItem { - id: file_id.clone(), - name: file_id.split('/').next_back().unwrap_or(&file_id).to_string(), - file_type: if file_id.ends_with('/') { "folder".to_string() } else { "file".to_string() }, - size: resp.content_length.unwrap_or(0), - mime_type: resp.content_type.unwrap_or_else(|| "application/octet-stream".to_string()), - created_at: Utc::now(), // S3 doesn't track creation time easily - modified_at: Utc::now(), // Simplified - parent_id: None, - url: None, - thumbnail_url: None, - is_favorite: false, // Not implemented in S3 - tags: vec![], - metadata: HashMap::new(), - }; - Ok(Json(item)) + Json(serde_json::json!({"error": "Drive feature not enabled"})), + )) } -/// List all buckets (or configured one) pub async fn list_buckets( - State(state): State>, + State(_): State>, ) -> Result>, (StatusCode, Json)> { - let client = state.drive.as_ref().ok_or(( + Err(( StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({"error": "Drive not configured"})), - ))?; - - match client.list_buckets().send().await { - Ok(resp) => { - let buckets = resp.buckets.unwrap_or_default().iter().map(|b| { - BucketInfo { - id: b.name.clone().unwrap_or_default(), - name: b.name.clone().unwrap_or_default(), - created_at: Utc::now(), - file_count: 0, - total_size: 0, - } - }).collect(); - Ok(Json(buckets)) - }, - Err(_) => { - // Fallback - Ok(Json(vec![BucketInfo { - id: state.bucket_name.clone(), - name: state.bucket_name.clone(), - created_at: Utc::now(), - file_count: 0, - total_size: 0, - }])) - } - } + Json(serde_json::json!({"error": "Drive feature not enabled"})), + )) } -/// List files in a bucket pub async fn list_files( - State(state): State>, - Json(req): Json, + State(_): State>, + Json(_req): Json, ) -> Result>, (StatusCode, Json)> { - let client = state.drive.as_ref().ok_or(( + Err(( StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({"error": "Drive not configured"})), - ))?; - let bucket = req.bucket.clone().unwrap_or_else(|| state.bucket_name.clone()); - let prefix = req.parent_path.clone().unwrap_or_default(); - - let resp = client.list_objects_v2() - .bucket(&bucket) - .prefix(&prefix) - .send() - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?; - - let files = resp.contents.unwrap_or_default().iter().map(|obj| { - let key = obj.key().unwrap_or_default(); - let name = key.split('/').next_back().unwrap_or(key).to_string(); - FileItem { - id: key.to_string(), - name, - file_type: if key.ends_with('/') { "folder".to_string() } else { "file".to_string() }, - size: obj.size.unwrap_or(0), - mime_type: "application/octet-stream".to_string(), - created_at: Utc::now(), - modified_at: Utc::now(), - parent_id: Some(prefix.clone()), - url: None, - thumbnail_url: None, - is_favorite: false, - tags: vec![], - metadata: HashMap::new(), - } - }).collect(); - - Ok(Json(files)) + Json(serde_json::json!({"error": "Drive feature not enabled"})), + )) } -/// Read file content (as text) pub async fn read_file( - State(state): State>, - Path(file_id): Path, -) -> Result, (StatusCode, Json)> { - let client = state.drive.as_ref().ok_or(( + State(_): State>, + Path(_file_id): Path, +) -> Result, (StatusCode, Json)> { + Err(( StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({"error": "Drive not configured"})), - ))?; - let bucket = &state.bucket_name; - - let resp = client.get_object() - .bucket(bucket) - .key(&file_id) - .send() - .await - .map_err(|e| (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()}))))?; - - let data = resp.body.collect().await.map_err(|e| - (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))) - )?.into_bytes(); - - let content = String::from_utf8(data.to_vec()).unwrap_or_else(|_| "[Binary Content]".to_string()); - - Ok(Json(ReadResponse { content })) + Json(serde_json::json!({"error": "Drive feature not enabled"})), + )) } -/// Write file content pub async fn write_file( - State(state): State>, - Json(req): Json, + State(_): State>, + Json(_req): Json, ) -> Result, (StatusCode, Json)> { - let client = state.drive.as_ref().ok_or(( + Err(( StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({"error": "Drive not configured"})), - ))?; - let bucket = &state.bucket_name; - let key = req.file_id.ok_or((StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Missing file_id"}))))?; - - client.put_object() - .bucket(bucket) - .key(&key) - .body(ByteStream::from(req.content.into_bytes())) - .send() - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?; - - log::info!("User system created resource: drive_file {}", key); - - Ok(Json(serde_json::json!({"success": true}))) + Json(serde_json::json!({"error": "Drive feature not enabled"})), + )) } -/// Delete a file pub async fn delete_file( - State(state): State>, - Path(file_id): Path, + State(_): State>, + Path(_file_id): Path, ) -> Result, (StatusCode, Json)> { - let client = state.drive.as_ref().ok_or(( + Err(( StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({"error": "Drive not configured"})), - ))?; - let bucket = &state.bucket_name; - - client.delete_object() - .bucket(bucket) - .key(&file_id) - .send() - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?; - - // Technically a deletion, but we could log it as an audit change or leave it out - // The plan specifies resource creation: `info!("User {} created resource: {}", user_id, resource_id);` - log::info!("User system deleted resource: drive_file {}", file_id); - - Ok(Json(serde_json::json!({"success": true}))) + Json(serde_json::json!({"error": "Drive feature not enabled"})), + )) } -/// Create a folder pub async fn create_folder( - State(state): State>, - Json(req): Json, + State(_): State>, + Json(_req): Json, ) -> Result, (StatusCode, Json)> { - let client = state.drive.as_ref().ok_or(( + Err(( StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({"error": "Drive not configured"})), - ))?; - let bucket = &state.bucket_name; - - // Construct folder path/key - let mut key = req.parent_id.unwrap_or_default(); - if !key.ends_with('/') && !key.is_empty() { - key.push('/'); - } - key.push_str(&req.name); - if !key.ends_with('/') { - key.push('/'); - } - - client.put_object() - .bucket(bucket) - .key(&key) - .body(ByteStream::from_static(&[])) - .send() - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?; - - log::info!("User system created resource: drive_folder {}", key); - - Ok(Json(serde_json::json!({"success": true}))) + Json(serde_json::json!({"error": "Drive feature not enabled"})), + )) } -/// Download file (stream) pub async fn download_file( - State(state): State>, - Path(file_id): Path, -) -> Result)> { - let client = state.drive.as_ref().ok_or(( + State(_): State>, + Path(_file_id): Path, +) -> Result)> { + Err(( StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({"error": "Drive not configured"})), - ))?; - let bucket = &state.bucket_name; - - let resp = client.get_object() - .bucket(bucket) - .key(&file_id) - .send() - .await - .map_err(|e| (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()}))))?; - - let body = resp.body.collect().await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?.into_bytes(); - - Response::builder() - .header(header::CONTENT_TYPE, "application/octet-stream") - .header(header::CONTENT_DISPOSITION, format!("attachment; filename=\"{}\"", file_id.split('/').next_back().unwrap_or("file"))) - .body(Body::from(body)) - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})))) + Json(serde_json::json!({"error": "Drive feature not enabled"})), + )) } -// Stubs for others (list_shared, etc.) -pub async fn copy_file(State(_): State>, Json(_): Json) -> impl IntoResponse { - (StatusCode::NOT_IMPLEMENTED, Json(serde_json::json!({"error": "Not implemented"}))) +pub async fn copy_file(State(_): State>, Json(_): Json) -> impl axum::response::IntoResponse { + (StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"}))) } -pub async fn upload_file_to_drive(State(_): State>, Json(_): Json) -> impl IntoResponse { - (StatusCode::NOT_IMPLEMENTED, Json(serde_json::json!({"error": "Not implemented"}))) + +pub async fn upload_file_to_drive(State(_): State>, Json(_): Json) -> impl axum::response::IntoResponse { + (StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"}))) } -pub async fn list_folder_contents(State(_): State>, Json(_): Json) -> impl IntoResponse { - (StatusCode::OK, Json(Vec::::new())) + +pub async fn list_folder_contents(State(_): State>, Json(_): Json) -> impl axum::response::IntoResponse { + (StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"}))) } -pub async fn search_files(State(_): State>, Json(_): Json) -> impl IntoResponse { - (StatusCode::OK, Json(Vec::::new())) + +pub async fn search_files(State(_): State>, Json(_): Json) -> impl axum::response::IntoResponse { + (StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"}))) } -pub async fn recent_files(State(_): State>) -> impl IntoResponse { - (StatusCode::OK, Json(Vec::::new())) + +pub async fn recent_files(State(_): State>) -> impl axum::response::IntoResponse { + (StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"}))) } -pub async fn list_favorites(State(_): State>) -> impl IntoResponse { - (StatusCode::OK, Json(Vec::::new())) + +pub async fn list_favorites(State(_): State>) -> impl axum::response::IntoResponse { + (StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"}))) } -pub async fn share_folder(State(_): State>, Json(_): Json) -> impl IntoResponse { - (StatusCode::OK, Json(serde_json::json!({"success": true}))) + +pub async fn share_folder(State(_): State>, Json(_): Json) -> impl axum::response::IntoResponse { + (StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"}))) } -pub async fn list_shared(State(_): State>) -> impl IntoResponse { - (StatusCode::OK, Json(Vec::::new())) + +pub async fn list_shared(State(_): State>) -> impl axum::response::IntoResponse { + (StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"}))) } diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index 609d86b6..9bcf341c 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -7,6 +7,9 @@ use crate::core::kb::KnowledgeBaseManager; use crate::core::shared::memory_monitor::{log_jemalloc_stats, MemoryStats}; use crate::core::shared::message_types::MessageType; use crate::core::shared::state::AppState; +#![cfg_attr(not(feature = "drive"), allow(dead_code))] + +#[cfg(feature = "drive")] use aws_sdk_s3::Client; use log::{debug, error, info, trace, warn}; use std::collections::HashMap; diff --git a/src/drive/mod.rs b/src/drive/mod.rs index 6c520c80..c320e132 100644 --- a/src/drive/mod.rs +++ b/src/drive/mod.rs @@ -1,6 +1,12 @@ +#![cfg_attr(not(feature = "drive"), allow(dead_code))] + #[cfg(feature = "console")] use crate::console::file_tree::FileTree; + +#[cfg(feature = "drive")] use crate::core::shared::state::AppState; + +#[cfg(feature = "drive")] use axum::{ extract::{Query, State}, http::StatusCode, @@ -8,17 +14,25 @@ use axum::{ routing::{get, post}, Router, }; + +#[cfg(feature = "drive")] use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; + +#[cfg(feature = "drive")] use diesel::{QueryableByName, RunQueryDsl}; + use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -pub mod drive_types; -pub mod drive_handlers; +#[cfg(feature = "drive")] pub mod document_processing; + +#[cfg(feature = "drive")] pub mod drive_monitor; + +#[cfg(feature = "local-files")] pub mod local_file_monitor; + +#[cfg(feature = "drive")] pub mod vectordb; #[derive(Debug, Serialize, Deserialize)] @@ -193,6 +207,7 @@ pub struct OpenResponse { pub content_type: String, } +#[cfg(feature = "drive")] pub fn configure() -> Router> { Router::new() .route("/api/files/buckets", get(list_buckets)) @@ -223,12 +238,9 @@ pub fn configure() -> Router> { .route("/api/files/sync/stop", post(stop_sync)) .route("/api/files/versions", get(list_versions)) .route("/api/files/restore", post(restore_version)) - .route("/api/docs/merge", post(document_processing::merge_documents)) - .route("/api/docs/convert", post(document_processing::convert_document)) - .route("/api/docs/fill", post(document_processing::fill_document)) - .route("/api/docs/export", post(document_processing::export_document)) } +#[cfg(feature = "drive")] pub async fn open_file( Json(req): Json, ) -> Result, (StatusCode, Json)> { @@ -311,6 +323,8 @@ pub async fn open_file( })) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn list_buckets( State(state): State>, ) -> Result>, (StatusCode, Json)> { @@ -342,6 +356,8 @@ pub async fn list_buckets( Ok(Json(buckets)) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn list_files( State(state): State>, Query(params): Query, @@ -419,37 +435,8 @@ pub async fn list_files( } } } + } - if let Some(contents) = result.contents { - for object in contents { - if let Some(key) = object.key { - if !key.ends_with('/') { - let name = key.split('/').next_back().unwrap_or(&key).to_string(); - items.push(FileItem { - name, - path: key.clone(), - is_dir: false, - size: object.size, - modified: object.last_modified.map(|t| t.to_string()), - icon: get_file_icon(&key), - is_kb: false, - is_public: true, - }); - } - } - } - } - } - // Post-process to mark KBs - for item in &mut items { - if item.is_dir { - if let Some((_, is_public)) = kbs.iter().find(|(name, _)| name == &item.name) { - item.is_kb = true; - item.is_public = *is_public; - item.icon = "🧠".to_string(); // Knowledge icon - } - } - } Ok(items) } else { Ok(vec![]) @@ -462,279 +449,8 @@ pub async fn list_files( } } -#[cfg(feature = "console")] -pub fn convert_tree_to_items(tree: &FileTree) -> Vec { - let mut items = Vec::new(); - - for (display_name, node) in tree.get_items() { - match node { - crate::console::file_tree::TreeNode::Bucket { name } => { - if !name.is_empty() { - items.push(FileItem { - name: display_name.clone(), - path: format!("/{}", name), - is_dir: true, - size: None, - modified: None, - icon: if name.to_ascii_lowercase().ends_with(".gbai") { - "".to_string() - } else { - "📦".to_string() - }, - }); - } - } - crate::console::file_tree::TreeNode::Folder { bucket, path } => { - let folder_name = path.split('/').next_back().unwrap_or(&display_name); - items.push(FileItem { - name: folder_name.to_string(), - path: format!("/{}/{}", bucket, path), - is_dir: true, - size: None, - modified: None, - icon: "📁".to_string(), - }); - } - crate::console::file_tree::TreeNode::File { bucket, path } => { - let file_name = path.split('/').next_back().unwrap_or(&display_name); - items.push(FileItem { - name: file_name.to_string(), - path: format!("/{}/{}", bucket, path), - is_dir: false, - size: None, - modified: None, - icon: "📄".to_string(), - }); - } - } - } - - items -} - -pub async fn read_file( - State(state): State>, - Json(req): Json, -) -> Result, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - let result = s3_client - .get_object() - .bucket(&req.bucket) - .key(&req.path) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to read file: {}", e) })), - ) - })?; - - let bytes = result - .body - .collect() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to read file body: {}", e) })), - ) - })? - .into_bytes(); - - let content = String::from_utf8(bytes.to_vec()).map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("File is not valid UTF-8: {}", e) })), - ) - })?; - - Ok(Json(ReadResponse { content })) -} - -pub async fn write_file( - State(state): State>, - Json(req): Json, -) -> Result, (StatusCode, Json)> { - tracing::debug!( - "write_file called: bucket={}, path={}, content_len={}", - req.bucket, - req.path, - req.content.len() - ); - - let s3_client = state.drive.as_ref().ok_or_else(|| { - tracing::error!("S3 client not available for write_file"); - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - // Try to decode as base64, otherwise use content directly - // Base64 content from file uploads won't have whitespace/newlines at start - // and will only contain valid base64 characters - let is_base64 = is_likely_base64(&req.content); - tracing::debug!("Content detected as base64: {}", is_base64); - - let body_bytes: Vec = if is_base64 { - match BASE64.decode(&req.content) { - Ok(decoded) => { - tracing::debug!("Base64 decoded successfully, size: {} bytes", decoded.len()); - decoded - } - Err(e) => { - tracing::warn!("Base64 decode failed ({}), using raw content", e); - req.content.clone().into_bytes() - } - } - } else { - req.content.into_bytes() - }; - - let sanitized_path = req.path - .replace("//", "/") - .trim_start_matches('/') - .to_string(); - - tracing::debug!("Writing {} bytes to {}/{}", body_bytes.len(), req.bucket, sanitized_path); - - s3_client - .put_object() - .bucket(&req.bucket) - .key(&sanitized_path) - .body(body_bytes.into()) - .send() - .await - .map_err(|e| { - tracing::error!("S3 put_object failed: {:?}", e); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to write file: {}", e) })), - ) - })?; - - tracing::info!("File written successfully: {}/{}", req.bucket, sanitized_path); - Ok(Json(SuccessResponse { - success: true, - message: Some("File written successfully".to_string()), - })) -} - -/// Check if a string is likely base64 encoded content (from file upload) -/// Base64 from DataURL will be pure base64 without newlines at start -fn is_likely_base64(s: &str) -> bool { - // Empty or very short strings are not base64 uploads - if s.len() < 20 { - return false; - } - - // If it starts with common text patterns, it's not base64 - let trimmed = s.trim_start(); - if trimmed.starts_with('#') // Markdown, shell scripts - || trimmed.starts_with("//") // Comments - || trimmed.starts_with("/*") // C-style comments - || trimmed.starts_with('{') // JSON - || trimmed.starts_with('[') // JSON array - || trimmed.starts_with('<') // XML/HTML - || trimmed.starts_with(">, - Json(req): Json, -) -> Result, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - if req.path.ends_with('/') { - let result = s3_client - .list_objects_v2() - .bucket(&req.bucket) - .prefix(&req.path) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to list objects for deletion: {}", e) })), - ) - })?; - - for obj in result.contents() { - if let Some(key) = obj.key() { - s3_client - .delete_object() - .bucket(&req.bucket) - .key(key) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to delete object: {}", e) })), - ) - })?; - } - } - } else { - s3_client - .delete_object() - .bucket(&req.bucket) - .key(&req.path) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to delete file: {}", e) })), - ) - })?; - } - - Ok(Json(SuccessResponse { - success: true, - message: Some("Deleted successfully".to_string()), - })) -} - +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn create_folder( State(state): State>, Json(req): Json, @@ -788,6 +504,8 @@ fn get_file_icon(path: &str) -> String { } } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn copy_file( State(state): State>, Json(req): Json, @@ -821,6 +539,8 @@ pub async fn copy_file( })) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn move_file( State(state): State>, Json(req): Json, @@ -869,13 +589,17 @@ pub async fn move_file( })) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn upload_file_to_drive( State(state): State>, Json(req): Json, ) -> Result, (StatusCode, Json)> { - write_file(State(state), Json(req)).await + write_file(State(state), Json(req)) .await } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn download_file( State(state): State>, Json(req): Json, @@ -890,6 +614,8 @@ pub async fn download_file( .await } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn list_folder_contents( State(state): State>, Json(req): Json, @@ -904,6 +630,8 @@ pub async fn list_folder_contents( .await } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn search_files( State(state): State>, Query(params): Query, @@ -984,6 +712,8 @@ pub async fn search_files( Ok(Json(all_items)) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn recent_files( State(state): State>, Query(params): Query, @@ -1047,12 +777,16 @@ pub async fn recent_files( Ok(Json(all_items)) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn list_favorites( State(_state): State>, ) -> Result>, (StatusCode, Json)> { Ok(Json(Vec::new())) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn share_folder( State(_state): State>, Json(_req): Json, @@ -1072,12 +806,15 @@ pub async fn share_folder( })) } +#[cfg(feature = "drive")] pub async fn list_shared( State(_state): State>, ) -> Result>, (StatusCode, Json)> { Ok(Json(Vec::new())) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn get_permissions( State(_state): State>, Query(params): Query, @@ -1095,6 +832,8 @@ pub async fn get_permissions( }))) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn get_quota( State(state): State>, ) -> Result, (StatusCode, Json)> { @@ -1151,6 +890,7 @@ pub async fn get_quota( })) } +#[cfg(feature = "drive")] pub async fn sync_status( State(_state): State>, ) -> Result, (StatusCode, Json)> { @@ -1166,6 +906,8 @@ pub async fn sync_status( })) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn start_sync( State(_state): State>, ) -> Result, (StatusCode, Json)> { @@ -1175,6 +917,8 @@ pub async fn start_sync( })) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn stop_sync( State(_state): State>, ) -> Result, (StatusCode, Json)> { @@ -1184,6 +928,8 @@ pub async fn stop_sync( })) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn list_versions( State(state): State>, Query(params): Query, @@ -1236,6 +982,8 @@ pub async fn list_versions( })) } +#[cfg(feature = "drive")] +#[cfg(feature = "drive")] pub async fn restore_version( State(state): State>, Json(payload): Json, diff --git a/src/drive/vectordb.rs b/src/drive/vectordb.rs index ed3b4c98..c5df01b6 100644 --- a/src/drive/vectordb.rs +++ b/src/drive/vectordb.rs @@ -9,6 +9,9 @@ use std::sync::Arc; use tokio::fs; use uuid::Uuid; +#[cfg(feature = "drive")] +use pdf_extract; + #[cfg(feature = "vectordb")] use qdrant_client::{ qdrant::{Distance, PointStruct, VectorParams}, @@ -565,7 +568,14 @@ Ok(content) "application/pdf" => { log::info!("PDF extraction for {}", file_path.display()); - Self::extract_pdf_text(file_path).await + #[cfg(feature = "drive")] + { + Self::extract_pdf_text(file_path).await + } + #[cfg(not(feature = "drive"))] + { + Err(anyhow::anyhow!("PDF extraction requires 'drive' feature")) + } } "application/vnd.openxmlformats-officedocument.wordprocessingml.document" @@ -627,21 +637,30 @@ Ok(content) async fn extract_pdf_text(file_path: &PathBuf) -> Result { let bytes = fs::read(file_path).await?; - match pdf_extract::extract_text_from_mem(&bytes) { - Ok(text) => { - let cleaned = text - .lines() - .map(|l| l.trim()) - .filter(|l| !l.is_empty()) - .collect::>() - .join("\n"); - Ok(cleaned) - } - Err(e) => { - log::warn!("PDF extraction failed for {}: {}", file_path.display(), e); - Ok(String::new()) + #[cfg(feature = "drive")] + { + match pdf_extract::extract_text_from_mem(&bytes) { + Ok(text) => { + let cleaned = text + .lines() + .map(|l| l.trim()) + .filter(|l| !l.is_empty()) + .collect::>() + .join("\n"); + Ok(cleaned) + } + Err(e) => { + log::warn!("PDF extraction failed for {}: {}", file_path.display(), e); + Ok(String::new()) + } } } + + #[cfg(not(feature = "drive"))] + { + let _ = file_path; + Err(anyhow::anyhow!("PDF extraction requires 'drive' feature")) + } } async fn extract_docx_text(file_path: &Path) -> Result { diff --git a/src/main.rs b/src/main.rs index ad5f7d33..7fd45a5e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -95,7 +95,7 @@ pub mod console; #[cfg(feature = "directory")] pub mod directory; -#[cfg(feature = "drive")] +#[cfg(any(feature = "drive", feature = "local-files"))] pub mod drive; #[cfg(feature = "mail")] @@ -133,9 +133,6 @@ pub mod whatsapp; pub mod telegram; // Re-export commonly used types -#[cfg(feature = "drive")] -pub use drive::drive_monitor::DriveMonitor; - #[cfg(feature = "llm")] pub use llm::cache::{CacheConfig, CachedLLMProvider, CachedResponse, LocalEmbeddingService}; #[cfg(feature = "llm")] diff --git a/src/main_module/bootstrap.rs b/src/main_module/bootstrap.rs index a6907ac5..e6688e3b 100644 --- a/src/main_module/bootstrap.rs +++ b/src/main_module/bootstrap.rs @@ -13,7 +13,9 @@ use crate::core::config::ConfigManager; use crate::core::package_manager::InstallMode; use crate::core::session::SessionManager; use crate::core::shared::state::AppState; -use crate::core::shared::utils::{create_conn, create_s3_operator, get_stack_path}; +use crate::core::shared::utils::{create_conn, get_stack_path}; +#[cfg(feature = "drive")] +use crate::core::shared::utils::create_s3_operator; use super::BootstrapProgress; @@ -399,8 +401,6 @@ pub async fn create_app_state( ) -> Result, std::io::Error> { use std::collections::HashMap; - let config = std::sync::Arc::new(cfg.clone()); - #[cfg(feature = "cache")] let redis_client = redis_client.clone(); #[cfg(not(feature = "cache"))] @@ -586,6 +586,8 @@ pub async fn create_app_state( let app_state = Arc::new(AppState { #[cfg(feature = "drive")] drive: Some(drive), + #[cfg(not(feature = "drive"))] + drive: None, config: Some(cfg.clone()), conn: pool.clone(), database_url: database_url.clone(), @@ -843,9 +845,8 @@ fn init_llm_provider( /// Start background services and monitors pub async fn start_background_services( app_state: Arc, - pool: &crate::core::shared::utils::DbPool, + _pool: &crate::core::shared::utils::DbPool, ) { - #[cfg(feature = "drive")] use crate::core::shared::memory_monitor::{log_process_memory, start_memory_monitor}; // Resume workflows after server restart @@ -890,6 +891,11 @@ pub async fn start_background_services( #[cfg(feature = "drive")] start_drive_monitors(app_state.clone(), pool).await; + + #[cfg(feature = "local-files")] + start_local_file_monitor(app_state.clone()).await; + + start_config_watcher(app_state.clone()).await; } #[cfg(feature = "drive")] @@ -978,22 +984,25 @@ async fn start_drive_monitors( }); } }); +} - // Start local file monitor for /opt/gbo/data/*.gbai directories - let local_monitor_state = app_state.clone(); +#[cfg(feature = "local-files")] +async fn start_local_file_monitor(app_state: Arc) { + use crate::core::shared::memory_monitor::register_thread; tokio::spawn(async move { register_thread("local-file-monitor", "drive"); trace!("Starting LocalFileMonitor for /opt/gbo/data/*.gbai directories"); - let monitor = crate::drive::local_file_monitor::LocalFileMonitor::new(local_monitor_state); + let monitor = crate::drive::local_file_monitor::LocalFileMonitor::new(app_state); if let Err(e) = monitor.start_monitoring().await { error!("LocalFileMonitor failed: {}", e); } else { info!("LocalFileMonitor started - watching /opt/gbo/data/*.gbai/*.gbdialog/*.bas"); } }); +} - // Start config file watcher for /opt/gbo/data/*.gbai/*.gbot/config.csv - let config_watcher_state = app_state.clone(); +async fn start_config_watcher(app_state: Arc) { + use crate::core::shared::memory_monitor::register_thread; tokio::spawn(async move { register_thread("config-file-watcher", "drive"); trace!("Starting ConfigWatcher for /opt/gbo/data/*.gbai/*.gbot/config.csv"); @@ -1005,7 +1014,7 @@ async fn start_drive_monitors( let watcher = crate::core::config::watcher::ConfigWatcher::new( data_dir, - config_watcher_state, + app_state, ); Arc::new(watcher).spawn(); diff --git a/src/main_module/drive_utils.rs b/src/main_module/drive_utils.rs index ea8105f3..c7faec3a 100644 --- a/src/main_module/drive_utils.rs +++ b/src/main_module/drive_utils.rs @@ -1,10 +1,9 @@ //! Drive-related utilities -use log::{info, warn}; - #[cfg(feature = "drive")] pub async fn ensure_vendor_files_in_minio(drive: &aws_sdk_s3::Client) { use aws_sdk_s3::primitives::ByteStream; + use log::{info, warn}; let htmx_paths = [ "./botui/ui/suite/js/vendor/htmx.min.js", diff --git a/src/main_module/mod.rs b/src/main_module/mod.rs index 824a0869..b10d8c58 100644 --- a/src/main_module/mod.rs +++ b/src/main_module/mod.rs @@ -8,6 +8,7 @@ mod shutdown; mod types; pub use bootstrap::*; +#[cfg(feature = "drive")] pub use drive_utils::*; pub use health::*; pub use server::*; diff --git a/src/main_module/server.rs b/src/main_module/server.rs index 6b049a7d..ebd3ff2a 100644 --- a/src/main_module/server.rs +++ b/src/main_module/server.rs @@ -230,20 +230,32 @@ pub async fn run_axum_server( } }; - // Create session if it doesn't exist - let _ = { + // Check if session already exists and reuse it + let mut final_session_id = session_id.clone(); + { let mut sm = state.session_manager.lock().await; sm.get_or_create_anonymous_user(Some(user_uuid)).ok(); - sm.create_session(user_uuid, bot_id, "Anonymous Chat").ok() - }; + + // Get or create session with the specified ID + let session = sm.get_or_create_session_by_id( + session_uuid, + user_uuid, + bot_id, + "Anonymous Chat" + ); + + if let Ok(sess) = session { + final_session_id = sess.id.to_string(); + } + } - info!("Anonymous auth for bot: {}, session: {}", bot_name, session_id); + info!("Anonymous auth for bot: {}, session: {}", bot_name, final_session_id); ( axum::http::StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, - "session_id": session_id, + "session_id": final_session_id, "bot_name": bot_name, "status": "anonymous" })),