fix: replace nested runtime block_on with new_current_thread to prevent panic
All checks were successful
BotServer CI/CD / build (push) Successful in 5m32s

Root cause: Handle::current().block_on() panics inside multi_thread runtime
with 'Cannot start a runtime from within a runtime' error.

Fix: All sync-to-async bridges now use tokio::runtime::Builder::new_current_thread()
instead of Handle::current().block_on(). Also changed SECRETS_MANAGER from
tokio::sync::RwLock to std::sync::RwLock to eliminate unnecessary async overhead.

Files: 14 files across keywords, secrets, utils, state, calendar, analytics, email
Impact: Fixes production crash during bot loading phase
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-03 09:17:23 -03:00
parent eece6831b4
commit f6a864aa67
14 changed files with 399 additions and 221 deletions

View file

@ -19,15 +19,22 @@ use crate::core::shared::state::AppState;
fn get_bot_context() -> (Uuid, Uuid) { fn get_bot_context() -> (Uuid, Uuid) {
let sm = crate::core::secrets::SecretsManager::from_env().ok(); let sm = crate::core::secrets::SecretsManager::from_env().ok();
let (org_id, bot_id) = if let Some(sm) = sm { let (org_id, bot_id) = if let Some(sm) = sm {
let rt = tokio::runtime::Handle::current();
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
rt.block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
let org = sm.get_value("gbo/analytics", "default_org_id").await .enable_all()
.unwrap_or_else(|_| "system".to_string()); .build()
let bot = sm.get_value("gbo/analytics", "default_bot_id").await .ok();
.unwrap_or_else(|_| "system".to_string()); if let Some(rt) = rt {
(org, bot) rt.block_on(async {
}) let org = sm.get_value("gbo/analytics", "default_org_id").await
.unwrap_or_else(|_| "system".to_string());
let bot = sm.get_value("gbo/analytics", "default_bot_id").await
.unwrap_or_else(|_| "system".to_string());
(org, bot)
})
} else {
("system".to_string(), "system".to_string())
}
}) })
} else { } else {
("system".to_string(), "system".to_string()) ("system".to_string(), "system".to_string())

View file

@ -183,7 +183,11 @@ async fn translate_text(
text: &str, text: &str,
target_lang: &str, target_lang: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let llm_url = crate::core::shared::utils::get_secrets_manager_sync().and_then(|sm| { let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| rt.block_on(async { sm.get_value("gbo/llm", "url").await.ok() })) }).unwrap_or_else(|| "http://localhost:8081".to_string()); let llm_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager().await {
sm.get_value("gbo/llm", "url").await.unwrap_or_else(|_| "http://localhost:8081".to_string())
} else {
"http://localhost:8081".to_string()
};
let prompt = format!( let prompt = format!(
"Translate to {}. Return ONLY the translation:\n\n{}", "Translate to {}. Return ONLY the translation:\n\n{}",
target_lang, text target_lang, text
@ -232,7 +236,11 @@ async fn perform_ocr(image_path: &str) -> Result<String, Box<dyn std::error::Err
async fn analyze_sentiment( async fn analyze_sentiment(
text: &str, text: &str,
) -> Result<Dynamic, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<Dynamic, Box<dyn std::error::Error + Send + Sync>> {
let llm_url = crate::core::shared::utils::get_secrets_manager_sync().and_then(|sm| { let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| rt.block_on(async { sm.get_value("gbo/llm", "url").await.ok() })) }).unwrap_or_else(|| "http://localhost:8081".to_string()); let llm_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager().await {
sm.get_value("gbo/llm", "url").await.unwrap_or_else(|_| "http://localhost:8081".to_string())
} else {
"http://localhost:8081".to_string()
};
let prompt = format!( let prompt = format!(
r#"Analyze sentiment. Return JSON only: r#"Analyze sentiment. Return JSON only:
{{"sentiment":"positive/negative/neutral","score":-100 to 100,"urgent":true/false}} {{"sentiment":"positive/negative/neutral","score":-100 to 100,"urgent":true/false}}
@ -337,7 +345,11 @@ async fn classify_text(
text: &str, text: &str,
categories: &[String], categories: &[String],
) -> Result<Dynamic, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<Dynamic, Box<dyn std::error::Error + Send + Sync>> {
let llm_url = crate::core::shared::utils::get_secrets_manager_sync().and_then(|sm| { let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| rt.block_on(async { sm.get_value("gbo/llm", "url").await.ok() })) }).unwrap_or_else(|| "http://localhost:8081".to_string()); let llm_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager().await {
sm.get_value("gbo/llm", "url").await.unwrap_or_else(|_| "http://localhost:8081".to_string())
} else {
"http://localhost:8081".to_string()
};
let cats = categories.join(", "); let cats = categories.join(", ");
let prompt = format!( let prompt = format!(
r#"Classify into one of: {} r#"Classify into one of: {}

View file

@ -274,14 +274,21 @@ pub fn book_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
let meeting_json = meeting_details.to_string(); let meeting_json = meeting_details.to_string();
let result = tokio::task::block_in_place(|| { let result = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move { let rt = tokio::runtime::Builder::new_current_thread()
execute_book_meeting( .enable_all()
&state_for_task, .build()
&user_for_task, .ok();
meeting_json, match rt {
attendees, Some(rt) => rt.block_on(async move {
) execute_book_meeting(
}) &state_for_task,
&user_for_task,
meeting_json,
attendees,
)
}),
None => Err("Failed to create runtime".into()),
}
}); });
match result { match result {
@ -318,13 +325,20 @@ pub fn book_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
let state_for_task = Arc::clone(&state_clone3); let state_for_task = Arc::clone(&state_clone3);
let result = tokio::task::block_in_place(|| { let result = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move { let rt = tokio::runtime::Builder::new_current_thread()
check_availability( .enable_all()
&state_for_task, .build()
&date_str, .ok();
duration_minutes, match rt {
) Some(rt) => rt.block_on(async move {
}) check_availability(
&state_for_task,
&date_str,
duration_minutes,
)
}),
None => Err("Failed to create runtime".into()),
}
}); });
match result { match result {

View file

@ -16,8 +16,14 @@ pub fn create_draft_keyword(state: &AppState, _user: UserSession, engine: &mut E
let fut = execute_create_draft(&state_clone, &to, &subject, &reply_text); let fut = execute_create_draft(&state_clone, &to, &subject, &reply_text);
let result = let result =
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) tokio::task::block_in_place(|| {
.map_err(|e| format!("Draft creation error: {e}"))?; let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| format!("Runtime creation failed: {e}"))?;
rt.block_on(fut)
})
.map_err(|e| format!("Draft creation failed: {e}"))?;
Ok(Dynamic::from(result)) Ok(Dynamic::from(result))
}, },
) )

View file

@ -58,7 +58,13 @@ pub fn create_site_keyword(state: &AppState, user: UserSession, engine: &mut Eng
}; };
let fut = create_site(config, s3, bucket, bot_id, llm, params); let fut = create_site(config, s3, bucket, bot_id, llm, params);
let result = let result =
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) tokio::task::block_in_place(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| format!("Runtime creation failed: {}", e))?;
rt.block_on(fut)
})
.map_err(|e| format!("Site creation failed: {}", e))?; .map_err(|e| format!("Site creation failed: {}", e))?;
Ok(Dynamic::from(result)) Ok(Dynamic::from(result))
}, },

