Add fail_count/last_failed_at to FileState for indexing retries
All checks were successful
BotServer CI/CD / build (push) Successful in 3m21s
All checks were successful
BotServer CI/CD / build (push) Successful in 3m21s
- Skip re-indexing files that failed 3+ times within 1 hour
- Update file_states on indexing success (indexed=true, fail_count=0)
- Update file_states on indexing failure (fail_count++, last_failed_at=now)
- Don't skip KB indexing when the embedding server is not yet marked ready
- Embedding server health will be detected via wait_for_server() in kb_indexer
- Remove drive_monitor bypass of the embedding check — let kb_indexer handle it
This commit is contained in:
parent
cdab04e999
commit
f48fa6d5f0
1 changed file with 73 additions and 5 deletions
|
|
@ -10,6 +10,7 @@ use crate::core::shared::state::AppState;
|
||||||
|
|
||||||
#[cfg(feature = "drive")]
|
#[cfg(feature = "drive")]
|
||||||
use aws_sdk_s3::Client;
|
use aws_sdk_s3::Client;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use log::{debug, error, info, trace, warn};
|
use log::{debug, error, info, trace, warn};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
#[cfg(any(feature = "research", feature = "llm"))]
|
#[cfg(any(feature = "research", feature = "llm"))]
|
||||||
|
|
@ -26,12 +27,18 @@ use tokio::fs as tokio_fs;
|
||||||
|
|
||||||
const MAX_BACKOFF_SECS: u64 = 300;
|
const MAX_BACKOFF_SECS: u64 = 300;
|
||||||
const INITIAL_BACKOFF_SECS: u64 = 30;
|
const INITIAL_BACKOFF_SECS: u64 = 30;
|
||||||
|
const RETRY_BACKOFF_SECS: i64 = 3600;
|
||||||
|
const MAX_FAIL_COUNT: u32 = 3;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct FileState {
|
pub struct FileState {
|
||||||
pub etag: String,
|
pub etag: String,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub indexed: bool,
|
pub indexed: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
pub last_failed_at: Option<DateTime<Utc>>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub fail_count: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
|
@ -504,6 +511,8 @@ impl DriveMonitor {
|
||||||
let file_state = FileState {
|
let file_state = FileState {
|
||||||
etag: obj.e_tag().unwrap_or_default().to_string(),
|
etag: obj.e_tag().unwrap_or_default().to_string(),
|
||||||
indexed: false,
|
indexed: false,
|
||||||
|
last_failed_at: None,
|
||||||
|
fail_count: 0,
|
||||||
};
|
};
|
||||||
current_files.insert(path, file_state);
|
current_files.insert(path, file_state);
|
||||||
}
|
}
|
||||||
|
|
@ -644,7 +653,7 @@ impl DriveMonitor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let mut states = self.file_states.write().await;
|
let mut states = self.file_states.write().await;
|
||||||
states.insert(prompt_state_key, FileState { etag, indexed: false });
|
states.insert(prompt_state_key, FileState { etag, indexed: false, last_failed_at: None, fail_count: 0 });
|
||||||
drop(states);
|
drop(states);
|
||||||
let file_states_clone = Arc::clone(&self.file_states);
|
let file_states_clone = Arc::clone(&self.file_states);
|
||||||
let work_root_clone = self.work_root.clone();
|
let work_root_clone = self.work_root.clone();
|
||||||
|
|
@ -1200,6 +1209,8 @@ impl DriveMonitor {
|
||||||
let file_state = FileState {
|
let file_state = FileState {
|
||||||
etag: obj.e_tag().unwrap_or_default().to_string(),
|
etag: obj.e_tag().unwrap_or_default().to_string(),
|
||||||
indexed: false,
|
indexed: false,
|
||||||
|
last_failed_at: None,
|
||||||
|
fail_count: 0,
|
||||||
};
|
};
|
||||||
current_files.insert(path.clone(), file_state);
|
current_files.insert(path.clone(), file_state);
|
||||||
}
|
}
|
||||||
|
|
@ -1220,6 +1231,20 @@ impl DriveMonitor {
|
||||||
.unwrap_or(false);
|
.unwrap_or(false);
|
||||||
|
|
||||||
if is_new || is_modified {
|
if is_new || is_modified {
|
||||||
|
if let Some(prev_state) = file_states.get(path) {
|
||||||
|
if prev_state.fail_count >= MAX_FAIL_COUNT {
|
||||||
|
let elapsed = Utc::now()
|
||||||
|
.signed_duration_since(prev_state.last_failed_at.unwrap_or(Utc::now()));
|
||||||
|
if elapsed.num_seconds() < RETRY_BACKOFF_SECS {
|
||||||
|
trace!(
|
||||||
|
"Skipping {} - fail_count={} (last failed {}s ago, max {}s backoff)",
|
||||||
|
path, prev_state.fail_count, elapsed.num_seconds(), RETRY_BACKOFF_SECS
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if path.to_lowercase().ends_with(".pdf") {
|
if path.to_lowercase().ends_with(".pdf") {
|
||||||
pdf_files_found += 1;
|
pdf_files_found += 1;
|
||||||
trace!(
|
trace!(
|
||||||
|
|
@ -1270,8 +1295,7 @@ impl DriveMonitor {
|
||||||
#[cfg(any(feature = "research", feature = "llm"))]
|
#[cfg(any(feature = "research", feature = "llm"))]
|
||||||
{
|
{
|
||||||
if !is_embedding_server_ready() {
|
if !is_embedding_server_ready() {
|
||||||
trace!("Embedding server not ready, deferring KB indexing for {}", kb_folder_path.display());
|
info!("Embedding server not yet marked ready, KB indexing will wait for it");
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a unique key for this KB folder to track indexing state
|
// Create a unique key for this KB folder to track indexing state
|
||||||
|
|
@ -1300,6 +1324,11 @@ impl DriveMonitor {
|
||||||
let _files_being_indexed = Arc::clone(&self.files_being_indexed);
|
let _files_being_indexed = Arc::clone(&self.files_being_indexed);
|
||||||
let file_key = Arc::clone(&self.files_being_indexed);
|
let file_key = Arc::clone(&self.files_being_indexed);
|
||||||
let kb_key_owned = kb_key.clone();
|
let kb_key_owned = kb_key.clone();
|
||||||
|
let file_states = Arc::clone(&self.file_states);
|
||||||
|
let work_root = self.work_root.clone();
|
||||||
|
let bucket_name = self.bucket_name.clone();
|
||||||
|
let gbkb_prefix_owned = gbkb_prefix.clone();
|
||||||
|
let bot_name_for_spawn = bot_name.to_string();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
trace!(
|
trace!(
|
||||||
|
|
@ -1319,6 +1348,43 @@ impl DriveMonitor {
|
||||||
indexing_set.remove(&kb_key_owned);
|
indexing_set.remove(&kb_key_owned);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let kb_prefix = format!("{}/", gbkb_prefix_owned);
|
||||||
|
let kb_folder_name = kb_folder_owned
|
||||||
|
.strip_prefix(&work_root)
|
||||||
|
.ok()
|
||||||
|
.and_then(|p| p.to_str())
|
||||||
|
.unwrap_or("")
|
||||||
|
.trim_start_matches(&format!("{}/", bot_name_for_spawn))
|
||||||
|
.trim_start_matches(&kb_prefix)
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut states = file_states.write().await;
|
||||||
|
for (path, state) in states.iter_mut() {
|
||||||
|
if path.starts_with(&format!("{}{}/", kb_prefix, kb_folder_name)) {
|
||||||
|
match &result {
|
||||||
|
Ok(Ok(_)) => {
|
||||||
|
state.indexed = true;
|
||||||
|
state.fail_count = 0;
|
||||||
|
state.last_failed_at = None;
|
||||||
|
}
|
||||||
|
Ok(Err(_)) | Err(_) => {
|
||||||
|
state.fail_count = state.fail_count.saturating_add(1);
|
||||||
|
state.last_failed_at = Some(chrono::Utc::now());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let states_clone = Arc::clone(&file_states);
|
||||||
|
let work_root_clone = work_root.clone();
|
||||||
|
let bucket_name_clone = bucket_name.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::save_file_states_static(&states_clone, &work_root_clone, &bucket_name_clone).await {
|
||||||
|
warn!("Failed to save file states after indexing update: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(Ok(_)) => {
|
Ok(Ok(_)) => {
|
||||||
debug!(
|
debug!(
|
||||||
|
|
@ -1375,8 +1441,10 @@ impl DriveMonitor {
|
||||||
}
|
}
|
||||||
for (path, mut state) in current_files {
|
for (path, mut state) in current_files {
|
||||||
if let Some(previous) = file_states.get(&path) {
|
if let Some(previous) = file_states.get(&path) {
|
||||||
if previous.indexed && state.etag == previous.etag {
|
if state.etag == previous.etag {
|
||||||
state.indexed = true;
|
state.indexed = previous.indexed;
|
||||||
|
state.fail_count = previous.fail_count;
|
||||||
|
state.last_failed_at = previous.last_failed_at;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
file_states.insert(path, state);
|
file_states.insert(path, state);
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue