Update HTML rendering: buffer chunks and render visual elements only
Some checks failed
BotServer CI/CD / build (push) Failing after 4m19s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-14 14:39:08 -03:00
parent 73d9531563
commit 09f4c876b4
4 changed files with 230 additions and 41 deletions

View file

@ -2,47 +2,6 @@ use chrono::{DateTime, Utc};
use diesel::prelude::*; use diesel::prelude::*;
use uuid::Uuid; use uuid::Uuid;
/// Full row model for the `drive_files` tracking table: one row per file
/// observed in a bot's drive, carrying change-detection metadata and
/// indexing progress.
/// NOTE: `Queryable` maps by position — field order must mirror the
/// column order declared in the `drive_files` table! schema.
#[derive(Queryable, Insertable, AsChangeset, Debug, Clone)]
#[diesel(table_name = drive_files)]
pub struct DriveFile {
    pub id: Uuid,
    pub bot_id: Uuid,
    /// Path of the file within the bot's drive/bucket.
    pub file_path: String,
    pub file_type: String,
    /// Change-detection token from the storage backend, when available.
    pub etag: Option<String>,
    pub last_modified: Option<DateTime<Utc>>,
    pub file_size: Option<i64>,
    /// True once the file has been successfully indexed.
    pub indexed: bool,
    /// Count of indexing failures (reset to 0 on success).
    pub fail_count: i32,
    pub last_failed_at: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
/// Insert payload for `drive_files`. Omits `id`, `fail_count`,
/// `last_failed_at`, and the timestamps — presumably supplied by column
/// defaults; TODO confirm the migration declares defaults for them.
#[derive(Insertable, Debug)]
#[diesel(table_name = drive_files)]
pub struct NewDriveFile {
    pub bot_id: Uuid,
    pub file_path: String,
    pub file_type: String,
    pub etag: Option<String>,
    pub last_modified: Option<DateTime<Utc>>,
    pub file_size: Option<i64>,
    /// Initial indexing state; callers set this explicitly (not defaulted).
    pub indexed: bool,
}
/// Partial-update changeset for `drive_files`. With `AsChangeset`,
/// `Option` fields set to `None` are skipped entirely, so each update
/// touches only the columns that were provided; `updated_at` is
/// non-optional and therefore written on every update.
#[derive(AsChangeset, Debug)]
#[diesel(table_name = drive_files)]
pub struct DriveFileUpdate {
    pub etag: Option<String>,
    pub last_modified: Option<DateTime<Utc>>,
    pub file_size: Option<i64>,
    pub indexed: Option<bool>,
    pub fail_count: Option<i32>,
    pub last_failed_at: Option<DateTime<Utc>>,
    pub updated_at: DateTime<Utc>,
}
diesel::table! { diesel::table! {
drive_files (id) { drive_files (id) {
id -> Uuid, id -> Uuid,
@ -59,3 +18,47 @@ diesel::table! {
updated_at -> Timestamptz, updated_at -> Timestamptz,
} }
} }
// Query-only row model for `drive_files` (no Insertable/AsChangeset:
// reads never need defaults). NOTE: `Queryable` maps by position, so the
// field order below must mirror the table!'s column order.
#[derive(Queryable, Debug, Clone)]
pub struct DriveFile {
    pub id: Uuid,
    pub bot_id: Uuid,
    // Path of the file within the bot's drive/bucket.
    pub file_path: String,
    pub file_type: String,
    // Change-detection token from the storage backend, when available.
    pub etag: Option<String>,
    pub last_modified: Option<DateTime<Utc>>,
    pub file_size: Option<i64>,
    // True once the file has been successfully indexed.
    pub indexed: bool,
    // Count of indexing failures (reset to 0 on success).
    pub fail_count: i32,
    pub last_failed_at: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
// Insert struct — relies on database column defaults. With `Insertable`,
// an `Option` field holding `None` is omitted from the INSERT, letting the
// column default apply (hence `indexed`/`fail_count` are `Option` here,
// unlike the query model's non-optional fields).
#[derive(Insertable, Debug)]
#[diesel(table_name = drive_files)]
pub struct NewDriveFile {
    pub bot_id: Uuid,
    pub file_path: String,
    pub file_type: String,
    pub etag: Option<String>,
    pub last_modified: Option<DateTime<Utc>>,
    pub file_size: Option<i64>,
    // None => use the column default instead of an explicit value.
    pub indexed: Option<bool>,
    pub fail_count: Option<i32>,
}
// Update changeset. With `AsChangeset`, `None` fields are skipped, so only
// the provided columns are written; `updated_at` is non-optional and is
// stamped on every update.
#[derive(AsChangeset, Debug)]
#[diesel(table_name = drive_files)]
pub struct DriveFileUpdate {
    pub etag: Option<String>,
    pub last_modified: Option<DateTime<Utc>>,
    pub file_size: Option<i64>,
    pub indexed: Option<bool>,
    pub fail_count: Option<i32>,
    pub last_failed_at: Option<DateTime<Utc>>,
    pub updated_at: DateTime<Utc>,
}

174
src/drive/drive_files.rs Normal file
View file

@ -0,0 +1,174 @@
use crate::core::shared::DbPool;
use chrono::{DateTime, Utc};
use diesel::dsl::{max, sql};
use diesel::prelude::*;
use uuid::Uuid;
// Local diesel schema for the `drive_files` tracking table, so this module
// is self-contained.
// NOTE(review): this duplicates the table! definition in the models file —
// the two must be kept in sync with each other and the migration.
diesel::table! {
    drive_files (id) {
        id -> Uuid,
        bot_id -> Uuid,
        // Full path of the file within the bot's drive/bucket.
        file_path -> Text,
        file_type -> Varchar,
        // Backend change-detection token, when the backend provides one.
        etag -> Nullable<Text>,
        last_modified -> Nullable<Timestamptz>,
        file_size -> Nullable<Int8>,
        indexed -> Bool,
        fail_count -> Int4,
        last_failed_at -> Nullable<Timestamptz>,
        created_at -> Timestamptz,
        updated_at -> Timestamptz,
    }
}
/// Re-export of the generated column DSL so callers can write the
/// conventional `use …::drive_files::dsl::*` import style.
pub mod dsl {
    pub use super::drive_files::*;
}
/// Query result row for `drive_files`.
/// NOTE: `Queryable` maps by position — field order must mirror the column
/// order of the table! above.
#[derive(Queryable, Debug, Clone)]
pub struct DriveFile {
    pub id: Uuid,
    pub bot_id: Uuid,
    /// Path of the file within the bot's drive/bucket.
    pub file_path: String,
    pub file_type: String,
    /// Change-detection token from the storage backend, when available.
    pub etag: Option<String>,
    pub last_modified: Option<DateTime<Utc>>,
    pub file_size: Option<i64>,
    /// True once the file has been successfully indexed.
    pub indexed: bool,
    /// Count of indexing failures (reset to 0 on success).
    pub fail_count: i32,
    pub last_failed_at: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
/// Repository for per-file drive tracking state, backed by a pooled
/// database connection. Read paths are best-effort (pool/query errors map
/// to empty results); write paths surface errors as `String`s.
pub struct DriveFileRepository {
    // Connection pool; a connection is checked out per operation.
    pool: DbPool,
}
impl DriveFileRepository {
    /// Creates a repository backed by the given connection pool.
    pub fn new(pool: DbPool) -> Self {
        Self { pool }
    }

    /// Returns the tracked state for `(bot_id, file_path)`, or `None` when
    /// the file is untracked or the pool/query fails (lookup is best-effort).
    pub fn get_file_state(&self, bot_id: Uuid, file_path: &str) -> Option<DriveFile> {
        let mut conn = self.pool.get().ok()?;
        drive_files::table
            .filter(
                drive_files::bot_id
                    .eq(bot_id)
                    .and(drive_files::file_path.eq(file_path)),
            )
            .first(&mut conn)
            .ok()
    }

    /// Inserts a new tracking row, or — on `(bot_id, file_path)` conflict —
    /// refreshes its change-detection metadata (`etag`, `last_modified`).
    /// A freshly inserted row starts unindexed with a zero fail count.
    ///
    /// # Errors
    /// Returns the pool or query error rendered as a `String`.
    pub fn upsert_file(
        &self,
        bot_id: Uuid,
        file_path: &str,
        file_type: &str,
        etag: Option<String>,
        last_modified: Option<DateTime<Utc>>,
    ) -> Result<(), String> {
        let mut conn = self.pool.get().map_err(|e| e.to_string())?;
        let now = Utc::now();
        diesel::insert_into(drive_files::table)
            .values((
                drive_files::bot_id.eq(bot_id),
                drive_files::file_path.eq(file_path),
                drive_files::file_type.eq(file_type),
                // Clone here: `.eq()` takes ownership and `etag` is needed
                // again in the conflict branch below (the previous version
                // moved it twice, which does not compile).
                drive_files::etag.eq(etag.clone()),
                drive_files::last_modified.eq(last_modified),
                drive_files::indexed.eq(false),
                drive_files::fail_count.eq(0),
                drive_files::created_at.eq(now),
                drive_files::updated_at.eq(now),
            ))
            .on_conflict((drive_files::bot_id, drive_files::file_path))
            .do_update()
            .set((
                drive_files::etag.eq(etag),
                drive_files::last_modified.eq(last_modified),
                drive_files::updated_at.eq(now),
            ))
            .execute(&mut conn)
            .map_err(|e| e.to_string())?;
        Ok(())
    }

    /// Marks a file as successfully indexed and resets its failure counter.
    ///
    /// # Errors
    /// Returns the pool or query error rendered as a `String`.
    pub fn mark_indexed(&self, bot_id: Uuid, file_path: &str) -> Result<(), String> {
        let mut conn = self.pool.get().map_err(|e| e.to_string())?;
        diesel::update(drive_files::table)
            .filter(
                drive_files::bot_id
                    .eq(bot_id)
                    .and(drive_files::file_path.eq(file_path)),
            )
            .set((
                drive_files::indexed.eq(true),
                drive_files::fail_count.eq(0),
                drive_files::updated_at.eq(Utc::now()),
            ))
            .execute(&mut conn)
            .map_err(|e| e.to_string())?;
        Ok(())
    }

    /// Records an indexing failure: atomically increments `fail_count` and
    /// stamps `last_failed_at`.
    ///
    /// # Errors
    /// Returns the pool or query error rendered as a `String`.
    pub fn mark_failed(&self, bot_id: Uuid, file_path: &str) -> Result<(), String> {
        let mut conn = self.pool.get().map_err(|e| e.to_string())?;
        // Single timestamp so last_failed_at and updated_at agree exactly.
        let now = Utc::now();
        diesel::update(drive_files::table)
            .filter(
                drive_files::bot_id
                    .eq(bot_id)
                    .and(drive_files::file_path.eq(file_path)),
            )
            .set((
                // Type-checked column arithmetic instead of an untyped raw
                // `sql("fail_count + 1")` fragment; still one atomic UPDATE
                // executed server-side.
                drive_files::fail_count.eq(drive_files::fail_count + 1),
                drive_files::last_failed_at.eq(Some(now)),
                drive_files::updated_at.eq(now),
            ))
            .execute(&mut conn)
            .map_err(|e| e.to_string())?;
        Ok(())
    }

    /// Highest `fail_count` among the bot's files; 0 when the bot has no
    /// rows or the pool/query fails.
    pub fn get_max_fail_count(&self, bot_id: Uuid) -> i32 {
        let mut conn = match self.pool.get() {
            Ok(c) => c,
            Err(_) => return 0,
        };
        // MAX() over zero rows is SQL NULL, so the selected value is
        // Option<i32>; the previous `.unwrap_or(0)` on the raw Result did
        // not account for that nullable layer and failed to type-check.
        drive_files::table
            .filter(drive_files::bot_id.eq(bot_id))
            .select(max(drive_files::fail_count))
            .first::<Option<i32>>(&mut conn)
            .ok()
            .flatten()
            .unwrap_or(0)
    }

    /// Returns every not-yet-indexed file for the bot (empty on any error).
    pub fn get_files_to_index(&self, bot_id: Uuid) -> Vec<DriveFile> {
        let mut conn = match self.pool.get() {
            Ok(c) => c,
            Err(_) => return vec![],
        };
        drive_files::table
            .filter(
                drive_files::bot_id
                    .eq(bot_id)
                    .and(drive_files::indexed.eq(false)),
            )
            .load(&mut conn)
            .unwrap_or_default()
    }
}

View file

@ -23,6 +23,9 @@ use tokio::time::Duration;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::fs as tokio_fs; use tokio::fs as tokio_fs;
#[cfg(any(feature = "research", feature = "llm"))]
use crate::drive::drive_files::DriveFileRepository;
#[cfg(any(feature = "research", feature = "llm"))] #[cfg(any(feature = "research", feature = "llm"))]
static LLM_STREAMING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); static LLM_STREAMING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
@ -78,6 +81,8 @@ pub struct DriveMonitor {
kb_indexed_folders: Arc<TokioRwLock<HashSet<String>>>, kb_indexed_folders: Arc<TokioRwLock<HashSet<String>>>,
#[cfg(not(any(feature = "research", feature = "llm")))] #[cfg(not(any(feature = "research", feature = "llm")))]
_pending_kb_index: Arc<TokioRwLock<HashSet<String>>>, _pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
// Database-backed file state repository
file_repo: Arc<DriveFileRepository>,
} }
impl DriveMonitor { impl DriveMonitor {
fn normalize_config_value(value: &str) -> String { fn normalize_config_value(value: &str) -> String {
@ -94,6 +99,9 @@ impl DriveMonitor {
#[cfg(any(feature = "research", feature = "llm"))] #[cfg(any(feature = "research", feature = "llm"))]
let kb_manager = Arc::new(KnowledgeBaseManager::with_bot_config(work_root.clone(), state.conn.clone(), bot_id)); let kb_manager = Arc::new(KnowledgeBaseManager::with_bot_config(work_root.clone(), state.conn.clone(), bot_id));
// Initialize DB-backed file state repository
let file_repo = Arc::new(DriveFileRepository::new(state.conn.clone()));
Self { Self {
state, state,
bucket_name, bucket_name,
@ -113,6 +121,7 @@ impl DriveMonitor {
kb_indexed_folders: Arc::new(TokioRwLock::new(HashSet::new())), kb_indexed_folders: Arc::new(TokioRwLock::new(HashSet::new())),
#[cfg(not(any(feature = "research", feature = "llm")))] #[cfg(not(any(feature = "research", feature = "llm")))]
_pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())), _pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())),
file_repo,
} }
} }

View file

@ -24,6 +24,9 @@ use std::sync::Arc;
#[cfg(feature = "drive")] #[cfg(feature = "drive")]
pub mod document_processing; pub mod document_processing;
#[cfg(feature = "drive")]
pub mod drive_files;
#[cfg(feature = "drive")] #[cfg(feature = "drive")]
pub mod drive_monitor; pub mod drive_monitor;