diff --git a/src/analytics/goals.rs b/src/analytics/goals.rs index a76ba867..7bf8b551 100644 --- a/src/analytics/goals.rs +++ b/src/analytics/goals.rs @@ -19,23 +19,26 @@ use crate::core::shared::state::AppState; fn get_bot_context() -> (Uuid, Uuid) { let sm = crate::core::secrets::SecretsManager::from_env().ok(); let (org_id, bot_id) = if let Some(sm) = sm { - tokio::task::block_in_place(|| { + let sm_owned = sm.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - rt.block_on(async { - let org = sm.get_value("gbo/analytics", "default_org_id").await + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + let org = sm_owned.get_value("gbo/analytics", "default_org_id").await .unwrap_or_else(|_| "system".to_string()); - let bot = sm.get_value("gbo/analytics", "default_bot_id").await + let bot = sm_owned.get_value("gbo/analytics", "default_bot_id").await .unwrap_or_else(|_| "system".to_string()); (org, bot) }) } else { ("system".to_string(), "system".to_string()) - } - }) + }; + let _ = tx.send(result); + }); + rx.recv().unwrap_or(("system".to_string(), "system".to_string())) } else { ("system".to_string(), "system".to_string()) }; diff --git a/src/basic/keywords/book.rs b/src/basic/keywords/book.rs index dff3a0a7..75fa569a 100644 --- a/src/basic/keywords/book.rs +++ b/src/basic/keywords/book.rs @@ -273,23 +273,27 @@ pub fn book_keyword(state: Arc, user: UserSession, engine: &mut Engine let user_for_task = user_clone2.clone(); let meeting_json = meeting_details.to_string(); - let result = tokio::task::block_in_place(|| { + let (tx, rx) = std::sync::mpsc::channel(); + let meeting_json_clone = meeting_json.clone(); + let attendees_clone = attendees.clone(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - match rt { - Some(rt) => rt.block_on(async move { + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async move { execute_book_meeting( &state_for_task, &user_for_task, - meeting_json, - attendees, + meeting_json_clone, + attendees_clone, ) }), - None => Err("Failed to create runtime".into()), - } + Err(_) => Err("Failed to create runtime".into()), + }; + let _ = tx.send(result); }); + let result = rx.recv().unwrap_or(Err("Failed to create runtime".into())); match result { Ok(event_id) => Ok(Dynamic::from(event_id)), @@ -324,22 +328,25 @@ pub fn book_keyword(state: Arc, user: UserSession, engine: &mut Engine let state_for_task = Arc::clone(&state_clone3); - let result = tokio::task::block_in_place(|| { + let (tx, rx) = std::sync::mpsc::channel(); + let date_str_clone = date_str.clone(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - match rt { - Some(rt) => rt.block_on(async move { + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async move { check_availability( &state_for_task, - &date_str, + &date_str_clone, duration_minutes, ) }), - None => Err("Failed to create runtime".into()), - } + Err(_) => Err("Failed to create runtime".into()), + }; + let _ = tx.send(result); }); + let result = rx.recv().unwrap_or(Err("Failed to create runtime".into())); match result { Ok(slots) => Ok(Dynamic::from(slots)), diff --git a/src/basic/keywords/create_draft.rs b/src/basic/keywords/create_draft.rs index f8c8b47b..62aaf5e4 100644 --- a/src/basic/keywords/create_draft.rs +++ b/src/basic/keywords/create_draft.rs @@ -14,15 +14,24 @@ pub fn create_draft_keyword(state: &AppState, _user: UserSession, engine: &mut E let subject = context.eval_expression_tree(&inputs[1])?.to_string(); let reply_text = context.eval_expression_tree(&inputs[2])?.to_string(); - let fut = execute_create_draft(&state_clone, &to, &subject, &reply_text); - let result = - tokio::task::block_in_place(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| format!("Runtime creation failed: {e}"))?; - rt.block_on(fut) - }) + let state_clone2 = state_clone.clone(); + let to_owned = to.clone(); + let subject_owned = subject.clone(); + let reply_text_owned = reply_text.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async { + execute_create_draft(&state_clone2, &to_owned, &subject_owned, &reply_text_owned).await + }), + Err(e) => Err(format!("Runtime creation failed: {e}")), + }; + let _ = tx.send(result); + }); + let result = rx.recv().unwrap_or(Err("Failed to receive result".to_string())) .map_err(|e| format!("Draft creation failed: {e}"))?; Ok(Dynamic::from(result)) }, diff --git a/src/basic/keywords/create_site.rs b/src/basic/keywords/create_site.rs index edfc167d..d8912bd7 100644 --- a/src/basic/keywords/create_site.rs +++ b/src/basic/keywords/create_site.rs @@ -56,15 +56,20 @@ pub fn create_site_keyword(state: &AppState, user: UserSession, engine: &mut Eng template_dir, prompt, }; - let fut = create_site(config, s3, bucket, bot_id, llm, params); - let result = - tokio::task::block_in_place(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| format!("Runtime creation failed: {}", e))?; - rt.block_on(fut) - }) + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async { + create_site(config, s3, bucket, bot_id, llm, params).await + }), + Err(e) => Err(format!("Runtime creation failed: {}", e).into()), + }; + let _ = tx.send(result); + }); + let result = rx.recv().unwrap_or(Err("Failed to receive result".into())) .map_err(|e| format!("Site creation failed: {}", e))?; Ok(Dynamic::from(result)) }, diff --git a/src/basic/keywords/find.rs b/src/basic/keywords/find.rs index 63fc2fda..65d03acb 100644 --- a/src/basic/keywords/find.rs +++ b/src/basic/keywords/find.rs @@ -44,11 +44,21 @@ pub fn find_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { } }; - let result = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current() - .block_on(async { execute_find(&mut binding, &binding2, &binding3) }) - }) - .map_err(|e| format!("DB error: {e}"))?; + let (tx, rx) = std::sync::mpsc::channel(); + let table_str_clone = binding2.clone(); + let filter_str_clone = binding3.clone(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async { execute_find(&mut binding, &table_str_clone, &filter_str_clone) }) + .map_err(|e| format!("DB error: {e}")), + Err(_) => Err("Failed to create runtime".into()), + }; + let _ = tx.send(result); + }); + let result = rx.recv().unwrap_or(Err("Failed to receive result".into()))?; if let Some(results) = result.get("results") { let filtered = diff --git a/src/basic/keywords/hearing/syntax.rs b/src/basic/keywords/hearing/syntax.rs index 921cd128..e86035a0 100644 --- a/src/basic/keywords/hearing/syntax.rs +++ b/src/basic/keywords/hearing/syntax.rs @@ -28,7 +28,8 @@ fn hear_block(state: &Arc, session_id: uuid::Uuid, variable_name: &str // Mark session as waiting and store metadata in Redis (for UI hints like menus) let state_clone = Arc::clone(state); let var = variable_name.to_string(); - tokio::task::block_in_place(|| { + let (init_tx, init_rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build(); @@ -52,7 +53,9 @@ fn hear_block(state: &Arc, session_id: uuid::Uuid, variable_name: &str } }); } + let _ = init_tx.send(()); }); + let _ = init_rx.recv(); trace!("HEAR {variable_name}: blocking thread, waiting for user input"); @@ -216,7 +219,8 @@ fn register_hear_as_menu(state: Arc, user: UserSession, engine: &mut E let state_for_suggestions = Arc::clone(&state_clone); let opts_clone = options.clone(); let bot_id_clone = bot_id; - tokio::task::block_in_place(|| { + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build(); @@ -236,7 +240,9 @@ fn register_hear_as_menu(state: Arc, user: UserSession, engine: &mut E } }); } + let _ = tx.send(()); }); + let _ = rx.recv(); let value = hear_block(&state_clone, session_id, &variable_name, json!({ "variable": variable_name, diff --git a/src/basic/keywords/search.rs b/src/basic/keywords/search.rs index 4e2c4a0f..39ba5be3 100644 --- a/src/basic/keywords/search.rs +++ b/src/basic/keywords/search.rs @@ -72,19 +72,23 @@ pub fn search_keyword(state: &AppState, user: UserSession, engine: &mut Engine) } }; - let result = tokio::task::block_in_place(|| { + let (tx, rx) = std::sync::mpsc::channel(); + let table_str_clone = table_str.clone(); + let query_str_clone = query_str.clone(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - match rt { - Some(rt) => rt.block_on(async { - execute_search(&mut binding, &table_str, &query_str, limit_val) + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async { + execute_search(&mut binding, &table_str_clone, &query_str_clone, limit_val) }) .map_err(|e| format!("Search error: {e}")), - None => Err("Failed to create runtime".into()), - } - })?; + Err(_) => Err("Failed to create runtime".into()), + }; + let _ = tx.send(result); + }); + let result = rx.recv().unwrap_or(Err("Failed to receive result".into()))?; if let Some(results) = result.get("results") { let filtered = @@ -125,11 +129,21 @@ pub fn search_keyword(state: &AppState, user: UserSession, engine: &mut Engine) } }; - let result = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current() - .block_on(async { execute_search(&mut binding, &table_str, &query_str, 10) }) - }) - .map_err(|e| format!("Search error: {e}"))?; + let (tx, rx) = std::sync::mpsc::channel(); + let table_str_clone = table_str.clone(); + let query_str_clone = query_str.clone(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async { execute_search(&mut binding, &table_str_clone, &query_str_clone, 10) }) + .map_err(|e| format!("Search error: {e}")), + Err(_) => Err("Failed to create runtime".into()), + }; + let _ = tx.send(result); + }); + let result = rx.recv().unwrap_or(Err("Failed to receive result".into()))?; if let Some(results) = result.get("results") { let filtered = filter_fields_by_role(results.clone(), &roles, &access_info); diff --git a/src/basic/keywords/universal_messaging.rs b/src/basic/keywords/universal_messaging.rs index 68d74269..f21d6a57 100644 --- a/src/basic/keywords/universal_messaging.rs +++ b/src/basic/keywords/universal_messaging.rs @@ -79,19 +79,21 @@ fn register_send_file_to(state: Arc, user: UserSession, engine: &mut E let state_for_send = Arc::clone(&state_clone); let user_for_send = Arc::clone(&user_clone); - tokio::task::block_in_place(|| { + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - match rt { - Some(rt) => rt.block_on(async { + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async { send_file_to_recipient(state_for_send, &user_for_send, &recipient, file) .await }), - None => Err("Failed to create runtime".into()), - } - }) + Err(_) => Err("Failed to create runtime".into()), + }; + let _ = tx.send(result); + }); + rx.recv().unwrap_or(Err("Failed to receive result".into())) .map_err(|e| format!("Failed to send file: {}", e))?; Ok(Dynamic::UNIT) @@ -116,13 +118,13 @@ fn register_send_file_to(state: Arc, user: UserSession, engine: &mut E let state_for_send = Arc::clone(&state_clone2); let user_for_send = Arc::clone(&user_clone2); - tokio::task::block_in_place(|| { + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - match rt { - Some(rt) => rt.block_on(async { + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async { send_file_with_caption_to_recipient( state_for_send, &user_for_send, @@ -132,9 +134,11 @@ fn register_send_file_to(state: Arc, user: UserSession, engine: &mut E ) .await }), - None => Err("Failed to create runtime".into()), - } - }) + Err(_) => Err("Failed to create runtime".into()), + }; + let _ = tx.send(result); + }); + rx.recv().unwrap_or(Err("Failed to receive result".into())) .map_err(|e| format!("Failed to send file with caption: {}", e))?; Ok(Dynamic::UNIT) @@ -159,19 +163,21 @@ fn register_send_to(state: Arc, user: UserSession, engine: &mut Engine let state_for_send = Arc::clone(&state_clone); let user_for_send = user.clone(); - tokio::task::block_in_place(|| { + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - match rt { - Some(rt) => rt.block_on(async { + .build(); + let result = match rt { + Ok(rt) => rt.block_on(async { send_to_specific_channel(state_for_send, &user_for_send, &target, &message) .await }), - None => Err("Failed to create runtime".into()), - } - }) + Err(_) => Err("Failed to create runtime".into()), + }; + let _ = tx.send(result); + }); + rx.recv().unwrap_or(Err("Failed to receive result".into())) .map_err(|e| format!("Failed to send: {}", e))?; Ok(Dynamic::UNIT) @@ -196,20 +202,23 @@ fn register_broadcast(state: Arc, user: UserSession, engine: &mut Engi let state_for_send = Arc::clone(&state_clone); let user_for_send = user.clone(); - let results = tokio::task::block_in_place(|| { + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - match rt { - Some(rt) => rt.block_on(async { + .build(); + let result: Result = match rt { + Ok(rt) => rt.block_on(async { broadcast_message(state_for_send, &user_for_send, &message, recipients) .await + .map_err(|e| format!("{}", e)) }), - None => Err("Failed to create runtime".into()), - } - }) - .map_err(|e| format!("Failed to broadcast: {}", e))?; + Err(_) => Err("Failed to create runtime".into()), + }; + let _ = tx.send(result); + }); + let results = rx.recv().unwrap_or(Err("Failed to receive result".into())) + .map_err(|e| format!("Failed to broadcast: {}", e))?; Ok(results) }, diff --git a/src/basic/keywords/use_tool.rs b/src/basic/keywords/use_tool.rs index 6178949e..1c35e37c 100644 --- a/src/basic/keywords/use_tool.rs +++ b/src/basic/keywords/use_tool.rs @@ -34,9 +34,18 @@ pub fn use_tool_keyword(state: Arc, user: UserSession, engine: &mut En rhai::Position::NONE, ))); } - let result = tokio::task::block_in_place(|| { - associate_tool_with_session(&state_clone, &user_clone, &tool_name) + let (tx, rx) = std::sync::mpsc::channel(); + let state_clone_1 = Arc::clone(&state_clone); + let user_clone_1 = user_clone.clone(); + let tool_name_1 = tool_name.clone(); + std::thread::spawn(move || { + let result = + associate_tool_with_session(&state_clone_1, &user_clone_1, &tool_name_1); + let _ = tx.send(result); }); + let result = rx + .recv() + .unwrap_or(Err("Failed to receive result".to_string())); match result { Ok(message) => Ok(Dynamic::from(message)), Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( @@ -70,9 +79,17 @@ pub fn use_tool_keyword(state: Arc, user: UserSession, engine: &mut En if tool_name.is_empty() { return Dynamic::from("ERROR: Invalid tool name"); } - let result = tokio::task::block_in_place(|| { - associate_tool_with_session(&state_clone2, &user_clone2, &tool_name) + let (tx, rx) = std::sync::mpsc::channel(); + let state_clone_2 = Arc::clone(&state_clone2); + let user_clone_2 = user_clone2.clone(); + let tool_name_2 = tool_name.clone(); + std::thread::spawn(move || { + let result = associate_tool_with_session(&state_clone_2, &user_clone_2, &tool_name_2); + let _ = tx.send(result); }); + let result = rx + .recv() + .unwrap_or(Err("Failed to receive result".to_string())); match result { Ok(message) => Dynamic::from(message), Err(e) => Dynamic::from(format!("ERROR: {}", e)), @@ -102,9 +119,17 @@ pub fn use_tool_keyword(state: Arc, user: UserSession, engine: &mut En if tool_name.is_empty() { return Dynamic::from("ERROR: Invalid tool name"); } - let result = tokio::task::block_in_place(|| { - associate_tool_with_session(&state_clone3, &user_clone3, &tool_name) + let (tx, rx) = std::sync::mpsc::channel(); + let state_clone_3 = Arc::clone(&state_clone3); + let user_clone_3 = user_clone3.clone(); + let tool_name_3 = tool_name.clone(); + std::thread::spawn(move || { + let result = associate_tool_with_session(&state_clone_3, &user_clone_3, &tool_name_3); + let _ = tx.send(result); }); + let result = rx + .recv() + .unwrap_or(Err("Failed to receive result".to_string())); match result { Ok(message) => Dynamic::from(message), Err(e) => Dynamic::from(format!("ERROR: {}", e)), diff --git a/src/calendar/mod.rs b/src/calendar/mod.rs index 070b4045..bc1e30f1 100644 --- a/src/calendar/mod.rs +++ b/src/calendar/mod.rs @@ -25,23 +25,26 @@ pub mod ui; fn get_bot_context() -> (Uuid, Uuid) { let sm = crate::core::secrets::SecretsManager::from_env().ok(); let (org_id, bot_id) = if let Some(sm) = sm { - tokio::task::block_in_place(|| { + let sm_owned = sm.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - rt.block_on(async { - let org = sm.get_value("gbo/analytics", "default_org_id").await + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + let org = sm_owned.get_value("gbo/analytics", "default_org_id").await .unwrap_or_else(|_| "system".to_string()); - let bot = sm.get_value("gbo/analytics", "default_bot_id").await + let bot = sm_owned.get_value("gbo/analytics", "default_bot_id").await .unwrap_or_else(|_| "system".to_string()); (org, bot) }) } else { ("system".to_string(), "system".to_string()) - } - }) + }; + let _ = tx.send(result); + }); + rx.recv().unwrap_or(("system".to_string(), "system".to_string())) } else { ("system".to_string(), "system".to_string()) }; diff --git a/src/core/secrets/mod.rs b/src/core/secrets/mod.rs index 4e2a4fb1..d420cc7b 100644 --- a/src/core/secrets/mod.rs +++ b/src/core/secrets/mod.rs @@ -331,175 +331,238 @@ impl SecretsManager { } pub fn get_cache_config(&self) -> (String, u16, Option) { - tokio::task::block_in_place(|| { + let self_owned = self.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::CACHE)) { - return ( - secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()), - secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(6379), - secrets.get("password").cloned(), - ); - } - } - ("localhost".to_string(), 6379, None) - }) + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + self_owned.get_secret(SecretPaths::CACHE).await.ok() + }) + } else { + None + }; + let _ = tx.send(result); + }); + if let Ok(Some(secrets)) = rx.recv() { + return ( + secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()), + secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(6379), + secrets.get("password").cloned(), + ); + } + ("localhost".to_string(), 6379, None) } pub fn get_directory_config_sync(&self) -> (String, String, String, String) { - tokio::task::block_in_place(|| { + let self_owned = self.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::DIRECTORY)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), - secrets.get("project_id").cloned().unwrap_or_default(), - secrets.get("client_id").cloned().unwrap_or_default(), - secrets.get("client_secret").cloned().unwrap_or_default(), - ); - } - } - ("http://localhost:9000".to_string(), String::new(), String::new(), String::new()) - }) + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + self_owned.get_secret(SecretPaths::DIRECTORY).await.ok() + }) + } else { + None + }; + let _ = tx.send(result); + }); + if let Ok(Some(secrets)) = rx.recv() { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), + secrets.get("project_id").cloned().unwrap_or_default(), + secrets.get("client_id").cloned().unwrap_or_default(), + secrets.get("client_secret").cloned().unwrap_or_default(), + ); + } + ("http://localhost:9000".to_string(), String::new(), String::new(), String::new()) } pub fn get_email_config(&self) -> (String, u16, String, String, String) { - tokio::task::block_in_place(|| { + let self_owned = self.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::EMAIL)) { - return ( - secrets.get("smtp_host").cloned().unwrap_or_else(|| "smtp.gmail.com".into()), - secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587), - secrets.get("smtp_user").cloned().unwrap_or_default(), - secrets.get("smtp_password").cloned().unwrap_or_default(), - secrets.get("smtp_from").cloned().unwrap_or_default(), - ); - } - } - ("smtp.gmail.com".to_string(), 587, String::new(), String::new(), String::new()) - }) + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + self_owned.get_secret(SecretPaths::EMAIL).await.ok() + }) + } else { + None + }; + let _ = tx.send(result); + }); + if let Ok(Some(secrets)) = rx.recv() { + return ( + secrets.get("smtp_host").cloned().unwrap_or_else(|| "smtp.gmail.com".into()), + secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587), + secrets.get("smtp_user").cloned().unwrap_or_default(), + secrets.get("smtp_password").cloned().unwrap_or_default(), + secrets.get("smtp_from").cloned().unwrap_or_default(), + ); + } + ("smtp.gmail.com".to_string(), 587, String::new(), String::new(), String::new()) } pub fn get_llm_config(&self) -> (String, String, Option, Option, String) { - tokio::task::block_in_place(|| { + let self_owned = self.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::LLM)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8081".into()), - secrets.get("model").cloned().unwrap_or_else(|| "gpt-4".into()), - secrets.get("openai_key").cloned(), - secrets.get("anthropic_key").cloned(), - secrets.get("ollama_url").cloned().unwrap_or_else(|| "http://localhost:11434".into()), - ); - } - } - ("http://localhost:8081".to_string(), "gpt-4".to_string(), None, None, "http://localhost:11434".to_string()) - }) + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + self_owned.get_secret(SecretPaths::LLM).await.ok() + }) + } else { + None + }; + let _ = tx.send(result); + }); + if let Ok(Some(secrets)) = rx.recv() { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8081".into()), + secrets.get("model").cloned().unwrap_or_else(|| "gpt-4".into()), + secrets.get("openai_key").cloned(), + secrets.get("anthropic_key").cloned(), + secrets.get("ollama_url").cloned().unwrap_or_else(|| "http://localhost:11434".into()), + ); + } + ("http://localhost:8081".to_string(), "gpt-4".to_string(), None, None, "http://localhost:11434".to_string()) } pub fn get_meet_config(&self) -> (String, String, String) { - tokio::task::block_in_place(|| { + let self_owned = self.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::MEET)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:7880".into()), - secrets.get("app_id").cloned().unwrap_or_default(), - secrets.get("app_secret").cloned().unwrap_or_default(), - ); - } - } - ("http://localhost:7880".to_string(), String::new(), String::new()) - }) + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + self_owned.get_secret(SecretPaths::MEET).await.ok() + }) + } else { + None + }; + let _ = tx.send(result); + }); + if let Ok(Some(secrets)) = rx.recv() { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:7880".into()), + secrets.get("app_id").cloned().unwrap_or_default(), + secrets.get("app_secret").cloned().unwrap_or_default(), + ); + } + ("http://localhost:7880".to_string(), String::new(), String::new()) } pub fn get_vectordb_config_sync(&self) -> (String, Option) { - tokio::task::block_in_place(|| { + let self_owned = self.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::VECTORDB)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:6333".into()), - secrets.get("api_key").cloned(), - ); - } - } - ("http://localhost:6333".to_string(), None) - }) + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + self_owned.get_secret(SecretPaths::VECTORDB).await.ok() + }) + } else { + None + }; + let _ = tx.send(result); + }); + if let Ok(Some(secrets)) = rx.recv() { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:6333".into()), + secrets.get("api_key").cloned(), + ); + } + ("http://localhost:6333".to_string(), None) } pub fn get_observability_config_sync(&self) -> (String, String, String, String) { - tokio::task::block_in_place(|| { + let self_owned = self.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::OBSERVABILITY)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8086".into()), - secrets.get("org").cloned().unwrap_or_else(|| "system".into()), - secrets.get("bucket").cloned().unwrap_or_else(|| "metrics".into()), - secrets.get("token").cloned().unwrap_or_default(), - ); - } - } - ("http://localhost:8086".to_string(), "system".to_string(), "metrics".to_string(), String::new()) - }) + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + self_owned.get_secret(SecretPaths::OBSERVABILITY).await.ok() + }) + } else { + None + }; + let _ = tx.send(result); + }); + if let Ok(Some(secrets)) = rx.recv() { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8086".into()), + secrets.get("org").cloned().unwrap_or_else(|| "system".into()), + secrets.get("bucket").cloned().unwrap_or_else(|| "metrics".into()), + secrets.get("token").cloned().unwrap_or_default(), + ); + } + ("http://localhost:8086".to_string(), "system".to_string(), "metrics".to_string(), String::new()) } pub fn get_alm_config(&self) -> (String, String, String) { - tokio::task::block_in_place(|| { + let self_owned = self.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::ALM)) { - return ( - secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), - secrets.get("token").cloned().unwrap_or_default(), - secrets.get("default_org").cloned().unwrap_or_default(), - ); - } - } - ("http://localhost:9000".to_string(), String::new(), String::new()) - }) + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + self_owned.get_secret(SecretPaths::ALM).await.ok() + }) + } else { + None + }; + let _ = tx.send(result); + }); + if let Ok(Some(secrets)) = rx.recv() { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), + secrets.get("token").cloned().unwrap_or_default(), + secrets.get("default_org").cloned().unwrap_or_default(), + ); + } + ("http://localhost:9000".to_string(), String::new(), String::new()) } pub fn get_jwt_secret_sync(&self) -> String { - tokio::task::block_in_place(|| { + let self_owned = self.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::JWT)) { - return secrets.get("secret").cloned().unwrap_or_default(); - } - } - String::new() - }) + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + self_owned.get_secret(SecretPaths::JWT).await.ok() + }) + } else { + None + }; + let _ = tx.send(result); + }); + if let Ok(Some(secrets)) = rx.recv() { + return secrets.get("secret").cloned().unwrap_or_default(); + } + String::new() } pub async fn put_secret(&self, path: &str, data: HashMap) -> Result<()> { diff --git a/src/core/shared/admin_email.rs b/src/core/shared/admin_email.rs index ef4f5f63..1f3800b4 100644 --- a/src/core/shared/admin_email.rs +++ b/src/core/shared/admin_email.rs @@ -19,19 +19,22 @@ pub async fn send_invitation_email( let smtp = crate::core::secrets::SecretsManager::from_env() .ok() .and_then(|sm| { - tokio::task::block_in_place(|| { + let sm_owned = sm.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { - rt.block_on(async { - sm.get_secret(crate::core::secrets::SecretPaths::EMAIL).await.ok() + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async move { + sm_owned.get_secret(crate::core::secrets::SecretPaths::EMAIL).await.ok() }) } else { None - } - }) + }; + let _ = tx.send(result); + }); + rx.recv().ok().flatten() }); let smtp_host = smtp.as_ref() diff --git a/src/core/shared/state.rs b/src/core/shared/state.rs index 445fca44..20a7d35f 100644 --- a/src/core/shared/state.rs +++ b/src/core/shared/state.rs @@ -298,18 +298,20 @@ impl Extensions { pub fn insert_blocking(&self, value: T) { let map = self.map.clone(); - tokio::task::block_in_place(|| { + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build() - .ok(); - if let Some(rt) = rt { + .build(); + if let Ok(rt) = rt { rt.block_on(async { let mut guard = map.write().await; guard.insert(TypeId::of::(), Arc::new(value)); }); } + let _ = tx.send(()); }); + let _ = rx.recv(); } pub async fn get(&self) -> Option> { diff --git a/src/core/shared/utils.rs b/src/core/shared/utils.rs index a2d39464..2c66d81c 100644 --- a/src/core/shared/utils.rs +++ b/src/core/shared/utils.rs @@ -77,7 +77,6 @@ pub async fn get_secrets_manager() -> Option { let guard = SECRETS_MANAGER.read().ok()?; guard.clone() } - pub fn get_secrets_manager_sync() -> Option { let guard = SECRETS_MANAGER.read().ok()?; guard.clone()