fix: Replace Handle::try_current().block_on() with thread::spawn pattern
All checks were successful
BotServer CI/CD / build (push) Successful in 2m38s

- Fixes panic: Cannot start a runtime from within a runtime
- kb_statistics.rs: Wrap all async calls in std:🧵:spawn
- post_to.rs: Replace Handle::try_current with thread::spawn + mpsc
- Removes dead Handle::try_current checks from sync functions
- Follows AGENTS.md pattern for async-from-sync callbacks
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-04 07:35:03 -03:00
parent 4d7297243e
commit 2a042d400b
2 changed files with 99 additions and 73 deletions

View file

@ -45,14 +45,20 @@ pub fn kb_statistics_keyword(state: Arc<AppState>, user: UserSession, engine: &m
user.user_id
);
let rt = tokio::runtime::Handle::try_current();
let Ok(runtime) = rt else {
error!("KB STATISTICS: No tokio runtime available");
return Dynamic::UNIT;
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let result = if let Ok(rt) = rt {
rt.block_on(async { get_kb_statistics(&state, &user).await })
} else {
Err("Failed to create runtime".into())
};
let _ = tx.send(result);
});
let result = runtime
.block_on(async { get_kb_statistics(&state, &user).await });
let result = rx.recv().unwrap_or(Err("Channel error".into()));
match result {
Ok(stats) => match serde_json::to_value(&stats) {
@ -85,16 +91,21 @@ pub fn kb_statistics_keyword(state: Arc<AppState>, user: UserSession, engine: &m
user.user_id
);
let rt = tokio::runtime::Handle::try_current();
if rt.is_err() {
error!("KB COLLECTION STATS: No tokio runtime available");
return Dynamic::UNIT;
}
let collection = collection_name.to_string();
let result = rt
.expect("valid syntax registration")
.block_on(async { get_collection_statistics(&state, &collection).await });
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let result = if let Ok(rt) = rt {
rt.block_on(async { get_collection_statistics(&state, &collection).await })
} else {
Err("Failed to create runtime".into())
};
let _ = tx.send(result);
});
let result = rx.recv().unwrap_or(Err("Channel error".into()));
match result {
Ok(stats) => match serde_json::to_value(&stats) {
@ -125,15 +136,7 @@ pub fn kb_statistics_keyword(state: Arc<AppState>, user: UserSession, engine: &m
user.user_id
);
let rt = tokio::runtime::Handle::try_current();
if rt.is_err() {
error!("KB DOCUMENTS COUNT: No tokio runtime available");
return 0;
}
let result = get_documents_count(&state, &user);
result.unwrap_or(0)
get_documents_count(&state, &user).unwrap_or(0)
});
let state_clone4 = Arc::clone(&state);
@ -150,15 +153,7 @@ pub fn kb_statistics_keyword(state: Arc<AppState>, user: UserSession, engine: &m
user.user_id
);
let rt = tokio::runtime::Handle::try_current();
if rt.is_err() {
error!("KB DOCUMENTS ADDED SINCE: No tokio runtime available");
return 0;
}
let result = get_documents_added_since(&state, &user, days);
result.unwrap_or(0)
get_documents_added_since(&state, &user, days).unwrap_or(0)
});
let state_clone5 = Arc::clone(&state);
@ -174,15 +169,20 @@ pub fn kb_statistics_keyword(state: Arc<AppState>, user: UserSession, engine: &m
user.user_id
);
let rt = tokio::runtime::Handle::try_current();
if rt.is_err() {
error!("KB LIST COLLECTIONS: No tokio runtime available");
return Dynamic::UNIT;
}
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let result = if let Ok(rt) = rt {
rt.block_on(async { list_collections(&state, &user).await })
} else {
Err("Failed to create runtime".into())
};
let _ = tx.send(result);
});
let result = rt
.expect("valid syntax registration")
.block_on(async { list_collections(&state, &user).await });
let result = rx.recv().unwrap_or(Err("Channel error".into()));
match result {
Ok(collections) => {
@ -209,16 +209,20 @@ pub fn kb_statistics_keyword(state: Arc<AppState>, user: UserSession, engine: &m
user.user_id
);
let rt = tokio::runtime::Handle::try_current();
if rt.is_err() {
error!("KB STORAGE SIZE: No tokio runtime available");
return 0.0;
}
let result = rt
.expect("valid syntax registration")
.block_on(async { get_storage_size(&state, &user).await });
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let result = if let Ok(rt) = rt {
rt.block_on(async { get_storage_size(&state, &user).await })
} else {
Err("Failed to create runtime".into())
};
let _ = tx.send(result);
});
let result = rx.recv().unwrap_or(Err("Channel error".into()));
result.unwrap_or(0.0)
});
}

View file

@ -77,14 +77,21 @@ fn post_to_impl(
content = content.with_video(vid);
}
let rt = tokio::runtime::Handle::try_current().map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("No async runtime available: {}", e).into(),
rhai::Position::NONE,
))
})?;
let cm = channel_manager.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let result = if let Ok(rt) = rt {
rt.block_on(async { cm.post_to(&account_name, &content).await })
} else {
Err("Failed to create runtime".into())
};
let _ = tx.send(result);
});
let result = rt.block_on(async { channel_manager.post_to(&account_name, &content).await });
let result = rx.recv().unwrap_or(Err("Channel error".into()));
match result {
Ok(post_result) => {
@ -140,15 +147,22 @@ fn post_to_multiple_impl(
content = content.with_video(vid);
}
let rt = tokio::runtime::Handle::try_current().map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("No async runtime available: {}", e).into(),
rhai::Position::NONE,
))
})?;
let cm = channel_manager.clone();
let names = account_names.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let results = if let Ok(rt) = rt {
rt.block_on(async { cm.post_to_multiple(&names, &content).await })
} else {
Vec::new()
};
let _ = tx.send(results);
});
let results =
rt.block_on(async { channel_manager.post_to_multiple(&account_names, &content).await });
let results = rx.recv().unwrap_or_default();
let mut total = 0;
let mut successful = 0;
@ -266,14 +280,22 @@ fn post_to_advanced_impl(
}
}
let rt = tokio::runtime::Handle::try_current().map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("No async runtime available: {}", e).into(),
rhai::Position::NONE,
))
})?;
let cm = channel_manager.clone();
let channel_str = channel.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let result = if let Ok(rt) = rt {
rt.block_on(async { cm.post_to(&channel_str, &content).await })
} else {
Err("Failed to create runtime".into())
};
let _ = tx.send(result);
});
let result = rt.block_on(async { channel_manager.post_to(&channel, &content).await });
let result = rx.recv().unwrap_or(Err("Channel error".into()));
match result {
Ok(post_result) => {