View file

@ -28,22 +28,29 @@ fn hear_block(state: &Arc<AppState>, session_id: uuid::Uuid, variable_name: &str
// Mark session as waiting and store metadata in Redis (for UI hints like menus) // Mark session as waiting and store metadata in Redis (for UI hints like menus)
let state_clone = Arc::clone(state); let state_clone = Arc::clone(state);
let var = variable_name.to_string(); let var = variable_name.to_string();
tokio::runtime::Handle::current().block_on(async move { tokio::task::block_in_place(|| {
{ let rt = tokio::runtime::Builder::new_current_thread()
let mut sm = state_clone.session_manager.lock().await; .enable_all()
sm.mark_waiting(session_id); .build();
} if let Ok(rt) = rt {
if let Some(redis) = &state_clone.cache { rt.block_on(async move {
if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { {
let key = format!("hear:{session_id}:{var}"); let mut sm = state_clone.session_manager.lock().await;
let _: Result<(), _> = redis::cmd("SET") sm.mark_waiting(session_id);
.arg(&key) }
.arg(wait_data.to_string()) if let Some(redis) = &state_clone.cache {
.arg("EX") if let Ok(mut conn) = redis.get_multiplexed_async_connection().await {
.arg(3600) let key = format!("hear:{session_id}:{var}");
.query_async(&mut conn) let _: Result<(), _> = redis::cmd("SET")
.await; .arg(&key)
} .arg(wait_data.to_string())
.arg("EX")
.arg(3600)
.query_async(&mut conn)
.await;
}
}
});
} }
}); });
@ -209,18 +216,25 @@ fn register_hear_as_menu(state: Arc<AppState>, user: UserSession, engine: &mut E
let state_for_suggestions = Arc::clone(&state_clone); let state_for_suggestions = Arc::clone(&state_clone);
let opts_clone = options.clone(); let opts_clone = options.clone();
let bot_id_clone = bot_id; let bot_id_clone = bot_id;
tokio::runtime::Handle::current().block_on(async move { tokio::task::block_in_place(|| {
if let Some(redis) = &state_for_suggestions.cache { let rt = tokio::runtime::Builder::new_current_thread()
if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { .enable_all()
let key = format!("suggestions:{}:{}", bot_id_clone, session_id); .build();
for opt in &opts_clone { if let Ok(rt) = rt {
let _: Result<(), _> = redis::cmd("RPUSH") rt.block_on(async move {
.arg(&key) if let Some(redis) = &state_for_suggestions.cache {
.arg(json!({"text": opt, "value": opt}).to_string()) if let Ok(mut conn) = redis.get_multiplexed_async_connection().await {
.query_async(&mut conn) let key = format!("suggestions:{}:{}", bot_id_clone, session_id);
.await; for opt in &opts_clone {
let _: Result<(), _> = redis::cmd("RPUSH")
.arg(&key)
.arg(json!({"text": opt, "value": opt}).to_string())
.query_async(&mut conn)
.await;
}
}
} }
} });
} }
}); });

View file

@ -73,11 +73,18 @@ pub fn search_keyword(state: &AppState, user: UserSession, engine: &mut Engine)
}; };
let result = tokio::task::block_in_place(|| { let result = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
execute_search(&mut binding, &table_str, &query_str, limit_val) .enable_all()
}) .build()
}) .ok();
.map_err(|e| format!("Search error: {e}"))?; match rt {
Some(rt) => rt.block_on(async {
execute_search(&mut binding, &table_str, &query_str, limit_val)
})
.map_err(|e| format!("Search error: {e}")),
None => Err("Failed to create runtime".into()),
}
})?;
if let Some(results) = result.get("results") { if let Some(results) = result.get("results") {
let filtered = let filtered =

View file

@ -52,9 +52,15 @@ pub fn register_think_kb_keyword(
// Execute KB search in blocking thread // Execute KB search in blocking thread
let result = std::thread::spawn(move || { let result = std::thread::spawn(move || {
tokio::runtime::Handle::current().block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
think_kb_search(kb_manager, db_pool, session_id, bot_id, &query).await .enable_all()
}) .build();
match rt {
Ok(rt) => rt.block_on(async {
think_kb_search(kb_manager, db_pool, session_id, bot_id, &query).await
}),
Err(e) => Err(format!("Failed to create runtime: {}", e)),
}
}) })
.join(); .join();

View file

@ -35,15 +35,22 @@ fn register_talk_to(state: Arc<AppState>, user: UserSession, engine: &mut Engine
let user_for_send = user.clone(); let user_for_send = user.clone();
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
send_message_to_recipient( .enable_all()
state_for_send, .build()
&user_for_send, .ok();
&recipient, match rt {
&message, Some(rt) => rt.block_on(async {
) send_message_to_recipient(
.await state_for_send,
}) &user_for_send,
&recipient,
&message,
)
.await
}),
None => Err("Failed to create runtime".into()),
}
}) })
.map_err(|e| format!("Failed to send message: {}", e))?; .map_err(|e| format!("Failed to send message: {}", e))?;
@ -73,10 +80,17 @@ fn register_send_file_to(state: Arc<AppState>, user: UserSession, engine: &mut E
let user_for_send = Arc::clone(&user_clone); let user_for_send = Arc::clone(&user_clone);
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
send_file_to_recipient(state_for_send, &user_for_send, &recipient, file) .enable_all()
.await .build()
}) .ok();
match rt {
Some(rt) => rt.block_on(async {
send_file_to_recipient(state_for_send, &user_for_send, &recipient, file)
.await
}),
None => Err("Failed to create runtime".into()),
}
}) })
.map_err(|e| format!("Failed to send file: {}", e))?; .map_err(|e| format!("Failed to send file: {}", e))?;
@ -103,18 +117,25 @@ fn register_send_file_to(state: Arc<AppState>, user: UserSession, engine: &mut E
let user_for_send = Arc::clone(&user_clone2); let user_for_send = Arc::clone(&user_clone2);
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
send_file_with_caption_to_recipient( .enable_all()
state_for_send, .build()
&user_for_send, .ok();
&recipient, match rt {
file, Some(rt) => rt.block_on(async {
&caption, send_file_with_caption_to_recipient(
) state_for_send,
.await &user_for_send,
}) &recipient,
file,
&caption,
)
.await
}),
None => Err("Failed to create runtime".into()),
}
}) })
.map_err(|e| format!("Failed to send file: {}", e))?; .map_err(|e| format!("Failed to send file with caption: {}", e))?;
Ok(Dynamic::UNIT) Ok(Dynamic::UNIT)
}, },
@ -139,10 +160,17 @@ fn register_send_to(state: Arc<AppState>, user: UserSession, engine: &mut Engine
let user_for_send = user.clone(); let user_for_send = user.clone();
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
send_to_specific_channel(state_for_send, &user_for_send, &target, &message) .enable_all()
.await .build()
}) .ok();
match rt {
Some(rt) => rt.block_on(async {
send_to_specific_channel(state_for_send, &user_for_send, &target, &message)
.await
}),
None => Err("Failed to create runtime".into()),
}
}) })
.map_err(|e| format!("Failed to send: {}", e))?; .map_err(|e| format!("Failed to send: {}", e))?;
@ -169,10 +197,17 @@ fn register_broadcast(state: Arc<AppState>, user: UserSession, engine: &mut Engi
let user_for_send = user.clone(); let user_for_send = user.clone();
let results = tokio::task::block_in_place(|| { let results = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
broadcast_message(state_for_send, &user_for_send, &message, recipients) .enable_all()
.await .build()
}) .ok();
match rt {
Some(rt) => rt.block_on(async {
broadcast_message(state_for_send, &user_for_send, &message, recipients)
.await
}),
None => Err("Failed to create runtime".into()),
}
}) })
.map_err(|e| format!("Failed to broadcast: {}", e))?; .map_err(|e| format!("Failed to broadcast: {}", e))?;

