From 2bc44413c52528e560e293f3550b72ddee9bbb96 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sat, 18 Apr 2026 14:40:34 -0300 Subject: [PATCH] refactor: Unify GBDialog compilation to use drive_files table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Renamed LocalFileMonitor to DriveCompiler - DriveCompiler reads drive_files table to detect changes - Compiles .bas → .ast when etag changes (like GBKB/GBOT) - Unified flow: MinIO → drive_files → DriveCompiler → .ast --- src/drive/drive_compiler.rs | 172 ++++ src/drive/local_file_monitor.rs | 565 ------------- src/drive/mod.rs | 1389 +------------------------------ src/main_module/bootstrap.rs | 22 +- 4 files changed, 187 insertions(+), 1961 deletions(-) create mode 100644 src/drive/drive_compiler.rs delete mode 100644 src/drive/local_file_monitor.rs diff --git a/src/drive/drive_compiler.rs b/src/drive/drive_compiler.rs new file mode 100644 index 00000000..d42ed401 --- /dev/null +++ b/src/drive/drive_compiler.rs @@ -0,0 +1,172 @@ +/// DriveCompiler - Unificado para compilar arquivos .bas do Drive (MinIO) +/// +/// Fluxo: +/// 1. DriveMonitor (S3) baixa .bas do MinIO para /opt/gbo/data/{bot}.gbai/{bot}.gbdialog/ +/// 2. DriveMonitor atualiza tabela drive_files com etag, last_modified +/// 3. DriveCompiler lê drive_files, detecta mudanças, compila para /opt/gbo/work/ +/// 4. 
Compilados: .bas → .ast (Rhai) + +use crate::basic::compiler::BasicCompiler; +use crate::core::shared::state::AppState; +use crate::core::shared::utils::get_work_path; +use crate::drive::drive_files::{drive_files as drive_files_table, DriveFileRepository}; +use diesel::prelude::*; +use log::{debug, error, info, warn}; +use std::collections::HashMap; +use std::error::Error; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio::time::Duration; +use uuid::Uuid; + +/// Estado de compilação de um arquivo +#[derive(Debug, Clone)] +struct CompileState { + etag: String, + compiled: bool, +} + +pub struct DriveCompiler { + state: Arc<AppState>, + work_root: PathBuf, + is_processing: Arc<AtomicBool>, + /// Últimos etags conhecidos: file_path -> etag + last_etags: Arc<RwLock<HashMap<String, String>>>, +} + +impl DriveCompiler { + pub fn new(state: Arc<AppState>) -> Self { + let work_root = PathBuf::from(get_work_path()); + + Self { + state, + work_root, + is_processing: Arc::new(AtomicBool::new(false)), + last_etags: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Iniciar loop de compilação baseado em drive_files + pub async fn start_compiling(&self) -> Result<(), Box<dyn Error + Send + Sync>> { + info!("DriveCompiler started - monitoring drive_files table for changes"); + + self.is_processing.store(true, Ordering::SeqCst); + + let compiler = self.clone(); + + // Spawn loop que verifica drive_files a cada 30s + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); + + while compiler.is_processing.load(Ordering::SeqCst) { + interval.tick().await; + + if let Err(e) = compiler.check_and_compile().await { + error!("DriveCompiler error: {}", e); + } + } + }); + + Ok(()) + } + + /// Verifica drive_files e compila .bas files mudaram + async fn check_and_compile(&self) -> Result<(), Box<dyn Error + Send + Sync>> { + use drive_files_table::dsl::*; + use diesel::dsl::eq; + + let mut conn = self.state.conn.get()?; + + // Selecionar todos os arquivos .gbdialog/*.bas não 
compilados ou com etag diferente + let files: Vec<(Uuid, String, String, Option<String>)> = drive_files_table + .filter(file_type.eq("bas")) + .filter(file_path.like("%.gbdialog/%")) + .select((bot_id, file_path, file_type, etag.clone())) + .load(&mut conn)?; + + for (bot_id, file_path, _file_type, current_etag_opt) in files { + let current_etag = current_etag_opt.unwrap_or_default(); + + // Verificar se precisa compilar + let should_compile = { + let etags = self.last_etags.read().await; + etags.get(&file_path).map(|e| e != &current_etag).unwrap_or(true) + }; + + if should_compile { + debug!("DriveCompiler: {} changed, compiling...", file_path); + + // Compilar + if let Err(e) = self.compile_file(bot_id, &file_path).await { + error!("Failed to compile {}: {}", file_path, e); + } else { + // Atualizar estado + let mut etags = self.last_etags.write().await; + etags.insert(file_path.clone(), current_etag.clone()); + + // Marcar como compilado na DB + diesel::update(drive_files_table + .filter(bot_id.eq(bot_id)) + .filter(file_path.eq(&file_path))) + .set(indexed.eq(true)) + .execute(&mut conn)?; + + info!("DriveCompiler: {} compiled successfully", file_path); + } + } + } + + Ok(()) + } + + /// Compilar um arquivo .bas → .ast + async fn compile_file(&self, bot_id: Uuid, file_path: &str) -> Result<(), Box<dyn Error + Send + Sync>> { + // Extrair nome do bot e tool + // file_path: salesianos.gbai/salesianos.gbdialog/tool.bas + let parts: Vec<&str> = file_path.split('/').collect(); + if parts.len() < 3 { + return Err("Invalid file path format".into()); + } + + let bot_name = parts[0].trim_end_matches(".gbai"); + let tool_name = parts.last().unwrap().trim_end_matches(".bas"); + + // Caminho do arquivo .bas em /opt/gbo/data/ + let bas_path = format!("/opt/gbo/data/{}.gbai/{}.gbdialog/{}.bas", + bot_name, bot_name, tool_name); + + // Ler conteúdo + let content = std::fs::read_to_string(&bas_path) + .map_err(|e| format!("Failed to read {}: {}", bas_path, e))?; + + // Criar work dir + let work_dir = 
self.work_root.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name)); + std::fs::create_dir_all(&work_dir)?; + + // Escrever .bas em work + let work_bas_path = work_dir.join(format!("{}.bas", tool_name)); + std::fs::write(&work_bas_path, &content)?; + + // Compilar com BasicCompiler + let mut compiler = BasicCompiler::new(self.state.clone(), bot_id); + compiler.compile_file( + work_bas_path.to_str().ok_or("Invalid path")?, + work_dir.to_str().ok_or("Invalid path")? + )?; + + Ok(()) + } +} + +impl Clone for DriveCompiler { + fn clone(&self) -> Self { + Self { + state: Arc::clone(&self.state), + work_root: self.work_root.clone(), + is_processing: Arc::clone(&self.is_processing), + last_etags: Arc::clone(&self.last_etags), + } + } +} diff --git a/src/drive/local_file_monitor.rs b/src/drive/local_file_monitor.rs deleted file mode 100644 index 3bc15b22..00000000 --- a/src/drive/local_file_monitor.rs +++ /dev/null @@ -1,565 +0,0 @@ -use crate::basic::compiler::BasicCompiler; -use crate::core::kb::{EmbeddingConfig, KnowledgeBaseManager}; -use crate::core::shared::state::AppState; -use crate::core::shared::utils::get_work_path; -use diesel::prelude::*; -use log::{debug, error, info, trace, warn}; -use std::collections::HashMap; -use std::error::Error; -use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::SystemTime; -use tokio::sync::RwLock; -use tokio::time::Duration; -use notify::{RecursiveMode, EventKind, RecommendedWatcher, Watcher}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct LocalFileState { - modified: SystemTime, - size: u64, -} - -/// Tracks state of a KB folder for change detection -#[derive(Debug, Clone, Serialize, Deserialize)] -struct KbFolderState { - /// Combined hash of all file mtimes and sizes in the folder tree - content_hash: u64, - /// Number of files indexed last time - file_count: usize, -} - -pub struct 
LocalFileMonitor { - state: Arc, - data_dir: PathBuf, - work_root: PathBuf, - file_states: Arc>>, - kb_states: Arc>>, - is_processing: Arc, - #[cfg(any(feature = "research", feature = "llm"))] - kb_manager: Option>, -} - -impl LocalFileMonitor { - pub fn new(state: Arc) -> Self { - let work_root = PathBuf::from(get_work_path()); - let data_dir = PathBuf::from("/opt/gbo/data"); - - #[cfg(any(feature = "research", feature = "llm"))] - let kb_manager = match &state.kb_manager { - Some(km) => Some(Arc::clone(km)), - None => { - debug!("KB manager not available in LocalFileMonitor"); - None - } - }; - - trace!("Initializing with data_dir: {:?}, work_root: {:?}", data_dir, work_root); - - Self { - state, - data_dir, - work_root, - file_states: Arc::new(RwLock::new(HashMap::new())), - kb_states: Arc::new(RwLock::new(HashMap::new())), - is_processing: Arc::new(AtomicBool::new(false)), - #[cfg(any(feature = "research", feature = "llm"))] - kb_manager, - } - } - - pub async fn start_monitoring(&self) -> Result<(), Box> { - info!("Local file monitor started - watching /opt/gbo/data/*.gbai directories"); - - // Create data directory if it doesn't exist - if let Err(e) = tokio::fs::create_dir_all(&self.data_dir).await { - warn!("Failed to create data directory: {}", e); - } - - // Load persisted file states from disk - self.load_states().await; - - // Initial scan of all .gbai directories - self.scan_and_compile_all().await?; - - // Persist states back to disk - self.save_states().await; - - self.is_processing.store(true, Ordering::SeqCst); - - // Spawn the monitoring loop - let monitor = self.clone(); - tokio::spawn(async move { - monitor.monitoring_loop().await; - }); - - debug!("Local file monitor successfully initialized"); - Ok(()) - } - - async fn monitoring_loop(&self) { - trace!("Starting monitoring loop"); - - // Try to create a file system watcher - let (tx, mut rx) = tokio::sync::mpsc::channel(100); - - // Use notify crate for file system watching - let tx_clone = 
tx.clone(); - let mut watcher: RecommendedWatcher = match RecommendedWatcher::new( - move |res| { - if let Ok(event) = res { - let _ = tx_clone.try_send(event); - } - }, - notify::Config::default(), - ) { - Ok(w) => w, - Err(e) => { - error!("Failed to create watcher: {}. Falling back to polling.", e); - // Fall back to polling if watcher creation fails - self.polling_loop().await; - return; - } - }; - - // Watch the data directory - if let Err(e) = watcher.watch(&self.data_dir, RecursiveMode::Recursive) { - warn!("Failed to watch directory {:?}: {}. Using polling fallback.", self.data_dir, e); - drop(watcher); - self.polling_loop().await; - return; - } - - trace!("Watching directory: {:?}", self.data_dir); - - while self.is_processing.load(Ordering::SeqCst) { - tokio::time::sleep(Duration::from_secs(60)).await; - - // Process events from the watcher - while let Ok(event) = rx.try_recv() { - match event.kind { - EventKind::Create(_) | EventKind::Modify(_) | EventKind::Any => { - for path in &event.paths { - if self.is_gbdialog_file(path) { - debug!("Detected change in: {:?}", path); - if let Err(e) = self.compile_local_file(path).await { - error!("Failed to compile {:?}: {}", path, e); - } - } - } - } - EventKind::Remove(_) => { - for path in &event.paths { - if self.is_gbdialog_file(path) { - debug!("File removed: {:?}", path); - self.remove_file_state(path).await; - } - } - } - _ => {} - } - } - } - - trace!("Monitoring loop ended"); - } - - async fn polling_loop(&self) { - trace!("Using polling fallback (checking every 60s)"); - - while self.is_processing.load(Ordering::SeqCst) { - tokio::time::sleep(Duration::from_secs(60)).await; - - if let Err(e) = self.scan_and_compile_all().await { - error!("Scan failed: {}", e); - } - } - } - - fn is_gbdialog_file(&self, path: &Path) -> bool { - // Check if path is something like /opt/gbo/data/*.gbai/*.gbdialog/*.bas - path.extension() - .and_then(|e| e.to_str()) - .map(|e| e.eq_ignore_ascii_case("bas")) - 
.unwrap_or(false) - && path.ancestors() - .any(|p| { - p.file_name() - .and_then(|n| n.to_str()) - .map(|n| n.ends_with(".gbdialog")) - .unwrap_or(false) - }) - } - - async fn scan_and_compile_all(&self) -> Result<(), Box> { - trace!("Scanning directory: {:?}", self.data_dir); - - let entries = match tokio::fs::read_dir(&self.data_dir).await { - Ok(e) => e, - Err(e) => { - debug!("[LOCAL_MONITOR] Cannot read data directory: {}", e); - return Ok(()); - } - }; - - let mut entries = entries; - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - - // Check if this is a .gbai directory - if path.extension() - .and_then(|e| e.to_str()) - .map(|e| e.eq_ignore_ascii_case("gbai")) - .unwrap_or(false) - { - let bot_name = path.file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("unknown"); - - // Look for .gbdialog folder inside (e.g., cristo.gbai/cristo.gbdialog) - let gbdialog_path = path.join(format!("{}.gbdialog", bot_name)); - if gbdialog_path.exists() { - self.compile_gbdialog(bot_name, &gbdialog_path).await?; - } - - // Index .gbkb folders - #[cfg(any(feature = "research", feature = "llm"))] - { - if let Some(ref kb_manager) = self.kb_manager { - let gbkb_path = path.join(format!("{}.gbkb", bot_name)); - if gbkb_path.exists() { - if let Err(e) = self.index_gbkb_folder(bot_name, &gbkb_path, kb_manager).await { - error!("Failed to index .gbkb folder {:?}: {}", gbkb_path, e); - } - } - } - } - } - } - - Ok(()) - } - - #[cfg(any(feature = "research", feature = "llm"))] - async fn index_gbkb_folder( - &self, - bot_name: &str, - gbkb_path: &Path, - _kb_manager: &Arc, - ) -> Result<(), Box> { - // Get bot_id from database - let bot_id = { - use crate::core::shared::models::schema::bots::dsl::*; - let mut conn = self.state.conn.get() - .map_err(|e| format!("Failed to get DB connection: {}", e))?; - - bots.filter(name.eq(bot_name)) - .select(id) - .first::(&mut *conn) - .map_err(|e| format!("Failed to get bot_id for '{}': {}", bot_name, e))? 
- }; - - // Load bot-specific embedding config from database - let embedding_config = EmbeddingConfig::from_bot_config(&self.state.conn, &bot_id); - - // Compute content hash of the entire .gbkb tree - let (content_hash, file_count) = self.compute_gbkb_hash(gbkb_path).await?; - - // Index each KB folder inside .gbkb (e.g., carta, proc) - let entries = tokio::fs::read_dir(gbkb_path).await?; - let mut entries = entries; - - while let Some(entry) = entries.next_entry().await? { - let kb_folder_path = entry.path(); - - if kb_folder_path.is_dir() { - if let Some(kb_name) = kb_folder_path.file_name().and_then(|n| n.to_str()) { - let kb_key = format!("{}:{}", bot_name, kb_name); - - // Check if KB content changed since last index - let should_index = { - let states = self.kb_states.read().await; - states.get(&kb_key) - .map(|state| state.content_hash != content_hash || state.file_count != file_count) - .unwrap_or(true) - }; - - if !should_index { - debug!("KB '{}' for bot '{}' unchanged, skipping re-index", kb_name, bot_name); - continue; - } - - info!("Indexing KB '{}' for bot '{}'", kb_name, bot_name); - - // Create a temporary KbIndexer with the bot-specific config - let qdrant_config = crate::core::kb::QdrantConfig::from_config(self.state.conn.clone(), &bot_id); - let indexer = crate::core::kb::KbIndexer::new(embedding_config.clone(), qdrant_config); - - if let Err(e) = indexer.index_kb_folder( - bot_id, - bot_name, - kb_name, - &kb_folder_path, - ).await { - error!("Failed to index KB '{}' for bot '{}': {}", kb_name, bot_name, e); - } - - // Update state to mark as indexed - let mut states = self.kb_states.write().await; - states.insert(kb_key, KbFolderState { content_hash, file_count }); - } - } - } - - Ok(()) - } - - /// Compute a simple hash over all file metadata in a folder tree - #[cfg(any(feature = "research", feature = "llm"))] - async fn compute_gbkb_hash(&self, root: &Path) -> Result<(u64, usize), Box> { - let mut hash: u64 = 0; - let mut file_count: usize 
= 0; - - let mut stack = vec![root.to_path_buf()]; - while let Some(dir) = stack.pop() { - let mut entries = tokio::fs::read_dir(&dir).await?; - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if path.is_dir() { - stack.push(path); - } else if let Ok(meta) = tokio::fs::metadata(&path).await { - let mtime = meta.modified() - .map(|t| t.duration_since(SystemTime::UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0)) - .unwrap_or(0); - let size = meta.len(); - // Simple combinatorial hash - hash = hash.wrapping_mul(31).wrapping_add(mtime.wrapping_mul(37).wrapping_add(size)); - file_count += 1; - } - } - } - - Ok((hash, file_count)) - } - - async fn compile_gbdialog(&self, bot_name: &str, gbdialog_path: &Path) -> Result<(), Box> { - let entries = tokio::fs::read_dir(gbdialog_path).await?; - let mut entries = entries; - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - - if path.extension() - .and_then(|e| e.to_str()) - .map(|e| e.eq_ignore_ascii_case("bas")) - .unwrap_or(false) - { - let metadata = tokio::fs::metadata(&path).await?; - let modified = metadata.modified()?; - let size = metadata.len(); - - let file_key = path.to_string_lossy().to_string(); - - // Check if file changed - let should_compile = { - let states = self.file_states.read().await; - states.get(&file_key) - .map(|state| state.modified != modified || state.size != size) - .unwrap_or(true) - }; - - if should_compile { - info!("Compiling bot: {}", bot_name); - debug!("Recompiling {:?} - modification detected", path); - if let Err(e) = self.compile_local_file(&path).await { - error!("Failed to compile {:?}: {}", path, e); - } - - // Update state - let mut states = self.file_states.write().await; - states.insert(file_key, LocalFileState { modified, size }); - } - } - } - - Ok(()) - } - - async fn compile_local_file(&self, file_path: &Path) -> Result<(), Box> { - let tool_name = file_path - .file_stem() - .and_then(|s| s.to_str()) - 
.unwrap_or("unknown"); - - // Extract bot name from path like /opt/gbo/data/cristo.gbai/.gbdialog/file.bas - let bot_name = file_path - .ancestors() - .find(|p| p.extension().and_then(|e| e.to_str()).map(|e| e.eq_ignore_ascii_case("gbai")).unwrap_or(false)) - .and_then(|p| p.file_stem()) - .and_then(|s| s.to_str()) - .unwrap_or("unknown"); - - // Create work directory structure in botserver/work (not in data/) - let work_dir = self.work_root.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name)); - - // Read the file content - let source_content = tokio::fs::read_to_string(file_path).await?; - - // Compile the file - let state_clone = Arc::clone(&self.state); - let work_dir_clone = work_dir.clone(); - let tool_name_clone = tool_name.to_string(); - let source_content_clone = source_content.clone(); - let bot_name_clone = bot_name.to_string(); - - // Get the actual bot_id from the database for this bot_name - let bot_id = { - use crate::core::shared::models::schema::bots::dsl::*; - let mut conn = state_clone.conn.get() - .map_err(|e| format!("Failed to get DB connection: {}", e))?; - - bots.filter(name.eq(&bot_name_clone)) - .select(id) - .first::(&mut *conn) - .map_err(|e| format!("Failed to get bot_id for '{}': {}", bot_name_clone, e))? 
- }; - - let elapsed_ms = tokio::task::spawn_blocking(move || { - std::fs::create_dir_all(&work_dir_clone)?; - let local_source_path = work_dir_clone.join(format!("{}.bas", tool_name_clone)); - std::fs::write(&local_source_path, &source_content_clone)?; - let mut compiler = BasicCompiler::new(state_clone, bot_id); - let local_source_str = local_source_path.to_str() - .ok_or_else(|| "Invalid UTF-8 in local source path".to_string())?; - let work_dir_str = work_dir_clone.to_str() - .ok_or_else(|| "Invalid UTF-8 in work directory path".to_string())?; - let start_time = std::time::Instant::now(); - let result = compiler.compile_file(local_source_str, work_dir_str)?; - let elapsed_ms = start_time.elapsed().as_millis(); - if let Some(mcp_tool) = result.mcp_tool { - trace!( - "[LOCAL_MONITOR] MCP tool generated with {} parameters for bot {}", - mcp_tool.input_schema.properties.len(), - bot_name_clone - ); - } - Ok::>(elapsed_ms) - }) - .await??; - - info!("Successfully compiled: {:?} in {} ms", file_path, elapsed_ms); - Ok(()) - } - - async fn remove_file_state(&self, path: &Path) { - let file_key = path.to_string_lossy().to_string(); - let mut states = self.file_states.write().await; - states.remove(&file_key); - } - - /// Persist file states and KB states to disk for survival across restarts - async fn save_states(&self) { - if let Err(e) = tokio::fs::create_dir_all(&self.work_root).await { - warn!("Failed to create work directory: {}", e); - return; - } - - // Persist file states - let file_states_file = self.work_root.join("local_file_states.json"); - { - let states = self.file_states.read().await; - match serde_json::to_string_pretty(&*states) { - Ok(json) => { - if let Err(e) = tokio::fs::write(&file_states_file, json).await { - warn!("Failed to persist file states: {}", e); - } else { - debug!("Persisted {} file states to disk", states.len()); - } - } - Err(e) => warn!("Failed to serialize file states: {}", e), - } - } - - // Persist KB states - let kb_states_file = 
self.work_root.join("local_kb_states.json"); - { - let states = self.kb_states.read().await; - match serde_json::to_string_pretty(&*states) { - Ok(json) => { - if let Err(e) = tokio::fs::write(&kb_states_file, json).await { - warn!("Failed to persist KB states: {}", e); - } else { - debug!("Persisted {} KB states to disk", states.len()); - } - } - Err(e) => warn!("Failed to serialize KB states: {}", e), - } - } - } - - /// Load file states and KB states from disk - async fn load_states(&self) { - if let Err(e) = tokio::fs::create_dir_all(&self.work_root).await { - warn!("Failed to create work directory: {}", e); - } - - // Load file states - let file_states_file = self.work_root.join("local_file_states.json"); - match tokio::fs::read_to_string(&file_states_file).await { - Ok(json) => { - match serde_json::from_str::>(&json) { - Ok(states) => { - let count = states.len(); - *self.file_states.write().await = states; - info!("Loaded {} persisted file states from disk", count); - } - Err(e) => warn!("Failed to parse persisted file states: {}", e), - } - } - Err(_) => { - debug!("No persisted file states found, starting fresh"); - } - } - - // Load KB states - let kb_states_file = self.work_root.join("local_kb_states.json"); - match tokio::fs::read_to_string(&kb_states_file).await { - Ok(json) => { - match serde_json::from_str::>(&json) { - Ok(states) => { - let count = states.len(); - *self.kb_states.write().await = states; - info!("Loaded {} persisted KB states from disk", count); - } - Err(e) => warn!("Failed to parse persisted KB states: {}", e), - } - } - Err(_) => { - debug!("No persisted KB states found, starting fresh"); - } - } - } - - pub async fn stop_monitoring(&self) { - trace!("Stopping local file monitor"); - self.is_processing.store(false, Ordering::SeqCst); - self.save_states().await; - } -} - -impl Clone for LocalFileMonitor { - fn clone(&self) -> Self { - Self { - state: Arc::clone(&self.state), - data_dir: self.data_dir.clone(), - work_root: 
self.work_root.clone(), - file_states: Arc::clone(&self.file_states), - kb_states: Arc::clone(&self.kb_states), - is_processing: Arc::clone(&self.is_processing), - #[cfg(any(feature = "research", feature = "llm"))] - kb_manager: self.kb_manager.clone(), - } - } -} diff --git a/src/drive/mod.rs b/src/drive/mod.rs index 8af464dd..74d6f04e 100644 --- a/src/drive/mod.rs +++ b/src/drive/mod.rs @@ -1,1391 +1,8 @@ -#![cfg_attr(not(feature = "drive"), allow(dead_code))] - -#[cfg(feature = "console")] -use crate::console::file_tree::FileTree; - -#[cfg(feature = "drive")] -use crate::core::shared::state::AppState; - -#[cfg(feature = "drive")] -use axum::{ - extract::{Query, State}, - http::StatusCode, - response::Json, - routing::{get, post}, - Router, -}; - - -#[cfg(feature = "drive")] - -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -#[cfg(feature = "drive")] pub mod document_processing; - -#[cfg(feature = "drive")] pub mod drive_files; - -#[cfg(feature = "drive")] pub mod drive_monitor; - -// Local file monitoring removed - Drive (MinIO) is the only source now -// #[cfg(feature = "local-files")] -// pub mod local_file_monitor; - -#[cfg(feature = "drive")] +pub mod drive_compiler; pub mod vectordb; -#[derive(Debug, Serialize, Deserialize)] -pub struct FileItem { - pub name: String, - pub path: String, - pub is_dir: bool, - pub size: Option, - pub modified: Option, - pub icon: String, - pub is_kb: bool, - pub is_public: bool, -} - -#[derive(Debug, Deserialize)] -pub struct ListQuery { - pub path: Option, - pub bucket: Option, -} - -#[derive(Debug, Deserialize)] -pub struct ReadRequest { - pub bucket: String, - pub path: String, -} - -#[derive(Debug, Serialize)] -pub struct ReadResponse { - pub content: String, -} - -#[derive(Debug, Deserialize)] -pub struct WriteRequest { - pub bucket: String, - pub path: String, - pub content: String, -} - -#[derive(Debug, Deserialize)] -pub struct DeleteRequest { - pub bucket: String, - pub path: String, -} - 
-#[derive(Debug, Deserialize)] -pub struct CreateFolderRequest { - pub bucket: String, - pub path: String, - pub name: String, -} - -#[derive(Debug, Deserialize)] -pub struct CopyRequest { - pub source_bucket: String, - pub source_path: String, - pub dest_bucket: String, - pub dest_path: String, -} - -#[derive(Debug, Deserialize)] -pub struct MoveRequest { - pub source_bucket: String, - pub source_path: String, - pub dest_bucket: String, - pub dest_path: String, -} - -#[derive(Debug, Deserialize)] -pub struct DownloadRequest { - pub bucket: String, - pub path: String, -} - -#[derive(Debug, Deserialize)] -pub struct SearchQuery { - pub bucket: Option, - pub query: String, - pub file_type: Option, -} - -#[derive(Debug, Deserialize)] -pub struct ShareRequest { - pub bucket: String, - pub path: String, - pub users: Vec, - pub permissions: String, -} - -#[derive(Debug, Serialize)] -pub struct SuccessResponse { - pub success: bool, - pub message: Option, -} - -#[derive(Debug, Serialize)] -pub struct QuotaResponse { - pub total_bytes: i64, - pub used_bytes: i64, - pub available_bytes: i64, - pub percentage_used: f64, -} - -#[derive(Debug, Serialize)] -pub struct ShareResponse { - pub share_id: String, - pub url: String, - pub expires_at: Option, -} - -#[derive(Debug, Serialize)] -pub struct SyncStatus { - pub status: String, - pub last_sync: Option, - pub files_synced: i64, - pub bytes_synced: i64, - pub is_desktop: bool, - pub message: Option, -} - -#[derive(Debug, Deserialize)] -pub struct VersionsQuery { - pub bucket: Option, - pub path: String, -} - -#[derive(Debug, Serialize)] -pub struct FileVersion { - pub version_id: String, - pub modified: String, - pub size: i64, - pub is_latest: bool, - pub etag: Option, -} - -#[derive(Debug, Serialize)] -pub struct VersionsResponse { - pub path: String, - pub versions: Vec, -} - -#[derive(Debug, Deserialize)] -pub struct RestoreRequest { - pub bucket: Option, - pub path: String, - pub version_id: String, -} - -#[derive(Debug, 
Serialize)] -pub struct RestoreResponse { - pub success: bool, - pub message: String, - pub restored_version: String, - pub new_version_id: Option, -} - -#[derive(Debug, Serialize)] -pub struct BucketInfo { - pub name: String, - pub is_gbai: bool, -} - -#[derive(Debug, Deserialize)] -pub struct OpenRequest { - pub bucket: String, - pub path: String, -} - -#[derive(Debug, Serialize)] -pub struct OpenResponse { - pub app: String, - pub url: String, - pub content_type: String, -} - -#[cfg(feature = "drive")] -pub fn configure() -> Router> { - Router::new() - .route("/api/files/buckets", get(list_buckets)) - .route("/api/files/list", get(list_files)) - .route("/api/files/open", post(open_file)) - .route("/api/files/read", post(read_file)) - .route("/api/drive/content", post(read_file)) - .route("/api/files/write", post(write_file)) - .route("/api/files/save", post(write_file)) - .route("/api/files/getContents", post(read_file)) - .route("/api/files/delete", post(delete_file)) - .route("/api/files/upload", post(upload_file_to_drive)) - .route("/api/files/download", post(download_file)) - .route("/api/files/copy", post(copy_file)) - .route("/api/files/move", post(move_file)) - .route("/api/files/createFolder", post(create_folder)) - .route("/api/files/create-folder", post(create_folder)) - .route("/api/files/dirFolder", post(list_folder_contents)) - .route("/api/files/search", get(search_files)) - .route("/api/files/recent", get(recent_files)) - .route("/api/files/favorite", get(list_favorites)) - .route("/api/files/shareFolder", post(share_folder)) - .route("/api/files/shared", get(list_shared)) - .route("/api/files/permissions", get(get_permissions)) - .route("/api/files/quota", get(get_quota)) - .route("/api/files/sync/status", get(sync_status)) - .route("/api/files/sync/start", post(start_sync)) - .route("/api/files/sync/stop", post(stop_sync)) - .route("/api/files/versions", get(list_versions)) - .route("/api/files/restore", post(restore_version)) -} - 
-#[cfg(feature = "drive")] -pub async fn open_file( - Json(req): Json, -) -> Result, (StatusCode, Json)> { - let ext = req.path - .rsplit('.') - .next() - .unwrap_or("") - .to_lowercase(); - - let params = format!("bucket={}&path={}", - urlencoding::encode(&req.bucket), - urlencoding::encode(&req.path)); - - let (app, url, content_type) = match ext.as_str() { - // Designer - BASIC dialogs - "bas" => ("designer", format!("/suite/designer.html?{params}"), "text/x-basic"), - - // Sheet - Spreadsheets - "csv" => ("sheet", format!("/suite/sheet/sheet.html?{params}"), "text/csv"), - "tsv" => ("sheet", format!("/suite/sheet/sheet.html?{params}"), "text/tab-separated-values"), - "xlsx" => ("sheet", format!("/suite/sheet/sheet.html?{params}"), "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"), - "xls" => ("sheet", format!("/suite/sheet/sheet.html?{params}"), "application/vnd.ms-excel"), - "ods" => ("sheet", format!("/suite/sheet/sheet.html?{params}"), "application/vnd.oasis.opendocument.spreadsheet"), - "numbers" => ("sheet", format!("/suite/sheet/sheet.html?{params}"), "application/vnd.apple.numbers"), - - // Docs - Documents - "docx" => ("docs", format!("/suite/docs/docs.html?{params}"), "application/vnd.openxmlformats-officedocument.wordprocessingml.document"), - "doc" => ("docs", format!("/suite/docs/docs.html?{params}"), "application/msword"), - "odt" => ("docs", format!("/suite/docs/docs.html?{params}"), "application/vnd.oasis.opendocument.text"), - "rtf" => ("docs", format!("/suite/docs/docs.html?{params}"), "application/rtf"), - "pdf" => ("docs", format!("/suite/docs/docs.html?{params}"), "application/pdf"), - "md" => ("docs", format!("/suite/docs/docs.html?{params}"), "text/markdown"), - "markdown" => ("docs", format!("/suite/docs/docs.html?{params}"), "text/markdown"), - "txt" => ("docs", format!("/suite/docs/docs.html?{params}"), "text/plain"), - "tex" => ("docs", format!("/suite/docs/docs.html?{params}"), "application/x-tex"), - "latex" => 
("docs", format!("/suite/docs/docs.html?{params}"), "application/x-latex"), - "epub" => ("docs", format!("/suite/docs/docs.html?{params}"), "application/epub+zip"), - "pages" => ("docs", format!("/suite/docs/docs.html?{params}"), "application/vnd.apple.pages"), - - // Slides - Presentations - "pptx" => ("slides", format!("/suite/slides/slides.html?{params}"), "application/vnd.openxmlformats-officedocument.presentationml.presentation"), - "ppt" => ("slides", format!("/suite/slides/slides.html?{params}"), "application/vnd.ms-powerpoint"), - "odp" => ("slides", format!("/suite/slides/slides.html?{params}"), "application/vnd.oasis.opendocument.presentation"), - "key" => ("slides", format!("/suite/slides/slides.html?{params}"), "application/vnd.apple.keynote"), - - // Images - use video player (supports images too) - "png" | "jpg" | "jpeg" | "gif" | "webp" | "svg" | "bmp" | "ico" | "tiff" | "tif" | "heic" | "heif" => - ("video", format!("/suite/video/video.html?{params}"), "image/*"), - - // Video - "mp4" | "webm" | "mov" | "avi" | "mkv" | "wmv" | "flv" | "m4v" => - ("video", format!("/suite/video/video.html?{params}"), "video/*"), - - // Audio - use player - "mp3" | "wav" | "ogg" | "oga" | "flac" | "aac" | "m4a" | "wma" | "aiff" | "aif" => - ("player", format!("/suite/player/player.html?{params}"), "audio/*"), - - // Archives - direct download - "zip" | "rar" | "7z" | "tar" | "gz" | "bz2" | "xz" => - ("download", format!("/api/files/download?{params}"), "application/octet-stream"), - - // Code/Config - Editor - "json" | "xml" | "yaml" | "yml" | "toml" | "ini" | "conf" | "config" | - "js" | "ts" | "jsx" | "tsx" | "css" | "scss" | "sass" | "less" | - "html" | "htm" | "vue" | "svelte" | - "py" | "rb" | "php" | "java" | "c" | "cpp" | "h" | "hpp" | "cs" | - "rs" | "go" | "swift" | "kt" | "scala" | "r" | "lua" | "pl" | "sh" | "bash" | - "sql" | "graphql" | "proto" | - "dockerfile" | "makefile" | "gitignore" | "env" | "log" => - ("editor", 
format!("/suite/editor/editor.html?{params}"), "text/plain"), - - // Default - Editor for unknown text files - _ => ("editor", format!("/suite/editor/editor.html?{params}"), "application/octet-stream"), - }; - - Ok(Json(OpenResponse { - app: app.to_string(), - url, - content_type: content_type.to_string(), - })) -} - -#[cfg(feature = "drive")] -pub async fn list_buckets( - State(state): State>, -) -> Result>, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({"error": "S3 service not available"})), - ) - })?; - - let result = s3_client.list_buckets().send().await.map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": format!("Failed to list buckets: {}", e)})), - ) - })?; - - let buckets: Vec = result - .buckets() - .iter() - .filter_map(|b| { - b.name().map(|name| { - let name_str = name.to_string(); - // Include all .gbai buckets (no gbo- prefix filter) - if !name_str.ends_with(".gbai") { - return None; - } - Some(BucketInfo { - name: name_str, - is_gbai: true, - }) - }) - }) - .flatten() - .collect(); - - Ok(Json(buckets)) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn list_files( - State(state): State>, - Query(params): Query, -) -> Result>, (StatusCode, Json)> { - let result: Result, (StatusCode, Json)> = { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": "S3 client not configured"})), - ) - })?; - - if let Some(bucket) = ¶ms.bucket { - let mut items = Vec::new(); - let prefix = params.path.as_deref().unwrap_or(""); - - let paginator = s3_client - .list_objects_v2() - .bucket(bucket) - .prefix(prefix) - .delimiter("/") - .into_paginator() - .send(); - - let mut stream = paginator; - while let Some(result) = stream.try_next().await.map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": 
e.to_string()})), - ) - })? { - if let Some(prefixes) = result.common_prefixes { - for prefix in prefixes { - if let Some(dir) = prefix.prefix { - let name = dir - .trim_end_matches('/') - .split('/') - .next_back() - .unwrap_or(&dir) - .to_string(); - items.push(FileItem { - name, - path: dir.clone(), - is_dir: true, - size: None, - modified: None, - icon: get_file_icon(&dir), - is_kb: false, - is_public: true, - }); - } - } - } - } - - Ok(items) - } else { - Ok(vec![]) - } - }; - - match result { - Ok(items) => Ok(Json(items)), - Err(e) => Err(e), - } -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn create_folder( - State(state): State>, - Json(req): Json, -) -> Result, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - let folder_path = if req.path.is_empty() || req.path == "/" { - format!("{}/", req.name) - } else { - format!("{}/{}/", req.path.trim_end_matches('/'), req.name) - }; - - s3_client - .put_object() - .bucket(&req.bucket) - .key(&folder_path) - .body(Vec::new().into()) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to create folder: {}", e) })), - ) - })?; - - Ok(Json(SuccessResponse { - success: true, - message: Some("Folder created successfully".to_string()), - })) -} - -fn get_file_icon(path: &str) -> String { - let ext = std::path::Path::new(path) - .extension() - .and_then(|e| e.to_str()) - .map(|e| e.to_lowercase()); - - match ext.as_deref() { - Some("bas" | "ast" | "csv" | "gbkb") => "".to_string(), - Some("json") => "🔖".to_string(), - Some("txt" | "md") => "📃".to_string(), - Some("pdf") => "📕".to_string(), - Some("zip" | "tar" | "gz") => "📦".to_string(), - _ => "📄".to_string(), - } -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn copy_file( - State(state): State>, - 
Json(req): Json, -) -> Result, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - let copy_source = format!("{}/{}", req.source_bucket, req.source_path); - - s3_client - .copy_object() - .copy_source(©_source) - .bucket(&req.dest_bucket) - .key(&req.dest_path) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to copy file: {}", e) })), - ) - })?; - - Ok(Json(SuccessResponse { - success: true, - message: Some("File copied successfully".to_string()), - })) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn move_file( - State(state): State>, - Json(req): Json, -) -> Result, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - let copy_source = format!("{}/{}", req.source_bucket, req.source_path); - - s3_client - .copy_object() - .copy_source(©_source) - .bucket(&req.dest_bucket) - .key(&req.dest_path) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to move file: {}", e) })), - ) - })?; - - s3_client - .delete_object() - .bucket(&req.source_bucket) - .key(&req.source_path) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json( - serde_json::json!({ "error": format!("Failed to delete source file: {}", e) }), - ), - ) - })?; - - Ok(Json(SuccessResponse { - success: true, - message: Some("File moved successfully".to_string()), - })) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn upload_file_to_drive( - State(state): State>, - Json(req): Json, -) -> Result, (StatusCode, Json)> { - write_file(State(state), Json(req)) .await -} - -#[cfg(feature = 
"drive")] -#[cfg(feature = "drive")] -pub async fn download_file( - State(state): State>, - Json(req): Json, -) -> Result, (StatusCode, Json)> { - read_file( - State(state), - Json(ReadRequest { - bucket: req.bucket, - path: req.path, - }), - ) - .await -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn list_folder_contents( - State(state): State>, - Json(req): Json, -) -> Result>, (StatusCode, Json)> { - list_files( - State(state), - Query(ListQuery { - path: Some(req.path), - bucket: Some(req.bucket), - }), - ) - .await -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn search_files( - State(state): State>, - Query(params): Query, -) -> Result>, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - let mut all_items = Vec::new(); - let buckets = if let Some(bucket) = params.bucket.as_ref() { - vec![bucket.clone()] - } else { - let result = s3_client.list_buckets().send().await.map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to list buckets: {}", e) })), - ) - })?; - result - .buckets() - .iter() - .filter_map(|b| b.name().map(String::from)) - .collect() - }; - - for bucket in buckets { - let result = s3_client - .list_objects_v2() - .bucket(&bucket) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to list objects: {}", e) })), - ) - })?; - - for obj in result.contents() { - if let Some(key) = obj.key() { - let name = key.split('/').next_back().unwrap_or(key).to_lowercase(); - let query_lower = params.query.to_lowercase(); - - if name.contains(&query_lower) { - if let Some(file_type) = ¶ms.file_type { - if key.ends_with(file_type) { - all_items.push(FileItem { - name: name.to_string(), - path: key.to_string(), - is_dir: false, - size: 
obj.size(), - modified: obj.last_modified().map(|t| t.to_string()), - icon: get_file_icon(key), - is_kb: false, - is_public: true, - }); - } - } else { - all_items.push(FileItem { - name: name.to_string(), - path: key.to_string(), - is_dir: false, - size: obj.size(), - modified: obj.last_modified().map(|t| t.to_string()), - icon: get_file_icon(key), - is_kb: false, - is_public: true, - }); - } - } - } - } - } - - Ok(Json(all_items)) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn recent_files( - State(state): State>, - Query(params): Query, -) -> Result>, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - let mut all_items = Vec::new(); - let buckets = if let Some(bucket) = ¶ms.bucket { - vec![bucket.clone()] - } else { - let result = s3_client.list_buckets().send().await.map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to list buckets: {}", e) })), - ) - })?; - result - .buckets() - .iter() - .filter_map(|b| b.name().map(String::from)) - .collect() - }; - - for bucket in buckets { - let result = s3_client - .list_objects_v2() - .bucket(&bucket) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to list objects: {}", e) })), - ) - })?; - - for obj in result.contents() { - if let Some(key) = obj.key() { - all_items.push(FileItem { - name: key.split('/').next_back().unwrap_or(key).to_string(), - path: key.to_string(), - is_dir: false, - size: obj.size(), - modified: obj.last_modified().map(|t| t.to_string()), - icon: get_file_icon(key), - is_kb: false, - is_public: true, - }); - } - } - } - - all_items.sort_by(|a, b| b.modified.cmp(&a.modified)); - all_items.truncate(50); - - Ok(Json(all_items)) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async 
fn list_favorites( - State(_state): State>, -) -> Result>, (StatusCode, Json)> { - Ok(Json(Vec::new())) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn share_folder( - State(_state): State>, - Json(_req): Json, -) -> Result, (StatusCode, Json)> { - let share_id = uuid::Uuid::new_v4().to_string(); - let url = format!("https://share.example.com/{}", share_id); - - Ok(Json(ShareResponse { - share_id, - url, - expires_at: Some( - chrono::Utc::now() - .checked_add_signed(chrono::Duration::hours(24)) - .unwrap_or_else(chrono::Utc::now) - .to_rfc3339(), - ), - })) -} - -#[cfg(feature = "drive")] -pub async fn list_shared( - State(_state): State>, -) -> Result>, (StatusCode, Json)> { - Ok(Json(Vec::new())) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn get_permissions( - State(_state): State>, - Query(params): Query, -) -> Result, (StatusCode, Json)> { - Ok(Json(serde_json::json!({ - "bucket": params.bucket, - "path": params.path, - "permissions": { - "read": true, - "write": true, - "delete": true, - "share": true - }, - "shared_with": [] - }))) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn get_quota( - State(state): State>, -) -> Result, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - let mut total_size = 0i64; - - let result = s3_client.list_buckets().send().await.map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to list buckets: {}", e) })), - ) - })?; - - let buckets: Vec = result - .buckets() - .iter() - .filter_map(|b| b.name().map(String::from)) - .collect(); - - for bucket in buckets { - let list_result = s3_client - .list_objects_v2() - .bucket(&bucket) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to 
list objects: {}", e) })), - ) - })?; - - for obj in list_result.contents() { - total_size += obj.size().unwrap_or(0); - } - } - - let total_bytes = 100_000_000_000i64; - let used_bytes = total_size; - let available_bytes = total_bytes - used_bytes; - let percentage_used = (used_bytes as f64 / total_bytes as f64) * 100.0; - - Ok(Json(QuotaResponse { - total_bytes, - used_bytes, - available_bytes, - percentage_used, - })) -} - -#[cfg(feature = "drive")] -pub async fn sync_status( - State(_state): State>, -) -> Result, (StatusCode, Json)> { - Ok(Json(SyncStatus { - status: "unavailable".to_string(), - last_sync: None, - files_synced: 0, - bytes_synced: 0, - is_desktop: false, - message: Some( - "File sync requires the General Bots desktop app with rclone installed".to_string(), - ), - })) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn start_sync( - State(_state): State>, -) -> Result, (StatusCode, Json)> { - Ok(Json(SuccessResponse { - success: false, - message: Some("File sync requires the General Bots desktop app. 
Install rclone and use the desktop app to sync files.".to_string()), - })) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn stop_sync( - State(_state): State>, -) -> Result, (StatusCode, Json)> { - Ok(Json(SuccessResponse { - success: false, - message: Some("File sync requires the General Bots desktop app".to_string()), - })) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn list_versions( - State(state): State>, - Query(params): Query, -) -> Result, (StatusCode, Json)> { - let bucket = params.bucket.unwrap_or_else(|| "default".to_string()); - let path = params.path; - - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 storage not configured" })), - ) - })?; - - let versions_result = s3_client - .list_object_versions() - .bucket(&bucket) - .prefix(&path) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to list versions: {}", e) })), - ) - })?; - - let mut versions: Vec = Vec::new(); - - for version in versions_result.versions() { - if version.key().unwrap_or_default() == path { - versions.push(FileVersion { - version_id: version.version_id().unwrap_or("null").to_string(), - modified: version - .last_modified() - .map(|t| t.to_string()) - .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()), - size: version.size().unwrap_or(0), - is_latest: version.is_latest().unwrap_or(false), - etag: version.e_tag().map(|s| s.to_string()), - }); - } - } - - versions.sort_by(|a, b| b.modified.cmp(&a.modified)); - - Ok(Json(VersionsResponse { - path: path.clone(), - versions, - })) -} - -#[cfg(feature = "drive")] -#[cfg(feature = "drive")] -pub async fn restore_version( - State(state): State>, - Json(payload): Json, -) -> Result, (StatusCode, Json)> { - let bucket = payload.bucket.unwrap_or_else(|| "default".to_string()); - let path = payload.path; - let version_id = 
payload.version_id; - - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 storage not configured" })), - ) - })?; - - let copy_source = format!("{}/{}?versionId={}", bucket, path, version_id); - - let copy_result = s3_client - .copy_object() - .bucket(&bucket) - .key(&path) - .copy_source(©_source) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to restore version: {}", e) })), - ) - })?; - - let new_version_id = copy_result.version_id().map(|s| s.to_string()); - - Ok(Json(RestoreResponse { - success: true, - message: format!("Successfully restored {} to version {}", path, version_id), - restored_version: version_id, - new_version_id, - })) -} - -#[cfg(feature = "drive")] -pub async fn read_file(State(state): State>, Json(_req): Json) -> Result, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - // Default implementation reading from S3 - let result = s3_client - .get_object() - .bucket(&_req.bucket) - .key(&_req.path) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to read object: {}", e) })), - ) - })?; - - let bytes = result.body.collect().await.map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to read body: {}", e) })), - ) - })?.into_bytes(); - - let content = String::from_utf8(bytes.to_vec()).unwrap_or_default(); - - Ok(Json(ReadResponse { content })) -} - -#[cfg(feature = "drive")] -pub async fn write_file(State(state): State>, Json(req): Json) -> Result, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service 
not available" })), - ) - })?; - - s3_client - .put_object() - .bucket(&req.bucket) - .key(&req.path) - .body(req.content.into_bytes().into()) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to write object: {}", e) })), - ) - })?; - - Ok(Json(SuccessResponse { success: true, message: Some("File written successfully".to_string()) })) -} - -#[cfg(feature = "drive")] -pub async fn delete_file(State(state): State>, Json(req): Json) -> Result, (StatusCode, Json)> { - let s3_client = state.drive.as_ref().ok_or_else(|| { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(serde_json::json!({ "error": "S3 service not available" })), - ) - })?; - - s3_client - .delete_object() - .bucket(&req.bucket) - .key(&req.path) - .send() - .await - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": format!("Failed to delete object: {}", e) })), - ) - })?; - - Ok(Json(SuccessResponse { success: true, message: Some("File deleted successfully".to_string()) })) -} - -#[cfg(test)] -mod tests { - use super::*; - use std::collections::HashMap; - use std::path::PathBuf; - - // Test structures for MinIO/S3-like storage service tests from bottest/services/minio.rs - - #[derive(Debug, Clone)] - struct MinioTestConfig { - api_port: u16, - console_port: u16, - data_dir: PathBuf, - access_key: String, - secret_key: String, - } - - impl Default for MinioTestConfig { - fn default() -> Self { - Self { - api_port: 9100, - console_port: 9101, - data_dir: PathBuf::from("/tmp/test"), - access_key: "minioadmin".to_string(), - secret_key: "minioadmin".to_string(), - } - } - } - - impl MinioTestConfig { - fn endpoint(&self) -> String { - format!("http://127.0.0.1:{}", self.api_port) - } - - fn console_url(&self) -> String { - format!("http://127.0.0.1:{}", self.console_port) - } - - fn data_path(&self) -> &std::path::Path { - &self.data_dir - } - - fn credentials(&self) -> (String, String) { - 
(self.access_key.clone(), self.secret_key.clone()) - } - - fn s3_config(&self) -> HashMap { - let mut config = HashMap::new(); - config.insert("endpoint_url".to_string(), self.endpoint()); - config.insert("access_key_id".to_string(), self.access_key.clone()); - config.insert("secret_access_key".to_string(), self.secret_key.clone()); - config.insert("region".to_string(), "us-east-1".to_string()); - config.insert("force_path_style".to_string(), "true".to_string()); - config - } - } - - - - - - #[test] - fn test_create_folder_request() { - let request = CreateFolderRequest { - bucket: "test-bucket".to_string(), - path: "/documents".to_string(), - name: "new-folder".to_string(), - }; - - assert_eq!(request.name, "new-folder"); - assert_eq!(request.path, "/documents"); - } - - #[test] - fn test_copy_request() { - let request = CopyRequest { - source_bucket: "bucket-a".to_string(), - source_path: "file.txt".to_string(), - dest_bucket: "bucket-b".to_string(), - dest_path: "copied-file.txt".to_string(), - }; - - assert_eq!(request.source_bucket, "bucket-a"); - assert_eq!(request.dest_bucket, "bucket-b"); - } - - #[test] - fn test_move_request() { - let request = MoveRequest { - source_bucket: "bucket-a".to_string(), - source_path: "old-location/file.txt".to_string(), - dest_bucket: "bucket-a".to_string(), - dest_path: "new-location/file.txt".to_string(), - }; - - assert_eq!(request.source_path, "old-location/file.txt"); - assert_eq!(request.dest_path, "new-location/file.txt"); - } - - #[test] - fn test_search_query() { - let query = SearchQuery { - bucket: Some("test-bucket".to_string()), - query: "report".to_string(), - file_type: Some("pdf".to_string()), - }; - - assert_eq!(query.query, "report"); - assert_eq!(query.file_type, Some("pdf".to_string())); - } - - #[test] - fn test_share_request() { - let request = ShareRequest { - bucket: "test-bucket".to_string(), - path: "shared-folder".to_string(), - users: vec![ - "user1@example.com".to_string(), - 
"user2@example.com".to_string(), - ], - permissions: "read".to_string(), - }; - - assert_eq!(request.users.len(), 2); - assert_eq!(request.permissions, "read"); - } - - #[test] - fn test_success_response() { - let response = SuccessResponse { - success: true, - message: Some("Operation completed successfully".to_string()), - }; - - assert!(response.success); - assert!(response - .message - .as_ref() - .is_some_and(|m| m.contains("successfully"))); - } - - #[test] - fn test_quota_response() { - let response = QuotaResponse { - total_bytes: 1_073_741_824, // 1 GB - used_bytes: 536_870_912, // 512 MB - available_bytes: 536_870_912, // 512 MB - percentage_used: 50.0, - }; - - assert!((response.percentage_used - 50.0).abs() < f64::EPSILON); - assert_eq!( - response.total_bytes, - response.used_bytes + response.available_bytes - ); - } - - #[test] - fn test_share_response() { - let response = ShareResponse { - share_id: "share-12345".to_string(), - url: "https://example.com/share/share-12345".to_string(), - expires_at: Some("2024-12-31T23:59:59Z".to_string()), - }; - - assert!(response.url.contains("share-12345")); - assert!(response.expires_at.is_some()); - } - - #[test] - fn test_sync_status() { - let status = SyncStatus { - status: "syncing".to_string(), - last_sync: Some("2024-01-15T10:30:00Z".to_string()), - files_synced: 150, - bytes_synced: 52_428_800, - is_desktop: true, - message: Some("Syncing in progress...".to_string()), - }; - - assert_eq!(status.status, "syncing"); - assert_eq!(status.files_synced, 150); - assert!(status.is_desktop); - } - - #[test] - fn test_file_version() { - let version = FileVersion { - version_id: "v1234567890".to_string(), - modified: "2024-01-15T10:30:00Z".to_string(), - size: 2048, - is_latest: true, - etag: Some("abc123def456".to_string()), - }; - - assert!(version.is_latest); - assert_eq!(version.size, 2048); - } - - #[test] - fn test_versions_response() { - let versions = vec![ - FileVersion { - version_id: "v2".to_string(), - 
modified: "2024-01-15T12:00:00Z".to_string(), - size: 2048, - is_latest: true, - etag: Some("etag2".to_string()), - }, - FileVersion { - version_id: "v1".to_string(), - modified: "2024-01-15T10:00:00Z".to_string(), - size: 1024, - is_latest: false, - etag: Some("etag1".to_string()), - }, - ]; - - let response = VersionsResponse { - path: "documents/report.pdf".to_string(), - versions, - }; - - assert_eq!(response.versions.len(), 2); - assert!(response.versions[0].is_latest); - assert!(!response.versions[1].is_latest); - } - - #[test] - fn test_restore_request() { - let request = RestoreRequest { - bucket: Some("test-bucket".to_string()), - path: "documents/file.txt".to_string(), - version_id: "v1234567890".to_string(), - }; - - assert_eq!(request.version_id, "v1234567890"); - } - - #[test] - fn test_restore_response() { - let response = RestoreResponse { - success: true, - message: "File restored successfully".to_string(), - restored_version: "v1234567890".to_string(), - new_version_id: Some("v9876543210".to_string()), - }; - - assert!(response.success); - assert!(response.new_version_id.is_some()); - } - - #[test] - fn test_get_file_icon() { - assert_eq!(get_file_icon("document.pdf"), "file-text"); - assert_eq!(get_file_icon("image.png"), "image"); - assert_eq!(get_file_icon("image.jpg"), "image"); - assert_eq!(get_file_icon("video.mp4"), "video"); - assert_eq!(get_file_icon("music.mp3"), "music"); - assert_eq!(get_file_icon("archive.zip"), "archive"); - assert_eq!(get_file_icon("unknown.xyz"), "file"); - } - - #[test] - fn test_default_minio_credentials() { - let config = MinioTestConfig::default(); - assert_eq!(config.access_key, "minioadmin"); - assert_eq!(config.secret_key, "minioadmin"); - } - - #[test] - fn test_custom_port_configuration() { - let config = MinioTestConfig { - api_port: 19000, - console_port: 19001, - ..Default::default() - }; - - assert!(config.endpoint().contains("19000")); - assert!(config.console_url().contains("19001")); - } - - #[test] 
- fn test_download_request() { - let request = DownloadRequest { - bucket: "my-bucket".to_string(), - path: "downloads/file.zip".to_string(), - }; - - assert_eq!(request.bucket, "my-bucket"); - assert!(request.path.to_lowercase().ends_with(".zip")); - } -} +// Re-exports +pub use drive_files::DriveFileRepository; diff --git a/src/main_module/bootstrap.rs b/src/main_module/bootstrap.rs index 70d12f0f..4dcef1f9 100644 --- a/src/main_module/bootstrap.rs +++ b/src/main_module/bootstrap.rs @@ -891,12 +891,13 @@ pub async fn start_background_services( trace!("ensure_llama_servers_running completed"); } - // Start DriveMonitor for S3/MinIO file watching + // Start DriveMonitor for S3/MinIO file watching and syncing #[cfg(feature = "drive")] start_drive_monitors(app_state.clone(), _pool).await; - // Start LocalFileMonitor to compile .bas files from /opt/gbo/data to /opt/gbo/work - start_local_file_monitor(app_state.clone()).await; + // Start DriveCompiler to compile .bas files from drive_files table + #[cfg(feature = "drive")] + start_drive_compiler(app_state.clone()).await; // start_config_watcher(app_state.clone()).await; } @@ -1129,15 +1130,16 @@ fn create_bot_from_drive( } -// LocalFileMonitor compiles .bas files from /opt/gbo/data to /opt/gbo/work -async fn start_local_file_monitor(app_state: Arc) { -use crate::drive::local_file_monitor::LocalFileMonitor; +// DriveCompiler compiles .bas files based on drive_files table changes +#[cfg(feature = "drive")] +async fn start_drive_compiler(app_state: Arc) { +use crate::drive::drive_compiler::DriveCompiler; -let monitor = LocalFileMonitor::new(app_state.clone()); -if let Err(e) = monitor.start_monitoring().await { -error!("Failed to start LocalFileMonitor: {}", e); +let compiler = DriveCompiler::new(app_state.clone()); +if let Err(e) = compiler.start_compiling().await { +error!("Failed to start DriveCompiler: {}", e); } else { -trace!("LocalFileMonitor started - monitoring /opt/gbo/data for bot changes"); 
+trace!("DriveCompiler started - compiling .bas files from drive_files"); } } //