// Package ha provides high-availability building blocks: a periodic node
// health evaluation loop with failover triggering, a per-check health checker,
// and a rule-based alert manager with pluggable notifiers.
package ha

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"containr/internal/deployment"
	"containr/internal/metrics"
)

// HighAvailabilityManager manages high availability features. It owns a
// FailoverManager, a HealthChecker and an AlertManager and drives them from
// Start. The mutable enabled flag is guarded by mu.
type HighAvailabilityManager struct {
	scheduler         *deployment.Scheduler
	metricsCollector  *metrics.MetricsCollector
	failoverManager   *FailoverManager
	healthChecker     *HealthChecker
	alertManager      *AlertManager
	mu                sync.RWMutex
	enabled           bool
	checkInterval     time.Duration
	failoverThreshold int
}

// FailoverManager handles service failover operations. The policy map is
// keyed by service ID and guarded by mu.
type FailoverManager struct {
	scheduler        *deployment.Scheduler
	failoverPolicies map[string]*FailoverPolicy
	mu               sync.RWMutex
}

// FailoverPolicy defines failover behavior for a service.
type FailoverPolicy struct {
	ServiceID         string             `json:"service_id"`
	Enabled           bool               `json:"enabled"`
	MinHealthyNodes   int                `json:"min_healthy_nodes"`
	MaxFailures       int                `json:"max_failures"`
	FailoverTimeout   time.Duration      `json:"failover_timeout"`
	RecoveryTimeout   time.Duration      `json:"recovery_timeout"`
	FailoverStrategy  FailoverStrategy   `json:"failover_strategy"`
	BackupNodes       []string           `json:"backup_nodes"`
	HealthCheckConfig *HealthCheckConfig `json:"health_check_config"`
}

// FailoverStrategy defines how failover is performed.
type FailoverStrategy string

// Supported failover strategies.
const (
	FailoverStrategyActivePassive FailoverStrategy = "active_passive"
	FailoverStrategyActiveActive  FailoverStrategy = "active_active"
	FailoverStrategyGraceful      FailoverStrategy = "graceful"
)

// HealthCheckConfig defines health check parameters.
type HealthCheckConfig struct {
	Interval           time.Duration `json:"interval"`
	Timeout            time.Duration `json:"timeout"`
	UnhealthyThreshold int           `json:"unhealthy_threshold"`
	HealthyThreshold   int           `json:"healthy_threshold"`
	Path               string        `json:"path"`
	Port               int           `json:"port"`
	Protocol           string        `json:"protocol"`
}

// HealthChecker performs health checks on services and nodes. Both maps are
// keyed by check ID and guarded by mu.
type HealthChecker struct {
	scheduler     *deployment.Scheduler
	checks        map[string]*HealthCheck
	results       map[string]*HealthCheckResult
	mu            sync.RWMutex
	checkInterval time.Duration
}

// HealthCheck represents a health check configuration.
type HealthCheck struct {
	ID        string            `json:"id"`
	ServiceID string            `json:"service_id"`
	NodeID    string            `json:"node_id"`
	Type      HealthCheckType   `json:"type"`
	Config    HealthCheckConfig `json:"config"`
	LastCheck time.Time         `json:"last_check"`
	Status    HealthStatus      `json:"status"`
}

// HealthCheckType represents the type of health check.
type HealthCheckType string

// Supported health check types.
const (
	HealthCheckTypeHTTP    HealthCheckType = "http"
	HealthCheckTypeTCP     HealthCheckType = "tcp"
	HealthCheckTypeCommand HealthCheckType = "command"
)

// HealthStatus represents the health status.
type HealthStatus string

// Possible health statuses.
const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
	HealthStatusUnknown   HealthStatus = "unknown"
)

// HealthCheckResult represents the result of a health check.
type HealthCheckResult struct {
	CheckID   string        `json:"check_id"`
	Status    HealthStatus  `json:"status"`
	Message   string        `json:"message"`
	Latency   time.Duration `json:"latency"`
	Timestamp time.Time     `json:"timestamp"`
	ErrorCode string        `json:"error_code,omitempty"`
}

// AlertManager handles alerting and notifications. All three maps are guarded
// by mu; rules and activeAlerts are keyed by their IDs, notifiers by the ID
// passed to AddNotifier.
type AlertManager struct {
	rules        map[string]*AlertRule
	activeAlerts map[string]*Alert
	notifiers    map[string]Notifier
	mu           sync.RWMutex
}

// AlertRule defines when alerts should be triggered.
type AlertRule struct {
	ID          string            `json:"id"`
	Name        string            `json:"name"`
	Description string            `json:"description"`
	Enabled     bool              `json:"enabled"`
	Condition   AlertCondition    `json:"condition"`
	Severity    AlertSeverity     `json:"severity"`
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	Notifiers   []string          `json:"notifiers"`
	Cooldown    time.Duration     `json:"cooldown"`
}

// AlertCondition defines the condition for triggering an alert.
type AlertCondition struct {
	Metric    string        `json:"metric"`
	Operator  string        `json:"operator"` // >, <, >=, <=, ==, !=
	Threshold float64       `json:"threshold"`
	Duration  time.Duration `json:"duration"`
}

// AlertSeverity represents the severity level of an alert.
type AlertSeverity string

// Supported alert severities.
const (
	AlertSeverityCritical AlertSeverity = "critical"
	AlertSeverityWarning  AlertSeverity = "warning"
	AlertSeverityInfo     AlertSeverity = "info"
)

// Alert represents an active alert.
type Alert struct {
	ID          string            `json:"id"`
	RuleID      string            `json:"rule_id"`
	Status      AlertStatus       `json:"status"`
	Severity    AlertSeverity     `json:"severity"`
	Message     string            `json:"message"`
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	StartsAt    time.Time         `json:"starts_at"`
	EndsAt      *time.Time        `json:"ends_at,omitempty"`
	UpdatedAt   time.Time         `json:"updated_at"`
}

// AlertStatus represents the status of an alert.
type AlertStatus string

// Possible alert statuses.
const (
	AlertStatusFiring   AlertStatus = "firing"
	AlertStatusResolved AlertStatus = "resolved"
)

// Notifier sends alert notifications.
type Notifier interface {
	// Send delivers the alert; a non-nil error is logged by the AlertManager.
	Send(ctx context.Context, alert *Alert) error
	// Type returns a short identifier for the notifier kind.
	Type() string
}

// NewHighAvailabilityManager creates a new HA manager with empty policy,
// check, rule and notifier sets. The manager starts enabled, with a 30 second
// check interval (also used by the health checker) and a failover threshold
// of 3 unhealthy nodes.
func NewHighAvailabilityManager(scheduler *deployment.Scheduler, metricsCollector *metrics.MetricsCollector) *HighAvailabilityManager {
	failoverManager := &FailoverManager{
		scheduler:        scheduler,
		failoverPolicies: make(map[string]*FailoverPolicy),
	}

	healthChecker := &HealthChecker{
		scheduler:     scheduler,
		checks:        make(map[string]*HealthCheck),
		results:       make(map[string]*HealthCheckResult),
		checkInterval: 30 * time.Second,
	}

	alertManager := &AlertManager{
		rules:        make(map[string]*AlertRule),
		activeAlerts: make(map[string]*Alert),
		notifiers:    make(map[string]Notifier),
	}

	return &HighAvailabilityManager{
		scheduler:         scheduler,
		metricsCollector:  metricsCollector,
		failoverManager:   failoverManager,
		healthChecker:     healthChecker,
		alertManager:      alertManager,
		enabled:           true,
		checkInterval:     30 * time.Second,
		failoverThreshold: 3,
	}
}

// Start starts the HA management process. It launches the health checker and
// the alert manager in their own goroutines (their return values are
// discarded), then runs an HA check on every tick of checkInterval while the
// manager is enabled. It blocks until ctx is done and returns ctx.Err().
func (ha *HighAvailabilityManager) Start(ctx context.Context) error {
	ticker := time.NewTicker(ha.checkInterval)
	defer ticker.Stop()

	log.Printf("HighAvailabilityManager started with check interval: %v", ha.checkInterval)

	// Start health checker
	go ha.healthChecker.Start(ctx)

	// Start alert manager
	go ha.alertManager.Start(ctx)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Read the flag through IsEnabled so the access is synchronized
			// with concurrent Enable/Disable calls.
			if !ha.IsEnabled() {
				continue
			}
			if err := ha.checkHighAvailability(ctx); err != nil {
				log.Printf("Error during HA check: %v", err)
			}
		}
	}
}

// checkHighAvailability performs HA checks and takes action if needed. It
// counts the scheduler's nodes that fail isNodeHealthy and triggers a failover
// with reason "node_failure" once that count reaches failoverThreshold. The
// only error returned is a wrapped failover error.
func (ha *HighAvailabilityManager) checkHighAvailability(ctx context.Context) error {
	// Check node health
	nodes := ha.scheduler.GetNodes()
	unhealthyNodes := 0

	for _, node := range nodes {
		if !ha.isNodeHealthy(node) {
			unhealthyNodes++
			log.Printf("Node %s is unhealthy", node.ID)
		}
	}

	// Trigger failover if too many nodes are unhealthy
	if unhealthyNodes >= ha.failoverThreshold {
		log.Printf("Failover threshold reached: %d unhealthy nodes", unhealthyNodes)
		if err := ha.failoverManager.TriggerFailover(ctx, "node_failure"); err != nil {
			return fmt.Errorf("failed to trigger failover: %w", err)
		}
	}

	return nil
}

// isNodeHealthy checks if a node is healthy. A node is healthy when its status
// is "ready", its last heartbeat is at most two minutes old, its CPU usage is
// at most 95 (presumably percent; confirm against deployment.Node) and its
// memory usage is at most 95% of its memory capacity.
func (ha *HighAvailabilityManager) isNodeHealthy(node *deployment.Node) bool {
	// Check if node is ready
	if node.Status != "ready" {
		return false
	}

	// Check heartbeat
	if time.Since(node.LastHeartbeat) > 2*time.Minute {
		return false
	}

	// Check resource usage
	if node.Usage.CPU > 95 || node.Usage.Memory > int64(float64(node.Capacity.Memory)*0.95) {
		return false
	}

	return true
}

// SetFailoverPolicy sets or updates a failover policy, keyed by the policy's
// ServiceID. It returns an error if policy is nil.
func (ha *HighAvailabilityManager) SetFailoverPolicy(policy *FailoverPolicy) error {
	if policy == nil {
		return fmt.Errorf("failover policy must not be nil")
	}

	ha.mu.Lock()
	defer ha.mu.Unlock()

	ha.failoverManager.SetFailoverPolicy(policy)
	return nil
}

// GetFailoverPolicy returns the failover policy registered for serviceID, or
// an error if none exists.
func (ha *HighAvailabilityManager) GetFailoverPolicy(serviceID string) (*FailoverPolicy, error) {
	ha.mu.RLock()
	defer ha.mu.RUnlock()

	return ha.failoverManager.GetFailoverPolicy(serviceID)
}

// TriggerFailover manually triggers a failover for every enabled policy.
func (ha *HighAvailabilityManager) TriggerFailover(ctx context.Context, reason string) error {
	return ha.failoverManager.TriggerFailover(ctx, reason)
}

