Update botserver: various fixes and improvements
All checks were successful
BotServer CI/CD / build (push) Successful in 9m59s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-07 13:33:50 -03:00
parent 90c14bcd09
commit 73002b36cc
23 changed files with 370 additions and 630 deletions

View file

@ -24,7 +24,8 @@ external_sync = ["automation", "drive", "cache"]
# ===== CORE INFRASTRUCTURE (Can be used standalone) =====
scripting = ["dep:rhai"]
automation = ["scripting", "dep:cron"]
drive = ["dep:aws-config", "dep:aws-sdk-s3", "dep:aws-smithy-async", "dep:pdf-extract", "dep:notify"]
drive = ["dep:aws-config", "dep:aws-sdk-s3", "dep:aws-smithy-async", "dep:pdf-extract"]
local-files = ["dep:notify"]
cache = ["dep:redis"]
directory = ["rbac"]
rbac = []
@ -32,7 +33,7 @@ crawler = ["drive", "cache"]
# ===== APPS (Each includes what it needs from core) =====
# Communication
chat = ["automation", "drive", "cache"]
chat = ["automation", "local-files", "cache"]
people = ["automation", "drive", "cache"]
mail = ["automation", "drive", "cache", "dep:lettre", "dep:mailparse", "dep:imap"]
meet = ["automation", "drive", "cache"]
@ -83,7 +84,7 @@ telegram = ["automation", "drive", "cache"]
instagram = ["automation", "drive", "cache"]
msteams = ["automation", "drive", "cache"]
# Core Tech
llm = ["automation", "drive", "cache"]
llm = ["automation", "local-files", "cache"]
vectordb = ["automation", "drive", "cache", "dep:qdrant-client"]
nvidia = ["automation", "drive", "cache"]
compliance = ["automation", "drive", "cache", "dep:csv"]
@ -96,7 +97,7 @@ console = ["automation", "drive", "cache", "dep:crossterm", "dep:ratatui"]
# ===== BUNDLES (Optional - for convenience) =====
minimal = ["chat"]
minimal-chat = ["chat", "automation", "drive", "cache"] # No security at all
minimal-chat = ["chat", "automation", "local-files", "cache"] # No security at all
lightweight = ["chat", "tasks", "people"]
full = ["chat", "people", "mail", "tasks", "calendar", "drive", "docs", "llm", "cache", "compliance"]
embed-ui = ["dep:rust-embed"]

View file

