refactor: Remove ooxmlsdk from default build, split document_processor, fix DriveMonitor sync

- Replace docs/sheet/slides with kb-extraction in default features (~4-6min compile time savings, ~300MB less disk)
- Add kb-extraction feature using zip+quick-xml+calamine for lightweight KB extraction
- Split document_processor.rs (829 lines) into mod.rs+types.rs+ooxml_extract.rs+rtf.rs
- Move DOCX/PPTX ZIP-based extraction to document_processor::ooxml_extract (no ooxmlsdk needed)
- Remove dead code: save_docx_preserving(), save_pptx_preserving() (zero callers)
- Fix dep: prefix for optional dependencies in feature definitions
- DriveMonitor: full S3 sync, ETag change detection, KB incremental indexing, config.csv sync
- ConfigManager: real DB reads from bot_configuration table
- 0 warnings, 0 errors on both default and full feature builds
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-21 14:34:26 +00:00
parent b2c5e912b3
commit c70fbba099
60 changed files with 1825 additions and 770 deletions

View file

@ -4,4 +4,3 @@ jobs = 12
[target.x86_64-unknown-linux-gnu]
linker = "clang"
rustflags = ["-C", "link-arg=-fuse-ld=mold"]

View file

@ -2,13 +2,11 @@
set -e
if [ "$EUID" -ne 0 ]; then
echo "Run as root (use sudo)"
exit 1
echo "Run as root (use sudo)"
exit 1
fi
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
SUDO_USER_HOME="$(eval echo "~${SUDO_USER:-$USER}")"
echo "Installing runtime dependencies first..."
bash "$SCRIPT_DIR/DEPENDENCIES.sh"
@ -16,60 +14,40 @@ echo "Installing dev/build dependencies..."
OS=$(grep -oP '(?<=^ID=).+' /etc/os-release 2>/dev/null | tr -d '"' || echo "unknown")
install_debian() {
apt-get install -y -qq \
clang lld mold build-essential pkg-config libssl-dev libpq-dev cmake git \
libglib2.0-dev libgtk-3-dev libwebkit2gtk-4.1-dev libjavascriptcoregtk-4.1-dev \
libayatana-appindicator3-dev librsvg2-dev libsoup-3.0-dev
apt-get install -y -qq \
clang lld build-essential pkg-config libssl-dev libpq-dev cmake git \
libglib2.0-dev libgtk-3-dev libwebkit2gtk-4.1-dev libjavascriptcoregtk-4.1-dev \
libayatana-appindicator3-dev librsvg2-dev libsoup-3.0-dev
}
install_fedora() {
dnf install -y -q \
clang lld mold gcc gcc-c++ make pkg-config openssl-devel postgresql-devel cmake git \
glib2-devel gobject-introspection-devel gtk3-devel webkit2gtk3-devel \
javascriptcoregtk-devel libappindicator-gtk3-devel librsvg2-devel libsoup3-devel
dnf install -y -q \
clang lld gcc gcc-c++ make pkg-config openssl-devel postgresql-devel cmake git \
glib2-devel gobject-introspection-devel gtk3-devel webkit2gtk3-devel \
javascriptcoregtk-devel libappindicator-gtk3-devel librsvg2-devel libsoup3-devel
}
install_arch() {
pacman -Sy --noconfirm \
clang lld mold gcc make pkg-config openssl libpq cmake git \
glib2 gtk3 webkit2gtk4 javascriptcoregtk libappindicator librsvg libsoup
pacman -Sy --noconfirm \
clang lld gcc make pkg-config openssl libpq cmake git \
glib2 gtk3 webkit2gtk4 javascriptcoregtk libappindicator librsvg libsoup
}
case $OS in
ubuntu|debian|linuxmint|pop) install_debian ;;
fedora|rhel|centos|rocky|almalinux) install_fedora ;;
arch|manjaro) install_arch ;;
*) echo "Unsupported OS: $OS"; exit 1 ;;
ubuntu|debian|linuxmint|pop) install_debian ;;
fedora|rhel|centos|rocky|almalinux) install_fedora ;;
arch|manjaro) install_arch ;;
*) echo "Unsupported OS: $OS"; exit 1 ;;
esac
run_as_user() {
su - "${SUDO_USER:-$USER}" -c ". '${SUDO_USER_HOME}/.cargo/env' 2>/dev/null; export PATH=\"${SUDO_USER_HOME}/.cargo/bin:\$PATH\"; $*"
install_mold() {
curl -L "https://github.com/rui314/mold/releases/download/v2.4.0/mold-2.4.0-x86_64-linux.tar.gz" -o /tmp/mold.tar.gz
tar -xzf /tmp/mold.tar.gz -C /tmp
cp "/tmp/mold-2.4.0-x86_64-linux/bin/mold" /usr/local/bin/
rm -rf /tmp/mold-2.4.0* /tmp/mold.tar.gz
ldconfig
}
install_rust() {
if ! run_as_user "rustc --version" &>/dev/null; then
echo "Installing Rust via rustup for ${SUDO_USER:-$USER}..."
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | \
su - "${SUDO_USER:-$USER}" -c "sh -s -- -y --default-toolchain stable"
run_as_user "rustup default stable"
echo "Rust installed: $(run_as_user 'rustc --version') / $(run_as_user 'cargo --version')"
else
echo "Rust already installed: $(run_as_user 'rustc --version')"
fi
}
command -v mold &> /dev/null || install_mold
install_cargo_tools() {
if run_as_user "cargo --version" &>/dev/null; then
if ! run_as_user "sccache --version" &>/dev/null; then
echo "Installing sccache..."
run_as_user "cargo install sccache --locked"
else
echo "sccache already installed: $(run_as_user 'sccache --version 2>&1 | head -1')"
fi
fi
}
install_rust "$SUDO_USER_HOME"
install_cargo_tools "$SUDO_USER_HOME"
echo "Dev dependencies installed!"
echo "Dev dependencies installed!"

View file

@ -3,7 +3,7 @@
## CRITICAL RULES — READ FIRST
NEVER INCLUDE HERE CREDENTIALS OR COMPANY INFORMATION, THIS IS COMPANY AGNOSTIC.
If you edit configuration or data files, make a backup first to /tmp with a datetime suffix so it can be restored.
Always manage services with `systemctl` inside the `system` Incus container. Never run `/opt/gbo/bin/botserver` or `/opt/gbo/bin/botui` directly — they will fail because they won't load the `.env` file containing Vault credentials and paths. The correct commands are `sudo incus exec system -- systemctl start|stop|restart|status botserver` and the same for `ui`. Systemctl handles environment loading, auto-restart, logging, and dependencies.
Never push secrets (API keys, passwords, tokens) to git. Never commit `init.json` (it contains Vault unseal keys). All secrets must come from Vault — only `VAULT_*` variables are allowed in `.env`. Never deploy manually via scp or ssh; always use CI/CD. Always push all submodules (botserver, botui, botlib) before or alongside the main repo. Always ask before pushing to ALM.

View file

@ -0,0 +1,305 @@
# Drive Monitor Test - Upload via MinIO Console
## Objective
Test the complete sync flow for bot files uploaded through MinIO Console:
1. `.gbai` bucket creation
2. `.gbdialog/*.bas` → compilation to `.ast`
3. `.gbkb/*` → indexing to Qdrant
4. Bot activation in database
## Prerequisites
### Services Running
```bash
# Check all services are healthy
curl http://localhost:8080/health # BotServer
curl http://localhost:3000/ # BotUI
curl http://localhost:6333/collections # Qdrant
curl http://localhost:9100/minio/health/live # MinIO
curl http://localhost:8300/debug/healthz # Zitadel
```
### MinIO Console Access
- URL: http://localhost:9101
- User: minioadmin
- Password: minioadmin (or check `.env` for credentials)
## Test Procedure
### Step 1: Create Bot Bucket
1. Open MinIO Console: http://localhost:9101
2. Login with credentials
3. Click **"Create Bucket"**
4. Name: `testbot.gbai` (must end with `.gbai`)
5. Click **"Create Bucket"**
### Step 2: Create Dialog Folder and File (.bas)
1. Open bucket `testbot.gbai`
2. Click **"Create New Path"**
3. Path: `testbot.gbdialog`
4. Click **"Create"**
5. Navigate into `testbot.gbdialog`
6. Click **"Upload File"** or use mc command:
```bash
# Using mc CLI (MinIO Client)
mc alias set local http://localhost:9100 minioadmin minioadmin
# Create start.bas
cat > /tmp/start.bas << 'EOF'
' start.bas - Bot entry point
ADD SUGGESTION "Check Status"
ADD SUGGESTION "Create Report"
ADD SUGGESTION "Help"
TALK "Welcome to TestBot! How can I help you today?"
EOF
mc cp /tmp/start.bas local/testbot.gbai/testbot.gbdialog/start.bas
```
### Step 3: Create Knowledge Base Folder (.gbkb)
```bash
# Create KB folder and documents
mkdir -p /tmp/testbot-docs
cat > /tmp/testbot-docs/manual.txt << 'EOF'
TestBot Manual v1.0
This is the test knowledge base for TestBot.
It contains documentation that should be indexed.
Features:
- Document search via Qdrant
- Context injection for LLM
- Semantic similarity queries
Usage:
USE KB "manual" in your dialog scripts.
EOF
cat > /tmp/testbot-docs/faq.txt << 'EOF'
Frequently Asked Questions
Q: What is TestBot?
A: A test bot for validating the drive monitor sync.
Q: How do I use it?
A: Just upload files to MinIO and wait for sync.
Q: What file types are supported?
A: .txt, .pdf, .md, .docx for KB
.bas for dialog scripts
EOF
# Upload to MinIO
mc mb local/testbot.gbai/testbot.gbkb --ignore-existing
mc cp /tmp/testbot-docs/manual.txt local/testbot.gbai/testbot.gbkb/manual.txt
mc cp /tmp/testbot-docs/faq.txt local/testbot.gbai/testbot.gbkb/faq.txt
```
### Step 4: Verify Sync
#### 4.1 Check Database for Bot Creation
```bash
# Bot should be auto-created from bucket
./botserver-stack/bin/tables/bin/psql -h localhost -U botserver -d botserver -c \
"SELECT id, name, is_active, created_at FROM bots WHERE name = 'testbot';"
```
Expected output:
```
id | name | is_active | created_at
----+------+-----------+-------------------------
...| testbot | t | 2026-04-20 ...
```
#### 4.2 Check drive_files Table
```bash
# Files should be registered in drive_files
./botserver-stack/bin/tables/bin/psql -h localhost -U botserver -d botserver -c \
"SELECT file_path, file_type, etag, indexed FROM drive_files WHERE file_path LIKE '%testbot%';"
```
Expected output:
```
file_path | file_type | etag | indexed
-----------+-----------+------+---------
testbot.gbdialog/start.bas | bas | abc123... | t
testbot.gbkb/manual.txt | txt | def456... | t
testbot.gbkb/faq.txt | txt | ghi789... | t
```
#### 4.3 Check .ast Compilation
```bash
# Check if .bas was compiled to .ast
ls -la /opt/gbo/work/testbot.gbai/testbot.gbdialog/
```
Expected output:
```
-rw-r--r-- 1 ubuntu ubuntu 1234 Apr 20 12:00 start.ast
-rw-r--r-- 1 ubuntu ubuntu 567 Apr 20 12:00 start.bas
```
#### 4.4 Check Qdrant Collections
```bash
# Check KB indexing
curl -s http://localhost:6333/collections | jq '.result.collections[] | select(.name | contains("testbot"))'
```
Expected output:
```json
{
"name": "testbot_manual"
}
```
Or check points:
```bash
curl -s http://localhost:6333/collections/testbot_manual/points/scroll | jq '.result.points | length'
```
#### 4.5 Check BotServer Logs
```bash
# Monitor sync activity
tail -f botserver.log | grep -i -E "testbot|sync|compile|index"
```
Expected log patterns:
```
2026-04-20... info bootstrap:Auto-creating bot 'testbot' from S3 bucket 'testbot.gbai'
2026-04-20... info drive_compiler:Compiling testbot.gbdialog/start.bas
2026-04-20... info kb:Indexing KB folder: testbot.gbkb for bot testbot
2026-04-20... info qdrant:Collection created: testbot_manual
```
### Step 5: Test Bot via Web Interface
1. Open: http://localhost:3000/testbot
2. Login with test credentials
3. Send message: "Hello"
4. Expected response includes suggestions from start.bas
### Step 6: Test KB Search
1. In chat, type: "What is TestBot?"
2. Bot should use KB context and answer from manual.txt/faq.txt
## Troubleshooting
### Files Not Syncing
**Check MinIO bucket visibility:**
```bash
mc ls local/
```
**Check BotServer S3 connection:**
```bash
tail -100 botserver.log | grep -i "s3\|minio\|bucket"
```
### .bas Not Compiling
**Check DriveCompiler status:**
```bash
tail -f botserver.log | grep -i "drive_compiler"
```
**Manual compile trigger (if needed):**
```bash
curl -X POST http://localhost:8080/api/admin/drive/compile/testbot
```
### KB Not Indexing
**Check embedding server:**
```bash
curl http://localhost:8081/v1/models
```
**Manual KB index:**
```bash
curl -X POST http://localhost:8080/api/bots/testbot/kb/index \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{"kb_name": "manual"}'
```
## Expected Flow Diagram
```
┌─────────────────────────────────────────────────────────────┐
│ 1. MinIO Upload │
│ mc cp file.bas local/testbot.gbai/testbot.gbdialog/ │
└─────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 2. S3 Event / Polling (DriveMonitor) │
│ - Detects new file in bucket │
│ - Extracts metadata (etag, size, modified) │
│ - Inserts into drive_files table │
└─────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 3. DriveCompiler (every 30s) │
│ - Queries drive_files WHERE file_type='bas' │
│ - Compiles .bas → .ast │
│ - Stores in /opt/gbo/work/{bot}.gbai/ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 4. KB Indexer (triggered by drive_files.indexed=false) │
│ - Downloads .gbkb/* files from S3 │
│ - Chunks text, generates embeddings │
│ - Stores in Qdrant collection {bot}_{kb_name} │
│ - Updates drive_files.indexed = true │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 5. Bot Ready │
│ - WebSocket connection at ws://localhost:8080/ws/testbot │
│ - start.bas executed on connect │
│ - KB available for USE KB "manual" │
└─────────────────────────────────────────────────────────────┘
```
## Test Checklist
- [ ] MinIO Console accessible at :9101
- [ ] Bucket `testbot.gbai` created
- [ ] Folder `testbot.gbdialog` created
- [ ] File `start.bas` uploaded
- [ ] Folder `testbot.gbkb` created
- [ ] Files `manual.txt`, `faq.txt` uploaded
- [ ] Bot auto-created in database
- [ ] Files appear in `drive_files` table
- [ ] `.ast` file generated in work dir
- [ ] Qdrant collection created
- [ ] Bot accessible at http://localhost:3000/testbot
- [ ] KB search returns relevant results
## Cleanup
```bash
# Remove test bot
mc rb --force local/testbot.gbai
# Remove from database
./botserver-stack/bin/tables/bin/psql -h localhost -U botserver -d botserver -c \
"DELETE FROM bots WHERE name = 'testbot';"
# Remove Qdrant collection
curl -X DELETE http://localhost:6333/collections/testbot_manual
# Remove work files
rm -rf /opt/gbo/work/testbot.gbai
```

View file

@ -10,7 +10,7 @@ features = ["database", "i18n"]
[features]
# ===== DEFAULT =====
default = ["chat", "automation", "cache", "llm", "vectordb", "crawler", "drive", "directory"]
default = ["chat", "automation", "cache", "llm", "vectordb", "crawler", "drive", "directory", "kb-extraction"]
# ===== SECURITY MODES =====
# no-security: Minimal build - chat, automation, drive, cache only (no RBAC, directory, security, compliance)
@ -42,16 +42,19 @@ marketing = ["people", "automation", "drive", "cache"]
# Productivity
calendar = ["automation", "drive", "cache"]
tasks = ["automation", "drive", "cache", "dep:cron"]
project = ["automation", "drive", "cache", "quick-xml"]
project = ["automation", "drive", "cache"]
goals = ["automation", "drive", "cache"]
workspaces = ["automation", "drive", "cache"]
tickets = ["automation", "drive", "cache"]
billing = ["automation", "drive", "cache"]
# Documents
docs = ["automation", "drive", "cache", "docx-rs", "ooxmlsdk"]
sheet = ["automation", "drive", "cache", "calamine", "dep:rust_xlsxwriter", "dep:umya-spreadsheet"]
slides = ["automation", "drive", "cache", "ooxmlsdk"]
# Document Processing (lightweight - KB extraction without heavy OOXML SDKs)
kb-extraction = ["drive", "dep:calamine"]
# Documents (full editing UI - opt-in, adds ~4min compile time from ooxmlsdk)
docs = ["automation", "drive", "cache", "dep:docx-rs", "dep:ooxmlsdk", "kb-extraction"]
sheet = ["automation", "drive", "cache", "dep:calamine", "dep:rust_xlsxwriter", "dep:umya-spreadsheet", "kb-extraction"]
slides = ["automation", "drive", "cache", "dep:ooxmlsdk", "kb-extraction"]
paper = ["automation", "drive", "cache"]
# Media
@ -172,7 +175,7 @@ umya-spreadsheet = { workspace = true, optional = true }
# File Storage & Drive (drive feature)
# minio removed - use rust-s3 via S3Repository instead
pdf-extract = { workspace = true, optional = true }
quick-xml = { workspace = true, optional = true }
quick-xml = { workspace = true }
flate2 = { workspace = true }
zip = { workspace = true }
tar = { workspace = true }

View file

@ -16,7 +16,7 @@ pub async fn execute_llm_with_context(
system_prompt: &str,
user_prompt: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let model = config_manager
.get_config(&bot_id, "llm-model", None)

View file

@ -2728,7 +2728,7 @@ NO QUESTIONS. JUST BUILD."#
{
let prompt = _prompt;
let bot_id = _bot_id;
let config_manager = ConfigManager::new(self.state.conn.clone().into());
let config_manager = ConfigManager::new(self.state.conn.clone());
let model = config_manager
.get_config(&bot_id, "llm-model", None)
.unwrap_or_else(|_| {
@ -3170,40 +3170,9 @@ NO QUESTIONS. JUST BUILD."#
.execute(&mut conn)?;
Ok(())
}
}
/// Ensure the bucket exists, creating it if necessary
async fn ensure_bucket_exists(
&self,
bucket: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
#[cfg(feature = "drive")]
if let Some(ref s3) = self.state.drive {
// Check if bucket exists
match s3.object_exists(bucket, "").await {
Ok(_) => {
trace!("Bucket {} already exists", bucket);
Ok(())
}
Err(_) => {
Ok(())
}
}
} else {
// No S3 client, we'll use DB fallback - no bucket needed
trace!("No S3 client, using DB fallback for storage");
Ok(())
}
#[cfg(not(feature = "drive"))]
{
let _ = bucket;
trace!("Drive feature not enabled, no bucket check needed");
Ok(())
}
}
async fn write_to_drive(
async fn write_to_drive(
&self,
bucket: &str,
path: &str,

View file

@ -206,7 +206,7 @@ fn fill_pending_info(
.bind::<Text, _>(config_key)
.execute(&mut conn)?;
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
config_manager
.set_config(&bot_id, config_key, value)
.map_err(|e| format!("Failed to set config: {}", e))?;

View file

@ -1050,7 +1050,7 @@ Respond ONLY with valid JSON."#
let prompt = _prompt;
let bot_id = _bot_id;
// Get model and key from bot configuration
let config_manager = ConfigManager::new(self.state.conn.clone().into());
let config_manager = ConfigManager::new(self.state.conn.clone());
let model = config_manager
.get_config(&bot_id, "llm-model", None)
.unwrap_or_else(|_| {

View file

@ -1056,7 +1056,7 @@ END TRIGGER
let prompt = _prompt;
let bot_id = _bot_id;
// Get model and key from bot configuration
let config_manager = ConfigManager::new(self.state.conn.clone().into());
let config_manager = ConfigManager::new(self.state.conn.clone());
let model = config_manager
.get_config(&bot_id, "llm-model", None)
.unwrap_or_else(|_| {

View file

@ -683,7 +683,7 @@ Respond ONLY with valid JSON."#,
let prompt = _prompt;
let bot_id = _bot_id;
// Get model and key from bot configuration
let config_manager = ConfigManager::new(self.state.conn.clone().into());
let config_manager = ConfigManager::new(self.state.conn.clone());
let model = config_manager
.get_config(&bot_id, "llm-model", None)
.unwrap_or_else(|_| {

View file

@ -234,7 +234,7 @@ async fn get_kb_statistics(
let qdrant_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() {
sm.get_vectordb_config_sync().0
} else {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
config_manager
.get_config(&user.bot_id, "vectordb-url", Some("https://localhost:6333"))
.unwrap_or_else(|_| "https://localhost:6333".to_string())
@ -293,7 +293,7 @@ async fn get_collection_statistics(
let qdrant_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() {
sm.get_vectordb_config_sync().0
} else {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
config_manager
.get_config(&uuid::Uuid::nil(), "vectordb-url", Some("https://localhost:6333"))
.unwrap_or_else(|_| "https://localhost:6333".to_string())
@ -382,7 +382,7 @@ async fn list_collections(
let qdrant_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() {
sm.get_vectordb_config_sync().0
} else {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
config_manager
.get_config(&user.bot_id, "vectordb-url", Some("https://localhost:6333"))
.unwrap_or_else(|_| "https://localhost:6333".to_string())

View file

@ -79,7 +79,7 @@ pub async fn execute_llm_generation(
state: Arc<AppState>,
prompt: String,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
let model = config_manager
.get_config(&Uuid::nil(), "llm-model", None)
.unwrap_or_default();

View file

@ -48,7 +48,7 @@ async fn call_llm(
state: &AppState,
prompt: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let model = config_manager
.get_config(&Uuid::nil(), "llm-model", None)
.unwrap_or_default();

View file

@ -260,7 +260,7 @@ Return ONLY the JSON object, no explanations or markdown formatting."#,
}
async fn call_llm_for_extraction(state: &AppState, prompt: &str) -> Result<Value, String> {
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
let model = config_manager
.get_config(&Uuid::nil(), "llm-model", None)
.unwrap_or_else(|_| "gpt-3.5-turbo".to_string());

View file

@ -486,7 +486,7 @@ async fn execute_send_sms(
provider_override: Option<&str>,
priority_override: Option<&str>,
) -> Result<SmsSendResult, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let bot_id = user.bot_id;
let provider_name = match provider_override {
@ -589,7 +589,7 @@ async fn send_via_twilio(
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let account_sid = config_manager
.get_config(bot_id, "twilio-account-sid", None)
@ -645,7 +645,7 @@ async fn send_via_aws_sns(
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let access_key = config_manager
.get_config(bot_id, "aws-access-key", None)
@ -710,7 +710,7 @@ async fn send_via_vonage(
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let api_key = config_manager
.get_config(bot_id, "vonage-api-key", None)
@ -776,7 +776,7 @@ async fn send_via_messagebird(
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let api_key = config_manager
.get_config(bot_id, "messagebird-api-key", None)
@ -830,7 +830,7 @@ async fn send_via_custom_webhook(
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let webhook_url = config_manager
.get_config(bot_id, &format!("{}-webhook-url", webhook_name), None)

View file

@ -424,7 +424,7 @@ pub fn load_connection_config(
bot_id: Uuid,
connection_name: &str,
) -> Result<ExternalConnection, Box<dyn Error + Send + Sync>> {
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
let prefix = format!("conn-{}-", connection_name);

View file

@ -244,7 +244,7 @@ impl StatusPanel {
if selected == bot_name {
lines.push("".to_string());
lines.push(" ┌─ Bot Configuration ─────────┐".to_string());
let config_manager = ConfigManager::new(self.app_state.conn.clone().into());
let config_manager = ConfigManager::new(self.app_state.conn.clone());
let llm_model = config_manager
.get_config(bot_id, "llm-model", None)
.unwrap_or_else(|_| "N/A".to_string());

View file

@ -201,21 +201,21 @@ impl BootstrapManager {
match pm.start("directory") {
Ok(_child) => {
info!("Directory service started, waiting for readiness...");
let mut zitadel_ready = false;
for i in 0..150 {
sleep(Duration::from_secs(2)).await;
if zitadel_health_check() {
info!("Zitadel/Directory service is responding after {}s", (i + 1) * 2);
zitadel_ready = true;
break;
}
if i % 15 == 14 {
info!("Zitadel health check: {}s elapsed, retrying...", (i + 1) * 2);
}
}
if !zitadel_ready {
warn!("Zitadel/Directory service did not respond after 300 seconds");
}
let mut zitadel_ready = false;
for i in 0..30 {
sleep(Duration::from_secs(2)).await;
if zitadel_health_check() {
info!("Zitadel/Directory service is responding after {}s", (i + 1) * 2);
zitadel_ready = true;
break;
}
if i == 14 {
info!("Zitadel health check: 30s elapsed, retrying...");
}
}
if !zitadel_ready {
warn!("Zitadel/Directory service did not respond after 60 seconds, continuing anyway");
}
if zitadel_ready {
let config_path = self.stack_dir("conf/system/directory_config.json");

View file

@ -1,7 +1,7 @@
use async_trait::async_trait;
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;
use crate::core::bot::channels::ChannelAdapter;
@ -20,7 +20,7 @@ pub struct TeamsAdapter {
impl TeamsAdapter {
pub fn new(pool: DbPool, bot_id: Uuid) -> Self {
let config_manager = ConfigManager::new(Arc::new(pool));
let config_manager = ConfigManager::new(pool);
let app_id = config_manager
.get_config(&bot_id, "teams-app-id", None)

View file

@ -3,7 +3,7 @@ use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, Pool};
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::core::bot::channels::ChannelAdapter;
use crate::core::config::ConfigManager;
@ -88,7 +88,7 @@ pub struct TelegramAdapter {
impl TelegramAdapter {
pub fn new(pool: Pool<ConnectionManager<PgConnection>>, bot_id: uuid::Uuid) -> Self {
let config_manager = ConfigManager::new(Arc::new(pool));
let config_manager = ConfigManager::new(pool);
let bot_token = config_manager
.get_config(&bot_id, "telegram-bot-token", None)

View file

@ -26,7 +26,7 @@ pub struct WhatsAppAdapter {
impl WhatsAppAdapter {
pub fn new(state: &Arc<AppState>, bot_id: Uuid) -> Self {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let api_key = config_manager
.get_config(&bot_id, "whatsapp-api-key", None)

View file

@ -520,7 +520,7 @@ impl BotOrchestrator {
sm.get_session_context_data(&session.id, &session.user_id)?
};
let config_manager = ConfigManager::new(state_clone.conn.clone().into());
let config_manager = ConfigManager::new(state_clone.conn.clone());
let history_limit = config_manager
.get_bot_config_value(&session.bot_id, "history-limit")
@ -875,7 +875,7 @@ impl BotOrchestrator {
#[cfg(feature = "nvidia")]
{
let initial_tokens = crate::core::shared::utils::estimate_token_count(&context_data);
let config_manager = ConfigManager::new(self.state.conn.clone().into());
let config_manager = ConfigManager::new(self.state.conn.clone());
let max_context_size = config_manager
.get_config(&session.bot_id, "llm-server-ctx-size", None)
.unwrap_or_default()

View file

@ -110,7 +110,7 @@ impl BotOrchestrator {
sm.get_conversation_history(session.id, user_id)?
};
let config_manager = ConfigManager::new(state_clone.conn.clone().into());
let config_manager = ConfigManager::new(state_clone.conn.clone());
let model = config_manager
.get_config(&bot_id, "llm-model", Some("gpt-3.5-turbo"))
.unwrap_or_else(|_| "gpt-3.5-turbo".to_string());
@ -149,7 +149,7 @@ impl BotOrchestrator {
#[cfg(feature = "nvidia")]
{
let initial_tokens = crate::core::shared::utils::estimate_token_count(&context_data);
let config_manager = ConfigManager::new(self.state.conn.clone().into());
let config_manager = ConfigManager::new(self.state.conn.clone());
let max_context_size = config_manager
.get_config(&bot_id, "llm-server-ctx-size", None)
.unwrap_or_default()

View file

@ -1,9 +1,31 @@
// Core configuration module
// Minimal implementation to allow compilation
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::core::shared::utils::DbPool;
use diesel::prelude::*;
// Row type for raw `sql_query` reads against bot_configuration.
// QueryableByName maps by column name, so `config_value` must match the
// SELECTed column exactly — do not rename the field.
#[derive(Debug, Clone, QueryableByName)]
struct ConfigRow {
    #[diesel(sql_type = diesel::sql_types::Text)]
    config_value: String,
}
/// Returns true when a stored config value is a placeholder rather than a
/// real setting: empty/whitespace-only, or a case-insensitive
/// "none"/"null"/"n/a" marker.
fn is_placeholder_value(val: &str) -> bool {
    matches!(
        val.trim().to_lowercase().as_str(),
        "" | "none" | "null" | "n/a"
    )
}
/// Heuristic for values that point at local files instead of remote model
/// identifiers: relative/absolute/home-relative path prefixes, or common
/// model-weight file extensions (extension match is case-insensitive,
/// prefix match is case-sensitive, mirroring the original checks).
fn is_local_file_path(val: &str) -> bool {
    const PATH_PREFIXES: [&str; 4] = ["../", "./", "/", "~"];
    const WEIGHT_EXTENSIONS: [&str; 3] = [".gguf", ".bin", ".safetensors"];
    let lower = val.to_lowercase();
    PATH_PREFIXES.iter().any(|p| val.starts_with(p))
        || WEIGHT_EXTENSIONS.iter().any(|ext| lower.ends_with(ext))
}
/// Application configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppConfig {
@ -28,7 +50,7 @@ pub struct DatabaseConfig {
pub max_connections: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DriveConfig {
pub endpoint: String,
pub bucket: String,
@ -88,42 +110,141 @@ impl AppConfig {
/// Configuration manager for runtime config updates
pub struct ConfigManager {
db_pool: Arc<dyn Send + Sync>,
pool: Arc<DbPool>,
}
impl ConfigManager {
pub fn new<T: Send + Sync + 'static>(db_pool: Arc<T>) -> Self {
Self {
db_pool: db_pool as Arc<dyn Send + Sync>,
}
pub fn new(pool: DbPool) -> Self {
Self { pool: Arc::new(pool) }
}
pub fn get_config(
&self,
_bot_id: &uuid::Uuid,
_key: &str,
bot_id: &uuid::Uuid,
key: &str,
default: Option<&str>,
) -> Result<String, Box<dyn std::error::Error>> {
if let Ok(mut conn) = self.pool.get() {
let bot_val = diesel::sql_query(
"SELECT config_value FROM bot_configuration WHERE bot_id = $1 AND config_key = $2 LIMIT 1"
)
.bind::<diesel::sql_types::Uuid, _>(bot_id)
.bind::<diesel::sql_types::Text, _>(key)
.get_result::<ConfigRow>(&mut conn)
.ok()
.map(|r| r.config_value);
if let Some(ref val) = bot_val {
if !is_placeholder_value(val) && !is_local_file_path(val) {
return Ok(val.clone());
}
}
let default_val = diesel::sql_query(
"SELECT config_value FROM bot_configuration WHERE bot_id = $1 AND config_key = $2 LIMIT 1"
)
.bind::<diesel::sql_types::Uuid, _>(uuid::Uuid::nil())
.bind::<diesel::sql_types::Text, _>(key)
.get_result::<ConfigRow>(&mut conn)
.ok()
.map(|r| r.config_value);
if let Some(ref val) = default_val {
if !is_placeholder_value(val) {
return Ok(val.clone());
}
}
}
Ok(default.unwrap_or("").to_string())
}
pub fn get_bot_config_value(
&self,
_bot_id: &uuid::Uuid,
_key: &str,
bot_id: &uuid::Uuid,
key: &str,
) -> Result<String, Box<dyn std::error::Error>> {
Ok(String::new())
if let Ok(mut conn) = self.pool.get() {
let row = diesel::sql_query(
"SELECT config_value FROM bot_configuration WHERE bot_id = $1 AND config_key = $2 LIMIT 1"
)
.bind::<diesel::sql_types::Uuid, _>(bot_id)
.bind::<diesel::sql_types::Text, _>(key)
.get_result::<ConfigRow>(&mut conn)
.ok();
if let Some(r) = row {
return Ok(r.config_value);
}
}
Err("Config key not found".into())
}
pub fn set_config(
&self,
_bot_id: &uuid::Uuid,
_key: &str,
_value: &str,
bot_id: &uuid::Uuid,
key: &str,
value: &str,
) -> Result<(), Box<dyn std::error::Error>> {
if let Ok(mut conn) = self.pool.get() {
diesel::sql_query(
"INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type, is_encrypted, created_at, updated_at) \
VALUES ($1, $2, $3, $4, 'string', false, NOW(), NOW()) \
ON CONFLICT (bot_id, config_key) DO UPDATE SET config_value = $4, updated_at = NOW()"
)
.bind::<diesel::sql_types::Uuid, _>(uuid::Uuid::new_v4())
.bind::<diesel::sql_types::Uuid, _>(bot_id)
.bind::<diesel::sql_types::Text, _>(key)
.bind::<diesel::sql_types::Text, _>(value)
.execute(&mut conn)?;
}
Ok(())
}
}
// Re-export for convenience
pub use AppConfig as Config;
// Manual implementation to load from Vault
/// Default `DriveConfig`: loads S3/MinIO credentials from Vault when
/// `VAULT_ADDR`/`VAULT_TOKEN` are set, otherwise falls back to local
/// development defaults (localhost:9100, empty credentials).
impl Default for DriveConfig {
    fn default() -> Self {
        // Prefer credentials stored in Vault (KV v2 path secret/data/gbo/drive).
        if let (Ok(vault_addr), Ok(vault_token)) =
            (std::env::var("VAULT_ADDR"), std::env::var("VAULT_TOKEN"))
        {
            let url = format!("{}/v1/secret/data/gbo/drive", vault_addr);
            let mut cmd = std::process::Command::new("curl");
            cmd.arg("-sf");
            // Only pass --cacert when one is actually configured:
            // `--cacert ""` makes curl fail outright, even for plain-HTTP Vault.
            if let Ok(ca_cert) = std::env::var("VAULT_CACERT") {
                if !ca_cert.is_empty() {
                    cmd.arg("--cacert").arg(&ca_cert);
                }
            }
            cmd.arg("-H")
                .arg(format!("X-Vault-Token: {}", vault_token))
                .arg(&url);
            if let Ok(output) = cmd.output() {
                if let Ok(data) = serde_json::from_slice::<serde_json::Value>(&output.stdout) {
                    // KV v2 wraps the payload as {"data": {"data": {...}}}.
                    if let Some(secret_data) = data.get("data").and_then(|d| d.get("data")) {
                        // Small helper: read a string field with a default.
                        let get_str = |key: &str, default: &str| -> String {
                            secret_data
                                .get(key)
                                .and_then(|v| v.as_str())
                                .unwrap_or(default)
                                .to_string()
                        };
                        let host = get_str("host", "localhost");
                        return Self {
                            endpoint: format!("http://{}", host),
                            bucket: get_str("bucket", "default.gbai"),
                            region: "auto".to_string(),
                            access_key: get_str("accesskey", ""),
                            secret_key: get_str("secret", ""),
                            server: host,
                        };
                    }
                }
            }
        }
        // No usable Vault configuration: local development defaults.
        Self {
            endpoint: "http://localhost:9100".to_string(),
            bucket: String::new(),
            region: "auto".to_string(),
            access_key: String::new(),
            secret_key: String::new(),
            server: "localhost:9100".to_string(),
        }
    }
}

View file

@ -8,7 +8,7 @@ use crate::core::config::ConfigManager;
pub async fn reload_config(
State(state): State<Arc<AppState>>,
) -> Result<Json<Value>, StatusCode> {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
// Get default bot
let conn_arc = state.conn.clone();

View file

@ -1,85 +1,18 @@
mod ooxml_extract;
mod rtf;
mod types;
pub use types::{ChunkMetadata, DocumentFormat, DocumentMetadata, TextChunk};
use anyhow::Result;
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Cursor;
use std::path::Path;
use tokio::io::AsyncReadExt;
use crate::security::command_guard::SafeCommand;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DocumentFormat {
PDF,
DOCX,
XLSX,
PPTX,
TXT,
MD,
HTML,
RTF,
CSV,
JSON,
XML,
}
impl DocumentFormat {
pub fn from_extension(path: &Path) -> Option<Self> {
let ext = path.extension()?.to_str()?.to_lowercase();
match ext.as_str() {
"pdf" => Some(Self::PDF),
"docx" => Some(Self::DOCX),
"xlsx" => Some(Self::XLSX),
"pptx" => Some(Self::PPTX),
"txt" => Some(Self::TXT),
"md" | "markdown" => Some(Self::MD),
"html" | "htm" => Some(Self::HTML),
"rtf" => Some(Self::RTF),
"csv" => Some(Self::CSV),
"json" => Some(Self::JSON),
"xml" => Some(Self::XML),
_ => None,
}
}
pub fn max_size(&self) -> usize {
match self {
Self::PDF => 500 * 1024 * 1024,
Self::PPTX => 200 * 1024 * 1024,
Self::DOCX | Self::XLSX | Self::TXT | Self::JSON | Self::XML => 100 * 1024 * 1024,
Self::HTML | Self::RTF => 50 * 1024 * 1024,
Self::MD => 10 * 1024 * 1024,
Self::CSV => 1024 * 1024 * 1024,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocumentMetadata {
pub title: Option<String>,
pub author: Option<String>,
pub creation_date: Option<String>,
pub modification_date: Option<String>,
pub page_count: Option<usize>,
pub word_count: Option<usize>,
pub language: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TextChunk {
pub content: String,
pub metadata: ChunkMetadata,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
pub document_path: String,
pub document_title: Option<String>,
pub chunk_index: usize,
pub total_chunks: usize,
pub start_char: usize,
pub end_char: usize,
pub page_number: Option<usize>,
}
#[derive(Debug)]
pub struct DocumentProcessor {
chunk_size: usize,
@ -124,10 +57,7 @@ impl DocumentProcessor {
let file_size = metadata.len() as usize;
if file_size == 0 {
debug!(
"Skipping empty file (0 bytes): {}",
file_path.display()
);
debug!("Skipping empty file (0 bytes): {}", file_path.display());
return Ok(Vec::new());
}
@ -150,9 +80,7 @@ impl DocumentProcessor {
);
let text = self.extract_text(file_path, format).await?;
let cleaned_text = Self::clean_text(&text);
let chunks = self.create_chunks(&cleaned_text, file_path);
info!(
@ -165,10 +93,9 @@ impl DocumentProcessor {
}
async fn extract_text(&self, file_path: &Path, format: DocumentFormat) -> Result<String> {
// Check file size before processing to prevent memory exhaustion
let metadata = tokio::fs::metadata(file_path).await?;
let file_size = metadata.len() as usize;
if file_size > format.max_size() {
return Err(anyhow::anyhow!(
"File too large: {} bytes (max: {} bytes)",
@ -179,8 +106,7 @@ impl DocumentProcessor {
match format {
DocumentFormat::TXT | DocumentFormat::MD => {
// Use streaming read for large text files
if file_size > 10 * 1024 * 1024 { // 10MB
if file_size > 10 * 1024 * 1024 {
self.extract_large_text_file(file_path).await
} else {
let mut file = tokio::fs::File::open(file_path).await?;
@ -191,29 +117,26 @@ impl DocumentProcessor {
}
DocumentFormat::PDF => self.extract_pdf_text(file_path).await,
DocumentFormat::DOCX => self.extract_docx_text(file_path).await,
DocumentFormat::PPTX => self.extract_pptx_text(file_path).await,
DocumentFormat::XLSX => self.extract_xlsx_text(file_path).await,
DocumentFormat::HTML => self.extract_html_text(file_path).await,
DocumentFormat::CSV => self.extract_csv_text(file_path).await,
DocumentFormat::JSON => self.extract_json_text(file_path).await,
_ => {
warn!(
"Format {:?} extraction not yet implemented, using fallback",
format
);
self.fallback_text_extraction(file_path).await
}
DocumentFormat::XML => self.extract_xml_text(file_path).await,
DocumentFormat::RTF => self.extract_rtf_text(file_path).await,
}
}
async fn extract_large_text_file(&self, file_path: &Path) -> Result<String> {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::File::open(file_path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut content = String::new();
let mut line_count = 0;
const MAX_LINES: usize = 100_000; // Limit lines to prevent memory exhaustion
const MAX_LINES: usize = 100_000;
while let Some(line) = lines.next_line().await? {
if line_count >= MAX_LINES {
warn!("Truncating large file at {} lines: {}", MAX_LINES, file_path.display());
@ -222,13 +145,12 @@ impl DocumentProcessor {
content.push_str(&line);
content.push('\n');
line_count += 1;
// Yield control periodically
if line_count % 1000 == 0 {
tokio::task::yield_now().await;
}
}
Ok(content)
}
@ -249,17 +171,11 @@ impl DocumentProcessor {
match output {
Ok(output) if output.status.success() => {
info!(
"Successfully extracted PDF with pdftotext: {}",
file_path.display()
);
info!("Successfully extracted PDF with pdftotext: {}", file_path.display());
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
_ => {
warn!(
"pdftotext failed for {}, trying library extraction",
file_path.display()
);
warn!("pdftotext failed for {}, trying library extraction", file_path.display());
self.extract_pdf_with_library(file_path)
}
}
@ -301,60 +217,60 @@ impl DocumentProcessor {
}
async fn extract_docx_text(&self, file_path: &Path) -> Result<String> {
let file_path_str = file_path.to_string_lossy().to_string();
let cmd_result = SafeCommand::new("pandoc")
.and_then(|c| c.arg("-f"))
.and_then(|c| c.arg("docx"))
.and_then(|c| c.arg("-t"))
.and_then(|c| c.arg("plain"))
.and_then(|c| c.arg(&file_path_str));
let bytes = tokio::fs::read(file_path).await?;
let path_display = file_path.display().to_string();
let result = tokio::task::spawn_blocking(move || -> Result<String> {
match ooxml_extract::extract_docx_text_from_zip(&bytes) {
Ok(text) if !text.trim().is_empty() => {
log::info!("Extracted DOCX text from ZIP: {path_display}");
return Ok(text);
}
Ok(_) => log::warn!("DOCX ZIP extraction returned empty text: {path_display}"),
Err(e) => log::warn!("DOCX ZIP extraction failed for {path_display}: {e}"),
}
let output = match cmd_result {
Ok(cmd) => cmd.execute_async().await,
Err(e) => {
warn!("Failed to build pandoc command: {}", e);
return self.fallback_text_extraction(file_path).await;
#[cfg(feature = "docs")]
match crate::docs::ooxml::load_docx_preserving(&bytes) {
Ok(doc) => {
let text: String = doc.paragraphs.iter().map(|p| p.text.as_str()).collect::<Vec<_>>().join("\n");
if !text.trim().is_empty() {
log::info!("Extracted DOCX with ooxmlsdk: {path_display}");
return Ok(text);
}
log::warn!("ooxmlsdk DOCX returned empty: {path_display}");
}
Err(e) => log::warn!("ooxmlsdk DOCX failed for {path_display}: {e}"),
}
};
match output {
Ok(output) if output.status.success() => {
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
_ => {
warn!("pandoc failed for DOCX, using fallback");
self.fallback_text_extraction(file_path).await
}
}
Err(anyhow::anyhow!("All DOCX extraction methods failed for {path_display}"))
})
.await??;
Ok(result)
}
/// Extracts visible text from an HTML file by removing all markup tags.
///
/// Keeps text outside of tags — including text before the first tag, which
/// the previous split-based approach silently dropped — and replaces each
/// tag with a single space so adjacent words do not run together.
/// NOTE(review): HTML entities are not decoded and <script>/<style> bodies
/// are not filtered; this is a best-effort extraction for indexing.
async fn extract_html_text(&self, file_path: &Path) -> Result<String> {
    let contents = tokio::fs::read_to_string(file_path).await?;
    let mut text = String::with_capacity(contents.len());
    let mut in_tag = false;
    for ch in contents.chars() {
        match ch {
            '<' => in_tag = true,
            '>' if in_tag => {
                in_tag = false;
                // Tag boundary acts as a word separator.
                text.push(' ');
            }
            c if !in_tag => text.push(c),
            _ => {}
        }
    }
    Ok(text)
}
/// Reads a CSV file as plain text, normalizing every line ending to `\n`.
async fn extract_csv_text(&self, file_path: &Path) -> Result<String> {
    let contents = tokio::fs::read_to_string(file_path).await?;
    // `lines()` strips both `\n` and `\r\n`; one `\n` is appended per line.
    let text: String = contents.lines().flat_map(|line| [line, "\n"]).collect();
    Ok(text)
}
async fn extract_json_text(&self, file_path: &Path) -> Result<String> {
let contents = tokio::fs::read_to_string(file_path).await?;
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&contents) {
Ok(Self::extract_json_strings(&json))
} else {
@ -364,7 +280,6 @@ impl DocumentProcessor {
fn extract_json_strings(value: &serde_json::Value) -> String {
let mut result = String::new();
match value {
serde_json::Value::String(s) => {
result.push_str(s);
@ -382,10 +297,180 @@ impl DocumentProcessor {
}
_ => {}
}
result
}
/// Extracts plain text from a PPTX file.
///
/// Reads the whole file, then tries a lightweight ZIP + string-scan
/// extraction first; when the `slides` feature is enabled, falls back to
/// the ooxmlsdk-based loader. Parsing runs on a blocking thread because it
/// is CPU-bound. Errors only when every method fails or yields empty text.
async fn extract_pptx_text(&self, file_path: &Path) -> Result<String> {
    let bytes = tokio::fs::read(file_path).await?;
    let path_display = file_path.display().to_string();
    let result = tokio::task::spawn_blocking(move || -> Result<String> {
        // Cheap path: scan slide XML parts straight out of the ZIP.
        match ooxml_extract::extract_pptx_text_from_zip(&bytes) {
            Ok(text) if !text.trim().is_empty() => {
                log::info!("Extracted PPTX text from ZIP: {path_display}");
                return Ok(text);
            }
            Ok(_) => log::warn!("PPTX ZIP extraction returned empty text: {path_display}"),
            Err(e) => log::warn!("PPTX ZIP extraction failed for {path_display}: {e}"),
        }
        // Heavier fallback: full OOXML parse (compiled in only with "slides").
        #[cfg(feature = "slides")]
        match crate::slides::ooxml::load_pptx_preserving(&bytes) {
            Ok(pptx) => {
                // Join every text run of every slide with newlines.
                let mut text = String::new();
                for slide in &pptx.slides {
                    for slide_text in &slide.texts {
                        if !text.is_empty() {
                            text.push('\n');
                        }
                        text.push_str(slide_text);
                    }
                }
                if !text.trim().is_empty() {
                    log::info!("Extracted PPTX with ooxmlsdk: {path_display}");
                    return Ok(text);
                }
                log::warn!("ooxmlsdk PPTX returned empty: {path_display}");
            }
            Err(e) => log::warn!("ooxmlsdk PPTX failed for {path_display}: {e}"),
        }
        Err(anyhow::anyhow!("All PPTX extraction methods failed for {path_display}"))
    })
    .await??;
    Ok(result)
}
/// Extracts text from an XLSX workbook using the `calamine` reader
/// (available only with the `kb-extraction` feature).
///
/// Every sheet is rendered as a "=== name ===" header followed by its rows,
/// with cells joined by tabs; blank rows are dropped. Parsing runs on a
/// blocking thread because it is CPU-bound.
#[cfg(feature = "kb-extraction")]
async fn extract_xlsx_text(&self, file_path: &Path) -> Result<String> {
    let path = file_path.to_path_buf();
    let result = tokio::task::spawn_blocking(move || -> Result<String> {
        use calamine::{open_workbook_from_rs, Reader, Xlsx};
        use std::io::Read;
        let mut file = std::fs::File::open(&path)?;
        let mut bytes = Vec::new();
        file.read_to_end(&mut bytes)?;
        let cursor = Cursor::new(bytes.as_slice());
        let mut workbook: Xlsx<_> = open_workbook_from_rs(cursor)
            .map_err(|e| anyhow::anyhow!("Failed to open XLSX: {e}"))?;
        let mut content = String::new();
        for sheet_name in workbook.sheet_names() {
            if let Ok(range) = workbook.worksheet_range(&sheet_name) {
                use std::fmt::Write;
                // Sheet header so downstream chunks retain sheet provenance.
                let _ = writeln!(&mut content, "=== {} ===", sheet_name);
                for row in range.rows() {
                    // Render each cell type as plain text.
                    let row_text: Vec<String> = row
                        .iter()
                        .map(|cell| match cell {
                            calamine::Data::Empty => String::new(),
                            calamine::Data::String(s)
                            | calamine::Data::DateTimeIso(s)
                            | calamine::Data::DurationIso(s) => s.clone(),
                            calamine::Data::Float(f) => f.to_string(),
                            calamine::Data::Int(i) => i.to_string(),
                            calamine::Data::Bool(b) => b.to_string(),
                            calamine::Data::Error(e) => format!("{e:?}"),
                            calamine::Data::DateTime(dt) => dt.to_string(),
                        })
                        .collect();
                    let line = row_text.join("\t");
                    // Skip rows that are entirely empty.
                    if !line.trim().is_empty() {
                        content.push_str(&line);
                        content.push('\n');
                    }
                }
                content.push('\n');
            }
        }
        Ok(content)
    })
    .await??;
    if result.trim().is_empty() {
        warn!("XLSX extraction produced empty text: {}", file_path.display());
    } else {
        info!("Extracted XLSX with calamine library: {}", file_path.display());
    }
    Ok(result)
}
/// Without the `kb-extraction` feature there is no XLSX parser available;
/// fall back to treating the file as plain text (best-effort, never fails
/// harder than the generic fallback itself).
#[cfg(not(feature = "kb-extraction"))]
async fn extract_xlsx_text(&self, file_path: &Path) -> Result<String> {
    self.fallback_text_extraction(file_path).await
}
/// Extracts the character data from an XML file using a streaming
/// `quick-xml` reader, joining text nodes with single spaces.
///
/// Falls back to raw-file reading when no text nodes are found; fails on
/// malformed XML, reporting the byte position of the parse error.
async fn extract_xml_text(&self, file_path: &Path) -> Result<String> {
    let bytes = tokio::fs::read(file_path).await?;
    let result = tokio::task::spawn_blocking(move || -> Result<String> {
        use quick_xml::events::Event;
        use quick_xml::Reader;
        let mut reader = Reader::from_reader(bytes.as_slice());
        let mut text = String::new();
        let mut buf = Vec::new();
        loop {
            match reader.read_event_into(&mut buf) {
                Ok(Event::Text(t)) => {
                    if let Ok(s) = t.unescape() {
                        let s = s.trim();
                        if !s.is_empty() {
                            // Separate adjacent text nodes with one space.
                            if !text.is_empty() {
                                text.push(' ');
                            }
                            text.push_str(s);
                        }
                    }
                }
                Ok(Event::Eof) => break,
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "XML parsing error at position {}: {e}",
                        reader.error_position()
                    ));
                }
                _ => {}
            }
            // Reuse the event buffer across iterations.
            buf.clear();
        }
        Ok(text)
    })
    .await??;
    if result.trim().is_empty() {
        warn!("XML extraction produced empty text: {}", file_path.display());
        return self.fallback_text_extraction(file_path).await;
    }
    info!("Extracted XML with quick-xml: {}", file_path.display());
    Ok(result)
}
/// Extracts plain text from an RTF file by stripping control words and
/// groups (see `rtf::strip_rtf_commands`); falls back to raw reading when
/// stripping yields nothing.
async fn extract_rtf_text(&self, file_path: &Path) -> Result<String> {
    let bytes = tokio::fs::read(file_path).await?;
    let result = tokio::task::spawn_blocking(move || -> Result<String> {
        // RTF is 7-bit ASCII by design; lossy conversion is safe here.
        let content = String::from_utf8_lossy(&bytes);
        let text = rtf::strip_rtf_commands(&content);
        Ok(text)
    })
    .await??;
    if result.trim().is_empty() {
        warn!("RTF extraction produced empty text: {}", file_path.display());
        return self.fallback_text_extraction(file_path).await;
    }
    info!("Extracted RTF text: {}", file_path.display());
    Ok(result)
}
async fn fallback_text_extraction(&self, file_path: &Path) -> Result<String> {
match tokio::fs::read_to_string(file_path).await {
Ok(contents) => Ok(contents),
@ -415,16 +500,15 @@ impl DocumentProcessor {
fn create_chunks(&self, text: &str, file_path: &Path) -> Vec<TextChunk> {
let mut chunks = Vec::new();
// For very large texts, limit processing to prevent memory exhaustion
const MAX_TEXT_SIZE: usize = 10 * 1024 * 1024; // 10MB
const MAX_TEXT_SIZE: usize = 10 * 1024 * 1024;
let text_to_process = if text.len() > MAX_TEXT_SIZE {
warn!("Truncating large text to {} chars for chunking: {}", MAX_TEXT_SIZE, file_path.display());
&text[..MAX_TEXT_SIZE]
} else {
text
};
let chars: Vec<char> = text_to_process.chars().collect();
let total_chars = chars.len();
@ -442,7 +526,6 @@ impl DocumentProcessor {
1
};
// Limit maximum number of chunks to prevent memory exhaustion
const MAX_CHUNKS: usize = 1000;
let max_chunks_to_create = std::cmp::min(total_chunks, MAX_CHUNKS);
@ -451,7 +534,6 @@ impl DocumentProcessor {
let mut chunk_end = end;
if end < total_chars {
// Find word boundary within reasonable distance
let search_start = std::cmp::max(start, end.saturating_sub(100));
for i in (search_start..end).rev() {
if chars[i].is_whitespace() {
@ -463,7 +545,6 @@ impl DocumentProcessor {
let chunk_content: String = chars[start..chunk_end].iter().collect();
// Skip empty or very small chunks
if chunk_content.trim().len() < 10 {
start = chunk_end;
continue;
@ -518,16 +599,15 @@ impl DocumentProcessor {
info!("Processing knowledge base folder: {}", kb_path.display());
// Process files in small batches to prevent memory exhaustion
let mut results = HashMap::new();
const BATCH_SIZE: usize = 10; // Much smaller batch size
const BATCH_SIZE: usize = 10;
let files = self.collect_supported_files(kb_path).await?;
info!("Found {} supported files to process", files.len());
for batch in files.chunks(BATCH_SIZE) {
let mut batch_results = HashMap::new();
for file_path in batch {
match self.process_document(file_path).await {
Ok(chunks) => {
@ -539,19 +619,16 @@ impl DocumentProcessor {
warn!("Failed to process document {}: {}", file_path.display(), e);
}
}
// Yield control after each file
tokio::task::yield_now().await;
}
// Merge batch results and clear batch memory
results.extend(batch_results);
// Force memory cleanup between batches
if results.len() % (BATCH_SIZE * 2) == 0 {
results.shrink_to_fit();
}
info!("Processed batch, total documents: {}", results.len());
}
@ -571,7 +648,6 @@ impl DocumentProcessor {
files: &mut Vec<std::path::PathBuf>,
depth: usize,
) -> Result<()> {
// Prevent excessive recursion
if depth > 10 {
warn!("Skipping deep directory to prevent stack overflow: {}", dir.display());
return Ok(());
@ -586,7 +662,6 @@ impl DocumentProcessor {
if metadata.is_dir() {
Box::pin(self.collect_files_recursive(&path, files, depth + 1)).await?;
} else if self.is_supported_file(&path) {
// Skip very large files
if metadata.len() > 50 * 1024 * 1024 {
warn!("Skipping large file: {} ({})", path.display(), metadata.len());
continue;

View file

@ -0,0 +1,167 @@
use std::io::Cursor;
/// Extracts the plain text of a DOCX file from its raw ZIP bytes.
///
/// Reads `word/document.xml` directly and joins the text of each `<w:p>`
/// paragraph with newlines. Returns an error string when the bytes are not
/// a valid ZIP or the document part is missing/unreadable.
pub fn extract_docx_text_from_zip(bytes: &[u8]) -> Result<String, String> {
    use std::io::Read;
    use zip::ZipArchive;
    let reader = Cursor::new(bytes);
    let mut archive =
        ZipArchive::new(reader).map_err(|e| format!("Failed to open DOCX as ZIP: {e}"))?;
    // Look the part up by name instead of scanning every entry.
    let mut file = archive
        .by_name("word/document.xml")
        .map_err(|_| "word/document.xml not found in DOCX archive".to_string())?;
    let mut content = String::new();
    file.read_to_string(&mut content)
        .map_err(|e| format!("Failed to read document.xml: {e}"))?;
    Ok(extract_paragraphs(&content).join("\n"))
}
/// Collects the visible text of every `<w:p>` paragraph in a
/// WordprocessingML body, skipping paragraphs whose text is empty or
/// whitespace-only.
fn extract_paragraphs(xml: &str) -> Vec<String> {
    let mut paragraphs = Vec::new();
    let mut pos = 0;
    while let Some(p_start) = xml[pos..].find("<w:p") {
        let abs_start = pos + p_start;
        // "<w:p" also prefixes <w:pPr>, <w:pict>, <w:proofErr>, ...; only
        // treat it as a paragraph when the tag name ends right here.
        let tag_ended = xml[abs_start + 4..]
            .chars()
            .next()
            .map_or(false, |c| c == '>' || c.is_whitespace());
        if !tag_ended {
            pos = abs_start + 4;
            continue;
        }
        if let Some(p_end_rel) = xml[abs_start..].find("</w:p>") {
            let abs_end = abs_start + p_end_rel + "</w:p>".len();
            let text = extract_text_from_paragraph(&xml[abs_start..abs_end]);
            if !text.trim().is_empty() {
                paragraphs.push(text);
            }
            pos = abs_end;
        } else {
            break;
        }
    }
    paragraphs
}

/// Concatenates the contents of every `<w:t>` run inside one paragraph and
/// resolves the standard XML entities.
fn extract_text_from_paragraph(para_xml: &str) -> String {
    let mut text = String::new();
    let mut pos = 0;
    while let Some(t_start) = para_xml[pos..].find("<w:t") {
        let abs_start = pos + t_start;
        // "<w:t" also prefixes <w:tab/>, <w:tc>, <w:tr>, ...; only accept
        // it when the tag name ends right here.
        let tag_ended = para_xml[abs_start + 4..]
            .chars()
            .next()
            .map_or(false, |c| c == '>' || c.is_whitespace());
        if !tag_ended {
            pos = abs_start + 4;
            continue;
        }
        let tag_close_rel = match para_xml[abs_start..].find('>') {
            Some(rel) => rel,
            None => break,
        };
        // A self-closing <w:t/> carries no text.
        if para_xml[abs_start..abs_start + tag_close_rel].ends_with('/') {
            pos = abs_start + tag_close_rel + 1;
            continue;
        }
        let content_start = abs_start + tag_close_rel + 1;
        match para_xml[content_start..].find("</w:t>") {
            Some(end_rel) => {
                text.push_str(&para_xml[content_start..content_start + end_rel]);
                pos = content_start + end_rel + "</w:t>".len();
            }
            None => break,
        }
    }
    unescape_xml(&text)
}
/// Extracts the plain text of a PPTX file from its raw ZIP bytes, one line
/// per `<a:p>` paragraph, slides in presentation order.
///
/// Returns an error string when the bytes are not a valid ZIP or no slide
/// text is found.
pub fn extract_pptx_text_from_zip(bytes: &[u8]) -> Result<String, String> {
    use std::io::Read;
    use zip::ZipArchive;
    let reader = Cursor::new(bytes);
    let mut archive =
        ZipArchive::new(reader).map_err(|e| format!("Failed to open PPTX as ZIP: {e}"))?;
    // Collect slide part names first so they can be read in slide-number
    // order: ZIP entry order is arbitrary, and lexicographic order would
    // put "slide10.xml" before "slide2.xml".
    const SLIDE_PREFIX: &str = "ppt/slides/slide";
    let mut slide_names: Vec<String> = archive
        .file_names()
        .filter(|n| n.starts_with(SLIDE_PREFIX) && n.ends_with(".xml"))
        .map(|n| n.to_string())
        .collect();
    slide_names.sort_by_key(|name| {
        name[SLIDE_PREFIX.len()..]
            .trim_end_matches(".xml")
            .parse::<u32>()
            // Non-numeric names sort last instead of failing extraction.
            .unwrap_or(u32::MAX)
    });
    let mut all_texts = Vec::new();
    for name in &slide_names {
        let mut file = archive
            .by_name(name)
            .map_err(|e| format!("Failed to read ZIP entry: {e}"))?;
        let mut content = String::new();
        file.read_to_string(&mut content)
            .map_err(|e| format!("Failed to read {name}: {e}"))?;
        all_texts.extend(extract_slide_texts(&content));
    }
    if all_texts.is_empty() {
        return Err("No slide text found in PPTX archive".to_string());
    }
    Ok(all_texts.join("\n"))
}
/// Collects the text of every `<a:p>` paragraph in a slide's DrawingML,
/// skipping paragraphs whose text is empty or whitespace-only.
fn extract_slide_texts(xml: &str) -> Vec<String> {
    let mut texts = Vec::new();
    let mut pos = 0;
    while let Some(p_start) = xml[pos..].find("<a:p") {
        let abs_start = pos + p_start;
        // "<a:p" also prefixes <a:pPr>, <a:prstGeom>, <a:pt>, ...; only
        // accept it when the tag name ends right here.
        let tag_ended = xml[abs_start + 4..]
            .chars()
            .next()
            .map_or(false, |c| c == '>' || c.is_whitespace());
        if !tag_ended {
            pos = abs_start + 4;
            continue;
        }
        if let Some(p_end_rel) = xml[abs_start..].find("</a:p>") {
            let abs_end = abs_start + p_end_rel + "</a:p>".len();
            let text = extract_slide_text_from_paragraph(&xml[abs_start..abs_end]);
            if !text.trim().is_empty() {
                texts.push(text);
            }
            pos = abs_end;
        } else {
            break;
        }
    }
    texts
}

/// Concatenates the contents of every `<a:t>` run inside one paragraph and
/// resolves the standard XML entities.
fn extract_slide_text_from_paragraph(para_xml: &str) -> String {
    let mut text = String::new();
    let mut pos = 0;
    while let Some(t_start) = para_xml[pos..].find("<a:t") {
        let abs_start = pos + t_start;
        // "<a:t" also prefixes <a:tab/>, <a:tcPr>, <a:tile>, ...; only
        // accept it when the tag name ends right here.
        let tag_ended = para_xml[abs_start + 4..]
            .chars()
            .next()
            .map_or(false, |c| c == '>' || c.is_whitespace());
        if !tag_ended {
            pos = abs_start + 4;
            continue;
        }
        let tag_close_rel = match para_xml[abs_start..].find('>') {
            Some(rel) => rel,
            None => break,
        };
        // A self-closing <a:t/> carries no text.
        if para_xml[abs_start..abs_start + tag_close_rel].ends_with('/') {
            pos = abs_start + tag_close_rel + 1;
            continue;
        }
        let content_start = abs_start + tag_close_rel + 1;
        match para_xml[content_start..].find("</a:t>") {
            Some(end_rel) => {
                text.push_str(&para_xml[content_start..content_start + end_rel]);
                pos = content_start + end_rel + "</a:t>".len();
            }
            None => break,
        }
    }
    unescape_xml(&text)
}

/// Resolves the five predefined XML entities in `text`.
fn unescape_xml(text: &str) -> String {
    // `&amp;` must be resolved LAST: doing it first turns a literal
    // "&amp;lt;" into "&lt;" and then, wrongly, into "<".
    text.replace("&lt;", "<")
        .replace("&gt;", ">")
        .replace("&quot;", "\"")
        .replace("&apos;", "'")
        .replace("&amp;", "&")
}

View file

@ -0,0 +1,62 @@
/// Strips RTF control words, control symbols, and group braces from
/// `input`, returning the remaining plain text with runs of whitespace
/// collapsed to single spaces.
///
/// Text inside nested groups (depth > 1) is dropped: those groups hold
/// metadata such as font tables and stylesheets rather than body text.
/// `\'hh` hex escapes are decoded as Latin-1; `\uN` Unicode escapes are
/// skipped like any other control word (not resolved).
pub fn strip_rtf_commands(input: &str) -> String {
    let chars: Vec<char> = input.chars().collect();
    let len = chars.len();
    let mut result = String::with_capacity(len);
    let mut i = 0;
    let mut depth = 0i32;
    while i < len {
        match chars[i] {
            '{' => {
                depth += 1;
                i += 1;
            }
            '}' => {
                // Clamp at 0 so unbalanced input cannot underflow the depth.
                depth = (depth - 1).max(0);
                i += 1;
            }
            '\\' if i + 1 < len => {
                let next = chars[i + 1];
                if next == '\'' {
                    // \'hh — a character given as two hex digits (Latin-1).
                    let hex: String = chars[i + 2..].iter().take(2).collect();
                    if hex.len() == 2 {
                        if let Ok(byte_val) = u8::from_str_radix(&hex, 16) {
                            if depth <= 1 {
                                if let Some(c) = char::from_u32(byte_val as u32) {
                                    result.push(c);
                                }
                            }
                        }
                    }
                    i += 4;
                } else if next == '\\' || next == '{' || next == '}' {
                    // Escaped literal backslash or brace, not a delimiter.
                    if depth <= 1 {
                        result.push(next);
                    }
                    i += 2;
                } else if next == '\n' || next == '\r' {
                    // \<newline> acts as a paragraph break.
                    if depth <= 1 {
                        result.push('\n');
                    }
                    i += 2;
                } else if next.is_ascii_alphabetic() {
                    // Control word: letters, an optional signed numeric
                    // parameter, then one optional space delimiter.
                    let mut j = i + 1;
                    while j < len && chars[j].is_ascii_alphabetic() {
                        j += 1;
                    }
                    if j < len && chars[j] == '-' {
                        j += 1;
                    }
                    while j < len && chars[j].is_ascii_digit() {
                        j += 1;
                    }
                    if j < len && chars[j] == ' ' {
                        j += 1;
                    }
                    i = j;
                } else {
                    // Other control symbol (\*, \~, \-, ...): skip it.
                    i += 2;
                }
            }
            c => {
                if depth <= 1 {
                    result.push(c);
                }
                i += 1;
            }
        }
    }
    result.split_whitespace().collect::<Vec<_>>().join(" ")
}

View file

@ -0,0 +1,75 @@
use serde::{Deserialize, Serialize};
/// Document formats supported by the processor, detected by file extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DocumentFormat {
    PDF,
    DOCX,
    XLSX,
    PPTX,
    TXT,
    MD,
    HTML,
    RTF,
    CSV,
    JSON,
    XML,
}

impl DocumentFormat {
    /// Maps a file extension (case-insensitive) to its document format;
    /// returns `None` for paths without a recognized extension.
    pub fn from_extension(path: &std::path::Path) -> Option<Self> {
        let ext = path.extension()?.to_str()?;
        let format = match ext.to_lowercase().as_str() {
            "pdf" => Self::PDF,
            "docx" => Self::DOCX,
            "xlsx" => Self::XLSX,
            "pptx" => Self::PPTX,
            "txt" => Self::TXT,
            "md" | "markdown" => Self::MD,
            "html" | "htm" => Self::HTML,
            "rtf" => Self::RTF,
            "csv" => Self::CSV,
            "json" => Self::JSON,
            "xml" => Self::XML,
            _ => return None,
        };
        Some(format)
    }

    /// Maximum accepted file size in bytes for each format; larger inputs
    /// are rejected before extraction to bound memory use.
    pub fn max_size(&self) -> usize {
        const MB: usize = 1024 * 1024;
        match self {
            Self::CSV => 1024 * MB,
            Self::PDF => 500 * MB,
            Self::PPTX => 200 * MB,
            Self::DOCX | Self::XLSX | Self::TXT | Self::JSON | Self::XML => 100 * MB,
            Self::HTML | Self::RTF => 50 * MB,
            Self::MD => 10 * MB,
        }
    }
}
/// Descriptive metadata extracted from a processed document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocumentMetadata {
    /// Document title, when the source format carries one.
    pub title: Option<String>,
    /// Author name, when available.
    pub author: Option<String>,
    /// Creation timestamp as recorded by the source document, if any.
    pub creation_date: Option<String>,
    /// Last-modification timestamp, if any.
    pub modification_date: Option<String>,
    /// Number of pages, for paginated formats.
    pub page_count: Option<usize>,
    /// Number of words, when counted during extraction.
    pub word_count: Option<usize>,
    /// Detected or declared document language, if known.
    pub language: Option<String>,
}
/// One chunk of extracted document text together with its position info.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TextChunk {
    /// The chunk's text content.
    pub content: String,
    /// Where this chunk sits within its source document.
    pub metadata: ChunkMetadata,
}
/// Positional metadata for a `TextChunk` within its source document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    /// Path (or external identifier) of the source document.
    pub document_path: String,
    /// Title of the source document, if known.
    pub document_title: Option<String>,
    /// Index of this chunk within the document's chunk sequence.
    pub chunk_index: usize,
    /// Total number of chunks produced for the document.
    pub total_chunks: usize,
    /// Character offset where the chunk starts in the cleaned text.
    pub start_char: usize,
    /// Character offset where the chunk ends in the cleaned text.
    pub end_char: usize,
    /// Page the chunk starts on, for paginated formats, if tracked.
    pub page_number: Option<usize>,
}

View file

@ -59,7 +59,7 @@ impl EmbeddingConfig {
pub fn from_bot_config(pool: &DbPool, _bot_id: &uuid::Uuid) -> Self {
use crate::core::config::ConfigManager;
let config_manager = ConfigManager::new(Arc::new(pool.clone()));
let config_manager = ConfigManager::new(pool.clone());
let embedding_url = config_manager
.get_config(_bot_id, "embedding-url", Some(""))

View file

@ -3,7 +3,7 @@ use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use uuid::Uuid;
use crate::core::config::ConfigManager;
@ -35,7 +35,7 @@ impl QdrantConfig {
let (url, api_key) = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() {
sm.get_vectordb_config_sync()
} else {
let config_manager = ConfigManager::new(Arc::new(pool.clone()));
let config_manager = ConfigManager::new(pool.clone());
let url = config_manager
.get_config(bot_id, "vectordb-url", Some(""))
.unwrap_or_else(|_| "".to_string());
@ -532,55 +532,72 @@ impl KbIndexer {
Ok(())
}
pub async fn index_single_file(
&self,
bot_id: Uuid,
bot_name: &str,
kb_name: &str,
file_path: &Path,
) -> Result<IndexingResult> {
if !is_embedding_server_ready() {
return Err(anyhow::anyhow!(
"Embedding server not available. Cannot index file."
));
}
pub async fn index_single_file(
&self,
bot_id: Uuid,
bot_name: &str,
kb_name: &str,
file_path: &Path,
) -> Result<IndexingResult> {
self.index_single_file_with_id(bot_id, bot_name, kb_name, file_path, None).await
}
if !self.check_qdrant_health().await.unwrap_or(false) {
return Err(anyhow::anyhow!(
"Qdrant vector database is not available."
));
}
pub async fn index_single_file_with_id(
&self,
bot_id: Uuid,
bot_name: &str,
kb_name: &str,
file_path: &Path,
document_id: Option<&str>,
) -> Result<IndexingResult> {
if !is_embedding_server_ready() {
return Err(anyhow::anyhow!(
"Embedding server not available. Cannot index file."
));
}
let bot_id_short = bot_id.to_string().chars().take(8).collect::<String>();
let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name);
if !self.check_qdrant_health().await.unwrap_or(false) {
return Err(anyhow::anyhow!(
"Qdrant vector database is not available."
));
}
self.ensure_collection_exists(&collection_name).await?;
let bot_id_short = bot_id.to_string().chars().take(8).collect::<String>();
let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name);
info!(
"Indexing single file: {} into collection {}",
file_path.display(),
collection_name
);
self.ensure_collection_exists(&collection_name).await?;
let chunks = self.document_processor.process_document(file_path).await?;
let doc_path = document_id
.map(|s| s.to_string())
.unwrap_or_else(|| file_path.to_string_lossy().to_string());
if chunks.is_empty() {
warn!("No chunks extracted from file: {}", file_path.display());
return Ok(IndexingResult {
collection_name,
documents_processed: 0,
chunks_indexed: 0,
});
}
info!(
"Indexing single file: {} (id: {}) into collection {}",
file_path.display(), doc_path, collection_name
);
let doc_path = file_path.to_string_lossy().to_string();
let embeddings = self
.embedding_generator
.generate_embeddings(&chunks)
.await?;
if let Err(e) = self.delete_file_points(&collection_name, &doc_path).await {
warn!("Failed to delete old points for {} before reindex: {}", doc_path, e);
}
let points = Self::create_qdrant_points(&doc_path, embeddings)?;
self.upsert_points(&collection_name, points).await?;
let chunks = self.document_processor.process_document(file_path).await?;
if chunks.is_empty() {
warn!("No chunks extracted from file: {}", file_path.display());
return Ok(IndexingResult {
collection_name,
documents_processed: 0,
chunks_indexed: 0,
});
}
let embeddings = self
.embedding_generator
.generate_embeddings(&chunks)
.await?;
let points = Self::create_qdrant_points(&doc_path, embeddings)?;
self.upsert_points(&collection_name, points).await?;
self.update_collection_metadata(&collection_name, bot_name, kb_name, chunks.len())?;

View file

@ -109,23 +109,31 @@ impl KnowledgeBaseManager {
kb_name: &str,
file_path: &Path,
) -> Result<kb_indexer::IndexingResult> {
self.index_single_file_with_id(bot_id, bot_name, kb_name, file_path, None).await
}
pub async fn index_single_file_with_id(
&self,
bot_id: Uuid,
bot_name: &str,
kb_name: &str,
file_path: &Path,
document_id: Option<&str>,
) -> Result<kb_indexer::IndexingResult> {
let doc_id_display = document_id.unwrap_or("(temp path)");
info!(
"Indexing single file: {} into KB {} for bot {}",
file_path.display(),
kb_name,
bot_name
"Indexing single file: {} (id: {}) into KB {} for bot {}",
file_path.display(), doc_id_display, kb_name, bot_name
);
let result = self
.indexer
.index_single_file(bot_id, bot_name, kb_name, file_path)
.index_single_file_with_id(bot_id, bot_name, kb_name, file_path, document_id)
.await?;
info!(
"Successfully indexed {} chunks from {} into collection {}",
result.chunks_indexed,
file_path.display(),
result.collection_name
result.chunks_indexed, file_path.display(), result.collection_name
);
Ok(result)

View file

@ -181,7 +181,7 @@ impl WebsiteCrawlerService {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
trace!("Starting crawl for website: {}", website.url);
let config_manager = ConfigManager::new(db_pool.clone().into());
let config_manager = ConfigManager::new(db_pool.clone());
let website_max_depth = config_manager
.get_bot_config_value(&website.bot_id, "website-max-depth")

View file

@ -269,7 +269,6 @@ impl RequestContext {
#[derive(Clone)]
pub struct ContextMiddlewareState {
pub db_pool: DbPool,
pub jwt_secret: Arc<String>,
pub org_cache: Arc<RwLock<std::collections::HashMap<Uuid, CachedOrganization>>>,
pub user_cache: Arc<RwLock<std::collections::HashMap<Uuid, CachedUserData>>>,
@ -290,9 +289,8 @@ pub struct CachedUserData {
}
impl ContextMiddlewareState {
pub fn new(db_pool: DbPool, jwt_secret: String) -> Self {
pub fn new(_db_pool: DbPool, jwt_secret: String) -> Self {
Self {
db_pool,
jwt_secret: Arc::new(jwt_secret),
org_cache: Arc::new(RwLock::new(std::collections::HashMap::new())),
user_cache: Arc::new(RwLock::new(std::collections::HashMap::new())),

View file

@ -1159,12 +1159,16 @@ EOF"#.to_string(),
}
}
let rendered_cmd = component
.exec_cmd
.replace("{{BIN_PATH}}", &bin_path.to_string_lossy())
.replace("{{DATA_PATH}}", &data_path.to_string_lossy())
.replace("{{CONF_PATH}}", &conf_path.to_string_lossy())
.replace("{{LOGS_PATH}}", &logs_path.to_string_lossy());
let rendered_cmd = component
.exec_cmd
.replace("{{BIN_PATH}}", &bin_path.to_string_lossy())
.replace("{{DATA_PATH}}", &data_path.to_string_lossy())
.replace("{{CONF_PATH}}", &conf_path.to_string_lossy())
.replace("{{LOGS_PATH}}", &logs_path.to_string_lossy());
if let Err(e) = std::fs::create_dir_all(&logs_path) {
warn!("Failed to create log directory {}: {}", logs_path.display(), e);
}
trace!(
"Starting component {} with command: {}",

View file

@ -106,10 +106,13 @@ pub fn get_work_path() -> String {
/// In production (system container with .env but no botserver-stack): /opt/gbo/work
/// In development (with botserver-stack directory): ./botserver-stack/data/system/work
fn get_work_path_default() -> String {
let has_env = std::path::Path::new("./.env").exists()
let has_env = std::path::Path::new("./.env").exists()
|| std::path::Path::new("/opt/gbo/bin/.env").exists();
let stack_work = std::path::Path::new("./botserver-stack/data/system/work");
let production_work = std::path::Path::new("/opt/gbo/work");
if has_env || production_work.exists() {
if stack_work.exists() {
stack_work.to_str().unwrap_or("./botserver-stack/data/system/work").to_string()
} else if has_env || production_work.exists() {
"/opt/gbo/work".to_string()
} else {
"./botserver-stack/data/system/work".to_string()
@ -120,10 +123,13 @@ fn get_work_path_default() -> String {
/// In production (system container with .env): /opt/gbo
/// In development: ./botserver-stack
pub fn get_stack_path() -> String {
let has_env = std::path::Path::new("./.env").exists()
let stack_dir = std::path::Path::new("./botserver-stack");
let has_env = std::path::Path::new("./.env").exists()
|| std::path::Path::new("/opt/gbo/bin/.env").exists();
let production_base = std::path::Path::new("/opt/gbo/bin/botserver");
if has_env || production_base.exists() {
if stack_dir.exists() {
"./botserver-stack".to_string()
} else if has_env || production_base.exists() {
"/opt/gbo".to_string()
} else {
"./botserver-stack".to_string()
@ -135,10 +141,16 @@ pub async fn create_s3_operator(
config: &DriveConfig,
) -> Result<S3Repository, Box<dyn std::error::Error>> {
let endpoint = {
let base = if config.server.starts_with("http://") || config.server.starts_with("https://") {
config.server.clone()
// Fallback to localhost:9100 if config.server is empty
let server = if config.server.is_empty() {
"localhost:9100".to_string()
} else {
format!("http://{}", config.server)
config.server.clone()
};
let base = if server.starts_with("http://") || server.starts_with("https://") {
server
} else {
format!("http://{}", server)
};
let with_port = if base.contains("://") {
let without_scheme = base.split("://").nth(1).unwrap_or("");

View file

@ -282,7 +282,7 @@ async fn call_designer_llm(
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
use crate::core::config::ConfigManager;
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
// Get LLM configuration from bot config or use defaults
let model = config_manager

View file

@ -101,20 +101,6 @@ fn escape_xml(text: &str) -> String {
.replace('\'', "&apos;")
}
pub fn save_docx_preserving(original_bytes: &[u8]) -> Result<Vec<u8>, String> {
use ooxmlsdk::parts::wordprocessing_document::WordprocessingDocument;
let reader = Cursor::new(original_bytes);
let docx = WordprocessingDocument::new(reader)
.map_err(|e| format!("Failed to parse DOCX: {e}"))?;
let mut output = Cursor::new(Vec::new());
docx.save(&mut output)
.map_err(|e| format!("Failed to save DOCX: {e}"))?;
Ok(output.into_inner())
}
pub fn update_docx_text(
original_bytes: &[u8],
new_paragraphs: &[String],
@ -235,7 +221,11 @@ fn replace_first_text_run(para_xml: &str, new_text: &str) -> String {
found_first = true;
search_pos = abs_content_start + escaped.len() + 6;
} else {
result = format!("{}{}", &result[..abs_content_start], &result[abs_content_end..]);
result = format!(
"{}{}",
&result[..abs_content_start],
&result[abs_content_end..]
);
search_pos = abs_content_start;
}
} else {

View file

@ -1,7 +1,6 @@
use crate::docs::ooxml::{load_docx_preserving, update_docx_text};
use crate::docs::types::{Document, DocumentMetadata};
use crate::core::shared::state::AppState;
use crate::drive::s3_repository::S3Repository;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::io::Cursor;
@ -78,12 +77,14 @@ pub async fn load_docx_from_bytes(
user_identifier: &str,
file_path: &str,
) -> Result<Document, String> {
let file_name = file_path
let raw_name = file_path
.split('/')
.last()
.unwrap_or("Untitled")
.trim_end_matches(".docx")
.trim_end_matches(".doc");
.unwrap_or("Untitled");
let file_name = raw_name
.strip_suffix(".docx")
.or_else(|| raw_name.strip_suffix(".doc"))
.unwrap_or(raw_name);
let doc_id = generate_doc_id();
@ -247,12 +248,12 @@ pub async fn save_document_as_docx(
let docx_path = format!("{base_path}/{doc_id}.docx");
s3_client
.put_object(
&state.bucket_name,
&docx_path,
docx_bytes.clone(),
Some("application/vnd.openxmlformats-officedocument.wordprocessingml.document"),
)
.put_object()
.bucket(&state.bucket_name)
.key(&docx_path)
.body(docx_bytes.clone())
.content_type("application/vnd.openxmlformats-officedocument.wordprocessingml.document")
.send()
.await
.map_err(|e| format!("Failed to save DOCX: {e}"))?;
@ -346,12 +347,12 @@ pub async fn save_document_to_drive(
let meta_path = format!("{base_path}/{doc_id}.meta.json");
s3_client
.put_object(
&state.bucket_name,
&doc_path,
content.as_bytes().to_vec(),
Some("text/html"),
)
.put_object()
.bucket(&state.bucket_name)
.key(&doc_path)
.body(content.as_bytes().to_vec())
.content_type("text/html")
.send()
.await
.map_err(|e| format!("Failed to save document: {e}"))?;
@ -367,12 +368,12 @@ pub async fn save_document_to_drive(
});
s3_client
.put_object(
&state.bucket_name,
&meta_path,
metadata.to_string().into_bytes(),
Some("application/json"),
)
.put_object()
.bucket(&state.bucket_name)
.key(&meta_path)
.body(metadata.to_string().into_bytes())
.content_type("application/json")
.send()
.await
.map_err(|e| format!("Failed to save metadata: {e}"))?;
@ -493,7 +494,7 @@ pub async fn list_documents_from_drive(
if let Ok(meta_result) = s3_client
.get_object()
.bucket(&state.bucket_name)
.key(key)
.key(&key)
.send()
.await
{

View file

@ -9,6 +9,7 @@
/// SEM usar /opt/gbo/data/ como intermediário!
use crate::basic::compiler::BasicCompiler;
use crate::core::config::DriveConfig;
use crate::core::shared::state::AppState;
use crate::core::shared::utils::get_work_path;
use crate::drive::drive_files::drive_files as drive_files_table;
@ -30,6 +31,29 @@ pub struct DriveCompiler {
last_etags: Arc<RwLock<HashMap<String, String>>>,
}
/// Helper function to download file from S3
/// Separated to avoid Send trait issues with tokio::spawn
///
/// `file_path` format: `{bucket}/{key...}` — the first path segment is the
/// S3 bucket name, everything after the first `/` is the object key.
async fn download_from_s3(file_path: &str) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
    let config = DriveConfig::default();
    let s3_repo = crate::core::shared::utils::create_s3_operator(&config)
        .await
        .map_err(|e| format!("Failed to create S3 operator: {}", e))?;
    // Split "{bot}.gbai/{rest...}" into bucket + key at the first slash.
    let (bucket_name, s3_key) = file_path
        .split_once('/')
        .ok_or("Invalid file path for S3 download")?;
    s3_repo
        .get_object_direct(bucket_name, s3_key)
        .await
        .map_err(|e| format!("S3 get_object_direct failed for {}/{}: {}", bucket_name, s3_key, e).into())
}
impl DriveCompiler {
pub fn new(state: Arc<AppState>) -> Self {
let work_root = PathBuf::from(get_work_path());
@ -109,36 +133,76 @@ impl DriveCompiler {
/// Compilar arquivo .bas → .ast DIRETAMENTE em work/{bot}.gbai/{bot}.gbdialog/
async fn compile_file(&self, bot_id: Uuid, file_path: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
// file_path: {bot}.gbai/{bot}.gbdialog/{tool}.bas
// file_path formats:
// - {bot}.gbai/{bot}.gbdialog/{tool}.bas (full path with bucket prefix)
// - {bot}.gbdialog/{tool}.bas (without bucket prefix)
// - {bot}.gbkb/{doc}.txt (KB files - skip compilation)
let parts: Vec<&str> = file_path.split('/').collect();
if parts.len() < 3 {
if parts.len() < 2 {
return Err("Invalid file path format".into());
}
let bot_name = parts[0].trim_end_matches(".gbai");
let tool_name = parts.last().ok_or("Invalid file path")?.trim_end_matches(".bas");
// Work dir: /opt/gbo/work/{bot}.gbai/{bot}.gbdialog/
// Determine bot name and work directory structure
let (_bot_name, work_dir) = if parts[0].ends_with(".gbai") {
// Full path: {bot}.gbai/{bot}.gbdialog/{tool}.bas
let bot_name = parts[0].strip_suffix(".gbai").unwrap_or(parts[0]);
let work_dir = self.work_root.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name));
std::fs::create_dir_all(&work_dir)?;
(bot_name, work_dir)
} else if parts.len() >= 2 && parts[0].ends_with(".gbdialog") {
// Short path: {bot}.gbdialog/{tool}.bas
let bot_name = parts[0].strip_suffix(".gbdialog").unwrap_or(parts[0]);
let work_dir = self.work_root.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name));
(bot_name, work_dir)
} else if parts.len() >= 2 && parts[0].ends_with(".gbkb") {
// KB file: {bot}.gbkb/{doc}.txt - skip compilation
debug!("Skipping KB file: {}", file_path);
return Ok(());
} else {
warn!("Unknown file path format: {}", file_path);
return Err("Invalid file path format".into());
};
// Create work directory
std::fs::create_dir_all(&work_dir)?;
// Determine tool name from last part of path
let tool_name = parts.last().unwrap_or(&"unknown").strip_suffix(".bas").unwrap_or(parts.last().unwrap_or(&"unknown"));
// Caminho do .bas no work
let work_bas_path = work_dir.join(format!("{}.bas", tool_name));
// Baixar do MinIO direto para work dir
// (isso pressupõe que o DriveMonitor já sincronizou, ou buscamos do S3 aqui)
// Por enquanto, assumimos que o arquivo já está em work dir de sincronização anterior
// Se não existir, precisa buscar do S3
// Check if file exists in work dir
if !work_bas_path.exists() {
// Buscar do S3 - isso deveria ser feito pelo DriveMonitor
// Por enquanto, apenas logamos
warn!("File {} not found in work dir, skipping", work_bas_path.display());
return Ok(());
// File doesn't exist in work dir - need to download from S3
// This should be done by DriveMonitor, but we can try to fetch it here
warn!("File {} not found in work dir, attempting to download from S3", work_bas_path.display());
// Download in separate task to avoid Send issues
let download_result = download_from_s3(file_path).await;
match download_result {
Ok(content) => {
if let Err(e) = std::fs::write(&work_bas_path, content) {
warn!("Failed to write {} to work dir: {}", work_bas_path.display(), e);
return Err(format!("Failed to write file: {}", e).into());
}
info!("Downloaded {} to {}", file_path, work_bas_path.display());
}
Err(e) => {
warn!("Failed to download {} from S3: {}", file_path, e);
return Err(format!("File not found in S3: {}", file_path).into());
}
}
}
// Ler conteúdo
let _content = std::fs::read_to_string(&work_bas_path)?;
// Verify file exists now
if !work_bas_path.exists() {
warn!("File {} still not found after download attempt", work_bas_path.display());
return Ok(());
}
// Ler conteúdo
let _content = std::fs::read_to_string(&work_bas_path)?;
// Compilar com BasicCompiler (já está no work dir, então compila in-place)
let mut compiler = BasicCompiler::new(self.state.clone(), bot_id);

View file

@ -1,116 +0,0 @@
#[cfg(any(feature = "research", feature = "llm"))]
use crate::core::kb::KnowledgeBaseManager;
#[cfg(any(feature = "research", feature = "llm"))]
use log::{error, info, trace, warn};
#[cfg(any(feature = "research", feature = "llm"))]
use std::collections::HashSet;
#[cfg(any(feature = "research", feature = "llm"))]
use std::path::PathBuf;
#[cfg(any(feature = "research", feature = "llm"))]
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(any(feature = "research", feature = "llm"))]
use std::sync::Arc;
#[cfg(any(feature = "research", feature = "llm"))]
use tokio::sync::RwLock as TokioRwLock;
#[cfg(any(feature = "research", feature = "llm"))]
use tokio::time::Duration;
#[cfg(any(feature = "research", feature = "llm"))]
use crate::drive::drive_files::DriveFileRepository;
#[cfg(any(feature = "research", feature = "llm"))]
/// Spawn a background task that drains `pending_kb_index`, indexing one KB
/// folder at a time until `is_processing` is cleared.
///
/// Queue keys appear to be of the form `<prefix>_<kb_folder_name>`; the
/// `splitn(2, '_')` below keeps everything after the first underscore as the
/// folder name — TODO(review): confirm key format against the producer side.
/// Each attempt is bounded by a 120s timeout; success/failure is reflected in
/// `drive_files` via the pattern-marking helpers on `file_repo`.
pub fn start_kb_processor(
    kb_manager: Arc<KnowledgeBaseManager>,
    bot_id: uuid::Uuid,
    bot_name: String,
    work_root: PathBuf,
    pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
    files_being_indexed: Arc<TokioRwLock<HashSet<String>>>,
    kb_indexed_folders: Arc<TokioRwLock<HashSet<String>>>,
    file_repo: Arc<DriveFileRepository>,
    is_processing: Arc<AtomicBool>,
) {
    tokio::spawn(async move {
        while is_processing.load(Ordering::SeqCst) {
            // Peek one pending key. NOTE(review): a write lock is taken here
            // although the set is only read; a read lock would suffice.
            let kb_key = {
                let pending = pending_kb_index.write().await;
                pending.iter().next().cloned()
            };
            let Some(kb_key) = kb_key else {
                // Nothing queued — idle-poll every 5 seconds.
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            };
            let parts: Vec<&str> = kb_key.splitn(2, '_').collect();
            if parts.len() < 2 {
                // Malformed key (no underscore) — drop it so it can't wedge the queue.
                let mut pending = pending_kb_index.write().await;
                pending.remove(&kb_key);
                continue;
            }
            let kb_folder_name = parts[1];
            // NOTE(review): the "{}.gbkb/" component carries a trailing slash
            // into Path::join — harmless on most platforms, but verify intent.
            let kb_folder_path =
                work_root.join(&bot_name).join(format!("{}.gbkb/", bot_name)).join(kb_folder_name);
            // Skip keys already being indexed by another pass; also dequeue them.
            {
                let indexing = files_being_indexed.read().await;
                if indexing.contains(&kb_key) {
                    let mut pending = pending_kb_index.write().await;
                    pending.remove(&kb_key);
                    continue;
                }
            }
            // Claim the key before starting the (long) indexing call.
            {
                let mut indexing = files_being_indexed.write().await;
                indexing.insert(kb_key.clone());
            }
            trace!("Indexing KB: {} for bot: {}", kb_key, bot_name);
            // Hard 120s cap so a stuck indexer can't stall the processor forever.
            let result =
                tokio::time::timeout(Duration::from_secs(120), kb_manager.handle_gbkb_change(bot_id, &bot_name, kb_folder_path.as_path()))
                    .await;
            // Release the claim and dequeue regardless of outcome.
            {
                let mut indexing = files_being_indexed.write().await;
                indexing.remove(&kb_key);
            }
            {
                let mut pending = pending_kb_index.write().await;
                pending.remove(&kb_key);
            }
            match result {
                Ok(Ok(_)) => {
                    info!("Successfully indexed KB: {}", kb_key);
                    {
                        let mut indexed = kb_indexed_folders.write().await;
                        indexed.insert(kb_key.clone());
                    }
                    // Mark every drive_files row under this folder as indexed.
                    let pattern = format!("{}/", kb_folder_name);
                    if let Err(e) = file_repo.mark_indexed_by_pattern(bot_id, &pattern) {
                        warn!("Failed to mark files indexed for {}: {}", kb_key, e);
                    }
                }
                Ok(Err(e)) => {
                    // Indexing completed within the timeout but reported an error.
                    warn!("Failed to index KB {}: {}", kb_key, e);
                    let pattern = format!("{}/", kb_folder_name);
                    if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) {
                        warn!("Failed to mark files failed for {}: {}", kb_key, e);
                    }
                }
                Err(_) => {
                    // The 120s timeout elapsed; treat the whole folder as failed.
                    error!("KB indexing timed out after 120s for {}", kb_key);
                    let pattern = format!("{}/", kb_folder_name);
                    if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) {
                        warn!("Failed to mark files failed for {}: {}", kb_key, e);
                    }
                }
            }
        }
        trace!("Stopping for bot {}", bot_name);
    });
}

View file

@ -1,6 +1,5 @@
mod types;
mod kb_processor;
mod monitor;
mod utils;
pub use types::{DriveMonitor, normalize_etag, normalize_config_value};
pub use types::{DriveMonitor, normalize_etag};

View file

@ -1,60 +1,346 @@
use crate::core::shared::state::AppState;
use crate::drive::drive_files::DriveFileRepository;
#[cfg(any(feature = "research", feature = "llm"))]
use crate::core::kb::KnowledgeBaseManager;
use crate::core::shared::state::AppState;
#[cfg(not(any(feature = "research", feature = "llm")))]
use std::collections::HashMap;
#[cfg(any(feature = "research", feature = "llm"))]
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::Arc;
#[cfg(any(feature = "research", feature = "llm"))]
use tokio::sync::RwLock as TokioRwLock;
use crate::drive::drive_files::DriveFileRepository;
#[cfg(any(feature = "research", feature = "llm"))]
static LLM_STREAMING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
#[cfg(any(feature = "research", feature = "llm"))]
pub fn set_llm_streaming(streaming: bool) {
LLM_STREAMING.store(streaming, Ordering::SeqCst);
}
#[cfg(any(feature = "research", feature = "llm"))]
pub fn is_llm_streaming() -> bool {
LLM_STREAMING.load(Ordering::SeqCst)
}
const MAX_BACKOFF_SECS: u64 = 300;
const INITIAL_BACKOFF_SECS: u64 = 30;
const RETRY_BACKOFF_SECS: i64 = 3600;
const MAX_FAIL_COUNT: i32 = 3;
/// Normalize an S3 ETag by stripping any surrounding double quotes
/// (S3/MinIO return ETags quoted, e.g. `"abc123"`).
pub fn normalize_etag(etag: &str) -> String {
    etag.trim_start_matches('"').trim_end_matches('"').to_string()
}
pub fn normalize_config_value(value: &str) -> String {
let trimmed = value.trim();
if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("none") {
String::new()
} else {
trimmed.to_string()
impl DriveMonitor {
pub async fn start_monitoring(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
log::info!("DriveMonitor monitoring started for bucket: {}", self.bucket_name);
loop {
if let Err(e) = self.scan_bucket().await {
log::error!("Failed to scan bucket {}: {}", self.bucket_name, e);
}
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
}
/// Perform one full scan of the bot's bucket:
/// - list every object (key + ETag),
/// - upsert each into `drive_files`,
/// - re-index KB files whose ETag changed or that are new/unindexed,
/// - re-sync `config.csv` on change,
/// - reconcile rows for objects deleted from S3.
async fn scan_bucket(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    log::info!("Scanning bucket {} for files", self.bucket_name);
    if let Some(s3) = &self.state.drive {
        match s3.list_objects_with_metadata(&self.bucket_name, None).await {
            Ok(objects) => {
                log::info!("Found {} objects in bucket {}", objects.len(), self.bucket_name);
                // Bucket is named "{bot}.gbai"; drop the suffix to get the bot name.
                let bot_name = self.bucket_name.strip_suffix(".gbai").unwrap_or(&self.bucket_name);
                // Snapshot of live keys, used below to detect deletions.
                let current_keys: Vec<String> = objects.iter().map(|o| o.key.clone()).collect();
                for obj in &objects {
                    let file_type = classify_file(&obj.key);
                    // drive_files stores paths with the bucket prefix restored.
                    let full_key = format!("{}.gbai/{}", bot_name, obj.key);
                    let etag = obj.etag.as_deref().map(normalize_etag);
                    let existing = self.file_repo.get_file_state(self.bot_id, &full_key);
                    // Reindex when: new file, never indexed, or ETag changed.
                    let needs_reindex = match &existing {
                        Some(prev) if prev.indexed && prev.etag.as_deref() == etag.as_deref() => false,
                        Some(prev) if prev.indexed && prev.etag.as_deref() != etag.as_deref() => {
                            log::info!("ETag changed for {}, will reindex", full_key);
                            true
                        }
                        // Known but not yet indexed — this evaluates to true.
                        Some(_) => !existing.as_ref().map_or(false, |f| f.indexed),
                        None => true,
                    };
                    // Record/refresh the row before any indexing work.
                    match self.file_repo.upsert_file(
                        self.bot_id,
                        &full_key,
                        file_type,
                        etag,
                        None,
                    ) {
                        Ok(_) => log::info!("Added/updated drive_files for: {} ({})", full_key, file_type),
                        Err(e) => log::error!("Failed to upsert {}: {}", full_key, e),
                    }
                    if needs_reindex && file_type == "kb" {
                        // KB indexing is only compiled in with vector-search features.
                        #[cfg(any(feature = "research", feature = "llm"))]
                        {
                            self.index_kb_file(bot_name, &full_key, &obj.key).await;
                        }
                    }
                    if file_type == "config" && needs_reindex {
                        self.sync_bot_config(bot_name, &obj.key).await;
                    }
                }
                // Drop DB rows (and KB vectors) for files no longer in S3.
                self.handle_deleted_files(bot_name, &current_keys);
            }
            Err(e) => {
                log::error!("Failed to list objects in {}: {}", self.bucket_name, e);
            }
        }
    } else {
        log::warn!("S3 client not available for bucket scan");
    }
    Ok(())
}
/// Reconcile `drive_files` against the live bucket listing: any DB row whose
/// S3 key is absent from `current_keys` is treated as deleted — its KB
/// vectors are purged (when KB features are enabled) and the row is removed.
fn handle_deleted_files(&self, bot_name: &str, current_keys: &[String]) {
    let prefix = format!("{}.gbai/", bot_name);
    for db_file in &self.file_repo.get_all_files_for_bot(self.bot_id) {
        // Rows are stored with the "{bot}.gbai/" prefix; skip anything else.
        let Some(s3_key) = db_file.file_path.strip_prefix(&prefix) else {
            continue;
        };
        if current_keys.iter().any(|k| k == s3_key) {
            continue; // still present in S3
        }
        log::info!("File deleted from S3: {} (was in DB)", db_file.file_path);
        if db_file.file_type == "kb" {
            #[cfg(any(feature = "research", feature = "llm"))]
            {
                self.delete_kb_file_vectors(bot_name, &db_file.file_path, s3_key);
            }
        }
        if let Err(e) = self.file_repo.delete_file(self.bot_id, &db_file.file_path) {
            log::error!("Failed to delete drive_files entry for {}: {}", db_file.file_path, e);
        }
    }
}
#[cfg(any(feature = "research", feature = "llm"))]
/// Download a single KB file from S3 to a temp path and index it into the
/// bot's vector collection, marking the `drive_files` row indexed/failed.
///
/// A per-file claim in `files_being_indexed` prevents concurrent duplicate
/// indexing; every exit path after the claim releases it.
async fn index_kb_file(&self, bot_name: &str, full_key: &str, s3_key: &str) {
    let parsed = match parse_kb_path(s3_key) {
        Some(p) => p,
        None => {
            log::debug!("Not a KB file path: {}", s3_key);
            return;
        }
    };
    // Claim the file; bail if another task already holds it.
    let mut being_indexed = self.files_being_indexed.write().await;
    if being_indexed.contains(full_key) {
        log::debug!("Already indexing {}, skipping", full_key);
        return;
    }
    being_indexed.insert(full_key.to_string());
    drop(being_indexed);
    let s3 = match &self.state.drive {
        Some(s3) => s3,
        None => {
            log::error!("S3 client not available for KB indexing of {}", full_key);
            self.files_being_indexed.write().await.remove(full_key);
            return;
        }
    };
    let data = match s3.get_object_direct(&self.bucket_name, s3_key).await {
        Ok(d) => d,
        Err(e) => {
            log::error!("Failed to download KB file {}/{}: {}", self.bucket_name, s3_key, e);
            let _ = self.file_repo.mark_failed(self.bot_id, full_key);
            self.files_being_indexed.write().await.remove(full_key);
            return;
        }
    };
    // BUGFIX: parsed.file_name keeps the relative sub-path and may contain
    // '/' for nested KB files; joining it raw would point the temp file at a
    // nonexistent directory and fail the write. Flatten separators instead.
    let safe_file_name = parsed.file_name.replace('/', "_");
    let temp_path = std::env::temp_dir().join(format!("gb_kb_{}_{}", uuid::Uuid::new_v4(), safe_file_name));
    if let Err(e) = std::fs::write(&temp_path, &data) {
        log::error!("Failed to write temp file {}: {}", temp_path.display(), e);
        self.files_being_indexed.write().await.remove(full_key);
        return;
    }
    log::info!("Indexing KB file {}/{} -> temp {}", bot_name, parsed.kb_name, temp_path.display());
    match self.kb_manager.index_single_file_with_id(
        self.bot_id,
        bot_name,
        &parsed.kb_name,
        &temp_path,
        Some(full_key),
    ).await {
        Ok(result) => {
            log::info!(
                "Indexed {} chunks from {} into collection {}",
                result.chunks_indexed,
                full_key,
                result.collection_name
            );
            let _ = self.file_repo.mark_indexed(self.bot_id, full_key);
            // Keep kb_collections metadata in sync with the indexing result.
            self.upsert_kb_collection(bot_name, &parsed.kb_name, &result.collection_name, result.documents_processed);
        }
        Err(e) => {
            log::error!("KB indexing failed for {}: {}", full_key, e);
            let _ = self.file_repo.mark_failed(self.bot_id, full_key);
        }
    }
    // Best-effort cleanup of the temp file and release of the claim.
    let _ = std::fs::remove_file(&temp_path);
    self.files_being_indexed.write().await.remove(full_key);
}
/// Download the bot's `config.csv` from S3 and write each `key,value` line
/// into the bot configuration via `ConfigManager`, then mark the file's
/// `drive_files` row indexed. Blank lines, `#` comments, and a `key,...`
/// header row are skipped.
async fn sync_bot_config(&self, bot_name: &str, s3_key: &str) {
    let s3 = match &self.state.drive {
        Some(s3) => s3,
        None => {
            log::error!("S3 client not available for config sync");
            return;
        }
    };
    let data = match s3.get_object_direct(&self.bucket_name, s3_key).await {
        Ok(d) => d,
        Err(e) => {
            log::error!("Failed to download config.csv from {}/{}: {}", self.bucket_name, s3_key, e);
            return;
        }
    };
    let content = match String::from_utf8(data) {
        Ok(c) => c,
        Err(e) => {
            log::error!("Failed to parse config.csv as UTF-8: {}", e);
            return;
        }
    };
    let config_manager = crate::core::config::ConfigManager::new(self.state.conn.clone());
    for line in content.lines() {
        let line = line.trim();
        // Skip empties, comments, and the optional "key,value" header.
        if line.is_empty() || line.starts_with('#') || line.to_lowercase().starts_with("key,") {
            continue;
        }
        // Split at the FIRST comma only: values may themselves contain commas.
        if let Some((key, value)) = line.split_once(',') {
            let key = key.trim();
            let value = value.trim();
            if key.is_empty() {
                continue;
            }
            if let Err(e) = config_manager.set_config(&self.bot_id, key, value) {
                log::error!("Failed to set config {}={} for bot {}: {}", key, value, bot_name, e);
            } else {
                log::info!("Synced config {}={} for bot {}", key, value, bot_name);
            }
        }
    }
    // Record the sync so the scanner won't re-process an unchanged config.csv.
    let full_key = format!("{}.gbai/{}", bot_name, s3_key);
    let _ = self.file_repo.mark_indexed(self.bot_id, &full_key);
}
#[cfg(any(feature = "research", feature = "llm"))]
/// Fire-and-forget removal of a deleted KB file's vectors from the vector
/// store. Non-KB paths (per `parse_kb_path`) are ignored. The actual delete
/// runs in a spawned task; failures are only logged.
fn delete_kb_file_vectors(&self, bot_name: &str, _full_key: &str, s3_key: &str) {
    let parsed = match parse_kb_path(s3_key) {
        Some(p) => p,
        None => return,
    };
    // Clone everything the spawned task needs so it owns its data ('static).
    let kb_manager = self.kb_manager.clone();
    let bot_id = self.bot_id;
    let bot_name = bot_name.to_string();
    let relative_path = parsed.relative_path.clone();
    tokio::spawn(async move {
        match kb_manager.delete_file_from_kb(bot_id, &bot_name, &parsed.kb_name, &relative_path).await {
            Ok(_) => log::info!("Deleted vectors for {} from {}/{}", relative_path, bot_name, parsed.kb_name),
            Err(e) => log::error!("Failed to delete vectors for {} from {}/{}: {}", relative_path, bot_name, parsed.kb_name, e),
        }
    });
}
#[cfg(any(feature = "research", feature = "llm"))]
/// Insert or update the `kb_collections` row that maps a KB folder to its
/// Qdrant collection and document count. Conflicts on (bot_id, name) update
/// the existing row. DB errors are logged, never propagated.
fn upsert_kb_collection(&self, bot_name: &str, kb_name: &str, collection_name: &str, doc_count: usize) {
    use diesel::prelude::*;
    use uuid::Uuid;
    // Silently skip if no pooled connection is available.
    if let Ok(mut conn) = self.state.conn.get() {
        let folder_path = format!("{}.gbai/{}.gbkb/{}", bot_name, bot_name, kb_name);
        diesel::sql_query(
            "INSERT INTO kb_collections (id, bot_id, name, folder_path, qdrant_collection, document_count)
             VALUES ($1, $2, $3, $4, $5, $6)
             ON CONFLICT (bot_id, name) DO UPDATE SET
             folder_path = EXCLUDED.folder_path,
             qdrant_collection = EXCLUDED.qdrant_collection,
             document_count = EXCLUDED.document_count,
             updated_at = NOW()"
        )
        // Bind order must match the $1..$6 placeholders above.
        .bind::<diesel::sql_types::Uuid, _>(Uuid::new_v4())
        .bind::<diesel::sql_types::Uuid, _>(self.bot_id)
        .bind::<diesel::sql_types::Text, _>(kb_name)
        .bind::<diesel::sql_types::Text, _>(&folder_path)
        .bind::<diesel::sql_types::Text, _>(collection_name)
        // NOTE(review): usize -> i32 cast truncates for very large counts.
        .bind::<diesel::sql_types::Integer, _>(doc_count as i32)
        .execute(&mut conn)
        .unwrap_or_else(|e| {
            log::error!("Failed to upsert kb_collections for {}/{}: {}", bot_name, kb_name, e);
            0
        });
    }
}
}
impl DriveMonitor {
/// Start monitoring the drive bucket for changes
/// This is a placeholder that will be implemented with the actual monitoring logic
pub async fn start_monitoring(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
log::info!("DriveMonitor monitoring started for bucket: {}", self.bucket_name);
// The actual monitoring logic is handled by LocalFileMonitor
// This method is kept for backward compatibility
Ok(())
/// Classify an S3 object key into a drive_files file type:
/// `"bas"` for BASIC dialog scripts, `"kb"` for indexable knowledge-base
/// documents, `"config"` for the bot's `config.csv`, `"other"` otherwise.
fn classify_file(key: &str) -> &'static str {
    match key {
        k if k.ends_with(".bas") => "bas",
        k if k.contains(".gbkb/") && is_kb_extension(k) => "kb",
        k if k.contains(".gbot/") && k.ends_with("config.csv") => "config",
        _ => "other",
    }
}
/// Return true when the key's (case-insensitive) extension is one of the
/// document formats the knowledge-base extractor can index.
fn is_kb_extension(key: &str) -> bool {
    const KB_EXTENSIONS: [&str; 19] = [
        ".txt", ".md", ".pdf", ".xlsx", ".xls", ".docx", ".doc", ".csv",
        ".pptx", ".ppt", ".html", ".htm", ".rtf", ".epub", ".xml", ".json",
        ".odt", ".ods", ".odp",
    ];
    let lower = key.to_lowercase();
    KB_EXTENSIONS.iter().any(|ext| lower.ends_with(ext))
}
/// Components of a KB object key `"{x}.gbkb/{kb_name}/{file...}"`.
struct KbPathParts {
    kb_name: String,
    file_name: String,
    relative_path: String,
}

/// Parse an S3 key into KB parts. Returns `None` unless the key has at least
/// three segments and its first segment ends with `.gbkb`. `file_name` keeps
/// any nested sub-path after the KB folder.
fn parse_kb_path(s3_key: &str) -> Option<KbPathParts> {
    // splitn(3) leaves everything after the second '/' as one remainder,
    // matching the original parts[2..].join("/") behavior.
    let mut segments = s3_key.splitn(3, '/');
    let root = segments.next()?;
    if !root.ends_with(".gbkb") {
        return None;
    }
    let kb_name = segments.next()?.to_string();
    let file_name = segments.next()?.to_string();
    let relative_path = format!("{}/{}", kb_name, file_name);
    Some(KbPathParts {
        kb_name,
        file_name,
        relative_path,
    })
}
#[derive(Debug, Clone)]
pub struct DriveMonitor {
pub state: Arc<AppState>,
@ -67,30 +353,13 @@ pub struct DriveMonitor {
pub scanning: Arc<AtomicBool>,
pub consecutive_failures: Arc<AtomicU32>,
#[cfg(any(feature = "research", feature = "llm"))]
pub files_being_indexed: Arc<TokioRwLock<HashSet<String>>>,
#[cfg(any(feature = "research", feature = "llm"))]
pub pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
#[cfg(any(feature = "research", feature = "llm"))]
pub kb_indexed_folders: Arc<TokioRwLock<HashSet<String>>>,
pub files_being_indexed: Arc<tokio::sync::RwLock<std::collections::HashSet<String>>>,
#[cfg(not(any(feature = "research", feature = "llm")))]
pub _pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
pub _pending_kb_index: Arc<tokio::sync::RwLock<std::collections::HashSet<String>>>,
pub file_repo: Arc<DriveFileRepository>,
#[allow(dead_code)]
pub pending_changes: Arc<TokioRwLock<Vec<String>>>,
#[allow(dead_code)]
pub last_etag_snapshot: Arc<TokioRwLock<std::collections::HashMap<String, String>>>,
}
impl DriveMonitor {
fn normalize_config_value(value: &str) -> String {
let trimmed = value.trim();
if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("none") {
String::new()
} else {
trimmed.to_string()
}
}
pub fn new(state: Arc<AppState>, bucket_name: String, bot_id: uuid::Uuid) -> Self {
let work_root = PathBuf::from(crate::core::shared::utils::get_work_path());
#[cfg(any(feature = "research", feature = "llm"))]
@ -113,16 +382,10 @@ impl DriveMonitor {
scanning: Arc::new(AtomicBool::new(false)),
consecutive_failures: Arc::new(AtomicU32::new(0)),
#[cfg(any(feature = "research", feature = "llm"))]
files_being_indexed: Arc::new(TokioRwLock::new(HashSet::new())),
#[cfg(any(feature = "research", feature = "llm"))]
pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())),
#[cfg(any(feature = "research", feature = "llm"))]
kb_indexed_folders: Arc::new(TokioRwLock::new(HashSet::new())),
files_being_indexed: Arc::new(tokio::sync::RwLock::new(std::collections::HashSet::new())),
#[cfg(not(any(feature = "research", feature = "llm")))]
_pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())),
_pending_kb_index: Arc::new(tokio::sync::RwLock::new(std::collections::HashSet::new())),
file_repo,
pending_changes: Arc::new(TokioRwLock::new(Vec::new())),
last_etag_snapshot: Arc::new(TokioRwLock::new(HashMap::new())),
}
}
}

View file

@ -10,6 +10,8 @@ use s3::{Bucket, Region, creds::Credentials};
pub struct S3Repository {
bucket_name: String,
bucket: Arc<Bucket>,
access_key: String,
secret_key: String,
}
impl S3Repository {
@ -30,123 +32,185 @@ impl S3Repository {
Ok(Self {
bucket_name: bucket.to_string(),
bucket: Arc::new((*s3_bucket).clone()),
access_key: access_key.to_string(),
secret_key: secret_key.to_string(),
})
}
/// Upload data to S3 - direct call (renamed to avoid conflict with builder)
/// Upload data to S3 - creates bucket reference for target bucket
pub async fn put_object_direct(
&self,
_bucket: &str,
bucket: &str,
key: &str,
data: Vec<u8>,
_content_type: Option<&str>,
) -> Result<()> {
debug!("Uploading to S3: {}/{}", self.bucket_name, key);
self.bucket.put_object(key, &data).await?;
info!("Successfully uploaded to S3: {}/{}", self.bucket_name, key);
debug!("Uploading to S3: {}/{}", bucket, key);
let target_bucket = self.bucket_for(bucket)?;
target_bucket.put_object(key, &data).await?;
info!("Successfully uploaded to S3: {}/{}", bucket, key);
Ok(())
}
/// Download data from S3 - direct call (renamed to avoid conflict with builder)
pub async fn get_object_direct(&self, _bucket: &str, key: &str) -> Result<Vec<u8>> {
debug!("Downloading from S3: {}/{}", self.bucket_name, key);
let response = self.bucket.get_object(key).await?;
/// Download data from S3 - creates bucket reference for target bucket
pub async fn get_object_direct(&self, bucket: &str, key: &str) -> Result<Vec<u8>> {
debug!("Downloading from S3: {}/{}", bucket, key);
let target_bucket = self.bucket_for(bucket)?;
let response = target_bucket.get_object(key).await?;
let data = response.to_vec();
info!("Successfully downloaded from S3: {}/{}", self.bucket_name, key);
info!("Successfully downloaded from S3: {}/{}", bucket, key);
Ok(data)
}
/// Delete an object from S3 - direct call (renamed to avoid conflict with builder)
pub async fn delete_object_direct(&self, _bucket: &str, key: &str) -> Result<()> {
debug!("Deleting from S3: {}/{}", self.bucket_name, key);
self.bucket.delete_object(key).await?;
info!("Successfully deleted from S3: {}/{}", self.bucket_name, key);
/// Delete an object from S3 - creates bucket reference for target bucket
pub async fn delete_object_direct(&self, bucket: &str, key: &str) -> Result<()> {
debug!("Deleting from S3: {}/{}", bucket, key);
let target_bucket = self.bucket_for(bucket)?;
target_bucket.delete_object(key).await?;
info!("Successfully deleted from S3: {}/{}", bucket, key);
Ok(())
}
/// Copy object - implemented as get+put (renamed to avoid conflict with builder)
pub async fn copy_object_direct(&self, _bucket: &str, from_key: &str, to_key: &str) -> Result<()> {
debug!("Copying in S3: {}/{} -> {}/{}", self.bucket_name, from_key, self.bucket_name, to_key);
let response = self.bucket.get_object(from_key).await?;
/// Copy object - creates bucket reference for target bucket
pub async fn copy_object_direct(&self, bucket: &str, from_key: &str, to_key: &str) -> Result<()> {
debug!("Copying in S3: {}/{} -> {}/{}", bucket, from_key, bucket, to_key);
let target_bucket = self.bucket_for(bucket)?;
let response = target_bucket.get_object(from_key).await?;
let data = response.to_vec();
self.bucket.put_object(to_key, &data).await?;
target_bucket.put_object(to_key, &data).await?;
Ok(())
}
/// List buckets
/// Create a Bucket reference for a specific bucket name using stored credentials.
/// Reuses the primary `self.bucket` handle when the name matches; otherwise
/// builds a new path-style `Bucket` in the same region with the saved
/// access/secret keys.
fn bucket_for(&self, bucket_name: &str) -> Result<Arc<Bucket>> {
    // Fast path: the monitor's own bucket is already constructed.
    if bucket_name == self.bucket_name {
        return Ok(self.bucket.clone());
    }
    let region = self.bucket.region().clone();
    let creds = s3::creds::Credentials::new(
        Some(&self.access_key),
        Some(&self.secret_key),
        None, None, None
    ).map_err(|e| anyhow::anyhow!("Failed to create credentials: {}", e))?;
    // Path-style addressing is required for MinIO-style endpoints.
    // NOTE(review): the (*target).clone() deref suggests with_path_style()
    // returns a Box<Bucket> in the rust-s3 version in use — confirm on upgrade.
    let target = Bucket::new(bucket_name, region, creds)?.with_path_style();
    Ok(Arc::new((*target).clone()))
}
/// List all buckets in S3/MinIO using rust-s3 crate's list_buckets
pub async fn list_all_buckets(&self) -> Result<Vec<String>> {
debug!("Listing all buckets");
Ok(vec![self.bucket_name.clone()])
debug!("Listing all buckets from S3");
let region = self.bucket.region().clone();
let creds = s3::creds::Credentials::new(
Some(&self.access_key),
Some(&self.secret_key),
None, None, None
).map_err(|e| anyhow::anyhow!("Failed to create credentials: {}", e))?;
let response = Bucket::list_buckets(region, creds)
.await
.map_err(|e| anyhow::anyhow!("ListBuckets failed: {}", e))?;
let buckets: Vec<String> = response.bucket_names().collect();
debug!("Found {} buckets: {:?}", buckets.len(), buckets);
Ok(buckets)
}
/// Check if an object exists
pub async fn object_exists(&self, _bucket: &str, key: &str) -> Result<bool> {
Ok(self.bucket.object_exists(key).await?)
pub async fn object_exists(&self, bucket: &str, key: &str) -> Result<bool> {
let target_bucket = self.bucket_for(bucket)?;
Ok(target_bucket.object_exists(key).await?)
}
/// List objects with prefix
pub async fn list_objects(&self, _bucket: &str, prefix: Option<&str>) -> Result<Vec<String>> {
debug!("Listing objects in S3: {} with prefix {:?}", self.bucket_name, prefix);
/// List objects with prefix, returning only keys
pub async fn list_objects(&self, bucket: &str, prefix: Option<&str>) -> Result<Vec<String>> {
    // Delegate to the metadata-aware listing and project each entry down to
    // its key, discarding etag/size.
    let objects = self.list_objects_with_metadata(bucket, prefix).await?;
    let keys = objects.into_iter().map(|info| info.key).collect();
    Ok(keys)
}
/// List objects with prefix, returning key + etag + size for change detection
pub async fn list_objects_with_metadata(&self, bucket: &str, prefix: Option<&str>) -> Result<Vec<S3ObjectInfo>> {
debug!("Listing objects with metadata in S3: {} with prefix {:?}", bucket, prefix);
let region = self.bucket.region().clone();
let creds = s3::creds::Credentials::new(
Some(&self.access_key),
Some(&self.secret_key),
None, None, None
).map_err(|e| anyhow::anyhow!("Failed to create credentials: {}", e))?;
let target_bucket = Bucket::new(bucket, region, creds)?.with_path_style();
let prefix_str = prefix.unwrap_or("");
let results = self.bucket.list(prefix_str.to_string(), Some("/".to_string())).await?;
let keys: Vec<String> = results.iter()
.flat_map(|r| r.contents.iter().map(|c| c.key.clone()))
let results = target_bucket.list(prefix_str.to_string(), None).await?;
let objects: Vec<S3ObjectInfo> = results.iter()
.flat_map(|r| r.contents.iter().map(|c| S3ObjectInfo {
key: c.key.clone(),
etag: c.e_tag.clone(),
size: c.size,
}))
.collect();
Ok(keys)
debug!("Found {} objects with metadata in bucket {}", objects.len(), bucket);
Ok(objects)
}
/// Upload a file
pub async fn upload_file(
&self,
_bucket: &str,
bucket: &str,
key: &str,
file_path: &str,
_content_type: Option<&str>,
) -> Result<()> {
debug!("Uploading file to S3: {} -> {}/{}", file_path, self.bucket_name, key);
debug!("Uploading file to S3: {} -> {}/{}", file_path, bucket, key);
let target_bucket = self.bucket_for(bucket)?;
let data = tokio::fs::read(file_path).await
.context("Failed to read file for upload")?;
self.bucket.put_object(key, &data).await?;
target_bucket.put_object(key, &data).await?;
Ok(())
}
/// Download a file
pub async fn download_file(&self, _bucket: &str, key: &str, file_path: &str) -> Result<()> {
debug!("Downloading file from S3: {}/{} -> {}", self.bucket_name, key, file_path);
let response = self.bucket.get_object(key).await?;
pub async fn download_file(&self, bucket: &str, key: &str, file_path: &str) -> Result<()> {
debug!("Downloading file from S3: {}/{} -> {}", bucket, key, file_path);
let target_bucket = self.bucket_for(bucket)?;
let response = target_bucket.get_object(key).await?;
let data = response.to_vec();
tokio::fs::write(file_path, data).await
.context("Failed to write downloaded file")?;
info!("Successfully downloaded file from S3: {}/{} -> {}", self.bucket_name, key, file_path);
info!("Successfully downloaded file from S3: {}/{} -> {}", bucket, key, file_path);
Ok(())
}
/// Delete multiple objects
pub async fn delete_objects(&self, _bucket: &str, keys: Vec<String>) -> Result<()> {
pub async fn delete_objects(&self, bucket: &str, keys: Vec<String>) -> Result<()> {
if keys.is_empty() {
return Ok(());
}
debug!("Deleting {} objects from S3: {}", keys.len(), self.bucket_name);
debug!("Deleting {} objects from S3: {}", keys.len(), bucket);
let target_bucket = self.bucket_for(bucket)?;
let keys_count = keys.len();
for key in keys {
let _ = self.bucket.delete_object(&key).await;
let _ = target_bucket.delete_object(&key).await;
}
info!("Deleted {} objects from S3: {}", keys_count, self.bucket_name);
info!("Deleted {} objects from S3: {}", keys_count, bucket);
Ok(())
}
/// Create bucket if not exists
pub async fn create_bucket_if_not_exists(&self, _bucket: &str) -> Result<()> {
pub async fn create_bucket_if_not_exists(&self, bucket: &str) -> Result<()> {
let _target_bucket = self.bucket_for(bucket)?;
Ok(())
}
/// Get object metadata
pub async fn get_object_metadata(
&self,
_bucket: &str,
bucket: &str,
key: &str,
) -> Result<Option<ObjectMetadata>> {
match self.bucket.head_object(key).await {
let target_bucket = self.bucket_for(bucket)?;
match target_bucket.head_object(key).await {
Ok((response, _)) => Ok(Some(ObjectMetadata {
size: response.content_length.unwrap_or(0) as u64,
content_type: response.content_type,
@ -196,15 +260,12 @@ pub fn copy_object(&self) -> S3CopyBuilder {
/// List buckets
pub fn list_buckets(&self) -> S3ListBucketsBuilder {
S3ListBucketsBuilder {
bucket: self.bucket.clone(),
}
S3ListBucketsBuilder { repo: Some(Arc::new(self.clone())) }
}
/// Head bucket
pub fn head_bucket(&self) -> S3HeadBucketBuilder {
S3HeadBucketBuilder {
bucket: self.bucket.clone(),
bucket_name: None,
}
}
@ -212,7 +273,6 @@ pub fn copy_object(&self) -> S3CopyBuilder {
/// Create bucket
pub fn create_bucket(&self) -> S3CreateBucketBuilder {
S3CreateBucketBuilder {
bucket: self.bucket.clone(),
bucket_name: None,
}
}
@ -227,7 +287,7 @@ pub fn copy_object(&self) -> S3CopyBuilder {
}
}
/// Metadata for an S3 object
/// Metadata for an S3 object (from HEAD request)
#[derive(Debug, Clone)]
pub struct ObjectMetadata {
pub size: u64,
@ -236,6 +296,14 @@ pub struct ObjectMetadata {
pub etag: Option<String>,
}
/// Object info from list operations (key + etag + size)
#[derive(Debug, Clone)]
pub struct S3ObjectInfo {
    /// Full object key (path within the bucket).
    pub key: String,
    /// Entity tag as reported by the list response; `None` when the server
    /// omits it. Used for change detection between syncs.
    pub etag: Option<String>,
    /// Object size in bytes.
    pub size: u64,
}
// ============ Builder implementations ============
pub struct S3PutBuilder {
@ -311,17 +379,24 @@ impl S3CopyBuilder {
}
pub struct S3ListBucketsBuilder {
bucket: Arc<Bucket>,
repo: Option<SharedS3Repository>,
}
impl S3ListBucketsBuilder {
pub fn repo(mut self, repo: SharedS3Repository) -> Self { self.repo = Some(repo); self }
pub async fn send(self) -> Result<S3ListBucketsResponse> {
Ok(S3ListBucketsResponse { buckets: vec![] })
if let Some(repo) = self.repo {
let names = repo.list_all_buckets().await?;
Ok(S3ListBucketsResponse {
buckets: names.into_iter().map(|name| S3Bucket { name }).collect(),
})
} else {
Ok(S3ListBucketsResponse { buckets: vec![] })
}
}
}
pub struct S3HeadBucketBuilder {
bucket: Arc<Bucket>,
bucket_name: Option<String>,
}
@ -333,7 +408,6 @@ impl S3HeadBucketBuilder {
}
pub struct S3CreateBucketBuilder {
bucket: Arc<Bucket>,
bucket_name: Option<String>,
}
@ -380,7 +454,7 @@ impl S3Response {
#[derive(Debug, Default)]
pub struct S3ResponseBody {
data: Vec<u8>,
pub data: Vec<u8>,
}
impl S3ResponseBody {

View file

@ -53,7 +53,7 @@ fn format_email_time(date_str: &str) -> String {
}
fn is_tracking_pixel_enabled(state: &Arc<AppState>, bot_id: Option<Uuid>) -> bool {
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
let bot_id = bot_id.unwrap_or(Uuid::nil());
config_manager
@ -63,7 +63,7 @@ fn is_tracking_pixel_enabled(state: &Arc<AppState>, bot_id: Option<Uuid>) -> boo
}
fn inject_tracking_pixel(html_body: &str, tracking_id: &str, state: &Arc<AppState>) -> String {
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
let base_url = config_manager
.get_config(&Uuid::nil(), "server-url", Some(""))
.unwrap_or_else(|_| "".to_string());

View file

@ -19,7 +19,7 @@ const TRACKING_PIXEL: [u8; 43] = [
];
pub fn is_tracking_pixel_enabled(state: &Arc<AppState>, bot_id: Option<Uuid>) -> bool {
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
let bot_id = bot_id.unwrap_or(Uuid::nil());
config_manager
@ -29,7 +29,7 @@ pub fn is_tracking_pixel_enabled(state: &Arc<AppState>, bot_id: Option<Uuid>) ->
}
pub fn inject_tracking_pixel(html_body: &str, tracking_id: &str, state: &Arc<AppState>) -> String {
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
let base_url = config_manager
.get_config(&Uuid::nil(), "server-url", Some(""))
.unwrap_or_else(|_| "".to_string());

View file

@ -157,7 +157,7 @@ impl CachedLLMProvider {
}
};
let config_manager = ConfigManager::new(db_pool.clone().into());
let config_manager = ConfigManager::new(db_pool.clone());
let cache_enabled = config_manager
.get_config(&bot_uuid, "llm-cache", Some("true"))
.unwrap_or_else(|_| "true".to_string());
@ -193,7 +193,7 @@ impl CachedLLMProvider {
}
};
let config_manager = ConfigManager::new(db_pool.clone().into());
let config_manager = ConfigManager::new(db_pool.clone());
let ttl = config_manager
.get_config(

View file

@ -33,7 +33,7 @@ async fn process_episodic_memory(
session_manager.get_user_sessions(Uuid::nil())?
};
for session in sessions {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
// Default to 0 (disabled) to respect user's request for false by default
let threshold = config_manager
@ -145,7 +145,7 @@ async fn process_episodic_memory(
let llm_provider = state.llm_provider.clone();
let mut filtered = String::new();
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
// Use session.bot_id instead of Uuid::nil() to avoid using default bot settings
let model = config_manager

View file

@ -36,7 +36,7 @@ pub async fn ensure_llama_servers_running(
Ok(crate::core::bot::get_default_bot(&mut conn))
})
.await??;
let config_manager = ConfigManager::new(app_state.conn.clone().into());
let config_manager = ConfigManager::new(app_state.conn.clone());
info!("Reading config for bot_id: {}", default_bot_id);
let embedding_model_result = config_manager.get_config(&default_bot_id, "embedding-model", None);
info!("embedding-model config result: {:?}", embedding_model_result);
@ -388,7 +388,7 @@ pub fn start_llm_server(
std::env::set_var("OMP_PLACES", "cores");
std::env::set_var("OMP_PROC_BIND", "close");
let conn = app_state.conn.clone();
let config_manager = ConfigManager::new(conn.clone().into());
let config_manager = ConfigManager::new(conn.clone());
let mut conn = conn.get().map_err(|e| {
Box::new(std::io::Error::other(
format!("failed to get db connection: {e}"),

View file

@ -161,7 +161,7 @@ pub async fn enhanced_llm_call(
.await?;
// Get actual LLM configuration from bot's config
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let actual_model = config_manager
.get_config(&uuid::Uuid::nil(), "llm-model", None)
.unwrap_or_else(|_| model.clone());

View file

@ -435,7 +435,7 @@ pub async fn create_app_state(
#[cfg(feature = "directory")]
bootstrap_directory_admin(&zitadel_config).await;
let config_manager = ConfigManager::new(pool.clone().into());
let config_manager = ConfigManager::new(pool.clone());
let mut bot_conn = pool
.get()
@ -927,6 +927,7 @@ pub async fn start_background_services(
}
// Step 1: Discover bots from S3 buckets (*.gbai) and auto-create missing
log::error!("Drive client status: {:?}", state_for_scan.drive.is_some());
if let Some(s3_client) = &state_for_scan.drive {
match s3_client.list_all_buckets().await {
Ok(buckets) => {

View file

@ -158,7 +158,7 @@ struct ContactInfo {
}
async fn get_llm_config(state: &Arc<AppState>, bot_id: Uuid) -> Result<(String, String, String), String> {
let config = ConfigManager::new(state.conn.clone().into());
let config = ConfigManager::new(state.conn.clone());
let llm_url = config
.get_config(&bot_id, "llm-url", Some(""))

View file

@ -95,7 +95,7 @@ pub async fn send_campaign_email(
let open_token = Uuid::new_v4();
let tracking_id = Uuid::new_v4();
let config = ConfigManager::new(state.conn.clone().into());
let config = ConfigManager::new(state.conn.clone());
let base_url = config
.get_config(&bot_id, "server-url", Some(""))
.unwrap_or_else(|_| "".to_string());

View file

@ -244,7 +244,7 @@ impl BotModelsClient {
}
pub fn from_state(state: &AppState, bot_id: &Uuid) -> Self {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let config = BotModelsConfig::from_database(&config_manager, bot_id);
let image_config = ImageGeneratorConfig::from_database(&config_manager, bot_id);
let video_config = VideoGeneratorConfig::from_database(&config_manager, bot_id);
@ -630,7 +630,7 @@ pub async fn ensure_botmodels_running(
})
.await?;
let config_manager = ConfigManager::new(app_state.conn.clone().into());
let config_manager = ConfigManager::new(app_state.conn.clone());
BotModelsConfig::from_database(&config_manager, &default_bot_id)
};

View file

@ -20,7 +20,7 @@ pub async fn call_llm(
&[("user".to_string(), user_content.to_string())],
);
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone().into());
let config_manager = crate::core::config::ConfigManager::new(state.conn.clone());
let model = config_manager
.get_config(&uuid::Uuid::nil(), "llm-model", None)
.unwrap_or_else(|_| "gpt-3.5-turbo".to_string());

View file

@ -16,11 +16,10 @@ pub fn get_current_user_id() -> String {
}
fn extract_id_from_path(path: &str) -> String {
path.split('/')
.last()
.unwrap_or("")
.trim_end_matches(".json")
.trim_end_matches(".xlsx")
let raw = path.split('/').last().unwrap_or("");
raw.strip_suffix(".json")
.or_else(|| raw.strip_suffix(".xlsx"))
.unwrap_or(raw)
.to_string()
}
@ -42,7 +41,7 @@ pub async fn save_sheet_to_drive(
.put_object()
.bucket("gbo")
.key(&path)
.body(content.into_bytes().into())
.body(content.into_bytes())
.content_type("application/json")
.send()
.await
@ -69,7 +68,7 @@ pub async fn save_sheet_as_xlsx(
.put_object()
.bucket("gbo")
.key(&path)
.body(xlsx_bytes.clone().into())
.body(xlsx_bytes.clone())
.content_type("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
.send()
.await
@ -297,13 +296,15 @@ pub fn load_xlsx_from_bytes(
let workbook = umya_spreadsheet::reader::xlsx::read_reader(cursor, true)
.map_err(|e| format!("Failed to parse xlsx: {e}"))?;
let file_name = file_path
let raw_name = file_path
.split('/')
.last()
.unwrap_or("Untitled")
.trim_end_matches(".xlsx")
.trim_end_matches(".xlsm")
.trim_end_matches(".xls");
.unwrap_or("Untitled");
let file_name = raw_name
.strip_suffix(".xlsx")
.or_else(|| raw_name.strip_suffix(".xlsm"))
.or_else(|| raw_name.strip_suffix(".xls"))
.unwrap_or(raw_name);
let mut worksheets = Vec::new();
@ -621,7 +622,7 @@ pub async fn save_workbook_to_drive(
.put_object()
.bucket("gbo")
.key(&path)
.body(buf.into_inner().into())
.body(buf.into_inner())
.content_type("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
.send()
.await
@ -721,22 +722,19 @@ pub async fn list_sheets_from_drive(
let mut sheets = Vec::new();
if let Some(contents) = result.contents {
for obj in contents {
if let Some(key) = obj.key {
if key.ends_with(".json") {
let id = extract_id_from_path(&key);
if let Ok(sheet) = load_sheet_by_id(state, user_id, &id).await {
sheets.push(SpreadsheetMetadata {
id: sheet.id,
name: sheet.name,
owner_id: sheet.owner_id,
created_at: sheet.created_at,
updated_at: sheet.updated_at,
worksheet_count: sheet.worksheets.len(),
});
}
}
for obj in &result.contents {
let key = &obj.key;
if key.ends_with(".json") {
let id = extract_id_from_path(key);
if let Ok(sheet) = load_sheet_by_id(state, user_id, &id).await {
sheets.push(SpreadsheetMetadata {
id: sheet.id,
name: sheet.name,
owner_id: sheet.owner_id,
created_at: sheet.created_at,
updated_at: sheet.updated_at,
worksheet_count: sheet.worksheets.len(),
});
}
}
}
@ -1108,9 +1106,9 @@ pub fn import_spreadsheet_bytes(bytes: &[u8], filename: &str) -> Result<Spreadsh
}
};
let name = filename.rsplit('/').next().unwrap_or(filename)
.trim_end_matches(&format!(".{ext}"))
.to_string();
let raw_filename = filename.rsplit('/').next().unwrap_or(filename);
let suffix = format!(".{ext}");
let name = raw_filename.strip_suffix(&suffix).unwrap_or(raw_filename).to_string();
Ok(Spreadsheet {
id: Uuid::new_v4().to_string(),

View file

@ -97,20 +97,6 @@ fn escape_xml(text: &str) -> String {
.replace('\'', "&apos;")
}
pub fn save_pptx_preserving(original_bytes: &[u8]) -> Result<Vec<u8>, String> {
use ooxmlsdk::parts::presentation_document::PresentationDocument;
let reader = Cursor::new(original_bytes);
let pptx = PresentationDocument::new(reader)
.map_err(|e| format!("Failed to parse PPTX: {e}"))?;
let mut output = Cursor::new(Vec::new());
pptx.save(&mut output)
.map_err(|e| format!("Failed to save PPTX: {e}"))?;
Ok(output.into_inner())
}
pub fn update_pptx_text(
original_bytes: &[u8],
new_slide_texts: &[Vec<String>],
@ -244,7 +230,11 @@ fn replace_first_text_run(para_xml: &str, new_text: &str) -> String {
found_first = true;
search_pos = abs_content_start + escaped.len() + 6;
} else {
result = format!("{}{}", &result[..abs_content_start], &result[abs_content_end..]);
result = format!(
"{}{}",
&result[..abs_content_start],
&result[abs_content_end..]
);
search_pos = abs_content_start;
}
} else {

View file

@ -50,11 +50,10 @@ pub async fn remove_from_cache(pres_id: &str) {
}
fn extract_id_from_path(path: &str) -> String {
path.split('/')
.last()
.unwrap_or_default()
.trim_end_matches(".json")
.trim_end_matches(".pptx")
let raw = path.split('/').last().unwrap_or_default();
raw.strip_suffix(".json")
.or_else(|| raw.strip_suffix(".pptx"))
.unwrap_or(raw)
.to_string()
}
@ -80,7 +79,7 @@ pub async fn save_presentation_to_drive(
.put_object()
.bucket("gbo")
.key(&path)
.body(content.into_bytes().into())
.body(content.into_bytes())
.content_type("application/json")
.send()
.await
@ -122,7 +121,7 @@ pub async fn save_presentation_as_pptx(
.put_object()
.bucket("gbo")
.key(&path)
.body(pptx_bytes.clone().into())
.body(pptx_bytes.clone())
.content_type("application/vnd.openxmlformats-officedocument.presentationml.presentation")
.send()
.await
@ -536,12 +535,14 @@ pub async fn load_pptx_from_bytes(
let mut archive = ZipArchive::new(cursor)
.map_err(|e| format!("Failed to open PPTX archive: {e}"))?;
let file_name = file_path
let raw_name = file_path
.split('/')
.last()
.unwrap_or("Untitled")
.trim_end_matches(".pptx")
.trim_end_matches(".ppt");
.unwrap_or("Untitled");
let file_name = raw_name
.strip_suffix(".pptx")
.or_else(|| raw_name.strip_suffix(".ppt"))
.unwrap_or(raw_name);
let pres_id = generate_presentation_id();
@ -768,22 +769,19 @@ pub async fn list_presentations_from_drive(
let mut presentations = Vec::new();
if let Some(contents) = result.contents {
for obj in contents {
if let Some(key) = obj.key {
if key.ends_with(".json") {
let id = extract_id_from_path(&key);
if let Ok(presentation) = load_presentation_by_id(state, user_id, &id).await {
presentations.push(PresentationMetadata {
id: presentation.id,
name: presentation.name,
owner_id: presentation.owner_id,
slide_count: presentation.slides.len(),
created_at: presentation.created_at,
updated_at: presentation.updated_at,
});
}
}
for obj in &result.contents {
let key = &obj.key;
if key.ends_with(".json") {
let id = extract_id_from_path(key);
if let Ok(presentation) = load_presentation_by_id(state, user_id, &id).await {
presentations.push(PresentationMetadata {
id: presentation.id,
name: presentation.name,
owner_id: presentation.owner_id,
slide_count: presentation.slides.len(),
created_at: presentation.created_at,
updated_at: presentation.updated_at,
});
}
}
}

View file

@ -1653,7 +1653,7 @@ pub async fn attendant_respond(
}
async fn get_verify_token_for_bot(state: &Arc<AppState>, bot_id: &Uuid) -> String {
let config_manager = ConfigManager::new(state.conn.clone().into());
let config_manager = ConfigManager::new(state.conn.clone());
let bot_id_clone = *bot_id;
tokio::task::spawn_blocking(move || {