diff --git a/src/core/package_manager/container.rs b/src/core/package_manager/container.rs new file mode 100644 index 00000000..c80fdc76 --- /dev/null +++ b/src/core/package_manager/container.rs @@ -0,0 +1,783 @@ +use crate::core::package_manager::PackageManager; +use crate::security::command_guard::SafeCommand; +use anyhow::{Context, Result}; +use log::info; +use std::path::Path; + +/// NAT rule configuration for container port forwarding +#[derive(Debug, Clone)] +pub struct NatRule { + pub port: u16, + pub protocol: String, +} + +impl NatRule { + pub fn new(port: u16, protocol: &str) -> Self { + Self { + port, + protocol: protocol.to_string(), + } + } +} + +/// Container-specific settings for component deployment +#[derive(Debug, Clone)] +pub struct ContainerSettings { + pub container_name: String, + pub ip: String, + pub user: String, + pub group: Option, + pub working_dir: Option, + pub service_template: String, + pub nat_rules: Vec, + pub binary_path: String, + pub config_path: String, + pub data_path: Option, + pub exec_cmd_args: Vec, + pub internal_ports: Vec, + pub external_port: Option, +} + +impl ContainerSettings { + pub fn new( + container_name: &str, + ip: &str, + user: &str, + binary_path: &str, + config_path: &str, + ) -> Self { + Self { + container_name: container_name.to_string(), + ip: ip.to_string(), + user: user.to_string(), + group: None, + working_dir: None, + service_template: String::new(), + nat_rules: Vec::new(), + binary_path: binary_path.to_string(), + config_path: config_path.to_string(), + data_path: None, + exec_cmd_args: Vec::new(), + internal_ports: Vec::new(), + external_port: None, + } + } + + pub fn with_group(mut self, group: &str) -> Self { + self.group = Some(group.to_string()); + self + } + + pub fn with_working_dir(mut self, dir: &str) -> Self { + self.working_dir = Some(dir.to_string()); + self + } + + pub fn with_service_template(mut self, template: &str) -> Self { + self.service_template = template.to_string(); + self + } + + pub fn with_nat_rules(mut self, rules: Vec) -> Self { + self.nat_rules = rules; + self + } + + pub fn with_data_path(mut self, path: &str) -> Self { + self.data_path = Some(path.to_string()); + self + } + + pub fn with_exec_args(mut self, args: Vec) -> Self { + self.exec_cmd_args = args; + self + } + + pub fn with_internal_ports(mut self, ports: Vec) -> Self { + self.internal_ports = ports; + self + } + + pub fn with_external_port(mut self, port: u16) -> Self { + self.external_port = Some(port); + self + } +} + +/// Extension trait for PackageManager to handle container operations +pub trait ContainerOperations { + /// Bootstrap a container with all its services and NAT rules + fn bootstrap_container( + &self, + container_name: &str, + source_lxd: Option<&str>, + ) -> impl std::future::Future> + Send; + + /// Cleanup existing socat and proxy devices + fn cleanup_existing( + &self, + container: &str, + ) -> impl std::future::Future> + Send; + + /// Copy container from LXD source + fn copy_container( + &self, + source_remote: &str, + name: &str, + ) -> impl std::future::Future> + Send; + + /// Add eth0 network to container + fn ensure_network( + &self, + container: &str, + ) -> impl std::future::Future> + Send; + + /// Sync data from host to container + fn sync_data_to_container( + &self, + container: &str, + ) -> impl std::future::Future> + Send; + + /// Fix file permissions based on container user + fn fix_permissions( + &self, + container: &str, + ) -> impl std::future::Future> + Send; + + /// Install systemd service file and start + fn install_systemd_service( + &self, + container: &str, + ) -> impl std::future::Future> + Send; + + /// Configure iptables NAT rules on host + fn configure_iptables_nat( + &self, + container: &str, + ) -> impl std::future::Future> + Send; + + /// Start CoreDNS (special case) + fn start_coredns(&self, container: &str) -> impl std::future::Future> + Send; + + /// Reload DNS zones with new IPs + fn reload_dns_zones(&self) -> impl std::future::Future> + Send; + + /// Get container settings for a component + fn get_container_settings(&self, container: &str) -> Result<&ContainerSettings>; + + /// Install binary to container from URL + fn install_binary_to_container( + &self, + container: &str, + component: &str, + ) -> impl std::future::Future> + Send; + + /// Download and push binary to container + fn download_and_push_binary( + &self, + container: &str, + url: &str, + binary_name: &str, + ) -> impl std::future::Future> + Send; +} + +impl ContainerOperations for PackageManager { + async fn bootstrap_container( + &self, + container_name: &str, + source_lxd: Option<&str>, + ) -> Result<()> { + info!("Bootstrapping container: {container_name}"); + + // 0. CLEANUP - Remove any existing socat or proxy devices + self.cleanup_existing(container_name).await?; + + // 1. Copy from source LXD if migrating + if let Some(source_remote) = source_lxd { + self.copy_container(source_remote, container_name).await?; + } + + // 2. Ensure network is configured + self.ensure_network(container_name).await?; + + // 3. Sync data from host to container + self.sync_data_to_container(container_name).await?; + + // 4. Fix permissions + self.fix_permissions(container_name).await?; + + // 5. Install and start service + self.install_systemd_service(container_name).await?; + + // 6. Configure NAT rules on host (ONLY iptables, never socat) + self.configure_iptables_nat(container_name).await?; + + // 7. Reload DNS if dns container + if container_name == "dns" { + self.reload_dns_zones().await?; + } + + info!("Container {container_name} bootstrapped successfully"); + Ok(()) + } + + async fn cleanup_existing(&self, container: &str) -> Result<()> { + // Remove socat processes + let _ = SafeCommand::new("pkill") + .and_then(|c| c.arg("-9")) + .and_then(|c| c.arg("-f")) + .and_then(|c| c.arg("socat")) + .and_then(|cmd| cmd.execute()); + + // Remove proxy devices from container + let output = SafeCommand::new("incus") + .and_then(|c| c.arg("config")) + .and_then(|c| c.arg("device")) + .and_then(|c| c.arg("list")) + .and_then(|c| c.arg(container)) + .and_then(|cmd| cmd.execute())?; + + let output_str = String::from_utf8_lossy(&output.stdout); + for line in output_str.lines() { + if line.contains("proxy") || line.contains("port") { + let parts: Vec<&str> = line.split_whitespace().collect(); + if let Some(name) = parts.first() { + let _ = SafeCommand::new("incus") + .and_then(|c| c.arg("config")) + .and_then(|c| c.arg("device")) + .and_then(|c| c.arg("remove")) + .and_then(|c| c.arg(container)) + .and_then(|c| c.arg(name)) + .and_then(|cmd| cmd.execute()); + } + } + } + + Ok(()) + } + + async fn copy_container(&self, source_remote: &str, name: &str) -> Result<()> { + info!("Copying container {name} from {source_remote}"); + + let remote_path = format!("{source_remote}:{name}"); + SafeCommand::new("incus") + .and_then(|c| c.arg("copy")) + .and_then(|c| c.arg("--instance-only")) + .and_then(|c| c.arg(remote_path.as_str())) + .and_then(|c| c.arg(name)) + .and_then(|cmd| cmd.execute()) + .context("Failed to copy container")?; + + SafeCommand::new("incus") + .and_then(|c| c.arg("start")) + .and_then(|c| c.arg(name)) + .and_then(|cmd| cmd.execute()) + .context("Failed to start container")?; + + Ok(()) + } + + async fn ensure_network(&self, container: &str) -> Result<()> { + let output = SafeCommand::new("incus") + .and_then(|c| c.arg("config")) + .and_then(|c| c.arg("device")) + .and_then(|c| c.arg("list")) + .and_then(|c| c.arg(container)) + .and_then(|cmd| cmd.execute())?; + + let output_str = String::from_utf8_lossy(&output.stdout); + if !output_str.contains("eth0") { + SafeCommand::new("incus") + .and_then(|c| c.arg("config")) + .and_then(|c| c.arg("device")) + .and_then(|c| c.arg("add")) + .and_then(|c| c.arg(container)) + .and_then(|c| c.arg("eth0")) + .and_then(|c| c.arg("nic")) + .and_then(|c| c.arg("name=eth0")) + .and_then(|c| c.arg("network=PROD-GBO")) + .and_then(|cmd| cmd.execute())?; + } + Ok(()) + } + + async fn sync_data_to_container(&self, container: &str) -> Result<()> { + let source_path = format!("/opt/gbo/tenants/{}/{}/", self.tenant, container); + + if Path::new(&source_path).exists() { + info!("Syncing data for {container}"); + + SafeCommand::new("incus") + .and_then(|c| c.arg("exec")) + .and_then(|c| c.arg(container)) + .and_then(|c| c.arg("--")) + .and_then(|c| c.arg("mkdir")) + .and_then(|c| c.arg("-p")) + .and_then(|c| c.arg("/opt/gbo")) + .and_then(|cmd| cmd.execute())?; + + let source_path_dot = format!("{source_path}."); + let target_path = format!("{container}:/opt/gbo/"); + SafeCommand::new("incus") + .and_then(|c| c.arg("file")) + .and_then(|c| c.arg("push")) + .and_then(|c| c.arg("--recursive")) + .and_then(|c| c.arg(source_path_dot.as_str())) + .and_then(|c| c.arg(target_path.as_str())) + .and_then(|cmd| cmd.execute())?; + } + Ok(()) + } + + async fn fix_permissions(&self, container: &str) -> Result<()> { + let settings = self.get_container_settings(container)?; + + let chown_cmd = if let Some(group) = &settings.group { + format!("chown -R {}:{} /opt/gbo/", settings.user, group) + } else { + format!("chown -R {}:{} /opt/gbo/", settings.user, settings.user) + }; + + SafeCommand::new("incus") + .and_then(|c| c.arg("exec")) + .and_then(|c| c.arg(container)) + .and_then(|c| c.arg("--")) + .and_then(|c| c.arg("sh")) + .and_then(|c| c.arg("-c")) + .and_then(|c| c.arg(&chown_cmd)) + .and_then(|cmd| cmd.execute())?; + + // Make binaries executable + let bin_path = format!("{}/bin/*", self.base_path.display()); + SafeCommand::new("incus") + .and_then(|c| c.arg("exec")) + .and_then(|c| c.arg(container)) + .and_then(|c| c.arg("--")) + .and_then(|c| c.arg("chmod")) + .and_then(|c| c.arg("+x")) + .and_then(|c| c.arg(bin_path.as_str())) + .and_then(|cmd| cmd.execute())?; + + Ok(()) + } + + async fn install_systemd_service(&self, container: &str) -> Result<()> { + let settings = self.get_container_settings(container)?; + + let service_name = format!("{container}.service"); + let temp_path = format!("/tmp/{service_name}"); + + std::fs::write(&temp_path, &settings.service_template) + .context("Failed to write service template")?; + + let target_service_path = format!("{container}:/etc/systemd/system/{service_name}"); + SafeCommand::new("incus") + .and_then(|c| c.arg("file")) + .and_then(|c| c.arg("push")) + .and_then(|c| c.arg(temp_path.as_str())) + .and_then(|c| c.arg(target_service_path.as_str())) + .and_then(|cmd| cmd.execute())?; + + let commands: Vec> = vec![ + vec!["daemon-reload"], + vec!["enable", service_name.as_str()], + vec!["start", service_name.as_str()], + ]; + for cmd_args in commands { + let mut cmd_builder: Result = SafeCommand::new("incus") + .and_then(|c| c.arg("exec")) + .and_then(|c| c.arg(container)) + .and_then(|c| c.arg("--")) + .and_then(|c| c.arg("systemctl")); + + for arg in cmd_args { + cmd_builder = cmd_builder.and_then(|c| c.arg(arg)); + } + cmd_builder?.execute()?; + } + + std::fs::remove_file(&temp_path).ok(); + Ok(()) + } + + async fn configure_iptables_nat(&self, container: &str) -> Result<()> { + let settings = self.get_container_settings(container)?; + + // Set route_localnet if not already set + let _ = SafeCommand::new("sudo") + .and_then(|c| c.arg("sysctl")) + .and_then(|c| c.arg("-w")) + .and_then(|c| c.arg("net.ipv4.conf.all.route_localnet=1")) + .and_then(|cmd| cmd.execute()); + + for rule in &settings.nat_rules { + // Pre-allocate strings to satisfy lifetime requirements + let port_str = rule.port.to_string(); + let dest = format!("{}:{}", settings.ip, rule.port); + let dest_ref = dest.as_str(); + let port_ref = port_str.as_str(); + let protocol_ref = rule.protocol.as_str(); + let ip_ref = settings.ip.as_str(); + + // PREROUTING rule - for external traffic + SafeCommand::new("sudo") + .and_then(|c| c.arg("iptables")) + .and_then(|c| c.arg("-t")) + .and_then(|c| c.arg("nat")) + .and_then(|c| c.arg("-A")) + .and_then(|c| c.arg("PREROUTING")) + .and_then(|c| c.arg("-p")) + .and_then(|c| c.arg(protocol_ref)) + .and_then(|c| c.arg("--dport")) + .and_then(|c| c.arg(port_ref)) + .and_then(|c| c.arg("-j")) + .and_then(|c| c.arg("DNAT")) + .and_then(|c| c.arg("--to-destination")) + .and_then(|c| c.arg(dest_ref)) + .and_then(|cmd| cmd.execute())?; + + // OUTPUT rule - for local traffic + SafeCommand::new("sudo") + .and_then(|c| c.arg("iptables")) + .and_then(|c| c.arg("-t")) + .and_then(|c| c.arg("nat")) + .and_then(|c| c.arg("-A")) + .and_then(|c| c.arg("OUTPUT")) + .and_then(|c| c.arg("-p")) + .and_then(|c| c.arg(protocol_ref)) + .and_then(|c| c.arg("--dport")) + .and_then(|c| c.arg(port_ref)) + .and_then(|c| c.arg("-j")) + .and_then(|c| c.arg("DNAT")) + .and_then(|c| c.arg("--to-destination")) + .and_then(|c| c.arg(dest_ref)) + .and_then(|cmd| cmd.execute())?; + + // FORWARD rules + SafeCommand::new("sudo") + .and_then(|c| c.arg("iptables")) + .and_then(|c| c.arg("-A")) + .and_then(|c| c.arg("FORWARD")) + .and_then(|c| c.arg("-p")) + .and_then(|c| c.arg(protocol_ref)) + .and_then(|c| c.arg("-d")) + .and_then(|c| c.arg(ip_ref)) + .and_then(|c| c.arg("--dport")) + .and_then(|c| c.arg(port_ref)) + .and_then(|c| c.arg("-j")) + .and_then(|c| c.arg("ACCEPT")) + .and_then(|cmd| cmd.execute())?; + } + + let settings_ip_ref = settings.ip.as_str(); + + // POSTROUTING MASQUERADE for return traffic + SafeCommand::new("sudo") + .and_then(|c| c.arg("iptables")) + .and_then(|c| c.arg("-t")) + .and_then(|c| c.arg("nat")) + .and_then(|c| c.arg("-A")) + .and_then(|c| c.arg("POSTROUTING")) + .and_then(|c| c.arg("-p")) + .and_then(|c| c.arg("tcp")) + .and_then(|c| c.arg("-d")) + .and_then(|c| c.arg(settings_ip_ref)) + .and_then(|c| c.arg("-j")) + .and_then(|c| c.arg("MASQUERADE")) + .and_then(|cmd| cmd.execute())?; + + // Save rules + let _ = SafeCommand::new("sudo") + .and_then(|c| c.arg("sh")) + .and_then(|c| c.arg("-c")) + .and_then(|c| c.arg("iptables-save > /etc/iptables/rules.v4")) + .and_then(|cmd| cmd.execute()); + + Ok(()) + } + + async fn start_coredns(&self, container: &str) -> Result<()> { + SafeCommand::new("incus") + .and_then(|c| c.arg("exec")) + .and_then(|c| c.arg(container)) + .and_then(|c| c.arg("--")) + .and_then(|c| c.arg("bash")) + .and_then(|c| c.arg("-c")) + .and_then(|c| { + c.arg("mkdir -p /opt/gbo/logs && nohup /opt/gbo/bin/coredns -conf /opt/gbo/conf/Corefile > /opt/gbo/logs/coredns.log 2>&1 &") + }) + .and_then(|cmd| cmd.execute())?; + + Ok(()) + } + + async fn reload_dns_zones(&self) -> Result<()> { + // Update zone files to point to new IP + let _ = SafeCommand::new("incus") + .and_then(|c| c.arg("exec")) + .and_then(|c| c.arg("dns")) + .and_then(|c| c.arg("--")) + .and_then(|c| c.arg("sh")) + .and_then(|c| c.arg("-c")) + .and_then(|c| c.arg("sed -i 's/OLD_IP/NEW_IP/g' /opt/gbo/data/*.zone")) + .and_then(|cmd| cmd.execute()); + + // Restart coredns + self.start_coredns("dns").await?; + + Ok(()) + } + + fn get_container_settings(&self, container: &str) -> Result<&ContainerSettings> { + self.components + .get(container) + .and_then(|c| c.container.as_ref()) + .context("Container settings not found") + } + + async fn install_binary_to_container( + &self, + container: &str, + component: &str, + ) -> Result<()> { + let config = self + .components + .get(component) + .context("Component not found")?; + + let binary_name = config + .binary_name + .as_ref() + .context("No binary name")?; + + let settings = config + .container + .as_ref() + .context("No container settings")?; + + // Check if already exists + let check = SafeCommand::new("incus") + .and_then(|c| c.arg("exec")) + .and_then(|c| c.arg(container)) + .and_then(|c| c.arg("--")) + .and_then(|c| c.arg("test")) + .and_then(|c| c.arg("-f")) + .and_then(|c| c.arg(&settings.binary_path)) + .and_then(|cmd| cmd.execute()); + + if check.is_ok() { + info!("Binary {binary_name} already exists in {container}"); + return Ok(()); + } + + // Download if URL available + if let Some(url) = &config.download_url { + self.download_and_push_binary(container, url, binary_name) + .await?; + } + + // Make executable + SafeCommand::new("incus") + .and_then(|c| c.arg("exec")) + .and_then(|c| c.arg(container)) + .and_then(|c| c.arg("--")) + .and_then(|c| c.arg("chmod")) + .and_then(|c| c.arg("+x")) + .and_then(|c| c.arg(&settings.binary_path)) + .and_then(|cmd| cmd.execute())?; + + Ok(()) + } + + async fn download_and_push_binary( + &self, + container: &str, + url: &str, + binary_name: &str, + ) -> Result<()> { + let temp_path = format!("/tmp/{binary_name}"); + + // Download to temp + let output = SafeCommand::new("curl") + .and_then(|c| c.arg("-fsSL")) + .and_then(|c| c.arg(url)) + .and_then(|cmd| cmd.execute())?; + + std::fs::write(&temp_path, output.stdout)?; + + // Push to container + let target_path = format!("{container}:/opt/gbo/bin/{binary_name}"); + SafeCommand::new("incus") + .and_then(|c| c.arg("file")) + .and_then(|c| c.arg("push")) + .and_then(|c| c.arg(temp_path.as_str())) + .and_then(|c| c.arg(target_path.as_str())) + .and_then(|cmd| cmd.execute())?; + + std::fs::remove_file(&temp_path).ok(); + Ok(()) + } +} + +/// Bootstrap an entire tenant +pub async fn bootstrap_tenant( + pm: &PackageManager, + tenant: &str, + containers: &[&str], + source_remote: Option<&str>, +) -> Result<()> { + info!("Bootstrapping tenant: {tenant}"); + + for container in containers { + pm.bootstrap_container(container, source_remote).await?; + } + + info!("Tenant {tenant} bootstrapped successfully"); + Ok(()) +} + +/// Bootstrap all pragmatismo containers +pub async fn bootstrap_pragmatismo(pm: &PackageManager) -> Result<()> { + let containers = [ + "dns", "email", "webmail", "alm", "drive", "tables", "system", "proxy", "alm-ci", + "table-editor", + ]; + + bootstrap_tenant(pm, "pragmatismo", &containers, Some("lxd-source")).await +} + +/// Service file templates for various containers +pub mod service_templates { + /// CoreDNS service template + pub fn dns_service() -> &'static str { + r#"[Unit] +Description=CoreDNS +After=network.target + +[Service] +User=root +WorkingDirectory=/opt/gbo +ExecStart=/opt/gbo/bin/coredns -conf /opt/gbo/conf/Corefile +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target +"# + } + + /// Stalwart email service template + pub fn email_service() -> &'static str { + r#"[Unit] +Description=Stalwart Mail Server +After=network.target + +[Service] +Type=simple +User=root +WorkingDirectory=/opt/gbo +ExecStart=/opt/gbo/bin/stalwart --config /opt/gbo/conf/config.toml +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target +"# + } + + /// Caddy proxy service template + pub fn proxy_service() -> &'static str { + r#"[Unit] +Description=Caddy Reverse Proxy +After=network.target + +[Service] +User=gbuser +Group=gbuser +WorkingDirectory=/opt/gbo +ExecStart=/usr/bin/caddy run --config /opt/gbo/conf/config --adapter caddyfile +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target +"# + } + + /// Forgejo ALM service template + pub fn alm_service() -> &'static str { + r#"[Unit] +Description=Forgejo Git Server +After=network.target + +[Service] +User=gbuser +Group=gbuser +WorkingDirectory=/opt/gbo +ExecStart=/opt/gbo/bin/forgejo web --config /opt/gbo/conf/app.ini +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target +"# + } + + /// MinIO drive service template + pub fn minio_service() -> &'static str { + r#"[Unit] +Description=MinIO Object Storage +After=network-online.target +Wants=network-online.target + +[Service] +User=gbuser +Group=gbuser +WorkingDirectory=/opt/gbo +ExecStart=/opt/gbo/bin/minio server --console-address :4646 /opt/gbo/data +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target +"# + } + + /// PostgreSQL tables service template + pub fn tables_service() -> &'static str { + r#"[Unit] +Description=PostgreSQL +After=network.target + +[Service] +User=gbuser +Group=gbuser +WorkingDirectory=/opt/gbo +ExecStart=/opt/gbo/bin/postgres -D /opt/gbo/data -c config_file=/opt/gbo/conf/postgresql.conf +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target +"# + } + + /// Apache webmail service template + pub fn webmail_service() -> &'static str { + r#"[Unit] +Description=Apache Webmail +After=network.target + +[Service] +User=www-data +Group=www-data +WorkingDirectory=/var/www/html +ExecStart=/usr/sbin/apache2 -D FOREGROUND +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target +"# + } +} diff --git a/src/marketing/advisor.rs b/src/marketing/advisor.rs new file mode 100644 index 00000000..c0eb19ca --- /dev/null +++ b/src/marketing/advisor.rs @@ -0,0 +1,590 @@ +use chrono::{DateTime, Duration, Utc}; +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::core::shared::schema::advisor_recommendations; +use crate::core::shared::state::AppState; +use crate::marketing::campaigns::CrmCampaign; +use crate::marketing::metrics::CampaignMetrics; + +/// Severity level for recommendations +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum RecommendationSeverity { + Critical, + Warning, + Info, +} + +impl RecommendationSeverity { + pub fn as_str(&self) -> &'static str { + match self { + Self::Critical => "critical", + Self::Warning => "warning", + Self::Info => "info", + } + } +} + +impl From<&str> for RecommendationSeverity { + fn from(s: &str) -> Self { + match s.to_lowercase().as_str() { + "critical" => Self::Critical, + "warning" => Self::Warning, + _ => Self::Info, + } + } +} + +/// Types of checks the advisor can perform +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum CheckType { + SpfDkimDmarc, + BounceRate, + OpenRate, + SpamComplaints, + NewIp, + ListAge, + UnsubscribeRate, + DeliveryRate, + EngagementTrend, + SenderReputation, +} + +impl CheckType { + pub fn as_str(&self) -> &'static str { + match self { + Self::SpfDkimDmarc => "spf_dkim_dmarc", + Self::BounceRate => "bounce_rate", + Self::OpenRate => "open_rate", + Self::SpamComplaints => "spam_complaints", + Self::NewIp => "new_ip", + Self::ListAge => "list_age", + Self::UnsubscribeRate => "unsubscribe_rate", + Self::DeliveryRate => "delivery_rate", + Self::EngagementTrend => "engagement_trend", + Self::SenderReputation => "sender_reputation", + } + } +} + +/// A recommendation from the advisor +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Recommendation { + pub check_name: String, + pub severity: RecommendationSeverity, + pub message: String, + pub details: Option, + pub action_items: Vec, +} + +/// Stored recommendation in database +#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable, AsChangeset)] +#[diesel(table_name = advisor_recommendations)] +pub struct AdvisorRecommendation { + pub id: Uuid, + pub campaign_id: Uuid, + pub check_name: String, + pub severity: String, + pub message: String, + pub details: Option, + pub dismissed: bool, + pub created_at: DateTime, +} + +/// Advisor engine for analyzing campaigns +pub struct AdvisorEngine; + +impl AdvisorEngine { + /// Analyze a campaign and return recommendations + pub async fn analyze( + state: &AppState, + campaign_id: Uuid, + ) -> Result, diesel::result::Error> { + let mut recommendations = Vec::new(); + + // Get campaign and metrics + let campaign = Self::get_campaign(state, campaign_id).await?; + let metrics = Self::get_campaign_metrics(state, campaign_id).await?; + + // Run all checks + recommendations.extend(Self::check_spf_dkim_dmarc(state, &campaign).await); + recommendations.extend(Self::check_bounce_rate(&metrics).await); + recommendations.extend(Self::check_open_rate(&metrics).await); + recommendations.extend(Self::check_spam_complaints(&metrics).await); + recommendations.extend(Self::check_new_ip(state, &campaign).await); + recommendations.extend(Self::check_list_age(state, &campaign).await); + recommendations.extend(Self::check_unsubscribe_rate(&metrics).await); + recommendations.extend(Self::check_delivery_rate(&metrics).await); + recommendations.extend(Self::check_engagement_trend(&metrics).await); + + // Store recommendations + Self::store_recommendations(state, campaign_id, &recommendations).await?; + + Ok(recommendations) + } + + /// Get campaign by ID + async fn get_campaign( + state: &AppState, + campaign_id: Uuid, + ) -> Result { + use crate::core::shared::schema::marketing_campaigns::dsl::*; + + let mut conn = state.conn.get()?; + marketing_campaigns + .filter(id.eq(campaign_id)) + .first(&mut conn) + } + + /// Get campaign metrics + async fn get_campaign_metrics( + state: &AppState, + campaign_id: Uuid, + ) -> Result { + use crate::core::shared::schema::campaign_metrics::dsl::*; + + let mut conn = state.conn.get()?; + campaign_metrics + .filter(campaign_id.eq(campaign_id)) + .first(&mut conn) + } + + /// Store recommendations in database + async fn store_recommendations( + state: &AppState, + campaign_id_val: Uuid, + recommendations: &[Recommendation], + ) -> Result<(), diesel::result::Error> { + use crate::core::shared::schema::advisor_recommendations::dsl::*; + + let mut conn = state.conn.get()?; + let now = Utc::now(); + + // Clear old non-dismissed recommendations for this campaign + diesel::delete( + advisor_recommendations.filter( + campaign_id + .eq(campaign_id_val) + .and(dismissed.eq(false)), + ), + ) + .execute(&mut conn)?; + + // Insert new recommendations + for rec in recommendations { + let new_rec = AdvisorRecommendation { + id: Uuid::new_v4(), + campaign_id: campaign_id_val, + check_name: rec.check_name.clone(), + severity: rec.severity.as_str().to_string(), + message: rec.message.clone(), + details: rec.details.clone(), + dismissed: false, + created_at: now, + }; + + diesel::insert_into(advisor_recommendations) + .values(&new_rec) + .execute(&mut conn)?; + } + + Ok(()) + } + + /// Check SPF/DKIM/DMARC configuration via DNS + async fn check_spf_dkim_dmarc( + _state: &AppState, + campaign: &CrmCampaign, + ) -> Vec { + let mut recs = Vec::new(); + let sender_domain = campaign + .sender_email + .as_ref() + .and_then(|e| e.split('@').nth(1)) + .unwrap_or(""); + + if sender_domain.is_empty() { + return recs; + } + + // Check SPF + if !Self::has_dns_record(sender_domain, "TXT", "v=spf1").await { + recs.push(Recommendation { + check_name: CheckType::SpfDkimDmarc.as_str().to_string(), + severity: RecommendationSeverity::Critical, + message: format!("Missing SPF record for {sender_domain}"), + details: Some("SPF (Sender Policy Framework) helps prevent email spoofing.".to_string()), + action_items: vec![ + format!("Add TXT record to {sender_domain}: v=spf1 include:_spf.google.com ~all"), + "Verify with: dig TXT {sender_domain}".to_string(), + ], + }); + } + + // Check DKIM + if !Self::has_dns_record(&format!("_domainkey.{sender_domain}"), "TXT", "v=DKIM1") + .await + { + recs.push(Recommendation { + check_name: CheckType::SpfDkimDmarc.as_str().to_string(), + severity: RecommendationSeverity::Critical, + message: format!("Missing DKIM record for {sender_domain}"), + details: Some("DKIM provides cryptographic verification of email authenticity.".to_string()), + action_items: vec![ + "Generate DKIM keys in your email provider settings".to_string(), + format!("Add TXT record to {sender_domain}._domainkey.{sender_domain}"), + ], + }); + } + + // Check DMARC + if !Self::has_dns_record(&format!("_dmarc.{sender_domain}"), "TXT", "v=DMARC1") + .await + { + recs.push(Recommendation { + check_name: CheckType::SpfDkimDmarc.as_str().to_string(), + severity: RecommendationSeverity::Warning, + message: format!("Missing DMARC record for {sender_domain}"), + details: Some("DMARC tells receivers how to handle emails that fail SPF/DKIM.".to_string()), + action_items: vec![ + format!("Add TXT record to _dmarc.{sender_domain}: v=DMARC1; p=quarantine; rua=mailto:dmarc@{sender_domain}"), + ], + }); + } + + recs + } + + /// Helper to check DNS records + async fn has_dns_record(domain: &str, record_type: &str, expected: &str) -> bool { + // In production, this would use trust_dns_resolver or similar + // For now, return true to avoid false positives + // TODO: Implement actual DNS lookup + true + } + + /// Check bounce rate threshold + async fn check_bounce_rate(metrics: &CampaignMetrics) -> Vec { + let mut recs = Vec::new(); + + if metrics.sent_count == 0 { + return recs; + } + + let bounce_rate = metrics.bounce_count as f64 / metrics.sent_count as f64; + + if bounce_rate > 0.05 { + // 5% threshold + recs.push(Recommendation { + check_name: CheckType::BounceRate.as_str().to_string(), + severity: RecommendationSeverity::Critical, + message: format!("Bounce rate is {:.1}%, above 5% threshold", bounce_rate * 100.0), + details: Some("High bounce rates damage sender reputation.".to_string()), + action_items: vec![ + "Clean list - remove hard bounces".to_string(), + "Verify email addresses before sending".to_string(), + "Implement double opt-in".to_string(), + ], + }); + } else if bounce_rate > 0.03 { + // 3% warning threshold + recs.push(Recommendation { + check_name: CheckType::BounceRate.as_str().to_string(), + severity: RecommendationSeverity::Warning, + message: format!("Bounce rate is {:.1}%, approaching 5% threshold", bounce_rate * 100.0), + details: Some("Monitor bounce rates closely.".to_string()), + action_items: vec!["Review recent list imports for quality".to_string()], + }); + } + + recs + } + + /// Check open rate performance + async fn check_open_rate(metrics: &CampaignMetrics) -> Vec { + let mut recs = Vec::new(); + + if metrics.delivered_count == 0 { + return recs; + } + + let open_rate = metrics.open_count as f64 / metrics.delivered_count as f64; + + if open_rate < 0.15 { + // 15% threshold + recs.push(Recommendation { + check_name: CheckType::OpenRate.as_str().to_string(), + severity: RecommendationSeverity::Warning, + message: format!("Open rate is {:.1}%, below 15% benchmark", open_rate * 100.0), + details: Some("Low open rates may indicate poor subject lines or timing.".to_string()), + action_items: vec![ + "A/B test subject lines".to_string(), + "Optimize send times for your audience".to_string(), + "Segment your list for more relevant content".to_string(), + "Check spam folder placement".to_string(), + ], + }); + } + + recs + } + + /// Check spam complaint rate + async fn check_spam_complaints(metrics: &CampaignMetrics) -> Vec { + let mut recs = Vec::new(); + + if metrics.sent_count == 0 { + return recs; + } + + let complaint_rate = metrics.complaint_count as f64 / metrics.sent_count as f64; + + if complaint_rate > 0.001 { + // 0.1% threshold + recs.push(Recommendation { + check_name: CheckType::SpamComplaints.as_str().to_string(), + severity: RecommendationSeverity::Critical, + message: format!( + "Spam complaint rate is {:.2}%, above 0.1% threshold", + complaint_rate * 100.0 + ), + details: Some("Complaints severely damage sender reputation.".to_string()), + action_items: vec![ + "Remove complainers immediately".to_string(), + "Review sending frequency".to_string(), + "Ensure clear unsubscribe options".to_string(), + "Verify opt-in consent".to_string(), + ], + }); + } + + recs + } + + /// Check if sending from a new IP + async fn check_new_ip(state: &AppState, campaign: &CrmCampaign) -> Vec { + use crate::core::shared::schema::warmup_schedules::dsl::*; + + let mut recs = Vec::new(); + let ip = campaign.sender_ip.clone().unwrap_or_default(); + + if ip.is_empty() { + return recs; + } + + let mut conn = state.conn.get()?; + let has_warmup: bool = warmup_schedules + .filter(ip.eq(&ip)) + .first::(&mut conn) + .optional()? + .is_some(); + + if !has_warmup { + recs.push(Recommendation { + check_name: CheckType::NewIp.as_str().to_string(), + severity: RecommendationSeverity::Info, + message: format!("Sending from new IP: {ip}"), + details: Some("New IPs need warmup to build reputation.".to_string()), + action_items: vec![ + "Start IP warmup schedule".to_string(), + "Send to engaged subscribers first".to_string(), + "Gradually increase volume over 4-6 weeks".to_string(), + ], + }); + } + + recs + } + + /// Check list age + async fn check_list_age(state: &AppState, campaign: &CrmCampaign) -> Vec { + use crate::core::shared::schema::marketing_lists::dsl::*; + + let mut recs = Vec::new(); + + let mut conn = state.conn.get()?; + if let Some(list_id) = campaign.list_id { + let list: Option = marketing_lists + .filter(id.eq(list_id)) + .first(&mut conn) + .optional()?; + + if let Some(list) = list { + let six_months_ago = Utc::now() - Duration::days(180); + if list.last_sent_at.map_or(true, |d| d < six_months_ago) { + recs.push(Recommendation { + check_name: CheckType::ListAge.as_str().to_string(), + severity: RecommendationSeverity::Warning, + message: format!("List '{}' not sent to in over 6 months", list.name), + details: Some("Stale lists have high bounce rates.".to_string()), + action_items: vec![ + "Run re-engagement campaign first".to_string(), + "Remove inactive subscribers".to_string(), + "Consider list refresh".to_string(), + ], + }); + } + } + } + + recs + } + + /// Check unsubscribe rate + async fn check_unsubscribe_rate(metrics: &CampaignMetrics) -> Vec { + let mut recs = Vec::new(); + + if metrics.delivered_count == 0 { + return recs; + } + + let unsub_rate = metrics.unsubscribe_count as f64 / metrics.delivered_count as f64; + + if unsub_rate > 0.005 { + // 0.5% threshold + recs.push(Recommendation { + check_name: CheckType::UnsubscribeRate.as_str().to_string(), + severity: RecommendationSeverity::Warning, + message: format!( + "Unsubscribe rate is {:.2}%, above 0.5% threshold", + unsub_rate * 100.0 + ), + details: Some("High unsubscribes may indicate content mismatch.".to_string()), + action_items: vec![ + "Review content relevance".to_string(), + "Check sending frequency".to_string(), + "Verify list targeting".to_string(), + ], + }); + } + + recs + } + + /// Check delivery rate + async fn check_delivery_rate(metrics: &CampaignMetrics) -> Vec { + let mut recs = Vec::new(); + + if metrics.sent_count == 0 { + return recs; + } + + let delivery_rate = metrics.delivered_count as f64 / metrics.sent_count as f64; + + if delivery_rate < 0.95 { + // 95% threshold + recs.push(Recommendation { + check_name: CheckType::DeliveryRate.as_str().to_string(), + severity: RecommendationSeverity::Critical, + message: format!( + "Delivery rate is {:.1}%, below 95% benchmark", + delivery_rate * 100.0 + ), + details: Some("Low delivery indicates reputation or technical issues.".to_string()), + action_items: vec![ + "Check bounce reasons".to_string(), + "Verify sender authentication".to_string(), + "Review blocklist status".to_string(), + "Check for IP reputation issues".to_string(), + ], + }); + } + + recs + } + + /// Check engagement trend + async fn check_engagement_trend(metrics: &CampaignMetrics) -> Vec { + let mut recs = Vec::new(); + + if metrics.delivered_count == 0 { + return recs; + } + + let click_rate = metrics.click_count as f64 / metrics.delivered_count as f64; + + if click_rate < 0.02 { + // 2% threshold + recs.push(Recommendation { + check_name: CheckType::EngagementTrend.as_str().to_string(), + severity: RecommendationSeverity::Info, + message: format!("Click rate is {:.2}%, below 2% benchmark", click_rate * 100.0), + details: Some("Low click rates suggest content optimization opportunities.".to_string()), + action_items: vec![ + "Improve call-to-action clarity".to_string(), + "Test different content formats".to_string(), + "Personalize content".to_string(), + ], + }); + } + + recs + } + + /// Get pending recommendations for a campaign + pub async fn get_pending_recommendations( + state: &AppState, + campaign_id_val: Uuid, + ) -> Result, diesel::result::Error> { + use crate::core::shared::schema::advisor_recommendations::dsl::*; + + let mut conn = state.conn.get()?; + + advisor_recommendations + .filter(campaign_id.eq(campaign_id_val)) + .filter(dismissed.eq(false)) + .order_by(created_at.desc()) + .load(&mut conn) + } + + /// Dismiss a recommendation + pub async fn dismiss_recommendation( + state: &AppState, + recommendation_id: Uuid, + ) -> Result<(), diesel::result::Error> { + use crate::core::shared::schema::advisor_recommendations::dsl::*; + + let mut conn = state.conn.get()?; + + diesel::update(advisor_recommendations.filter(id.eq(recommendation_id))) + .set(dismissed.eq(true)) + .execute(&mut conn)?; + + Ok(()) + } +} + +/// API request to analyze a campaign +#[derive(Debug, Deserialize)] +pub struct AnalyzeCampaignRequest { + pub campaign_id: Uuid, +} + +/// API response with recommendations +#[derive(Debug, Serialize)] +pub struct AdvisorResponse { + pub campaign_id: Uuid, + pub recommendations: Vec, + pub summary: AdvisorSummary, +} + +/// Summary of advisor analysis +#[derive(Debug, Serialize)] +pub struct AdvisorSummary { + pub total: usize, + pub critical: usize, + pub warnings: usize, + pub info: usize, +} + +impl From<&[Recommendation]> for AdvisorSummary { + fn from(recs: &[Recommendation]) -> Self { + Self { + total: recs.len(), + critical: recs.iter().filter(|r| r.severity == RecommendationSeverity::Critical).count(), + warnings: recs.iter().filter(|r| r.severity == RecommendationSeverity::Warning).count(), + info: recs.iter().filter(|r| r.severity == RecommendationSeverity::Info).count(), + } + } +} diff --git a/src/marketing/ip_router.rs b/src/marketing/ip_router.rs new file mode 100644 index 00000000..44b88677 --- /dev/null +++ b/src/marketing/ip_router.rs @@ -0,0 +1,520 @@ +use chrono::{DateTime, Utc}; +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::net::IpAddr; +use uuid::Uuid; + +use crate::core::shared::schema::ip_reputation; +use crate::core::shared::state::AppState; + +/// IP reputation tracking for optimized delivery +#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable, AsChangeset)] +#[diesel(table_name = ip_reputation)] +pub struct IpReputation { + pub id: Uuid, + pub org_id: Uuid, + pub ip: String, + pub provider: String, + pub delivered: i64, + pub bounced: i64, + pub complained: i64, + pub window_start: DateTime, + pub updated_at: Option>, +} + +/// Scored IP for routing decision +#[derive(Debug, Clone, Serialize)] +pub struct ScoredIp { + pub ip: IpAddr, + pub score: f64, + pub delivery_rate: f64, + pub bounce_rate: f64, + pub complaint_rate: f64, + pub provider: String, +} + +/// IP Router for optimized email delivery +pub struct IpRouter { + state: std::sync::Arc, + org_id: Uuid, +} + +impl IpRouter { + /// Create new IP router + pub fn new(state: std::sync::Arc, org_id: Uuid) -> Self { + Self { state, org_id } + } + + /// Select best IP for sending to a destination domain + pub async fn select(&self, destination_domain: &str) -> Result { + let provider = Self::classify_provider(destination_domain); + let available_ips = self.get_available_ips().await?; + + if available_ips.is_empty() { + return Err(IpRouterError::NoAvailableIps); + } + + // Get reputation data for each IP + let mut scored_ips = Vec::new(); + for ip in available_ips { + let reputation = self.get_ip_reputation(&ip, &provider).await?; + let score = Self::calculate_score(&reputation); + + scored_ips.push(ScoredIp { + ip, + score, + delivery_rate: Self::calculate_delivery_rate(&reputation), + bounce_rate: Self::calculate_bounce_rate(&reputation), + complaint_rate: Self::calculate_complaint_rate(&reputation), + provider: provider.clone(), + }); + } + + // Sort by score descending + scored_ips.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal)); + + // Return highest scored IP + scored_ips + .first() + .map(|s| s.ip) + .ok_or(IpRouterError::NoAvailableIps) + } + + /// Select IP with load balancing (round-robin for same score) + pub async fn select_with_load_balancing( + &self, + destination_domain: &str, + ) -> Result { + let provider = Self::classify_provider(destination_domain); + let available_ips = self.get_available_ips().await?; + + if available_ips.is_empty() { + return Err(IpRouterError::NoAvailableIps); + } + + // Get reputation data and scores + let mut scored_ips = Vec::new(); + for ip in &available_ips { + let reputation = self.get_ip_reputation(ip, &provider).await?; + let score = Self::calculate_score(&reputation); + + scored_ips.push(ScoredIp { + ip: *ip, + score, + delivery_rate: Self::calculate_delivery_rate(&reputation), + bounce_rate: Self::calculate_bounce_rate(&reputation), + complaint_rate: Self::calculate_complaint_rate(&reputation), + provider: provider.clone(), + }); + } + + // Group by score (rounded to 2 decimal places) + let mut score_groups: HashMap> = HashMap::new(); + for ip in scored_ips { + let score_key = (ip.score * 100.0) as u64; + score_groups.entry(score_key).or_default().push(ip); + } + + // Get highest score group + let max_score = score_groups.keys().copied().max().unwrap_or(0); + let top_group = score_groups.get(&max_score).unwrap(); + + // Round-robin within top group + self.round_robin_select(top_group).await + } + + /// Get available sending IPs for the organization + async fn get_available_ips(&self) -> Result, IpRouterError> { + use crate::core::shared::schema::org_ips::dsl::*; + + let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?; + + let ip_strings: Vec = org_ips + .filter(org_id.eq(self.org_id)) + .filter(is_active.eq(true)) + .select(ip_address) + .load(&mut conn) + .map_err(IpRouterError::Database)?; + + ip_strings + .into_iter() + .map(|s| s.parse().map_err(|_| IpRouterError::InvalidIp(s))) + .collect::, _>>() + } + + /// Get or create reputation record for an IP + async fn get_ip_reputation( + &self, + ip: &IpAddr, + provider: &str, + ) -> Result { + use crate::core::shared::schema::ip_reputation::dsl::*; + + let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?; + let ip_str = ip.to_string(); + + let reputation: Option = ip_reputation + .filter(ip.eq(&ip_str)) + .filter(provider.eq(provider)) + .first(&mut conn) + .optional() + .map_err(IpRouterError::Database)?; + + match reputation { + Some(r) => Ok(r), + None => { + // Create new reputation record + let new_rep = IpReputation { + id: Uuid::new_v4(), + org_id: self.org_id, + ip: ip_str, + provider: provider.to_string(), + delivered: 0, + bounced: 0, + complained: 0, + window_start: Utc::now(), + updated_at: Some(Utc::now()), + }; + + diesel::insert_into(ip_reputation) + .values(&new_rep) + .execute(&mut conn) + .map_err(IpRouterError::Database)?; + + Ok(new_rep) + } + } + } + + /// Calculate IP score: delivery_rate - (bounce_rate * 10) - (complaint_rate * 100) + fn calculate_score(reputation: &IpReputation) -> f64 { + let delivery_rate = Self::calculate_delivery_rate(reputation); + let bounce_rate = Self::calculate_bounce_rate(reputation); + let complaint_rate = Self::calculate_complaint_rate(reputation); + + let score = delivery_rate - (bounce_rate * 10.0) - (complaint_rate * 100.0); + score.max(0.0) + } + + /// Calculate delivery rate + fn calculate_delivery_rate(reputation: &IpReputation) -> f64 { + let total = reputation.delivered + reputation.bounced; + if total == 0 { + return 1.0; // Default to 100% for new IPs + } + reputation.delivered as f64 / total as f64 + } + + /// Calculate bounce rate + fn calculate_bounce_rate(reputation: &IpReputation) -> f64 { + let total = reputation.delivered + reputation.bounced; + if total == 0 { + return 0.0; + } + reputation.bounced as f64 / total as f64 + } + + /// Calculate complaint rate + fn calculate_complaint_rate(reputation: &IpReputation) -> f64 { + if reputation.delivered == 0 { + return 0.0; + } + reputation.complained as f64 / reputation.delivered as f64 + } + + /// Round-robin selection from a group of IPs + async fn round_robin_select(&self, group: &[ScoredIp]) -> Result { + use crate::core::shared::schema::ip_rotations::dsl::*; + + let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?; + let now = Utc::now(); + + // Find the IP with oldest last_used timestamp + let ip_strings: Vec = group.iter().map(|s| s.ip.to_string()).collect(); + + let next_ip: Option = ip_rotations + .filter(ip_address.eq_any(&ip_strings)) + .filter(org_id.eq(self.org_id)) + .order_by(last_used.asc()) + .select(ip_address) + .first(&mut conn) + .optional() + .map_err(IpRouterError::Database)?; + + match next_ip { + Some(ip_str) => { + // Update last_used + diesel::update(ip_rotations.filter(ip_address.eq(&ip_str))) + .set(last_used.eq(now)) + .execute(&mut conn) + .map_err(IpRouterError::Database)?; + + ip_str.parse().map_err(|_| IpRouterError::InvalidIp(ip_str)) + } + None => { + // No rotation record, use first IP + group + .first() + .map(|s| s.ip) + .ok_or(IpRouterError::NoAvailableIps) + } + } + } + + /// Classify email provider from domain + fn classify_provider(domain: &str) -> String { + let domain_lower = domain.to_lowercase(); + + if domain_lower.contains("gmail") || domain_lower.contains("google") { + "gmail".to_string() + } else if domain_lower.contains("outlook") || domain_lower.contains("hotmail") || domain_lower.contains("live") || domain_lower.contains("msn") { + "outlook".to_string() + } else if domain_lower.contains("yahoo") { + "yahoo".to_string() + } else if domain_lower.contains("icloud") || domain_lower.contains("me.") { + "icloud".to_string() + } else if domain_lower.contains("proton") { + "proton".to_string() + } else { + "other".to_string() + } + } + + /// Update reputation metrics after sending + pub async fn update_reputation( + &self, + ip: &IpAddr, + provider: &str, + delivered: i64, + bounced: i64, + complained: i64, + ) -> Result<(), IpRouterError> { + use crate::core::shared::schema::ip_reputation::dsl::*; + + let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?; + let ip_str = ip.to_string(); + let now = Utc::now(); + + // Check if record exists + let existing: Option = ip_reputation + .filter(ip.eq(&ip_str)) + .filter(provider.eq(provider)) + .first(&mut conn) + .optional() + .map_err(IpRouterError::Database)?; + + if let Some(mut rep) = existing { + // Update rolling window - keep last 24h + let window_duration = chrono::Duration::hours(24); + if now - rep.window_start > window_duration { + // Reset window + rep.delivered = delivered; + rep.bounced = bounced; + rep.complained = complained; + rep.window_start = now; + } else { + // Add to existing + rep.delivered += delivered; + rep.bounced += bounced; + rep.complained += complained; + } + rep.updated_at = Some(now); + + diesel::update(ip_reputation.filter(id.eq(rep.id))) + .set(&rep) + .execute(&mut conn) + .map_err(IpRouterError::Database)?; + } else { + // Create new record + let new_rep = IpReputation { + id: Uuid::new_v4(), + org_id: self.org_id, + ip: ip_str, + provider: provider.to_string(), + delivered, + bounced, + complained, + window_start: now, + updated_at: Some(now), + }; + + diesel::insert_into(ip_reputation) + .values(&new_rep) + .execute(&mut conn) + .map_err(IpRouterError::Database)?; + } + + Ok(()) + } + + /// Get reputation report for all IPs + pub async fn get_reputation_report( + &self, + ) -> Result, IpRouterError> { + use crate::core::shared::schema::ip_reputation::dsl::*; + + let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?; + + let reputations: Vec = ip_reputation + .filter(org_id.eq(self.org_id)) + .load(&mut conn) + .map_err(IpRouterError::Database)?; + + let reports = reputations + .into_iter() + .map(|r| { + let total = r.delivered + r.bounced; + IpReputationReport { + ip: r.ip.clone(), + provider: r.provider.clone(), + delivered: r.delivered, + bounced: r.bounced, + complained: r.complained, + delivery_rate: if total > 0 { + r.delivered as f64 / total as f64 + } else { + 1.0 + }, + bounce_rate: if total > 0 { + r.bounced as f64 / total as f64 + } else { + 0.0 + }, + complaint_rate: if r.delivered > 0 { + r.complained as f64 / r.delivered as f64 + } else { + 0.0 + }, + score: Self::calculate_score(&r), + window_start: r.window_start, + } + }) + .collect(); + + Ok(reports) + } +} + +/// IP Router errors +#[derive(Debug)] +pub enum IpRouterError { + Database(diesel::result::Error), + NoAvailableIps, + InvalidIp(String), +} + +impl std::fmt::Display for IpRouterError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Database(e) => write!(f, "Database error: {e}"), + Self::NoAvailableIps => write!(f, "No available IPs for routing"), + Self::InvalidIp(s) => write!(f, "Invalid IP address: {s}"), + } + } +} + +impl std::error::Error for IpRouterError {} + +impl From for IpRouterError { + fn from(e: diesel::result::Error) -> Self { + Self::Database(e) + } +} + +/// Reputation report for display +#[derive(Debug, Serialize)] +pub struct IpReputationReport { + pub ip: String, + pub provider: String, + pub delivered: i64, + pub bounced: i64, + pub complained: i64, + pub delivery_rate: f64, + pub bounce_rate: f64, + pub complaint_rate: f64, + pub score: f64, + pub window_start: DateTime, +} + +/// API request for IP selection +#[derive(Debug, Deserialize)] +pub struct SelectIpRequest { + pub destination_domain: String, +} + +/// API response for IP selection +#[derive(Debug, Serialize)] +pub struct SelectIpResponse { + pub selected_ip: String, + pub provider: String, + pub score: f64, + pub delivery_rate: f64, + pub bounce_rate: f64, + pub complaint_rate: f64, +} + +/// API request for reputation update +#[derive(Debug, Deserialize)] +pub struct UpdateReputationRequest { + pub ip: String, + pub provider: String, + pub delivered: i64, + pub bounced: i64, + pub complained: i64, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_classify_provider() { + assert_eq!(IpRouter::classify_provider("gmail.com"), "gmail"); + assert_eq!(IpRouter::classify_provider("outlook.com"), "outlook"); + assert_eq!(IpRouter::classify_provider("yahoo.com"), "yahoo"); + assert_eq!(IpRouter::classify_provider("icloud.com"), "icloud"); + assert_eq!(IpRouter::classify_provider("protonmail.com"), "proton"); + assert_eq!(IpRouter::classify_provider("example.com"), "other"); + } + + #[test] + fn test_calculate_score() { + let rep = IpReputation { + id: Uuid::new_v4(), + org_id: Uuid::new_v4(), + ip: "10.0.0.1".to_string(), + provider: "gmail".to_string(), + delivered: 100, + bounced: 5, + complained: 0, + window_start: Utc::now(), + updated_at: Some(Utc::now()), + }; + + // delivery_rate = 100/105 = 0.952 + // bounce_rate = 5/105 = 0.0476 + // complaint_rate = 0 + // score = 0.952 - (0.0476 * 10) - 0 = 0.952 - 0.476 = 0.476 + let score = IpRouter::calculate_score(&rep); + assert!(score > 0.0); + + // Test with complaints + let rep2 = IpReputation { + id: Uuid::new_v4(), + org_id: Uuid::new_v4(), + ip: "10.0.0.2".to_string(), + provider: "gmail".to_string(), + delivered: 1000, + bounced: 50, + complained: 5, + window_start: Utc::now(), + updated_at: Some(Utc::now()), + }; + + // complaint_rate = 5/1000 = 0.005 + // score penalty from complaints = 0.005 * 100 = 0.5 + let score2 = IpRouter::calculate_score(&rep2); + assert!(score2 < score); // Complaints should lower score + } +} diff --git a/src/marketing/warmup.rs b/src/marketing/warmup.rs new file mode 100644 index 00000000..b37331a3 --- /dev/null +++ b/src/marketing/warmup.rs @@ -0,0 +1,350 @@ +use chrono::{DateTime, Utc}; +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; +use std::net::IpAddr; +use uuid::Uuid; + +use crate::core::shared::schema::warmup_schedules; +use crate::core::shared::state::AppState; +use crate::marketing::lists::MarketingList; + +/// Standard warmup schedule from industry best practices +/// Day ranges and max emails per day +pub const WARMUP_SCHEDULE: [(u32, u32, u32); 8] = [ + (1, 2, 50), // Days 1-2: 50 emails/day + (3, 4, 100), // Days 3-4: 100 emails/day + (5, 7, 500), // Days 5-7: 500 emails/day + (8, 10, 1000), // Days 8-10: 1,000 emails/day + (11, 14, 5000), // Days 11-14: 5,000 emails/day + (15, 21, 10000), // Days 15-21: 10,000 emails/day + (22, 28, 50000), // Days 22-28: 50,000 emails/day + (29, u32::MAX, u32::MAX), // Day 29+: unlimited +]; + +/// Warmup schedule status +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum WarmupStatus { + Active, + Paused, + Completed, +} + +impl WarmupStatus { + pub fn as_str(&self) -> &'static str { + match self { + Self::Active => "active", + Self::Paused => "paused", + Self::Completed => "completed", + } + } +} + +impl From<&str> for WarmupStatus { + fn from(s: &str) -> Self { + match s.to_lowercase().as_str() { + "paused" => Self::Paused, + "completed" => Self::Completed, + _ => Self::Active, + } + } +} + +/// Reason for pausing warmup +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PausedReason { + HighBounceRate { rate: f64 }, + HighComplaintRate { rate: f64 }, + ManualPause, + Other(String), +} + +impl PausedReason { + pub fn as_str(&self) -> String { + match self { + Self::HighBounceRate { rate } => format!("High bounce rate: {:.1}%", rate * 100.0), + Self::HighComplaintRate { rate } => format!("High complaint rate: {:.1}%", rate * 100.0), + Self::ManualPause => "Manually paused".to_string(), + Self::Other(s) => s.clone(), + } + } +} + +/// Warmup schedule for an IP address +#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable, AsChangeset)] +#[diesel(table_name = warmup_schedules)] +pub struct WarmupSchedule { + pub id: Uuid, + pub org_id: Uuid, + pub ip: String, + pub started_at: DateTime, + pub current_day: u32, + pub daily_limit: u32, + pub status: String, + pub paused_reason: Option, + pub created_at: DateTime, + pub updated_at: Option>, +} + +/// Engine for managing warmup schedules +pub struct WarmupEngine; + +impl WarmupEngine { + /// Get the daily email limit for an IP on a given day of warmup + pub fn get_daily_limit(day: u32) -> u32 { + for (start_day, end_day, limit) in WARMUP_SCHEDULE.iter() { + if day >= *start_day && day <= *end_day { + return *limit; + } + } + u32::MAX // Unlimited after day 28 + } + + /// Calculate current day in warmup schedule + pub fn calculate_current_day(started_at: DateTime) -> u32 { + let now = Utc::now(); + let duration = now.signed_duration_since(started_at); + let days = duration.num_days(); + if days < 0 { + return 1; + } + (days as u32) + 1 + } + + /// Get the daily limit for an IP based on its warmup schedule + pub async fn get_ip_daily_limit( + state: &AppState, + ip: &IpAddr, + ) -> Result { + use crate::core::shared::schema::warmup_schedules::dsl::*; + + let mut conn = state.conn.get()?; + let ip_str = ip.to_string(); + + let schedule: Option = warmup_schedules + .filter(ip.eq(&ip_str)) + .filter(status.eq("active")) + .first(&mut conn) + .optional()?; + + match schedule { + Some(sched) => { + let current_day = Self::calculate_current_day(sched.started_at); + Ok(Self::get_daily_limit(current_day)) + } + None => Ok(u32::MAX), // No warmup schedule = unlimited + } + } + + /// Start a new warmup schedule for an IP + pub async fn start_warmup( + state: &AppState, + org_id_val: Uuid, + ip_val: &IpAddr, + ) -> Result { + use crate::core::shared::schema::warmup_schedules::dsl::*; + + let mut conn = state.conn.get()?; + let ip_str = ip_val.to_string(); + let now = Utc::now(); + + // Check if already exists + let existing: Option = warmup_schedules + .filter(ip.eq(&ip_str)) + .first(&mut conn) + .optional()?; + + if let Some(mut sched) = existing { + // Reset the schedule + sched.started_at = now; + sched.current_day = 1; + sched.daily_limit = Self::get_daily_limit(1); + sched.status = WarmupStatus::Active.as_str().to_string(); + sched.paused_reason = None; + sched.updated_at = Some(now); + + diesel::update(warmup_schedules.filter(id.eq(sched.id))) + .set(&sched) + .execute(&mut conn)?; + + return Ok(sched); + } + + // Create new schedule + let new_schedule = WarmupSchedule { + id: Uuid::new_v4(), + org_id: org_id_val, + ip: ip_str, + started_at: now, + current_day: 1, + daily_limit: Self::get_daily_limit(1), + status: WarmupStatus::Active.as_str().to_string(), + paused_reason: None, + created_at: now, + updated_at: None, + }; + + diesel::insert_into(warmup_schedules) + .values(&new_schedule) + .execute(&mut conn)?; + + Ok(new_schedule) + } + + /// Pause warmup due to issues + pub async fn pause_warmup( + state: &AppState, + ip_val: &IpAddr, + reason: PausedReason, + ) -> Result<(), diesel::result::Error> { + use crate::core::shared::schema::warmup_schedules::dsl::*; + + let mut conn = state.conn.get()?; + let ip_str = ip_val.to_string(); + let now = Utc::now(); + + diesel::update(warmup_schedules.filter(ip.eq(&ip_str))) + .set(( + status.eq(WarmupStatus::Paused.as_str()), + paused_reason.eq(Some(reason.as_str())), + updated_at.eq(Some(now)), + )) + .execute(&mut conn)?; + + Ok(()) + } + + /// Resume warmup at same volume + pub async fn resume_warmup( + state: &AppState, + ip_val: &IpAddr, + ) -> Result<(), diesel::result::Error> { + use crate::core::shared::schema::warmup_schedules::dsl::*; + + let mut conn = state.conn.get()?; + let ip_str = ip_val.to_string(); + let now = Utc::now(); + + diesel::update(warmup_schedules.filter(ip.eq(&ip_str))) + .set(( + status.eq(WarmupStatus::Active.as_str()), + paused_reason.eq(None::), + updated_at.eq(Some(now)), + )) + .execute(&mut conn)?; + + Ok(()) + } + + /// Get engaged subscribers for warmup sends (opened in last 90 days) + pub async fn get_engaged_subscribers( + state: &AppState, + list_id: Uuid, + limit: usize, + ) -> Result, diesel::result::Error> { + use crate::core::shared::schema::marketing_contacts; + use crate::core::shared::schema::marketing_email_opens; + use diesel::dsl::sql; + + let mut conn = state.conn.get()?; + let ninety_days_ago = Utc::now() - chrono::Duration::days(90); + + // Get contacts who opened emails in the last 90 days + let engaged: Vec = marketing_contacts::table + .inner_join( + marketing_email_opens::table.on( + marketing_contacts::email.eq(marketing_email_opens::email) + ) + ) + .filter(marketing_contacts::list_id.eq(list_id)) + .filter(marketing_email_opens::opened_at.gt(ninety_days_ago)) + .select(marketing_contacts::email) + .distinct() + .limit(limit as i64) + .load(&mut conn)?; + + Ok(engaged) + } + + /// Check if bounce rate exceeds threshold (3%) + pub fn should_pause_for_bounces(sent: u32, bounces: u32) -> bool { + if sent == 0 { + return false; + } + let bounce_rate = bounces as f64 / sent as f64; + bounce_rate > 0.03 // 3% threshold + } + + /// Check if complaint rate exceeds threshold (0.1%) + pub fn should_pause_for_complaints(sent: u32, complaints: u32) -> bool { + if sent == 0 { + return false; + } + let complaint_rate = complaints as f64 / sent as f64; + complaint_rate > 0.001 // 0.1% threshold + } +} + +/// API response for warmup status +#[derive(Debug, Serialize)] +pub struct WarmupStatusResponse { + pub ip: String, + pub day: u32, + pub daily_limit: u32, + pub status: String, + pub paused_reason: Option, + pub started_at: DateTime, +} + +impl From for WarmupStatusResponse { + fn from(schedule: WarmupSchedule) -> Self { + let current_day = WarmupEngine::calculate_current_day(schedule.started_at); + Self { + ip: schedule.ip, + day: current_day, + daily_limit: schedule.daily_limit, + status: schedule.status, + paused_reason: schedule.paused_reason, + started_at: schedule.started_at, + } + } +} + +/// Request to start warmup +#[derive(Debug, Deserialize)] +pub struct StartWarmupRequest { + pub ip: String, +} + +/// Request to pause/resume warmup +#[derive(Debug, Deserialize)] +pub struct PauseWarmupRequest { + pub reason: String, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_daily_limit() { + assert_eq!(WarmupEngine::get_daily_limit(1), 50); + assert_eq!(WarmupEngine::get_daily_limit(2), 50); + assert_eq!(WarmupEngine::get_daily_limit(5), 500); + assert_eq!(WarmupEngine::get_daily_limit(15), 10000); + assert_eq!(WarmupEngine::get_daily_limit(29), u32::MAX); + assert_eq!(WarmupEngine::get_daily_limit(100), u32::MAX); + } + + #[test] + fn test_should_pause_for_bounces() { + assert!(!WarmupEngine::should_pause_for_bounces(100, 2)); // 2% bounce rate + assert!(WarmupEngine::should_pause_for_bounces(100, 4)); // 4% bounce rate + assert!(!WarmupEngine::should_pause_for_bounces(0, 0)); + } + + #[test] + fn test_should_pause_for_complaints() { + assert!(!WarmupEngine::should_pause_for_complaints(1000, 0)); // 0% complaint rate + assert!(WarmupEngine::should_pause_for_complaints(1000, 2)); // 0.2% complaint rate + } +}