View file

@ -25,15 +25,22 @@ pub mod ui;
fn get_bot_context() -> (Uuid, Uuid) { fn get_bot_context() -> (Uuid, Uuid) {
let sm = crate::core::secrets::SecretsManager::from_env().ok(); let sm = crate::core::secrets::SecretsManager::from_env().ok();
let (org_id, bot_id) = if let Some(sm) = sm { let (org_id, bot_id) = if let Some(sm) = sm {
let rt = tokio::runtime::Handle::current();
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
rt.block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
let org = sm.get_value("gbo/analytics", "default_org_id").await .enable_all()
.unwrap_or_else(|_| "system".to_string()); .build()
let bot = sm.get_value("gbo/analytics", "default_bot_id").await .ok();
.unwrap_or_else(|_| "system".to_string()); if let Some(rt) = rt {
(org, bot) rt.block_on(async {
}) let org = sm.get_value("gbo/analytics", "default_org_id").await
.unwrap_or_else(|_| "system".to_string());
let bot = sm.get_value("gbo/analytics", "default_bot_id").await
.unwrap_or_else(|_| "system".to_string());
(org, bot)
})
} else {
("system".to_string(), "system".to_string())
}
}) })
} else { } else {
("system".to_string(), "system".to_string()) ("system".to_string(), "system".to_string())

View file

@ -331,121 +331,175 @@ impl SecretsManager {
} }
pub fn get_cache_config(&self) -> (String, u16, Option<String>) { pub fn get_cache_config(&self) -> (String, u16, Option<String>) {
if let Ok(runtime) = tokio::runtime::Handle::try_current() { tokio::task::block_in_place(|| {
if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::CACHE)) { let rt = tokio::runtime::Builder::new_current_thread()
return ( .enable_all()
secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()), .build()
secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(6379), .ok();
secrets.get("password").cloned(), if let Some(rt) = rt {
); if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::CACHE)) {
return (
secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()),
secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(6379),
secrets.get("password").cloned(),
);
}
} }
} ("localhost".to_string(), 6379, None)
("localhost".to_string(), 6379, None) })
} }
pub fn get_directory_config_sync(&self) -> (String, String, String, String) { pub fn get_directory_config_sync(&self) -> (String, String, String, String) {
if let Ok(runtime) = tokio::runtime::Handle::try_current() { tokio::task::block_in_place(|| {
if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::DIRECTORY)) { let rt = tokio::runtime::Builder::new_current_thread()
return ( .enable_all()
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), .build()
secrets.get("project_id").cloned().unwrap_or_default(), .ok();
secrets.get("client_id").cloned().unwrap_or_default(), if let Some(rt) = rt {
secrets.get("client_secret").cloned().unwrap_or_default(), if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::DIRECTORY)) {
); return (
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()),
secrets.get("project_id").cloned().unwrap_or_default(),
secrets.get("client_id").cloned().unwrap_or_default(),
secrets.get("client_secret").cloned().unwrap_or_default(),
);
}
} }
} ("http://localhost:9000".to_string(), String::new(), String::new(), String::new())
("http://localhost:9000".to_string(), String::new(), String::new(), String::new()) })
} }
pub fn get_email_config(&self) -> (String, u16, String, String, String) { pub fn get_email_config(&self) -> (String, u16, String, String, String) {
if let Ok(runtime) = tokio::runtime::Handle::try_current() { tokio::task::block_in_place(|| {
if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::EMAIL)) { let rt = tokio::runtime::Builder::new_current_thread()
return ( .enable_all()
secrets.get("smtp_host").cloned().unwrap_or_else(|| "smtp.gmail.com".into()), .build()
secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587), .ok();
secrets.get("smtp_user").cloned().unwrap_or_default(), if let Some(rt) = rt {
secrets.get("smtp_password").cloned().unwrap_or_default(), if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::EMAIL)) {
secrets.get("smtp_from").cloned().unwrap_or_default(), return (
); secrets.get("smtp_host").cloned().unwrap_or_else(|| "smtp.gmail.com".into()),
secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587),
secrets.get("smtp_user").cloned().unwrap_or_default(),
secrets.get("smtp_password").cloned().unwrap_or_default(),
secrets.get("smtp_from").cloned().unwrap_or_default(),
);
}
} }
} ("smtp.gmail.com".to_string(), 587, String::new(), String::new(), String::new())
("smtp.gmail.com".to_string(), 587, String::new(), String::new(), String::new()) })
} }
pub fn get_llm_config(&self) -> (String, String, Option<String>, Option<String>, String) { pub fn get_llm_config(&self) -> (String, String, Option<String>, Option<String>, String) {
if let Ok(runtime) = tokio::runtime::Handle::try_current() { tokio::task::block_in_place(|| {
if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::LLM)) { let rt = tokio::runtime::Builder::new_current_thread()
return ( .enable_all()
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8081".into()), .build()
secrets.get("model").cloned().unwrap_or_else(|| "gpt-4".into()), .ok();
secrets.get("openai_key").cloned(), if let Some(rt) = rt {
secrets.get("anthropic_key").cloned(), if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::LLM)) {
secrets.get("ollama_url").cloned().unwrap_or_else(|| "http://localhost:11434".into()), return (
); secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8081".into()),
secrets.get("model").cloned().unwrap_or_else(|| "gpt-4".into()),
secrets.get("openai_key").cloned(),
secrets.get("anthropic_key").cloned(),
secrets.get("ollama_url").cloned().unwrap_or_else(|| "http://localhost:11434".into()),
);
}
} }
} ("http://localhost:8081".to_string(), "gpt-4".to_string(), None, None, "http://localhost:11434".to_string())
("http://localhost:8081".to_string(), "gpt-4".to_string(), None, None, "http://localhost:11434".to_string()) })
} }
pub fn get_meet_config(&self) -> (String, String, String) { pub fn get_meet_config(&self) -> (String, String, String) {
if let Ok(runtime) = tokio::runtime::Handle::try_current() { tokio::task::block_in_place(|| {
if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::MEET)) { let rt = tokio::runtime::Builder::new_current_thread()
return ( .enable_all()
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:7880".into()), .build()
secrets.get("app_id").cloned().unwrap_or_default(), .ok();
secrets.get("app_secret").cloned().unwrap_or_default(), if let Some(rt) = rt {
); if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::MEET)) {
return (
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:7880".into()),
secrets.get("app_id").cloned().unwrap_or_default(),
secrets.get("app_secret").cloned().unwrap_or_default(),
);
}
} }
} ("http://localhost:7880".to_string(), String::new(), String::new())
("http://localhost:7880".to_string(), String::new(), String::new()) })
} }
pub fn get_vectordb_config_sync(&self) -> (String, Option<String>) { pub fn get_vectordb_config_sync(&self) -> (String, Option<String>) {
if let Ok(runtime) = tokio::runtime::Handle::try_current() { tokio::task::block_in_place(|| {
if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::VECTORDB)) { let rt = tokio::runtime::Builder::new_current_thread()
return ( .enable_all()
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:6333".into()), .build()
secrets.get("api_key").cloned(), .ok();
); if let Some(rt) = rt {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::VECTORDB)) {
return (
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:6333".into()),
secrets.get("api_key").cloned(),
);
}
} }
} ("http://localhost:6333".to_string(), None)
("http://localhost:6333".to_string(), None) })
} }
pub fn get_observability_config_sync(&self) -> (String, String, String, String) { pub fn get_observability_config_sync(&self) -> (String, String, String, String) {
if let Ok(runtime) = tokio::runtime::Handle::try_current() { tokio::task::block_in_place(|| {
if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::OBSERVABILITY)) { let rt = tokio::runtime::Builder::new_current_thread()
return ( .enable_all()
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8086".into()), .build()
secrets.get("org").cloned().unwrap_or_else(|| "system".into()), .ok();
secrets.get("bucket").cloned().unwrap_or_else(|| "metrics".into()), if let Some(rt) = rt {
secrets.get("token").cloned().unwrap_or_default(), if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::OBSERVABILITY)) {
); return (
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8086".into()),
secrets.get("org").cloned().unwrap_or_else(|| "system".into()),
secrets.get("bucket").cloned().unwrap_or_else(|| "metrics".into()),
secrets.get("token").cloned().unwrap_or_default(),
);
}
} }
} ("http://localhost:8086".to_string(), "system".to_string(), "metrics".to_string(), String::new())
("http://localhost:8086".to_string(), "system".to_string(), "metrics".to_string(), String::new()) })
} }
pub fn get_alm_config(&self) -> (String, String, String) { pub fn get_alm_config(&self) -> (String, String, String) {
if let Ok(runtime) = tokio::runtime::Handle::try_current() { tokio::task::block_in_place(|| {
if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::ALM)) { let rt = tokio::runtime::Builder::new_current_thread()
return ( .enable_all()
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), .build()
secrets.get("token").cloned().unwrap_or_default(), .ok();
secrets.get("default_org").cloned().unwrap_or_default(), if let Some(rt) = rt {
); if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::ALM)) {
return (
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()),
secrets.get("token").cloned().unwrap_or_default(),
secrets.get("default_org").cloned().unwrap_or_default(),
);
}
} }
} ("http://localhost:9000".to_string(), String::new(), String::new())
("http://localhost:9000".to_string(), String::new(), String::new()) })
} }
pub fn get_jwt_secret_sync(&self) -> String { pub fn get_jwt_secret_sync(&self) -> String {
if let Ok(runtime) = tokio::runtime::Handle::try_current() { tokio::task::block_in_place(|| {
if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::JWT)) { let rt = tokio::runtime::Builder::new_current_thread()
return secrets.get("secret").cloned().unwrap_or_default(); .enable_all()
.build()
.ok();
if let Some(rt) = rt {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::JWT)) {
return secrets.get("secret").cloned().unwrap_or_default();
}
} }
} String::new()
String::new() })
} }
pub async fn put_secret(&self, path: &str, data: HashMap<String, String>) -> Result<()> { pub async fn put_secret(&self, path: &str, data: HashMap<String, String>) -> Result<()> {

View file

@ -19,11 +19,18 @@ pub async fn send_invitation_email(
let smtp = crate::core::secrets::SecretsManager::from_env() let smtp = crate::core::secrets::SecretsManager::from_env()
.ok() .ok()
.and_then(|sm| { .and_then(|sm| {
let rt = tokio::runtime::Handle::current();
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
rt.block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
sm.get_secret(crate::core::secrets::SecretPaths::EMAIL).await.ok() .enable_all()
}) .build()
.ok();
if let Some(rt) = rt {
rt.block_on(async {
sm.get_secret(crate::core::secrets::SecretPaths::EMAIL).await.ok()
})
} else {
None
}
}) })
}); });