// GetHealthStatus returns the health status of all services and nodes as a
// nested map with the keys "nodes" (total/healthy/unhealthy), "health_checks"
// (total/healthy/unhealthy), "alerts" (active) and "enabled". Any health check
// result whose status is not healthy, including unknown, counts as unhealthy.
func (ha *HighAvailabilityManager) GetHealthStatus() map[string]interface{} {
	ha.mu.RLock()
	defer ha.mu.RUnlock()

	nodes := ha.scheduler.GetNodes()
	healthyNodes := 0
	unhealthyNodes := 0

	for _, node := range nodes {
		if ha.isNodeHealthy(node) {
			healthyNodes++
		} else {
			unhealthyNodes++
		}
	}

	healthChecks := ha.healthChecker.GetAllHealthChecks()
	healthyChecks := 0
	unhealthyChecks := 0

	for _, result := range ha.healthChecker.GetAllResults() {
		if result.Status == HealthStatusHealthy {
			healthyChecks++
		} else {
			unhealthyChecks++
		}
	}

	activeAlerts := ha.alertManager.GetActiveAlerts()

	return map[string]interface{}{
		"nodes": map[string]interface{}{
			"total":     len(nodes),
			"healthy":   healthyNodes,
			"unhealthy": unhealthyNodes,
		},
		"health_checks": map[string]interface{}{
			"total":     len(healthChecks),
			"healthy":   healthyChecks,
			"unhealthy": unhealthyChecks,
		},
		"alerts": map[string]interface{}{
			"active": len(activeAlerts),
		},
		"enabled": ha.enabled,
	}
}

// Enable enables the HA manager.
func (ha *HighAvailabilityManager) Enable() {
	ha.mu.Lock()
	defer ha.mu.Unlock()
	ha.enabled = true
}

// Disable disables the HA manager. While disabled, Start keeps ticking but
// skips the HA check.
func (ha *HighAvailabilityManager) Disable() {
	ha.mu.Lock()
	defer ha.mu.Unlock()
	ha.enabled = false
}

// IsEnabled returns whether the HA manager is enabled.
func (ha *HighAvailabilityManager) IsEnabled() bool {
	ha.mu.RLock()
	defer ha.mu.RUnlock()
	return ha.enabled
}

// FailoverManager methods

// SetFailoverPolicy stores policy under its ServiceID, replacing any existing
// policy for that service. A nil policy is ignored.
func (fm *FailoverManager) SetFailoverPolicy(policy *FailoverPolicy) {
	if policy == nil {
		return
	}

	fm.mu.Lock()
	defer fm.mu.Unlock()
	fm.failoverPolicies[policy.ServiceID] = policy
}

// GetFailoverPolicy returns the failover policy registered for serviceID, or
// an error if none exists.
func (fm *FailoverManager) GetFailoverPolicy(serviceID string) (*FailoverPolicy, error) {
	fm.mu.RLock()
	defer fm.mu.RUnlock()

	policy, exists := fm.failoverPolicies[serviceID]
	if !exists {
		return nil, fmt.Errorf("no failover policy found for service: %s", serviceID)
	}

	return policy, nil
}

// TriggerFailover triggers a failover for affected services. It snapshots the
// enabled policies under the read lock and then performs each failover without
// holding it. Per-service failures are logged, not returned; the method always
// returns nil.
func (fm *FailoverManager) TriggerFailover(ctx context.Context, reason string) error {
	fm.mu.RLock()
	policies := make([]*FailoverPolicy, 0, len(fm.failoverPolicies))
	for _, policy := range fm.failoverPolicies {
		if policy.Enabled {
			policies = append(policies, policy)
		}
	}
	fm.mu.RUnlock()

	for _, policy := range policies {
		if err := fm.performFailover(ctx, policy, reason); err != nil {
			log.Printf("Failed to perform failover for service %s: %v", policy.ServiceID, err)
		}
	}

	return nil
}