@ -135,6 +135,7 @@ pub async fn serve_vendor_file(
key
);
#[cfg(feature = "drive")]
if let Some(ref drive) = state.drive {
match drive.get_object().bucket(&bucket).key(&key).send().await {
Ok(response) => match response.body.collect().await {
@ -306,6 +307,7 @@ async fn serve_app_file_internal(state: &AppState, app_name: &str, file_path: &s
);
// Try to serve from MinIO
#[cfg(feature = "drive")]
if let Some(ref drive) = state.drive {
match drive.get_object().bucket(&bucket).key(&key).send().await {
Ok(response) => {

View file

@ -91,7 +91,7 @@ async fn detect_anomalies_in_table(
let column_list = columns.join(", ");
let query = format!(
"SELECT row_to_json(t)::text as data FROM (SELECT {} FROM {} LIMIT 500) t",
"SELECT row_to_json(t)::text as data FROM (SELECT {} FROM {} LIMIT 1500) t",
column_list, table_name
);

View file

@ -1,7 +1,5 @@
use crate::core::shared::models::schema::bots::dsl::*;
use crate::core::shared::models::UserSession;
use crate::core::shared::state::AppState;
use diesel::prelude::*;
use log::{error, trace};
use reqwest::{self, Client};
use rhai::{Dynamic, Engine};
@ -149,6 +147,8 @@ pub async fn execute_get(url: &str) -> Result<String, Box<dyn Error + Send + Syn
);
Ok(content)
}
#[cfg(feature = "drive")]
pub async fn get_from_bucket(
state: &AppState,
file_path: &str,
@ -202,6 +202,7 @@ pub async fn get_from_bucket(
}
};
let content = if file_path.to_ascii_lowercase().ends_with(".pdf") {
#[cfg(feature = "drive")]
match pdf_extract::extract_text_from_mem(&bytes) {
Ok(text) => text,
Err(e) => {
@ -209,6 +210,10 @@ pub async fn get_from_bucket(
return Err(format!("PDF extraction failed: {}", e).into());
}
}
#[cfg(not(feature = "drive"))]
{
return Err("PDF extraction requires drive feature".into());
}
} else {
match String::from_utf8(bytes) {
Ok(text) => text,
@ -225,3 +230,12 @@ pub async fn get_from_bucket(
);
Ok(content)
}
#[cfg(not(feature = "drive"))]
pub async fn get_from_bucket(
_state: &AppState,
_file_path: &str,
_bot_id: uuid::Uuid,
) -> Result<String, Box<dyn Error + Send + Sync>> {
Err("S3 drive is not enabled. Configure MinIO to use this feature.".into())
}

View file

@ -59,7 +59,10 @@ pub fn llm_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine
.expect("valid syntax registration");
}
fn build_llm_prompt(user_text: &str) -> String {
user_text.trim().to_string()
format!(
"Você é um assistente virtual em português brasileiro. Responda sempre em português do Brasil, de forma clara e amigável.\n\nPedido do usuário: {}",
user_text.trim()
)
}
pub async fn execute_llm_generation(
state: Arc<AppState>,

View file

@ -337,7 +337,10 @@ async fn send_file_with_caption_to_recipient(
send_whatsapp_file(state, user, &recipient_id, file_data, caption).await?;
}
"instagram" => {
#[cfg(feature = "drive")]
send_instagram_file(state, user, &recipient_id, file_data, caption).await?;
#[cfg(not(feature = "drive"))]
return Err("Drive feature not enabled".into());
}
"teams" => {
send_teams_file(state, user, &recipient_id, file_data, caption).await?;
@ -500,6 +503,7 @@ async fn send_whatsapp_file(
Ok(())
}
#[cfg(feature = "drive")]
async fn send_instagram_file(
state: Arc<AppState>,
user: &UserSession,

View file

@ -1353,6 +1353,7 @@ impl ScriptService {
// - Lines that are part of a continuation block (in_line_continuation is true)
if !trimmed.ends_with(';') && !trimmed.ends_with('{') && !trimmed.ends_with('}')
&& !upper.starts_with("SELECT ") && !upper.starts_with("CASE ") && upper != "END SELECT"
&& !upper.starts_with("WHILE ") && !upper.starts_with("WEND")
&& !ends_with_comma && !in_line_continuation {
result.push(';');
}
@ -1793,6 +1794,9 @@ impl ScriptService {
"REQUIRED",
"WEBSITE",
"MODEL",
"DETECT",
"LLM",
"TALK",
];
let _identifier_re = Regex::new(r"([a-zA-Z_][a-zA-Z0-9_]*)").expect("valid regex");

View file

@ -40,6 +40,8 @@ use serde_json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
use tokio::sync::Mutex as AsyncMutex;
use uuid::Uuid;
use serde::{Deserialize, Serialize};
@ -102,6 +104,7 @@ pub fn get_bot_id_by_name(conn: &mut PgConnection, bot_name: &str) -> Result<Uui
#[derive(Debug)]
pub struct BotOrchestrator {
pub state: Arc<AppState>,
#[cfg(feature = "drive")]
pub mounted_bots: Arc<AsyncMutex<HashMap<String, Arc<DriveMonitor>>>>,
}
@ -232,6 +235,7 @@ impl BotOrchestrator {
pub fn new(state: Arc<AppState>) -> Self {
Self {
state,
#[cfg(feature = "drive")]
mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())),
}
}

View file

@ -115,12 +115,14 @@ pub trait MultimediaHandler: Send + Sync {
}
#[cfg(feature = "drive")]
#[derive(Debug)]
pub struct DefaultMultimediaHandler {
storage_client: Option<aws_sdk_s3::Client>,
search_api_key: Option<String>,
}
#[cfg(feature = "drive")]
impl DefaultMultimediaHandler {
pub fn new(storage_client: Option<aws_sdk_s3::Client>, search_api_key: Option<String>) -> Self {
Self {
@ -138,6 +140,29 @@ impl DefaultMultimediaHandler {
}
}
#[cfg(not(feature = "drive"))]
#[derive(Debug)]
pub struct DefaultMultimediaHandler {
search_api_key: Option<String>,
}
#[cfg(not(feature = "drive"))]
impl DefaultMultimediaHandler {
pub fn new(_storage_client: Option<()>, search_api_key: Option<String>) -> Self {
Self {
search_api_key,
}
}
pub fn storage_client(&self) -> &Option<()> {
&None
}
pub fn search_api_key(&self) -> &Option<String> {
&self.search_api_key
}
}
#[async_trait]
impl MultimediaHandler for DefaultMultimediaHandler {
async fn process_multimedia(
@ -307,6 +332,7 @@ impl MultimediaHandler for DefaultMultimediaHandler {
}
}
#[cfg(feature = "drive")]
async fn upload_media(&self, request: MediaUploadRequest) -> Result<MediaUploadResponse> {
let media_id = Uuid::new_v4().to_string();
let key = format!(
@ -348,6 +374,27 @@ impl MultimediaHandler for DefaultMultimediaHandler {
}
}
#[cfg(not(feature = "drive"))]
async fn upload_media(&self, request: MediaUploadRequest) -> Result<MediaUploadResponse> {
let media_id = Uuid::new_v4().to_string();
let key = format!(
"media/{}/{}/{}",
request.user_id, request.session_id, request.file_name
);
let local_path = format!("./media/{}", key);
if let Some(parent) = std::path::Path::new(&local_path).parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&local_path, request.data)?;
Ok(MediaUploadResponse {
media_id,
url: format!("file://{}", local_path),
thumbnail_url: None,
})
}
async fn download_media(&self, url: &str) -> Result<Vec<u8>> {
if url.starts_with("http://") || url.starts_with("https://") {
let response = reqwest::get(url).await?;
@ -452,11 +499,15 @@ use std::sync::Arc;
pub async fn upload_media_handler(
State(state): State<Arc<AppState>>,
State(_state): State<Arc<AppState>>,
Json(request): Json<MediaUploadRequest>,
) -> impl IntoResponse {
#[cfg(feature = "drive")]
let handler = DefaultMultimediaHandler::new(state.drive.clone(), None);
#[cfg(not(feature = "drive"))]
let handler = DefaultMultimediaHandler::new(None, None);
match handler.upload_media(request).await {
Ok(response) => (StatusCode::OK, Json(serde_json::json!(response))),
Err(e) => (
@ -468,11 +519,14 @@ pub async fn upload_media_handler(
pub async fn download_media_handler(
State(state): State<Arc<AppState>>,
State(_state): State<Arc<AppState>>,
Path(media_id): Path<String>,
) -> impl IntoResponse {
#[cfg(feature = "drive")]
let handler = DefaultMultimediaHandler::new(state.drive.clone(), None);
#[cfg(not(feature = "drive"))]
let handler = DefaultMultimediaHandler::new(None, None);
let url = format!("https://storage.botserver.com/media/{}", media_id);
@ -494,11 +548,14 @@ pub async fn download_media_handler(
pub async fn generate_thumbnail_handler(
State(state): State<Arc<AppState>>,
State(_state): State<Arc<AppState>>,
Path(media_id): Path<String>,
) -> impl IntoResponse {
#[cfg(feature = "drive")]
let handler = DefaultMultimediaHandler::new(state.drive.clone(), None);
#[cfg(not(feature = "drive"))]
let handler = DefaultMultimediaHandler::new(None, None);
let url = format!("https://storage.botserver.com/media/{}", media_id);
@ -519,7 +576,7 @@ pub async fn generate_thumbnail_handler(
pub async fn web_search_handler(
State(state): State<Arc<AppState>>,
State(_state): State<Arc<AppState>>,
Json(payload): Json<serde_json::Value>,
) -> impl IntoResponse {
let query = payload.get("query").and_then(|q| q.as_str()).unwrap_or("");
@ -528,8 +585,12 @@ pub async fn web_search_handler(
.and_then(|m| m.as_u64())
.unwrap_or(10) as usize;
#[cfg(feature = "drive")]
let handler = DefaultMultimediaHandler::new(state.drive.clone(), None);
#[cfg(not(feature = "drive"))]
let handler = DefaultMultimediaHandler::new(None, None);
match handler.web_search(query, max_results).await {
Ok(results) => (
StatusCode::OK,

View file

@ -2,7 +2,7 @@ pub mod model_routing_config;
pub mod sse_config;
pub mod user_memory_config;
#[cfg(feature = "drive")]
#[cfg(any(feature = "drive", feature = "local-files"))]
pub mod watcher;
pub use model_routing_config::{ModelRoutingConfig, RoutingStrategy, TaskType};

View file

@ -266,17 +266,13 @@ impl DocumentProcessor {
}
fn extract_pdf_with_library(&self, file_path: &Path) -> Result<String> {
let _ = self; // Suppress unused self warning
let _ = self;
#[cfg(feature = "drive")]
{
use pdf_extract::extract_text;
match extract_text(file_path) {
Ok(text) => {
info!(
"Successfully extracted PDF with library: {}",
file_path.display()
);
info!("Successfully extracted PDF with library: {}", file_path.display());
return Ok(text);
}
Err(e) => {
@ -284,23 +280,24 @@ impl DocumentProcessor {
}
}
}
#[cfg(not(feature = "drive"))]
let _ = file_path;
Self::extract_pdf_basic_sync(file_path)
}
#[cfg(feature = "drive")]
fn extract_pdf_basic_sync(file_path: &Path) -> Result<String> {
#[cfg(feature = "drive")]
{
if let Ok(text) = pdf_extract::extract_text(file_path) {
if !text.is_empty() {
return Ok(text);
}
if let Ok(text) = pdf_extract::extract_text(file_path) {
if !text.is_empty() {
return Ok(text);
}
}
Err(anyhow::anyhow!("Could not extract text from PDF"))
}
Err(anyhow::anyhow!(
"Could not extract text from PDF. Please ensure pdftotext is installed."
))
#[cfg(not(feature = "drive"))]
fn extract_pdf_basic_sync(_file_path: &Path) -> Result<String> {
Err(anyhow::anyhow!("PDF extraction requires 'drive' feature"))
}
async fn extract_docx_text(&self, file_path: &Path) -> Result<String> {

View file

@ -167,13 +167,23 @@ impl SessionManager {
uid: Uuid,
bid: Uuid,
session_title: &str,
) -> Result<UserSession, Box<dyn Error + Send + Sync>> {
self.create_session_with_id(Uuid::new_v4(), uid, bid, session_title)
}
pub fn create_session_with_id(
&mut self,
session_id: Uuid,
uid: Uuid,
bid: Uuid,
session_title: &str,
) -> Result<UserSession, Box<dyn Error + Send + Sync>> {
use crate::core::shared::models::user_sessions::dsl::*;
let verified_uid = self.get_or_create_anonymous_user(Some(uid))?;
let now = Utc::now();
let inserted: UserSession = diesel::insert_into(user_sessions)
.values((
id.eq(Uuid::new_v4()),
id.eq(session_id),
user_id.eq(verified_uid),
bot_id.eq(bid),
title.eq(session_title),
@ -194,6 +204,21 @@ impl SessionManager {
Ok(inserted)
}
pub fn get_or_create_session_by_id(
&mut self,
session_id: Uuid,
uid: Uuid,
bid: Uuid,
session_title: &str,
) -> Result<UserSession, Box<dyn Error + Send + Sync>> {
// Check if session already exists
if let Ok(Some(existing)) = self.get_session_by_id(session_id) {
return Ok(existing);
}
// Create new session with specified ID
self.create_session_with_id(session_id, uid, bid, session_title)
}
fn _clear_messages(&mut self, _session_id: Uuid) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::core::shared::models::message_history::dsl::*;
diesel::delete(message_history.filter(session_id.eq(session_id)))

View file

@ -38,6 +38,17 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, RwLock};
#[cfg(not(feature = "drive"))]
#[derive(Debug, Clone, Default)]
pub struct NoDrive;
#[cfg(not(feature = "drive"))]
impl NoDrive {
pub fn new() -> Self {
NoDrive
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AttendantNotification {
#[serde(rename = "type")]
@ -365,6 +376,9 @@ pub struct BillingAlertNotification {
pub struct AppState {
#[cfg(feature = "drive")]
pub drive: Option<S3Client>,
#[cfg(not(feature = "drive"))]
#[allow(non_snake_case)]
pub drive: Option<crate::core::shared::state::NoDrive>,
#[cfg(feature = "cache")]
pub cache: Option<Arc<RedisClient>>,
pub bucket_name: String,
@ -413,6 +427,8 @@ impl Clone for AppState {
Self {
#[cfg(feature = "drive")]
drive: self.drive.clone(),
#[cfg(not(feature = "drive"))]
drive: None,
bucket_name: self.bucket_name.clone(),
config: self.config.clone(),
conn: self.conn.clone(),
@ -628,6 +644,8 @@ impl Default for AppState {
Self {
#[cfg(feature = "drive")]
drive: None,
#[cfg(not(feature = "drive"))]
drive: None,
#[cfg(feature = "cache")]
cache: None,
bucket_name: "test-bucket".to_string(),

View file

@ -1,7 +1,8 @@
use crate::core::config::DriveConfig;
use crate::core::secrets::SecretsManager;
use anyhow::{Context, Result};
#[cfg(feature = "drive")]
use crate::core::config::DriveConfig;
#[cfg(feature = "drive")]
use aws_config::retry::RetryConfig;
#[cfg(feature = "drive")]
use aws_config::timeout::TimeoutConfig;

View file

@ -1,304 +1,122 @@
// Drive HTTP handlers implementation using S3
// Extracted from drive/mod.rs and using aws-sdk-s3
// Drive HTTP handlers - stub for when drive feature is disabled
#[cfg(not(feature = "drive"))]
use crate::core::shared::state::AppState;
use crate::drive::drive_types::*;
use axum::{
extract::{Path, State},
http::{header, StatusCode},
response::{IntoResponse, Json, Response},
body::Body,
http::StatusCode,
response::Json,
};
use aws_sdk_s3::primitives::ByteStream;
use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
// Import ReadResponse from parent mod if not in drive_types
use super::ReadResponse;
/// Open a file (get metadata)
pub async fn open_file(
State(state): State<Arc<AppState>>,
Path(file_id): Path<String>,
State(_): State<Arc<AppState>>,
Path(_file_id): Path<String>,
) -> Result<Json<FileItem>, (StatusCode, Json<serde_json::Value>)> {
read_metadata(state, file_id).await
}
/// Helper to get file metadata
async fn read_metadata(
state: Arc<AppState>,
file_id: String,
) -> Result<Json<FileItem>, (StatusCode, Json<serde_json::Value>)> {
let client = state.drive.as_ref().ok_or((
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({"error": "Drive not configured"})),
))?;
let bucket = &state.bucket_name;
let resp = client.head_object()
.bucket(bucket)
.key(&file_id)
.send()
.await
.map_err(|e| (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()}))))?;
let item = FileItem {
id: file_id.clone(),
name: file_id.split('/').next_back().unwrap_or(&file_id).to_string(),
file_type: if file_id.ends_with('/') { "folder".to_string() } else { "file".to_string() },
size: resp.content_length.unwrap_or(0),
mime_type: resp.content_type.unwrap_or_else(|| "application/octet-stream".to_string()),
created_at: Utc::now(), // S3 doesn't track creation time easily
modified_at: Utc::now(), // Simplified
parent_id: None,
url: None,
thumbnail_url: None,
is_favorite: false, // Not implemented in S3
tags: vec![],
metadata: HashMap::new(),
};
Ok(Json(item))
Json(serde_json::json!({"error": "Drive feature not enabled"})),
))
}
/// List all buckets (or configured one)
pub async fn list_buckets(
State(state): State<Arc<AppState>>,
State(_): State<Arc<AppState>>,
) -> Result<Json<Vec<BucketInfo>>, (StatusCode, Json<serde_json::Value>)> {
let client = state.drive.as_ref().ok_or((
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({"error": "Drive not configured"})),
))?;
match client.list_buckets().send().await {
Ok(resp) => {
let buckets = resp.buckets.unwrap_or_default().iter().map(|b| {
BucketInfo {
id: b.name.clone().unwrap_or_default(),
name: b.name.clone().unwrap_or_default(),
created_at: Utc::now(),
file_count: 0,
total_size: 0,
}
}).collect();
Ok(Json(buckets))
},
Err(_) => {
// Fallback
Ok(Json(vec![BucketInfo {
id: state.bucket_name.clone(),
name: state.bucket_name.clone(),
created_at: Utc::now(),
file_count: 0,
total_size: 0,
}]))
}
}
Json(serde_json::json!({"error": "Drive feature not enabled"})),
))
}
/// List files in a bucket
pub async fn list_files(
State(state): State<Arc<AppState>>,
Json(req): Json<SearchQuery>,
State(_): State<Arc<AppState>>,
Json(_req): Json<SearchQuery>,
) -> Result<Json<Vec<FileItem>>, (StatusCode, Json<serde_json::Value>)> {
let client = state.drive.as_ref().ok_or((
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({"error": "Drive not configured"})),
))?;
let bucket = req.bucket.clone().unwrap_or_else(|| state.bucket_name.clone());
let prefix = req.parent_path.clone().unwrap_or_default();
let resp = client.list_objects_v2()
.bucket(&bucket)
.prefix(&prefix)
.send()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?;
let files = resp.contents.unwrap_or_default().iter().map(|obj| {
let key = obj.key().unwrap_or_default();
let name = key.split('/').next_back().unwrap_or(key).to_string();
FileItem {
id: key.to_string(),
name,
file_type: if key.ends_with('/') { "folder".to_string() } else { "file".to_string() },
size: obj.size.unwrap_or(0),
mime_type: "application/octet-stream".to_string(),
created_at: Utc::now(),
modified_at: Utc::now(),
parent_id: Some(prefix.clone()),
url: None,
thumbnail_url: None,
is_favorite: false,
tags: vec![],
metadata: HashMap::new(),
}
}).collect();
Ok(Json(files))
Json(serde_json::json!({"error": "Drive feature not enabled"})),
))
}
/// Read file content (as text)
pub async fn read_file(
State(state): State<Arc<AppState>>,
Path(file_id): Path<String>,
) -> Result<Json<ReadResponse>, (StatusCode, Json<serde_json::Value>)> {
let client = state.drive.as_ref().ok_or((
State(_): State<Arc<AppState>>,
Path(_file_id): Path<String>,
) -> Result<Json<super::ReadResponse>, (StatusCode, Json<serde_json::Value>)> {
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({"error": "Drive not configured"})),
))?;
let bucket = &state.bucket_name;
let resp = client.get_object()
.bucket(bucket)
.key(&file_id)
.send()
.await
.map_err(|e| (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()}))))?;
let data = resp.body.collect().await.map_err(|e|
(StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})))
)?.into_bytes();
let content = String::from_utf8(data.to_vec()).unwrap_or_else(|_| "[Binary Content]".to_string());
Ok(Json(ReadResponse { content }))
Json(serde_json::json!({"error": "Drive feature not enabled"})),
))
}
/// Write file content
pub async fn write_file(
State(state): State<Arc<AppState>>,
Json(req): Json<WriteRequest>,
State(_): State<Arc<AppState>>,
Json(_req): Json<WriteRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
let client = state.drive.as_ref().ok_or((
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({"error": "Drive not configured"})),
))?;
let bucket = &state.bucket_name;
let key = req.file_id.ok_or((StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Missing file_id"}))))?;
client.put_object()
.bucket(bucket)
.key(&key)
.body(ByteStream::from(req.content.into_bytes()))
.send()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?;
log::info!("User system created resource: drive_file {}", key);
Ok(Json(serde_json::json!({"success": true})))
Json(serde_json::json!({"error": "Drive feature not enabled"})),
))
}
/// Delete a file
pub async fn delete_file(
State(state): State<Arc<AppState>>,
Path(file_id): Path<String>,
State(_): State<Arc<AppState>>,
Path(_file_id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
let client = state.drive.as_ref().ok_or((
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({"error": "Drive not configured"})),
))?;
let bucket = &state.bucket_name;
client.delete_object()
.bucket(bucket)
.key(&file_id)
.send()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?;
// Technically a deletion, but we could log it as an audit change or leave it out
// The plan specifies resource creation: `info!("User {} created resource: {}", user_id, resource_id);`
log::info!("User system deleted resource: drive_file {}", file_id);
Ok(Json(serde_json::json!({"success": true})))
Json(serde_json::json!({"error": "Drive feature not enabled"})),
))
}
/// Create a folder
pub async fn create_folder(
State(state): State<Arc<AppState>>,
Json(req): Json<CreateFolderRequest>,
State(_): State<Arc<AppState>>,
Json(_req): Json<CreateFolderRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
let client = state.drive.as_ref().ok_or((
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({"error": "Drive not configured"})),
))?;
let bucket = &state.bucket_name;
// Construct folder path/key
let mut key = req.parent_id.unwrap_or_default();
if !key.ends_with('/') && !key.is_empty() {
key.push('/');
}
key.push_str(&req.name);
if !key.ends_with('/') {
key.push('/');
}
client.put_object()
.bucket(bucket)
.key(&key)
.body(ByteStream::from_static(&[]))
.send()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?;
log::info!("User system created resource: drive_folder {}", key);
Ok(Json(serde_json::json!({"success": true})))
Json(serde_json::json!({"error": "Drive feature not enabled"})),
))
}
/// Download file (stream)
pub async fn download_file(
State(state): State<Arc<AppState>>,
Path(file_id): Path<String>,
) -> Result<Response, (StatusCode, Json<serde_json::Value>)> {
let client = state.drive.as_ref().ok_or((
State(_): State<Arc<AppState>>,
Path(_file_id): Path<String>,
) -> Result<axum::response::Response, (StatusCode, Json<serde_json::Value>)> {
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({"error": "Drive not configured"})),
))?;
let bucket = &state.bucket_name;
let resp = client.get_object()
.bucket(bucket)
.key(&file_id)
.send()
.await
.map_err(|e| (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()}))))?;
let body = resp.body.collect().await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?.into_bytes();
Response::builder()
.header(header::CONTENT_TYPE, "application/octet-stream")
.header(header::CONTENT_DISPOSITION, format!("attachment; filename=\"{}\"", file_id.split('/').next_back().unwrap_or("file")))
.body(Body::from(body))
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))
Json(serde_json::json!({"error": "Drive feature not enabled"})),
))
}
// Stubs for others (list_shared, etc.)
pub async fn copy_file(State(_): State<Arc<AppState>>, Json(_): Json<CopyFileRequest>) -> impl IntoResponse {
(StatusCode::NOT_IMPLEMENTED, Json(serde_json::json!({"error": "Not implemented"})))
pub async fn copy_file(State(_): State<Arc<AppState>>, Json(_): Json<CopyFileRequest>) -> impl axum::response::IntoResponse {
(StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"})))
}
pub async fn upload_file_to_drive(State(_): State<Arc<AppState>>, Json(_): Json<UploadRequest>) -> impl IntoResponse {
(StatusCode::NOT_IMPLEMENTED, Json(serde_json::json!({"error": "Not implemented"})))
pub async fn upload_file_to_drive(State(_): State<Arc<AppState>>, Json(_): Json<UploadRequest>) -> impl axum::response::IntoResponse {
(StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"})))
}
pub async fn list_folder_contents(State(_): State<Arc<AppState>>, Json(_): Json<SearchQuery>) -> impl IntoResponse {
(StatusCode::OK, Json(Vec::<FileItem>::new()))
pub async fn list_folder_contents(State(_): State<Arc<AppState>>, Json(_): Json<SearchQuery>) -> impl axum::response::IntoResponse {
(StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"})))
}
pub async fn search_files(State(_): State<Arc<AppState>>, Json(_): Json<SearchQuery>) -> impl IntoResponse {
(StatusCode::OK, Json(Vec::<FileItem>::new()))
pub async fn search_files(State(_): State<Arc<AppState>>, Json(_): Json<SearchQuery>) -> impl axum::response::IntoResponse {
(StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"})))
}
pub async fn recent_files(State(_): State<Arc<AppState>>) -> impl IntoResponse {
(StatusCode::OK, Json(Vec::<FileItem>::new()))
pub async fn recent_files(State(_): State<Arc<AppState>>) -> impl axum::response::IntoResponse {
(StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"})))
}
pub async fn list_favorites(State(_): State<Arc<AppState>>) -> impl IntoResponse {
(StatusCode::OK, Json(Vec::<FileItem>::new()))
pub async fn list_favorites(State(_): State<Arc<AppState>>) -> impl axum::response::IntoResponse {
(StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"})))
}
pub async fn share_folder(State(_): State<Arc<AppState>>, Json(_): Json<ShareRequest>) -> impl IntoResponse {
(StatusCode::OK, Json(serde_json::json!({"success": true})))
pub async fn share_folder(State(_): State<Arc<AppState>>, Json(_): Json<ShareRequest>) -> impl axum::response::IntoResponse {
(StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"})))
}
pub async fn list_shared(State(_): State<Arc<AppState>>) -> impl IntoResponse {
(StatusCode::OK, Json(Vec::<FileItem>::new()))
pub async fn list_shared(State(_): State<Arc<AppState>>) -> impl axum::response::IntoResponse {
(StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({"error": "Drive feature not enabled"})))
}

View file

@ -7,6 +7,9 @@ use crate::core::kb::KnowledgeBaseManager;
use crate::core::shared::memory_monitor::{log_jemalloc_stats, MemoryStats};
use crate::core::shared::message_types::MessageType;
use crate::core::shared::state::AppState;
#![cfg_attr(not(feature = "drive"), allow(dead_code))]
#[cfg(feature = "drive")]
use aws_sdk_s3::Client;
use log::{debug, error, info, trace, warn};
use std::collections::HashMap;

View file

@ -1,6 +1,12 @@
#![cfg_attr(not(feature = "drive"), allow(dead_code))]
#[cfg(feature = "console")]
use crate::console::file_tree::FileTree;
#[cfg(feature = "drive")]
use crate::core::shared::state::AppState;
#[cfg(feature = "drive")]
use axum::{
extract::{Query, State},
http::StatusCode,
@ -8,17 +14,25 @@ use axum::{
routing::{get, post},
Router,
};
#[cfg(feature = "drive")]
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
#[cfg(feature = "drive")]
use diesel::{QueryableByName, RunQueryDsl};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
pub mod drive_types;
pub mod drive_handlers;
#[cfg(feature = "drive")]
pub mod document_processing;
#[cfg(feature = "drive")]
pub mod drive_monitor;
#[cfg(feature = "local-files")]
pub mod local_file_monitor;
#[cfg(feature = "drive")]
pub mod vectordb;
#[derive(Debug, Serialize, Deserialize)]
@ -193,6 +207,7 @@ pub struct OpenResponse {
pub content_type: String,
}
#[cfg(feature = "drive")]
pub fn configure() -> Router<Arc<AppState>> {
Router::new()
.route("/api/files/buckets", get(list_buckets))
@ -223,12 +238,9 @@ pub fn configure() -> Router<Arc<AppState>> {
.route("/api/files/sync/stop", post(stop_sync))
.route("/api/files/versions", get(list_versions))
.route("/api/files/restore", post(restore_version))
.route("/api/docs/merge", post(document_processing::merge_documents))
.route("/api/docs/convert", post(document_processing::convert_document))
.route("/api/docs/fill", post(document_processing::fill_document))
.route("/api/docs/export", post(document_processing::export_document))
}
#[cfg(feature = "drive")]
pub async fn open_file(
Json(req): Json<OpenRequest>,
) -> Result<Json<OpenResponse>, (StatusCode, Json<serde_json::Value>)> {
@ -311,6 +323,8 @@ pub async fn open_file(
}))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn list_buckets(
State(state): State<Arc<AppState>>,
) -> Result<Json<Vec<BucketInfo>>, (StatusCode, Json<serde_json::Value>)> {
@ -342,6 +356,8 @@ pub async fn list_buckets(
Ok(Json(buckets))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn list_files(
State(state): State<Arc<AppState>>,
Query(params): Query<ListQuery>,
@ -419,37 +435,8 @@ pub async fn list_files(
}
}
}
}
if let Some(contents) = result.contents {
for object in contents {
if let Some(key) = object.key {
if !key.ends_with('/') {
let name = key.split('/').next_back().unwrap_or(&key).to_string();
items.push(FileItem {
name,
path: key.clone(),
is_dir: false,
size: object.size,
modified: object.last_modified.map(|t| t.to_string()),
icon: get_file_icon(&key),
is_kb: false,
is_public: true,
});
}
}
}
}
}
// Post-process to mark KBs
for item in &mut items {
if item.is_dir {
if let Some((_, is_public)) = kbs.iter().find(|(name, _)| name == &item.name) {
item.is_kb = true;
item.is_public = *is_public;
item.icon = "🧠".to_string(); // Knowledge icon
}
}
}
Ok(items)
} else {
Ok(vec![])
@ -462,279 +449,8 @@ pub async fn list_files(
}
}
#[cfg(feature = "console")]
pub fn convert_tree_to_items(tree: &FileTree) -> Vec<FileItem> {
let mut items = Vec::new();
for (display_name, node) in tree.get_items() {
match node {
crate::console::file_tree::TreeNode::Bucket { name } => {
if !name.is_empty() {
items.push(FileItem {
name: display_name.clone(),
path: format!("/{}", name),
is_dir: true,
size: None,
modified: None,
icon: if name.to_ascii_lowercase().ends_with(".gbai") {
"".to_string()
} else {
"📦".to_string()
},
});
}
}
crate::console::file_tree::TreeNode::Folder { bucket, path } => {
let folder_name = path.split('/').next_back().unwrap_or(&display_name);
items.push(FileItem {
name: folder_name.to_string(),
path: format!("/{}/{}", bucket, path),
is_dir: true,
size: None,
modified: None,
icon: "📁".to_string(),
});
}
crate::console::file_tree::TreeNode::File { bucket, path } => {
let file_name = path.split('/').next_back().unwrap_or(&display_name);
items.push(FileItem {
name: file_name.to_string(),
path: format!("/{}/{}", bucket, path),
is_dir: false,
size: None,
modified: None,
icon: "📄".to_string(),
});
}
}
}
items
}
pub async fn read_file(
State(state): State<Arc<AppState>>,
Json(req): Json<ReadRequest>,
) -> Result<Json<ReadResponse>, (StatusCode, Json<serde_json::Value>)> {
let s3_client = state.drive.as_ref().ok_or_else(|| {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({ "error": "S3 service not available" })),
)
})?;
let result = s3_client
.get_object()
.bucket(&req.bucket)
.key(&req.path)
.send()
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("Failed to read file: {}", e) })),
)
})?;
let bytes = result
.body
.collect()
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("Failed to read file body: {}", e) })),
)
})?
.into_bytes();
let content = String::from_utf8(bytes.to_vec()).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("File is not valid UTF-8: {}", e) })),
)
})?;
Ok(Json(ReadResponse { content }))
}
pub async fn write_file(
State(state): State<Arc<AppState>>,
Json(req): Json<WriteRequest>,
) -> Result<Json<SuccessResponse>, (StatusCode, Json<serde_json::Value>)> {
tracing::debug!(
"write_file called: bucket={}, path={}, content_len={}",
req.bucket,
req.path,
req.content.len()
);
let s3_client = state.drive.as_ref().ok_or_else(|| {
tracing::error!("S3 client not available for write_file");
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({ "error": "S3 service not available" })),
)
})?;
// Try to decode as base64, otherwise use content directly
// Base64 content from file uploads won't have whitespace/newlines at start
// and will only contain valid base64 characters
let is_base64 = is_likely_base64(&req.content);
tracing::debug!("Content detected as base64: {}", is_base64);
let body_bytes: Vec<u8> = if is_base64 {
match BASE64.decode(&req.content) {
Ok(decoded) => {
tracing::debug!("Base64 decoded successfully, size: {} bytes", decoded.len());
decoded
}
Err(e) => {
tracing::warn!("Base64 decode failed ({}), using raw content", e);
req.content.clone().into_bytes()
}
}
} else {
req.content.into_bytes()
};
let sanitized_path = req.path
.replace("//", "/")
.trim_start_matches('/')
.to_string();
tracing::debug!("Writing {} bytes to {}/{}", body_bytes.len(), req.bucket, sanitized_path);
s3_client
.put_object()
.bucket(&req.bucket)
.key(&sanitized_path)
.body(body_bytes.into())
.send()
.await
.map_err(|e| {
tracing::error!("S3 put_object failed: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("Failed to write file: {}", e) })),
)
})?;
tracing::info!("File written successfully: {}/{}", req.bucket, sanitized_path);
Ok(Json(SuccessResponse {
success: true,
message: Some("File written successfully".to_string()),
}))
}
/// Check if a string is likely base64 encoded content (from file upload)
/// Base64 from DataURL will be pure base64 without newlines at start
fn is_likely_base64(s: &str) -> bool {
// Empty or very short strings are not base64 uploads
if s.len() < 20 {
return false;
}
// If it starts with common text patterns, it's not base64
let trimmed = s.trim_start();
if trimmed.starts_with('#') // Markdown, shell scripts
|| trimmed.starts_with("//") // Comments
|| trimmed.starts_with("/*") // C-style comments
|| trimmed.starts_with('{') // JSON
|| trimmed.starts_with('[') // JSON array
|| trimmed.starts_with('<') // XML/HTML
|| trimmed.starts_with("<!") // HTML doctype
|| trimmed.starts_with("function") // JavaScript
|| trimmed.starts_with("const ") // JavaScript
|| trimmed.starts_with("let ") // JavaScript
|| trimmed.starts_with("var ") // JavaScript
|| trimmed.starts_with("import ") // Various languages
|| trimmed.starts_with("from ") // Python
|| trimmed.starts_with("def ") // Python
|| trimmed.starts_with("class ") // Various languages
|| trimmed.starts_with("pub ") // Rust
|| trimmed.starts_with("use ") // Rust
|| trimmed.starts_with("mod ") // Rust
{
return false;
}
// Check if string contains only valid base64 characters
// and try to decode it
let base64_chars = s.chars().all(|c| {
c.is_ascii_alphanumeric() || c == '+' || c == '/' || c == '='
});
if !base64_chars {
return false;
}
// Final check: try to decode and see if it works
BASE64.decode(s).is_ok()
}
pub async fn delete_file(
State(state): State<Arc<AppState>>,
Json(req): Json<DeleteRequest>,
) -> Result<Json<SuccessResponse>, (StatusCode, Json<serde_json::Value>)> {
let s3_client = state.drive.as_ref().ok_or_else(|| {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({ "error": "S3 service not available" })),
)
})?;
if req.path.ends_with('/') {
let result = s3_client
.list_objects_v2()
.bucket(&req.bucket)
.prefix(&req.path)
.send()
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("Failed to list objects for deletion: {}", e) })),
)
})?;
for obj in result.contents() {
if let Some(key) = obj.key() {
s3_client
.delete_object()
.bucket(&req.bucket)
.key(key)
.send()
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("Failed to delete object: {}", e) })),
)
})?;
}
}
} else {
s3_client
.delete_object()
.bucket(&req.bucket)
.key(&req.path)
.send()
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("Failed to delete file: {}", e) })),
)
})?;
}
Ok(Json(SuccessResponse {
success: true,
message: Some("Deleted successfully".to_string()),
}))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn create_folder(
State(state): State<Arc<AppState>>,
Json(req): Json<CreateFolderRequest>,
@ -788,6 +504,8 @@ fn get_file_icon(path: &str) -> String {
}
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn copy_file(
State(state): State<Arc<AppState>>,
Json(req): Json<CopyRequest>,
@ -821,6 +539,8 @@ pub async fn copy_file(
}))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn move_file(
State(state): State<Arc<AppState>>,
Json(req): Json<MoveRequest>,
@ -869,13 +589,17 @@ pub async fn move_file(
}))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn upload_file_to_drive(
State(state): State<Arc<AppState>>,
Json(req): Json<WriteRequest>,
) -> Result<Json<SuccessResponse>, (StatusCode, Json<serde_json::Value>)> {
write_file(State(state), Json(req)).await
write_file(State(state), Json(req)) .await
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn download_file(
State(state): State<Arc<AppState>>,
Json(req): Json<DownloadRequest>,
@ -890,6 +614,8 @@ pub async fn download_file(
.await
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn list_folder_contents(
State(state): State<Arc<AppState>>,
Json(req): Json<ReadRequest>,
@ -904,6 +630,8 @@ pub async fn list_folder_contents(
.await
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn search_files(
State(state): State<Arc<AppState>>,
Query(params): Query<SearchQuery>,
@ -984,6 +712,8 @@ pub async fn search_files(
Ok(Json(all_items))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn recent_files(
State(state): State<Arc<AppState>>,
Query(params): Query<ListQuery>,
@ -1047,12 +777,16 @@ pub async fn recent_files(
Ok(Json(all_items))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn list_favorites(
State(_state): State<Arc<AppState>>,
) -> Result<Json<Vec<FileItem>>, (StatusCode, Json<serde_json::Value>)> {
Ok(Json(Vec::new()))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn share_folder(
State(_state): State<Arc<AppState>>,
Json(_req): Json<ShareRequest>,
@ -1072,12 +806,15 @@ pub async fn share_folder(
}))
}
#[cfg(feature = "drive")]
pub async fn list_shared(
State(_state): State<Arc<AppState>>,
) -> Result<Json<Vec<FileItem>>, (StatusCode, Json<serde_json::Value>)> {
Ok(Json(Vec::new()))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn get_permissions(
State(_state): State<Arc<AppState>>,
Query(params): Query<ReadRequest>,
@ -1095,6 +832,8 @@ pub async fn get_permissions(
})))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn get_quota(
State(state): State<Arc<AppState>>,
) -> Result<Json<QuotaResponse>, (StatusCode, Json<serde_json::Value>)> {
@ -1151,6 +890,7 @@ pub async fn get_quota(
}))
}
#[cfg(feature = "drive")]
pub async fn sync_status(
State(_state): State<Arc<AppState>>,
) -> Result<Json<SyncStatus>, (StatusCode, Json<serde_json::Value>)> {
@ -1166,6 +906,8 @@ pub async fn sync_status(
}))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn start_sync(
State(_state): State<Arc<AppState>>,
) -> Result<Json<SuccessResponse>, (StatusCode, Json<serde_json::Value>)> {
@ -1175,6 +917,8 @@ pub async fn start_sync(
}))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn stop_sync(
State(_state): State<Arc<AppState>>,
) -> Result<Json<SuccessResponse>, (StatusCode, Json<serde_json::Value>)> {
@ -1184,6 +928,8 @@ pub async fn stop_sync(
}))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn list_versions(
State(state): State<Arc<AppState>>,
Query(params): Query<VersionsQuery>,
@ -1236,6 +982,8 @@ pub async fn list_versions(
}))
}
#[cfg(feature = "drive")]
#[cfg(feature = "drive")]
pub async fn restore_version(
State(state): State<Arc<AppState>>,
Json(payload): Json<RestoreRequest>,

View file

@ -9,6 +9,9 @@ use std::sync::Arc;
use tokio::fs;
use uuid::Uuid;
#[cfg(feature = "drive")]
use pdf_extract;
#[cfg(feature = "vectordb")]
use qdrant_client::{
qdrant::{Distance, PointStruct, VectorParams},
@ -565,7 +568,14 @@ Ok(content)
"application/pdf" => {
log::info!("PDF extraction for {}", file_path.display());
Self::extract_pdf_text(file_path).await
#[cfg(feature = "drive")]
{
Self::extract_pdf_text(file_path).await
}
#[cfg(not(feature = "drive"))]
{
Err(anyhow::anyhow!("PDF extraction requires 'drive' feature"))
}
}
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
@ -627,21 +637,30 @@ Ok(content)
async fn extract_pdf_text(file_path: &PathBuf) -> Result<String> {
let bytes = fs::read(file_path).await?;
match pdf_extract::extract_text_from_mem(&bytes) {
Ok(text) => {
let cleaned = text
.lines()
.map(|l| l.trim())
.filter(|l| !l.is_empty())
.collect::<Vec<_>>()
.join("\n");
Ok(cleaned)
}
Err(e) => {
log::warn!("PDF extraction failed for {}: {}", file_path.display(), e);
Ok(String::new())
#[cfg(feature = "drive")]
{
match pdf_extract::extract_text_from_mem(&bytes) {
Ok(text) => {
let cleaned = text
.lines()
.map(|l| l.trim())
.filter(|l| !l.is_empty())
.collect::<Vec<_>>()
.join("\n");
Ok(cleaned)
}
Err(e) => {
log::warn!("PDF extraction failed for {}: {}", file_path.display(), e);
Ok(String::new())
}
}
}
#[cfg(not(feature = "drive"))]
{
let _ = file_path;
Err(anyhow::anyhow!("PDF extraction requires 'drive' feature"))
}
}
async fn extract_docx_text(file_path: &Path) -> Result<String> {

View file

@ -95,7 +95,7 @@ pub mod console;
#[cfg(feature = "directory")]
pub mod directory;
#[cfg(feature = "drive")]
#[cfg(any(feature = "drive", feature = "local-files"))]
pub mod drive;
#[cfg(feature = "mail")]
@ -133,9 +133,6 @@ pub mod whatsapp;
pub mod telegram;
// Re-export commonly used types
#[cfg(feature = "drive")]
pub use drive::drive_monitor::DriveMonitor;
#[cfg(feature = "llm")]
pub use llm::cache::{CacheConfig, CachedLLMProvider, CachedResponse, LocalEmbeddingService};
#[cfg(feature = "llm")]

View file

@ -13,7 +13,9 @@ use crate::core::config::ConfigManager;
use crate::core::package_manager::InstallMode;
use crate::core::session::SessionManager;
use crate::core::shared::state::AppState;
use crate::core::shared::utils::{create_conn, create_s3_operator, get_stack_path};
use crate::core::shared::utils::{create_conn, get_stack_path};
#[cfg(feature = "drive")]
use crate::core::shared::utils::create_s3_operator;
use super::BootstrapProgress;
@ -399,8 +401,6 @@ pub async fn create_app_state(
) -> Result<Arc<AppState>, std::io::Error> {
use std::collections::HashMap;
let config = std::sync::Arc::new(cfg.clone());
#[cfg(feature = "cache")]
let redis_client = redis_client.clone();
#[cfg(not(feature = "cache"))]
@ -586,6 +586,8 @@ pub async fn create_app_state(
let app_state = Arc::new(AppState {
#[cfg(feature = "drive")]
drive: Some(drive),
#[cfg(not(feature = "drive"))]
drive: None,
config: Some(cfg.clone()),
conn: pool.clone(),
database_url: database_url.clone(),
@ -843,9 +845,8 @@ fn init_llm_provider(
/// Start background services and monitors
pub async fn start_background_services(
app_state: Arc<AppState>,
pool: &crate::core::shared::utils::DbPool,
_pool: &crate::core::shared::utils::DbPool,
) {
#[cfg(feature = "drive")]
use crate::core::shared::memory_monitor::{log_process_memory, start_memory_monitor};
// Resume workflows after server restart
@ -890,6 +891,11 @@ pub async fn start_background_services(
#[cfg(feature = "drive")]
start_drive_monitors(app_state.clone(), pool).await;
#[cfg(feature = "local-files")]
start_local_file_monitor(app_state.clone()).await;
start_config_watcher(app_state.clone()).await;
}
#[cfg(feature = "drive")]
@ -978,22 +984,25 @@ async fn start_drive_monitors(
});
}
});
}
// Start local file monitor for /opt/gbo/data/*.gbai directories
let local_monitor_state = app_state.clone();
#[cfg(feature = "local-files")]
async fn start_local_file_monitor(app_state: Arc<AppState>) {
use crate::core::shared::memory_monitor::register_thread;
tokio::spawn(async move {
register_thread("local-file-monitor", "drive");
trace!("Starting LocalFileMonitor for /opt/gbo/data/*.gbai directories");
let monitor = crate::drive::local_file_monitor::LocalFileMonitor::new(local_monitor_state);
let monitor = crate::drive::local_file_monitor::LocalFileMonitor::new(app_state);
if let Err(e) = monitor.start_monitoring().await {
error!("LocalFileMonitor failed: {}", e);
} else {
info!("LocalFileMonitor started - watching /opt/gbo/data/*.gbai/*.gbdialog/*.bas");
}
});
}
// Start config file watcher for /opt/gbo/data/*.gbai/*.gbot/config.csv
let config_watcher_state = app_state.clone();
async fn start_config_watcher(app_state: Arc<AppState>) {
use crate::core::shared::memory_monitor::register_thread;
tokio::spawn(async move {
register_thread("config-file-watcher", "drive");
trace!("Starting ConfigWatcher for /opt/gbo/data/*.gbai/*.gbot/config.csv");
@ -1005,7 +1014,7 @@ async fn start_drive_monitors(
let watcher = crate::core::config::watcher::ConfigWatcher::new(
data_dir,
config_watcher_state,
app_state,
);
Arc::new(watcher).spawn();

View file

@ -1,10 +1,9 @@
//! Drive-related utilities
use log::{info, warn};
#[cfg(feature = "drive")]
pub async fn ensure_vendor_files_in_minio(drive: &aws_sdk_s3::Client) {
use aws_sdk_s3::primitives::ByteStream;
use log::{info, warn};
let htmx_paths = [
"./botui/ui/suite/js/vendor/htmx.min.js",

View file

@ -8,6 +8,7 @@ mod shutdown;
mod types;
pub use bootstrap::*;
#[cfg(feature = "drive")]
pub use drive_utils::*;
pub use health::*;
pub use server::*;

View file

@ -230,20 +230,32 @@ pub async fn run_axum_server(
}
};
// Create session if it doesn't exist
let _ = {
// Check if session already exists and reuse it
let mut final_session_id = session_id.clone();
{
let mut sm = state.session_manager.lock().await;
sm.get_or_create_anonymous_user(Some(user_uuid)).ok();
sm.create_session(user_uuid, bot_id, "Anonymous Chat").ok()
};
// Get or create session with the specified ID
let session = sm.get_or_create_session_by_id(
session_uuid,
user_uuid,
bot_id,
"Anonymous Chat"
);
if let Ok(sess) = session {
final_session_id = sess.id.to_string();
}
}
info!("Anonymous auth for bot: {}, session: {}", bot_name, session_id);
info!("Anonymous auth for bot: {}, session: {}", bot_name, final_session_id);
(
axum::http::StatusCode::OK,
Json(serde_json::json!({
"user_id": user_id,
"session_id": session_id,
"session_id": final_session_id,
"bot_name": bot_name,
"status": "anonymous"
})),