diff --git a/.cargo/config.toml b/.cargo/config.toml
index 8d724206..df9d18e9 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -4,4 +4,3 @@ jobs = 12
 [target.x86_64-unknown-linux-gnu]
 linker = "clang"
-rustflags = ["-C", "link-arg=-fuse-ld=mold"]
diff --git a/DEV-DEPENDENCIES.sh b/DEV-DEPENDENCIES.sh
index 13299117..0c840240 100755
--- a/DEV-DEPENDENCIES.sh
+++ b/DEV-DEPENDENCIES.sh
@@ -2,13 +2,11 @@
 set -e
 if [ "$EUID" -ne 0 ]; then
-echo "Run as root (use sudo)"
-exit 1
+  echo "Run as root (use sudo)"
+  exit 1
 fi
 SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
-SUDO_USER_HOME="$(eval echo "~${SUDO_USER:-$USER}")"
-
 echo "Installing runtime dependencies first..."
 bash "$SCRIPT_DIR/DEPENDENCIES.sh"
@@ -16,60 +14,40 @@ echo "Installing dev/build dependencies..."
 OS=$(grep -oP '(?<=^ID=).+' /etc/os-release 2>/dev/null | tr -d '"' || echo "unknown")
 install_debian() {
-apt-get install -y -qq \
-  clang lld mold build-essential pkg-config libssl-dev libpq-dev cmake git \
-  libglib2.0-dev libgtk-3-dev libwebkit2gtk-4.1-dev libjavascriptcoregtk-4.1-dev \
-  libayatana-appindicator3-dev librsvg2-dev libsoup-3.0-dev
+  apt-get install -y -qq \
+    clang lld build-essential pkg-config libssl-dev libpq-dev cmake git \
+    libglib2.0-dev libgtk-3-dev libwebkit2gtk-4.1-dev libjavascriptcoregtk-4.1-dev \
+    libayatana-appindicator3-dev librsvg2-dev libsoup-3.0-dev
 }
 install_fedora() {
-dnf install -y -q \
-  clang lld mold gcc gcc-c++ make pkg-config openssl-devel postgresql-devel cmake git \
-glib2-devel gobject-introspection-devel gtk3-devel webkit2gtk3-devel \
-javascriptcoregtk-devel libappindicator-gtk3-devel librsvg2-devel libsoup3-devel
+  dnf install -y -q \
+    clang lld gcc gcc-c++ make pkg-config openssl-devel postgresql-devel cmake git \
+    glib2-devel gobject-introspection-devel gtk3-devel webkit2gtk3-devel \
+    javascriptcoregtk-devel libappindicator-gtk3-devel librsvg2-devel libsoup3-devel
 }
 install_arch() {
-pacman -Sy --noconfirm \
-  clang lld mold gcc make pkg-config openssl libpq cmake git \
-glib2 gtk3 webkit2gtk4 javascriptcoregtk libappindicator librsvg libsoup
+  pacman -Sy --noconfirm \
+    clang lld gcc make pkg-config openssl libpq cmake git \
+    glib2 gtk3 webkit2gtk4 javascriptcoregtk libappindicator librsvg libsoup
 }
 case $OS in
-ubuntu|debian|linuxmint|pop) install_debian ;;
-fedora|rhel|centos|rocky|almalinux) install_fedora ;;
-arch|manjaro) install_arch ;;
-*) echo "Unsupported OS: $OS"; exit 1 ;;
+  ubuntu|debian|linuxmint|pop) install_debian ;;
+  fedora|rhel|centos|rocky|almalinux) install_fedora ;;
+  arch|manjaro) install_arch ;;
+  *) echo "Unsupported OS: $OS"; exit 1 ;;
 esac
-run_as_user() {
-su - "${SUDO_USER:-$USER}" -c ". '${SUDO_USER_HOME}/.cargo/env' 2>/dev/null; export PATH=\"${SUDO_USER_HOME}/.cargo/bin:\$PATH\"; $*"
+install_mold() {
+  curl -L "https://github.com/rui314/mold/releases/download/v2.4.0/mold-2.4.0-x86_64-linux.tar.gz" -o /tmp/mold.tar.gz
+  tar -xzf /tmp/mold.tar.gz -C /tmp
+  cp "/tmp/mold-2.4.0-x86_64-linux/bin/mold" /usr/local/bin/
+  rm -rf /tmp/mold-2.4.0* /tmp/mold.tar.gz
+  ldconfig
 }
-install_rust() {
-if ! run_as_user "rustc --version" &>/dev/null; then
-echo "Installing Rust via rustup for ${SUDO_USER:-$USER}..."
-curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | \
-su - "${SUDO_USER:-$USER}" -c "sh -s -- -y --default-toolchain stable"
-run_as_user "rustup default stable"
-echo "Rust installed: $(run_as_user 'rustc --version') / $(run_as_user 'cargo --version')"
-else
-echo "Rust already installed: $(run_as_user 'rustc --version')"
-fi
-}
+command -v mold &> /dev/null || install_mold

-install_cargo_tools() {
-if run_as_user "cargo --version" &>/dev/null; then
-if ! run_as_user "sccache --version" &>/dev/null; then
-echo "Installing sccache..."
-run_as_user "cargo install sccache --locked"
-else
-echo "sccache already installed: $(run_as_user 'sccache --version 2>&1 | head -1')"
-fi
-fi
-}
-
-install_rust "$SUDO_USER_HOME"
-install_cargo_tools "$SUDO_USER_HOME"
-
-echo "Dev dependencies installed!"
+echo "Dev dependencies installed!"
\ No newline at end of file
diff --git a/PROD.md b/PROD.md
index da4cc6ec..2d5e9782 100644
--- a/PROD.md
+++ b/PROD.md
@@ -3,7 +3,7 @@
 ## CRITICAL RULES — READ FIRST
 NEVER INCLUDE HERE CREDENTIALS OR COMPANY INFORMATION, THIS IS COMPANY AGNOSTIC.
-
+If you edit configuration or data, first make a backup in /tmp with a datetime suffix so it can be restored.
 Always manage services with `systemctl` inside the `system` Incus container. Never run `/opt/gbo/bin/botserver` or `/opt/gbo/bin/botui` directly — they will fail because they won't load the `.env` file containing Vault credentials and paths. The correct commands are `sudo incus exec system -- systemctl start|stop|restart|status botserver` and the same for `ui`. Systemctl handles environment loading, auto-restart, logging, and dependencies.
 Never push secrets (API keys, passwords, tokens) to git. Never commit `init.json` (it contains Vault unseal keys). All secrets must come from Vault — only `VAULT_*` variables are allowed in `.env`. Never deploy manually via scp or ssh; always use CI/CD. Always push all submodules (botserver, botui, botlib) before or alongside the main repo. Always ask before pushing to ALM.
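
A minimal sketch of the backup rule added above (the file path is a placeholder; the `date` format is one reasonable choice):

```bash
CONF=/opt/gbo/conf/system/directory_config.json   # example file about to be edited
cp "$CONF" "/tmp/$(basename "$CONF").$(date +%Y%m%d-%H%M%S).bak"
```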
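+
+As an alternative to the console clicks above, the bucket can also be created
+from the command line (a sketch — it assumes the `local` mc alias configured
+in Step 2 below):
+
+```bash
+mc mb local/testbot.gbai
+```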
Click **"Upload File"** or use mc command: + +```bash +# Using mc CLI (MinIO Client) +mc alias set local http://localhost:9100 minioadmin minioadmin + +# Create start.bas +cat > /tmp/start.bas << 'EOF' +' start.bas - Bot entry point +ADD SUGGESTION "Check Status" +ADD SUGGESTION "Create Report" +ADD SUGGESTION "Help" + +TALK "Welcome to TestBot! How can I help you today?" +EOF + +mc cp /tmp/start.bas local/testbot.gbai/testbot.gbdialog/start.bas +``` + +### Step 3: Create Knowledge Base Folder (.gbkb) + +```bash +# Create KB folder and documents +mkdir -p /tmp/testbot-docs + +cat > /tmp/testbot-docs/manual.txt << 'EOF' +TestBot Manual v1.0 + +This is the test knowledge base for TestBot. +It contains documentation that should be indexed. + +Features: +- Document search via Qdrant +- Context injection for LLM +- Semantic similarity queries + +Usage: +USE KB "manual" in your dialog scripts. +EOF + +cat > /tmp/testbot-docs/faq.txt << 'EOF' +Frequently Asked Questions + +Q: What is TestBot? +A: A test bot for validating the drive monitor sync. + +Q: How do I use it? +A: Just upload files to MinIO and wait for sync. + +Q: What file types are supported? +A: .txt, .pdf, .md, .docx for KB + .bas for dialog scripts +EOF + +# Upload to MinIO +mc mb local/testbot.gbai/testbot.gbkb --ignore-existing +mc cp /tmp/testbot-docs/manual.txt local/testbot.gbai/testbot.gbkb/manual.txt +mc cp /tmp/testbot-docs/faq.txt local/testbot.gbai/testbot.gbkb/faq.txt +``` + +### Step 4: Verify Sync + +#### 4.1 Check Database for Bot Creation +```bash +# Bot should be auto-created from bucket +./botserver-stack/bin/tables/bin/psql -h localhost -U botserver -d botserver -c \ + "SELECT id, name, is_active, created_at FROM bots WHERE name = 'testbot';" +``` + +Expected output: +``` + id | name | is_active | created_at +----+------+-----------+------------------------- + ...| testbot | t | 2026-04-20 ... +``` + +#### 4.2 Check drive_files Table +```bash +# Files should be registered in drive_files +./botserver-stack/bin/tables/bin/psql -h localhost -U botserver -d botserver -c \ + "SELECT file_path, file_type, etag, indexed FROM drive_files WHERE file_path LIKE '%testbot%';" +``` + +Expected output: +``` + file_path | file_type | etag | indexed +-----------+-----------+------+--------- + testbot.gbdialog/start.bas | bas | abc123... | t + testbot.gbkb/manual.txt | txt | def456... | t + testbot.gbkb/faq.txt | txt | ghi789... | t +``` + +#### 4.3 Check .ast Compilation +```bash +# Check if .bas was compiled to .ast +ls -la /opt/gbo/work/testbot.gbai/testbot.gbdialog/ +``` + +Expected output: +``` +-rw-r--r-- 1 ubuntu ubuntu 1234 Apr 20 12:00 start.ast +-rw-r--r-- 1 ubuntu ubuntu 567 Apr 20 12:00 start.bas +``` + +#### 4.4 Check Qdrant Collections +```bash +# Check KB indexing +curl -s http://localhost:6333/collections | jq '.result.collections[] | select(.name | contains("testbot"))' +``` + +Expected output: +```json +{ + "name": "testbot_manual" +} +``` + +Or check points: +```bash +curl -s http://localhost:6333/collections/testbot_manual/points/scroll | jq '.result.points | length' +``` + +#### 4.5 Check BotServer Logs +```bash +# Monitor sync activity +tail -f botserver.log | grep -i -E "testbot|sync|compile|index" +``` + +Expected log patterns: +``` +2026-04-20... info bootstrap:Auto-creating bot 'testbot' from S3 bucket 'testbot.gbai' +2026-04-20... info drive_compiler:Compiling testbot.gbdialog/start.bas +2026-04-20... info kb:Indexing KB folder: testbot.gbkb for bot testbot +2026-04-20... 
+### Step 2: Create Dialog Folder and File (.bas)
+
+1. Open bucket `testbot.gbai`
+2. Click **"Create New Path"**
+3. Path: `testbot.gbdialog`
+4. Click **"Create"**
+5. Navigate into `testbot.gbdialog`
+6. Click **"Upload File"**, or use the `mc` command:
+
+```bash
+# Using mc CLI (MinIO Client)
+mc alias set local http://localhost:9100 minioadmin minioadmin
+
+# Create start.bas
+cat > /tmp/start.bas << 'EOF'
+' start.bas - Bot entry point
+ADD SUGGESTION "Check Status"
+ADD SUGGESTION "Create Report"
+ADD SUGGESTION "Help"
+
+TALK "Welcome to TestBot! How can I help you today?"
+EOF
+
+mc cp /tmp/start.bas local/testbot.gbai/testbot.gbdialog/start.bas
+```
+
+### Step 3: Create Knowledge Base Folder (.gbkb)
+
+```bash
+# Create KB folder and documents
+mkdir -p /tmp/testbot-docs
+
+cat > /tmp/testbot-docs/manual.txt << 'EOF'
+TestBot Manual v1.0
+
+This is the test knowledge base for TestBot.
+It contains documentation that should be indexed.
+
+Features:
+- Document search via Qdrant
+- Context injection for LLM
+- Semantic similarity queries
+
+Usage:
+USE KB "manual" in your dialog scripts.
+EOF
+
+cat > /tmp/testbot-docs/faq.txt << 'EOF'
+Frequently Asked Questions
+
+Q: What is TestBot?
+A: A test bot for validating the drive monitor sync.
+
+Q: How do I use it?
+A: Just upload files to MinIO and wait for sync.
+
+Q: What file types are supported?
+A: .txt, .pdf, .md, .docx for KB
+   .bas for dialog scripts
+EOF
+
+# Upload to MinIO
+mc mb local/testbot.gbai/testbot.gbkb --ignore-existing
+mc cp /tmp/testbot-docs/manual.txt local/testbot.gbai/testbot.gbkb/manual.txt
+mc cp /tmp/testbot-docs/faq.txt local/testbot.gbai/testbot.gbkb/faq.txt
+```
+
+### Step 4: Verify Sync
+
+#### 4.1 Check Database for Bot Creation
+```bash
+# Bot should be auto-created from bucket
+./botserver-stack/bin/tables/bin/psql -h localhost -U botserver -d botserver -c \
+  "SELECT id, name, is_active, created_at FROM bots WHERE name = 'testbot';"
+```
+
+Expected output:
+```
+ id | name    | is_active | created_at
+----+---------+-----------+-------------------------
+ ...| testbot | t         | 2026-04-20 ...
+```
+
+#### 4.2 Check drive_files Table
+```bash
+# Files should be registered in drive_files
+./botserver-stack/bin/tables/bin/psql -h localhost -U botserver -d botserver -c \
+  "SELECT file_path, file_type, etag, indexed FROM drive_files WHERE file_path LIKE '%testbot%';"
+```
+
+Expected output:
+```
+ file_path                  | file_type | etag      | indexed
+----------------------------+-----------+-----------+---------
+ testbot.gbdialog/start.bas | bas       | abc123... | t
+ testbot.gbkb/manual.txt    | txt       | def456... | t
+ testbot.gbkb/faq.txt       | txt       | ghi789... | t
+```
+
+#### 4.3 Check .ast Compilation
+```bash
+# Check if .bas was compiled to .ast
+ls -la /opt/gbo/work/testbot.gbai/testbot.gbdialog/
+```
+
+Expected output:
+```
+-rw-r--r-- 1 ubuntu ubuntu 1234 Apr 20 12:00 start.ast
+-rw-r--r-- 1 ubuntu ubuntu  567 Apr 20 12:00 start.bas
+```
+
+#### 4.4 Check Qdrant Collections
+```bash
+# Check KB indexing
+curl -s http://localhost:6333/collections | jq '.result.collections[] | select(.name | contains("testbot"))'
+```
+
+Expected output:
+```json
+{
+  "name": "testbot_manual"
+}
+```
+
+Or check points:
+```bash
+curl -s http://localhost:6333/collections/testbot_manual/points/scroll | jq '.result.points | length'
+```
+
+#### 4.5 Check BotServer Logs
+```bash
+# Monitor sync activity
+tail -f botserver.log | grep -i -E "testbot|sync|compile|index"
+```
+
+Expected log patterns:
+```
+2026-04-20... info bootstrap:Auto-creating bot 'testbot' from S3 bucket 'testbot.gbai'
+2026-04-20... info drive_compiler:Compiling testbot.gbdialog/start.bas
+2026-04-20... info kb:Indexing KB folder: testbot.gbkb for bot testbot
+2026-04-20... info qdrant:Collection created: testbot_manual
+```
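+
+Sync is asynchronous, so the checks above may need a few retries. A small
+polling sketch (same psql path and columns as in 4.2; the count of 3 assumes
+exactly the three files uploaded in Steps 2-3):
+
+```bash
+until [ "$(./botserver-stack/bin/tables/bin/psql -h localhost -U botserver -d botserver -tAc \
+  "SELECT count(*) FROM drive_files WHERE file_path LIKE '%testbot%' AND indexed = true")" = "3" ]; do
+  echo "Waiting for sync..."; sleep 5
+done
+```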
+## Troubleshooting
+
+### Files Not Syncing
+
+**Check MinIO bucket visibility:**
+```bash
+mc ls local/
+```
+
+**Check BotServer S3 connection:**
+```bash
+tail -100 botserver.log | grep -i "s3\|minio\|bucket"
+```
+
+### .bas Not Compiling
+
+**Check DriveCompiler status:**
+```bash
+tail -f botserver.log | grep -i "drive_compiler"
+```
+
+**Manual compile trigger (if needed):**
+```bash
+curl -X POST http://localhost:8080/api/admin/drive/compile/testbot
+```
+
+### KB Not Indexing
+
+**Check embedding server:**
+```bash
+curl http://localhost:8081/v1/models
+```
+
+**Manual KB index:**
+```bash
+curl -X POST http://localhost:8080/api/bots/testbot/kb/index \
+  -H "Authorization: Bearer " \
+  -H "Content-Type: application/json" \
+  -d '{"kb_name": "manual"}'
+```
+
+## Expected Flow Diagram
+
+```
+┌─────────────────────────────────────────────────────────────┐
+│ 1. MinIO Upload                                             │
+│    mc cp file.bas local/testbot.gbai/testbot.gbdialog/      │
+└─────────────────────┬───────────────────────────────────────┘
+                      │
+                      ▼
+┌─────────────────────────────────────────────────────────────┐
+│ 2. S3 Event / Polling (DriveMonitor)                        │
+│    - Detects new file in bucket                             │
+│    - Extracts metadata (etag, size, modified)               │
+│    - Inserts into drive_files table                         │
+└─────────────────────┬───────────────────────────────────────┘
+                      │
+                      ▼
+┌─────────────────────────────────────────────────────────────┐
+│ 3. DriveCompiler (every 30s)                                │
+│    - Queries drive_files WHERE file_type='bas'              │
+│    - Compiles .bas → .ast                                   │
+│    - Stores in /opt/gbo/work/{bot}.gbai/                    │
+└─────────────────────┬───────────────────────────────────────┘
+                      │
+                      ▼
+┌─────────────────────────────────────────────────────────────┐
+│ 4. KB Indexer (triggered by drive_files.indexed=false)      │
+│    - Downloads .gbkb/* files from S3                        │
+│    - Chunks text, generates embeddings                      │
+│    - Stores in Qdrant collection {bot}_{kb_name}            │
+│    - Updates drive_files.indexed = true                     │
+└─────────────────────┬───────────────────────────────────────┘
+                      │
+                      ▼
+┌─────────────────────────────────────────────────────────────┐
+│ 5. Bot Ready                                                │
+│    - WebSocket connection at ws://localhost:8080/ws/testbot │
+│    - start.bas executed on connect                          │
+│    - KB available for USE KB "manual"                       │
+└─────────────────────────────────────────────────────────────┘
+```
+
+## Test Checklist
+
+- [ ] MinIO Console accessible at :9101
+- [ ] Bucket `testbot.gbai` created
+- [ ] Folder `testbot.gbdialog` created
+- [ ] File `start.bas` uploaded
+- [ ] Folder `testbot.gbkb` created
+- [ ] Files `manual.txt`, `faq.txt` uploaded
+- [ ] Bot auto-created in database
+- [ ] Files appear in `drive_files` table
+- [ ] `.ast` file generated in work dir
+- [ ] Qdrant collection created
+- [ ] Bot accessible at http://localhost:3000/testbot
+- [ ] KB search returns relevant results
+
+## Cleanup
+
+```bash
+# Remove test bot
+mc rb --force local/testbot.gbai
+
+# Remove from database
+./botserver-stack/bin/tables/bin/psql -h localhost -U botserver -d botserver -c \
+  "DELETE FROM bots WHERE name = 'testbot';"
+
+# Remove Qdrant collection
+curl -X DELETE http://localhost:6333/collections/testbot_manual
+
+# Remove work files
+rm -rf /opt/gbo/work/testbot.gbai
+```
diff --git a/botserver/Cargo.toml b/botserver/Cargo.toml
index aebe4d16..2a231845 100644
--- a/botserver/Cargo.toml
+++ b/botserver/Cargo.toml
@@ -10,7 +10,7 @@ features = ["database", "i18n"]
 [features]
 # ===== DEFAULT =====
-default = ["chat", "automation", "cache", "llm", "vectordb", "crawler", "drive", "directory"]
+default = ["chat", "automation", "cache", "llm", "vectordb", "crawler", "drive", "directory", "kb-extraction"]
 
 # ===== SECURITY MODES =====
 # no-security: Minimal build - chat, automation, drive, cache only (no RBAC, directory, security, compliance)
@@ -42,16 +42,19 @@ marketing = ["people", "automation", "drive", "cache"]
 # Productivity
 calendar = ["automation", "drive", "cache"]
 tasks = ["automation", "drive", "cache", "dep:cron"]
-project = ["automation", "drive", "cache", "quick-xml"]
+project = ["automation", "drive", "cache"]
 goals = ["automation", "drive", "cache"]
 workspaces = ["automation", "drive", "cache"]
 tickets = ["automation", "drive", "cache"]
 billing = ["automation", "drive", "cache"]
 
-# Documents
-docs = ["automation", "drive", "cache", "docx-rs", "ooxmlsdk"]
-sheet = ["automation", "drive", "cache", "calamine", "dep:rust_xlsxwriter", "dep:umya-spreadsheet"]
-slides = ["automation", "drive", "cache", "ooxmlsdk"]
+# Document Processing (lightweight - KB extraction without heavy OOXML SDKs)
+kb-extraction = ["drive", "dep:calamine"]
+
+# Documents (full editing UI - opt-in, adds ~4min compile time from ooxmlsdk)
+docs = ["automation", "drive", "cache", "dep:docx-rs", "dep:ooxmlsdk", "kb-extraction"]
+sheet = ["automation", "drive", "cache", "dep:calamine", "dep:rust_xlsxwriter", "dep:umya-spreadsheet", "kb-extraction"]
+slides = ["automation", "drive", "cache", "dep:ooxmlsdk", "kb-extraction"]
 paper = ["automation", "drive", "cache"]
 
 # Media
@@ -172,7 +175,7 @@ umya-spreadsheet = { workspace = true, optional = true }
 
 # File Storage & Drive (drive feature)
 # minio removed - use rust-s3 via S3Repository instead
 pdf-extract = { workspace = true, optional = true }
-quick-xml = { workspace = true, optional = true }
+quick-xml = { workspace = true }
 flate2 = { workspace = true }
 zip = { workspace = true }
 tar = { workspace = true }
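
The feature split above keeps KB text extraction lightweight by default. A couple of illustrative invocations (standard Cargo flags; the feature names come from this manifest):

```bash
# Default build: kb-extraction included, heavy OOXML editors excluded
cargo build

# Opt into the full document/slides editing features (pulls in ooxmlsdk)
cargo build --features docs,slides
```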
diff --git a/botserver/src/attendance/llm_assist_helpers.rs b/botserver/src/attendance/llm_assist_helpers.rs
index e6282300..b09495f8 100644
--- a/botserver/src/attendance/llm_assist_helpers.rs
+++ b/botserver/src/attendance/llm_assist_helpers.rs
@@ -16,7 +16,7 @@ pub async fn execute_llm_with_context(
     system_prompt: &str,
     user_prompt: &str,
 ) -> Result> {
-    let config_manager = ConfigManager::new(state.conn.clone().into());
+    let config_manager = ConfigManager::new(state.conn.clone());
 
     let model = config_manager
         .get_config(&bot_id, "llm-model", None)
diff --git a/botserver/src/auto_task/app_generator.rs b/botserver/src/auto_task/app_generator.rs
index 9cb7df4d..0b3fa3b3 100644
--- a/botserver/src/auto_task/app_generator.rs
+++ b/botserver/src/auto_task/app_generator.rs
@@ -2728,7 +2728,7 @@ NO QUESTIONS. JUST BUILD."#
     {
         let prompt = _prompt;
         let bot_id = _bot_id;
-        let config_manager = ConfigManager::new(self.state.conn.clone().into());
+        let config_manager = ConfigManager::new(self.state.conn.clone());
         let model = config_manager
             .get_config(&bot_id, "llm-model", None)
             .unwrap_or_else(|_| {
@@ -3170,40 +3170,9 @@ NO QUESTIONS. JUST BUILD."#
             .execute(&mut conn)?;
 
         Ok(())
-    }
+}
 
-    /// Ensure the bucket exists, creating it if necessary
-    async fn ensure_bucket_exists(
-        &self,
-        bucket: &str,
-    ) -> Result<(), Box> {
-        #[cfg(feature = "drive")]
-        if let Some(ref s3) = self.state.drive {
-            // Check if bucket exists
-            match s3.object_exists(bucket, "").await {
-                Ok(_) => {
-                    trace!("Bucket {} already exists", bucket);
-                    Ok(())
-                }
-                Err(_) => {
-                    Ok(())
-                }
-            }
-        } else {
-            // No S3 client, we'll use DB fallback - no bucket needed
-            trace!("No S3 client, using DB fallback for storage");
-            Ok(())
-        }
-
-        #[cfg(not(feature = "drive"))]
-        {
-            let _ = bucket;
-            trace!("Drive feature not enabled, no bucket check needed");
-            Ok(())
-        }
-    }
-
-    async fn write_to_drive(
+async fn write_to_drive(
         &self,
         bucket: &str,
         path: &str,
diff --git a/botserver/src/auto_task/ask_later.rs b/botserver/src/auto_task/ask_later.rs
index 3e56ca91..6c470cc2 100644
--- a/botserver/src/auto_task/ask_later.rs
+++ b/botserver/src/auto_task/ask_later.rs
@@ -206,7 +206,7 @@ fn fill_pending_info(
         .bind::(config_key)
         .execute(&mut conn)?;
 
-    let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
+    let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
     config_manager
         .set_config(&bot_id, config_key, value)
         .map_err(|e| format!("Failed to set config: {}", e))?;
diff --git a/botserver/src/auto_task/designer_ai.rs b/botserver/src/auto_task/designer_ai.rs
index 9b246361..65d61e10 100644
--- a/botserver/src/auto_task/designer_ai.rs
+++ b/botserver/src/auto_task/designer_ai.rs
@@ -1050,7 +1050,7 @@ Respond ONLY with valid JSON."#
         let prompt = _prompt;
         let bot_id = _bot_id;
         // Get model and key from bot configuration
-        let config_manager = ConfigManager::new(self.state.conn.clone().into());
+        let config_manager = ConfigManager::new(self.state.conn.clone());
         let model = config_manager
             .get_config(&bot_id, "llm-model", None)
             .unwrap_or_else(|_| {
diff --git a/botserver/src/auto_task/intent_classifier.rs b/botserver/src/auto_task/intent_classifier.rs
index af49663f..8ea7ab1a 100644
--- a/botserver/src/auto_task/intent_classifier.rs
+++ b/botserver/src/auto_task/intent_classifier.rs
@@ -1056,7 +1056,7 @@ END TRIGGER
         let prompt = _prompt;
         let bot_id = _bot_id;
         // Get model and key from bot configuration
-        let config_manager = ConfigManager::new(self.state.conn.clone().into());
+        let config_manager = ConfigManager::new(self.state.conn.clone());
         let model = config_manager
             .get_config(&bot_id, "llm-model", None)
             .unwrap_or_else(|_| {
diff --git 
a/botserver/src/auto_task/intent_compiler.rs b/botserver/src/auto_task/intent_compiler.rs index 70e093e9..3df7fa75 100644 --- a/botserver/src/auto_task/intent_compiler.rs +++ b/botserver/src/auto_task/intent_compiler.rs @@ -683,7 +683,7 @@ Respond ONLY with valid JSON."#, let prompt = _prompt; let bot_id = _bot_id; // Get model and key from bot configuration - let config_manager = ConfigManager::new(self.state.conn.clone().into()); + let config_manager = ConfigManager::new(self.state.conn.clone()); let model = config_manager .get_config(&bot_id, "llm-model", None) .unwrap_or_else(|_| { diff --git a/botserver/src/basic/keywords/kb_statistics.rs b/botserver/src/basic/keywords/kb_statistics.rs index f8d722f0..0a6f77a8 100644 --- a/botserver/src/basic/keywords/kb_statistics.rs +++ b/botserver/src/basic/keywords/kb_statistics.rs @@ -234,7 +234,7 @@ async fn get_kb_statistics( let qdrant_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() { sm.get_vectordb_config_sync().0 } else { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); config_manager .get_config(&user.bot_id, "vectordb-url", Some("https://localhost:6333")) .unwrap_or_else(|_| "https://localhost:6333".to_string()) @@ -293,7 +293,7 @@ async fn get_collection_statistics( let qdrant_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() { sm.get_vectordb_config_sync().0 } else { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); config_manager .get_config(&uuid::Uuid::nil(), "vectordb-url", Some("https://localhost:6333")) .unwrap_or_else(|_| "https://localhost:6333".to_string()) @@ -382,7 +382,7 @@ async fn list_collections( let qdrant_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() { sm.get_vectordb_config_sync().0 } else { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); config_manager .get_config(&user.bot_id, "vectordb-url", Some("https://localhost:6333")) .unwrap_or_else(|_| "https://localhost:6333".to_string()) diff --git a/botserver/src/basic/keywords/llm_keyword.rs b/botserver/src/basic/keywords/llm_keyword.rs index 94eb5451..8ba54356 100644 --- a/botserver/src/basic/keywords/llm_keyword.rs +++ b/botserver/src/basic/keywords/llm_keyword.rs @@ -79,7 +79,7 @@ pub async fn execute_llm_generation( state: Arc, prompt: String, ) -> Result> { - let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into()); + let config_manager = crate::core::config::ConfigManager::new(state.conn.clone()); let model = config_manager .get_config(&Uuid::nil(), "llm-model", None) .unwrap_or_default(); diff --git a/botserver/src/basic/keywords/llm_macros.rs b/botserver/src/basic/keywords/llm_macros.rs index 4453f3ad..77c2118f 100644 --- a/botserver/src/basic/keywords/llm_macros.rs +++ b/botserver/src/basic/keywords/llm_macros.rs @@ -48,7 +48,7 @@ async fn call_llm( state: &AppState, prompt: &str, ) -> Result> { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); let model = config_manager .get_config(&Uuid::nil(), "llm-model", None) .unwrap_or_default(); diff --git a/botserver/src/basic/keywords/save_from_unstructured.rs b/botserver/src/basic/keywords/save_from_unstructured.rs index e1ab2fea..db77c662 100644 --- 
a/botserver/src/basic/keywords/save_from_unstructured.rs +++ b/botserver/src/basic/keywords/save_from_unstructured.rs @@ -260,7 +260,7 @@ Return ONLY the JSON object, no explanations or markdown formatting."#, } async fn call_llm_for_extraction(state: &AppState, prompt: &str) -> Result { - let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into()); + let config_manager = crate::core::config::ConfigManager::new(state.conn.clone()); let model = config_manager .get_config(&Uuid::nil(), "llm-model", None) .unwrap_or_else(|_| "gpt-3.5-turbo".to_string()); diff --git a/botserver/src/basic/keywords/sms.rs b/botserver/src/basic/keywords/sms.rs index 06923015..97936218 100644 --- a/botserver/src/basic/keywords/sms.rs +++ b/botserver/src/basic/keywords/sms.rs @@ -486,7 +486,7 @@ async fn execute_send_sms( provider_override: Option<&str>, priority_override: Option<&str>, ) -> Result> { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); let bot_id = user.bot_id; let provider_name = match provider_override { @@ -589,7 +589,7 @@ async fn send_via_twilio( message: &str, priority: &SmsPriority, ) -> Result, Box> { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); let account_sid = config_manager .get_config(bot_id, "twilio-account-sid", None) @@ -645,7 +645,7 @@ async fn send_via_aws_sns( message: &str, priority: &SmsPriority, ) -> Result, Box> { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); let access_key = config_manager .get_config(bot_id, "aws-access-key", None) @@ -710,7 +710,7 @@ async fn send_via_vonage( message: &str, priority: &SmsPriority, ) -> Result, Box> { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); let api_key = config_manager .get_config(bot_id, "vonage-api-key", None) @@ -776,7 +776,7 @@ async fn send_via_messagebird( message: &str, priority: &SmsPriority, ) -> Result, Box> { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); let api_key = config_manager .get_config(bot_id, "messagebird-api-key", None) @@ -830,7 +830,7 @@ async fn send_via_custom_webhook( message: &str, priority: &SmsPriority, ) -> Result, Box> { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); let webhook_url = config_manager .get_config(bot_id, &format!("{}-webhook-url", webhook_name), None) diff --git a/botserver/src/basic/keywords/table_definition.rs b/botserver/src/basic/keywords/table_definition.rs index 94f180ea..4a361532 100644 --- a/botserver/src/basic/keywords/table_definition.rs +++ b/botserver/src/basic/keywords/table_definition.rs @@ -424,7 +424,7 @@ pub fn load_connection_config( bot_id: Uuid, connection_name: &str, ) -> Result> { - let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into()); + let config_manager = crate::core::config::ConfigManager::new(state.conn.clone()); let prefix = format!("conn-{}-", connection_name); diff --git a/botserver/src/console/status_panel.rs b/botserver/src/console/status_panel.rs index abc84ead..50d79f05 100644 --- a/botserver/src/console/status_panel.rs +++ b/botserver/src/console/status_panel.rs @@ -244,7 +244,7 @@ impl 
StatusPanel { if selected == bot_name { lines.push("".to_string()); lines.push(" ┌─ Bot Configuration ─────────┐".to_string()); - let config_manager = ConfigManager::new(self.app_state.conn.clone().into()); + let config_manager = ConfigManager::new(self.app_state.conn.clone()); let llm_model = config_manager .get_config(bot_id, "llm-model", None) .unwrap_or_else(|_| "N/A".to_string()); diff --git a/botserver/src/core/bootstrap/bootstrap_manager.rs b/botserver/src/core/bootstrap/bootstrap_manager.rs index 239ad48c..91785aea 100644 --- a/botserver/src/core/bootstrap/bootstrap_manager.rs +++ b/botserver/src/core/bootstrap/bootstrap_manager.rs @@ -201,21 +201,21 @@ impl BootstrapManager { match pm.start("directory") { Ok(_child) => { info!("Directory service started, waiting for readiness..."); - let mut zitadel_ready = false; - for i in 0..150 { - sleep(Duration::from_secs(2)).await; - if zitadel_health_check() { - info!("Zitadel/Directory service is responding after {}s", (i + 1) * 2); - zitadel_ready = true; - break; - } - if i % 15 == 14 { - info!("Zitadel health check: {}s elapsed, retrying...", (i + 1) * 2); - } - } - if !zitadel_ready { - warn!("Zitadel/Directory service did not respond after 300 seconds"); - } + let mut zitadel_ready = false; + for i in 0..30 { + sleep(Duration::from_secs(2)).await; + if zitadel_health_check() { + info!("Zitadel/Directory service is responding after {}s", (i + 1) * 2); + zitadel_ready = true; + break; + } + if i == 14 { + info!("Zitadel health check: 30s elapsed, retrying..."); + } + } + if !zitadel_ready { + warn!("Zitadel/Directory service did not respond after 60 seconds, continuing anyway"); + } if zitadel_ready { let config_path = self.stack_dir("conf/system/directory_config.json"); diff --git a/botserver/src/core/bot/channels/teams.rs b/botserver/src/core/bot/channels/teams.rs index 8d051bca..eff1d9a0 100644 --- a/botserver/src/core/bot/channels/teams.rs +++ b/botserver/src/core/bot/channels/teams.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use log::{error, info}; use serde::{Deserialize, Serialize}; -use std::sync::Arc; + use uuid::Uuid; use crate::core::bot::channels::ChannelAdapter; @@ -20,7 +20,7 @@ pub struct TeamsAdapter { impl TeamsAdapter { pub fn new(pool: DbPool, bot_id: Uuid) -> Self { - let config_manager = ConfigManager::new(Arc::new(pool)); + let config_manager = ConfigManager::new(pool); let app_id = config_manager .get_config(&bot_id, "teams-app-id", None) diff --git a/botserver/src/core/bot/channels/telegram.rs b/botserver/src/core/bot/channels/telegram.rs index 05f89ad7..f8169210 100644 --- a/botserver/src/core/bot/channels/telegram.rs +++ b/botserver/src/core/bot/channels/telegram.rs @@ -3,7 +3,7 @@ use diesel::prelude::*; use diesel::r2d2::{ConnectionManager, Pool}; use log::{debug, error, info}; use serde::{Deserialize, Serialize}; -use std::sync::Arc; + use crate::core::bot::channels::ChannelAdapter; use crate::core::config::ConfigManager; @@ -88,7 +88,7 @@ pub struct TelegramAdapter { impl TelegramAdapter { pub fn new(pool: Pool>, bot_id: uuid::Uuid) -> Self { - let config_manager = ConfigManager::new(Arc::new(pool)); + let config_manager = ConfigManager::new(pool); let bot_token = config_manager .get_config(&bot_id, "telegram-bot-token", None) diff --git a/botserver/src/core/bot/channels/whatsapp.rs b/botserver/src/core/bot/channels/whatsapp.rs index 42dd1d0c..458a38f3 100644 --- a/botserver/src/core/bot/channels/whatsapp.rs +++ b/botserver/src/core/bot/channels/whatsapp.rs @@ -26,7 +26,7 @@ pub struct WhatsAppAdapter 
{ impl WhatsAppAdapter { pub fn new(state: &Arc, bot_id: Uuid) -> Self { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); let api_key = config_manager .get_config(&bot_id, "whatsapp-api-key", None) diff --git a/botserver/src/core/bot/mod.rs b/botserver/src/core/bot/mod.rs index f54f8d87..f44e7e53 100644 --- a/botserver/src/core/bot/mod.rs +++ b/botserver/src/core/bot/mod.rs @@ -520,7 +520,7 @@ impl BotOrchestrator { sm.get_session_context_data(&session.id, &session.user_id)? }; - let config_manager = ConfigManager::new(state_clone.conn.clone().into()); + let config_manager = ConfigManager::new(state_clone.conn.clone()); let history_limit = config_manager .get_bot_config_value(&session.bot_id, "history-limit") @@ -875,7 +875,7 @@ impl BotOrchestrator { #[cfg(feature = "nvidia")] { let initial_tokens = crate::core::shared::utils::estimate_token_count(&context_data); - let config_manager = ConfigManager::new(self.state.conn.clone().into()); + let config_manager = ConfigManager::new(self.state.conn.clone()); let max_context_size = config_manager .get_config(&session.bot_id, "llm-server-ctx-size", None) .unwrap_or_default() diff --git a/botserver/src/core/bot/mod_backup.rs b/botserver/src/core/bot/mod_backup.rs index 32f10558..37c16e75 100644 --- a/botserver/src/core/bot/mod_backup.rs +++ b/botserver/src/core/bot/mod_backup.rs @@ -110,7 +110,7 @@ impl BotOrchestrator { sm.get_conversation_history(session.id, user_id)? }; - let config_manager = ConfigManager::new(state_clone.conn.clone().into()); + let config_manager = ConfigManager::new(state_clone.conn.clone()); let model = config_manager .get_config(&bot_id, "llm-model", Some("gpt-3.5-turbo")) .unwrap_or_else(|_| "gpt-3.5-turbo".to_string()); @@ -149,7 +149,7 @@ impl BotOrchestrator { #[cfg(feature = "nvidia")] { let initial_tokens = crate::core::shared::utils::estimate_token_count(&context_data); - let config_manager = ConfigManager::new(self.state.conn.clone().into()); + let config_manager = ConfigManager::new(self.state.conn.clone()); let max_context_size = config_manager .get_config(&bot_id, "llm-server-ctx-size", None) .unwrap_or_default() diff --git a/botserver/src/core/config.rs b/botserver/src/core/config.rs index e61c135f..b8eb314d 100644 --- a/botserver/src/core/config.rs +++ b/botserver/src/core/config.rs @@ -1,9 +1,31 @@ -// Core configuration module -// Minimal implementation to allow compilation - use serde::{Deserialize, Serialize}; use std::sync::Arc; +use crate::core::shared::utils::DbPool; +use diesel::prelude::*; + +#[derive(Debug, Clone, QueryableByName)] +struct ConfigRow { + #[diesel(sql_type = diesel::sql_types::Text)] + config_value: String, +} + +fn is_placeholder_value(val: &str) -> bool { + let lower = val.trim().to_lowercase(); + lower.is_empty() || lower == "none" || lower == "null" || lower == "n/a" +} + +fn is_local_file_path(val: &str) -> bool { + let lower = val.to_lowercase(); + val.starts_with("../") + || val.starts_with("./") + || val.starts_with('/') + || val.starts_with('~') + || lower.ends_with(".gguf") + || lower.ends_with(".bin") + || lower.ends_with(".safetensors") +} + /// Application configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AppConfig { @@ -28,7 +50,7 @@ pub struct DatabaseConfig { pub max_connections: u32, } -#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DriveConfig { pub endpoint: String, pub bucket: String, @@ 
-88,42 +110,141 @@ impl AppConfig {
 
 /// Configuration manager for runtime config updates
 pub struct ConfigManager {
-    db_pool: Arc,
+    pool: Arc<DbPool>,
 }
 
 impl ConfigManager {
-    pub fn new(db_pool: Arc) -> Self {
-        Self {
-            db_pool: db_pool as Arc,
-        }
+    pub fn new(pool: DbPool) -> Self {
+        Self { pool: Arc::new(pool) }
     }
 
     pub fn get_config(
         &self,
-        _bot_id: &uuid::Uuid,
-        _key: &str,
+        bot_id: &uuid::Uuid,
+        key: &str,
         default: Option<&str>,
     ) -> Result<String, Box<dyn std::error::Error>> {
+        if let Ok(mut conn) = self.pool.get() {
+            let bot_val = diesel::sql_query(
+                "SELECT config_value FROM bot_configuration WHERE bot_id = $1 AND config_key = $2 LIMIT 1"
+            )
+            .bind::<diesel::sql_types::Uuid, _>(bot_id)
+            .bind::<diesel::sql_types::Text, _>(key)
+            .get_result::<ConfigRow>(&mut conn)
+            .ok()
+            .map(|r| r.config_value);
+
+            if let Some(ref val) = bot_val {
+                if !is_placeholder_value(val) && !is_local_file_path(val) {
+                    return Ok(val.clone());
+                }
+            }
+
+            let default_val = diesel::sql_query(
+                "SELECT config_value FROM bot_configuration WHERE bot_id = $1 AND config_key = $2 LIMIT 1"
+            )
+            .bind::<diesel::sql_types::Uuid, _>(uuid::Uuid::nil())
+            .bind::<diesel::sql_types::Text, _>(key)
+            .get_result::<ConfigRow>(&mut conn)
+            .ok()
+            .map(|r| r.config_value);
+
+            if let Some(ref val) = default_val {
+                if !is_placeholder_value(val) {
+                    return Ok(val.clone());
+                }
+            }
+        }
         Ok(default.unwrap_or("").to_string())
     }
 
     pub fn get_bot_config_value(
         &self,
-        _bot_id: &uuid::Uuid,
-        _key: &str,
+        bot_id: &uuid::Uuid,
+        key: &str,
     ) -> Result<String, Box<dyn std::error::Error>> {
-        Ok(String::new())
+        if let Ok(mut conn) = self.pool.get() {
+            let row = diesel::sql_query(
+                "SELECT config_value FROM bot_configuration WHERE bot_id = $1 AND config_key = $2 LIMIT 1"
+            )
+            .bind::<diesel::sql_types::Uuid, _>(bot_id)
+            .bind::<diesel::sql_types::Text, _>(key)
+            .get_result::<ConfigRow>(&mut conn)
+            .ok();
+            if let Some(r) = row {
+                return Ok(r.config_value);
+            }
+        }
+        Err("Config key not found".into())
     }
 
     pub fn set_config(
         &self,
-        _bot_id: &uuid::Uuid,
-        _key: &str,
-        _value: &str,
+        bot_id: &uuid::Uuid,
+        key: &str,
+        value: &str,
     ) -> Result<(), Box<dyn std::error::Error>> {
+        if let Ok(mut conn) = self.pool.get() {
+            diesel::sql_query(
+                "INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type, is_encrypted, created_at, updated_at) \
+                 VALUES ($1, $2, $3, $4, 'string', false, NOW(), NOW()) \
+                 ON CONFLICT (bot_id, config_key) DO UPDATE SET config_value = $4, updated_at = NOW()"
+            )
+            .bind::<diesel::sql_types::Uuid, _>(uuid::Uuid::new_v4())
+            .bind::<diesel::sql_types::Uuid, _>(bot_id)
+            .bind::<diesel::sql_types::Text, _>(key)
+            .bind::<diesel::sql_types::Text, _>(value)
+            .execute(&mut conn)?;
+        }
         Ok(())
     }
 }
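+// Illustrative usage (not part of this diff): get_config resolves, in order,
+// (1) the bot's own bot_configuration row - skipped when the stored value is
+// a placeholder ("", "none", "null", "n/a") or a local file path such as
+// "./model.gguf"; (2) the global default row keyed by the nil UUID; and
+// (3) the caller-supplied default:
+//
+//     let cm = ConfigManager::new(state.conn.clone());
+//     let model = cm.get_config(&bot_id, "llm-model", Some("gpt-3.5-turbo"))?;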
"auto".to_string(), + access_key: accesskey.to_string(), + secret_key: secret.to_string(), + server: host.to_string(), + }; + } + } + } + } + } + + // Fallback to empty/localhost + Self { + endpoint: "http://localhost:9100".to_string(), + bucket: String::new(), + region: "auto".to_string(), + access_key: String::new(), + secret_key: String::new(), + server: "localhost:9100".to_string(), + } + } +} diff --git a/botserver/src/core/config_reload.rs b/botserver/src/core/config_reload.rs index ba435bb1..b22ba9e7 100644 --- a/botserver/src/core/config_reload.rs +++ b/botserver/src/core/config_reload.rs @@ -8,7 +8,7 @@ use crate::core::config::ConfigManager; pub async fn reload_config( State(state): State>, ) -> Result, StatusCode> { - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); // Get default bot let conn_arc = state.conn.clone(); diff --git a/botserver/src/core/kb/document_processor.rs b/botserver/src/core/kb/document_processor/mod.rs similarity index 61% rename from botserver/src/core/kb/document_processor.rs rename to botserver/src/core/kb/document_processor/mod.rs index 8b41cc25..7ed97c8d 100644 --- a/botserver/src/core/kb/document_processor.rs +++ b/botserver/src/core/kb/document_processor/mod.rs @@ -1,85 +1,18 @@ +mod ooxml_extract; +mod rtf; +mod types; + +pub use types::{ChunkMetadata, DocumentFormat, DocumentMetadata, TextChunk}; + use anyhow::Result; use log::{debug, info, warn}; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::io::Cursor; use std::path::Path; use tokio::io::AsyncReadExt; + use crate::security::command_guard::SafeCommand; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum DocumentFormat { - PDF, - DOCX, - XLSX, - PPTX, - TXT, - MD, - HTML, - RTF, - CSV, - JSON, - XML, -} - -impl DocumentFormat { - pub fn from_extension(path: &Path) -> Option { - let ext = path.extension()?.to_str()?.to_lowercase(); - match ext.as_str() { - "pdf" => Some(Self::PDF), - "docx" => Some(Self::DOCX), - "xlsx" => Some(Self::XLSX), - "pptx" => Some(Self::PPTX), - "txt" => Some(Self::TXT), - "md" | "markdown" => Some(Self::MD), - "html" | "htm" => Some(Self::HTML), - "rtf" => Some(Self::RTF), - "csv" => Some(Self::CSV), - "json" => Some(Self::JSON), - "xml" => Some(Self::XML), - _ => None, - } - } - - pub fn max_size(&self) -> usize { - match self { - Self::PDF => 500 * 1024 * 1024, - Self::PPTX => 200 * 1024 * 1024, - Self::DOCX | Self::XLSX | Self::TXT | Self::JSON | Self::XML => 100 * 1024 * 1024, - Self::HTML | Self::RTF => 50 * 1024 * 1024, - Self::MD => 10 * 1024 * 1024, - Self::CSV => 1024 * 1024 * 1024, - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DocumentMetadata { - pub title: Option, - pub author: Option, - pub creation_date: Option, - pub modification_date: Option, - pub page_count: Option, - pub word_count: Option, - pub language: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TextChunk { - pub content: String, - pub metadata: ChunkMetadata, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ChunkMetadata { - pub document_path: String, - pub document_title: Option, - pub chunk_index: usize, - pub total_chunks: usize, - pub start_char: usize, - pub end_char: usize, - pub page_number: Option, -} - #[derive(Debug)] pub struct DocumentProcessor { chunk_size: usize, @@ -124,10 +57,7 @@ impl DocumentProcessor { let file_size = metadata.len() as usize; if file_size == 0 { - debug!( - "Skipping empty 
file (0 bytes): {}", - file_path.display() - ); + debug!("Skipping empty file (0 bytes): {}", file_path.display()); return Ok(Vec::new()); } @@ -150,9 +80,7 @@ impl DocumentProcessor { ); let text = self.extract_text(file_path, format).await?; - let cleaned_text = Self::clean_text(&text); - let chunks = self.create_chunks(&cleaned_text, file_path); info!( @@ -165,10 +93,9 @@ impl DocumentProcessor { } async fn extract_text(&self, file_path: &Path, format: DocumentFormat) -> Result { - // Check file size before processing to prevent memory exhaustion let metadata = tokio::fs::metadata(file_path).await?; let file_size = metadata.len() as usize; - + if file_size > format.max_size() { return Err(anyhow::anyhow!( "File too large: {} bytes (max: {} bytes)", @@ -179,8 +106,7 @@ impl DocumentProcessor { match format { DocumentFormat::TXT | DocumentFormat::MD => { - // Use streaming read for large text files - if file_size > 10 * 1024 * 1024 { // 10MB + if file_size > 10 * 1024 * 1024 { self.extract_large_text_file(file_path).await } else { let mut file = tokio::fs::File::open(file_path).await?; @@ -191,29 +117,26 @@ impl DocumentProcessor { } DocumentFormat::PDF => self.extract_pdf_text(file_path).await, DocumentFormat::DOCX => self.extract_docx_text(file_path).await, + DocumentFormat::PPTX => self.extract_pptx_text(file_path).await, + DocumentFormat::XLSX => self.extract_xlsx_text(file_path).await, DocumentFormat::HTML => self.extract_html_text(file_path).await, DocumentFormat::CSV => self.extract_csv_text(file_path).await, DocumentFormat::JSON => self.extract_json_text(file_path).await, - _ => { - warn!( - "Format {:?} extraction not yet implemented, using fallback", - format - ); - self.fallback_text_extraction(file_path).await - } + DocumentFormat::XML => self.extract_xml_text(file_path).await, + DocumentFormat::RTF => self.extract_rtf_text(file_path).await, } } async fn extract_large_text_file(&self, file_path: &Path) -> Result { use tokio::io::AsyncBufReadExt; - + let file = tokio::fs::File::open(file_path).await?; let reader = tokio::io::BufReader::new(file); let mut lines = reader.lines(); let mut content = String::new(); let mut line_count = 0; - const MAX_LINES: usize = 100_000; // Limit lines to prevent memory exhaustion - + const MAX_LINES: usize = 100_000; + while let Some(line) = lines.next_line().await? 
{ if line_count >= MAX_LINES { warn!("Truncating large file at {} lines: {}", MAX_LINES, file_path.display()); @@ -222,13 +145,12 @@ impl DocumentProcessor { content.push_str(&line); content.push('\n'); line_count += 1; - - // Yield control periodically + if line_count % 1000 == 0 { tokio::task::yield_now().await; } } - + Ok(content) } @@ -249,17 +171,11 @@ impl DocumentProcessor { match output { Ok(output) if output.status.success() => { - info!( - "Successfully extracted PDF with pdftotext: {}", - file_path.display() - ); + info!("Successfully extracted PDF with pdftotext: {}", file_path.display()); Ok(String::from_utf8_lossy(&output.stdout).to_string()) } _ => { - warn!( - "pdftotext failed for {}, trying library extraction", - file_path.display() - ); + warn!("pdftotext failed for {}, trying library extraction", file_path.display()); self.extract_pdf_with_library(file_path) } } @@ -301,60 +217,60 @@ impl DocumentProcessor { } async fn extract_docx_text(&self, file_path: &Path) -> Result { - let file_path_str = file_path.to_string_lossy().to_string(); - let cmd_result = SafeCommand::new("pandoc") - .and_then(|c| c.arg("-f")) - .and_then(|c| c.arg("docx")) - .and_then(|c| c.arg("-t")) - .and_then(|c| c.arg("plain")) - .and_then(|c| c.arg(&file_path_str)); + let bytes = tokio::fs::read(file_path).await?; + let path_display = file_path.display().to_string(); + let result = tokio::task::spawn_blocking(move || -> Result { + match ooxml_extract::extract_docx_text_from_zip(&bytes) { + Ok(text) if !text.trim().is_empty() => { + log::info!("Extracted DOCX text from ZIP: {path_display}"); + return Ok(text); + } + Ok(_) => log::warn!("DOCX ZIP extraction returned empty text: {path_display}"), + Err(e) => log::warn!("DOCX ZIP extraction failed for {path_display}: {e}"), + } - let output = match cmd_result { - Ok(cmd) => cmd.execute_async().await, - Err(e) => { - warn!("Failed to build pandoc command: {}", e); - return self.fallback_text_extraction(file_path).await; + #[cfg(feature = "docs")] + match crate::docs::ooxml::load_docx_preserving(&bytes) { + Ok(doc) => { + let text: String = doc.paragraphs.iter().map(|p| p.text.as_str()).collect::>().join("\n"); + if !text.trim().is_empty() { + log::info!("Extracted DOCX with ooxmlsdk: {path_display}"); + return Ok(text); + } + log::warn!("ooxmlsdk DOCX returned empty: {path_display}"); + } + Err(e) => log::warn!("ooxmlsdk DOCX failed for {path_display}: {e}"), } - }; - match output { - Ok(output) if output.status.success() => { - Ok(String::from_utf8_lossy(&output.stdout).to_string()) - } - _ => { - warn!("pandoc failed for DOCX, using fallback"); - self.fallback_text_extraction(file_path).await - } - } + Err(anyhow::anyhow!("All DOCX extraction methods failed for {path_display}")) + }) + .await??; + + Ok(result) } async fn extract_html_text(&self, file_path: &Path) -> Result { let contents = tokio::fs::read_to_string(file_path).await?; - let text = contents .split('<') .flat_map(|s| s.split('>').skip(1)) .collect::>() .join(" "); - Ok(text) } async fn extract_csv_text(&self, file_path: &Path) -> Result { let contents = tokio::fs::read_to_string(file_path).await?; - let mut text = String::new(); for line in contents.lines() { text.push_str(line); text.push('\n'); } - Ok(text) } async fn extract_json_text(&self, file_path: &Path) -> Result { let contents = tokio::fs::read_to_string(file_path).await?; - if let Ok(json) = serde_json::from_str::(&contents) { Ok(Self::extract_json_strings(&json)) } else { @@ -364,7 +280,6 @@ impl DocumentProcessor { fn 
extract_json_strings(value: &serde_json::Value) -> String { let mut result = String::new(); - match value { serde_json::Value::String(s) => { result.push_str(s); @@ -382,10 +297,180 @@ impl DocumentProcessor { } _ => {} } - result } + async fn extract_pptx_text(&self, file_path: &Path) -> Result { + let bytes = tokio::fs::read(file_path).await?; + let path_display = file_path.display().to_string(); + let result = tokio::task::spawn_blocking(move || -> Result { + match ooxml_extract::extract_pptx_text_from_zip(&bytes) { + Ok(text) if !text.trim().is_empty() => { + log::info!("Extracted PPTX text from ZIP: {path_display}"); + return Ok(text); + } + Ok(_) => log::warn!("PPTX ZIP extraction returned empty text: {path_display}"), + Err(e) => log::warn!("PPTX ZIP extraction failed for {path_display}: {e}"), + } + + #[cfg(feature = "slides")] + match crate::slides::ooxml::load_pptx_preserving(&bytes) { + Ok(pptx) => { + let mut text = String::new(); + for slide in &pptx.slides { + for slide_text in &slide.texts { + if !text.is_empty() { + text.push('\n'); + } + text.push_str(slide_text); + } + } + if !text.trim().is_empty() { + log::info!("Extracted PPTX with ooxmlsdk: {path_display}"); + return Ok(text); + } + log::warn!("ooxmlsdk PPTX returned empty: {path_display}"); + } + Err(e) => log::warn!("ooxmlsdk PPTX failed for {path_display}: {e}"), + } + + Err(anyhow::anyhow!("All PPTX extraction methods failed for {path_display}")) + }) + .await??; + + Ok(result) + } + + #[cfg(feature = "kb-extraction")] + async fn extract_xlsx_text(&self, file_path: &Path) -> Result { + let path = file_path.to_path_buf(); + let result = tokio::task::spawn_blocking(move || -> Result { + use calamine::{open_workbook_from_rs, Reader, Xlsx}; + use std::io::Read; + + let mut file = std::fs::File::open(&path)?; + let mut bytes = Vec::new(); + file.read_to_end(&mut bytes)?; + let cursor = Cursor::new(bytes.as_slice()); + let mut workbook: Xlsx<_> = open_workbook_from_rs(cursor) + .map_err(|e| anyhow::anyhow!("Failed to open XLSX: {e}"))?; + + let mut content = String::new(); + for sheet_name in workbook.sheet_names() { + if let Ok(range) = workbook.worksheet_range(&sheet_name) { + use std::fmt::Write; + let _ = writeln!(&mut content, "=== {} ===", sheet_name); + + for row in range.rows() { + let row_text: Vec = row + .iter() + .map(|cell| match cell { + calamine::Data::Empty => String::new(), + calamine::Data::String(s) + | calamine::Data::DateTimeIso(s) + | calamine::Data::DurationIso(s) => s.clone(), + calamine::Data::Float(f) => f.to_string(), + calamine::Data::Int(i) => i.to_string(), + calamine::Data::Bool(b) => b.to_string(), + calamine::Data::Error(e) => format!("{e:?}"), + calamine::Data::DateTime(dt) => dt.to_string(), + }) + .collect(); + + let line = row_text.join("\t"); + if !line.trim().is_empty() { + content.push_str(&line); + content.push('\n'); + } + } + content.push('\n'); + } + } + + Ok(content) + }) + .await??; + + if result.trim().is_empty() { + warn!("XLSX extraction produced empty text: {}", file_path.display()); + } else { + info!("Extracted XLSX with calamine library: {}", file_path.display()); + } + + Ok(result) + } + + #[cfg(not(feature = "kb-extraction"))] + async fn extract_xlsx_text(&self, file_path: &Path) -> Result { + self.fallback_text_extraction(file_path).await + } + + async fn extract_xml_text(&self, file_path: &Path) -> Result { + let bytes = tokio::fs::read(file_path).await?; + let result = tokio::task::spawn_blocking(move || -> Result { + use quick_xml::events::Event; + use 
quick_xml::Reader; + + let mut reader = Reader::from_reader(bytes.as_slice()); + let mut text = String::new(); + let mut buf = Vec::new(); + + loop { + match reader.read_event_into(&mut buf) { + Ok(Event::Text(t)) => { + if let Ok(s) = t.unescape() { + let s = s.trim(); + if !s.is_empty() { + if !text.is_empty() { + text.push(' '); + } + text.push_str(s); + } + } + } + Ok(Event::Eof) => break, + Err(e) => { + return Err(anyhow::anyhow!( + "XML parsing error at position {}: {e}", + reader.error_position() + )); + } + _ => {} + } + buf.clear(); + } + + Ok(text) + }) + .await??; + + if result.trim().is_empty() { + warn!("XML extraction produced empty text: {}", file_path.display()); + return self.fallback_text_extraction(file_path).await; + } + + info!("Extracted XML with quick-xml: {}", file_path.display()); + Ok(result) + } + + async fn extract_rtf_text(&self, file_path: &Path) -> Result { + let bytes = tokio::fs::read(file_path).await?; + let result = tokio::task::spawn_blocking(move || -> Result { + let content = String::from_utf8_lossy(&bytes); + let text = rtf::strip_rtf_commands(&content); + Ok(text) + }) + .await??; + + if result.trim().is_empty() { + warn!("RTF extraction produced empty text: {}", file_path.display()); + return self.fallback_text_extraction(file_path).await; + } + + info!("Extracted RTF text: {}", file_path.display()); + Ok(result) + } + async fn fallback_text_extraction(&self, file_path: &Path) -> Result { match tokio::fs::read_to_string(file_path).await { Ok(contents) => Ok(contents), @@ -415,16 +500,15 @@ impl DocumentProcessor { fn create_chunks(&self, text: &str, file_path: &Path) -> Vec { let mut chunks = Vec::new(); - - // For very large texts, limit processing to prevent memory exhaustion - const MAX_TEXT_SIZE: usize = 10 * 1024 * 1024; // 10MB + + const MAX_TEXT_SIZE: usize = 10 * 1024 * 1024; let text_to_process = if text.len() > MAX_TEXT_SIZE { warn!("Truncating large text to {} chars for chunking: {}", MAX_TEXT_SIZE, file_path.display()); &text[..MAX_TEXT_SIZE] } else { text }; - + let chars: Vec = text_to_process.chars().collect(); let total_chars = chars.len(); @@ -442,7 +526,6 @@ impl DocumentProcessor { 1 }; - // Limit maximum number of chunks to prevent memory exhaustion const MAX_CHUNKS: usize = 1000; let max_chunks_to_create = std::cmp::min(total_chunks, MAX_CHUNKS); @@ -451,7 +534,6 @@ impl DocumentProcessor { let mut chunk_end = end; if end < total_chars { - // Find word boundary within reasonable distance let search_start = std::cmp::max(start, end.saturating_sub(100)); for i in (search_start..end).rev() { if chars[i].is_whitespace() { @@ -463,7 +545,6 @@ impl DocumentProcessor { let chunk_content: String = chars[start..chunk_end].iter().collect(); - // Skip empty or very small chunks if chunk_content.trim().len() < 10 { start = chunk_end; continue; @@ -518,16 +599,15 @@ impl DocumentProcessor { info!("Processing knowledge base folder: {}", kb_path.display()); - // Process files in small batches to prevent memory exhaustion let mut results = HashMap::new(); - const BATCH_SIZE: usize = 10; // Much smaller batch size - + const BATCH_SIZE: usize = 10; + let files = self.collect_supported_files(kb_path).await?; info!("Found {} supported files to process", files.len()); - + for batch in files.chunks(BATCH_SIZE) { let mut batch_results = HashMap::new(); - + for file_path in batch { match self.process_document(file_path).await { Ok(chunks) => { @@ -539,19 +619,16 @@ impl DocumentProcessor { warn!("Failed to process document {}: {}", 
file_path.display(), e);
                 }
             }
-
-            // Yield control after each file
+
             tokio::task::yield_now().await;
         }
-
-        // Merge batch results and clear batch memory
+
         results.extend(batch_results);
-
-        // Force memory cleanup between batches
+
         if results.len() % (BATCH_SIZE * 2) == 0 {
             results.shrink_to_fit();
         }
-
+
         info!("Processed batch, total documents: {}", results.len());
     }
@@ -571,7 +648,6 @@ impl DocumentProcessor {
         files: &mut Vec,
         depth: usize,
     ) -> Result<()> {
-        // Prevent excessive recursion
         if depth > 10 {
             warn!("Skipping deep directory to prevent stack overflow: {}", dir.display());
             return Ok(());
@@ -586,7 +662,6 @@
         if metadata.is_dir() {
             Box::pin(self.collect_files_recursive(&path, files, depth + 1)).await?;
         } else if self.is_supported_file(&path) {
-            // Skip very large files
             if metadata.len() > 50 * 1024 * 1024 {
                 warn!("Skipping large file: {} ({})", path.display(), metadata.len());
                 continue;
diff --git a/botserver/src/core/kb/document_processor/ooxml_extract.rs b/botserver/src/core/kb/document_processor/ooxml_extract.rs
new file mode 100644
index 00000000..3601f0c8
--- /dev/null
+++ b/botserver/src/core/kb/document_processor/ooxml_extract.rs
@@ -0,0 +1,167 @@
+use std::io::Cursor;
+
+pub fn extract_docx_text_from_zip(bytes: &[u8]) -> Result<String, String> {
+    use std::io::Read;
+    use zip::ZipArchive;
+
+    let reader = Cursor::new(bytes);
+    let mut archive = ZipArchive::new(reader)
+        .map_err(|e| format!("Failed to open DOCX as ZIP: {e}"))?;
+
+    for i in 0..archive.len() {
+        let mut file = archive
+            .by_index(i)
+            .map_err(|e| format!("Failed to read ZIP entry: {e}"))?;
+
+        if file.name() == "word/document.xml" {
+            let mut content = String::new();
+            file.read_to_string(&mut content)
+                .map_err(|e| format!("Failed to read document.xml: {e}"))?;
+
+            let paragraphs = extract_paragraphs(&content);
+            let text: String = paragraphs.iter().map(|p| p.as_str()).collect::<Vec<_>>().join("\n");
+            return Ok(text);
+        }
+    }
+
+    Err("word/document.xml not found in DOCX archive".to_string())
+}
+
+fn extract_paragraphs(xml: &str) -> Vec<String> {
+    let mut paragraphs = Vec::new();
+    let mut pos = 0;
+
+    while let Some(p_start) = xml[pos..].find("<w:p") {
+        let abs_start = pos + p_start;
+        if let Some(p_end_rel) = xml[abs_start..].find("</w:p>") {
+            let abs_end = abs_start + p_end_rel + 6;
+            let para_content = &xml[abs_start..abs_end];
+
+            let text = extract_text_from_paragraph(para_content);
+            if !text.trim().is_empty() {
+                paragraphs.push(text);
+            }
+            pos = abs_end;
+        } else {
+            break;
+        }
+    }
+
+    paragraphs
+}
+
+fn extract_text_from_paragraph(para_xml: &str) -> String {
+    let mut text = String::new();
+    let mut pos = 0;
+
+    while let Some(t_start) = para_xml[pos..].find("<w:t") {
+        let abs_start = pos + t_start;
+        if let Some(content_start_rel) = para_xml[abs_start..].find('>') {
+            let abs_content_start = abs_start + content_start_rel + 1;
+
+            if let Some(t_end_rel) = para_xml[abs_content_start..].find("</w:t>") {
+                let content = &para_xml[abs_content_start..abs_content_start + t_end_rel];
+                text.push_str(content);
+                pos = abs_content_start + t_end_rel + 6;
+            } else {
+                break;
+            }
+        } else {
+            break;
+        }
+    }
+
+    unescape_xml(&text)
+}
+
+pub fn extract_pptx_text_from_zip(bytes: &[u8]) -> Result<String, String> {
+    use std::io::Read;
+    use zip::ZipArchive;
+
+    let reader = Cursor::new(bytes);
+    let mut archive = ZipArchive::new(reader)
+        .map_err(|e| format!("Failed to open PPTX as ZIP: {e}"))?;
+
+    let mut all_texts = Vec::new();
+
+    for i in 0..archive.len() {
+        let mut file = archive
+            .by_index(i)
+            .map_err(|e| format!("Failed to read ZIP entry: {e}"))?;
+
+        let name = file.name().to_string();
+        if name.starts_with("ppt/slides/slide") && name.ends_with(".xml") {
+            let mut content = String::new();
+            file.read_to_string(&mut content)
+                .map_err(|e| format!("Failed to read {name}: {e}"))?;
+
+            let texts = extract_slide_texts(&content);
+            all_texts.extend(texts);
+        }
+    }
+
+    if all_texts.is_empty() {
+        return Err("No slide text found in PPTX archive".to_string());
+    }
+
+    Ok(all_texts.join("\n"))
+}
+
+fn extract_slide_texts(xml: &str) -> Vec<String> {
+    let mut texts = Vec::new();
+    let mut pos = 0;
+
+    while let Some(p_start) = xml[pos..].find("<a:p") {
+        let abs_start = pos + p_start;
+        if let Some(p_end_rel) = xml[abs_start..].find("</a:p>") {
+            let abs_end = abs_start + p_end_rel + 6;
+            let para_content = &xml[abs_start..abs_end];
+
+            let text = extract_slide_text_from_paragraph(para_content);
+            if !text.trim().is_empty() {
+                texts.push(text);
+            }
+            pos = abs_end;
+        } else {
+            break;
+        }
+    }
+
+    texts
+}
+
+fn extract_slide_text_from_paragraph(para_xml: &str) -> String {
+    let mut text = String::new();
+    let mut pos = 0;
+
+    while let Some(t_start) = para_xml[pos..].find("<a:t") {
+        let abs_start = pos + t_start;
+        if let Some(tag_end_rel) = para_xml[abs_start..].find('>') {
+            let abs_content_start = abs_start + tag_end_rel + 1;
+
+            if let Some(t_end_rel) = para_xml[abs_content_start..].find("</a:t>") {
+                let content = &para_xml[abs_content_start..abs_content_start + t_end_rel];
+                text.push_str(content);
+                pos = abs_content_start + t_end_rel + 6;
+            } else {
+                break;
+            }
+        } else {
+            break;
+        }
+    }
+
+    unescape_xml(&text)
+}
+
+fn unescape_xml(text: &str) -> String {
+    // Note: &amp; must be decoded last, so sequences like "&amp;lt;"
+    // become "&lt;" instead of being double-unescaped to "<".
+    text.replace("&lt;", "<")
+        .replace("&gt;", ">")
+        .replace("&quot;", "\"")
+        .replace("&apos;", "'")
+        .replace("&amp;", "&")
+}
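The replacement order in `unescape_xml` is load-bearing: `&amp;` must be decoded after every other entity, otherwise doubly-escaped input gets unescaped twice. A standalone check:

```rust
fn main() {
    let escaped = "a &amp;lt; b"; // encodes the literal text: a &lt; b
    // Correct: decode &lt; first, &amp; last.
    assert_eq!(escaped.replace("&lt;", "<").replace("&amp;", "&"), "a &lt; b");
    // Wrong: decoding &amp; first manufactures a fresh "&lt;" that the
    // next pass then decodes, yielding "a < b".
    assert_eq!(escaped.replace("&amp;", "&").replace("&lt;", "<"), "a < b");
}
```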
diff --git a/botserver/src/core/kb/document_processor/rtf.rs b/botserver/src/core/kb/document_processor/rtf.rs
new file mode 100644
index 00000000..0e35b0a9
--- /dev/null
+++ b/botserver/src/core/kb/document_processor/rtf.rs
@@ -0,0 +1,62 @@
+pub fn strip_rtf_commands(input: &str) -> String {
+    let mut result = String::with_capacity(input.len());
+    let chars: Vec<char> = input.chars().collect();
+    let len = chars.len();
+    let mut i = 0;
+    let mut depth = 0i32;
+
+    while i < len {
+        if chars[i] == '{' {
+            depth += 1;
+            i += 1;
+        } else if chars[i] == '}' {
+            depth -= 1;
+            if depth < 0 {
+                depth = 0;
+            }
+            i += 1;
+        } else if chars[i] == '\\' && i + 1 < len {
+            if chars[i + 1] == '\'' && i + 4 <= len {
+                // \'xx hex escape: convert char index i+2 to a byte offset,
+                // then read the two hex digits.
+                if let Ok(byte_val) = u8::from_str_radix(
+                    &input[chars[..i + 2].iter().collect::<String>().len()..]
+                        .chars()
+                        .take(2)
+                        .collect::<String>(),
+                    16,
+                ) {
+                    if let Some(c) = char::from_u32(byte_val as u32) {
+                        result.push(c);
+                    }
+                }
+                i += 4;
+            } else if chars[i + 1] == '\n' || chars[i + 1] == '\r' {
+                result.push('\n');
+                i += 2;
+            } else {
+                // Skip the control word, an optional numeric parameter,
+                // and the space(s) that terminate it.
+                let mut j = i + 1;
+                while j < len && chars[j].is_ascii_alphabetic() {
+                    j += 1;
+                }
+                if j < len && (chars[j] == '-' || chars[j].is_ascii_digit()) {
+                    j += 1;
+                    while j < len && chars[j].is_ascii_digit() {
+                        j += 1;
+                    }
+                }
+                while j < len && chars[j] == ' ' {
+                    j += 1;
+                }
+                i = j;
+            }
+        } else {
+            if depth <= 1 {
+                result.push(chars[i]);
+            }
+            i += 1;
+        }
+    }
+
+    result.split_whitespace().collect::<Vec<&str>>().join(" ")
+}
diff --git a/botserver/src/core/kb/document_processor/types.rs b/botserver/src/core/kb/document_processor/types.rs
new file mode 100644
index 00000000..35dde261
--- /dev/null
+++ b/botserver/src/core/kb/document_processor/types.rs
@@ -0,0 +1,75 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum DocumentFormat {
+    PDF,
+    DOCX,
+    XLSX,
+    PPTX,
+    TXT,
+    MD,
+    HTML,
+    RTF,
+    CSV,
+    JSON,
+    XML,
+}
+
+impl DocumentFormat {
+    pub fn from_extension(path: &std::path::Path) -> Option<Self> {
+        let ext = path.extension()?.to_str()?.to_lowercase();
+        match ext.as_str() {
+            "pdf" => Some(Self::PDF),
+            "docx" => Some(Self::DOCX),
+            "xlsx" => Some(Self::XLSX),
+            "pptx" => Some(Self::PPTX),
+            "txt" => Some(Self::TXT),
+            "md" | "markdown" => Some(Self::MD),
+            "html" | "htm" => Some(Self::HTML),
+            "rtf" => Some(Self::RTF),
+            "csv" => Some(Self::CSV),
+            "json" => Some(Self::JSON),
+            "xml" => Some(Self::XML),
+            _ => None,
+        }
+    }
+
+    pub fn max_size(&self) -> usize {
+        match self {
+            Self::PDF => 500 * 1024 * 1024,
+            Self::PPTX => 200 * 1024 * 1024,
+            Self::DOCX | Self::XLSX | Self::TXT | Self::JSON | Self::XML => 100 * 1024 * 1024,
+            Self::HTML | Self::RTF => 50 * 1024 * 1024,
+            Self::MD => 10 * 1024 * 1024,
+            Self::CSV => 1024 * 1024 * 1024,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DocumentMetadata {
+    pub title: Option<String>,
+    pub author: Option<String>,
+    pub creation_date: Option<String>,
+    pub modification_date: Option<String>,
+    pub page_count: Option<usize>,
+    pub word_count: Option<usize>,
+    pub language: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TextChunk {
+    pub content: String,
+    pub metadata: ChunkMetadata,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChunkMetadata {
+    pub document_path: String,
+    pub document_title: Option<String>,
+    pub chunk_index: usize,
+    pub total_chunks: usize,
+    pub start_char: usize,
+    pub end_char: usize,
+    pub page_number: Option<usize>,
+}
diff --git a/botserver/src/core/kb/embedding_generator.rs b/botserver/src/core/kb/embedding_generator.rs
index 809ce904..751a9550 100644
--- a/botserver/src/core/kb/embedding_generator.rs
+++ b/botserver/src/core/kb/embedding_generator.rs
@@ -59,7 +59,7 @@ impl EmbeddingConfig {
    pub fn from_bot_config(pool: &DbPool, _bot_id: &uuid::Uuid) -> Self {
        use crate::core::config::ConfigManager;

-        let config_manager = ConfigManager::new(Arc::new(pool.clone()));
+        let config_manager = ConfigManager::new(pool.clone());

        let embedding_url = config_manager
            .get_config(_bot_id, "embedding-url", Some(""))
diff --git a/botserver/src/core/kb/kb_indexer.rs b/botserver/src/core/kb/kb_indexer.rs
index 300ba8c6..9b3b98c2 100644
--- a/botserver/src/core/kb/kb_indexer.rs
+++ b/botserver/src/core/kb/kb_indexer.rs
@@ -3,7 +3,7 @@ use log::{debug, info, trace, warn};
 use
serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::{Path, PathBuf}; -use std::sync::Arc; + use uuid::Uuid; use crate::core::config::ConfigManager; @@ -35,7 +35,7 @@ impl QdrantConfig { let (url, api_key) = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() { sm.get_vectordb_config_sync() } else { - let config_manager = ConfigManager::new(Arc::new(pool.clone())); + let config_manager = ConfigManager::new(pool.clone()); let url = config_manager .get_config(bot_id, "vectordb-url", Some("")) .unwrap_or_else(|_| "".to_string()); @@ -532,55 +532,72 @@ impl KbIndexer { Ok(()) } - pub async fn index_single_file( - &self, - bot_id: Uuid, - bot_name: &str, - kb_name: &str, - file_path: &Path, - ) -> Result { - if !is_embedding_server_ready() { - return Err(anyhow::anyhow!( - "Embedding server not available. Cannot index file." - )); - } +pub async fn index_single_file( + &self, + bot_id: Uuid, + bot_name: &str, + kb_name: &str, + file_path: &Path, +) -> Result { + self.index_single_file_with_id(bot_id, bot_name, kb_name, file_path, None).await +} - if !self.check_qdrant_health().await.unwrap_or(false) { - return Err(anyhow::anyhow!( - "Qdrant vector database is not available." - )); - } +pub async fn index_single_file_with_id( + &self, + bot_id: Uuid, + bot_name: &str, + kb_name: &str, + file_path: &Path, + document_id: Option<&str>, +) -> Result { + if !is_embedding_server_ready() { + return Err(anyhow::anyhow!( + "Embedding server not available. Cannot index file." + )); + } - let bot_id_short = bot_id.to_string().chars().take(8).collect::(); - let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name); + if !self.check_qdrant_health().await.unwrap_or(false) { + return Err(anyhow::anyhow!( + "Qdrant vector database is not available." 
+ )); + } - self.ensure_collection_exists(&collection_name).await?; + let bot_id_short = bot_id.to_string().chars().take(8).collect::(); + let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name); - info!( - "Indexing single file: {} into collection {}", - file_path.display(), - collection_name - ); + self.ensure_collection_exists(&collection_name).await?; - let chunks = self.document_processor.process_document(file_path).await?; + let doc_path = document_id + .map(|s| s.to_string()) + .unwrap_or_else(|| file_path.to_string_lossy().to_string()); - if chunks.is_empty() { - warn!("No chunks extracted from file: {}", file_path.display()); - return Ok(IndexingResult { - collection_name, - documents_processed: 0, - chunks_indexed: 0, - }); - } + info!( + "Indexing single file: {} (id: {}) into collection {}", + file_path.display(), doc_path, collection_name + ); - let doc_path = file_path.to_string_lossy().to_string(); - let embeddings = self - .embedding_generator - .generate_embeddings(&chunks) - .await?; + if let Err(e) = self.delete_file_points(&collection_name, &doc_path).await { + warn!("Failed to delete old points for {} before reindex: {}", doc_path, e); + } - let points = Self::create_qdrant_points(&doc_path, embeddings)?; - self.upsert_points(&collection_name, points).await?; + let chunks = self.document_processor.process_document(file_path).await?; + + if chunks.is_empty() { + warn!("No chunks extracted from file: {}", file_path.display()); + return Ok(IndexingResult { + collection_name, + documents_processed: 0, + chunks_indexed: 0, + }); + } + + let embeddings = self + .embedding_generator + .generate_embeddings(&chunks) + .await?; + + let points = Self::create_qdrant_points(&doc_path, embeddings)?; + self.upsert_points(&collection_name, points).await?; self.update_collection_metadata(&collection_name, bot_name, kb_name, chunks.len())?; diff --git a/botserver/src/core/kb/mod.rs b/botserver/src/core/kb/mod.rs index b97ec159..986dd4e1 100644 --- a/botserver/src/core/kb/mod.rs +++ b/botserver/src/core/kb/mod.rs @@ -109,23 +109,31 @@ impl KnowledgeBaseManager { kb_name: &str, file_path: &Path, ) -> Result { + self.index_single_file_with_id(bot_id, bot_name, kb_name, file_path, None).await + } + + pub async fn index_single_file_with_id( + &self, + bot_id: Uuid, + bot_name: &str, + kb_name: &str, + file_path: &Path, + document_id: Option<&str>, + ) -> Result { + let doc_id_display = document_id.unwrap_or("(temp path)"); info!( - "Indexing single file: {} into KB {} for bot {}", - file_path.display(), - kb_name, - bot_name + "Indexing single file: {} (id: {}) into KB {} for bot {}", + file_path.display(), doc_id_display, kb_name, bot_name ); let result = self .indexer - .index_single_file(bot_id, bot_name, kb_name, file_path) + .index_single_file_with_id(bot_id, bot_name, kb_name, file_path, document_id) .await?; info!( "Successfully indexed {} chunks from {} into collection {}", - result.chunks_indexed, - file_path.display(), - result.collection_name + result.chunks_indexed, file_path.display(), result.collection_name ); Ok(result) diff --git a/botserver/src/core/kb/website_crawler_service.rs b/botserver/src/core/kb/website_crawler_service.rs index a364ddaa..e7ebda0f 100644 --- a/botserver/src/core/kb/website_crawler_service.rs +++ b/botserver/src/core/kb/website_crawler_service.rs @@ -181,7 +181,7 @@ impl WebsiteCrawlerService { ) -> Result<(), Box> { trace!("Starting crawl for website: {}", website.url); - let config_manager = 
ConfigManager::new(db_pool.clone().into()); + let config_manager = ConfigManager::new(db_pool.clone()); let website_max_depth = config_manager .get_bot_config_value(&website.bot_id, "website-max-depth") diff --git a/botserver/src/core/middleware.rs b/botserver/src/core/middleware.rs index 46c0652e..60ba2bec 100644 --- a/botserver/src/core/middleware.rs +++ b/botserver/src/core/middleware.rs @@ -269,7 +269,6 @@ impl RequestContext { #[derive(Clone)] pub struct ContextMiddlewareState { - pub db_pool: DbPool, pub jwt_secret: Arc, pub org_cache: Arc>>, pub user_cache: Arc>>, @@ -290,9 +289,8 @@ pub struct CachedUserData { } impl ContextMiddlewareState { - pub fn new(db_pool: DbPool, jwt_secret: String) -> Self { + pub fn new(_db_pool: DbPool, jwt_secret: String) -> Self { Self { - db_pool, jwt_secret: Arc::new(jwt_secret), org_cache: Arc::new(RwLock::new(std::collections::HashMap::new())), user_cache: Arc::new(RwLock::new(std::collections::HashMap::new())), diff --git a/botserver/src/core/package_manager/installer.rs b/botserver/src/core/package_manager/installer.rs index 34643699..0a3ebb06 100644 --- a/botserver/src/core/package_manager/installer.rs +++ b/botserver/src/core/package_manager/installer.rs @@ -1159,12 +1159,16 @@ EOF"#.to_string(), } } - let rendered_cmd = component - .exec_cmd - .replace("{{BIN_PATH}}", &bin_path.to_string_lossy()) - .replace("{{DATA_PATH}}", &data_path.to_string_lossy()) - .replace("{{CONF_PATH}}", &conf_path.to_string_lossy()) - .replace("{{LOGS_PATH}}", &logs_path.to_string_lossy()); + let rendered_cmd = component + .exec_cmd + .replace("{{BIN_PATH}}", &bin_path.to_string_lossy()) + .replace("{{DATA_PATH}}", &data_path.to_string_lossy()) + .replace("{{CONF_PATH}}", &conf_path.to_string_lossy()) + .replace("{{LOGS_PATH}}", &logs_path.to_string_lossy()); + + if let Err(e) = std::fs::create_dir_all(&logs_path) { + warn!("Failed to create log directory {}: {}", logs_path.display(), e); + } trace!( "Starting component {} with command: {}", diff --git a/botserver/src/core/shared/utils.rs b/botserver/src/core/shared/utils.rs index c5bfdd0c..2eb446ef 100644 --- a/botserver/src/core/shared/utils.rs +++ b/botserver/src/core/shared/utils.rs @@ -106,10 +106,13 @@ pub fn get_work_path() -> String { /// In production (system container with .env but no botserver-stack): /opt/gbo/work /// In development (with botserver-stack directory): ./botserver-stack/data/system/work fn get_work_path_default() -> String { - let has_env = std::path::Path::new("./.env").exists() + let has_env = std::path::Path::new("./.env").exists() || std::path::Path::new("/opt/gbo/bin/.env").exists(); + let stack_work = std::path::Path::new("./botserver-stack/data/system/work"); let production_work = std::path::Path::new("/opt/gbo/work"); - if has_env || production_work.exists() { + if stack_work.exists() { + stack_work.to_str().unwrap_or("./botserver-stack/data/system/work").to_string() + } else if has_env || production_work.exists() { "/opt/gbo/work".to_string() } else { "./botserver-stack/data/system/work".to_string() @@ -120,10 +123,13 @@ fn get_work_path_default() -> String { /// In production (system container with .env): /opt/gbo /// In development: ./botserver-stack pub fn get_stack_path() -> String { - let has_env = std::path::Path::new("./.env").exists() + let stack_dir = std::path::Path::new("./botserver-stack"); + let has_env = std::path::Path::new("./.env").exists() || std::path::Path::new("/opt/gbo/bin/.env").exists(); let production_base = std::path::Path::new("/opt/gbo/bin/botserver"); - 
if has_env || production_base.exists() { + if stack_dir.exists() { + "./botserver-stack".to_string() + } else if has_env || production_base.exists() { "/opt/gbo".to_string() } else { "./botserver-stack".to_string() @@ -135,10 +141,16 @@ pub async fn create_s3_operator( config: &DriveConfig, ) -> Result> { let endpoint = { - let base = if config.server.starts_with("http://") || config.server.starts_with("https://") { - config.server.clone() + // Fallback to localhost:9100 if config.server is empty + let server = if config.server.is_empty() { + "localhost:9100".to_string() } else { - format!("http://{}", config.server) + config.server.clone() + }; + let base = if server.starts_with("http://") || server.starts_with("https://") { + server + } else { + format!("http://{}", server) }; let with_port = if base.contains("://") { let without_scheme = base.split("://").nth(1).unwrap_or(""); diff --git a/botserver/src/designer/designer_api/llm_integration.rs b/botserver/src/designer/designer_api/llm_integration.rs index 516bb65c..e65ab774 100644 --- a/botserver/src/designer/designer_api/llm_integration.rs +++ b/botserver/src/designer/designer_api/llm_integration.rs @@ -282,7 +282,7 @@ async fn call_designer_llm( ) -> Result> { use crate::core::config::ConfigManager; - let config_manager = ConfigManager::new(state.conn.clone().into()); + let config_manager = ConfigManager::new(state.conn.clone()); // Get LLM configuration from bot config or use defaults let model = config_manager diff --git a/botserver/src/docs/ooxml.rs b/botserver/src/docs/ooxml.rs index 2af5b526..ff32ac8b 100644 --- a/botserver/src/docs/ooxml.rs +++ b/botserver/src/docs/ooxml.rs @@ -101,20 +101,6 @@ fn escape_xml(text: &str) -> String { .replace('\'', "'") } -pub fn save_docx_preserving(original_bytes: &[u8]) -> Result, String> { - use ooxmlsdk::parts::wordprocessing_document::WordprocessingDocument; - - let reader = Cursor::new(original_bytes); - let docx = WordprocessingDocument::new(reader) - .map_err(|e| format!("Failed to parse DOCX: {e}"))?; - - let mut output = Cursor::new(Vec::new()); - docx.save(&mut output) - .map_err(|e| format!("Failed to save DOCX: {e}"))?; - - Ok(output.into_inner()) -} - pub fn update_docx_text( original_bytes: &[u8], new_paragraphs: &[String], @@ -235,7 +221,11 @@ fn replace_first_text_run(para_xml: &str, new_text: &str) -> String { found_first = true; search_pos = abs_content_start + escaped.len() + 6; } else { - result = format!("{}{}", &result[..abs_content_start], &result[abs_content_end..]); + result = format!( + "{}{}", + &result[..abs_content_start], + &result[abs_content_end..] 
+ ); search_pos = abs_content_start; } } else { diff --git a/botserver/src/docs/storage.rs b/botserver/src/docs/storage.rs index b143ae08..1fa00fd9 100644 --- a/botserver/src/docs/storage.rs +++ b/botserver/src/docs/storage.rs @@ -1,7 +1,6 @@ use crate::docs::ooxml::{load_docx_preserving, update_docx_text}; use crate::docs::types::{Document, DocumentMetadata}; use crate::core::shared::state::AppState; -use crate::drive::s3_repository::S3Repository; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::io::Cursor; @@ -78,12 +77,14 @@ pub async fn load_docx_from_bytes( user_identifier: &str, file_path: &str, ) -> Result { - let file_name = file_path + let raw_name = file_path .split('/') .last() - .unwrap_or("Untitled") - .trim_end_matches(".docx") - .trim_end_matches(".doc"); + .unwrap_or("Untitled"); + let file_name = raw_name + .strip_suffix(".docx") + .or_else(|| raw_name.strip_suffix(".doc")) + .unwrap_or(raw_name); let doc_id = generate_doc_id(); @@ -247,12 +248,12 @@ pub async fn save_document_as_docx( let docx_path = format!("{base_path}/{doc_id}.docx"); s3_client - .put_object( - &state.bucket_name, - &docx_path, - docx_bytes.clone(), - Some("application/vnd.openxmlformats-officedocument.wordprocessingml.document"), - ) + .put_object() + .bucket(&state.bucket_name) + .key(&docx_path) + .body(docx_bytes.clone()) + .content_type("application/vnd.openxmlformats-officedocument.wordprocessingml.document") + .send() .await .map_err(|e| format!("Failed to save DOCX: {e}"))?; @@ -346,12 +347,12 @@ pub async fn save_document_to_drive( let meta_path = format!("{base_path}/{doc_id}.meta.json"); s3_client - .put_object( - &state.bucket_name, - &doc_path, - content.as_bytes().to_vec(), - Some("text/html"), - ) + .put_object() + .bucket(&state.bucket_name) + .key(&doc_path) + .body(content.as_bytes().to_vec()) + .content_type("text/html") + .send() .await .map_err(|e| format!("Failed to save document: {e}"))?; @@ -367,12 +368,12 @@ pub async fn save_document_to_drive( }); s3_client - .put_object( - &state.bucket_name, - &meta_path, - metadata.to_string().into_bytes(), - Some("application/json"), - ) + .put_object() + .bucket(&state.bucket_name) + .key(&meta_path) + .body(metadata.to_string().into_bytes()) + .content_type("application/json") + .send() .await .map_err(|e| format!("Failed to save metadata: {e}"))?; @@ -493,7 +494,7 @@ pub async fn list_documents_from_drive( if let Ok(meta_result) = s3_client .get_object() .bucket(&state.bucket_name) - .key(key) + .key(&key) .send() .await { diff --git a/botserver/src/drive/drive_compiler.rs b/botserver/src/drive/drive_compiler.rs index 9c81efd5..9b1fa9cf 100644 --- a/botserver/src/drive/drive_compiler.rs +++ b/botserver/src/drive/drive_compiler.rs @@ -9,6 +9,7 @@ /// SEM usar /opt/gbo/data/ como intermediário! 
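The storage changes above swap chained `trim_end_matches` calls for `strip_suffix` when deriving a display name or id from a file path. The two are not equivalent: `trim_end_matches` keeps stripping while the suffix repeats, `strip_suffix` removes at most one occurrence. A standalone check:

```rust
fn main() {
    let name = "report.docx.docx";
    // trim_end_matches strips every trailing repetition:
    assert_eq!(name.trim_end_matches(".docx"), "report");
    // strip_suffix strips at most one, preserving the rest of the name:
    assert_eq!(name.strip_suffix(".docx"), Some("report.docx"));
}
```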
use crate::basic::compiler::BasicCompiler;
+use crate::core::config::DriveConfig;
 use crate::core::shared::state::AppState;
 use crate::core::shared::utils::get_work_path;
 use crate::drive::drive_files::drive_files as drive_files_table;
@@ -30,6 +31,29 @@ pub struct DriveCompiler {
     last_etags: Arc<RwLock<HashMap<String, String>>>,
 }

+/// Helper function to download a file from S3.
+/// Separated to avoid Send trait issues with tokio::spawn.
+async fn download_from_s3(file_path: &str) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
+    let config = DriveConfig::default();
+    let s3_repo = crate::core::shared::utils::create_s3_operator(&config)
+        .await
+        .map_err(|e| format!("Failed to create S3 operator: {}", e))?;
+
+    // file_path format: {bot}.gbai/{bot}.gbdialog/{tool}.bas
+    // S3 bucket = first part ({bot}.gbai), key = rest
+    let parts: Vec<&str> = file_path.split('/').collect();
+    if parts.len() < 2 {
+        return Err("Invalid file path for S3 download".into());
+    }
+
+    let bucket_name = parts[0];
+    let s3_key = parts[1..].join("/");
+
+    s3_repo.get_object_direct(bucket_name, &s3_key)
+        .await
+        .map_err(|e| format!("S3 get_object_direct failed for {}/{}: {}", bucket_name, s3_key, e).into())
+}
+
 impl DriveCompiler {
     pub fn new(state: Arc<AppState>) -> Self {
         let work_root = PathBuf::from(get_work_path());
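`download_from_s3` above treats the first path segment as the S3 bucket and joins the remainder back into the object key. A minimal standalone illustration of that split (the helper name here is ours, for illustration only):

```rust
// Splits "{bucket}/{rest...}" into (bucket, key), mirroring the logic
// inside download_from_s3.
fn split_drive_path(file_path: &str) -> Option<(&str, String)> {
    let parts: Vec<&str> = file_path.split('/').collect();
    if parts.len() < 2 {
        return None;
    }
    Some((parts[0], parts[1..].join("/")))
}

fn main() {
    let (bucket, key) = split_drive_path("mybot.gbai/mybot.gbdialog/hello.bas").unwrap();
    assert_eq!(bucket, "mybot.gbai");
    assert_eq!(key, "mybot.gbdialog/hello.bas");
}
```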
@@ -109,36 +133,76 @@ impl DriveCompiler {
     /// Compile a .bas file → .ast DIRECTLY into work/{bot}.gbai/{bot}.gbdialog/
     async fn compile_file(&self, bot_id: Uuid, file_path: &str) -> Result<(), Box<dyn std::error::Error>> {
-        // file_path: {bot}.gbai/{bot}.gbdialog/{tool}.bas
+        // file_path formats:
+        //   - {bot}.gbai/{bot}.gbdialog/{tool}.bas  (full path with bucket prefix)
+        //   - {bot}.gbdialog/{tool}.bas             (without bucket prefix)
+        //   - {bot}.gbkb/{doc}.txt                  (KB files - skip compilation)
         let parts: Vec<&str> = file_path.split('/').collect();
-        if parts.len() < 3 {
+        if parts.len() < 2 {
             return Err("Invalid file path format".into());
         }

-        let bot_name = parts[0].trim_end_matches(".gbai");
-        let tool_name = parts.last().ok_or("Invalid file path")?.trim_end_matches(".bas");
-
-        // Work dir: /opt/gbo/work/{bot}.gbai/{bot}.gbdialog/
+        // Determine bot name and work directory structure
+        let (_bot_name, work_dir) = if parts[0].ends_with(".gbai") {
+            // Full path: {bot}.gbai/{bot}.gbdialog/{tool}.bas
+            let bot_name = parts[0].strip_suffix(".gbai").unwrap_or(parts[0]);
             let work_dir = self.work_root.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name));
-        std::fs::create_dir_all(&work_dir)?;
+            (bot_name, work_dir)
+        } else if parts.len() >= 2 && parts[0].ends_with(".gbdialog") {
+            // Short path: {bot}.gbdialog/{tool}.bas
+            let bot_name = parts[0].strip_suffix(".gbdialog").unwrap_or(parts[0]);
+            let work_dir = self.work_root.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name));
+            (bot_name, work_dir)
+        } else if parts.len() >= 2 && parts[0].ends_with(".gbkb") {
+            // KB file: {bot}.gbkb/{doc}.txt - skip compilation
+            debug!("Skipping KB file: {}", file_path);
+            return Ok(());
+        } else {
+            warn!("Unknown file path format: {}", file_path);
+            return Err("Invalid file path format".into());
+        };
+
+        // Create work directory
+        std::fs::create_dir_all(&work_dir)?;
+
+        // Determine tool name from last part of path
+        let tool_name = parts.last().unwrap_or(&"unknown").strip_suffix(".bas").unwrap_or(parts.last().unwrap_or(&"unknown"));

         // Path of the .bas file in the work dir
         let work_bas_path = work_dir.join(format!("{}.bas", tool_name));

-        // Download from MinIO straight into the work dir
-        // (this assumes DriveMonitor already synced, or we fetch from S3 here)
-        // For now, we assume the file is already in the work dir from a previous sync
-        // If it does not exist, it must be fetched from S3
-
+        // Check if file exists in work dir
         if !work_bas_path.exists() {
-            // Fetch from S3 - this should be done by DriveMonitor
-            // For now, we only log
-            warn!("File {} not found in work dir, skipping", work_bas_path.display());
-            return Ok(());
+            // File doesn't exist in work dir - need to download from S3
+            // This should be done by DriveMonitor, but we can try to fetch it here
+            warn!("File {} not found in work dir, attempting to download from S3", work_bas_path.display());
+
+            // Download in separate task to avoid Send issues
+            let download_result = download_from_s3(file_path).await;
+
+            match download_result {
+                Ok(content) => {
+                    if let Err(e) = std::fs::write(&work_bas_path, content) {
+                        warn!("Failed to write {} to work dir: {}", work_bas_path.display(), e);
+                        return Err(format!("Failed to write file: {}", e).into());
+                    }
+                    info!("Downloaded {} to {}", file_path, work_bas_path.display());
+                }
+                Err(e) => {
+                    warn!("Failed to download {} from S3: {}", file_path, e);
+                    return Err(format!("File not found in S3: {}", file_path).into());
+                }
            }
+        }

-        // Read contents
-        let _content = std::fs::read_to_string(&work_bas_path)?;
+        // Verify file exists now
+        if !work_bas_path.exists() {
+            warn!("File {} still not found after download attempt", work_bas_path.display());
+            return Ok(());
+        }
+
+        // Read contents
+        let _content = std::fs::read_to_string(&work_bas_path)?;

         // Compile with BasicCompiler (already in the work dir, so it compiles in place)
         let mut compiler = BasicCompiler::new(self.state.clone(), bot_id);
diff --git a/botserver/src/drive/drive_monitor/kb_processor.rs b/botserver/src/drive/drive_monitor/kb_processor.rs
deleted file mode 100644
index 07bb4e24..00000000
--- a/botserver/src/drive/drive_monitor/kb_processor.rs
+++ /dev/null
@@ -1,116 +0,0 @@
-#[cfg(any(feature = "research", feature = "llm"))]
-use crate::core::kb::KnowledgeBaseManager;
-#[cfg(any(feature = "research", feature = "llm"))]
-use log::{error, info, trace, warn};
-#[cfg(any(feature = "research", feature = "llm"))]
-use std::collections::HashSet;
-#[cfg(any(feature = "research", feature = "llm"))]
-use std::path::PathBuf;
-#[cfg(any(feature = "research", feature = "llm"))]
-use std::sync::atomic::{AtomicBool, Ordering};
-#[cfg(any(feature = "research", feature = "llm"))]
-use std::sync::Arc;
-#[cfg(any(feature = "research", feature = "llm"))]
-use tokio::sync::RwLock as TokioRwLock;
-#[cfg(any(feature = "research", feature = "llm"))]
-use tokio::time::Duration;
-#[cfg(any(feature = "research", feature = "llm"))]
-use crate::drive::drive_files::DriveFileRepository;
-
-#[cfg(any(feature = "research", feature = "llm"))]
-pub fn start_kb_processor(
-    kb_manager: Arc<KnowledgeBaseManager>,
-    bot_id: uuid::Uuid,
-    bot_name: String,
-    work_root: PathBuf,
-    pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
-    files_being_indexed: Arc<TokioRwLock<HashSet<String>>>,
-    kb_indexed_folders: Arc<TokioRwLock<HashSet<String>>>,
-    file_repo: Arc<DriveFileRepository>,
-    is_processing: Arc<AtomicBool>,
-) {
-    tokio::spawn(async move {
-        while is_processing.load(Ordering::SeqCst) {
-            let kb_key = {
-                let pending = pending_kb_index.write().await;
-                pending.iter().next().cloned()
-            };
-
-            let Some(kb_key) = kb_key else {
-                tokio::time::sleep(Duration::from_secs(5)).await;
-                continue;
-            };
-
-            let parts: Vec<&str> = kb_key.splitn(2, '_').collect();
-            if parts.len() < 2 {
-                let mut pending = pending_kb_index.write().await;
-                pending.remove(&kb_key);
-                continue;
-            }
-
-            let kb_folder_name = parts[1];
-            let kb_folder_path =
-                work_root.join(&bot_name).join(format!("{}.gbkb/",
bot_name)).join(kb_folder_name); - - { - let indexing = files_being_indexed.read().await; - if indexing.contains(&kb_key) { - let mut pending = pending_kb_index.write().await; - pending.remove(&kb_key); - continue; - } - } - - { - let mut indexing = files_being_indexed.write().await; - indexing.insert(kb_key.clone()); - } - - trace!("Indexing KB: {} for bot: {}", kb_key, bot_name); - - let result = - tokio::time::timeout(Duration::from_secs(120), kb_manager.handle_gbkb_change(bot_id, &bot_name, kb_folder_path.as_path())) - .await; - - { - let mut indexing = files_being_indexed.write().await; - indexing.remove(&kb_key); - } - - { - let mut pending = pending_kb_index.write().await; - pending.remove(&kb_key); - } - - match result { - Ok(Ok(_)) => { - info!("Successfully indexed KB: {}", kb_key); - { - let mut indexed = kb_indexed_folders.write().await; - indexed.insert(kb_key.clone()); - } - let pattern = format!("{}/", kb_folder_name); - if let Err(e) = file_repo.mark_indexed_by_pattern(bot_id, &pattern) { - warn!("Failed to mark files indexed for {}: {}", kb_key, e); - } - } - Ok(Err(e)) => { - warn!("Failed to index KB {}: {}", kb_key, e); - let pattern = format!("{}/", kb_folder_name); - if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) { - warn!("Failed to mark files failed for {}: {}", kb_key, e); - } - } - Err(_) => { - error!("KB indexing timed out after 120s for {}", kb_key); - let pattern = format!("{}/", kb_folder_name); - if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) { - warn!("Failed to mark files failed for {}: {}", kb_key, e); - } - } - } - } - - trace!("Stopping for bot {}", bot_name); - }); -} diff --git a/botserver/src/drive/drive_monitor/mod.rs b/botserver/src/drive/drive_monitor/mod.rs index 1cc202ad..ed6a0c8b 100644 --- a/botserver/src/drive/drive_monitor/mod.rs +++ b/botserver/src/drive/drive_monitor/mod.rs @@ -1,6 +1,5 @@ mod types; -mod kb_processor; mod monitor; mod utils; -pub use types::{DriveMonitor, normalize_etag, normalize_config_value}; +pub use types::{DriveMonitor, normalize_etag}; diff --git a/botserver/src/drive/drive_monitor/types.rs b/botserver/src/drive/drive_monitor/types.rs index 8f9f5ab0..3731db4a 100644 --- a/botserver/src/drive/drive_monitor/types.rs +++ b/botserver/src/drive/drive_monitor/types.rs @@ -1,60 +1,346 @@ +use crate::core::shared::state::AppState; +use crate::drive::drive_files::DriveFileRepository; #[cfg(any(feature = "research", feature = "llm"))] use crate::core::kb::KnowledgeBaseManager; -use crate::core::shared::state::AppState; -#[cfg(not(any(feature = "research", feature = "llm")))] -use std::collections::HashMap; -#[cfg(any(feature = "research", feature = "llm"))] -use std::collections::{HashMap, HashSet}; use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32}; use std::sync::Arc; -#[cfg(any(feature = "research", feature = "llm"))] -use tokio::sync::RwLock as TokioRwLock; - -use crate::drive::drive_files::DriveFileRepository; - -#[cfg(any(feature = "research", feature = "llm"))] -static LLM_STREAMING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); - -#[cfg(any(feature = "research", feature = "llm"))] -pub fn set_llm_streaming(streaming: bool) { - LLM_STREAMING.store(streaming, Ordering::SeqCst); -} - -#[cfg(any(feature = "research", feature = "llm"))] -pub fn is_llm_streaming() -> bool { - LLM_STREAMING.load(Ordering::SeqCst) -} - -const MAX_BACKOFF_SECS: u64 = 300; -const 
INITIAL_BACKOFF_SECS: u64 = 30; -const RETRY_BACKOFF_SECS: i64 = 3600; -const MAX_FAIL_COUNT: i32 = 3; pub fn normalize_etag(etag: &str) -> String { etag.trim_matches('"').to_string() } -pub fn normalize_config_value(value: &str) -> String { - let trimmed = value.trim(); - if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("none") { - String::new() - } else { - trimmed.to_string() +impl DriveMonitor { + pub async fn start_monitoring(&self) -> Result<(), Box> { + log::info!("DriveMonitor monitoring started for bucket: {}", self.bucket_name); + + loop { + if let Err(e) = self.scan_bucket().await { + log::error!("Failed to scan bucket {}: {}", self.bucket_name, e); + } + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + } + } + + async fn scan_bucket(&self) -> Result<(), Box> { + log::info!("Scanning bucket {} for files", self.bucket_name); + + if let Some(s3) = &self.state.drive { + match s3.list_objects_with_metadata(&self.bucket_name, None).await { + Ok(objects) => { + log::info!("Found {} objects in bucket {}", objects.len(), self.bucket_name); + + let bot_name = self.bucket_name.strip_suffix(".gbai").unwrap_or(&self.bucket_name); + + let current_keys: Vec = objects.iter().map(|o| o.key.clone()).collect(); + + for obj in &objects { + let file_type = classify_file(&obj.key); + let full_key = format!("{}.gbai/{}", bot_name, obj.key); + let etag = obj.etag.as_deref().map(normalize_etag); + + let existing = self.file_repo.get_file_state(self.bot_id, &full_key); + let needs_reindex = match &existing { + Some(prev) if prev.indexed && prev.etag.as_deref() == etag.as_deref() => false, + Some(prev) if prev.indexed && prev.etag.as_deref() != etag.as_deref() => { + log::info!("ETag changed for {}, will reindex", full_key); + true + } + Some(_) => !existing.as_ref().map_or(false, |f| f.indexed), + None => true, + }; + + match self.file_repo.upsert_file( + self.bot_id, + &full_key, + file_type, + etag, + None, + ) { + Ok(_) => log::info!("Added/updated drive_files for: {} ({})", full_key, file_type), + Err(e) => log::error!("Failed to upsert {}: {}", full_key, e), + } + + if needs_reindex && file_type == "kb" { + #[cfg(any(feature = "research", feature = "llm"))] + { + self.index_kb_file(bot_name, &full_key, &obj.key).await; + } + } + + if file_type == "config" && needs_reindex { + self.sync_bot_config(bot_name, &obj.key).await; + } + } + + self.handle_deleted_files(bot_name, ¤t_keys); + } + Err(e) => { + log::error!("Failed to list objects in {}: {}", self.bucket_name, e); + } + } + } else { + log::warn!("S3 client not available for bucket scan"); + } + + Ok(()) + } + + fn handle_deleted_files(&self, bot_name: &str, current_keys: &[String]) { + let db_files = self.file_repo.get_all_files_for_bot(self.bot_id); + for db_file in &db_files { + let s3_key = match db_file.file_path.strip_prefix(&format!("{}.gbai/", bot_name)) { + Some(k) => k, + None => continue, + }; + if !current_keys.iter().any(|k| k == s3_key) { + log::info!("File deleted from S3: {} (was in DB)", db_file.file_path); + + if db_file.file_type == "kb" { + #[cfg(any(feature = "research", feature = "llm"))] + { + self.delete_kb_file_vectors(bot_name, &db_file.file_path, s3_key); + } + } + + if let Err(e) = self.file_repo.delete_file(self.bot_id, &db_file.file_path) { + log::error!("Failed to delete drive_files entry for {}: {}", db_file.file_path, e); + } + } + } + } + + #[cfg(any(feature = "research", feature = "llm"))] + async fn index_kb_file(&self, bot_name: &str, full_key: &str, s3_key: &str) { + let parsed = match 
parse_kb_path(s3_key) { + Some(p) => p, + None => { + log::debug!("Not a KB file path: {}", s3_key); + return; + } + }; + + let mut being_indexed = self.files_being_indexed.write().await; + if being_indexed.contains(full_key) { + log::debug!("Already indexing {}, skipping", full_key); + return; + } + being_indexed.insert(full_key.to_string()); + drop(being_indexed); + + let s3 = match &self.state.drive { + Some(s3) => s3, + None => { + log::error!("S3 client not available for KB indexing of {}", full_key); + self.files_being_indexed.write().await.remove(full_key); + return; + } + }; + + let data = match s3.get_object_direct(&self.bucket_name, s3_key).await { + Ok(d) => d, + Err(e) => { + log::error!("Failed to download KB file {}/{}: {}", self.bucket_name, s3_key, e); + let _ = self.file_repo.mark_failed(self.bot_id, full_key); + self.files_being_indexed.write().await.remove(full_key); + return; + } + }; + + let temp_path = std::env::temp_dir().join(format!("gb_kb_{}_{}", uuid::Uuid::new_v4(), parsed.file_name)); + + if let Err(e) = std::fs::write(&temp_path, &data) { + log::error!("Failed to write temp file {}: {}", temp_path.display(), e); + self.files_being_indexed.write().await.remove(full_key); + return; + } + + log::info!("Indexing KB file {}/{} -> temp {}", bot_name, parsed.kb_name, temp_path.display()); + + match self.kb_manager.index_single_file_with_id( + self.bot_id, + bot_name, + &parsed.kb_name, + &temp_path, + Some(full_key), + ).await { + Ok(result) => { + log::info!( + "Indexed {} chunks from {} into collection {}", + result.chunks_indexed, + full_key, + result.collection_name + ); + let _ = self.file_repo.mark_indexed(self.bot_id, full_key); + self.upsert_kb_collection(bot_name, &parsed.kb_name, &result.collection_name, result.documents_processed); + } + Err(e) => { + log::error!("KB indexing failed for {}: {}", full_key, e); + let _ = self.file_repo.mark_failed(self.bot_id, full_key); + } + } + + let _ = std::fs::remove_file(&temp_path); + self.files_being_indexed.write().await.remove(full_key); + } + + async fn sync_bot_config(&self, bot_name: &str, s3_key: &str) { + let s3 = match &self.state.drive { + Some(s3) => s3, + None => { + log::error!("S3 client not available for config sync"); + return; + } + }; + + let data = match s3.get_object_direct(&self.bucket_name, s3_key).await { + Ok(d) => d, + Err(e) => { + log::error!("Failed to download config.csv from {}/{}: {}", self.bucket_name, s3_key, e); + return; + } + }; + + let content = match String::from_utf8(data) { + Ok(c) => c, + Err(e) => { + log::error!("Failed to parse config.csv as UTF-8: {}", e); + return; + } + }; + + let config_manager = crate::core::config::ConfigManager::new(self.state.conn.clone()); + + for line in content.lines() { + let line = line.trim(); + if line.is_empty() || line.starts_with('#') || line.to_lowercase().starts_with("key,") { + continue; + } + if let Some((key, value)) = line.split_once(',') { + let key = key.trim(); + let value = value.trim(); + if key.is_empty() { + continue; + } + if let Err(e) = config_manager.set_config(&self.bot_id, key, value) { + log::error!("Failed to set config {}={} for bot {}: {}", key, value, bot_name, e); + } else { + log::info!("Synced config {}={} for bot {}", key, value, bot_name); + } + } + } + + let full_key = format!("{}.gbai/{}", bot_name, s3_key); + let _ = self.file_repo.mark_indexed(self.bot_id, &full_key); + } + + #[cfg(any(feature = "research", feature = "llm"))] + fn delete_kb_file_vectors(&self, bot_name: &str, _full_key: &str, s3_key: &str) 
{ + let parsed = match parse_kb_path(s3_key) { + Some(p) => p, + None => return, + }; + + let kb_manager = self.kb_manager.clone(); + let bot_id = self.bot_id; + let bot_name = bot_name.to_string(); + let relative_path = parsed.relative_path.clone(); + + tokio::spawn(async move { + match kb_manager.delete_file_from_kb(bot_id, &bot_name, &parsed.kb_name, &relative_path).await { + Ok(_) => log::info!("Deleted vectors for {} from {}/{}", relative_path, bot_name, parsed.kb_name), + Err(e) => log::error!("Failed to delete vectors for {} from {}/{}: {}", relative_path, bot_name, parsed.kb_name, e), + } + }); + } + + #[cfg(any(feature = "research", feature = "llm"))] + fn upsert_kb_collection(&self, bot_name: &str, kb_name: &str, collection_name: &str, doc_count: usize) { + use diesel::prelude::*; + use uuid::Uuid; + + if let Ok(mut conn) = self.state.conn.get() { + let folder_path = format!("{}.gbai/{}.gbkb/{}", bot_name, bot_name, kb_name); + diesel::sql_query( + "INSERT INTO kb_collections (id, bot_id, name, folder_path, qdrant_collection, document_count) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (bot_id, name) DO UPDATE SET + folder_path = EXCLUDED.folder_path, + qdrant_collection = EXCLUDED.qdrant_collection, + document_count = EXCLUDED.document_count, + updated_at = NOW()" + ) + .bind::(Uuid::new_v4()) + .bind::(self.bot_id) + .bind::(kb_name) + .bind::(&folder_path) + .bind::(collection_name) + .bind::(doc_count as i32) + .execute(&mut conn) + .unwrap_or_else(|e| { + log::error!("Failed to upsert kb_collections for {}/{}: {}", bot_name, kb_name, e); + 0 + }); + } } } -impl DriveMonitor { - /// Start monitoring the drive bucket for changes - /// This is a placeholder that will be implemented with the actual monitoring logic - pub async fn start_monitoring(&self) -> Result<(), Box> { - log::info!("DriveMonitor monitoring started for bucket: {}", self.bucket_name); - // The actual monitoring logic is handled by LocalFileMonitor - // This method is kept for backward compatibility - Ok(()) +fn classify_file(key: &str) -> &'static str { + if key.ends_with(".bas") { + "bas" + } else if key.contains(".gbkb/") && is_kb_extension(key) { + "kb" + } else if key.contains(".gbot/") && key.ends_with("config.csv") { + "config" + } else { + "other" } } +fn is_kb_extension(key: &str) -> bool { + let lower = key.to_lowercase(); + lower.ends_with(".txt") + || lower.ends_with(".md") + || lower.ends_with(".pdf") + || lower.ends_with(".xlsx") + || lower.ends_with(".xls") + || lower.ends_with(".docx") + || lower.ends_with(".doc") + || lower.ends_with(".csv") + || lower.ends_with(".pptx") + || lower.ends_with(".ppt") + || lower.ends_with(".html") + || lower.ends_with(".htm") + || lower.ends_with(".rtf") + || lower.ends_with(".epub") + || lower.ends_with(".xml") + || lower.ends_with(".json") + || lower.ends_with(".odt") + || lower.ends_with(".ods") + || lower.ends_with(".odp") +} + +struct KbPathParts { + kb_name: String, + file_name: String, + relative_path: String, +} + +fn parse_kb_path(s3_key: &str) -> Option { + let parts: Vec<&str> = s3_key.splitn(4, '/').collect(); + if parts.len() < 3 || !parts[0].ends_with(".gbkb") { + return None; + } + let kb_name = parts[1].to_string(); + let file_name = parts[2..].join("/"); + let relative_path = format!("{}/{}", kb_name, file_name); + Some(KbPathParts { + kb_name, + file_name, + relative_path, + }) +} + #[derive(Debug, Clone)] pub struct DriveMonitor { pub state: Arc, @@ -67,30 +353,13 @@ pub struct DriveMonitor { pub scanning: Arc, pub consecutive_failures: 
Arc, #[cfg(any(feature = "research", feature = "llm"))] - pub files_being_indexed: Arc>>, - #[cfg(any(feature = "research", feature = "llm"))] - pub pending_kb_index: Arc>>, - #[cfg(any(feature = "research", feature = "llm"))] - pub kb_indexed_folders: Arc>>, + pub files_being_indexed: Arc>>, #[cfg(not(any(feature = "research", feature = "llm")))] - pub _pending_kb_index: Arc>>, + pub _pending_kb_index: Arc>>, pub file_repo: Arc, - #[allow(dead_code)] - pub pending_changes: Arc>>, - #[allow(dead_code)] - pub last_etag_snapshot: Arc>>, } impl DriveMonitor { - fn normalize_config_value(value: &str) -> String { - let trimmed = value.trim(); - if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("none") { - String::new() - } else { - trimmed.to_string() - } - } - pub fn new(state: Arc, bucket_name: String, bot_id: uuid::Uuid) -> Self { let work_root = PathBuf::from(crate::core::shared::utils::get_work_path()); #[cfg(any(feature = "research", feature = "llm"))] @@ -113,16 +382,10 @@ impl DriveMonitor { scanning: Arc::new(AtomicBool::new(false)), consecutive_failures: Arc::new(AtomicU32::new(0)), #[cfg(any(feature = "research", feature = "llm"))] - files_being_indexed: Arc::new(TokioRwLock::new(HashSet::new())), - #[cfg(any(feature = "research", feature = "llm"))] - pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())), - #[cfg(any(feature = "research", feature = "llm"))] - kb_indexed_folders: Arc::new(TokioRwLock::new(HashSet::new())), + files_being_indexed: Arc::new(tokio::sync::RwLock::new(std::collections::HashSet::new())), #[cfg(not(any(feature = "research", feature = "llm")))] - _pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())), + _pending_kb_index: Arc::new(tokio::sync::RwLock::new(std::collections::HashSet::new())), file_repo, - pending_changes: Arc::new(TokioRwLock::new(Vec::new())), - last_etag_snapshot: Arc::new(TokioRwLock::new(HashMap::new())), } } } diff --git a/botserver/src/drive/s3_repository.rs b/botserver/src/drive/s3_repository.rs index 2ea757ff..b712643c 100644 --- a/botserver/src/drive/s3_repository.rs +++ b/botserver/src/drive/s3_repository.rs @@ -10,6 +10,8 @@ use s3::{Bucket, Region, creds::Credentials}; pub struct S3Repository { bucket_name: String, bucket: Arc, + access_key: String, + secret_key: String, } impl S3Repository { @@ -30,123 +32,185 @@ impl S3Repository { Ok(Self { bucket_name: bucket.to_string(), bucket: Arc::new((*s3_bucket).clone()), + access_key: access_key.to_string(), + secret_key: secret_key.to_string(), }) } - /// Upload data to S3 - direct call (renamed to avoid conflict with builder) + /// Upload data to S3 - creates bucket reference for target bucket pub async fn put_object_direct( &self, - _bucket: &str, + bucket: &str, key: &str, data: Vec, _content_type: Option<&str>, ) -> Result<()> { - debug!("Uploading to S3: {}/{}", self.bucket_name, key); - self.bucket.put_object(key, &data).await?; - info!("Successfully uploaded to S3: {}/{}", self.bucket_name, key); + debug!("Uploading to S3: {}/{}", bucket, key); + let target_bucket = self.bucket_for(bucket)?; + target_bucket.put_object(key, &data).await?; + info!("Successfully uploaded to S3: {}/{}", bucket, key); Ok(()) } - /// Download data from S3 - direct call (renamed to avoid conflict with builder) - pub async fn get_object_direct(&self, _bucket: &str, key: &str) -> Result> { - debug!("Downloading from S3: {}/{}", self.bucket_name, key); - let response = self.bucket.get_object(key).await?; + /// Download data from S3 - creates bucket reference for target bucket + pub async fn 
get_object_direct(&self, bucket: &str, key: &str) -> Result> { + debug!("Downloading from S3: {}/{}", bucket, key); + let target_bucket = self.bucket_for(bucket)?; + let response = target_bucket.get_object(key).await?; let data = response.to_vec(); - info!("Successfully downloaded from S3: {}/{}", self.bucket_name, key); + info!("Successfully downloaded from S3: {}/{}", bucket, key); Ok(data) } - /// Delete an object from S3 - direct call (renamed to avoid conflict with builder) - pub async fn delete_object_direct(&self, _bucket: &str, key: &str) -> Result<()> { - debug!("Deleting from S3: {}/{}", self.bucket_name, key); - self.bucket.delete_object(key).await?; - info!("Successfully deleted from S3: {}/{}", self.bucket_name, key); + /// Delete an object from S3 - creates bucket reference for target bucket + pub async fn delete_object_direct(&self, bucket: &str, key: &str) -> Result<()> { + debug!("Deleting from S3: {}/{}", bucket, key); + let target_bucket = self.bucket_for(bucket)?; + target_bucket.delete_object(key).await?; + info!("Successfully deleted from S3: {}/{}", bucket, key); Ok(()) } - /// Copy object - implemented as get+put (renamed to avoid conflict with builder) - pub async fn copy_object_direct(&self, _bucket: &str, from_key: &str, to_key: &str) -> Result<()> { - debug!("Copying in S3: {}/{} -> {}/{}", self.bucket_name, from_key, self.bucket_name, to_key); - let response = self.bucket.get_object(from_key).await?; + /// Copy object - creates bucket reference for target bucket + pub async fn copy_object_direct(&self, bucket: &str, from_key: &str, to_key: &str) -> Result<()> { + debug!("Copying in S3: {}/{} -> {}/{}", bucket, from_key, bucket, to_key); + let target_bucket = self.bucket_for(bucket)?; + let response = target_bucket.get_object(from_key).await?; let data = response.to_vec(); - self.bucket.put_object(to_key, &data).await?; + target_bucket.put_object(to_key, &data).await?; Ok(()) } - /// List buckets + /// Create a Bucket reference for a specific bucket name using stored credentials + fn bucket_for(&self, bucket_name: &str) -> Result> { + if bucket_name == self.bucket_name { + return Ok(self.bucket.clone()); + } + let region = self.bucket.region().clone(); + let creds = s3::creds::Credentials::new( + Some(&self.access_key), + Some(&self.secret_key), + None, None, None + ).map_err(|e| anyhow::anyhow!("Failed to create credentials: {}", e))?; + let target = Bucket::new(bucket_name, region, creds)?.with_path_style(); + Ok(Arc::new((*target).clone())) + } + + /// List all buckets in S3/MinIO using rust-s3 crate's list_buckets pub async fn list_all_buckets(&self) -> Result> { - debug!("Listing all buckets"); - Ok(vec![self.bucket_name.clone()]) + debug!("Listing all buckets from S3"); + + let region = self.bucket.region().clone(); + let creds = s3::creds::Credentials::new( + Some(&self.access_key), + Some(&self.secret_key), + None, None, None + ).map_err(|e| anyhow::anyhow!("Failed to create credentials: {}", e))?; + + let response = Bucket::list_buckets(region, creds) + .await + .map_err(|e| anyhow::anyhow!("ListBuckets failed: {}", e))?; + + let buckets: Vec = response.bucket_names().collect(); + debug!("Found {} buckets: {:?}", buckets.len(), buckets); + Ok(buckets) } /// Check if an object exists - pub async fn object_exists(&self, _bucket: &str, key: &str) -> Result { - Ok(self.bucket.object_exists(key).await?) 
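With `bucket_for` in place, one `S3Repository` can address any bucket through the `*_direct` methods. A hedged usage sketch built only on the signatures shown in this file (the bucket names and keys are made up):

```rust
use anyhow::Result;

// Illustrative only: copies one dialog script between two bot buckets
// via the multi-bucket API above. Each call resolves its own Bucket
// handle through bucket_for, reusing the cached one when names match.
async fn copy_between_bots(repo: &S3Repository) -> Result<()> {
    let data = repo
        .get_object_direct("alpha.gbai", "alpha.gbdialog/start.bas")
        .await?;
    repo.put_object_direct("beta.gbai", "beta.gbdialog/start.bas", data, Some("text/plain"))
        .await?;
    Ok(())
}
```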
+ pub async fn object_exists(&self, bucket: &str, key: &str) -> Result { + let target_bucket = self.bucket_for(bucket)?; + Ok(target_bucket.object_exists(key).await?) } - /// List objects with prefix - pub async fn list_objects(&self, _bucket: &str, prefix: Option<&str>) -> Result> { - debug!("Listing objects in S3: {} with prefix {:?}", self.bucket_name, prefix); + /// List objects with prefix, returning only keys + pub async fn list_objects(&self, bucket: &str, prefix: Option<&str>) -> Result> { + let infos = self.list_objects_with_metadata(bucket, prefix).await?; + Ok(infos.into_iter().map(|i| i.key).collect()) + } + + /// List objects with prefix, returning key + etag + size for change detection + pub async fn list_objects_with_metadata(&self, bucket: &str, prefix: Option<&str>) -> Result> { + debug!("Listing objects with metadata in S3: {} with prefix {:?}", bucket, prefix); + + let region = self.bucket.region().clone(); + let creds = s3::creds::Credentials::new( + Some(&self.access_key), + Some(&self.secret_key), + None, None, None + ).map_err(|e| anyhow::anyhow!("Failed to create credentials: {}", e))?; + + let target_bucket = Bucket::new(bucket, region, creds)?.with_path_style(); + let prefix_str = prefix.unwrap_or(""); - let results = self.bucket.list(prefix_str.to_string(), Some("/".to_string())).await?; - let keys: Vec = results.iter() - .flat_map(|r| r.contents.iter().map(|c| c.key.clone())) + let results = target_bucket.list(prefix_str.to_string(), None).await?; + let objects: Vec = results.iter() + .flat_map(|r| r.contents.iter().map(|c| S3ObjectInfo { + key: c.key.clone(), + etag: c.e_tag.clone(), + size: c.size, + })) .collect(); - Ok(keys) + debug!("Found {} objects with metadata in bucket {}", objects.len(), bucket); + Ok(objects) } /// Upload a file pub async fn upload_file( &self, - _bucket: &str, + bucket: &str, key: &str, file_path: &str, _content_type: Option<&str>, ) -> Result<()> { - debug!("Uploading file to S3: {} -> {}/{}", file_path, self.bucket_name, key); + debug!("Uploading file to S3: {} -> {}/{}", file_path, bucket, key); + let target_bucket = self.bucket_for(bucket)?; let data = tokio::fs::read(file_path).await .context("Failed to read file for upload")?; - self.bucket.put_object(key, &data).await?; + target_bucket.put_object(key, &data).await?; Ok(()) } /// Download a file - pub async fn download_file(&self, _bucket: &str, key: &str, file_path: &str) -> Result<()> { - debug!("Downloading file from S3: {}/{} -> {}", self.bucket_name, key, file_path); - let response = self.bucket.get_object(key).await?; + pub async fn download_file(&self, bucket: &str, key: &str, file_path: &str) -> Result<()> { + debug!("Downloading file from S3: {}/{} -> {}", bucket, key, file_path); + let target_bucket = self.bucket_for(bucket)?; + let response = target_bucket.get_object(key).await?; let data = response.to_vec(); tokio::fs::write(file_path, data).await .context("Failed to write downloaded file")?; - info!("Successfully downloaded file from S3: {}/{} -> {}", self.bucket_name, key, file_path); + info!("Successfully downloaded file from S3: {}/{} -> {}", bucket, key, file_path); Ok(()) } /// Delete multiple objects - pub async fn delete_objects(&self, _bucket: &str, keys: Vec) -> Result<()> { + pub async fn delete_objects(&self, bucket: &str, keys: Vec) -> Result<()> { if keys.is_empty() { return Ok(()); } - debug!("Deleting {} objects from S3: {}", keys.len(), self.bucket_name); + debug!("Deleting {} objects from S3: {}", keys.len(), bucket); + let target_bucket = 
self.bucket_for(bucket)?; let keys_count = keys.len(); for key in keys { - let _ = self.bucket.delete_object(&key).await; + let _ = target_bucket.delete_object(&key).await; } - info!("Deleted {} objects from S3: {}", keys_count, self.bucket_name); + info!("Deleted {} objects from S3: {}", keys_count, bucket); Ok(()) } /// Create bucket if not exists - pub async fn create_bucket_if_not_exists(&self, _bucket: &str) -> Result<()> { + pub async fn create_bucket_if_not_exists(&self, bucket: &str) -> Result<()> { + let _target_bucket = self.bucket_for(bucket)?; Ok(()) } /// Get object metadata pub async fn get_object_metadata( &self, - _bucket: &str, + bucket: &str, key: &str, ) -> Result> { - match self.bucket.head_object(key).await { + let target_bucket = self.bucket_for(bucket)?; + match target_bucket.head_object(key).await { Ok((response, _)) => Ok(Some(ObjectMetadata { size: response.content_length.unwrap_or(0) as u64, content_type: response.content_type, @@ -196,15 +260,12 @@ pub fn copy_object(&self) -> S3CopyBuilder { /// List buckets pub fn list_buckets(&self) -> S3ListBucketsBuilder { - S3ListBucketsBuilder { - bucket: self.bucket.clone(), - } + S3ListBucketsBuilder { repo: Some(Arc::new(self.clone())) } } /// Head bucket pub fn head_bucket(&self) -> S3HeadBucketBuilder { S3HeadBucketBuilder { - bucket: self.bucket.clone(), bucket_name: None, } } @@ -212,7 +273,6 @@ pub fn copy_object(&self) -> S3CopyBuilder { /// Create bucket pub fn create_bucket(&self) -> S3CreateBucketBuilder { S3CreateBucketBuilder { - bucket: self.bucket.clone(), bucket_name: None, } } @@ -227,7 +287,7 @@ pub fn copy_object(&self) -> S3CopyBuilder { } } -/// Metadata for an S3 object +/// Metadata for an S3 object (from HEAD request) #[derive(Debug, Clone)] pub struct ObjectMetadata { pub size: u64, @@ -236,6 +296,14 @@ pub struct ObjectMetadata { pub etag: Option, } +/// Object info from list operations (key + etag + size) +#[derive(Debug, Clone)] +pub struct S3ObjectInfo { + pub key: String, + pub etag: Option, + pub size: u64, +} + // ============ Builder implementations ============ pub struct S3PutBuilder { @@ -311,17 +379,24 @@ impl S3CopyBuilder { } pub struct S3ListBucketsBuilder { - bucket: Arc, + repo: Option, } impl S3ListBucketsBuilder { + pub fn repo(mut self, repo: SharedS3Repository) -> Self { self.repo = Some(repo); self } pub async fn send(self) -> Result { - Ok(S3ListBucketsResponse { buckets: vec![] }) + if let Some(repo) = self.repo { + let names = repo.list_all_buckets().await?; + Ok(S3ListBucketsResponse { + buckets: names.into_iter().map(|name| S3Bucket { name }).collect(), + }) + } else { + Ok(S3ListBucketsResponse { buckets: vec![] }) + } } } pub struct S3HeadBucketBuilder { - bucket: Arc, bucket_name: Option, } @@ -333,7 +408,6 @@ impl S3HeadBucketBuilder { } pub struct S3CreateBucketBuilder { - bucket: Arc, bucket_name: Option, } @@ -380,7 +454,7 @@ impl S3Response { #[derive(Debug, Default)] pub struct S3ResponseBody { - data: Vec, + pub data: Vec, } impl S3ResponseBody { diff --git a/botserver/src/email/messages.rs b/botserver/src/email/messages.rs index 99f6d212..ff94004e 100644 --- a/botserver/src/email/messages.rs +++ b/botserver/src/email/messages.rs @@ -53,7 +53,7 @@ fn format_email_time(date_str: &str) -> String { } fn is_tracking_pixel_enabled(state: &Arc, bot_id: Option) -> bool { - let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into()); + let config_manager = crate::core::config::ConfigManager::new(state.conn.clone()); let bot_id = 
bot_id.unwrap_or(Uuid::nil()); config_manager @@ -63,7 +63,7 @@ fn is_tracking_pixel_enabled(state: &Arc, bot_id: Option) -> boo } fn inject_tracking_pixel(html_body: &str, tracking_id: &str, state: &Arc) -> String { - let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into()); + let config_manager = crate::core::config::ConfigManager::new(state.conn.clone()); let base_url = config_manager .get_config(&Uuid::nil(), "server-url", Some("")) .unwrap_or_else(|_| "".to_string()); diff --git a/botserver/src/email/tracking.rs b/botserver/src/email/tracking.rs index 8e24de20..22cd2be8 100644 --- a/botserver/src/email/tracking.rs +++ b/botserver/src/email/tracking.rs @@ -19,7 +19,7 @@ const TRACKING_PIXEL: [u8; 43] = [ ]; pub fn is_tracking_pixel_enabled(state: &Arc, bot_id: Option) -> bool { - let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into()); + let config_manager = crate::core::config::ConfigManager::new(state.conn.clone()); let bot_id = bot_id.unwrap_or(Uuid::nil()); config_manager @@ -29,7 +29,7 @@ pub fn is_tracking_pixel_enabled(state: &Arc, bot_id: Option) -> } pub fn inject_tracking_pixel(html_body: &str, tracking_id: &str, state: &Arc) -> String { - let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into()); + let config_manager = crate::core::config::ConfigManager::new(state.conn.clone()); let base_url = config_manager .get_config(&Uuid::nil(), "server-url", Some("")) .unwrap_or_else(|_| "".to_string()); diff --git a/botserver/src/llm/cache.rs b/botserver/src/llm/cache.rs index fbc8d009..14403f17 100644 --- a/botserver/src/llm/cache.rs +++ b/botserver/src/llm/cache.rs @@ -157,7 +157,7 @@ impl CachedLLMProvider { } }; - let config_manager = ConfigManager::new(db_pool.clone().into()); + let config_manager = ConfigManager::new(db_pool.clone()); let cache_enabled = config_manager .get_config(&bot_uuid, "llm-cache", Some("true")) .unwrap_or_else(|_| "true".to_string()); @@ -193,7 +193,7 @@ impl CachedLLMProvider { } }; - let config_manager = ConfigManager::new(db_pool.clone().into()); + let config_manager = ConfigManager::new(db_pool.clone()); let ttl = config_manager .get_config( diff --git a/botserver/src/llm/episodic_memory.rs b/botserver/src/llm/episodic_memory.rs index 12be0a09..94b08e5f 100644 --- a/botserver/src/llm/episodic_memory.rs +++ b/botserver/src/llm/episodic_memory.rs @@ -33,7 +33,7 @@ async fn process_episodic_memory( session_manager.get_user_sessions(Uuid::nil())? 
diff --git a/botserver/src/llm/episodic_memory.rs b/botserver/src/llm/episodic_memory.rs
index 12be0a09..94b08e5f 100644
--- a/botserver/src/llm/episodic_memory.rs
+++ b/botserver/src/llm/episodic_memory.rs
@@ -33,7 +33,7 @@ async fn process_episodic_memory(
         session_manager.get_user_sessions(Uuid::nil())?
     };

     for session in sessions {
-        let config_manager = ConfigManager::new(state.conn.clone().into());
+        let config_manager = ConfigManager::new(state.conn.clone());

         // Default to 0 (disabled) to respect user's request for false by default
         let threshold = config_manager
@@ -145,7 +145,7 @@ async fn process_episodic_memory(
     let llm_provider = state.llm_provider.clone();
     let mut filtered = String::new();
-    let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
+    let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());

     // Use session.bot_id instead of Uuid::nil() to avoid using default bot settings
     let model = config_manager
diff --git a/botserver/src/llm/local.rs b/botserver/src/llm/local.rs
index af6bea76..4546130b 100644
--- a/botserver/src/llm/local.rs
+++ b/botserver/src/llm/local.rs
@@ -36,7 +36,7 @@ pub async fn ensure_llama_servers_running(
             Ok(crate::core::bot::get_default_bot(&mut conn))
         })
         .await??;
-    let config_manager = ConfigManager::new(app_state.conn.clone().into());
+    let config_manager = ConfigManager::new(app_state.conn.clone());
     info!("Reading config for bot_id: {}", default_bot_id);
     let embedding_model_result = config_manager.get_config(&default_bot_id, "embedding-model", None);
     info!("embedding-model config result: {:?}", embedding_model_result);
@@ -388,7 +388,7 @@ pub fn start_llm_server(
     std::env::set_var("OMP_PLACES", "cores");
     std::env::set_var("OMP_PROC_BIND", "close");
     let conn = app_state.conn.clone();
-    let config_manager = ConfigManager::new(conn.clone().into());
+    let config_manager = ConfigManager::new(conn.clone());
     let mut conn = conn.get().map_err(|e| {
         Box::new(std::io::Error::other(
             format!("failed to get db connection: {e}"),
diff --git a/botserver/src/llm/smart_router.rs b/botserver/src/llm/smart_router.rs
index e00f409a..969a2e79 100644
--- a/botserver/src/llm/smart_router.rs
+++ b/botserver/src/llm/smart_router.rs
@@ -161,7 +161,7 @@ pub async fn enhanced_llm_call(
         .await?;

     // Get actual LLM configuration from bot's config
-    let config_manager = ConfigManager::new(state.conn.clone().into());
+    let config_manager = ConfigManager::new(state.conn.clone());
     let actual_model = config_manager
         .get_config(&uuid::Uuid::nil(), "llm-model", None)
         .unwrap_or_else(|_| model.clone());
diff --git a/botserver/src/main_module/bootstrap.rs b/botserver/src/main_module/bootstrap.rs
index 2ebed225..029712c1 100644
--- a/botserver/src/main_module/bootstrap.rs
+++ b/botserver/src/main_module/bootstrap.rs
@@ -435,7 +435,7 @@ pub async fn create_app_state(
     #[cfg(feature = "directory")]
     bootstrap_directory_admin(&zitadel_config).await;

-    let config_manager = ConfigManager::new(pool.clone().into());
+    let config_manager = ConfigManager::new(pool.clone());

     let mut bot_conn = pool
         .get()
@@ -927,6 +927,7 @@ pub async fn start_background_services(
     }

     // Step 1: Discover bots from S3 buckets (*.gbai) and auto-create missing
+    log::error!("Drive client status: {:?}", state_for_scan.drive.is_some());
     if let Some(s3_client) = &state_for_scan.drive {
         match s3_client.list_all_buckets().await {
             Ok(buckets) => {
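The new `log::error!` line in `bootstrap.rs` only reports whether the drive client exists; the interesting work happens in the gated block that follows. Based on the `// Step 1` comment and the `list_all_buckets()` builder earlier in this diff (which returns plain bucket names), the discovery loop presumably filters for the `*.gbai` convention along these lines (a sketch, not the actual continuation of the hunk):

```rust
if let Some(s3_client) = &state_for_scan.drive {
    match s3_client.list_all_buckets().await {
        Ok(buckets) => {
            // Bot buckets follow the `<name>.gbai` convention.
            for name in buckets.iter().filter(|b| b.ends_with(".gbai")) {
                log::info!("Discovered bot bucket: {name}");
                // auto-create the missing bot record here (elided)
            }
        }
        Err(e) => log::warn!("Bucket scan failed: {e}"),
    }
}
```

Note that `log::error!` for a routine status probe is unusually severe; `debug!` or `info!` would fit better if the line is meant to stay.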
diff --git a/botserver/src/marketing/ai.rs b/botserver/src/marketing/ai.rs
index 4bc52f83..20c5a1f7 100644
--- a/botserver/src/marketing/ai.rs
+++ b/botserver/src/marketing/ai.rs
@@ -158,7 +158,7 @@ struct ContactInfo {
 }

 async fn get_llm_config(state: &Arc<AppState>, bot_id: Uuid) -> Result<(String, String, String), String> {
-    let config = ConfigManager::new(state.conn.clone().into());
+    let config = ConfigManager::new(state.conn.clone());
     let llm_url = config
         .get_config(&bot_id, "llm-url", Some(""))
diff --git a/botserver/src/marketing/email.rs b/botserver/src/marketing/email.rs
index 2f21b9f1..c68670d7 100644
--- a/botserver/src/marketing/email.rs
+++ b/botserver/src/marketing/email.rs
@@ -95,7 +95,7 @@ pub async fn send_campaign_email(
     let open_token = Uuid::new_v4();
     let tracking_id = Uuid::new_v4();

-    let config = ConfigManager::new(state.conn.clone().into());
+    let config = ConfigManager::new(state.conn.clone());
     let base_url = config
         .get_config(&bot_id, "server-url", Some(""))
         .unwrap_or_else(|_| "".to_string());
diff --git a/botserver/src/multimodal/mod.rs b/botserver/src/multimodal/mod.rs
index 998fdbc3..5a371f3f 100644
--- a/botserver/src/multimodal/mod.rs
+++ b/botserver/src/multimodal/mod.rs
@@ -244,7 +244,7 @@ impl BotModelsClient {
     }

     pub fn from_state(state: &AppState, bot_id: &Uuid) -> Self {
-        let config_manager = ConfigManager::new(state.conn.clone().into());
+        let config_manager = ConfigManager::new(state.conn.clone());
         let config = BotModelsConfig::from_database(&config_manager, bot_id);
         let image_config = ImageGeneratorConfig::from_database(&config_manager, bot_id);
         let video_config = VideoGeneratorConfig::from_database(&config_manager, bot_id);
@@ -630,7 +630,7 @@ pub async fn ensure_botmodels_running(
         })
         .await?;

-        let config_manager = ConfigManager::new(app_state.conn.clone().into());
+        let config_manager = ConfigManager::new(app_state.conn.clone());
         BotModelsConfig::from_database(&config_manager, &default_bot_id)
     };
diff --git a/botserver/src/paper/llm.rs b/botserver/src/paper/llm.rs
index fcebb6a3..f1179fe2 100644
--- a/botserver/src/paper/llm.rs
+++ b/botserver/src/paper/llm.rs
@@ -20,7 +20,7 @@ pub async fn call_llm(
         &[("user".to_string(), user_content.to_string())],
     );

-    let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
+    let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
     let model = config_manager
         .get_config(&uuid::Uuid::nil(), "llm-model", None)
         .unwrap_or_else(|_| "gpt-3.5-turbo".to_string());
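All four files in this stretch share the `ConfigManager::new` cleanup, plus the same `Some("")` default idiom for `get_config`. One practical consequence worth spelling out: with an empty-string default, a missing `server-url` comes back as `""`, so anything that builds absolute URLs from it needs an emptiness guard. A suggested pattern, not code from the diff (the `/t/...` path is invented for illustration):

```rust
use uuid::Uuid;

/// Build the tracking-pixel URL only when a server URL is configured.
fn build_pixel_url(base_url: &str, tracking_id: Uuid) -> Option<String> {
    (!base_url.is_empty()).then(|| format!("{base_url}/t/{tracking_id}.png"))
}
```

Usage mirrors the call sites above: feed it the result of `get_config(&bot_id, "server-url", Some(""))` and skip injection when it returns `None`.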
diff --git a/botserver/src/sheet/storage.rs b/botserver/src/sheet/storage.rs
index d65351be..e3c768fd 100644
--- a/botserver/src/sheet/storage.rs
+++ b/botserver/src/sheet/storage.rs
@@ -16,11 +16,10 @@ pub fn get_current_user_id() -> String {
 }

 fn extract_id_from_path(path: &str) -> String {
-    path.split('/')
-        .last()
-        .unwrap_or("")
-        .trim_end_matches(".json")
-        .trim_end_matches(".xlsx")
+    let raw = path.split('/').last().unwrap_or("");
+    raw.strip_suffix(".json")
+        .or_else(|| raw.strip_suffix(".xlsx"))
+        .unwrap_or(raw)
         .to_string()
 }

@@ -42,7 +41,7 @@ pub async fn save_sheet_to_drive(
         .put_object()
         .bucket("gbo")
         .key(&path)
-        .body(content.into_bytes().into())
+        .body(content.into_bytes())
         .content_type("application/json")
         .send()
         .await
@@ -69,7 +68,7 @@ pub async fn save_sheet_as_xlsx(
         .put_object()
         .bucket("gbo")
         .key(&path)
-        .body(xlsx_bytes.clone().into())
+        .body(xlsx_bytes.clone())
         .content_type("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
         .send()
         .await
@@ -297,13 +296,15 @@ pub fn load_xlsx_from_bytes(
     let workbook = umya_spreadsheet::reader::xlsx::read_reader(cursor, true)
         .map_err(|e| format!("Failed to parse xlsx: {e}"))?;

-    let file_name = file_path
+    let raw_name = file_path
         .split('/')
         .last()
-        .unwrap_or("Untitled")
-        .trim_end_matches(".xlsx")
-        .trim_end_matches(".xlsm")
-        .trim_end_matches(".xls");
+        .unwrap_or("Untitled");
+    let file_name = raw_name
+        .strip_suffix(".xlsx")
+        .or_else(|| raw_name.strip_suffix(".xlsm"))
+        .or_else(|| raw_name.strip_suffix(".xls"))
+        .unwrap_or(raw_name);

     let mut worksheets = Vec::new();

@@ -621,7 +622,7 @@ pub async fn save_workbook_to_drive(
         .put_object()
         .bucket("gbo")
         .key(&path)
-        .body(buf.into_inner().into())
+        .body(buf.into_inner())
         .content_type("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
         .send()
         .await
@@ -721,22 +722,19 @@ pub async fn list_sheets_from_drive(
     let mut sheets = Vec::new();

-    if let Some(contents) = result.contents {
-        for obj in contents {
-            if let Some(key) = obj.key {
-                if key.ends_with(".json") {
-                    let id = extract_id_from_path(&key);
-                    if let Ok(sheet) = load_sheet_by_id(state, user_id, &id).await {
-                        sheets.push(SpreadsheetMetadata {
-                            id: sheet.id,
-                            name: sheet.name,
-                            owner_id: sheet.owner_id,
-                            created_at: sheet.created_at,
-                            updated_at: sheet.updated_at,
-                            worksheet_count: sheet.worksheets.len(),
-                        });
-                    }
-                }
+    for obj in &result.contents {
+        let key = &obj.key;
+        if key.ends_with(".json") {
+            let id = extract_id_from_path(key);
+            if let Ok(sheet) = load_sheet_by_id(state, user_id, &id).await {
+                sheets.push(SpreadsheetMetadata {
+                    id: sheet.id,
+                    name: sheet.name,
+                    owner_id: sheet.owner_id,
+                    created_at: sheet.created_at,
+                    updated_at: sheet.updated_at,
+                    worksheet_count: sheet.worksheets.len(),
+                });
             }
         }
     }
@@ -1108,9 +1106,9 @@ pub fn import_spreadsheet_bytes(bytes: &[u8], filename: &str) -> Result
 String {
         .replace('\'', "&apos;")
 }

-pub fn save_pptx_preserving(original_bytes: &[u8]) -> Result<Vec<u8>, String> {
-    use ooxmlsdk::parts::presentation_document::PresentationDocument;
-
-    let reader = Cursor::new(original_bytes);
-    let pptx = PresentationDocument::new(reader)
-        .map_err(|e| format!("Failed to parse PPTX: {e}"))?;
-
-    let mut output = Cursor::new(Vec::new());
-    pptx.save(&mut output)
-        .map_err(|e| format!("Failed to save PPTX: {e}"))?;
-
-    Ok(output.into_inner())
-}
-
 pub fn update_pptx_text(
     original_bytes: &[u8],
     new_slide_texts: &[Vec<String>],
@@ -244,7 +230,11 @@ fn replace_first_text_run(para_xml: &str, new_text: &str) -> String {
             found_first = true;
             search_pos = abs_content_start + escaped.len() + 6;
         } else {
-            result = format!("{}{}", &result[..abs_content_start], &result[abs_content_end..]);
+            result = format!(
+                "{}{}",
+                &result[..abs_content_start],
+                &result[abs_content_end..]
+            );
             search_pos = abs_content_start;
         }
     } else {
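The `strip_suffix` rewrite in `sheet/storage.rs` (mirrored below in `slides/storage.rs`) is a behavior fix, not just style: `trim_end_matches` strips its pattern repeatedly, and the old chained calls could remove two different extensions from one name. A minimal standalone demonstration using only the standard library:

```rust
fn main() {
    // trim_end_matches removes every trailing occurrence:
    assert_eq!("backup.xlsx.xlsx".trim_end_matches(".xlsx"), "backup");

    // and chaining it strips both extensions from a name like this:
    let tricky = "data.xlsx.json";
    assert_eq!(
        tricky.trim_end_matches(".json").trim_end_matches(".xlsx"),
        "data"
    );

    // strip_suffix removes at most one, and or_else tries the alternative
    // only when the first suffix wasn't there — matching the new code:
    let id = tricky
        .strip_suffix(".json")
        .or_else(|| tricky.strip_suffix(".xlsx"))
        .unwrap_or(tricky);
    assert_eq!(id, "data.xlsx");
}
```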
diff --git a/botserver/src/slides/storage.rs b/botserver/src/slides/storage.rs
index be827221..5eeb45f5 100644
--- a/botserver/src/slides/storage.rs
+++ b/botserver/src/slides/storage.rs
@@ -50,11 +50,10 @@ pub async fn remove_from_cache(pres_id: &str) {
 }

 fn extract_id_from_path(path: &str) -> String {
-    path.split('/')
-        .last()
-        .unwrap_or_default()
-        .trim_end_matches(".json")
-        .trim_end_matches(".pptx")
+    let raw = path.split('/').last().unwrap_or_default();
+    raw.strip_suffix(".json")
+        .or_else(|| raw.strip_suffix(".pptx"))
+        .unwrap_or(raw)
         .to_string()
 }

@@ -80,7 +79,7 @@ pub async fn save_presentation_to_drive(
         .put_object()
         .bucket("gbo")
         .key(&path)
-        .body(content.into_bytes().into())
+        .body(content.into_bytes())
         .content_type("application/json")
         .send()
         .await
@@ -122,7 +121,7 @@ pub async fn save_presentation_as_pptx(
         .put_object()
         .bucket("gbo")
         .key(&path)
-        .body(pptx_bytes.clone().into())
+        .body(pptx_bytes.clone())
        .content_type("application/vnd.openxmlformats-officedocument.presentationml.presentation")
         .send()
         .await
@@ -536,12 +535,14 @@ pub async fn load_pptx_from_bytes(
     let mut archive = ZipArchive::new(cursor)
         .map_err(|e| format!("Failed to open PPTX archive: {e}"))?;

-    let file_name = file_path
+    let raw_name = file_path
         .split('/')
         .last()
-        .unwrap_or("Untitled")
-        .trim_end_matches(".pptx")
-        .trim_end_matches(".ppt");
+        .unwrap_or("Untitled");
+    let file_name = raw_name
+        .strip_suffix(".pptx")
+        .or_else(|| raw_name.strip_suffix(".ppt"))
+        .unwrap_or(raw_name);

     let pres_id = generate_presentation_id();

@@ -768,22 +769,19 @@ pub async fn list_presentations_from_drive(
     let mut presentations = Vec::new();

-    if let Some(contents) = result.contents {
-        for obj in contents {
-            if let Some(key) = obj.key {
-                if key.ends_with(".json") {
-                    let id = extract_id_from_path(&key);
-                    if let Ok(presentation) = load_presentation_by_id(state, user_id, &id).await {
-                        presentations.push(PresentationMetadata {
-                            id: presentation.id,
-                            name: presentation.name,
-                            owner_id: presentation.owner_id,
-                            slide_count: presentation.slides.len(),
-                            created_at: presentation.created_at,
-                            updated_at: presentation.updated_at,
-                        });
-                    }
-                }
+    for obj in &result.contents {
+        let key = &obj.key;
+        if key.ends_with(".json") {
+            let id = extract_id_from_path(key);
+            if let Ok(presentation) = load_presentation_by_id(state, user_id, &id).await {
+                presentations.push(PresentationMetadata {
+                    id: presentation.id,
+                    name: presentation.name,
+                    owner_id: presentation.owner_id,
+                    slide_count: presentation.slides.len(),
+                    created_at: presentation.created_at,
+                    updated_at: presentation.updated_at,
+                });
             }
         }
     }
diff --git a/botserver/src/whatsapp/mod.rs b/botserver/src/whatsapp/mod.rs
index 064edf51..cb862f37 100644
--- a/botserver/src/whatsapp/mod.rs
+++ b/botserver/src/whatsapp/mod.rs
@@ -1653,7 +1653,7 @@ pub async fn attendant_respond(
 }

 async fn get_verify_token_for_bot(state: &Arc<AppState>, bot_id: &Uuid) -> String {
-    let config_manager = ConfigManager::new(state.conn.clone().into());
+    let config_manager = ConfigManager::new(state.conn.clone());
     let bot_id_clone = *bot_id;

     tokio::task::spawn_blocking(move || {