// performFailover performs failover for a specific service. The current
// implementation is a placeholder that only logs and returns nil.
func (fm *FailoverManager) performFailover(ctx context.Context, policy *FailoverPolicy, reason string) error {
	log.Printf("Performing failover for service %s: %s", policy.ServiceID, reason)

	// In a real implementation, this would:
	// 1. Identify healthy backup nodes
	// 2. Start new instances on backup nodes
	// 3. Update DNS/load balancer to point to new instances
	// 4. Wait for health checks to pass
	// 5. Shut down unhealthy instances

	// For now, we'll just log the action
	log.Printf("Failover completed for service %s", policy.ServiceID)
	return nil
}

// HealthChecker methods

// Start starts the health checker. It runs all registered checks on every tick
// of checkInterval, blocks until ctx is done and returns ctx.Err().
func (hc *HealthChecker) Start(ctx context.Context) error {
	ticker := time.NewTicker(hc.checkInterval)
	defer ticker.Stop()

	log.Printf("HealthChecker started with check interval: %v", hc.checkInterval)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := hc.performHealthChecks(ctx); err != nil {
				log.Printf("Error during health checks: %v", err)
			}
		}
	}
}

// performHealthChecks performs all configured health checks sequentially and
// records the latest result per check ID. It always returns nil.
func (hc *HealthChecker) performHealthChecks(ctx context.Context) error {
	hc.mu.RLock()
	checks := make([]*HealthCheck, 0, len(hc.checks))
	for _, check := range hc.checks {
		checks = append(checks, check)
	}
	hc.mu.RUnlock()

	for _, check := range checks {
		result := hc.performHealthCheck(ctx, check)

		hc.mu.Lock()
		// The check may have been removed while it was running. Storing its
		// result then would leave an orphaned entry that RemoveHealthCheck
		// already cleaned up and nothing would delete again.
		if _, registered := hc.checks[check.ID]; registered {
			hc.results[check.ID] = result
		}
		hc.mu.Unlock()
	}

	return nil
}

// performHealthCheck performs a single health check. The current
// implementation is a simulation: it sleeps 10ms and reports unhealthy (with
// error code "TIMEOUT") only when the current Unix second is a multiple of 10,
// healthy otherwise. Latency is the elapsed wall time of the call.
func (hc *HealthChecker) performHealthCheck(ctx context.Context, check *HealthCheck) *HealthCheckResult {
	start := time.Now()
	result := &HealthCheckResult{
		CheckID:   check.ID,
		Timestamp: start,
		Status:    HealthStatusUnknown,
	}

	// In a real implementation, this would perform actual health checks
	// For now, we'll simulate the check
	time.Sleep(10 * time.Millisecond) // Simulate network latency

	// Simulate healthy/unhealthy based on the wall clock
	if time.Now().Unix()%10 == 0 {
		result.Status = HealthStatusUnhealthy
		result.Message = "Service not responding"
		result.ErrorCode = "TIMEOUT"
	} else {
		result.Status = HealthStatusHealthy
		result.Message = "Service is healthy"
	}

	result.Latency = time.Since(start)
	return result
}

// AddHealthCheck adds a new health check, replacing any existing check with
// the same ID.
func (hc *HealthChecker) AddHealthCheck(check *HealthCheck) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.checks[check.ID] = check
}

// RemoveHealthCheck removes a health check and its stored result.
func (hc *HealthChecker) RemoveHealthCheck(checkID string) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	delete(hc.checks, checkID)
	delete(hc.results, checkID)
}

// GetAllHealthChecks returns all health checks as a new map. The map is a
// shallow copy: the *HealthCheck values are shared with the checker.
func (hc *HealthChecker) GetAllHealthChecks() map[string]*HealthCheck {
	hc.mu.RLock()
	defer hc.mu.RUnlock()

	result := make(map[string]*HealthCheck, len(hc.checks))
	for id, check := range hc.checks {
		result[id] = check
	}
	return result
}

// GetAllResults returns all health check results as a new map. The map is a
// shallow copy: the *HealthCheckResult values are shared with the checker.
func (hc *HealthChecker) GetAllResults() map[string]*HealthCheckResult {
	hc.mu.RLock()
	defer hc.mu.RUnlock()

	result := make(map[string]*HealthCheckResult, len(hc.results))
	for id, checkResult := range hc.results {
		result[id] = checkResult
	}
	return result
}

// AlertManager methods

// Start starts the alert manager. It evaluates the alert rules every 30
// seconds, blocks until ctx is done and returns ctx.Err().
func (am *AlertManager) Start(ctx context.Context) error {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	log.Printf("AlertManager started")

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := am.evaluateAlertRules(ctx); err != nil {
				log.Printf("Error evaluating alert rules: %v", err)
			}
		}
	}
}

// evaluateAlertRules evaluates all alert rules and triggers alerts if needed.
// It snapshots the enabled rules under the read lock and evaluates them
// without holding it. Trigger failures are logged; the method always returns
// nil.
func (am *AlertManager) evaluateAlertRules(ctx context.Context) error {
	am.mu.RLock()
	rules := make([]*AlertRule, 0, len(am.rules))
	for _, rule := range am.rules {
		if rule.Enabled {
			rules = append(rules, rule)
		}
	}
	am.mu.RUnlock()

	for _, rule := range rules {
		if am.shouldTriggerAlert(rule) {
			alert := am.createAlert(rule)
			if err := am.triggerAlert(ctx, alert); err != nil {
				log.Printf("Failed to trigger alert: %v", err)
			}
		}
	}

	return nil
}

// shouldTriggerAlert checks if an alert should be triggered. The current
// implementation is a simulation that ignores the rule and returns true only
// when the current Unix second is a multiple of 20.
func (am *AlertManager) shouldTriggerAlert(rule *AlertRule) bool {
	// In a real implementation, this would query metrics and evaluate the condition
	// For now, we'll simulate based on time
	return time.Now().Unix()%20 == 0
}

// createAlert creates a firing alert from a rule. The alert ID is
// "alert_<rule ID>_<Unix seconds>"; Labels and Annotations are the rule's own
// maps (not copies).
func (am *AlertManager) createAlert(rule *AlertRule) *Alert {
	// Take a single timestamp so the ID, StartsAt and UpdatedAt agree.
	now := time.Now()
	return &Alert{
		ID:          fmt.Sprintf("alert_%s_%d", rule.ID, now.Unix()),
		RuleID:      rule.ID,
		Status:      AlertStatusFiring,
		Severity:    rule.Severity,
		Message:     fmt.Sprintf("Alert triggered: %s", rule.Name),
		Labels:      rule.Labels,
		Annotations: rule.Annotations,
		StartsAt:    now,
		UpdatedAt:   now,
	}
}

// triggerAlert records the alert as active and sends it through every
// registered notifier listed on the alert's rule. Notifier IDs with no
// registered notifier are skipped. Send failures are logged; the method always
// returns nil.
func (am *AlertManager) triggerAlert(ctx context.Context, alert *Alert) error {
	type target struct {
		id       string
		notifier Notifier
	}

	// Record the alert and resolve its notifiers in one critical section, so
	// the rule and notifier maps are never read unsynchronized. The sends
	// happen after unlocking so a slow notifier cannot block other callers.
	am.mu.Lock()
	am.activeAlerts[alert.ID] = alert
	rule := am.rules[alert.RuleID]
	var targets []target
	if rule != nil {
		targets = make([]target, 0, len(rule.Notifiers))
		for _, notifierID := range rule.Notifiers {
			if notifier, exists := am.notifiers[notifierID]; exists {
				targets = append(targets, target{id: notifierID, notifier: notifier})
			}
		}
	}
	am.mu.Unlock()

	// The rule can disappear between evaluation and triggering (see
	// RemoveAlertRule); the alert stays active but nothing is notified.
	if rule == nil {
		log.Printf("Alert %s references unknown rule %s; no notifications sent", alert.ID, alert.RuleID)
	}

	// Send notifications
	for _, t := range targets {
		if err := t.notifier.Send(ctx, alert); err != nil {
			log.Printf("Failed to send notification via %s: %v", t.id, err)
		}
	}

	log.Printf("Alert triggered: %s", alert.ID)
	return nil
}

// getAlertRule returns the rule with the given ID, or nil if it is not
// registered.
func (am *AlertManager) getAlertRule(ruleID string) *AlertRule {
	am.mu.RLock()
	defer am.mu.RUnlock()
	return am.rules[ruleID]
}

// AddAlertRule adds a new alert rule, replacing any existing rule with the
// same ID.
func (am *AlertManager) AddAlertRule(rule *AlertRule) {
	am.mu.Lock()
	defer am.mu.Unlock()
	am.rules[rule.ID] = rule
}

// RemoveAlertRule removes an alert rule. Alerts already created from the rule
// are left untouched.
func (am *AlertManager) RemoveAlertRule(ruleID string) {
	am.mu.Lock()
	defer am.mu.Unlock()
	delete(am.rules, ruleID)
}

// AddNotifier registers notifier under id, replacing any existing notifier
// with the same id. Rules refer to notifiers by this id.
func (am *AlertManager) AddNotifier(id string, notifier Notifier) {
	am.mu.Lock()
	defer am.mu.Unlock()
	am.notifiers[id] = notifier
}

// GetActiveAlerts returns all active alerts as a new map. The map is a shallow
// copy: the *Alert values are shared with the manager.
func (am *AlertManager) GetActiveAlerts() map[string]*Alert {
	am.mu.RLock()
	defer am.mu.RUnlock()

	result := make(map[string]*Alert, len(am.activeAlerts))
	for id, alert := range am.activeAlerts {
		result[id] = alert
	}
	return result
}

// ResolveAlert marks the alert as resolved, stamps EndsAt and UpdatedAt with
// the current time, and removes it from the active set. Unknown IDs are a
// no-op.
func (am *AlertManager) ResolveAlert(alertID string) {
	am.mu.Lock()
	defer am.mu.Unlock()

	if alert, exists := am.activeAlerts[alertID]; exists {
		now := time.Now()
		alert.Status = AlertStatusResolved
		alert.EndsAt = &now
		alert.UpdatedAt = now
	}

	delete(am.activeAlerts, alertID)
}

// Mock Notifier implementations

// EmailNotifier sends alerts via email.
type EmailNotifier struct {
	SMTPHost string
	SMTPPort int
	Username string
	Password string
	From     string
	To       []string
}

// Send logs the alert message. It is a mock: no email is sent and the
// returned error is always nil.
func (n *EmailNotifier) Send(ctx context.Context, alert *Alert) error {
	log.Printf("Sending email alert: %s", alert.Message)
	// In a real implementation, this would send an actual email
	return nil
}

// Type returns "email".
func (n *EmailNotifier) Type() string {
	return "email"
}

// SlackNotifier sends alerts to Slack.
type SlackNotifier struct {
	WebhookURL string
	Channel    string
}

// Send logs the alert message. It is a mock: nothing is posted to Slack and
// the returned error is always nil.
func (n *SlackNotifier) Send(ctx context.Context, alert *Alert) error {
	log.Printf("Sending Slack alert: %s", alert.Message)
	// In a real implementation, this would send to Slack webhook
	return nil
}

// Type returns "slack".
func (n *SlackNotifier) Type() string {
	return "slack"
}

// WebhookNotifier sends alerts via webhook.
type WebhookNotifier struct {
	URL string
}

// Send logs the alert message. It is a mock: no HTTP request is made and the
// returned error is always nil.
func (n *WebhookNotifier) Send(ctx context.Context, alert *Alert) error {
	log.Printf("Sending webhook alert: %s", alert.Message)
	// In a real implementation, this would send HTTP request
	return nil
}

// Type returns "webhook".
func (n *WebhookNotifier) Type() string {
	return "webhook"
}