refactor: Split drive_monitor into modules for better organization
Some checks failed
BotServer CI/CD / build (push) Failing after 2m5s

- Extract KbProcessor for knowledge base processing
- Extract Monitor for file monitoring logic
- Extract Types for shared types and structs
- Extract Utils for helper functions
- Improves code organization and maintainability
- Reduces mod.rs complexity
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-18 13:08:39 -03:00
parent 2dc5cf0761
commit 97a4583d81
5 changed files with 256 additions and 1671 deletions

View file

@ -0,0 +1,116 @@
#[cfg(any(feature = "research", feature = "llm"))]
use crate::core::kb::KnowledgeBaseManager;
#[cfg(any(feature = "research", feature = "llm"))]
use log::{error, info, trace, warn};
#[cfg(any(feature = "research", feature = "llm"))]
use std::collections::HashSet;
#[cfg(any(feature = "research", feature = "llm"))]
use std::path::PathBuf;
#[cfg(any(feature = "research", feature = "llm"))]
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(any(feature = "research", feature = "llm"))]
use std::sync::Arc;
#[cfg(any(feature = "research", feature = "llm"))]
use tokio::sync::RwLock as TokioRwLock;
#[cfg(any(feature = "research", feature = "llm"))]
use tokio::time::Duration;
#[cfg(any(feature = "research", feature = "llm"))]
use crate::drive::drive_files::DriveFileRepository;
#[cfg(any(feature = "research", feature = "llm"))]
/// Spawn a background task that drains `pending_kb_index`, indexing one
/// knowledge-base folder at a time until `is_processing` is cleared.
///
/// Each pending key has the form `<prefix>_<kb_folder_name>`; the folder is
/// resolved as `<work_root>/<bot_name>/<bot_name>.gbkb/<kb_folder_name>`.
/// Each indexing attempt is bounded by a 120-second timeout. On success the
/// matching drive files are marked indexed; on failure or timeout they are
/// marked failed. The key is always removed from the pending set so one bad
/// folder cannot wedge the queue.
pub fn start_kb_processor(
    kb_manager: Arc<KnowledgeBaseManager>,
    bot_id: uuid::Uuid,
    bot_name: String,
    work_root: PathBuf,
    pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
    files_being_indexed: Arc<TokioRwLock<HashSet<String>>>,
    kb_indexed_folders: Arc<TokioRwLock<HashSet<String>>>,
    file_repo: Arc<DriveFileRepository>,
    is_processing: Arc<AtomicBool>,
) {
    tokio::spawn(async move {
        while is_processing.load(Ordering::SeqCst) {
            // Peek an arbitrary pending key. A read lock suffices here —
            // the previous code took a write lock just to look, which
            // serialized against producers for no benefit.
            let kb_key = {
                let pending = pending_kb_index.read().await;
                pending.iter().next().cloned()
            };
            let Some(kb_key) = kb_key else {
                // Nothing queued; poll again shortly.
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            };
            // Keys are "<prefix>_<folder>"; discard malformed entries.
            let parts: Vec<&str> = kb_key.splitn(2, '_').collect();
            if parts.len() < 2 {
                let mut pending = pending_kb_index.write().await;
                pending.remove(&kb_key);
                continue;
            }
            let kb_folder_name = parts[1];
            let kb_folder_path =
                work_root.join(&bot_name).join(format!("{}.gbkb/", bot_name)).join(kb_folder_name);
            // Skip keys another pass is already indexing.
            // NOTE(review): this drops the pending entry entirely, so a change
            // arriving mid-index is not re-queued — confirm that is intended.
            {
                let indexing = files_being_indexed.read().await;
                if indexing.contains(&kb_key) {
                    let mut pending = pending_kb_index.write().await;
                    pending.remove(&kb_key);
                    continue;
                }
            }
            // Claim the key before starting the (long) indexing call.
            {
                let mut indexing = files_being_indexed.write().await;
                indexing.insert(kb_key.clone());
            }
            trace!("Indexing KB: {} for bot: {}", kb_key, bot_name);
            let result =
                tokio::time::timeout(Duration::from_secs(120), kb_manager.handle_gbkb_change(bot_id, &bot_name, kb_folder_path.as_path()))
                    .await;
            // Release the claim and dequeue regardless of outcome.
            {
                let mut indexing = files_being_indexed.write().await;
                indexing.remove(&kb_key);
            }
            {
                let mut pending = pending_kb_index.write().await;
                pending.remove(&kb_key);
            }
            match result {
                Ok(Ok(_)) => {
                    info!("Successfully indexed KB: {}", kb_key);
                    {
                        let mut indexed = kb_indexed_folders.write().await;
                        indexed.insert(kb_key.clone());
                    }
                    // Flag every drive file under this folder as indexed.
                    let pattern = format!("{}/", kb_folder_name);
                    if let Err(e) = file_repo.mark_indexed_by_pattern(bot_id, &pattern) {
                        warn!("Failed to mark files indexed for {}: {}", kb_key, e);
                    }
                }
                Ok(Err(e)) => {
                    warn!("Failed to index KB {}: {}", kb_key, e);
                    let pattern = format!("{}/", kb_folder_name);
                    if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) {
                        warn!("Failed to mark files failed for {}: {}", kb_key, e);
                    }
                }
                Err(_) => {
                    // Timeout: treat like a failure so files can be retried.
                    error!("KB indexing timed out after 120s for {}", kb_key);
                    let pattern = format!("{}/", kb_folder_name);
                    if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) {
                        warn!("Failed to mark files failed for {}: {}", kb_key, e);
                    }
                }
            }
        }
        trace!("Stopping for bot {}", bot_name);
    });
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,15 @@
use std::sync::atomic::Ordering;
use std::time::Duration;
use super::types::DriveMonitor;
impl DriveMonitor {
    /// Retry delay derived from the consecutive-failure counter.
    ///
    /// No failures → 30 s. Otherwise the delay doubles per failure
    /// (exponent capped at 4) and the result is clamped to 300 s.
    pub fn calculate_backoff(&self) -> Duration {
        let fail_count = self.consecutive_failures.load(Ordering::Relaxed);
        let secs = match fail_count {
            0 => 30,
            n => (30u64 << n.min(4)).min(300),
        };
        Duration::from_secs(secs)
    }
}

View file

@ -0,0 +1,108 @@
#[cfg(any(feature = "research", feature = "llm"))]
use crate::core::kb::KnowledgeBaseManager;
use crate::core::shared::state::AppState;
use crate::drive::drive_files::DriveFileRepository;
// `HashMap`, `HashSet`, and `TokioRwLock` are needed by `DriveMonitor`
// in every configuration (`_pending_kb_index`, `pending_changes`,
// `last_etag_snapshot`), so they must be imported unconditionally —
// feature-gating them breaks the build with the KB features disabled.
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock as TokioRwLock;
/// Process-wide flag: set while an LLM response stream is in flight.
#[cfg(any(feature = "research", feature = "llm"))]
static LLM_STREAMING: AtomicBool = AtomicBool::new(false);

/// Record whether an LLM response is currently being streamed.
#[cfg(any(feature = "research", feature = "llm"))]
pub fn set_llm_streaming(streaming: bool) {
    LLM_STREAMING.store(streaming, Ordering::SeqCst);
}

/// True while an LLM response is currently being streamed.
#[cfg(any(feature = "research", feature = "llm"))]
pub fn is_llm_streaming() -> bool {
    LLM_STREAMING.load(Ordering::SeqCst)
}
/// Hard ceiling on scan backoff, in seconds.
const MAX_BACKOFF_SECS: u64 = 300;
/// Backoff after the first failure, in seconds.
const INITIAL_BACKOFF_SECS: u64 = 30;
// NOTE(review): the two consts below are referenced by monitor code outside
// this view; semantics inferred from the names — verify against usage.
/// Seconds to wait before retrying a previously failed file (presumably).
const RETRY_BACKOFF_SECS: i64 = 3600;
/// Maximum failures before a file is given up on (presumably).
const MAX_FAIL_COUNT: i32 = 3;

/// Strip surrounding double quotes from an ETag value.
///
/// HTTP/S3-style ETags are commonly transmitted wrapped in `"`; callers
/// compare the bare value.
pub fn normalize_etag(etag: &str) -> String {
    let bare = etag.trim_matches('"');
    bare.to_owned()
}
/// Watches a bot's drive bucket and coordinates knowledge-base indexing.
///
/// Cloneable handle: every piece of mutable state is behind an `Arc`, so
/// clones share the same flags, counters, and queues.
#[derive(Debug, Clone)]
pub struct DriveMonitor {
    /// Shared application state (holds the DB connection used below).
    pub state: Arc<AppState>,
    /// Object-storage bucket this monitor watches.
    pub bucket_name: String,
    /// Id of the bot that owns this monitor.
    pub bot_id: uuid::Uuid,
    /// Knowledge-base manager used to (re)index `.gbkb` folders.
    #[cfg(any(feature = "research", feature = "llm"))]
    pub kb_manager: Arc<KnowledgeBaseManager>,
    /// Local working-directory root (from the shared work path).
    pub work_root: PathBuf,
    /// True while the background processing loop should keep running.
    pub is_processing: Arc<AtomicBool>,
    /// Guard flag so only one scan runs at a time.
    pub scanning: Arc<AtomicBool>,
    /// Consecutive scan failures; input to `calculate_backoff`.
    pub consecutive_failures: Arc<AtomicU32>,
    /// KB keys currently being indexed.
    #[cfg(any(feature = "research", feature = "llm"))]
    pub files_being_indexed: Arc<TokioRwLock<HashSet<String>>>,
    /// KB keys queued for indexing.
    #[cfg(any(feature = "research", feature = "llm"))]
    pub pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
    /// KB keys that have been successfully indexed.
    #[cfg(any(feature = "research", feature = "llm"))]
    pub kb_indexed_folders: Arc<TokioRwLock<HashSet<String>>>,
    /// Placeholder keeping the field present when KB features are disabled.
    #[cfg(not(any(feature = "research", feature = "llm")))]
    pub _pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
    /// Repository for per-file drive metadata and index state.
    pub file_repo: Arc<DriveFileRepository>,
    /// Changed object keys observed but not yet processed.
    #[allow(dead_code)]
    pub pending_changes: Arc<TokioRwLock<Vec<String>>>,
    /// Last seen ETag per object key, for change detection.
    #[allow(dead_code)]
    pub last_etag_snapshot: Arc<TokioRwLock<std::collections::HashMap<String, String>>>,
}
impl DriveMonitor {
    // NOTE: the private `normalize_config_value` that lived here duplicated
    // the `pub fn normalize_config_value` defined in `utils.rs` on this same
    // type. Two inherent associated functions with the same name on one type
    // are a compile error (E0592), so the private copy is removed; the
    // public one in `utils.rs` is the single implementation.

    /// Construct a monitor for `bucket_name` owned by bot `bot_id`.
    ///
    /// Derives the on-disk work root from the shared work path and wires up
    /// the file repository plus all bookkeeping state: processing/scanning
    /// flags start false, the failure counter starts at zero, and the
    /// KB-indexing sets start empty (only present with the `research`/`llm`
    /// features).
    pub fn new(state: Arc<AppState>, bucket_name: String, bot_id: uuid::Uuid) -> Self {
        let work_root = PathBuf::from(crate::core::shared::utils::get_work_path());
        #[cfg(any(feature = "research", feature = "llm"))]
        let kb_manager = Arc::new(KnowledgeBaseManager::with_bot_config(
            work_root.clone(),
            state.conn.clone(),
            bot_id,
        ));
        let file_repo = Arc::new(DriveFileRepository::new(state.conn.clone()));
        Self {
            state,
            bucket_name,
            bot_id,
            #[cfg(any(feature = "research", feature = "llm"))]
            kb_manager,
            work_root,
            is_processing: Arc::new(AtomicBool::new(false)),
            scanning: Arc::new(AtomicBool::new(false)),
            consecutive_failures: Arc::new(AtomicU32::new(0)),
            #[cfg(any(feature = "research", feature = "llm"))]
            files_being_indexed: Arc::new(TokioRwLock::new(HashSet::new())),
            #[cfg(any(feature = "research", feature = "llm"))]
            pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())),
            #[cfg(any(feature = "research", feature = "llm"))]
            kb_indexed_folders: Arc::new(TokioRwLock::new(HashSet::new())),
            #[cfg(not(any(feature = "research", feature = "llm")))]
            _pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())),
            file_repo,
            pending_changes: Arc::new(TokioRwLock::new(Vec::new())),
            last_etag_snapshot: Arc::new(TokioRwLock::new(HashMap::new())),
        }
    }
}

View file

@ -0,0 +1,12 @@
use super::types::DriveMonitor;
impl DriveMonitor {
    /// Canonicalize a raw config string.
    ///
    /// Returns the whitespace-trimmed value, with empty strings and the
    /// literal (ASCII case-insensitive) "none" collapsed to `""`.
    pub fn normalize_config_value(value: &str) -> String {
        match value.trim() {
            "" => String::new(),
            s if s.eq_ignore_ascii_case("none") => String::new(),
            s => s.to_string(),
        }
    }
}