refactor: Unify GBDialog compilation to use drive_files table
Some checks failed
BotServer CI/CD / build (push) Failing after 3m48s
Some checks failed
BotServer CI/CD / build (push) Failing after 3m48s
- Renamed LocalFileMonitor to DriveCompiler - DriveCompiler reads drive_files table to detect changes - Compiles .bas → .ast when etag changes (like GBKB/GBOT) - Unified flow: MinIO → drive_files → DriveCompiler → .ast
This commit is contained in:
parent
bc37ffc6a5
commit
2bc44413c5
4 changed files with 187 additions and 1961 deletions
172
src/drive/drive_compiler.rs
Normal file
172
src/drive/drive_compiler.rs
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
/// DriveCompiler - Unificado para compilar arquivos .bas do Drive (MinIO)
|
||||
///
|
||||
/// Fluxo:
|
||||
/// 1. DriveMonitor (S3) baixa .bas do MinIO para /opt/gbo/data/{bot}.gbai/{bot}.gbdialog/
|
||||
/// 2. DriveMonitor atualiza tabela drive_files com etag, last_modified
|
||||
/// 3. DriveCompiler lê drive_files, detecta mudanças, compila para /opt/gbo/work/
|
||||
/// 4. Compilados: .bas → .ast (Rhai)
|
||||
|
||||
use crate::basic::compiler::BasicCompiler;
|
||||
use crate::core::shared::state::AppState;
|
||||
use crate::core::shared::utils::get_work_path;
|
||||
use crate::drive::drive_files::{drive_files as drive_files_table, DriveFileRepository};
|
||||
use diesel::prelude::*;
|
||||
use log::{debug, error, info, warn};
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Compilation state of a single file.
///
/// NOTE(review): this struct is not referenced anywhere in this file —
/// `DriveCompiler` tracks change detection through
/// `last_etags: HashMap<String, String>` instead. Confirm whether it is
/// used elsewhere or is dead code left over from the refactor.
#[derive(Debug, Clone)]
struct CompileState {
    // MinIO/S3 etag observed for the file.
    etag: String,
    // Whether the .bas file was successfully compiled to .ast.
    compiled: bool,
}
|
||||
|
||||
/// Compiles `.bas` dialog scripts synced from the Drive (MinIO) into `.ast`
/// artifacts, driven by change detection on the `drive_files` database table.
///
/// Flow: DriveMonitor downloads `.bas` sources and records their etags in
/// `drive_files`; this compiler polls that table and recompiles whatever
/// changed into the work directory.
pub struct DriveCompiler {
    // Shared application state (DB pool, config, ...).
    state: Arc<AppState>,
    // Root of the work directory where compiled artifacts are written.
    work_root: PathBuf,
    // Keeps the background compile loop alive; cleared to stop it.
    is_processing: Arc<AtomicBool>,
    /// Last known etags: file_path -> etag.
    last_etags: Arc<RwLock<HashMap<String, String>>>,
}
|
||||
|
||||
impl DriveCompiler {
|
||||
pub fn new(state: Arc<AppState>) -> Self {
|
||||
let work_root = PathBuf::from(get_work_path());
|
||||
|
||||
Self {
|
||||
state,
|
||||
work_root,
|
||||
is_processing: Arc::new(AtomicBool::new(false)),
|
||||
last_etags: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Iniciar loop de compilação baseado em drive_files
|
||||
pub async fn start_compiling(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
info!("DriveCompiler started - monitoring drive_files table for changes");
|
||||
|
||||
self.is_processing.store(true, Ordering::SeqCst);
|
||||
|
||||
let compiler = self.clone();
|
||||
|
||||
// Spawn loop que verifica drive_files a cada 30s
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
||||
|
||||
while compiler.is_processing.load(Ordering::SeqCst) {
|
||||
interval.tick().await;
|
||||
|
||||
if let Err(e) = compiler.check_and_compile().await {
|
||||
error!("DriveCompiler error: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Verifica drive_files e compila .bas files mudaram
|
||||
async fn check_and_compile(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
use drive_files_table::dsl::*;
|
||||
use diesel::dsl::eq;
|
||||
|
||||
let mut conn = self.state.conn.get()?;
|
||||
|
||||
// Selecionar todos os arquivos .gbdialog/*.bas não compilados ou com etag diferente
|
||||
let files: Vec<(Uuid, String, String, Option<String>)> = drive_files_table
|
||||
.filter(file_type.eq("bas"))
|
||||
.filter(file_path.like("%.gbdialog/%"))
|
||||
.select((bot_id, file_path, file_type, etag.clone()))
|
||||
.load(&mut conn)?;
|
||||
|
||||
for (bot_id, file_path, _file_type, current_etag_opt) in files {
|
||||
let current_etag = current_etag_opt.unwrap_or_default();
|
||||
|
||||
// Verificar se precisa compilar
|
||||
let should_compile = {
|
||||
let etags = self.last_etags.read().await;
|
||||
etags.get(&file_path).map(|e| e != ¤t_etag).unwrap_or(true)
|
||||
};
|
||||
|
||||
if should_compile {
|
||||
debug!("DriveCompiler: {} changed, compiling...", file_path);
|
||||
|
||||
// Compilar
|
||||
if let Err(e) = self.compile_file(bot_id, &file_path).await {
|
||||
error!("Failed to compile {}: {}", file_path, e);
|
||||
} else {
|
||||
// Atualizar estado
|
||||
let mut etags = self.last_etags.write().await;
|
||||
etags.insert(file_path.clone(), current_etag.clone());
|
||||
|
||||
// Marcar como compilado na DB
|
||||
diesel::update(drive_files_table
|
||||
.filter(bot_id.eq(bot_id))
|
||||
.filter(file_path.eq(&file_path)))
|
||||
.set(indexed.eq(true))
|
||||
.execute(&mut conn)?;
|
||||
|
||||
info!("DriveCompiler: {} compiled successfully", file_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Compilar um arquivo .bas → .ast
|
||||
async fn compile_file(&self, bot_id: Uuid, file_path: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
// Extrair nome do bot e tool
|
||||
// file_path: salesianos.gbai/salesianos.gbdialog/tool.bas
|
||||
let parts: Vec<&str> = file_path.split('/').collect();
|
||||
if parts.len() < 3 {
|
||||
return Err("Invalid file path format".into());
|
||||
}
|
||||
|
||||
let bot_name = parts[0].trim_end_matches(".gbai");
|
||||
let tool_name = parts.last().unwrap().trim_end_matches(".bas");
|
||||
|
||||
// Caminho do arquivo .bas em /opt/gbo/data/
|
||||
let bas_path = format!("/opt/gbo/data/{}.gbai/{}.gbdialog/{}.bas",
|
||||
bot_name, bot_name, tool_name);
|
||||
|
||||
// Ler conteúdo
|
||||
let content = std::fs::read_to_string(&bas_path)
|
||||
.map_err(|e| format!("Failed to read {}: {}", bas_path, e))?;
|
||||
|
||||
// Criar work dir
|
||||
let work_dir = self.work_root.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name));
|
||||
std::fs::create_dir_all(&work_dir)?;
|
||||
|
||||
// Escrever .bas em work
|
||||
let work_bas_path = work_dir.join(format!("{}.bas", tool_name));
|
||||
std::fs::write(&work_bas_path, &content)?;
|
||||
|
||||
// Compilar com BasicCompiler
|
||||
let mut compiler = BasicCompiler::new(self.state.clone(), bot_id);
|
||||
compiler.compile_file(
|
||||
work_bas_path.to_str().ok_or("Invalid path")?,
|
||||
work_dir.to_str().ok_or("Invalid path")?
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for DriveCompiler {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
state: Arc::clone(&self.state),
|
||||
work_root: self.work_root.clone(),
|
||||
is_processing: Arc::clone(&self.is_processing),
|
||||
last_etags: Arc::clone(&self.last_etags),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,565 +0,0 @@
|
|||
use crate::basic::compiler::BasicCompiler;
|
||||
use crate::core::kb::{EmbeddingConfig, KnowledgeBaseManager};
|
||||
use crate::core::shared::state::AppState;
|
||||
use crate::core::shared::utils::get_work_path;
|
||||
use diesel::prelude::*;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::Duration;
|
||||
use notify::{RecursiveMode, EventKind, RecommendedWatcher, Watcher};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct LocalFileState {
|
||||
modified: SystemTime,
|
||||
size: u64,
|
||||
}
|
||||
|
||||
/// Tracks state of a KB folder for change detection
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct KbFolderState {
|
||||
/// Combined hash of all file mtimes and sizes in the folder tree
|
||||
content_hash: u64,
|
||||
/// Number of files indexed last time
|
||||
file_count: usize,
|
||||
}
|
||||
|
||||
pub struct LocalFileMonitor {
|
||||
state: Arc<AppState>,
|
||||
data_dir: PathBuf,
|
||||
work_root: PathBuf,
|
||||
file_states: Arc<RwLock<HashMap<String, LocalFileState>>>,
|
||||
kb_states: Arc<RwLock<HashMap<String, KbFolderState>>>,
|
||||
is_processing: Arc<AtomicBool>,
|
||||
#[cfg(any(feature = "research", feature = "llm"))]
|
||||
kb_manager: Option<Arc<KnowledgeBaseManager>>,
|
||||
}
|
||||
|
||||
impl LocalFileMonitor {
|
||||
pub fn new(state: Arc<AppState>) -> Self {
|
||||
let work_root = PathBuf::from(get_work_path());
|
||||
let data_dir = PathBuf::from("/opt/gbo/data");
|
||||
|
||||
#[cfg(any(feature = "research", feature = "llm"))]
|
||||
let kb_manager = match &state.kb_manager {
|
||||
Some(km) => Some(Arc::clone(km)),
|
||||
None => {
|
||||
debug!("KB manager not available in LocalFileMonitor");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Initializing with data_dir: {:?}, work_root: {:?}", data_dir, work_root);
|
||||
|
||||
Self {
|
||||
state,
|
||||
data_dir,
|
||||
work_root,
|
||||
file_states: Arc::new(RwLock::new(HashMap::new())),
|
||||
kb_states: Arc::new(RwLock::new(HashMap::new())),
|
||||
is_processing: Arc::new(AtomicBool::new(false)),
|
||||
#[cfg(any(feature = "research", feature = "llm"))]
|
||||
kb_manager,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_monitoring(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
info!("Local file monitor started - watching /opt/gbo/data/*.gbai directories");
|
||||
|
||||
// Create data directory if it doesn't exist
|
||||
if let Err(e) = tokio::fs::create_dir_all(&self.data_dir).await {
|
||||
warn!("Failed to create data directory: {}", e);
|
||||
}
|
||||
|
||||
// Load persisted file states from disk
|
||||
self.load_states().await;
|
||||
|
||||
// Initial scan of all .gbai directories
|
||||
self.scan_and_compile_all().await?;
|
||||
|
||||
// Persist states back to disk
|
||||
self.save_states().await;
|
||||
|
||||
self.is_processing.store(true, Ordering::SeqCst);
|
||||
|
||||
// Spawn the monitoring loop
|
||||
let monitor = self.clone();
|
||||
tokio::spawn(async move {
|
||||
monitor.monitoring_loop().await;
|
||||
});
|
||||
|
||||
debug!("Local file monitor successfully initialized");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn monitoring_loop(&self) {
|
||||
trace!("Starting monitoring loop");
|
||||
|
||||
// Try to create a file system watcher
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
|
||||
|
||||
// Use notify crate for file system watching
|
||||
let tx_clone = tx.clone();
|
||||
let mut watcher: RecommendedWatcher = match RecommendedWatcher::new(
|
||||
move |res| {
|
||||
if let Ok(event) = res {
|
||||
let _ = tx_clone.try_send(event);
|
||||
}
|
||||
},
|
||||
notify::Config::default(),
|
||||
) {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
error!("Failed to create watcher: {}. Falling back to polling.", e);
|
||||
// Fall back to polling if watcher creation fails
|
||||
self.polling_loop().await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Watch the data directory
|
||||
if let Err(e) = watcher.watch(&self.data_dir, RecursiveMode::Recursive) {
|
||||
warn!("Failed to watch directory {:?}: {}. Using polling fallback.", self.data_dir, e);
|
||||
drop(watcher);
|
||||
self.polling_loop().await;
|
||||
return;
|
||||
}
|
||||
|
||||
trace!("Watching directory: {:?}", self.data_dir);
|
||||
|
||||
while self.is_processing.load(Ordering::SeqCst) {
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
|
||||
// Process events from the watcher
|
||||
while let Ok(event) = rx.try_recv() {
|
||||
match event.kind {
|
||||
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Any => {
|
||||
for path in &event.paths {
|
||||
if self.is_gbdialog_file(path) {
|
||||
debug!("Detected change in: {:?}", path);
|
||||
if let Err(e) = self.compile_local_file(path).await {
|
||||
error!("Failed to compile {:?}: {}", path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
EventKind::Remove(_) => {
|
||||
for path in &event.paths {
|
||||
if self.is_gbdialog_file(path) {
|
||||
debug!("File removed: {:?}", path);
|
||||
self.remove_file_state(path).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Monitoring loop ended");
|
||||
}
|
||||
|
||||
async fn polling_loop(&self) {
|
||||
trace!("Using polling fallback (checking every 60s)");
|
||||
|
||||
while self.is_processing.load(Ordering::SeqCst) {
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
|
||||
if let Err(e) = self.scan_and_compile_all().await {
|
||||
error!("Scan failed: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_gbdialog_file(&self, path: &Path) -> bool {
|
||||
// Check if path is something like /opt/gbo/data/*.gbai/*.gbdialog/*.bas
|
||||
path.extension()
|
||||
.and_then(|e| e.to_str())
|
||||
.map(|e| e.eq_ignore_ascii_case("bas"))
|
||||
.unwrap_or(false)
|
||||
&& path.ancestors()
|
||||
.any(|p| {
|
||||
p.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.map(|n| n.ends_with(".gbdialog"))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
}
|
||||
|
||||
async fn scan_and_compile_all(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
trace!("Scanning directory: {:?}", self.data_dir);
|
||||
|
||||
let entries = match tokio::fs::read_dir(&self.data_dir).await {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
debug!("[LOCAL_MONITOR] Cannot read data directory: {}", e);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let mut entries = entries;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
|
||||
// Check if this is a .gbai directory
|
||||
if path.extension()
|
||||
.and_then(|e| e.to_str())
|
||||
.map(|e| e.eq_ignore_ascii_case("gbai"))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
let bot_name = path.file_stem()
|
||||
.and_then(|s| s.to_str())
|
||||
.unwrap_or("unknown");
|
||||
|
||||
// Look for <botname>.gbdialog folder inside (e.g., cristo.gbai/cristo.gbdialog)
|
||||
let gbdialog_path = path.join(format!("{}.gbdialog", bot_name));
|
||||
if gbdialog_path.exists() {
|
||||
self.compile_gbdialog(bot_name, &gbdialog_path).await?;
|
||||
}
|
||||
|
||||
// Index .gbkb folders
|
||||
#[cfg(any(feature = "research", feature = "llm"))]
|
||||
{
|
||||
if let Some(ref kb_manager) = self.kb_manager {
|
||||
let gbkb_path = path.join(format!("{}.gbkb", bot_name));
|
||||
if gbkb_path.exists() {
|
||||
if let Err(e) = self.index_gbkb_folder(bot_name, &gbkb_path, kb_manager).await {
|
||||
error!("Failed to index .gbkb folder {:?}: {}", gbkb_path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "research", feature = "llm"))]
|
||||
async fn index_gbkb_folder(
|
||||
&self,
|
||||
bot_name: &str,
|
||||
gbkb_path: &Path,
|
||||
_kb_manager: &Arc<KnowledgeBaseManager>,
|
||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
// Get bot_id from database
|
||||
let bot_id = {
|
||||
use crate::core::shared::models::schema::bots::dsl::*;
|
||||
let mut conn = self.state.conn.get()
|
||||
.map_err(|e| format!("Failed to get DB connection: {}", e))?;
|
||||
|
||||
bots.filter(name.eq(bot_name))
|
||||
.select(id)
|
||||
.first::<Uuid>(&mut *conn)
|
||||
.map_err(|e| format!("Failed to get bot_id for '{}': {}", bot_name, e))?
|
||||
};
|
||||
|
||||
// Load bot-specific embedding config from database
|
||||
let embedding_config = EmbeddingConfig::from_bot_config(&self.state.conn, &bot_id);
|
||||
|
||||
// Compute content hash of the entire .gbkb tree
|
||||
let (content_hash, file_count) = self.compute_gbkb_hash(gbkb_path).await?;
|
||||
|
||||
// Index each KB folder inside .gbkb (e.g., carta, proc)
|
||||
let entries = tokio::fs::read_dir(gbkb_path).await?;
|
||||
let mut entries = entries;
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let kb_folder_path = entry.path();
|
||||
|
||||
if kb_folder_path.is_dir() {
|
||||
if let Some(kb_name) = kb_folder_path.file_name().and_then(|n| n.to_str()) {
|
||||
let kb_key = format!("{}:{}", bot_name, kb_name);
|
||||
|
||||
// Check if KB content changed since last index
|
||||
let should_index = {
|
||||
let states = self.kb_states.read().await;
|
||||
states.get(&kb_key)
|
||||
.map(|state| state.content_hash != content_hash || state.file_count != file_count)
|
||||
.unwrap_or(true)
|
||||
};
|
||||
|
||||
if !should_index {
|
||||
debug!("KB '{}' for bot '{}' unchanged, skipping re-index", kb_name, bot_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("Indexing KB '{}' for bot '{}'", kb_name, bot_name);
|
||||
|
||||
// Create a temporary KbIndexer with the bot-specific config
|
||||
let qdrant_config = crate::core::kb::QdrantConfig::from_config(self.state.conn.clone(), &bot_id);
|
||||
let indexer = crate::core::kb::KbIndexer::new(embedding_config.clone(), qdrant_config);
|
||||
|
||||
if let Err(e) = indexer.index_kb_folder(
|
||||
bot_id,
|
||||
bot_name,
|
||||
kb_name,
|
||||
&kb_folder_path,
|
||||
).await {
|
||||
error!("Failed to index KB '{}' for bot '{}': {}", kb_name, bot_name, e);
|
||||
}
|
||||
|
||||
// Update state to mark as indexed
|
||||
let mut states = self.kb_states.write().await;
|
||||
states.insert(kb_key, KbFolderState { content_hash, file_count });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Compute a simple hash over all file metadata in a folder tree
|
||||
#[cfg(any(feature = "research", feature = "llm"))]
|
||||
async fn compute_gbkb_hash(&self, root: &Path) -> Result<(u64, usize), Box<dyn Error + Send + Sync>> {
|
||||
let mut hash: u64 = 0;
|
||||
let mut file_count: usize = 0;
|
||||
|
||||
let mut stack = vec![root.to_path_buf()];
|
||||
while let Some(dir) = stack.pop() {
|
||||
let mut entries = tokio::fs::read_dir(&dir).await?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
stack.push(path);
|
||||
} else if let Ok(meta) = tokio::fs::metadata(&path).await {
|
||||
let mtime = meta.modified()
|
||||
.map(|t| t.duration_since(SystemTime::UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0))
|
||||
.unwrap_or(0);
|
||||
let size = meta.len();
|
||||
// Simple combinatorial hash
|
||||
hash = hash.wrapping_mul(31).wrapping_add(mtime.wrapping_mul(37).wrapping_add(size));
|
||||
file_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((hash, file_count))
|
||||
}
|
||||
|
||||
async fn compile_gbdialog(&self, bot_name: &str, gbdialog_path: &Path) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let entries = tokio::fs::read_dir(gbdialog_path).await?;
|
||||
let mut entries = entries;
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
|
||||
if path.extension()
|
||||
.and_then(|e| e.to_str())
|
||||
.map(|e| e.eq_ignore_ascii_case("bas"))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
let metadata = tokio::fs::metadata(&path).await?;
|
||||
let modified = metadata.modified()?;
|
||||
let size = metadata.len();
|
||||
|
||||
let file_key = path.to_string_lossy().to_string();
|
||||
|
||||
// Check if file changed
|
||||
let should_compile = {
|
||||
let states = self.file_states.read().await;
|
||||
states.get(&file_key)
|
||||
.map(|state| state.modified != modified || state.size != size)
|
||||
.unwrap_or(true)
|
||||
};
|
||||
|
||||
if should_compile {
|
||||
info!("Compiling bot: {}", bot_name);
|
||||
debug!("Recompiling {:?} - modification detected", path);
|
||||
if let Err(e) = self.compile_local_file(&path).await {
|
||||
error!("Failed to compile {:?}: {}", path, e);
|
||||
}
|
||||
|
||||
// Update state
|
||||
let mut states = self.file_states.write().await;
|
||||
states.insert(file_key, LocalFileState { modified, size });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn compile_local_file(&self, file_path: &Path) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let tool_name = file_path
|
||||
.file_stem()
|
||||
.and_then(|s| s.to_str())
|
||||
.unwrap_or("unknown");
|
||||
|
||||
// Extract bot name from path like /opt/gbo/data/cristo.gbai/.gbdialog/file.bas
|
||||
let bot_name = file_path
|
||||
.ancestors()
|
||||
.find(|p| p.extension().and_then(|e| e.to_str()).map(|e| e.eq_ignore_ascii_case("gbai")).unwrap_or(false))
|
||||
.and_then(|p| p.file_stem())
|
||||
.and_then(|s| s.to_str())
|
||||
.unwrap_or("unknown");
|
||||
|
||||
// Create work directory structure in botserver/work (not in data/)
|
||||
let work_dir = self.work_root.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name));
|
||||
|
||||
// Read the file content
|
||||
let source_content = tokio::fs::read_to_string(file_path).await?;
|
||||
|
||||
// Compile the file
|
||||
let state_clone = Arc::clone(&self.state);
|
||||
let work_dir_clone = work_dir.clone();
|
||||
let tool_name_clone = tool_name.to_string();
|
||||
let source_content_clone = source_content.clone();
|
||||
let bot_name_clone = bot_name.to_string();
|
||||
|
||||
// Get the actual bot_id from the database for this bot_name
|
||||
let bot_id = {
|
||||
use crate::core::shared::models::schema::bots::dsl::*;
|
||||
let mut conn = state_clone.conn.get()
|
||||
.map_err(|e| format!("Failed to get DB connection: {}", e))?;
|
||||
|
||||
bots.filter(name.eq(&bot_name_clone))
|
||||
.select(id)
|
||||
.first::<Uuid>(&mut *conn)
|
||||
.map_err(|e| format!("Failed to get bot_id for '{}': {}", bot_name_clone, e))?
|
||||
};
|
||||
|
||||
let elapsed_ms = tokio::task::spawn_blocking(move || {
|
||||
std::fs::create_dir_all(&work_dir_clone)?;
|
||||
let local_source_path = work_dir_clone.join(format!("{}.bas", tool_name_clone));
|
||||
std::fs::write(&local_source_path, &source_content_clone)?;
|
||||
let mut compiler = BasicCompiler::new(state_clone, bot_id);
|
||||
let local_source_str = local_source_path.to_str()
|
||||
.ok_or_else(|| "Invalid UTF-8 in local source path".to_string())?;
|
||||
let work_dir_str = work_dir_clone.to_str()
|
||||
.ok_or_else(|| "Invalid UTF-8 in work directory path".to_string())?;
|
||||
let start_time = std::time::Instant::now();
|
||||
let result = compiler.compile_file(local_source_str, work_dir_str)?;
|
||||
let elapsed_ms = start_time.elapsed().as_millis();
|
||||
if let Some(mcp_tool) = result.mcp_tool {
|
||||
trace!(
|
||||
"[LOCAL_MONITOR] MCP tool generated with {} parameters for bot {}",
|
||||
mcp_tool.input_schema.properties.len(),
|
||||
bot_name_clone
|
||||
);
|
||||
}
|
||||
Ok::<u128, Box<dyn Error + Send + Sync>>(elapsed_ms)
|
||||
})
|
||||
.await??;
|
||||
|
||||
info!("Successfully compiled: {:?} in {} ms", file_path, elapsed_ms);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_file_state(&self, path: &Path) {
|
||||
let file_key = path.to_string_lossy().to_string();
|
||||
let mut states = self.file_states.write().await;
|
||||
states.remove(&file_key);
|
||||
}
|
||||
|
||||
/// Persist file states and KB states to disk for survival across restarts
|
||||
async fn save_states(&self) {
|
||||
if let Err(e) = tokio::fs::create_dir_all(&self.work_root).await {
|
||||
warn!("Failed to create work directory: {}", e);
|
||||
return;
|
||||
}
|
||||
|
||||
// Persist file states
|
||||
let file_states_file = self.work_root.join("local_file_states.json");
|
||||
{
|
||||
let states = self.file_states.read().await;
|
||||
match serde_json::to_string_pretty(&*states) {
|
||||
Ok(json) => {
|
||||
if let Err(e) = tokio::fs::write(&file_states_file, json).await {
|
||||
warn!("Failed to persist file states: {}", e);
|
||||
} else {
|
||||
debug!("Persisted {} file states to disk", states.len());
|
||||
}
|
||||
}
|
||||
Err(e) => warn!("Failed to serialize file states: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
// Persist KB states
|
||||
let kb_states_file = self.work_root.join("local_kb_states.json");
|
||||
{
|
||||
let states = self.kb_states.read().await;
|
||||
match serde_json::to_string_pretty(&*states) {
|
||||
Ok(json) => {
|
||||
if let Err(e) = tokio::fs::write(&kb_states_file, json).await {
|
||||
warn!("Failed to persist KB states: {}", e);
|
||||
} else {
|
||||
debug!("Persisted {} KB states to disk", states.len());
|
||||
}
|
||||
}
|
||||
Err(e) => warn!("Failed to serialize KB states: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Load file states and KB states from disk
|
||||
async fn load_states(&self) {
|
||||
if let Err(e) = tokio::fs::create_dir_all(&self.work_root).await {
|
||||
warn!("Failed to create work directory: {}", e);
|
||||
}
|
||||
|
||||
// Load file states
|
||||
let file_states_file = self.work_root.join("local_file_states.json");
|
||||
match tokio::fs::read_to_string(&file_states_file).await {
|
||||
Ok(json) => {
|
||||
match serde_json::from_str::<HashMap<String, LocalFileState>>(&json) {
|
||||
Ok(states) => {
|
||||
let count = states.len();
|
||||
*self.file_states.write().await = states;
|
||||
info!("Loaded {} persisted file states from disk", count);
|
||||
}
|
||||
Err(e) => warn!("Failed to parse persisted file states: {}", e),
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("No persisted file states found, starting fresh");
|
||||
}
|
||||
}
|
||||
|
||||
// Load KB states
|
||||
let kb_states_file = self.work_root.join("local_kb_states.json");
|
||||
match tokio::fs::read_to_string(&kb_states_file).await {
|
||||
Ok(json) => {
|
||||
match serde_json::from_str::<HashMap<String, KbFolderState>>(&json) {
|
||||
Ok(states) => {
|
||||
let count = states.len();
|
||||
*self.kb_states.write().await = states;
|
||||
info!("Loaded {} persisted KB states from disk", count);
|
||||
}
|
||||
Err(e) => warn!("Failed to parse persisted KB states: {}", e),
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("No persisted KB states found, starting fresh");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stop_monitoring(&self) {
|
||||
trace!("Stopping local file monitor");
|
||||
self.is_processing.store(false, Ordering::SeqCst);
|
||||
self.save_states().await;
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for LocalFileMonitor {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
state: Arc::clone(&self.state),
|
||||
data_dir: self.data_dir.clone(),
|
||||
work_root: self.work_root.clone(),
|
||||
file_states: Arc::clone(&self.file_states),
|
||||
kb_states: Arc::clone(&self.kb_states),
|
||||
is_processing: Arc::clone(&self.is_processing),
|
||||
#[cfg(any(feature = "research", feature = "llm"))]
|
||||
kb_manager: self.kb_manager.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
1389
src/drive/mod.rs
1389
src/drive/mod.rs
File diff suppressed because it is too large
Load diff
|
|
@ -891,12 +891,13 @@ pub async fn start_background_services(
|
|||
trace!("ensure_llama_servers_running completed");
|
||||
}
|
||||
|
||||
// Start DriveMonitor for S3/MinIO file watching
|
||||
// Start DriveMonitor for S3/MinIO file watching and syncing
|
||||
#[cfg(feature = "drive")]
|
||||
start_drive_monitors(app_state.clone(), _pool).await;
|
||||
|
||||
// Start LocalFileMonitor to compile .bas files from /opt/gbo/data to /opt/gbo/work
|
||||
start_local_file_monitor(app_state.clone()).await;
|
||||
// Start DriveCompiler to compile .bas files from drive_files table
|
||||
#[cfg(feature = "drive")]
|
||||
start_drive_compiler(app_state.clone()).await;
|
||||
// start_config_watcher(app_state.clone()).await;
|
||||
}
|
||||
|
||||
|
|
@ -1129,15 +1130,16 @@ fn create_bot_from_drive(
|
|||
}
|
||||
|
||||
|
||||
// DriveCompiler compiles .bas files based on drive_files table changes.
// (This diff hunk interleaved the removed `start_local_file_monitor` with the
// added function; only the resulting added function is kept here.)
#[cfg(feature = "drive")]
async fn start_drive_compiler(app_state: Arc<AppState>) {
    use crate::drive::drive_compiler::DriveCompiler;

    // Startup failure is logged but non-fatal: the rest of the background
    // services keep running without the compiler.
    let compiler = DriveCompiler::new(app_state.clone());
    if let Err(e) = compiler.start_compiling().await {
        error!("Failed to start DriveCompiler: {}", e);
    } else {
        trace!("DriveCompiler started - compiling .bas files from drive_files");
    }
}
|
||||
//
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue