From f6a864aa6710778aad264d24f4cab18b21e61777 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Fri, 3 Apr 2026 09:17:23 -0300 Subject: [PATCH] fix: replace nested runtime block_on with new_current_thread to prevent panic Root cause: Handle::current().block_on() panics inside multi_thread runtime with 'Cannot start a runtime from within a runtime' error. Fix: All sync-to-async bridges now use tokio::runtime::Builder::new_current_thread() instead of Handle::current().block_on(). Also changed SECRETS_MANAGER from tokio::sync::RwLock to std::sync::RwLock to eliminate unnecessary async overhead. Files: 14 files across keywords, secrets, utils, state, calendar, analytics, email Impact: Fixes production crash during bot loading phase --- src/analytics/goals.rs | 23 ++- src/basic/keywords/ai_tools.rs | 18 +- src/basic/keywords/book.rs | 44 +++-- src/basic/keywords/create_draft.rs | 10 +- src/basic/keywords/create_site.rs | 8 +- src/basic/keywords/hearing/syntax.rs | 68 ++++--- src/basic/keywords/search.rs | 17 +- src/basic/keywords/think_kb.rs | 12 +- src/basic/keywords/universal_messaging.rs | 99 ++++++---- src/calendar/mod.rs | 23 ++- src/core/secrets/mod.rs | 218 ++++++++++++++-------- src/core/shared/admin_email.rs | 15 +- src/core/shared/state.rs | 14 +- src/core/shared/utils.rs | 51 +++-- 14 files changed, 399 insertions(+), 221 deletions(-) diff --git a/src/analytics/goals.rs b/src/analytics/goals.rs index 90f33ac1..a76ba867 100644 --- a/src/analytics/goals.rs +++ b/src/analytics/goals.rs @@ -19,15 +19,22 @@ use crate::core::shared::state::AppState; fn get_bot_context() -> (Uuid, Uuid) { let sm = crate::core::secrets::SecretsManager::from_env().ok(); let (org_id, bot_id) = if let Some(sm) = sm { - let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| { - rt.block_on(async { - let org = sm.get_value("gbo/analytics", "default_org_id").await - .unwrap_or_else(|_| "system".to_string()); - let bot = sm.get_value("gbo/analytics", "default_bot_id").await - .unwrap_or_else(|_| "system".to_string()); - (org, bot) - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + rt.block_on(async { + let org = sm.get_value("gbo/analytics", "default_org_id").await + .unwrap_or_else(|_| "system".to_string()); + let bot = sm.get_value("gbo/analytics", "default_bot_id").await + .unwrap_or_else(|_| "system".to_string()); + (org, bot) + }) + } else { + ("system".to_string(), "system".to_string()) + } }) } else { ("system".to_string(), "system".to_string()) diff --git a/src/basic/keywords/ai_tools.rs b/src/basic/keywords/ai_tools.rs index 274f4036..f420fa06 100644 --- a/src/basic/keywords/ai_tools.rs +++ b/src/basic/keywords/ai_tools.rs @@ -183,7 +183,11 @@ async fn translate_text( text: &str, target_lang: &str, ) -> Result> { - let llm_url = crate::core::shared::utils::get_secrets_manager_sync().and_then(|sm| { let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| rt.block_on(async { sm.get_value("gbo/llm", "url").await.ok() })) }).unwrap_or_else(|| "http://localhost:8081".to_string()); + let llm_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager().await { + sm.get_value("gbo/llm", "url").await.unwrap_or_else(|_| "http://localhost:8081".to_string()) + } else { + "http://localhost:8081".to_string() + }; let prompt = format!( "Translate to {}. Return ONLY the translation:\n\n{}", target_lang, text @@ -232,7 +236,11 @@ async fn perform_ocr(image_path: &str) -> Result Result> { - let llm_url = crate::core::shared::utils::get_secrets_manager_sync().and_then(|sm| { let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| rt.block_on(async { sm.get_value("gbo/llm", "url").await.ok() })) }).unwrap_or_else(|| "http://localhost:8081".to_string()); + let llm_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager().await { + sm.get_value("gbo/llm", "url").await.unwrap_or_else(|_| "http://localhost:8081".to_string()) + } else { + "http://localhost:8081".to_string() + }; let prompt = format!( r#"Analyze sentiment. Return JSON only: {{"sentiment":"positive/negative/neutral","score":-100 to 100,"urgent":true/false}} @@ -337,7 +345,11 @@ async fn classify_text( text: &str, categories: &[String], ) -> Result> { - let llm_url = crate::core::shared::utils::get_secrets_manager_sync().and_then(|sm| { let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| rt.block_on(async { sm.get_value("gbo/llm", "url").await.ok() })) }).unwrap_or_else(|| "http://localhost:8081".to_string()); + let llm_url = if let Some(sm) = crate::core::shared::utils::get_secrets_manager().await { + sm.get_value("gbo/llm", "url").await.unwrap_or_else(|_| "http://localhost:8081".to_string()) + } else { + "http://localhost:8081".to_string() + }; let cats = categories.join(", "); let prompt = format!( r#"Classify into one of: {} diff --git a/src/basic/keywords/book.rs b/src/basic/keywords/book.rs index 03a5bdc9..dff3a0a7 100644 --- a/src/basic/keywords/book.rs +++ b/src/basic/keywords/book.rs @@ -274,14 +274,21 @@ pub fn book_keyword(state: Arc, user: UserSession, engine: &mut Engine let meeting_json = meeting_details.to_string(); let result = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async move { - execute_book_meeting( - &state_for_task, - &user_for_task, - meeting_json, - attendees, - ) - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + match rt { + Some(rt) => rt.block_on(async move { + execute_book_meeting( + &state_for_task, + &user_for_task, + meeting_json, + attendees, + ) + }), + None => Err("Failed to create runtime".into()), + } }); match result { @@ -318,13 +325,20 @@ pub fn book_keyword(state: Arc, user: UserSession, engine: &mut Engine let state_for_task = Arc::clone(&state_clone3); let result = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async move { - check_availability( - &state_for_task, - &date_str, - duration_minutes, - ) - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + match rt { + Some(rt) => rt.block_on(async move { + check_availability( + &state_for_task, + &date_str, + duration_minutes, + ) + }), + None => Err("Failed to create runtime".into()), + } }); match result { diff --git a/src/basic/keywords/create_draft.rs b/src/basic/keywords/create_draft.rs index dccf1b6f..f8c8b47b 100644 --- a/src/basic/keywords/create_draft.rs +++ b/src/basic/keywords/create_draft.rs @@ -16,8 +16,14 @@ pub fn create_draft_keyword(state: &AppState, _user: UserSession, engine: &mut E let fut = execute_create_draft(&state_clone, &to, &subject, &reply_text); let result = - tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) - .map_err(|e| format!("Draft creation error: {e}"))?; + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| format!("Runtime creation failed: {e}"))?; + rt.block_on(fut) + }) + .map_err(|e| format!("Draft creation failed: {e}"))?; Ok(Dynamic::from(result)) }, ) diff --git a/src/basic/keywords/create_site.rs b/src/basic/keywords/create_site.rs index 8f50b47c..edfc167d 100644 --- a/src/basic/keywords/create_site.rs +++ b/src/basic/keywords/create_site.rs @@ -58,7 +58,13 @@ pub fn create_site_keyword(state: &AppState, user: UserSession, engine: &mut Eng }; let fut = create_site(config, s3, bucket, bot_id, llm, params); let result = - tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| format!("Runtime creation failed: {}", e))?; + rt.block_on(fut) + }) .map_err(|e| format!("Site creation failed: {}", e))?; Ok(Dynamic::from(result)) }, diff --git a/src/basic/keywords/hearing/syntax.rs b/src/basic/keywords/hearing/syntax.rs index 3a3e39a4..921cd128 100644 --- a/src/basic/keywords/hearing/syntax.rs +++ b/src/basic/keywords/hearing/syntax.rs @@ -28,22 +28,29 @@ fn hear_block(state: &Arc, session_id: uuid::Uuid, variable_name: &str // Mark session as waiting and store metadata in Redis (for UI hints like menus) let state_clone = Arc::clone(state); let var = variable_name.to_string(); - tokio::runtime::Handle::current().block_on(async move { - { - let mut sm = state_clone.session_manager.lock().await; - sm.mark_waiting(session_id); - } - if let Some(redis) = &state_clone.cache { - if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { - let key = format!("hear:{session_id}:{var}"); - let _: Result<(), _> = redis::cmd("SET") - .arg(&key) - .arg(wait_data.to_string()) - .arg("EX") - .arg(3600) - .query_async(&mut conn) - .await; - } + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + if let Ok(rt) = rt { + rt.block_on(async move { + { + let mut sm = state_clone.session_manager.lock().await; + sm.mark_waiting(session_id); + } + if let Some(redis) = &state_clone.cache { + if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { + let key = format!("hear:{session_id}:{var}"); + let _: Result<(), _> = redis::cmd("SET") + .arg(&key) + .arg(wait_data.to_string()) + .arg("EX") + .arg(3600) + .query_async(&mut conn) + .await; + } + } + }); } }); @@ -209,18 +216,25 @@ fn register_hear_as_menu(state: Arc, user: UserSession, engine: &mut E let state_for_suggestions = Arc::clone(&state_clone); let opts_clone = options.clone(); let bot_id_clone = bot_id; - tokio::runtime::Handle::current().block_on(async move { - if let Some(redis) = &state_for_suggestions.cache { - if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { - let key = format!("suggestions:{}:{}", bot_id_clone, session_id); - for opt in &opts_clone { - let _: Result<(), _> = redis::cmd("RPUSH") - .arg(&key) - .arg(json!({"text": opt, "value": opt}).to_string()) - .query_async(&mut conn) - .await; + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + if let Ok(rt) = rt { + rt.block_on(async move { + if let Some(redis) = &state_for_suggestions.cache { + if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { + let key = format!("suggestions:{}:{}", bot_id_clone, session_id); + for opt in &opts_clone { + let _: Result<(), _> = redis::cmd("RPUSH") + .arg(&key) + .arg(json!({"text": opt, "value": opt}).to_string()) + .query_async(&mut conn) + .await; + } + } } - } + }); } }); diff --git a/src/basic/keywords/search.rs b/src/basic/keywords/search.rs index 4d30fd93..4e2c4a0f 100644 --- a/src/basic/keywords/search.rs +++ b/src/basic/keywords/search.rs @@ -73,11 +73,18 @@ pub fn search_keyword(state: &AppState, user: UserSession, engine: &mut Engine) }; let result = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - execute_search(&mut binding, &table_str, &query_str, limit_val) - }) - }) - .map_err(|e| format!("Search error: {e}"))?; + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + match rt { + Some(rt) => rt.block_on(async { + execute_search(&mut binding, &table_str, &query_str, limit_val) + }) + .map_err(|e| format!("Search error: {e}")), + None => Err("Failed to create runtime".into()), + } + })?; if let Some(results) = result.get("results") { let filtered = diff --git a/src/basic/keywords/think_kb.rs b/src/basic/keywords/think_kb.rs index 9fc868a5..7c02c319 100644 --- a/src/basic/keywords/think_kb.rs +++ b/src/basic/keywords/think_kb.rs @@ -52,9 +52,15 @@ pub fn register_think_kb_keyword( // Execute KB search in blocking thread let result = std::thread::spawn(move || { - tokio::runtime::Handle::current().block_on(async { - think_kb_search(kb_manager, db_pool, session_id, bot_id, &query).await - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + match rt { + Ok(rt) => rt.block_on(async { + think_kb_search(kb_manager, db_pool, session_id, bot_id, &query).await + }), + Err(e) => Err(format!("Failed to create runtime: {}", e)), + } }) .join(); diff --git a/src/basic/keywords/universal_messaging.rs b/src/basic/keywords/universal_messaging.rs index 9258dbdf..68d74269 100644 --- a/src/basic/keywords/universal_messaging.rs +++ b/src/basic/keywords/universal_messaging.rs @@ -35,15 +35,22 @@ fn register_talk_to(state: Arc, user: UserSession, engine: &mut Engine let user_for_send = user.clone(); tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - send_message_to_recipient( - state_for_send, - &user_for_send, - &recipient, - &message, - ) - .await - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + match rt { + Some(rt) => rt.block_on(async { + send_message_to_recipient( + state_for_send, + &user_for_send, + &recipient, + &message, + ) + .await + }), + None => Err("Failed to create runtime".into()), + } }) .map_err(|e| format!("Failed to send message: {}", e))?; @@ -73,10 +80,17 @@ fn register_send_file_to(state: Arc, user: UserSession, engine: &mut E let user_for_send = Arc::clone(&user_clone); tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - send_file_to_recipient(state_for_send, &user_for_send, &recipient, file) - .await - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + match rt { + Some(rt) => rt.block_on(async { + send_file_to_recipient(state_for_send, &user_for_send, &recipient, file) + .await + }), + None => Err("Failed to create runtime".into()), + } }) .map_err(|e| format!("Failed to send file: {}", e))?; @@ -103,18 +117,25 @@ fn register_send_file_to(state: Arc, user: UserSession, engine: &mut E let user_for_send = Arc::clone(&user_clone2); tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - send_file_with_caption_to_recipient( - state_for_send, - &user_for_send, - &recipient, - file, - &caption, - ) - .await - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + match rt { + Some(rt) => rt.block_on(async { + send_file_with_caption_to_recipient( + state_for_send, + &user_for_send, + &recipient, + file, + &caption, + ) + .await + }), + None => Err("Failed to create runtime".into()), + } }) - .map_err(|e| format!("Failed to send file: {}", e))?; + .map_err(|e| format!("Failed to send file with caption: {}", e))?; Ok(Dynamic::UNIT) }, @@ -139,10 +160,17 @@ fn register_send_to(state: Arc, user: UserSession, engine: &mut Engine let user_for_send = user.clone(); tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - send_to_specific_channel(state_for_send, &user_for_send, &target, &message) - .await - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + match rt { + Some(rt) => rt.block_on(async { + send_to_specific_channel(state_for_send, &user_for_send, &target, &message) + .await + }), + None => Err("Failed to create runtime".into()), + } }) .map_err(|e| format!("Failed to send: {}", e))?; @@ -169,10 +197,17 @@ fn register_broadcast(state: Arc, user: UserSession, engine: &mut Engi let user_for_send = user.clone(); let results = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - broadcast_message(state_for_send, &user_for_send, &message, recipients) - .await - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + match rt { + Some(rt) => rt.block_on(async { + broadcast_message(state_for_send, &user_for_send, &message, recipients) + .await + }), + None => Err("Failed to create runtime".into()), + } }) .map_err(|e| format!("Failed to broadcast: {}", e))?; diff --git a/src/calendar/mod.rs b/src/calendar/mod.rs index f42e95e7..070b4045 100644 --- a/src/calendar/mod.rs +++ b/src/calendar/mod.rs @@ -25,15 +25,22 @@ pub mod ui; fn get_bot_context() -> (Uuid, Uuid) { let sm = crate::core::secrets::SecretsManager::from_env().ok(); let (org_id, bot_id) = if let Some(sm) = sm { - let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| { - rt.block_on(async { - let org = sm.get_value("gbo/analytics", "default_org_id").await - .unwrap_or_else(|_| "system".to_string()); - let bot = sm.get_value("gbo/analytics", "default_bot_id").await - .unwrap_or_else(|_| "system".to_string()); - (org, bot) - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + rt.block_on(async { + let org = sm.get_value("gbo/analytics", "default_org_id").await + .unwrap_or_else(|_| "system".to_string()); + let bot = sm.get_value("gbo/analytics", "default_bot_id").await + .unwrap_or_else(|_| "system".to_string()); + (org, bot) + }) + } else { + ("system".to_string(), "system".to_string()) + } }) } else { ("system".to_string(), "system".to_string()) diff --git a/src/core/secrets/mod.rs b/src/core/secrets/mod.rs index 2e42e2f6..4e2a4fb1 100644 --- a/src/core/secrets/mod.rs +++ b/src/core/secrets/mod.rs @@ -331,121 +331,175 @@ impl SecretsManager { } pub fn get_cache_config(&self) -> (String, u16, Option) { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::CACHE)) { - return ( - secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()), - secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(6379), - secrets.get("password").cloned(), - ); + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::CACHE)) { + return ( + secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()), + secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(6379), + secrets.get("password").cloned(), + ); + } } - } - ("localhost".to_string(), 6379, None) + ("localhost".to_string(), 6379, None) + }) } pub fn get_directory_config_sync(&self) -> (String, String, String, String) { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::DIRECTORY)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), - secrets.get("project_id").cloned().unwrap_or_default(), - secrets.get("client_id").cloned().unwrap_or_default(), - secrets.get("client_secret").cloned().unwrap_or_default(), - ); + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::DIRECTORY)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), + secrets.get("project_id").cloned().unwrap_or_default(), + secrets.get("client_id").cloned().unwrap_or_default(), + secrets.get("client_secret").cloned().unwrap_or_default(), + ); + } } - } - ("http://localhost:9000".to_string(), String::new(), String::new(), String::new()) + ("http://localhost:9000".to_string(), String::new(), String::new(), String::new()) + }) } pub fn get_email_config(&self) -> (String, u16, String, String, String) { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::EMAIL)) { - return ( - secrets.get("smtp_host").cloned().unwrap_or_else(|| "smtp.gmail.com".into()), - secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587), - secrets.get("smtp_user").cloned().unwrap_or_default(), - secrets.get("smtp_password").cloned().unwrap_or_default(), - secrets.get("smtp_from").cloned().unwrap_or_default(), - ); + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::EMAIL)) { + return ( + secrets.get("smtp_host").cloned().unwrap_or_else(|| "smtp.gmail.com".into()), + secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587), + secrets.get("smtp_user").cloned().unwrap_or_default(), + secrets.get("smtp_password").cloned().unwrap_or_default(), + secrets.get("smtp_from").cloned().unwrap_or_default(), + ); + } } - } - ("smtp.gmail.com".to_string(), 587, String::new(), String::new(), String::new()) + ("smtp.gmail.com".to_string(), 587, String::new(), String::new(), String::new()) + }) } pub fn get_llm_config(&self) -> (String, String, Option, Option, String) { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::LLM)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8081".into()), - secrets.get("model").cloned().unwrap_or_else(|| "gpt-4".into()), - secrets.get("openai_key").cloned(), - secrets.get("anthropic_key").cloned(), - secrets.get("ollama_url").cloned().unwrap_or_else(|| "http://localhost:11434".into()), - ); + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::LLM)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8081".into()), + secrets.get("model").cloned().unwrap_or_else(|| "gpt-4".into()), + secrets.get("openai_key").cloned(), + secrets.get("anthropic_key").cloned(), + secrets.get("ollama_url").cloned().unwrap_or_else(|| "http://localhost:11434".into()), + ); + } } - } - ("http://localhost:8081".to_string(), "gpt-4".to_string(), None, None, "http://localhost:11434".to_string()) + ("http://localhost:8081".to_string(), "gpt-4".to_string(), None, None, "http://localhost:11434".to_string()) + }) } pub fn get_meet_config(&self) -> (String, String, String) { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::MEET)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:7880".into()), - secrets.get("app_id").cloned().unwrap_or_default(), - secrets.get("app_secret").cloned().unwrap_or_default(), - ); + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::MEET)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:7880".into()), + secrets.get("app_id").cloned().unwrap_or_default(), + secrets.get("app_secret").cloned().unwrap_or_default(), + ); + } } - } - ("http://localhost:7880".to_string(), String::new(), String::new()) + ("http://localhost:7880".to_string(), String::new(), String::new()) + }) } pub fn get_vectordb_config_sync(&self) -> (String, Option) { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::VECTORDB)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:6333".into()), - secrets.get("api_key").cloned(), - ); + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::VECTORDB)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:6333".into()), + secrets.get("api_key").cloned(), + ); + } } - } - ("http://localhost:6333".to_string(), None) + ("http://localhost:6333".to_string(), None) + }) } pub fn get_observability_config_sync(&self) -> (String, String, String, String) { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::OBSERVABILITY)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8086".into()), - secrets.get("org").cloned().unwrap_or_else(|| "system".into()), - secrets.get("bucket").cloned().unwrap_or_else(|| "metrics".into()), - secrets.get("token").cloned().unwrap_or_default(), - ); + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::OBSERVABILITY)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8086".into()), + secrets.get("org").cloned().unwrap_or_else(|| "system".into()), + secrets.get("bucket").cloned().unwrap_or_else(|| "metrics".into()), + secrets.get("token").cloned().unwrap_or_default(), + ); + } } - } - ("http://localhost:8086".to_string(), "system".to_string(), "metrics".to_string(), String::new()) + ("http://localhost:8086".to_string(), "system".to_string(), "metrics".to_string(), String::new()) + }) } pub fn get_alm_config(&self) -> (String, String, String) { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::ALM)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), - secrets.get("token").cloned().unwrap_or_default(), - secrets.get("default_org").cloned().unwrap_or_default(), - ); + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::ALM)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), + secrets.get("token").cloned().unwrap_or_default(), + secrets.get("default_org").cloned().unwrap_or_default(), + ); + } } - } - ("http://localhost:9000".to_string(), String::new(), String::new()) + ("http://localhost:9000".to_string(), String::new(), String::new()) + }) } pub fn get_jwt_secret_sync(&self) -> String { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::JWT)) { - return secrets.get("secret").cloned().unwrap_or_default(); + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::JWT)) { + return secrets.get("secret").cloned().unwrap_or_default(); + } } - } - String::new() + String::new() + }) } pub async fn put_secret(&self, path: &str, data: HashMap) -> Result<()> { diff --git a/src/core/shared/admin_email.rs b/src/core/shared/admin_email.rs index 2b393466..ef4f5f63 100644 --- a/src/core/shared/admin_email.rs +++ b/src/core/shared/admin_email.rs @@ -19,11 +19,18 @@ pub async fn send_invitation_email( let smtp = crate::core::secrets::SecretsManager::from_env() .ok() .and_then(|sm| { - let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| { - rt.block_on(async { - sm.get_secret(crate::core::secrets::SecretPaths::EMAIL).await.ok() - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + rt.block_on(async { + sm.get_secret(crate::core::secrets::SecretPaths::EMAIL).await.ok() + }) + } else { + None + } }) }); diff --git a/src/core/shared/state.rs b/src/core/shared/state.rs index 2fcfd005..445fca44 100644 --- a/src/core/shared/state.rs +++ b/src/core/shared/state.rs @@ -299,10 +299,16 @@ impl Extensions { pub fn insert_blocking(&self, value: T) { let map = self.map.clone(); tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - let mut guard = map.write().await; - guard.insert(TypeId::of::(), Arc::new(value)); - }); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + if let Some(rt) = rt { + rt.block_on(async { + let mut guard = map.write().await; + guard.insert(TypeId::of::(), Arc::new(value)); + }); + } }); } diff --git a/src/core/shared/utils.rs b/src/core/shared/utils.rs index 0c3c5631..f6c42940 100644 --- a/src/core/shared/utils.rs +++ b/src/core/shared/utils.rs @@ -24,23 +24,23 @@ use serde_json::Value; use smartstring::SmartString; use std::error::Error; use std::sync::Arc; +use std::sync::RwLock; use std::time::Duration; use tokio::fs::File as TokioFile; use tokio::io::AsyncWriteExt; -use tokio::sync::RwLock; static SECRETS_MANAGER: std::sync::LazyLock>>> = std::sync::LazyLock::new(|| Arc::new(RwLock::new(None))); pub async fn init_secrets_manager() -> Result<()> { let manager = SecretsManager::from_env()?; - let mut guard = SECRETS_MANAGER.write().await; + let mut guard = SECRETS_MANAGER.write().map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?; *guard = Some(manager); Ok(()) } pub async fn get_database_url() -> Result { - let guard = SECRETS_MANAGER.read().await; + let guard = SECRETS_MANAGER.read().map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?; if let Some(ref manager) = *guard { return manager.get_database_url().await; } @@ -51,17 +51,14 @@ pub async fn get_database_url() -> Result { } pub fn get_database_url_sync() -> Result { - if let Ok(handle) = tokio::runtime::Handle::try_current() { - let result = - tokio::task::block_in_place(|| handle.block_on(async { get_database_url().await })); - if let Ok(url) = result { - return Ok(url); - } - } else { - let rt = tokio::runtime::Runtime::new() - .map_err(|e| anyhow::anyhow!("Failed to create runtime: {}", e))?; - if let Ok(url) = rt.block_on(async { get_database_url().await }) { - return Ok(url); + let guard = SECRETS_MANAGER.read().map_err(|e| anyhow::anyhow!("Lock poisoned: {}", e))?; + if let Some(ref manager) = *guard { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + return handle.block_on(manager.get_database_url()); + } else { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| anyhow::anyhow!("Failed to create runtime: {}", e))?; + return rt.block_on(manager.get_database_url()); } } @@ -71,28 +68,28 @@ pub fn get_database_url_sync() -> Result { } pub async fn get_secrets_manager() -> Option { - let guard = SECRETS_MANAGER.read().await; + let guard = SECRETS_MANAGER.read().ok()?; guard.clone() } pub fn get_secrets_manager_sync() -> Option { - if let Ok(handle) = tokio::runtime::Handle::try_current() { - let result = - tokio::task::block_in_place(|| handle.block_on(async { get_secrets_manager().await })); - return result; - } - None + let guard = SECRETS_MANAGER.read().ok()?; + guard.clone() } pub fn get_work_path() -> String { let sm = get_secrets_manager_sync(); if let Some(sm) = sm { - let rt = tokio::runtime::Handle::current(); tokio::task::block_in_place(|| { - rt.block_on(async { - sm.get_value("gbo/app", "work_path").await - .unwrap_or_else(|_| "./work".to_string()) - }) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .ok(); + match rt { + Some(rt) => rt.block_on(sm.get_value("gbo/app", "work_path")) + .unwrap_or_else(|_| "./work".to_string()), + None => "./work".to_string(), + } }) } else { "./work".to_string() @@ -110,7 +107,7 @@ pub async fn create_s3_operator( }; let (access_key, secret_key) = if config.access_key.is_empty() || config.secret_key.is_empty() { - let guard = SECRETS_MANAGER.read().await; + let guard = SECRETS_MANAGER.read().map_err(|e| format!("Lock poisoned: {}", e))?; if let Some(ref manager) = *guard { if manager.is_enabled() { match manager.get_drive_credentials().await {