diff --git a/src/attendance/mod.rs b/src/attendance/mod.rs index f0bf025e..8f8c2d69 100644 --- a/src/attendance/mod.rs +++ b/src/attendance/mod.rs @@ -197,7 +197,7 @@ pub async fn attendant_respond( ); } - let adapter = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + let adapter = WhatsAppAdapter::new(&state, session.bot_id); let response = BotResponse { bot_id: session.bot_id.to_string(), session_id: session.id.to_string(), @@ -568,7 +568,7 @@ async fn handle_attendant_message( session.context_data.get("phone").and_then(|v| v.as_str()) { let adapter = - WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + WhatsAppAdapter::new(&state, session.bot_id); let response = BotResponse { bot_id: session.bot_id.to_string(), session_id: session.id.to_string(), diff --git a/src/basic/keywords/hearing/talk.rs b/src/basic/keywords/hearing/talk.rs index 236358de..1bab5c00 100644 --- a/src/basic/keywords/hearing/talk.rs +++ b/src/basic/keywords/hearing/talk.rs @@ -61,7 +61,7 @@ pub async fn execute_talk( let bot_id = user_session.bot_id; tokio::spawn(async move { - let adapter = WhatsAppAdapter::new(pool, bot_id); + let adapter = WhatsAppAdapter::new(&state, bot_id); if let Err(e) = adapter.send_message(wa_response).await { error!("Failed to send TALK message via whatsapp adapter: {}", e); } else { diff --git a/src/basic/keywords/universal_messaging.rs b/src/basic/keywords/universal_messaging.rs index 02ad2704..aaa89b14 100644 --- a/src/basic/keywords/universal_messaging.rs +++ b/src/basic/keywords/universal_messaging.rs @@ -239,7 +239,7 @@ pub async fn send_message_to_recipient( match channel.as_str() { "whatsapp" => { - let adapter = WhatsAppAdapter::new(state.conn.clone(), user.bot_id); + let adapter = WhatsAppAdapter::new(&state, user.bot_id); let response = crate::core::shared::models::BotResponse { bot_id: "default".to_string(), session_id: user.id.to_string(), @@ -450,7 +450,7 @@ async fn send_whatsapp_file( ) -> Result<(), Box> { use reqwest::Client; - let _adapter = WhatsAppAdapter::new(state.conn.clone(), user.bot_id); + let _adapter = WhatsAppAdapter::new(&state, user.bot_id); let phone_number_id = ""; let upload_url = format!("https://graph.facebook.com/v17.0/{}/media", phone_number_id); diff --git a/src/core/bot/channels/whatsapp.rs b/src/core/bot/channels/whatsapp.rs index 7851f92b..518c6605 100644 --- a/src/core/bot/channels/whatsapp.rs +++ b/src/core/bot/channels/whatsapp.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use log::{error, info}; +use redis::Client as RedisClient; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -7,7 +8,7 @@ use crate::core::bot::channels::ChannelAdapter; use crate::core::bot::channels::whatsapp_queue::{QueuedWhatsAppMessage, WhatsAppMessageQueue}; use crate::core::config::ConfigManager; use crate::core::shared::models::BotResponse; -use crate::core::shared::utils::DbPool; +use crate::core::shared::state::AppState; use std::sync::Arc; /// Global WhatsApp message queue (shared across all adapters) @@ -25,8 +26,8 @@ pub struct WhatsAppAdapter { } impl WhatsAppAdapter { - pub fn new(pool: DbPool, bot_id: Uuid) -> Self { - let config_manager = ConfigManager::new(pool.clone()); + pub fn new(state: &Arc, bot_id: Uuid) -> Self { + let config_manager = ConfigManager::new(state.conn.clone()); let api_key = config_manager .get_config(&bot_id, "whatsapp-api-key", None) @@ -54,11 +55,6 @@ impl WhatsAppAdapter { .to_lowercase() == "true"; - // Get Redis URL from config - let redis_url = config_manager - .get_config(&bot_id, "redis-url", Some("redis://127.0.0.1:6379")) - .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); - Self { api_key, phone_number_id, @@ -67,21 +63,15 @@ impl WhatsAppAdapter { api_version, _voice_response: voice_response, queue: WHATSAPP_QUEUE.get_or_init(|| { - let queue_res = WhatsAppMessageQueue::new(&redis_url); - match queue_res { - Ok(q) => { - let q_arc = Arc::new(q); - let worker_queue = Arc::clone(&q_arc); - tokio::spawn(async move { - worker_queue.start_worker().await; - }); - Some(q_arc) - } - Err(e) => { - error!("FATAL: Failed to create WhatsApp queue: {}. WhatsApp features will be disabled.", e); - None - } - } + state.cache.as_ref().map(|client| { + let q = WhatsAppMessageQueue::new(client.clone()); + let q_arc = Arc::new(q); + let worker_queue = Arc::clone(&q_arc); + tokio::spawn(async move { + worker_queue.start_worker().await; + }); + q_arc + }) }).as_ref(), } } diff --git a/src/core/bot/channels/whatsapp_queue.rs b/src/core/bot/channels/whatsapp_queue.rs index 93b992a7..14a36942 100644 --- a/src/core/bot/channels/whatsapp_queue.rs +++ b/src/core/bot/channels/whatsapp_queue.rs @@ -22,7 +22,7 @@ pub struct QueuedWhatsAppMessage { #[derive(Debug)] pub struct WhatsAppMessageQueue { - redis_client: redis::Client, + redis_client: Arc, } impl WhatsAppMessageQueue { @@ -31,10 +31,10 @@ impl WhatsAppMessageQueue { const BURST_CAPACITY: i64 = 45; const RATE_SECS: i64 = 6; - pub fn new(redis_url: &str) -> Result { - Ok(Self { - redis_client: redis::Client::open(redis_url)?, - }) + pub fn new(redis_client: Arc) -> Self { + Self { + redis_client, + } } pub async fn enqueue(&self, msg: QueuedWhatsAppMessage) -> Result<(), redis::RedisError> { diff --git a/src/marketing/whatsapp.rs b/src/marketing/whatsapp.rs index 8dace03f..2318c851 100644 --- a/src/marketing/whatsapp.rs +++ b/src/marketing/whatsapp.rs @@ -123,7 +123,7 @@ pub async fn send_whatsapp_message( return Err("WhatsApp not configured for this bot".to_string()); } - let adapter = WhatsAppAdapter::new(state.conn.clone(), bot_id); + let adapter = WhatsAppAdapter::new(&state, bot_id); let result: Result> = if let Some(template_name) = payload.template_name { adapter diff --git a/src/whatsapp/mod.rs b/src/whatsapp/mod.rs index 14a630d7..5216cbc7 100644 --- a/src/whatsapp/mod.rs +++ b/src/whatsapp/mod.rs @@ -568,7 +568,7 @@ async fn process_incoming_message( } // Send confirmation message - let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id); + let adapter = WhatsAppAdapter::new(&state, effective_bot_id); let bot_response = BotResponse { bot_id: effective_bot_id.to_string(), session_id: session.id.to_string(), @@ -614,7 +614,7 @@ async fn process_incoming_message( // Handle /clear command - available to all users if content.trim().to_lowercase() == "/clear" { - let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id); + let adapter = WhatsAppAdapter::new(&state, effective_bot_id); // Find and clear the user's session match find_or_create_session(&state, &effective_bot_id, &phone, &name).await { @@ -652,7 +652,7 @@ async fn process_incoming_message( if content.starts_with('/') { if let Some(response) = process_attendant_command(&state, &phone, &content).await { - let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id); + let adapter = WhatsAppAdapter::new(&state, effective_bot_id); let bot_response = BotResponse { bot_id: effective_bot_id.to_string(), session_id: Uuid::nil().to_string(), @@ -975,7 +975,7 @@ async fn route_to_bot( context_name: None, }; - let adapter = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + let adapter = WhatsAppAdapter::new(&state, session.bot_id); let orchestrator = BotOrchestrator::new(state.clone()); @@ -989,7 +989,7 @@ async fn route_to_bot( .to_string(); let phone_for_error = phone.clone(); - let adapter_for_send = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + let adapter_for_send = WhatsAppAdapter::new(&state, session.bot_id); let bot_id_for_voice = session.bot_id; let state_clone = state.clone(); @@ -1288,7 +1288,7 @@ async fn route_to_bot( match client.generate_audio(&final_text, None, None).await { Ok(audio_url) => { info!("TTS generated: {}", audio_url); - let wa_adapter = WhatsAppAdapter::new(state_for_voice.conn.clone(), bot_id_for_voice); + let wa_adapter = WhatsAppAdapter::new(&state_for_voice, bot_id_for_voice); if let Err(e) = wa_adapter.send_voice_message(&phone_for_voice, &audio_url).await { error!("Failed to send voice message: {}", e); } else { @@ -1477,7 +1477,7 @@ pub async fn send_message( info!("Sending WhatsApp message to {}", request.to); let bot_id = get_default_bot_id(&state).await; - let adapter = WhatsAppAdapter::new(state.conn.clone(), bot_id); + let adapter = WhatsAppAdapter::new(&state, bot_id); let response = BotResponse { bot_id: bot_id.to_string(), @@ -1589,7 +1589,7 @@ pub async fn attendant_respond( match channel { "whatsapp" => { - let adapter = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + let adapter = WhatsAppAdapter::new(&state, session.bot_id); let response = BotResponse { bot_id: session.bot_id.to_string(), session_id: session.id.to_string(), @@ -1689,7 +1689,7 @@ async fn process_audio_message( bot_id: &Uuid, audio: &WhatsAppMedia, ) -> Result> { - let adapter = WhatsAppAdapter::new(state.conn.clone(), *bot_id); + let adapter = WhatsAppAdapter::new(&state, *bot_id); let binary = adapter.download_media(&audio.id).await?; let bot_models = BotModelsClient::from_state(state, bot_id);