View file

@ -299,10 +299,16 @@ impl Extensions {
pub fn insert_blocking<T: Send + Sync + 'static>(&self, value: T) { pub fn insert_blocking<T: Send + Sync + 'static>(&self, value: T) {
let map = self.map.clone(); let map = self.map.clone();
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
let mut guard = map.write().await; .enable_all()
guard.insert(TypeId::of::<T>(), Arc::new(value)); .build()
}); .ok();
if let Some(rt) = rt {
rt.block_on(async {
let mut guard = map.write().await;
guard.insert(TypeId::of::<T>(), Arc::new(value));
});
}
}); });
} }

View file

@ -24,23 +24,23 @@ use serde_json::Value;
use smartstring::SmartString; use smartstring::SmartString;
use std::error::Error; use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration; use std::time::Duration;
use tokio::fs::File as TokioFile; use tokio::fs::File as TokioFile;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
static SECRETS_MANAGER: std::sync::LazyLock<Arc<RwLock<Option<SecretsManager>>>> = static SECRETS_MANAGER: std::sync::LazyLock<Arc<RwLock<Option<SecretsManager>>>> =
std::sync::LazyLock::new(|| Arc::new(RwLock::new(None))); std::sync::LazyLock::new(|| Arc::new(RwLock::new(None)));
pub async fn init_secrets_manager() -> Result<()> { pub async fn init_secrets_manager() -> Result<()> {
let manager = SecretsManager::from_env()?; let manager = SecretsManager::from_env()?;
let mut guard = SECRETS_MANAGER.write().await; let mut guard = SECRETS_MANAGER.write().map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
*guard = Some(manager); *guard = Some(manager);
Ok(()) Ok(())
} }
pub async fn get_database_url() -> Result<String> { pub async fn get_database_url() -> Result<String> {
let guard = SECRETS_MANAGER.read().await; let guard = SECRETS_MANAGER.read().map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
if let Some(ref manager) = *guard { if let Some(ref manager) = *guard {
return manager.get_database_url().await; return manager.get_database_url().await;
} }
@ -51,17 +51,14 @@ pub async fn get_database_url() -> Result<String> {
} }
pub fn get_database_url_sync() -> Result<String> { pub fn get_database_url_sync() -> Result<String> {
if let Ok(handle) = tokio::runtime::Handle::try_current() { let guard = SECRETS_MANAGER.read().map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?;
let result = if let Some(ref manager) = *guard {
tokio::task::block_in_place(|| handle.block_on(async { get_database_url().await })); if let Ok(handle) = tokio::runtime::Handle::try_current() {
if let Ok(url) = result { return handle.block_on(manager.get_database_url());
return Ok(url); } else {
} let rt = tokio::runtime::Runtime::new()
} else { .map_err(|e| anyhow::anyhow!("Failed to create runtime: {}", e))?;
let rt = tokio::runtime::Runtime::new() return rt.block_on(manager.get_database_url());
.map_err(|e| anyhow::anyhow!("Failed to create runtime: {}", e))?;
if let Ok(url) = rt.block_on(async { get_database_url().await }) {
return Ok(url);
} }
} }
@ -71,28 +68,28 @@ pub fn get_database_url_sync() -> Result<String> {
} }
pub async fn get_secrets_manager() -> Option<SecretsManager> { pub async fn get_secrets_manager() -> Option<SecretsManager> {
let guard = SECRETS_MANAGER.read().await; let guard = SECRETS_MANAGER.read().ok()?;
guard.clone() guard.clone()
} }
pub fn get_secrets_manager_sync() -> Option<SecretsManager> { pub fn get_secrets_manager_sync() -> Option<SecretsManager> {
if let Ok(handle) = tokio::runtime::Handle::try_current() { let guard = SECRETS_MANAGER.read().ok()?;
let result = guard.clone()
tokio::task::block_in_place(|| handle.block_on(async { get_secrets_manager().await }));
return result;
}
None
} }
pub fn get_work_path() -> String { pub fn get_work_path() -> String {
let sm = get_secrets_manager_sync(); let sm = get_secrets_manager_sync();
if let Some(sm) = sm { if let Some(sm) = sm {
let rt = tokio::runtime::Handle::current();
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
rt.block_on(async { let rt = tokio::runtime::Builder::new_current_thread()
sm.get_value("gbo/app", "work_path").await .enable_all()
.unwrap_or_else(|_| "./work".to_string()) .build()
}) .ok();
match rt {
Some(rt) => rt.block_on(sm.get_value("gbo/app", "work_path"))
.unwrap_or_else(|_| "./work".to_string()),
None => "./work".to_string(),
}
}) })
} else { } else {
"./work".to_string() "./work".to_string()
@ -110,7 +107,7 @@ pub async fn create_s3_operator(
}; };
let (access_key, secret_key) = if config.access_key.is_empty() || config.secret_key.is_empty() { let (access_key, secret_key) = if config.access_key.is_empty() || config.secret_key.is_empty() {
let guard = SECRETS_MANAGER.read().await; let guard = SECRETS_MANAGER.read().map_err(|e| format!("Lock poisoned: {}", e))?;
if let Some(ref manager) = *guard { if let Some(ref manager) = *guard {
if manager.is_enabled() { if manager.is_enabled() {
match manager.get_drive_credentials().await { match manager.get_drive_credentials().await {