fix: DriveMonitor loop performance and WebSocket blocking
Some checks failed
BotServer CI/CD / build (push) Has been cancelled
Some checks failed
BotServer CI/CD / build (push) Has been cancelled
- Remove excessive trace/debug logging in hot loops - Fix broadcast_theme_change lock contention by cloning channels before iterating - Increase default sleep interval from 10s to 30s - Remove [MODULE] prefixes from log messages - Fix PDF re-download bug by using only last_modified (not ETag) for change detection - Re-enable DriveMonitor in bootstrap (was disabled for testing)
This commit is contained in:
parent
a884c650a3
commit
f04745ae1c
3 changed files with 289 additions and 344 deletions
|
|
@ -123,50 +123,43 @@ impl DriveMonitor {
|
||||||
.join("file_states.json")
|
.join("file_states.json")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load file states from disk to avoid reprocessing unchanged files
|
/// Load file states from disk to avoid reprocessing unchanged files
|
||||||
async fn load_file_states(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
async fn load_file_states(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
let path = self.file_state_path();
|
let path = self.file_state_path();
|
||||||
debug!("[DRIVE_MONITOR] Loading file states from {} for bot {}", path.display(), self.bot_id);
|
if path.exists() {
|
||||||
if path.exists() {
|
match tokio_fs::read_to_string(&path).await {
|
||||||
match tokio_fs::read_to_string(&path).await {
|
Ok(content) => {
|
||||||
Ok(content) => {
|
match serde_json::from_str::<HashMap<String, FileState>>(&content) {
|
||||||
match serde_json::from_str::<HashMap<String, FileState>>(&content) {
|
Ok(states) => {
|
||||||
Ok(states) => {
|
let mut file_states = self.file_states.write().await;
|
||||||
let mut file_states = self.file_states.write().await;
|
let count = states.len();
|
||||||
let count = states.len();
|
*file_states = states;
|
||||||
*file_states = states;
|
info!(
|
||||||
info!(
|
"Loaded {} file states from disk for bot {}",
|
||||||
"[DRIVE_MONITOR] Loaded {} file states from disk for bot {}",
|
count,
|
||||||
count,
|
|
||||||
self.bot_id
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(
|
|
||||||
"[DRIVE_MONITOR] Failed to parse file states from {}: {}. Starting with empty state.",
|
|
||||||
path.display(),
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(
|
|
||||||
"[DRIVE_MONITOR] Failed to read file states from {}: {}. Starting with empty state.",
|
|
||||||
path.display(),
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
debug!(
|
|
||||||
"[DRIVE_MONITOR] No existing file states found at {} for bot {}. Starting fresh.",
|
|
||||||
path.display(),
|
|
||||||
self.bot_id
|
self.bot_id
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
"Failed to parse file states from {}: {}. Starting with empty state.",
|
||||||
|
path.display(),
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
"Failed to read file states from {}: {}. Starting with empty state.",
|
||||||
|
path.display(),
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Static helper to save file states (used by background tasks)
|
/// Static helper to save file states (used by background tasks)
|
||||||
async fn save_file_states_static(
|
async fn save_file_states_static(
|
||||||
|
|
@ -181,7 +174,7 @@ impl DriveMonitor {
|
||||||
if let Some(parent) = path.parent() {
|
if let Some(parent) = path.parent() {
|
||||||
if let Err(e) = tokio_fs::create_dir_all(parent).await {
|
if let Err(e) = tokio_fs::create_dir_all(parent).await {
|
||||||
warn!(
|
warn!(
|
||||||
"[DRIVE_MONITOR] Failed to create directory for file states: {} - {}",
|
"Failed to create directory for file states: {} - {}",
|
||||||
parent.display(),
|
parent.display(),
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
|
|
@ -193,13 +186,13 @@ impl DriveMonitor {
|
||||||
Ok(content) => {
|
Ok(content) => {
|
||||||
if let Err(e) = tokio_fs::write(&path, content).await {
|
if let Err(e) = tokio_fs::write(&path, content).await {
|
||||||
warn!(
|
warn!(
|
||||||
"[DRIVE_MONITOR] Failed to save file states to {}: {}",
|
"Failed to save file states to {}: {}",
|
||||||
path.display(),
|
path.display(),
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
debug!(
|
debug!(
|
||||||
"[DRIVE_MONITOR] Saved {} file states to disk for bucket {}",
|
"Saved {} file states to disk for bucket {}",
|
||||||
states.len(),
|
states.len(),
|
||||||
bucket_name
|
bucket_name
|
||||||
);
|
);
|
||||||
|
|
@ -207,7 +200,7 @@ impl DriveMonitor {
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(
|
warn!(
|
||||||
"[DRIVE_MONITOR] Failed to serialize file states: {}",
|
"Failed to serialize file states: {}",
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -228,11 +221,11 @@ impl DriveMonitor {
|
||||||
{
|
{
|
||||||
Ok(Ok(_)) => true,
|
Ok(Ok(_)) => true,
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
debug!("[DRIVE_MONITOR] Health check failed: {}", e);
|
debug!("Health check failed: {}", e);
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
debug!("[DRIVE_MONITOR] Health check timed out");
|
debug!("Health check timed out");
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -276,16 +269,14 @@ impl DriveMonitor {
|
||||||
file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
|
file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
|
||||||
is_processing: Arc<AtomicBool>,
|
is_processing: Arc<AtomicBool>,
|
||||||
) {
|
) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
trace!("[KB_PROCESSOR] Starting for bot {} (bucket: {})", bot_name, bot_id);
|
// Keep running as long as the DriveMonitor is active
|
||||||
|
while is_processing.load(std::sync::atomic::Ordering::SeqCst) {
|
||||||
// Keep running as long as the DriveMonitor is active
|
// Get one pending KB folder from the queue
|
||||||
while is_processing.load(std::sync::atomic::Ordering::SeqCst) {
|
let kb_key = {
|
||||||
// Get one pending KB folder from the queue
|
let pending = pending_kb_index.write().await;
|
||||||
let kb_key = {
|
pending.iter().next().cloned()
|
||||||
let pending = pending_kb_index.write().await;
|
};
|
||||||
pending.iter().next().cloned()
|
|
||||||
};
|
|
||||||
|
|
||||||
let Some(kb_key) = kb_key else {
|
let Some(kb_key) = kb_key else {
|
||||||
// Nothing pending, wait and retry
|
// Nothing pending, wait and retry
|
||||||
|
|
@ -321,7 +312,7 @@ impl DriveMonitor {
|
||||||
indexing.insert(kb_key.clone());
|
indexing.insert(kb_key.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("[KB_PROCESSOR] Indexing KB: {} for bot: {}", kb_key, bot_name);
|
trace!("Indexing KB: {} for bot: {}", kb_key, bot_name);
|
||||||
|
|
||||||
// Perform the actual KB indexing
|
// Perform the actual KB indexing
|
||||||
let result = tokio::time::timeout(
|
let result = tokio::time::timeout(
|
||||||
|
|
@ -343,7 +334,7 @@ impl DriveMonitor {
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(Ok(_)) => {
|
Ok(Ok(_)) => {
|
||||||
info!("[KB_PROCESSOR] Successfully indexed KB: {}", kb_key);
|
info!("Successfully indexed KB: {}", kb_key);
|
||||||
{
|
{
|
||||||
let mut indexed = kb_indexed_folders.write().await;
|
let mut indexed = kb_indexed_folders.write().await;
|
||||||
indexed.insert(kb_key.clone());
|
indexed.insert(kb_key.clone());
|
||||||
|
|
@ -358,7 +349,7 @@ match result {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
warn!("[KB_PROCESSOR] Failed to index KB {}: {}", kb_key, e);
|
warn!("Failed to index KB {}: {}", kb_key, e);
|
||||||
// Update fail count
|
// Update fail count
|
||||||
let mut states = file_states.write().await;
|
let mut states = file_states.write().await;
|
||||||
for (path, state) in states.iter_mut() {
|
for (path, state) in states.iter_mut() {
|
||||||
|
|
@ -369,7 +360,7 @@ match result {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
error!("[KB_PROCESSOR] KB indexing timed out after 120s for {}", kb_key);
|
error!("KB indexing timed out after 120s for {}", kb_key);
|
||||||
let mut states = file_states.write().await;
|
let mut states = file_states.write().await;
|
||||||
for (path, state) in states.iter_mut() {
|
for (path, state) in states.iter_mut() {
|
||||||
if path.contains(&format!("{}/", kb_folder_name)) {
|
if path.contains(&format!("{}/", kb_folder_name)) {
|
||||||
|
|
@ -381,7 +372,7 @@ match result {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("[KB_PROCESSOR] Stopping for bot {}", bot_name);
|
trace!("Stopping for bot {}", bot_name);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -391,14 +382,12 @@ match result {
|
||||||
// KB indexing not available in this build
|
// KB indexing not available in this build
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_monitoring(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
pub async fn start_monitoring(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
trace!("start_monitoring ENTER");
|
let start_mem = MemoryStats::current();
|
||||||
let start_mem = MemoryStats::current();
|
trace!(
|
||||||
trace!(
|
"Starting DriveMonitor for bot {}",
|
||||||
"[DRIVE_MONITOR] Starting DriveMonitor for bot {}, RSS={}",
|
self.bot_id
|
||||||
self.bot_id,
|
);
|
||||||
MemoryStats::format_bytes(start_mem.rss_bytes)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Check if already processing to prevent duplicate monitoring
|
// Check if already processing to prevent duplicate monitoring
|
||||||
if self.is_processing.load(std::sync::atomic::Ordering::Acquire) {
|
if self.is_processing.load(std::sync::atomic::Ordering::Acquire) {
|
||||||
|
|
@ -409,14 +398,14 @@ match result {
|
||||||
// Load file states from disk to avoid reprocessing unchanged files
|
// Load file states from disk to avoid reprocessing unchanged files
|
||||||
if let Err(e) = self.load_file_states().await {
|
if let Err(e) = self.load_file_states().await {
|
||||||
warn!(
|
warn!(
|
||||||
"[DRIVE_MONITOR] Failed to load file states for bot {}: {}",
|
"Failed to load file states for bot {}: {}",
|
||||||
self.bot_id, e
|
self.bot_id, e
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if !self.check_drive_health().await {
|
if !self.check_drive_health().await {
|
||||||
warn!(
|
warn!(
|
||||||
"[DRIVE_MONITOR] S3/MinIO not available for bucket {}, will retry with backoff",
|
"S3/MinIO not available for bucket {}, will retry with backoff",
|
||||||
self.bucket_name
|
self.bucket_name
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -448,7 +437,7 @@ match result {
|
||||||
|
|
||||||
let after_initial = MemoryStats::current();
|
let after_initial = MemoryStats::current();
|
||||||
trace!(
|
trace!(
|
||||||
"[DRIVE_MONITOR] After initial check, RSS={} (delta={})",
|
"After initial check, RSS={} (delta={})",
|
||||||
MemoryStats::format_bytes(after_initial.rss_bytes),
|
MemoryStats::format_bytes(after_initial.rss_bytes),
|
||||||
MemoryStats::format_bytes(after_initial.rss_bytes.saturating_sub(start_mem.rss_bytes))
|
MemoryStats::format_bytes(after_initial.rss_bytes.saturating_sub(start_mem.rss_bytes))
|
||||||
);
|
);
|
||||||
|
|
@ -457,49 +446,42 @@ match result {
|
||||||
self.is_processing.store(true, std::sync::atomic::Ordering::SeqCst);
|
self.is_processing.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||||
trace!("Forced is_processing to true for periodic monitoring");
|
trace!("Forced is_processing to true for periodic monitoring");
|
||||||
|
|
||||||
let self_clone = self.clone(); // Don't wrap in Arc::new - that creates a copy
|
let self_clone = self.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut consecutive_processing_failures = 0;
|
let mut consecutive_processing_failures = 0;
|
||||||
trace!("Starting periodic monitoring loop for bot {}", self_clone.bot_id);
|
|
||||||
|
|
||||||
let is_processing_state = self_clone.is_processing.load(std::sync::atomic::Ordering::SeqCst);
|
while self_clone
|
||||||
trace!("is_processing state at loop start: {} for bot {}", is_processing_state, self_clone.bot_id);
|
.is_processing
|
||||||
|
.load(std::sync::atomic::Ordering::SeqCst)
|
||||||
while self_clone
|
{
|
||||||
.is_processing
|
|
||||||
.load(std::sync::atomic::Ordering::SeqCst)
|
|
||||||
{
|
|
||||||
debug!("[DRIVE_MONITOR] Inside monitoring loop for bot {}", self_clone.bot_id);
|
|
||||||
debug!("[DRIVE_MONITOR] Periodic check starting for bot {}", self_clone.bot_id);
|
|
||||||
|
|
||||||
// Smart sleep based on fail_count - prevent excessive retries
|
// Smart sleep based on fail_count - prevent excessive retries
|
||||||
{
|
{
|
||||||
let states = self_clone.file_states.read().await;
|
let states = self_clone.file_states.read().await;
|
||||||
let max_fail_count = states.values()
|
let max_fail_count = states.values()
|
||||||
.map(|s| s.fail_count)
|
.map(|s| s.fail_count)
|
||||||
.max()
|
.max()
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
let base_sleep = if max_fail_count >= 3 {
|
|
||||||
3600 // 1 hour for fail_count >= 3
|
|
||||||
} else if max_fail_count >= 2 {
|
|
||||||
900 // 15 min for fail_count >= 2
|
|
||||||
} else if max_fail_count >= 1 {
|
|
||||||
300 // 5 min for fail_count >= 1
|
|
||||||
} else {
|
|
||||||
10 // 10 sec default
|
|
||||||
};
|
|
||||||
|
|
||||||
if base_sleep > 10 {
|
|
||||||
debug!("[DRIVE_MONITOR] Sleep {}s based on fail_count={}", base_sleep, max_fail_count);
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_secs(base_sleep)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("[DRIVE_MONITOR] Checking drive health for bot {}", self_clone.bot_id);
|
let base_sleep = if max_fail_count >= 3 {
|
||||||
// Skip drive health check - just proceed with monitoring
|
3600
|
||||||
// if !self_clone.check_drive_health().await {
|
} else if max_fail_count >= 2 {
|
||||||
|
900
|
||||||
|
} else if max_fail_count >= 1 {
|
||||||
|
300
|
||||||
|
} else {
|
||||||
|
30
|
||||||
|
};
|
||||||
|
|
||||||
|
if base_sleep > 10 {
|
||||||
|
debug!("Sleep {}s based on fail_count={}", base_sleep, max_fail_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(states);
|
||||||
|
tokio::time::sleep(Duration::from_secs(base_sleep)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip drive health check - just proceed with monitoring
|
||||||
if false {
|
if false {
|
||||||
let failures = self_clone
|
let failures = self_clone
|
||||||
.consecutive_failures
|
.consecutive_failures
|
||||||
|
|
@ -512,9 +494,8 @@ match result {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("[DRIVE_MONITOR] About to call check_for_changes for bot {}", self_clone.bot_id);
|
// Add timeout to prevent hanging
|
||||||
// Add timeout to prevent hanging
|
match tokio::time::timeout(Duration::from_secs(12), self_clone.check_for_changes()).await {
|
||||||
match tokio::time::timeout(Duration::from_secs(12), self_clone.check_for_changes()).await {
|
|
||||||
Ok(Ok(_)) => {
|
Ok(Ok(_)) => {
|
||||||
let prev_failures =
|
let prev_failures =
|
||||||
self_clone.consecutive_failures.swap(0, Ordering::Relaxed);
|
self_clone.consecutive_failures.swap(0, Ordering::Relaxed);
|
||||||
|
|
@ -559,18 +540,15 @@ match result {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn stop_monitoring(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
pub async fn stop_monitoring(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
trace!("Stopping DriveMonitor for bot {}", self.bot_id);
|
self.is_processing
|
||||||
|
.store(false, std::sync::atomic::Ordering::SeqCst);
|
||||||
|
|
||||||
self.is_processing
|
self.file_states.write().await.clear();
|
||||||
.store(false, std::sync::atomic::Ordering::SeqCst);
|
self.consecutive_failures.store(0, Ordering::Relaxed);
|
||||||
|
|
||||||
self.file_states.write().await.clear();
|
Ok(())
|
||||||
self.consecutive_failures.store(0, Ordering::Relaxed);
|
}
|
||||||
|
|
||||||
trace!("DriveMonitor stopped for bot {}", self.bot_id);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
|
pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
trace!(
|
trace!(
|
||||||
|
|
@ -621,7 +599,7 @@ match result {
|
||||||
trace!("check_for_changes ENTER");
|
trace!("check_for_changes ENTER");
|
||||||
let start_mem = MemoryStats::current();
|
let start_mem = MemoryStats::current();
|
||||||
trace!(
|
trace!(
|
||||||
"[DRIVE_MONITOR] check_for_changes START, RSS={}",
|
"check_for_changes START, RSS={}",
|
||||||
MemoryStats::format_bytes(start_mem.rss_bytes)
|
MemoryStats::format_bytes(start_mem.rss_bytes)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
@ -636,7 +614,7 @@ match result {
|
||||||
trace!("check_for_changes: check_gbdialog_changes done");
|
trace!("check_for_changes: check_gbdialog_changes done");
|
||||||
let after_dialog = MemoryStats::current();
|
let after_dialog = MemoryStats::current();
|
||||||
trace!(
|
trace!(
|
||||||
"[DRIVE_MONITOR] After gbdialog, RSS={} (delta={})",
|
"After gbdialog, RSS={} (delta={})",
|
||||||
MemoryStats::format_bytes(after_dialog.rss_bytes),
|
MemoryStats::format_bytes(after_dialog.rss_bytes),
|
||||||
MemoryStats::format_bytes(after_dialog.rss_bytes.saturating_sub(start_mem.rss_bytes))
|
MemoryStats::format_bytes(after_dialog.rss_bytes.saturating_sub(start_mem.rss_bytes))
|
||||||
);
|
);
|
||||||
|
|
@ -647,7 +625,7 @@ match result {
|
||||||
trace!("check_for_changes: check_gbot done");
|
trace!("check_for_changes: check_gbot done");
|
||||||
let after_gbot = MemoryStats::current();
|
let after_gbot = MemoryStats::current();
|
||||||
trace!(
|
trace!(
|
||||||
"[DRIVE_MONITOR] After gbot, RSS={} (delta={})",
|
"After gbot, RSS={} (delta={})",
|
||||||
MemoryStats::format_bytes(after_gbot.rss_bytes),
|
MemoryStats::format_bytes(after_gbot.rss_bytes),
|
||||||
MemoryStats::format_bytes(after_gbot.rss_bytes.saturating_sub(after_dialog.rss_bytes))
|
MemoryStats::format_bytes(after_gbot.rss_bytes.saturating_sub(after_dialog.rss_bytes))
|
||||||
);
|
);
|
||||||
|
|
@ -658,7 +636,7 @@ match result {
|
||||||
trace!("check_for_changes: check_gbkb_changes done");
|
trace!("check_for_changes: check_gbkb_changes done");
|
||||||
let after_gbkb = MemoryStats::current();
|
let after_gbkb = MemoryStats::current();
|
||||||
trace!(
|
trace!(
|
||||||
"[DRIVE_MONITOR] After gbkb, RSS={} (delta={})",
|
"After gbkb, RSS={} (delta={})",
|
||||||
MemoryStats::format_bytes(after_gbkb.rss_bytes),
|
MemoryStats::format_bytes(after_gbkb.rss_bytes),
|
||||||
MemoryStats::format_bytes(after_gbkb.rss_bytes.saturating_sub(after_gbot.rss_bytes))
|
MemoryStats::format_bytes(after_gbkb.rss_bytes.saturating_sub(after_gbot.rss_bytes))
|
||||||
);
|
);
|
||||||
|
|
@ -668,7 +646,7 @@ match result {
|
||||||
let total_delta = after_gbkb.rss_bytes.saturating_sub(start_mem.rss_bytes);
|
let total_delta = after_gbkb.rss_bytes.saturating_sub(start_mem.rss_bytes);
|
||||||
if total_delta > 50 * 1024 * 1024 {
|
if total_delta > 50 * 1024 * 1024 {
|
||||||
warn!(
|
warn!(
|
||||||
"[DRIVE_MONITOR] check_for_changes grew by {} - potential leak!",
|
"check_for_changes grew by {} - potential leak!",
|
||||||
MemoryStats::format_bytes(total_delta)
|
MemoryStats::format_bytes(total_delta)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -1173,62 +1151,65 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
|
||||||
trace!("check_gbot EXIT");
|
trace!("check_gbot EXIT");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn broadcast_theme_change(
|
async fn broadcast_theme_change(
|
||||||
&self,
|
&self,
|
||||||
csv_content: &str,
|
csv_content: &str,
|
||||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
let mut theme_data = serde_json::json!({
|
let mut theme_data = serde_json::json!({
|
||||||
"event": "change_theme",
|
"event": "change_theme",
|
||||||
"data": {}
|
"data": {}
|
||||||
});
|
});
|
||||||
for line in csv_content.lines() {
|
for line in csv_content.lines() {
|
||||||
let parts: Vec<&str> = line.split(',').collect();
|
let parts: Vec<&str> = line.split(',').collect();
|
||||||
if parts.len() >= 2 {
|
if parts.len() >= 2 {
|
||||||
let key = parts[0].trim();
|
let key = parts[0].trim();
|
||||||
let value = parts[1].trim();
|
let value = parts[1].trim();
|
||||||
match key {
|
match key {
|
||||||
"theme-color1" => {
|
"theme-color1" => {
|
||||||
theme_data["data"]["color1"] = serde_json::Value::String(value.to_string());
|
theme_data["data"]["color1"] = serde_json::Value::String(value.to_string());
|
||||||
}
|
}
|
||||||
"theme-color2" => {
|
"theme-color2" => {
|
||||||
theme_data["data"]["color2"] = serde_json::Value::String(value.to_string());
|
theme_data["data"]["color2"] = serde_json::Value::String(value.to_string());
|
||||||
}
|
}
|
||||||
"theme-logo" => {
|
"theme-logo" => {
|
||||||
theme_data["data"]["logo_url"] =
|
theme_data["data"]["logo_url"] =
|
||||||
serde_json::Value::String(value.to_string());
|
serde_json::Value::String(value.to_string());
|
||||||
}
|
}
|
||||||
"theme-title" => {
|
"theme-title" => {
|
||||||
theme_data["data"]["title"] = serde_json::Value::String(value.to_string());
|
theme_data["data"]["title"] = serde_json::Value::String(value.to_string());
|
||||||
}
|
}
|
||||||
"theme-logo-text" => {
|
"theme-logo-text" => {
|
||||||
theme_data["data"]["logo_text"] =
|
theme_data["data"]["logo_text"] =
|
||||||
serde_json::Value::String(value.to_string());
|
serde_json::Value::String(value.to_string());
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
let response_channels = self.state.response_channels.lock().await;
|
}
|
||||||
for (session_id, tx) in response_channels.iter() {
|
|
||||||
let theme_response = crate::core::shared::models::BotResponse {
|
|
||||||
bot_id: self.bot_id.to_string(),
|
|
||||||
user_id: "system".to_string(),
|
|
||||||
session_id: session_id.clone(),
|
|
||||||
channel: "web".to_string(),
|
|
||||||
content: serde_json::to_string(&theme_data)?,
|
|
||||||
message_type: MessageType::BOT_RESPONSE,
|
|
||||||
stream_token: None,
|
|
||||||
is_complete: true,
|
|
||||||
suggestions: Vec::new(),
|
|
||||||
context_name: None,
|
|
||||||
context_length: 0,
|
|
||||||
context_max_length: 0,
|
|
||||||
};
|
|
||||||
let _ = tx.try_send(theme_response);
|
|
||||||
}
|
|
||||||
drop(response_channels);
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
// Clone channels to avoid holding lock while sending
|
||||||
|
let channels: Vec<_> = {
|
||||||
|
let response_channels = self.state.response_channels.lock().await;
|
||||||
|
response_channels.iter().map(|(id, tx)| (id.clone(), tx.clone())).collect()
|
||||||
|
};
|
||||||
|
for (session_id, tx) in channels {
|
||||||
|
let theme_response = crate::core::shared::models::BotResponse {
|
||||||
|
bot_id: self.bot_id.to_string(),
|
||||||
|
user_id: "system".to_string(),
|
||||||
|
session_id,
|
||||||
|
channel: "web".to_string(),
|
||||||
|
content: serde_json::to_string(&theme_data)?,
|
||||||
|
message_type: MessageType::BOT_RESPONSE,
|
||||||
|
stream_token: None,
|
||||||
|
is_complete: true,
|
||||||
|
suggestions: Vec::new(),
|
||||||
|
context_name: None,
|
||||||
|
context_length: 0,
|
||||||
|
context_max_length: 0,
|
||||||
|
};
|
||||||
|
let _ = tx.try_send(theme_response);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
async fn compile_tool(
|
async fn compile_tool(
|
||||||
&self,
|
&self,
|
||||||
client: &Client,
|
client: &Client,
|
||||||
|
|
@ -1479,51 +1460,48 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
|
||||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
trace!("[GBKB] Scan already in progress for bot {}, skipping", self.bot_id);
|
trace!("Scan already in progress for bot {}, skipping", self.bot_id);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("[GBKB] check_gbkb_changes ENTER for bot {} (prefix: {})", self.bot_id, self.bucket_name);
|
let bot_name = self
|
||||||
let bot_name = self
|
.bucket_name
|
||||||
.bucket_name
|
.strip_suffix(".gbai")
|
||||||
.strip_suffix(".gbai")
|
.unwrap_or(&self.bucket_name);
|
||||||
.unwrap_or(&self.bucket_name);
|
|
||||||
|
|
||||||
let gbkb_prefix = format!("{}.gbkb/", bot_name);
|
let gbkb_prefix = format!("{}.gbkb/", bot_name);
|
||||||
debug!("[GBKB] Listing objects with prefix: {}", gbkb_prefix);
|
let mut current_files = HashMap::new();
|
||||||
let mut current_files = HashMap::new();
|
let mut continuation_token = None;
|
||||||
let mut continuation_token = None;
|
|
||||||
|
|
||||||
let mut files_processed = 0;
|
let mut files_processed = 0;
|
||||||
let mut files_to_process = Vec::new();
|
let mut files_to_process = Vec::new();
|
||||||
let mut pdf_files_found = 0;
|
let mut pdf_files_found = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let list_objects = match tokio::time::timeout(
|
let list_objects = match tokio::time::timeout(
|
||||||
Duration::from_secs(30),
|
Duration::from_secs(30),
|
||||||
client
|
client
|
||||||
.list_objects_v2()
|
.list_objects_v2()
|
||||||
.bucket(self.bucket_name.to_lowercase())
|
.bucket(self.bucket_name.to_lowercase())
|
||||||
.prefix(&gbkb_prefix)
|
.prefix(&gbkb_prefix)
|
||||||
.set_continuation_token(continuation_token)
|
.set_continuation_token(continuation_token)
|
||||||
.send(),
|
.send(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Ok(list)) => list,
|
Ok(Ok(list)) => list,
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
debug!("[GBKB] Error listing objects: {}", e);
|
debug!("Error listing objects: {}", e);
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
log::error!(
|
log::error!(
|
||||||
"Timeout listing .gbkb objects in bucket {}",
|
"Timeout listing .gbkb objects in bucket {}",
|
||||||
self.bucket_name
|
self.bucket_name
|
||||||
);
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
debug!("[GBKB] Listed {} objects in this page", list_objects.contents.as_ref().map(|c| c.len()).unwrap_or(0));
|
|
||||||
|
|
||||||
for obj in list_objects.contents.unwrap_or_default() {
|
for obj in list_objects.contents.unwrap_or_default() {
|
||||||
let path = obj.key().unwrap_or_default().to_string();
|
let path = obj.key().unwrap_or_default().to_string();
|
||||||
|
|
@ -1554,7 +1532,7 @@ let file_state = FileState {
|
||||||
continuation_token = list_objects.next_continuation_token;
|
continuation_token = list_objects.next_continuation_token;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("[GBKB] Found {} files total, acquiring file_states lock...", current_files.len());
|
|
||||||
|
|
||||||
// Check if ALL KBs for this bot are already indexed in Qdrant
|
// Check if ALL KBs for this bot are already indexed in Qdrant
|
||||||
// If so, only scan for NEW files - skip re-indexing existing ones
|
// If so, only scan for NEW files - skip re-indexing existing ones
|
||||||
|
|
@ -1579,8 +1557,7 @@ let file_state = FileState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut file_states = self.file_states.write().await;
|
let mut file_states = self.file_states.write().await;
|
||||||
debug!("[GBKB] file_states lock acquired, processing {} files (all_indexed={}, file_states_count={})", current_files.len(), all_indexed, file_states.len());
|
|
||||||
|
|
||||||
// Build set of already-indexed KB folder names for quick lookup
|
// Build set of already-indexed KB folder names for quick lookup
|
||||||
let indexed_kb_names: HashSet<String> = {
|
let indexed_kb_names: HashSet<String> = {
|
||||||
|
|
@ -1591,118 +1568,88 @@ let file_state = FileState {
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
for (path, current_state) in current_files.iter() {
|
for (path, current_state) in current_files.iter() {
|
||||||
let is_new = !file_states.contains_key(path);
|
let is_new = !file_states.contains_key(path);
|
||||||
debug!("[GBKB] DEBUG: path={} in_file_states={}", path, !is_new);
|
|
||||||
|
|
||||||
// Skip files from already-indexed KB folders that are not new
|
// Skip files from already-indexed KB folders that are not new
|
||||||
// This prevents re-download loop when file_states fails to load
|
// This prevents re-download loop when file_states fails to load
|
||||||
let kb_name_from_path = path.split('/').nth(1).map(|s| s.to_string());
|
let kb_name_from_path = path.split('/').nth(1).map(|s| s.to_string());
|
||||||
if all_indexed && !is_new {
|
if all_indexed && !is_new {
|
||||||
trace!("[GBKB] Skipping already indexed file: {}", path);
|
trace!("Skipping already indexed file: {}", path);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Extra safety: if file_states is empty but KB is indexed, skip non-new files
|
// Extra safety: if file_states is empty but KB is indexed, skip non-new files
|
||||||
if file_states.is_empty() && all_indexed {
|
if file_states.is_empty() && all_indexed {
|
||||||
if let Some(kb) = &kb_name_from_path {
|
if let Some(kb) = &kb_name_from_path {
|
||||||
if indexed_kb_names.contains(kb) {
|
if indexed_kb_names.contains(kb) {
|
||||||
trace!("[GBKB] Skipping file from indexed KB (empty file_states): {}", path);
|
trace!("Skipping file from indexed KB (empty file_states): {}", path);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use last_modified as primary change detector (more stable than ETag)
|
// Use only last_modified for change detection - more reliable than ETag
|
||||||
// ETags can change due to metadata updates even when content is identical
|
let is_modified = if let Some(prev) = file_states.get(path) {
|
||||||
let is_modified = if let Some(prev) = file_states.get(path) {
|
prev.last_modified != current_state.last_modified
|
||||||
// If last_modified matches, content hasn't changed regardless of ETag
|
} else {
|
||||||
if prev.last_modified == current_state.last_modified {
|
false
|
||||||
false
|
};
|
||||||
} else {
|
|
||||||
// Different timestamp - use ETag to confirm content actually changed
|
|
||||||
prev.etag != current_state.etag
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
};
|
|
||||||
|
|
||||||
if is_new || is_modified {
|
if is_new || is_modified {
|
||||||
debug!("[GBKB] New/modified file: {} (new={}, modified={})", path, is_new, is_modified);
|
#[cfg(any(feature = "research", feature = "llm"))]
|
||||||
|
{
|
||||||
|
// Only remove from indexed_folders if KB is actually being re-indexed
|
||||||
|
let path_parts: Vec<&str> = path.split('/').collect();
|
||||||
|
if path_parts.len() >= 2 {
|
||||||
|
let kb_name = path_parts[1];
|
||||||
|
let kb_key = format!("{}_{}", bot_name, kb_name);
|
||||||
|
|
||||||
#[cfg(any(feature = "research", feature = "llm"))]
|
// Check and remove in one atomic operation
|
||||||
{
|
let should_remove = {
|
||||||
// Only remove from indexed_folders if KB is actually being re-indexed
|
let indexed_folders = self.kb_indexed_folders.read().await;
|
||||||
// Don't remove if already indexed in Qdrant (skip unnecessary re-queueing)
|
indexed_folders.contains(&kb_key)
|
||||||
let path_parts: Vec<&str> = path.split('/').collect();
|
};
|
||||||
if path_parts.len() >= 2 {
|
|
||||||
let kb_name = path_parts[1];
|
|
||||||
let kb_key = format!("{}_{}", bot_name, kb_name);
|
|
||||||
|
|
||||||
let already_indexed = {
|
|
||||||
let indexed_folders = self.kb_indexed_folders.read().await;
|
|
||||||
indexed_folders.contains(&kb_key)
|
|
||||||
};
|
|
||||||
|
|
||||||
// Only remove and re-queue if NOT already indexed
|
|
||||||
// This prevents infinite reindexing loops when files haven't really changed
|
|
||||||
if !already_indexed {
|
|
||||||
let mut indexed_folders = self.kb_indexed_folders.write().await;
|
|
||||||
if indexed_folders.remove(&kb_key) {
|
|
||||||
debug!("[GBKB] Removed {} from indexed set due to file change", kb_key);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
trace!("[GBKB] KB {} already indexed, skipping re-queue", kb_key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(prev_state) = file_states.get(path) {
|
|
||||||
if prev_state.fail_count >= MAX_FAIL_COUNT {
|
|
||||||
let elapsed = Utc::now()
|
|
||||||
.signed_duration_since(prev_state.last_failed_at.unwrap_or(Utc::now()));
|
|
||||||
if elapsed.num_seconds() < RETRY_BACKOFF_SECS {
|
|
||||||
trace!(
|
|
||||||
"Skipping {} - fail_count={} (last failed {}s ago, max {}s backoff)",
|
|
||||||
path, prev_state.fail_count, elapsed.num_seconds(), RETRY_BACKOFF_SECS
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if path.to_lowercase().ends_with(".pdf") {
|
// Only remove if NOT already indexed
|
||||||
pdf_files_found += 1;
|
if !should_remove {
|
||||||
debug!("[GBKB] Detected PDF: {}", path);
|
let mut indexed_folders = self.kb_indexed_folders.write().await;
|
||||||
} else {
|
indexed_folders.remove(&kb_key);
|
||||||
trace!(
|
}
|
||||||
"Detected {} in .gbkb: {}",
|
}
|
||||||
if is_new { "new file" } else { "change" },
|
}
|
||||||
path
|
if let Some(prev_state) = file_states.get(path) {
|
||||||
);
|
if prev_state.fail_count >= MAX_FAIL_COUNT {
|
||||||
}
|
let elapsed = Utc::now()
|
||||||
|
.signed_duration_since(prev_state.last_failed_at.unwrap_or(Utc::now()));
|
||||||
|
if elapsed.num_seconds() < RETRY_BACKOFF_SECS {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
debug!("[GBKB] Pushing to download queue: {}", path);
|
if path.to_lowercase().ends_with(".pdf") {
|
||||||
files_to_process.push(path.clone());
|
pdf_files_found += 1;
|
||||||
files_processed += 1;
|
}
|
||||||
debug!("[GBKB] Queue size: {}/10", files_to_process.len());
|
|
||||||
|
files_to_process.push(path.clone());
|
||||||
|
files_processed += 1;
|
||||||
|
|
||||||
// REMOVED: Skip downloads if LLM is actively streaming - was causing deadlocks
|
// REMOVED: Skip downloads if LLM is actively streaming - was causing deadlocks
|
||||||
// #[cfg(any(feature = "research", feature = "llm"))]
|
// #[cfg(any(feature = "research", feature = "llm"))]
|
||||||
// if is_llm_streaming() {
|
// if is_llm_streaming() {
|
||||||
// debug!("[GBKB] Skipping download - LLM is streaming, will retry later");
|
// debug!("Skipping download - LLM is streaming, will retry later");
|
||||||
// files_to_process.clear();
|
// files_to_process.clear();
|
||||||
// break;
|
// break;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
if files_to_process.len() >= 10 {
|
if files_to_process.len() >= 10 {
|
||||||
debug!("[GBKB] Downloading batch of {} files", files_to_process.len());
|
for file_path in std::mem::take(&mut files_to_process) {
|
||||||
for file_path in std::mem::take(&mut files_to_process) {
|
if let Err(e) = self.download_gbkb_file(client, &file_path).await {
|
||||||
debug!("[GBKB] Downloading: {}", file_path);
|
log::error!("Failed to download .gbkb file {}: {}", file_path, e);
|
||||||
if let Err(e) = self.download_gbkb_file(client, &file_path).await {
|
}
|
||||||
log::error!("Failed to download .gbkb file {}: {}", file_path, e);
|
}
|
||||||
}
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
}
|
}
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Queue KB folder for indexing - only if not already indexed and no files changed
|
// Queue KB folder for indexing - only if not already indexed and no files changed
|
||||||
let path_parts: Vec<&str> = path.split('/').collect();
|
let path_parts: Vec<&str> = path.split('/').collect();
|
||||||
|
|
@ -1722,14 +1669,10 @@ let file_state = FileState {
|
||||||
indexed_folders.contains(&kb_key)
|
indexed_folders.contains(&kb_key)
|
||||||
};
|
};
|
||||||
|
|
||||||
if !already_indexed {
|
if !already_indexed {
|
||||||
let mut pending = self.pending_kb_index.write().await;
|
let mut pending = self.pending_kb_index.write().await;
|
||||||
if pending.insert(kb_key.clone()) {
|
pending.insert(kb_key.clone());
|
||||||
debug!("[GBKB] Queued KB {} for indexing (non-blocking)", kb_key);
|
}
|
||||||
}
|
|
||||||
} else {
|
|
||||||
trace!("[GBKB] KB {} already indexed, skipping", kb_key);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1744,16 +1687,12 @@ let file_state = FileState {
|
||||||
|
|
||||||
// Download remaining files (less than 10)
|
// Download remaining files (less than 10)
|
||||||
if !files_to_process.is_empty() {
|
if !files_to_process.is_empty() {
|
||||||
debug!("[GBKB] Downloading final batch of {} files", files_to_process.len());
|
for file_path in std::mem::take(&mut files_to_process) {
|
||||||
for file_path in std::mem::take(&mut files_to_process) {
|
if let Err(e) = self.download_gbkb_file(client, &file_path).await {
|
||||||
debug!("[GBKB] Downloading: {}", file_path);
|
log::error!("Failed to download .gbkb file {}: {}", file_path, e);
|
||||||
if let Err(e) = self.download_gbkb_file(client, &file_path).await {
|
}
|
||||||
log::error!("Failed to download .gbkb file {}: {}", file_path, e);
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("[GBKB] Processed {} files, {} PDFs found", files_processed, pdf_files_found);
|
|
||||||
|
|
||||||
let paths_to_remove: Vec<String> = file_states
|
let paths_to_remove: Vec<String> = file_states
|
||||||
.keys()
|
.keys()
|
||||||
|
|
@ -1848,11 +1787,9 @@ let file_state = FileState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("[GBKB] check_gbkb_changes EXIT for bot {}", self.bot_id);
|
self.scanning.store(false, Ordering::Release);
|
||||||
trace!("check_gbkb_changes EXIT");
|
Ok(())
|
||||||
self.scanning.store(false, Ordering::Release);
|
}
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn download_gbkb_file(
|
async fn download_gbkb_file(
|
||||||
&self,
|
&self,
|
||||||
|
|
|
||||||
|
|
@ -227,8 +227,15 @@ impl OpenAIClient {
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
for (role, content) in history {
|
for (role, content) in history {
|
||||||
|
// Filter out internal roles not valid for OpenAI API
|
||||||
|
let api_role = match role.as_str() {
|
||||||
|
"user" | "assistant" | "system" | "developer" | "tool" => role.as_str(),
|
||||||
|
// Convert internal roles to valid API roles
|
||||||
|
"episodic" | "compact" => "system",
|
||||||
|
_ => "system",
|
||||||
|
};
|
||||||
messages.push(serde_json::json!({
|
messages.push(serde_json::json!({
|
||||||
"role": role,
|
"role": api_role,
|
||||||
"content": Self::sanitize_utf8(content)
|
"content": Self::sanitize_utf8(content)
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -845,8 +845,8 @@ fn init_llm_provider(
|
||||||
|
|
||||||
/// Start background services and monitors
|
/// Start background services and monitors
|
||||||
pub async fn start_background_services(
|
pub async fn start_background_services(
|
||||||
app_state: Arc<AppState>,
|
app_state: Arc<AppState>,
|
||||||
pool: &crate::core::shared::utils::DbPool,
|
_pool: &crate::core::shared::utils::DbPool,
|
||||||
) {
|
) {
|
||||||
use crate::core::shared::memory_monitor::{log_process_memory, start_memory_monitor};
|
use crate::core::shared::memory_monitor::{log_process_memory, start_memory_monitor};
|
||||||
|
|
||||||
|
|
@ -890,8 +890,9 @@ pub async fn start_background_services(
|
||||||
trace!("ensure_llama_servers_running completed");
|
trace!("ensure_llama_servers_running completed");
|
||||||
}
|
}
|
||||||
|
|
||||||
// DISABLED: DriveMonitor for testing WebSocket/LLM response in isolation
|
// Start DriveMonitor for S3/MinIO file watching
|
||||||
// start_drive_monitors(app_state.clone(), pool).await;
|
#[cfg(feature = "drive")]
|
||||||
|
start_drive_monitors(app_state.clone(), _pool).await;
|
||||||
|
|
||||||
// Local file monitoring removed - Drive (MinIO) is the only source now
|
// Local file monitoring removed - Drive (MinIO) is the only source now
|
||||||
// #[cfg(feature = "local-files")]
|
// #[cfg(feature = "local-files")]
|
||||||
|
|
@ -901,17 +902,17 @@ pub async fn start_background_services(
|
||||||
// start_config_watcher(app_state.clone()).await;
|
// start_config_watcher(app_state.clone()).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "drive")]
|
#[cfg(feature = "drive")]
|
||||||
async fn start_drive_monitors(
|
async fn start_drive_monitors(
|
||||||
app_state: Arc<AppState>,
|
app_state: Arc<AppState>,
|
||||||
pool: &crate::core::shared::utils::DbPool,
|
_pool: &crate::core::shared::utils::DbPool,
|
||||||
) {
|
) {
|
||||||
use crate::core::shared::memory_monitor::register_thread;
|
use crate::core::shared::memory_monitor::register_thread;
|
||||||
use crate::core::shared::models::schema::bots;
|
use crate::core::shared::models::schema::bots;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
||||||
let drive_monitor_state = app_state.clone();
|
let drive_monitor_state = app_state.clone();
|
||||||
let pool_clone = pool.clone();
|
let pool_clone = _pool.clone();
|
||||||
let state_for_scan = app_state.clone();
|
let state_for_scan = app_state.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue