fix: replace all block_in_place with std:🧵:spawn to fix nested runtime panic
Some checks are pending
BotServer CI/CD / build (push) Waiting to run

Root cause: block_in_place + new_current_thread().block_on() panics when
called from within tokio runtime (including spawn_blocking). Tokio doesn't
allow nested block_on() calls.

Fix: Replace ALL block_in_place patterns with std:🧵:spawn + mpsc channel.
This creates a completely separate OS thread with its own runtime, avoiding
any nesting issues. Works from any context: async, spawn_blocking, or sync.

Files: 14 files across secrets, utils, state, calendar, analytics, email,
and all keyword handlers (universal_messaging, search, book, create_draft,
create_site, hearing/syntax, use_tool, find, admin_email, goals)
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-03 12:54:36 -03:00
parent 4bdf46bdfc
commit 61642343a8
14 changed files with 411 additions and 253 deletions

View file

@ -19,23 +19,26 @@ use crate::core::shared::state::AppState;
fn get_bot_context() -> (Uuid, Uuid) { fn get_bot_context() -> (Uuid, Uuid) {
let sm = crate::core::secrets::SecretsManager::from_env().ok(); let sm = crate::core::secrets::SecretsManager::from_env().ok();
let (org_id, bot_id) = if let Some(sm) = sm { let (org_id, bot_id) = if let Some(sm) = sm {
tokio::task::block_in_place(|| { let sm_owned = sm.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
rt.block_on(async { let org = sm_owned.get_value("gbo/analytics", "default_org_id").await
let org = sm.get_value("gbo/analytics", "default_org_id").await
.unwrap_or_else(|_| "system".to_string()); .unwrap_or_else(|_| "system".to_string());
let bot = sm.get_value("gbo/analytics", "default_bot_id").await let bot = sm_owned.get_value("gbo/analytics", "default_bot_id").await
.unwrap_or_else(|_| "system".to_string()); .unwrap_or_else(|_| "system".to_string());
(org, bot) (org, bot)
}) })
} else { } else {
("system".to_string(), "system".to_string()) ("system".to_string(), "system".to_string())
} };
}) let _ = tx.send(result);
});
rx.recv().unwrap_or(("system".to_string(), "system".to_string()))
} else { } else {
("system".to_string(), "system".to_string()) ("system".to_string(), "system".to_string())
}; };

View file

@ -273,23 +273,27 @@ pub fn book_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
let user_for_task = user_clone2.clone(); let user_for_task = user_clone2.clone();
let meeting_json = meeting_details.to_string(); let meeting_json = meeting_details.to_string();
let result = tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
let meeting_json_clone = meeting_json.clone();
let attendees_clone = attendees.clone();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = match rt {
match rt { Ok(rt) => rt.block_on(async move {
Some(rt) => rt.block_on(async move {
execute_book_meeting( execute_book_meeting(
&state_for_task, &state_for_task,
&user_for_task, &user_for_task,
meeting_json, meeting_json_clone,
attendees, attendees_clone,
) )
}), }),
None => Err("Failed to create runtime".into()), Err(_) => Err("Failed to create runtime".into()),
} };
let _ = tx.send(result);
}); });
let result = rx.recv().unwrap_or(Err("Failed to create runtime".into()));
match result { match result {
Ok(event_id) => Ok(Dynamic::from(event_id)), Ok(event_id) => Ok(Dynamic::from(event_id)),
@ -324,22 +328,25 @@ pub fn book_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
let state_for_task = Arc::clone(&state_clone3); let state_for_task = Arc::clone(&state_clone3);
let result = tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
let date_str_clone = date_str.clone();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = match rt {
match rt { Ok(rt) => rt.block_on(async move {
Some(rt) => rt.block_on(async move {
check_availability( check_availability(
&state_for_task, &state_for_task,
&date_str, &date_str_clone,
duration_minutes, duration_minutes,
) )
}), }),
None => Err("Failed to create runtime".into()), Err(_) => Err("Failed to create runtime".into()),
} };
let _ = tx.send(result);
}); });
let result = rx.recv().unwrap_or(Err("Failed to create runtime".into()));
match result { match result {
Ok(slots) => Ok(Dynamic::from(slots)), Ok(slots) => Ok(Dynamic::from(slots)),

View file

@ -14,15 +14,24 @@ pub fn create_draft_keyword(state: &AppState, _user: UserSession, engine: &mut E
let subject = context.eval_expression_tree(&inputs[1])?.to_string(); let subject = context.eval_expression_tree(&inputs[1])?.to_string();
let reply_text = context.eval_expression_tree(&inputs[2])?.to_string(); let reply_text = context.eval_expression_tree(&inputs[2])?.to_string();
let fut = execute_create_draft(&state_clone, &to, &subject, &reply_text); let state_clone2 = state_clone.clone();
let result = let to_owned = to.clone();
tokio::task::block_in_place(|| { let subject_owned = subject.clone();
let rt = tokio::runtime::Builder::new_current_thread() let reply_text_owned = reply_text.clone();
.enable_all() let (tx, rx) = std::sync::mpsc::channel();
.build() std::thread::spawn(move || {
.map_err(|e| format!("Runtime creation failed: {e}"))?; let rt = tokio::runtime::Builder::new_current_thread()
rt.block_on(fut) .enable_all()
}) .build();
let result = match rt {
Ok(rt) => rt.block_on(async {
execute_create_draft(&state_clone2, &to_owned, &subject_owned, &reply_text_owned).await
}),
Err(e) => Err(format!("Runtime creation failed: {e}")),
};
let _ = tx.send(result);
});
let result = rx.recv().unwrap_or(Err("Failed to receive result".to_string()))
.map_err(|e| format!("Draft creation failed: {e}"))?; .map_err(|e| format!("Draft creation failed: {e}"))?;
Ok(Dynamic::from(result)) Ok(Dynamic::from(result))
}, },

View file

@ -56,15 +56,20 @@ pub fn create_site_keyword(state: &AppState, user: UserSession, engine: &mut Eng
template_dir, template_dir,
prompt, prompt,
}; };
let fut = create_site(config, s3, bucket, bot_id, llm, params); let (tx, rx) = std::sync::mpsc::channel();
let result = std::thread::spawn(move || {
tokio::task::block_in_place(|| { let rt = tokio::runtime::Builder::new_current_thread()
let rt = tokio::runtime::Builder::new_current_thread() .enable_all()
.enable_all() .build();
.build() let result = match rt {
.map_err(|e| format!("Runtime creation failed: {}", e))?; Ok(rt) => rt.block_on(async {
rt.block_on(fut) create_site(config, s3, bucket, bot_id, llm, params).await
}) }),
Err(e) => Err(format!("Runtime creation failed: {}", e).into()),
};
let _ = tx.send(result);
});
let result = rx.recv().unwrap_or(Err("Failed to receive result".into()))
.map_err(|e| format!("Site creation failed: {}", e))?; .map_err(|e| format!("Site creation failed: {}", e))?;
Ok(Dynamic::from(result)) Ok(Dynamic::from(result))
}, },

View file

@ -44,11 +44,21 @@ pub fn find_keyword(state: &AppState, user: UserSession, engine: &mut Engine) {
} }
}; };
let result = tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
tokio::runtime::Handle::current() let table_str_clone = binding2.clone();
.block_on(async { execute_find(&mut binding, &binding2, &binding3) }) let filter_str_clone = binding3.clone();
}) std::thread::spawn(move || {
.map_err(|e| format!("DB error: {e}"))?; let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let result = match rt {
Ok(rt) => rt.block_on(async { execute_find(&mut binding, &table_str_clone, &filter_str_clone) })
.map_err(|e| format!("DB error: {e}")),
Err(_) => Err("Failed to create runtime".into()),
};
let _ = tx.send(result);
});
let result = rx.recv().unwrap_or(Err("Failed to receive result".into()))?;
if let Some(results) = result.get("results") { if let Some(results) = result.get("results") {
let filtered = let filtered =

View file

@ -28,7 +28,8 @@ fn hear_block(state: &Arc<AppState>, session_id: uuid::Uuid, variable_name: &str
// Mark session as waiting and store metadata in Redis (for UI hints like menus) // Mark session as waiting and store metadata in Redis (for UI hints like menus)
let state_clone = Arc::clone(state); let state_clone = Arc::clone(state);
let var = variable_name.to_string(); let var = variable_name.to_string();
tokio::task::block_in_place(|| { let (init_tx, init_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build(); .build();
@ -52,7 +53,9 @@ fn hear_block(state: &Arc<AppState>, session_id: uuid::Uuid, variable_name: &str
} }
}); });
} }
let _ = init_tx.send(());
}); });
let _ = init_rx.recv();
trace!("HEAR {variable_name}: blocking thread, waiting for user input"); trace!("HEAR {variable_name}: blocking thread, waiting for user input");
@ -216,7 +219,8 @@ fn register_hear_as_menu(state: Arc<AppState>, user: UserSession, engine: &mut E
let state_for_suggestions = Arc::clone(&state_clone); let state_for_suggestions = Arc::clone(&state_clone);
let opts_clone = options.clone(); let opts_clone = options.clone();
let bot_id_clone = bot_id; let bot_id_clone = bot_id;
tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build(); .build();
@ -236,7 +240,9 @@ fn register_hear_as_menu(state: Arc<AppState>, user: UserSession, engine: &mut E
} }
}); });
} }
let _ = tx.send(());
}); });
let _ = rx.recv();
let value = hear_block(&state_clone, session_id, &variable_name, json!({ let value = hear_block(&state_clone, session_id, &variable_name, json!({
"variable": variable_name, "variable": variable_name,

View file

@ -72,19 +72,23 @@ pub fn search_keyword(state: &AppState, user: UserSession, engine: &mut Engine)
} }
}; };
let result = tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
let table_str_clone = table_str.clone();
let query_str_clone = query_str.clone();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = match rt {
match rt { Ok(rt) => rt.block_on(async {
Some(rt) => rt.block_on(async { execute_search(&mut binding, &table_str_clone, &query_str_clone, limit_val)
execute_search(&mut binding, &table_str, &query_str, limit_val)
}) })
.map_err(|e| format!("Search error: {e}")), .map_err(|e| format!("Search error: {e}")),
None => Err("Failed to create runtime".into()), Err(_) => Err("Failed to create runtime".into()),
} };
})?; let _ = tx.send(result);
});
let result = rx.recv().unwrap_or(Err("Failed to receive result".into()))?;
if let Some(results) = result.get("results") { if let Some(results) = result.get("results") {
let filtered = let filtered =
@ -125,11 +129,21 @@ pub fn search_keyword(state: &AppState, user: UserSession, engine: &mut Engine)
} }
}; };
let result = tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
tokio::runtime::Handle::current() let table_str_clone = table_str.clone();
.block_on(async { execute_search(&mut binding, &table_str, &query_str, 10) }) let query_str_clone = query_str.clone();
}) std::thread::spawn(move || {
.map_err(|e| format!("Search error: {e}"))?; let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let result = match rt {
Ok(rt) => rt.block_on(async { execute_search(&mut binding, &table_str_clone, &query_str_clone, 10) })
.map_err(|e| format!("Search error: {e}")),
Err(_) => Err("Failed to create runtime".into()),
};
let _ = tx.send(result);
});
let result = rx.recv().unwrap_or(Err("Failed to receive result".into()))?;
if let Some(results) = result.get("results") { if let Some(results) = result.get("results") {
let filtered = filter_fields_by_role(results.clone(), &roles, &access_info); let filtered = filter_fields_by_role(results.clone(), &roles, &access_info);

View file

@ -79,19 +79,21 @@ fn register_send_file_to(state: Arc<AppState>, user: UserSession, engine: &mut E
let state_for_send = Arc::clone(&state_clone); let state_for_send = Arc::clone(&state_clone);
let user_for_send = Arc::clone(&user_clone); let user_for_send = Arc::clone(&user_clone);
tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = match rt {
match rt { Ok(rt) => rt.block_on(async {
Some(rt) => rt.block_on(async {
send_file_to_recipient(state_for_send, &user_for_send, &recipient, file) send_file_to_recipient(state_for_send, &user_for_send, &recipient, file)
.await .await
}), }),
None => Err("Failed to create runtime".into()), Err(_) => Err("Failed to create runtime".into()),
} };
}) let _ = tx.send(result);
});
rx.recv().unwrap_or(Err("Failed to receive result".into()))
.map_err(|e| format!("Failed to send file: {}", e))?; .map_err(|e| format!("Failed to send file: {}", e))?;
Ok(Dynamic::UNIT) Ok(Dynamic::UNIT)
@ -116,13 +118,13 @@ fn register_send_file_to(state: Arc<AppState>, user: UserSession, engine: &mut E
let state_for_send = Arc::clone(&state_clone2); let state_for_send = Arc::clone(&state_clone2);
let user_for_send = Arc::clone(&user_clone2); let user_for_send = Arc::clone(&user_clone2);
tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = match rt {
match rt { Ok(rt) => rt.block_on(async {
Some(rt) => rt.block_on(async {
send_file_with_caption_to_recipient( send_file_with_caption_to_recipient(
state_for_send, state_for_send,
&user_for_send, &user_for_send,
@ -132,9 +134,11 @@ fn register_send_file_to(state: Arc<AppState>, user: UserSession, engine: &mut E
) )
.await .await
}), }),
None => Err("Failed to create runtime".into()), Err(_) => Err("Failed to create runtime".into()),
} };
}) let _ = tx.send(result);
});
rx.recv().unwrap_or(Err("Failed to receive result".into()))
.map_err(|e| format!("Failed to send file with caption: {}", e))?; .map_err(|e| format!("Failed to send file with caption: {}", e))?;
Ok(Dynamic::UNIT) Ok(Dynamic::UNIT)
@ -159,19 +163,21 @@ fn register_send_to(state: Arc<AppState>, user: UserSession, engine: &mut Engine
let state_for_send = Arc::clone(&state_clone); let state_for_send = Arc::clone(&state_clone);
let user_for_send = user.clone(); let user_for_send = user.clone();
tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = match rt {
match rt { Ok(rt) => rt.block_on(async {
Some(rt) => rt.block_on(async {
send_to_specific_channel(state_for_send, &user_for_send, &target, &message) send_to_specific_channel(state_for_send, &user_for_send, &target, &message)
.await .await
}), }),
None => Err("Failed to create runtime".into()), Err(_) => Err("Failed to create runtime".into()),
} };
}) let _ = tx.send(result);
});
rx.recv().unwrap_or(Err("Failed to receive result".into()))
.map_err(|e| format!("Failed to send: {}", e))?; .map_err(|e| format!("Failed to send: {}", e))?;
Ok(Dynamic::UNIT) Ok(Dynamic::UNIT)
@ -196,20 +202,23 @@ fn register_broadcast(state: Arc<AppState>, user: UserSession, engine: &mut Engi
let state_for_send = Arc::clone(&state_clone); let state_for_send = Arc::clone(&state_clone);
let user_for_send = user.clone(); let user_for_send = user.clone();
let results = tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result: Result<Dynamic, String> = match rt {
match rt { Ok(rt) => rt.block_on(async {
Some(rt) => rt.block_on(async {
broadcast_message(state_for_send, &user_for_send, &message, recipients) broadcast_message(state_for_send, &user_for_send, &message, recipients)
.await .await
.map_err(|e| format!("{}", e))
}), }),
None => Err("Failed to create runtime".into()), Err(_) => Err("Failed to create runtime".into()),
} };
}) let _ = tx.send(result);
.map_err(|e| format!("Failed to broadcast: {}", e))?; });
let results = rx.recv().unwrap_or(Err("Failed to receive result".into()))
.map_err(|e| format!("Failed to broadcast: {}", e))?;
Ok(results) Ok(results)
}, },

View file

@ -34,9 +34,18 @@ pub fn use_tool_keyword(state: Arc<AppState>, user: UserSession, engine: &mut En
rhai::Position::NONE, rhai::Position::NONE,
))); )));
} }
let result = tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
associate_tool_with_session(&state_clone, &user_clone, &tool_name) let state_clone_1 = Arc::clone(&state_clone);
let user_clone_1 = user_clone.clone();
let tool_name_1 = tool_name.clone();
std::thread::spawn(move || {
let result =
associate_tool_with_session(&state_clone_1, &user_clone_1, &tool_name_1);
let _ = tx.send(result);
}); });
let result = rx
.recv()
.unwrap_or(Err("Failed to receive result".to_string()));
match result { match result {
Ok(message) => Ok(Dynamic::from(message)), Ok(message) => Ok(Dynamic::from(message)),
Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
@ -70,9 +79,17 @@ pub fn use_tool_keyword(state: Arc<AppState>, user: UserSession, engine: &mut En
if tool_name.is_empty() { if tool_name.is_empty() {
return Dynamic::from("ERROR: Invalid tool name"); return Dynamic::from("ERROR: Invalid tool name");
} }
let result = tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
associate_tool_with_session(&state_clone2, &user_clone2, &tool_name) let state_clone_2 = Arc::clone(&state_clone2);
let user_clone_2 = user_clone2.clone();
let tool_name_2 = tool_name.clone();
std::thread::spawn(move || {
let result = associate_tool_with_session(&state_clone_2, &user_clone_2, &tool_name_2);
let _ = tx.send(result);
}); });
let result = rx
.recv()
.unwrap_or(Err("Failed to receive result".to_string()));
match result { match result {
Ok(message) => Dynamic::from(message), Ok(message) => Dynamic::from(message),
Err(e) => Dynamic::from(format!("ERROR: {}", e)), Err(e) => Dynamic::from(format!("ERROR: {}", e)),
@ -102,9 +119,17 @@ pub fn use_tool_keyword(state: Arc<AppState>, user: UserSession, engine: &mut En
if tool_name.is_empty() { if tool_name.is_empty() {
return Dynamic::from("ERROR: Invalid tool name"); return Dynamic::from("ERROR: Invalid tool name");
} }
let result = tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
associate_tool_with_session(&state_clone3, &user_clone3, &tool_name) let state_clone_3 = Arc::clone(&state_clone3);
let user_clone_3 = user_clone3.clone();
let tool_name_3 = tool_name.clone();
std::thread::spawn(move || {
let result = associate_tool_with_session(&state_clone_3, &user_clone_3, &tool_name_3);
let _ = tx.send(result);
}); });
let result = rx
.recv()
.unwrap_or(Err("Failed to receive result".to_string()));
match result { match result {
Ok(message) => Dynamic::from(message), Ok(message) => Dynamic::from(message),
Err(e) => Dynamic::from(format!("ERROR: {}", e)), Err(e) => Dynamic::from(format!("ERROR: {}", e)),

View file

@ -25,23 +25,26 @@ pub mod ui;
fn get_bot_context() -> (Uuid, Uuid) { fn get_bot_context() -> (Uuid, Uuid) {
let sm = crate::core::secrets::SecretsManager::from_env().ok(); let sm = crate::core::secrets::SecretsManager::from_env().ok();
let (org_id, bot_id) = if let Some(sm) = sm { let (org_id, bot_id) = if let Some(sm) = sm {
tokio::task::block_in_place(|| { let sm_owned = sm.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
rt.block_on(async { let org = sm_owned.get_value("gbo/analytics", "default_org_id").await
let org = sm.get_value("gbo/analytics", "default_org_id").await
.unwrap_or_else(|_| "system".to_string()); .unwrap_or_else(|_| "system".to_string());
let bot = sm.get_value("gbo/analytics", "default_bot_id").await let bot = sm_owned.get_value("gbo/analytics", "default_bot_id").await
.unwrap_or_else(|_| "system".to_string()); .unwrap_or_else(|_| "system".to_string());
(org, bot) (org, bot)
}) })
} else { } else {
("system".to_string(), "system".to_string()) ("system".to_string(), "system".to_string())
} };
}) let _ = tx.send(result);
});
rx.recv().unwrap_or(("system".to_string(), "system".to_string()))
} else { } else {
("system".to_string(), "system".to_string()) ("system".to_string(), "system".to_string())
}; };

View file

@ -331,175 +331,238 @@ impl SecretsManager {
} }
pub fn get_cache_config(&self) -> (String, u16, Option<String>) { pub fn get_cache_config(&self) -> (String, u16, Option<String>) {
tokio::task::block_in_place(|| { let self_owned = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::CACHE)) { self_owned.get_secret(SecretPaths::CACHE).await.ok()
return ( })
secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()), } else {
secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(6379), None
secrets.get("password").cloned(), };
); let _ = tx.send(result);
} });
} if let Ok(Some(secrets)) = rx.recv() {
("localhost".to_string(), 6379, None) return (
}) secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()),
secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(6379),
secrets.get("password").cloned(),
);
}
("localhost".to_string(), 6379, None)
} }
pub fn get_directory_config_sync(&self) -> (String, String, String, String) { pub fn get_directory_config_sync(&self) -> (String, String, String, String) {
tokio::task::block_in_place(|| { let self_owned = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::DIRECTORY)) { self_owned.get_secret(SecretPaths::DIRECTORY).await.ok()
return ( })
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), } else {
secrets.get("project_id").cloned().unwrap_or_default(), None
secrets.get("client_id").cloned().unwrap_or_default(), };
secrets.get("client_secret").cloned().unwrap_or_default(), let _ = tx.send(result);
); });
} if let Ok(Some(secrets)) = rx.recv() {
} return (
("http://localhost:9000".to_string(), String::new(), String::new(), String::new()) secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()),
}) secrets.get("project_id").cloned().unwrap_or_default(),
secrets.get("client_id").cloned().unwrap_or_default(),
secrets.get("client_secret").cloned().unwrap_or_default(),
);
}
("http://localhost:9000".to_string(), String::new(), String::new(), String::new())
} }
pub fn get_email_config(&self) -> (String, u16, String, String, String) { pub fn get_email_config(&self) -> (String, u16, String, String, String) {
tokio::task::block_in_place(|| { let self_owned = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::EMAIL)) { self_owned.get_secret(SecretPaths::EMAIL).await.ok()
return ( })
secrets.get("smtp_host").cloned().unwrap_or_else(|| "smtp.gmail.com".into()), } else {
secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587), None
secrets.get("smtp_user").cloned().unwrap_or_default(), };
secrets.get("smtp_password").cloned().unwrap_or_default(), let _ = tx.send(result);
secrets.get("smtp_from").cloned().unwrap_or_default(), });
); if let Ok(Some(secrets)) = rx.recv() {
} return (
} secrets.get("smtp_host").cloned().unwrap_or_else(|| "smtp.gmail.com".into()),
("smtp.gmail.com".to_string(), 587, String::new(), String::new(), String::new()) secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587),
}) secrets.get("smtp_user").cloned().unwrap_or_default(),
secrets.get("smtp_password").cloned().unwrap_or_default(),
secrets.get("smtp_from").cloned().unwrap_or_default(),
);
}
("smtp.gmail.com".to_string(), 587, String::new(), String::new(), String::new())
} }
pub fn get_llm_config(&self) -> (String, String, Option<String>, Option<String>, String) { pub fn get_llm_config(&self) -> (String, String, Option<String>, Option<String>, String) {
tokio::task::block_in_place(|| { let self_owned = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::LLM)) { self_owned.get_secret(SecretPaths::LLM).await.ok()
return ( })
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8081".into()), } else {
secrets.get("model").cloned().unwrap_or_else(|| "gpt-4".into()), None
secrets.get("openai_key").cloned(), };
secrets.get("anthropic_key").cloned(), let _ = tx.send(result);
secrets.get("ollama_url").cloned().unwrap_or_else(|| "http://localhost:11434".into()), });
); if let Ok(Some(secrets)) = rx.recv() {
} return (
} secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8081".into()),
("http://localhost:8081".to_string(), "gpt-4".to_string(), None, None, "http://localhost:11434".to_string()) secrets.get("model").cloned().unwrap_or_else(|| "gpt-4".into()),
}) secrets.get("openai_key").cloned(),
secrets.get("anthropic_key").cloned(),
secrets.get("ollama_url").cloned().unwrap_or_else(|| "http://localhost:11434".into()),
);
}
("http://localhost:8081".to_string(), "gpt-4".to_string(), None, None, "http://localhost:11434".to_string())
} }
pub fn get_meet_config(&self) -> (String, String, String) { pub fn get_meet_config(&self) -> (String, String, String) {
tokio::task::block_in_place(|| { let self_owned = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::MEET)) { self_owned.get_secret(SecretPaths::MEET).await.ok()
return ( })
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:7880".into()), } else {
secrets.get("app_id").cloned().unwrap_or_default(), None
secrets.get("app_secret").cloned().unwrap_or_default(), };
); let _ = tx.send(result);
} });
} if let Ok(Some(secrets)) = rx.recv() {
("http://localhost:7880".to_string(), String::new(), String::new()) return (
}) secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:7880".into()),
secrets.get("app_id").cloned().unwrap_or_default(),
secrets.get("app_secret").cloned().unwrap_or_default(),
);
}
("http://localhost:7880".to_string(), String::new(), String::new())
} }
pub fn get_vectordb_config_sync(&self) -> (String, Option<String>) { pub fn get_vectordb_config_sync(&self) -> (String, Option<String>) {
tokio::task::block_in_place(|| { let self_owned = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::VECTORDB)) { self_owned.get_secret(SecretPaths::VECTORDB).await.ok()
return ( })
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:6333".into()), } else {
secrets.get("api_key").cloned(), None
); };
} let _ = tx.send(result);
} });
("http://localhost:6333".to_string(), None) if let Ok(Some(secrets)) = rx.recv() {
}) return (
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:6333".into()),
secrets.get("api_key").cloned(),
);
}
("http://localhost:6333".to_string(), None)
} }
pub fn get_observability_config_sync(&self) -> (String, String, String, String) { pub fn get_observability_config_sync(&self) -> (String, String, String, String) {
tokio::task::block_in_place(|| { let self_owned = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::OBSERVABILITY)) { self_owned.get_secret(SecretPaths::OBSERVABILITY).await.ok()
return ( })
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8086".into()), } else {
secrets.get("org").cloned().unwrap_or_else(|| "system".into()), None
secrets.get("bucket").cloned().unwrap_or_else(|| "metrics".into()), };
secrets.get("token").cloned().unwrap_or_default(), let _ = tx.send(result);
); });
} if let Ok(Some(secrets)) = rx.recv() {
} return (
("http://localhost:8086".to_string(), "system".to_string(), "metrics".to_string(), String::new()) secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8086".into()),
}) secrets.get("org").cloned().unwrap_or_else(|| "system".into()),
secrets.get("bucket").cloned().unwrap_or_else(|| "metrics".into()),
secrets.get("token").cloned().unwrap_or_default(),
);
}
("http://localhost:8086".to_string(), "system".to_string(), "metrics".to_string(), String::new())
} }
pub fn get_alm_config(&self) -> (String, String, String) { pub fn get_alm_config(&self) -> (String, String, String) {
tokio::task::block_in_place(|| { let self_owned = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::ALM)) { self_owned.get_secret(SecretPaths::ALM).await.ok()
return ( })
secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), } else {
secrets.get("token").cloned().unwrap_or_default(), None
secrets.get("default_org").cloned().unwrap_or_default(), };
); let _ = tx.send(result);
} });
} if let Ok(Some(secrets)) = rx.recv() {
("http://localhost:9000".to_string(), String::new(), String::new()) return (
}) secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()),
secrets.get("token").cloned().unwrap_or_default(),
secrets.get("default_org").cloned().unwrap_or_default(),
);
}
("http://localhost:9000".to_string(), String::new(), String::new())
} }
pub fn get_jwt_secret_sync(&self) -> String { pub fn get_jwt_secret_sync(&self) -> String {
tokio::task::block_in_place(|| { let self_owned = self.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
if let Ok(secrets) = rt.block_on(self.get_secret(SecretPaths::JWT)) { self_owned.get_secret(SecretPaths::JWT).await.ok()
return secrets.get("secret").cloned().unwrap_or_default(); })
} } else {
} None
String::new() };
}) let _ = tx.send(result);
});
if let Ok(Some(secrets)) = rx.recv() {
return secrets.get("secret").cloned().unwrap_or_default();
}
String::new()
} }
pub async fn put_secret(&self, path: &str, data: HashMap<String, String>) -> Result<()> { pub async fn put_secret(&self, path: &str, data: HashMap<String, String>) -> Result<()> {

View file

@ -19,19 +19,22 @@ pub async fn send_invitation_email(
let smtp = crate::core::secrets::SecretsManager::from_env() let smtp = crate::core::secrets::SecretsManager::from_env()
.ok() .ok()
.and_then(|sm| { .and_then(|sm| {
tokio::task::block_in_place(|| { let sm_owned = sm.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); let result = if let Ok(rt) = rt {
if let Some(rt) = rt { rt.block_on(async move {
rt.block_on(async { sm_owned.get_secret(crate::core::secrets::SecretPaths::EMAIL).await.ok()
sm.get_secret(crate::core::secrets::SecretPaths::EMAIL).await.ok()
}) })
} else { } else {
None None
} };
}) let _ = tx.send(result);
});
rx.recv().ok().flatten()
}); });
let smtp_host = smtp.as_ref() let smtp_host = smtp.as_ref()

View file

@ -298,18 +298,20 @@ impl Extensions {
pub fn insert_blocking<T: Send + Sync + 'static>(&self, value: T) { pub fn insert_blocking<T: Send + Sync + 'static>(&self, value: T) {
let map = self.map.clone(); let map = self.map.clone();
tokio::task::block_in_place(|| { let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build();
.ok(); if let Ok(rt) = rt {
if let Some(rt) = rt {
rt.block_on(async { rt.block_on(async {
let mut guard = map.write().await; let mut guard = map.write().await;
guard.insert(TypeId::of::<T>(), Arc::new(value)); guard.insert(TypeId::of::<T>(), Arc::new(value));
}); });
} }
let _ = tx.send(());
}); });
let _ = rx.recv();
} }
pub async fn get<T: Send + Sync + 'static>(&self) -> Option<Arc<T>> { pub async fn get<T: Send + Sync + 'static>(&self) -> Option<Arc<T>> {

View file

@ -77,7 +77,6 @@ pub async fn get_secrets_manager() -> Option<SecretsManager> {
let guard = SECRETS_MANAGER.read().ok()?; let guard = SECRETS_MANAGER.read().ok()?;
guard.clone() guard.clone()
} }
pub fn get_secrets_manager_sync() -> Option<SecretsManager> { pub fn get_secrets_manager_sync() -> Option<SecretsManager> {
let guard = SECRETS_MANAGER.read().ok()?; let guard = SECRETS_MANAGER.read().ok()?;
guard.clone() guard.clone()