feat: dual-mode service configs - Vault first, fallback to DB/localhost
Some checks failed
BotServer CI/CD / build (push) Has been cancelled
Some checks failed
BotServer CI/CD / build (push) Has been cancelled
All services now try Vault first (remote/distributed mode), then fall back to database config, then localhost defaults (local/dev mode). Services fixed: - Qdrant/VectorDB: kb_indexer.rs, kb_statistics.rs, bootstrap_utils.rs, kb_context.rs - LLM/Embedding: email/vectordb.rs (was hardcoded localhost:8082) - All services: security/integration.rs (postgres, cache, drive, directory, qdrant, llm) Pattern: SecretsManager::get_X_config_sync() → DB config → localhost default
This commit is contained in:
parent
edff5de662
commit
6f183c63d2
8 changed files with 217 additions and 86 deletions
|
|
@ -227,10 +227,14 @@ async fn get_kb_statistics(
|
|||
state: &AppState,
|
||||
user: &UserSession,
|
||||
) -> Result<KBStatistics, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let qdrant_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() {
|
||||
sm.get_vectordb_config_sync().0
|
||||
} else {
|
||||
let config_manager = ConfigManager::new(state.conn.clone());
|
||||
let qdrant_url = config_manager
|
||||
config_manager
|
||||
.get_config(&user.bot_id, "vectordb-url", Some("https://localhost:6333"))
|
||||
.unwrap_or_else(|_| "https://localhost:6333".to_string());
|
||||
.unwrap_or_else(|_| "https://localhost:6333".to_string())
|
||||
};
|
||||
let client = create_tls_client(Some(30));
|
||||
|
||||
let collections_response = client
|
||||
|
|
@ -282,10 +286,14 @@ async fn get_collection_statistics(
|
|||
state: &AppState,
|
||||
collection_name: &str,
|
||||
) -> Result<CollectionStats, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let qdrant_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() {
|
||||
sm.get_vectordb_config_sync().0
|
||||
} else {
|
||||
let config_manager = ConfigManager::new(state.conn.clone());
|
||||
let qdrant_url = config_manager
|
||||
config_manager
|
||||
.get_config(&uuid::Uuid::nil(), "vectordb-url", Some("https://localhost:6333"))
|
||||
.unwrap_or_else(|_| "https://localhost:6333".to_string());
|
||||
.unwrap_or_else(|_| "https://localhost:6333".to_string())
|
||||
};
|
||||
let client = create_tls_client(Some(30));
|
||||
|
||||
let response = client
|
||||
|
|
@ -367,10 +375,14 @@ async fn list_collections(
|
|||
state: &AppState,
|
||||
user: &UserSession,
|
||||
) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let qdrant_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() {
|
||||
sm.get_vectordb_config_sync().0
|
||||
} else {
|
||||
let config_manager = ConfigManager::new(state.conn.clone());
|
||||
let qdrant_url = config_manager
|
||||
config_manager
|
||||
.get_config(&user.bot_id, "vectordb-url", Some("https://localhost:6333"))
|
||||
.unwrap_or_else(|_| "https://localhost:6333".to_string());
|
||||
.unwrap_or_else(|_| "https://localhost:6333".to_string())
|
||||
};
|
||||
let client = create_tls_client(Some(30));
|
||||
|
||||
let response = client
|
||||
|
|
|
|||
|
|
@ -159,9 +159,15 @@ pub fn cache_health_check() -> bool {
|
|||
|
||||
/// Check if Qdrant vector database is healthy
|
||||
pub fn vector_db_health_check() -> bool {
|
||||
let qdrant_url = if let Ok(sm) = crate::core::secrets::SecretsManager::from_env() {
|
||||
sm.get_vectordb_config_sync().0
|
||||
} else {
|
||||
"http://localhost:6333".to_string()
|
||||
};
|
||||
|
||||
let urls = [
|
||||
"http://localhost:6333/healthz",
|
||||
"https://localhost:6333/healthz",
|
||||
format!("{}/healthz", qdrant_url),
|
||||
qdrant_url.replace("http://", "https://") + "/healthz",
|
||||
];
|
||||
|
||||
for url in &urls {
|
||||
|
|
|
|||
|
|
@ -278,7 +278,16 @@ impl KbContextManager {
|
|||
|
||||
// Load embedding config from database for this bot
|
||||
let mut embedding_config = EmbeddingConfig::from_bot_config(&self.db_pool, &bot_id);
|
||||
let qdrant_config = QdrantConfig::default();
|
||||
let qdrant_config = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() {
|
||||
let (url, api_key) = sm.get_vectordb_config_sync();
|
||||
crate::core::kb::QdrantConfig {
|
||||
url,
|
||||
api_key,
|
||||
timeout_secs: 30,
|
||||
}
|
||||
} else {
|
||||
crate::core::kb::QdrantConfig::default()
|
||||
};
|
||||
|
||||
// Query Qdrant to get the collection's actual vector dimension
|
||||
let collection_dimension = self.get_collection_dimension(&qdrant_config, collection_name).await?;
|
||||
|
|
|
|||
|
|
@ -31,13 +31,18 @@ impl Default for QdrantConfig {
|
|||
|
||||
impl QdrantConfig {
|
||||
pub fn from_config(pool: DbPool, bot_id: &Uuid) -> Self {
|
||||
let (url, api_key) = if let Some(sm) = crate::core::shared::utils::get_secrets_manager_sync() {
|
||||
sm.get_vectordb_config_sync()
|
||||
} else {
|
||||
let config_manager = ConfigManager::new(pool);
|
||||
let url = config_manager
|
||||
.get_config(bot_id, "vectordb-url", Some("http://localhost:6333"))
|
||||
.unwrap_or_else(|_| "http://localhost:6333".to_string());
|
||||
(url, None)
|
||||
};
|
||||
Self {
|
||||
url,
|
||||
api_key: None,
|
||||
api_key,
|
||||
timeout_secs: 30,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -421,7 +421,12 @@ impl EmailEmbeddingGenerator {
|
|||
}
|
||||
|
||||
pub async fn generate_text_embedding(&self, text: &str) -> Result<Vec<f32>> {
|
||||
let embedding_url = "http://localhost:8082".to_string();
|
||||
let embedding_url = if let Ok(sm) = crate::core::secrets::SecretsManager::from_env() {
|
||||
let (llm_url, _, _, _, ollama_url) = sm.get_llm_config();
|
||||
if !ollama_url.is_empty() { ollama_url } else { llm_url }
|
||||
} else {
|
||||
"http://localhost:8082".to_string()
|
||||
};
|
||||
match self.generate_local_embedding(text, &embedding_url).await {
|
||||
Ok(embedding) => Ok(embedding),
|
||||
Err(e) => {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,3 @@
|
|||
use anyhow::anyhow;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AuthConfig {
|
||||
pub require_auth: bool,
|
||||
|
|
@ -56,21 +54,13 @@ impl AuthConfig {
|
|||
|
||||
if let Ok(secret) = std::env::var("VAULT_TOKEN") {
|
||||
if !secret.is_empty() {
|
||||
let rt = tokio::runtime::Runtime::new().ok();
|
||||
if let Some(rt) = rt {
|
||||
let sm = crate::core::shared::utils::get_secrets_manager_sync();
|
||||
if let Some(sm) = sm {
|
||||
let sm_clone = sm.clone();
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
std::thread::spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build();
|
||||
let result = match rt {
|
||||
Ok(rt) => rt.block_on(sm_clone.get_secret(crate::core::secrets::SecretPaths::JWT)),
|
||||
Err(e) => Err(anyhow::anyhow!("Failed to create runtime: {}", e)),
|
||||
};
|
||||
let _ = tx.send(result);
|
||||
});
|
||||
if let Ok(Ok(secrets)) = rx.recv() {
|
||||
if let Ok(secrets) =
|
||||
rt.block_on(sm.get_secret(crate::core::secrets::SecretPaths::JWT))
|
||||
{
|
||||
if let Some(s) = secrets.get("secret") {
|
||||
config.jwt_secret = Some(s.clone());
|
||||
}
|
||||
|
|
@ -88,6 +78,7 @@ impl AuthConfig {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
config
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,108 @@ pub struct TlsIntegration {
|
|||
|
||||
impl TlsIntegration {
|
||||
pub fn new(tls_enabled: bool) -> Self {
|
||||
let (qdrant_url, _) = if let Ok(sm) = crate::core::secrets::SecretsManager::from_env() {
|
||||
sm.get_vectordb_config_sync()
|
||||
} else {
|
||||
("http://localhost:6333".to_string(), None)
|
||||
};
|
||||
let qdrant_secure = qdrant_url.replace("http://", "https://");
|
||||
let qdrant_port: u16 = qdrant_url
|
||||
.split(':')
|
||||
.last()
|
||||
.and_then(|p| p.parse().ok())
|
||||
.unwrap_or(6333);
|
||||
let qdrant_tls_port: u16 = qdrant_secure
|
||||
.split(':')
|
||||
.last()
|
||||
.and_then(|p| p.parse().ok())
|
||||
.unwrap_or(6334);
|
||||
|
||||
let (llm_url, _, _, _, _) = if let Ok(sm) = crate::core::secrets::SecretsManager::from_env()
|
||||
{
|
||||
sm.get_llm_config()
|
||||
} else {
|
||||
(
|
||||
"http://localhost:8081".to_string(),
|
||||
"gpt-4".to_string(),
|
||||
None,
|
||||
None,
|
||||
"http://localhost:11434".to_string(),
|
||||
)
|
||||
};
|
||||
let llm_secure = llm_url.replace("http://", "https://");
|
||||
let llm_port: u16 = llm_url
|
||||
.split(':')
|
||||
.last()
|
||||
.and_then(|p| p.parse().ok())
|
||||
.unwrap_or(8081);
|
||||
let llm_tls_port: u16 = llm_secure
|
||||
.split(':')
|
||||
.last()
|
||||
.and_then(|p| p.parse().ok())
|
||||
.unwrap_or(8444);
|
||||
|
||||
let (cache_host, cache_port, _) =
|
||||
if let Ok(sm) = crate::core::secrets::SecretsManager::from_env() {
|
||||
sm.get_cache_config()
|
||||
} else {
|
||||
("localhost".to_string(), 6379, None)
|
||||
};
|
||||
let cache_tls_port = cache_port + 1;
|
||||
|
||||
let (db_host, db_port, _, _, _) =
|
||||
if let Ok(sm) = crate::core::secrets::SecretsManager::from_env() {
|
||||
sm.get_database_config_sync()
|
||||
} else {
|
||||
(
|
||||
"localhost".to_string(),
|
||||
5432,
|
||||
"botserver".to_string(),
|
||||
"gbuser".to_string(),
|
||||
"changeme".to_string(),
|
||||
)
|
||||
};
|
||||
let db_tls_port = db_port + 1;
|
||||
|
||||
let (drive_host, _drive_accesskey, _drive_secret) =
|
||||
if let Ok(sm) = crate::core::secrets::SecretsManager::from_env() {
|
||||
sm.get_drive_config()
|
||||
} else {
|
||||
(
|
||||
"localhost:9100".to_string(),
|
||||
"minioadmin".to_string(),
|
||||
"minioadmin".to_string(),
|
||||
)
|
||||
};
|
||||
let drive_port: u16 = drive_host
|
||||
.split(':')
|
||||
.last()
|
||||
.and_then(|p| p.parse().ok())
|
||||
.unwrap_or(9100);
|
||||
|
||||
let (directory_url, _, _, _) =
|
||||
if let Ok(sm) = crate::core::secrets::SecretsManager::from_env() {
|
||||
sm.get_directory_config_sync()
|
||||
} else {
|
||||
(
|
||||
"http://localhost:9000".to_string(),
|
||||
String::new(),
|
||||
String::new(),
|
||||
String::new(),
|
||||
)
|
||||
};
|
||||
let directory_secure = directory_url.replace("http://", "https://");
|
||||
let directory_port: u16 = directory_url
|
||||
.split(':')
|
||||
.last()
|
||||
.and_then(|p| p.parse().ok())
|
||||
.unwrap_or(9000);
|
||||
let directory_tls_port: u16 = directory_secure
|
||||
.split(':')
|
||||
.last()
|
||||
.and_then(|p| p.parse().ok())
|
||||
.unwrap_or(8446);
|
||||
|
||||
let mut services = HashMap::new();
|
||||
|
||||
services.insert(
|
||||
|
|
@ -45,10 +147,10 @@ impl TlsIntegration {
|
|||
services.insert(
|
||||
"llm".to_string(),
|
||||
ServiceUrls {
|
||||
original: "http://localhost:8081".to_string(),
|
||||
secure: "https://localhost:8444".to_string(),
|
||||
port: 8081,
|
||||
tls_port: 8444,
|
||||
original: llm_url,
|
||||
secure: llm_secure,
|
||||
port: llm_port,
|
||||
tls_port: llm_tls_port,
|
||||
},
|
||||
);
|
||||
|
||||
|
|
@ -65,50 +167,50 @@ impl TlsIntegration {
|
|||
services.insert(
|
||||
"qdrant".to_string(),
|
||||
ServiceUrls {
|
||||
original: "http://localhost:6333".to_string(),
|
||||
secure: "https://localhost:6334".to_string(),
|
||||
port: 6333,
|
||||
tls_port: 6334,
|
||||
original: qdrant_url,
|
||||
secure: qdrant_secure,
|
||||
port: qdrant_port,
|
||||
tls_port: qdrant_tls_port,
|
||||
},
|
||||
);
|
||||
|
||||
services.insert(
|
||||
"redis".to_string(),
|
||||
ServiceUrls {
|
||||
original: "redis://localhost:6379".to_string(),
|
||||
secure: "rediss://localhost:6380".to_string(),
|
||||
port: 6379,
|
||||
tls_port: 6380,
|
||||
original: format!("redis://{}:{}", cache_host, cache_port),
|
||||
secure: format!("rediss://{}:{}", cache_host, cache_tls_port),
|
||||
port: cache_port,
|
||||
tls_port: cache_tls_port,
|
||||
},
|
||||
);
|
||||
|
||||
services.insert(
|
||||
"postgres".to_string(),
|
||||
ServiceUrls {
|
||||
original: "postgres://localhost:5432".to_string(),
|
||||
secure: "postgres://localhost:5433?sslmode=require".to_string(),
|
||||
port: 5432,
|
||||
tls_port: 5433,
|
||||
original: format!("postgres://{}:{}", db_host, db_port),
|
||||
secure: format!("postgres://{}:{}?sslmode=require", db_host, db_tls_port),
|
||||
port: db_port,
|
||||
tls_port: db_tls_port,
|
||||
},
|
||||
);
|
||||
|
||||
services.insert(
|
||||
"minio".to_string(),
|
||||
ServiceUrls {
|
||||
original: "https://localhost:9100".to_string(),
|
||||
secure: "https://localhost:9100".to_string(),
|
||||
port: 9100,
|
||||
tls_port: 9100,
|
||||
original: format!("https://{}", drive_host),
|
||||
secure: format!("https://{}", drive_host),
|
||||
port: drive_port,
|
||||
tls_port: drive_port,
|
||||
},
|
||||
);
|
||||
|
||||
services.insert(
|
||||
"directory".to_string(),
|
||||
ServiceUrls {
|
||||
original: "http://localhost:9000".to_string(),
|
||||
secure: "https://localhost:8446".to_string(),
|
||||
port: 9000,
|
||||
tls_port: 8446,
|
||||
original: directory_url,
|
||||
secure: directory_secure,
|
||||
port: directory_port,
|
||||
tls_port: directory_tls_port,
|
||||
},
|
||||
);
|
||||
|
||||
|
|
@ -123,8 +225,9 @@ impl TlsIntegration {
|
|||
|
||||
pub fn load_ca_cert(&mut self, ca_path: &Path) -> Result<()> {
|
||||
if ca_path.exists() {
|
||||
let ca_cert_pem = fs::read(ca_path)
|
||||
.with_context(|| format!("Failed to read CA certificate from {}", ca_path.display()))?;
|
||||
let ca_cert_pem = fs::read(ca_path).with_context(|| {
|
||||
format!("Failed to read CA certificate from {}", ca_path.display())
|
||||
})?;
|
||||
|
||||
let ca_cert =
|
||||
Certificate::from_pem(&ca_cert_pem).context("Failed to parse CA certificate")?;
|
||||
|
|
@ -145,11 +248,13 @@ impl TlsIntegration {
|
|||
key_path: &Path,
|
||||
) -> Result<()> {
|
||||
if cert_path.exists() && key_path.exists() {
|
||||
let cert = fs::read(cert_path)
|
||||
.with_context(|| format!("Failed to read client cert from {}", cert_path.display()))?;
|
||||
let cert = fs::read(cert_path).with_context(|| {
|
||||
format!("Failed to read client cert from {}", cert_path.display())
|
||||
})?;
|
||||
|
||||
let key = fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read client key from {}", key_path.display()))?;
|
||||
let key = fs::read(key_path).with_context(|| {
|
||||
format!("Failed to read client key from {}", key_path.display())
|
||||
})?;
|
||||
|
||||
let identity = Identity::from_pem(&[&cert[..], &key[..]].concat())
|
||||
.context("Failed to create client identity")?;
|
||||
|
|
@ -209,8 +314,6 @@ impl TlsIntegration {
|
|||
builder = builder.identity(identity.clone());
|
||||
}
|
||||
|
||||
|
||||
|
||||
if self.https_only {
|
||||
builder = builder.https_only(true);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -318,7 +318,7 @@ impl VectorDBIndexer {
|
|||
|
||||
match self.embedding_generator.generate_text_embedding(text).await {
|
||||
Ok(embedding) => {
|
||||
if let Err(e) = email_db.index_email(&email, embedding).await {
|
||||
if let Err(e) = email_db.index_email(email, embedding).await {
|
||||
error!("Failed to index email {}: {}", email.id, e);
|
||||
} else {
|
||||
info!(" Indexed email: {}", email.subject);
|
||||
|
|
@ -370,7 +370,7 @@ impl VectorDBIndexer {
|
|||
for chunk in files.chunks(self.batch_size) {
|
||||
for file in chunk {
|
||||
let mime_type = file.mime_type.as_deref().unwrap_or("");
|
||||
if !FileContentExtractor::should_index(&mime_type, file.file_size) {
|
||||
if !FileContentExtractor::should_index(mime_type, file.file_size) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -385,7 +385,7 @@ impl VectorDBIndexer {
|
|||
.await
|
||||
{
|
||||
Ok(embedding) => {
|
||||
if let Err(e) = drive_db.index_file(&file, embedding).await {
|
||||
if let Err(e) = drive_db.index_file(file, embedding).await {
|
||||
error!("Failed to index file {}: {}", file.id, e);
|
||||
} else {
|
||||
info!(" Indexed file: {}", file.file_name);
|
||||
|
|
@ -615,7 +615,7 @@ impl VectorDBIndexer {
|
|||
total_stats.errors += job.stats.errors;
|
||||
|
||||
if let Some(last_run) = job.stats.last_run {
|
||||
if total_stats.last_run.map_or(true, |lr| lr < last_run) {
|
||||
if total_stats.last_run.is_none_or(|lr| lr < last_run) {
|
||||
total_stats.last_run = Some(last_run);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue