Files
Tomas Dvorak 355a97bab4 overhaul
2026-04-14 18:04:48 +02:00

737 lines
20 KiB
Go

package ha
import (
"context"
"fmt"
"log"
"sync"
"time"
"containr/internal/deployment"
"containr/internal/metrics"
)
// HighAvailabilityManager manages high availability features. It runs a
// periodic loop (Start) that counts unhealthy scheduler nodes and triggers a
// failover once failoverThreshold is reached, and it owns the failover
// manager, health checker and alert manager subsystems.
type HighAvailabilityManager struct {
	scheduler *deployment.Scheduler
	// metricsCollector is stored by NewHighAvailabilityManager; none of the
	// methods shown here read it.
	metricsCollector *metrics.MetricsCollector
	failoverManager  *FailoverManager
	healthChecker    *HealthChecker
	alertManager     *AlertManager
	// mu guards enabled (Enable, Disable, IsEnabled); the policy accessors
	// and GetHealthStatus also take it.
	mu sync.RWMutex
	// enabled gates the periodic HA check in Start.
	enabled bool
	// checkInterval is the tick period of the HA loop in Start.
	checkInterval time.Duration
	// failoverThreshold is the number of unhealthy nodes at which
	// checkHighAvailability triggers a failover.
	failoverThreshold int
}
// FailoverManager handles service failover operations. It holds at most one
// FailoverPolicy per service.
type FailoverManager struct {
	scheduler *deployment.Scheduler
	// failoverPolicies is keyed by FailoverPolicy.ServiceID; guarded by mu.
	failoverPolicies map[string]*FailoverPolicy
	mu               sync.RWMutex
}
// FailoverPolicy defines failover behavior for a service. FailoverManager
// stores policies keyed by ServiceID, and TriggerFailover only acts on
// policies whose Enabled flag is set. The remaining fields are configuration;
// performFailover does not read them yet.
//
// The time.Duration fields are int64 nanoseconds and encode as plain JSON
// integers.
type FailoverPolicy struct {
	ServiceID         string             `json:"service_id"`
	Enabled           bool               `json:"enabled"`
	MinHealthyNodes   int                `json:"min_healthy_nodes"`
	MaxFailures       int                `json:"max_failures"`
	FailoverTimeout   time.Duration      `json:"failover_timeout"`
	RecoveryTimeout   time.Duration      `json:"recovery_timeout"`
	FailoverStrategy  FailoverStrategy   `json:"failover_strategy"`
	BackupNodes       []string           `json:"backup_nodes"` // presumably node IDs — confirm
	HealthCheckConfig *HealthCheckConfig `json:"health_check_config"`
}
// FailoverStrategy defines how failover is performed. The constants below are
// the recognized values. performFailover does not currently branch on the
// strategy.
type FailoverStrategy string

// Known FailoverStrategy values; the string form is what appears in JSON.
const (
	FailoverStrategyActivePassive FailoverStrategy = "active_passive"
	FailoverStrategyActiveActive  FailoverStrategy = "active_active"
	FailoverStrategyGraceful      FailoverStrategy = "graceful"
)
// HealthCheckConfig defines health check parameters. The simulated check in
// HealthChecker.performHealthCheck does not consult these values yet.
type HealthCheckConfig struct {
	Interval           time.Duration `json:"interval"`
	Timeout            time.Duration `json:"timeout"`
	UnhealthyThreshold int           `json:"unhealthy_threshold"` // presumably consecutive failures before "unhealthy" — confirm
	HealthyThreshold   int           `json:"healthy_threshold"`   // presumably consecutive successes before "healthy" — confirm
	Path               string        `json:"path"`                // presumably the HTTP path for HTTP checks — confirm
	Port               int           `json:"port"`
	Protocol           string        `json:"protocol"`
}
// HealthChecker performs health checks on services and nodes. Registered
// checks run once per checkInterval in Start, and their latest results are
// kept in results.
type HealthChecker struct {
	scheduler *deployment.Scheduler
	// checks and results are both keyed by HealthCheck.ID; guarded by mu.
	checks  map[string]*HealthCheck
	results map[string]*HealthCheckResult
	mu      sync.RWMutex
	// checkInterval is the tick period of Start (NewHighAvailabilityManager
	// sets it to 30s).
	checkInterval time.Duration
}
// HealthCheck represents a health check configuration registered with
// HealthChecker.AddHealthCheck. Outcomes are stored separately as
// HealthCheckResult values. LastCheck and Status are not written by the
// check loop shown here.
type HealthCheck struct {
	ID        string            `json:"id"`
	ServiceID string            `json:"service_id"`
	NodeID    string            `json:"node_id"`
	Type      HealthCheckType   `json:"type"`
	Config    HealthCheckConfig `json:"config"`
	LastCheck time.Time         `json:"last_check"`
	Status    HealthStatus      `json:"status"`
}
// HealthCheckType represents the type of health check, i.e. how the target
// is probed.
type HealthCheckType string

// Known HealthCheckType values.
const (
	HealthCheckTypeHTTP    HealthCheckType = "http"
	HealthCheckTypeTCP     HealthCheckType = "tcp"
	HealthCheckTypeCommand HealthCheckType = "command"
)
// HealthStatus represents the health status reported by a health check.
type HealthStatus string

// Known HealthStatus values. HealthStatusUnknown is the initial status of a
// result before the check outcome is known.
const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
	HealthStatusUnknown   HealthStatus = "unknown"
)
// HealthCheckResult represents the result of a single health check run.
// Timestamp is when the run started and Latency is how long it took.
// ErrorCode is set only for failures and is omitted from JSON when empty.
type HealthCheckResult struct {
	CheckID   string        `json:"check_id"`
	Status    HealthStatus  `json:"status"`
	Message   string        `json:"message"`
	Latency   time.Duration `json:"latency"`
	Timestamp time.Time     `json:"timestamp"`
	ErrorCode string        `json:"error_code,omitempty"`
}
// AlertManager handles alerting and notifications. It periodically evaluates
// its rules, records fired alerts as active, and dispatches them to the
// notifiers named by each rule.
type AlertManager struct {
	rules        map[string]*AlertRule // keyed by AlertRule.ID
	activeAlerts map[string]*Alert     // keyed by Alert.ID
	notifiers    map[string]Notifier   // keyed by the id passed to AddNotifier
	mu           sync.RWMutex          // guards all three maps
}
// AlertRule defines when alerts should be triggered and who is notified.
// Only rules with Enabled set are evaluated. Severity, Labels and
// Annotations are carried onto every Alert created from the rule. Notifiers
// lists ids previously registered with AlertManager.AddNotifier.
type AlertRule struct {
	ID          string            `json:"id"`
	Name        string            `json:"name"`
	Description string            `json:"description"`
	Enabled     bool              `json:"enabled"`
	Condition   AlertCondition    `json:"condition"` // not yet evaluated: shouldTriggerAlert is a placeholder
	Severity    AlertSeverity     `json:"severity"`
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	Notifiers   []string          `json:"notifiers"`
	Cooldown    time.Duration     `json:"cooldown"` // presumably the minimum spacing between firings — confirm against evaluateAlertRules
}
// AlertCondition defines the condition for triggering an alert: Metric
// compared against Threshold using Operator.
type AlertCondition struct {
	Metric    string        `json:"metric"`
	Operator  string        `json:"operator"` // >, <, >=, <=, ==, !=
	Threshold float64       `json:"threshold"`
	Duration  time.Duration `json:"duration"` // presumably how long the comparison must hold before firing — confirm
}
// AlertSeverity represents the severity level of an alert; it is copied from
// the rule onto each alert.
type AlertSeverity string

// Known AlertSeverity values.
const (
	AlertSeverityCritical AlertSeverity = "critical"
	AlertSeverityWarning  AlertSeverity = "warning"
	AlertSeverityInfo     AlertSeverity = "info"
)
// Alert represents an alert fired by an AlertRule. While firing it is held
// in AlertManager.activeAlerts under ID. ResolveAlert sets Status to
// resolved, fills EndsAt and removes it from the active set.
type Alert struct {
	ID          string            `json:"id"`
	RuleID      string            `json:"rule_id"`
	Status      AlertStatus       `json:"status"`
	Severity    AlertSeverity     `json:"severity"`
	Message     string            `json:"message"`
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	StartsAt    time.Time         `json:"starts_at"`
	EndsAt      *time.Time        `json:"ends_at,omitempty"` // nil while firing
	UpdatedAt   time.Time         `json:"updated_at"`
}
// AlertStatus represents the status of an alert.
type AlertStatus string

// Known AlertStatus values.
const (
	AlertStatusFiring   AlertStatus = "firing"
	AlertStatusResolved AlertStatus = "resolved"
)
// Notifier sends alert notifications. Implementations are registered with
// AlertManager.AddNotifier and are called for each notifier id listed on a
// firing rule.
type Notifier interface {
	// Send delivers alert. A returned error is logged by the caller and does
	// not stop delivery to other notifiers.
	Send(ctx context.Context, alert *Alert) error
	// Type returns a short identifier for the notifier kind (e.g. "email").
	Type() string
}
// NewHighAvailabilityManager builds an HA manager wired to scheduler and
// metricsCollector, together with its failover manager, health checker and
// alert manager. The manager starts enabled. Both the HA loop and the health
// checker tick every 30 seconds, and failover fires once 3 or more nodes are
// unhealthy. Call Start to run the loops.
func NewHighAvailabilityManager(scheduler *deployment.Scheduler, metricsCollector *metrics.MetricsCollector) *HighAvailabilityManager {
	const (
		defaultInterval  = 30 * time.Second
		defaultThreshold = 3
	)

	ha := &HighAvailabilityManager{
		scheduler:         scheduler,
		metricsCollector:  metricsCollector,
		enabled:           true,
		checkInterval:     defaultInterval,
		failoverThreshold: defaultThreshold,
	}
	ha.failoverManager = &FailoverManager{
		scheduler:        scheduler,
		failoverPolicies: make(map[string]*FailoverPolicy),
	}
	ha.healthChecker = &HealthChecker{
		scheduler:     scheduler,
		checks:        make(map[string]*HealthCheck),
		results:       make(map[string]*HealthCheckResult),
		checkInterval: defaultInterval,
	}
	ha.alertManager = &AlertManager{
		rules:        make(map[string]*AlertRule),
		activeAlerts: make(map[string]*Alert),
		notifiers:    make(map[string]Notifier),
	}
	return ha
}
// Start runs the HA management loop until ctx is cancelled.
//
// It launches the health checker and alert manager loops in their own
// goroutines. Then, every checkInterval, it runs checkHighAvailability while
// the manager is enabled; errors from a check are logged, not returned.
//
// Once ctx is done, Start waits for both child loops to exit and then returns
// ctx.Err(). This way no goroutine started here outlives the call.
func (ha *HighAvailabilityManager) Start(ctx context.Context) error {
	ticker := time.NewTicker(ha.checkInterval)
	defer ticker.Stop()

	log.Printf("HighAvailabilityManager started with check interval: %v", ha.checkInterval)

	var wg sync.WaitGroup
	defer wg.Wait()

	wg.Add(2)
	// Both child loops return only once ctx is done, and their error is
	// ctx.Err(), the same value Start returns, so it is dropped here.
	go func() {
		defer wg.Done()
		_ = ha.healthChecker.Start(ctx)
	}()
	go func() {
		defer wg.Done()
		_ = ha.alertManager.Start(ctx)
	}()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Read the flag through IsEnabled (which takes ha.mu). Reading
			// ha.enabled directly would race with Enable/Disable.
			if !ha.IsEnabled() {
				continue
			}
			if err := ha.checkHighAvailability(ctx); err != nil {
				log.Printf("Error during HA check: %v", err)
			}
		}
	}
}
// checkHighAvailability counts the scheduler's nodes that fail isNodeHealthy
// and logs each one. When the count reaches failoverThreshold, it asks the
// failover manager to fail over with reason "node_failure". It returns a
// wrapped error only if triggering that failover fails.
func (ha *HighAvailabilityManager) checkHighAvailability(ctx context.Context) error {
	var bad int
	for _, n := range ha.scheduler.GetNodes() {
		if ha.isNodeHealthy(n) {
			continue
		}
		bad++
		log.Printf("Node %s is unhealthy", n.ID)
	}

	if bad < ha.failoverThreshold {
		return nil
	}

	log.Printf("Failover threshold reached: %d unhealthy nodes", bad)
	err := ha.failoverManager.TriggerFailover(ctx, "node_failure")
	if err != nil {
		return fmt.Errorf("failed to trigger failover: %w", err)
	}
	return nil
}
// isNodeHealthy reports whether node passes all of the liveness and load
// tests below, evaluated in order:
//   - its status is "ready";
//   - its last heartbeat is at most two minutes old;
//   - its CPU usage does not exceed 95;
//   - its memory usage does not exceed 95% of its memory capacity.
func (ha *HighAvailabilityManager) isNodeHealthy(node *deployment.Node) bool {
	const (
		heartbeatTimeout = 2 * time.Minute
		maxCPU           = 95
		maxMemoryRatio   = 0.95
	)

	switch {
	case node.Status != "ready":
		return false
	case time.Since(node.LastHeartbeat) > heartbeatTimeout:
		return false
	case node.Usage.CPU > maxCPU:
		return false
	case node.Usage.Memory > int64(float64(node.Capacity.Memory)*maxMemoryRatio):
		return false
	default:
		return true
	}
}
// SetFailoverPolicy registers policy with the failover manager. Any existing
// policy for the same ServiceID is replaced.
//
// A nil policy is rejected with an error. Without this check it would panic
// when the failover manager reads policy.ServiceID.
func (ha *HighAvailabilityManager) SetFailoverPolicy(policy *FailoverPolicy) error {
	if policy == nil {
		return fmt.Errorf("failover policy is nil")
	}

	ha.mu.Lock()
	defer ha.mu.Unlock()
	ha.failoverManager.SetFailoverPolicy(policy)
	return nil
}
// GetFailoverPolicy looks up the failover policy registered for serviceID.
// It returns an error when no such policy exists.
func (ha *HighAvailabilityManager) GetFailoverPolicy(serviceID string) (*FailoverPolicy, error) {
	ha.mu.RLock()
	defer ha.mu.RUnlock()

	fm := ha.failoverManager
	return fm.GetFailoverPolicy(serviceID)
}
// TriggerFailover manually fails over every service that has an enabled
// policy. reason is passed through to the failover manager, which does the
// work.
func (ha *HighAvailabilityManager) TriggerFailover(ctx context.Context, reason string) error {
	fm := ha.failoverManager
	return fm.TriggerFailover(ctx, reason)
}
// GetHealthStatus returns a summary of cluster health as a nested map:
//
//	"nodes":         {"total", "healthy", "unhealthy"} node counts, judged by isNodeHealthy
//	"health_checks": {"total", "healthy", "unhealthy"}: total is the number of
//	                 registered checks; healthy/unhealthy count stored results,
//	                 and any status other than healthy counts as unhealthy
//	"alerts":        {"active"}: number of active alerts
//	"enabled":       whether the periodic HA check is enabled
func (ha *HighAvailabilityManager) GetHealthStatus() map[string]interface{} {
	ha.mu.RLock()
	defer ha.mu.RUnlock()

	nodes := ha.scheduler.GetNodes()
	var nodesOK, nodesBad int
	for _, n := range nodes {
		if !ha.isNodeHealthy(n) {
			nodesBad++
			continue
		}
		nodesOK++
	}

	checkCount := len(ha.healthChecker.GetAllHealthChecks())
	var checksOK, checksBad int
	for _, r := range ha.healthChecker.GetAllResults() {
		switch r.Status {
		case HealthStatusHealthy:
			checksOK++
		default:
			checksBad++
		}
	}

	alertCount := len(ha.alertManager.GetActiveAlerts())

	nodeSummary := map[string]interface{}{
		"total":     len(nodes),
		"healthy":   nodesOK,
		"unhealthy": nodesBad,
	}
	checkSummary := map[string]interface{}{
		"total":     checkCount,
		"healthy":   checksOK,
		"unhealthy": checksBad,
	}
	return map[string]interface{}{
		"nodes":         nodeSummary,
		"health_checks": checkSummary,
		"alerts":        map[string]interface{}{"active": alertCount},
		"enabled":       ha.enabled,
	}
}
// Enable turns on the periodic HA check run by Start.
func (ha *HighAvailabilityManager) Enable() {
	ha.setEnabled(true)
}

// Disable turns off the periodic HA check run by Start. The health checker
// and alert manager loops are not affected.
func (ha *HighAvailabilityManager) Disable() {
	ha.setEnabled(false)
}

// IsEnabled reports whether the periodic HA check is currently enabled.
func (ha *HighAvailabilityManager) IsEnabled() bool {
	ha.mu.RLock()
	defer ha.mu.RUnlock()
	return ha.enabled
}

// setEnabled stores the enabled flag under the write lock.
func (ha *HighAvailabilityManager) setEnabled(on bool) {
	ha.mu.Lock()
	defer ha.mu.Unlock()
	ha.enabled = on
}
// FailoverManager methods

// SetFailoverPolicy stores policy under its ServiceID, replacing any policy
// previously registered for that service. policy must be non-nil.
func (fm *FailoverManager) SetFailoverPolicy(policy *FailoverPolicy) {
	key := policy.ServiceID

	fm.mu.Lock()
	defer fm.mu.Unlock()
	fm.failoverPolicies[key] = policy
}
// GetFailoverPolicy returns the policy registered for serviceID. It returns
// an error if there is none.
func (fm *FailoverManager) GetFailoverPolicy(serviceID string) (*FailoverPolicy, error) {
	fm.mu.RLock()
	defer fm.mu.RUnlock()

	if policy, ok := fm.failoverPolicies[serviceID]; ok {
		return policy, nil
	}
	return nil, fmt.Errorf("no failover policy found for service: %s", serviceID)
}
// TriggerFailover runs performFailover for every enabled policy.
//
// The enabled policies are copied out under the read lock, so the failovers
// themselves run without holding it. A failure for one service is logged and
// does not stop the others. The method always returns nil.
func (fm *FailoverManager) TriggerFailover(ctx context.Context, reason string) error {
	fm.mu.RLock()
	enabled := make([]*FailoverPolicy, 0, len(fm.failoverPolicies))
	for _, p := range fm.failoverPolicies {
		if !p.Enabled {
			continue
		}
		enabled = append(enabled, p)
	}
	fm.mu.RUnlock()

	for _, p := range enabled {
		err := fm.performFailover(ctx, p, reason)
		if err != nil {
			log.Printf("Failed to perform failover for service %s: %v", p.ServiceID, err)
		}
	}
	return nil
}
// performFailover carries out failover for the service described by policy.
//
// This is currently a stub. It only logs the start and the completion of the
// failover and always returns nil. A real implementation would:
//  1. pick healthy backup nodes,
//  2. start replacement instances on them,
//  3. repoint DNS / load balancing at the new instances,
//  4. wait for their health checks to pass,
//  5. stop the unhealthy instances.
func (fm *FailoverManager) performFailover(ctx context.Context, policy *FailoverPolicy, reason string) error {
	service := policy.ServiceID
	log.Printf("Performing failover for service %s: %s", service, reason)

	// TODO: implement the steps above; ctx is unused until then.

	log.Printf("Failover completed for service %s", service)
	return nil
}
// HealthChecker methods

// Start runs all registered health checks once per checkInterval until ctx is
// cancelled, then returns ctx.Err(). An error from a round of checks is
// logged and the loop continues.
func (hc *HealthChecker) Start(ctx context.Context) error {
	ticker := time.NewTicker(hc.checkInterval)
	defer ticker.Stop()
	log.Printf("HealthChecker started with check interval: %v", hc.checkInterval)

	for {
		select {
		case <-ticker.C:
			err := hc.performHealthChecks(ctx)
			if err != nil {
				log.Printf("Error during health checks: %v", err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// performHealthChecks runs every registered health check once and records
// each result under the check's ID.
//
// The checks are copied out under the read lock, so the (possibly slow)
// probes run without holding it. A check may be removed or replaced while it
// is running. The result is therefore stored only if the same check is still
// registered under its ID; otherwise a stale result would be re-added for a
// check that no longer exists.
//
// If ctx is cancelled, the round stops before the next check and ctx.Err()
// is returned.
func (hc *HealthChecker) performHealthChecks(ctx context.Context) error {
	hc.mu.RLock()
	checks := make([]*HealthCheck, 0, len(hc.checks))
	for _, check := range hc.checks {
		checks = append(checks, check)
	}
	hc.mu.RUnlock()

	for _, check := range checks {
		if err := ctx.Err(); err != nil {
			return err
		}

		result := hc.performHealthCheck(ctx, check)

		hc.mu.Lock()
		if current, ok := hc.checks[check.ID]; ok && current == check {
			hc.results[check.ID] = result
		}
		hc.mu.Unlock()
	}
	return nil
}
// performHealthCheck runs a single health check and returns its result.
//
// The check is currently simulated:
//   - It waits about 10ms to mimic network latency.
//   - It then reports unhealthy (ErrorCode "TIMEOUT") whenever the current
//     Unix second is a multiple of 10, and healthy otherwise. This is a
//     deterministic, clock-based stand-in, not a random 10% chance.
//
// If ctx is cancelled during the simulated wait, the result keeps status
// unknown, with the context error as its message and ErrorCode "CANCELED".
// Latency is always the elapsed time since the check started.
func (hc *HealthChecker) performHealthCheck(ctx context.Context, check *HealthCheck) *HealthCheckResult {
	start := time.Now()
	result := &HealthCheckResult{
		CheckID:   check.ID,
		Timestamp: start,
		Status:    HealthStatusUnknown,
	}

	// TODO: probe the target for real, based on check.Type and check.Config.
	// The wait is cancellable so shutdown is not delayed by simulated latency.
	timer := time.NewTimer(10 * time.Millisecond)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		result.Message = ctx.Err().Error()
		result.ErrorCode = "CANCELED"
		result.Latency = time.Since(start)
		return result
	case <-timer.C:
	}

	if time.Now().Unix()%10 == 0 {
		result.Status = HealthStatusUnhealthy
		result.Message = "Service not responding"
		result.ErrorCode = "TIMEOUT"
	} else {
		result.Status = HealthStatusHealthy
		result.Message = "Service is healthy"
	}

	result.Latency = time.Since(start)
	return result
}
// AddHealthCheck registers check under its ID, replacing any check already
// registered with that ID. check must be non-nil.
func (hc *HealthChecker) AddHealthCheck(check *HealthCheck) {
	id := check.ID

	hc.mu.Lock()
	defer hc.mu.Unlock()
	hc.checks[id] = check
}
// RemoveHealthCheck unregisters the check with checkID and discards its last
// result. Unknown IDs are a no-op.
func (hc *HealthChecker) RemoveHealthCheck(checkID string) {
	hc.mu.Lock()
	defer hc.mu.Unlock()

	delete(hc.results, checkID)
	delete(hc.checks, checkID)
}
// GetAllHealthChecks returns a snapshot of the registered checks keyed by ID.
// The map is a fresh copy; the *HealthCheck values are shared with the
// checker.
func (hc *HealthChecker) GetAllHealthChecks() map[string]*HealthCheck {
	hc.mu.RLock()
	defer hc.mu.RUnlock()

	snapshot := make(map[string]*HealthCheck, len(hc.checks))
	for id := range hc.checks {
		snapshot[id] = hc.checks[id]
	}
	return snapshot
}
// GetAllResults returns a snapshot of the latest result per check, keyed by
// check ID. The map is a fresh copy; the *HealthCheckResult values are shared.
func (hc *HealthChecker) GetAllResults() map[string]*HealthCheckResult {
	hc.mu.RLock()
	defer hc.mu.RUnlock()

	snapshot := make(map[string]*HealthCheckResult, len(hc.results))
	for id := range hc.results {
		snapshot[id] = hc.results[id]
	}
	return snapshot
}
// AlertManager methods

// Start evaluates the alert rules every 30 seconds until ctx is cancelled,
// then returns ctx.Err(). Evaluation errors are logged and the loop
// continues.
func (am *AlertManager) Start(ctx context.Context) error {
	const evalInterval = 30 * time.Second

	ticker := time.NewTicker(evalInterval)
	defer ticker.Stop()
	log.Printf("AlertManager started")

	for {
		select {
		case <-ticker.C:
			err := am.evaluateAlertRules(ctx)
			if err != nil {
				log.Printf("Error evaluating alert rules: %v", err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// evaluateAlertRules checks every enabled rule and fires a new alert for each
// rule whose condition holds.
//
// Cooldown: a rule with a positive Cooldown is not fired again while one of
// its alerts is still active and started less than Cooldown ago. Without
// this, every matching evaluation would add another active alert for the same
// rule. Rules with a zero Cooldown fire on every match.
//
// The enabled rules, and the start time of each rule's newest active alert,
// are copied under the read lock, so alerts are triggered without holding it.
// Evaluation stops early and returns ctx.Err() if ctx is cancelled. A failure
// to trigger an individual alert is logged.
func (am *AlertManager) evaluateAlertRules(ctx context.Context) error {
	am.mu.RLock()
	rules := make([]*AlertRule, 0, len(am.rules))
	for _, rule := range am.rules {
		if rule.Enabled {
			rules = append(rules, rule)
		}
	}
	// lastFired maps a rule ID to the StartsAt of its newest active alert.
	lastFired := make(map[string]time.Time, len(am.activeAlerts))
	for _, alert := range am.activeAlerts {
		if alert.StartsAt.After(lastFired[alert.RuleID]) {
			lastFired[alert.RuleID] = alert.StartsAt
		}
	}
	am.mu.RUnlock()

	now := time.Now()
	for _, rule := range rules {
		if err := ctx.Err(); err != nil {
			return err
		}
		if last, ok := lastFired[rule.ID]; ok && rule.Cooldown > 0 && now.Sub(last) < rule.Cooldown {
			continue
		}
		if !am.shouldTriggerAlert(rule) {
			continue
		}
		alert := am.createAlert(rule)
		if err := am.triggerAlert(ctx, alert); err != nil {
			log.Printf("Failed to trigger alert: %v", err)
		}
	}
	return nil
}
// shouldTriggerAlert reports whether rule's condition currently holds.
//
// This is a placeholder. It ignores rule and returns true whenever the
// current Unix second is a multiple of 20. That is a deterministic,
// clock-based stand-in, not a random 5% chance. A real implementation would
// query the metric named in rule.Condition and compare it with the threshold.
func (am *AlertManager) shouldTriggerAlert(rule *AlertRule) bool {
	sec := time.Now().Unix()
	return sec%20 == 0
}
// createAlert builds a new firing alert from rule.
//
// The alert takes its severity, labels and annotations from the rule.
//   - The label and annotation maps are copied, so later edits to the alert
//     or the rule do not leak into the other; nil maps stay nil.
//   - StartsAt and UpdatedAt share one timestamp.
//   - The ID embeds that timestamp in nanoseconds, so two alerts for the same
//     rule created within one second get distinct IDs. Active alerts are
//     stored by ID.
func (am *AlertManager) createAlert(rule *AlertRule) *Alert {
	cloneMap := func(src map[string]string) map[string]string {
		if src == nil {
			return nil
		}
		dst := make(map[string]string, len(src))
		for k, v := range src {
			dst[k] = v
		}
		return dst
	}

	now := time.Now()
	return &Alert{
		ID:          fmt.Sprintf("alert_%s_%d", rule.ID, now.UnixNano()),
		RuleID:      rule.ID,
		Status:      AlertStatusFiring,
		Severity:    rule.Severity,
		Message:     fmt.Sprintf("Alert triggered: %s", rule.Name),
		Labels:      cloneMap(rule.Labels),
		Annotations: cloneMap(rule.Annotations),
		StartsAt:    now,
		UpdatedAt:   now,
	}
}
// triggerAlert records alert as active and sends it to every notifier listed
// on its rule.
//
// The rule's notifier ids are resolved to registered notifiers under the same
// lock that records the alert. This avoids racing with AddNotifier and
// RemoveAlertRule. The sends happen after the lock is released, so a slow
// notifier does not block other AlertManager calls.
//
// If the rule no longer exists, the alert is still recorded but no
// notifications are sent. Unknown notifier ids are skipped and send failures
// are logged. The method always returns nil.
func (am *AlertManager) triggerAlert(ctx context.Context, alert *Alert) error {
	type target struct {
		id       string
		notifier Notifier
	}

	am.mu.Lock()
	am.activeAlerts[alert.ID] = alert
	var targets []target
	if rule := am.rules[alert.RuleID]; rule != nil {
		for _, id := range rule.Notifiers {
			if n, ok := am.notifiers[id]; ok {
				targets = append(targets, target{id: id, notifier: n})
			}
		}
	}
	am.mu.Unlock()

	for _, t := range targets {
		if err := t.notifier.Send(ctx, alert); err != nil {
			log.Printf("Failed to send notification via %s: %v", t.id, err)
		}
	}

	log.Printf("Alert triggered: %s", alert.ID)
	return nil
}
// getAlertRule returns the rule registered under ruleID, or nil if none is.
func (am *AlertManager) getAlertRule(ruleID string) *AlertRule {
	am.mu.RLock()
	defer am.mu.RUnlock()

	rule := am.rules[ruleID]
	return rule
}
// AddAlertRule registers rule under its ID, replacing any rule with the same
// ID. rule must be non-nil.
func (am *AlertManager) AddAlertRule(rule *AlertRule) {
	id := rule.ID

	am.mu.Lock()
	defer am.mu.Unlock()
	am.rules[id] = rule
}
// RemoveAlertRule unregisters the rule with ruleID. Alerts already fired for
// it stay active. Unknown IDs are a no-op.
func (am *AlertManager) RemoveAlertRule(ruleID string) {
	am.mu.Lock()
	defer am.mu.Unlock()

	delete(am.rules, ruleID)
}
// AddNotifier registers notifier under id, the name that rules list in their
// Notifiers field. An existing notifier with the same id is replaced.
func (am *AlertManager) AddNotifier(id string, notifier Notifier) {
	am.mu.Lock()
	defer am.mu.Unlock()

	am.notifiers[id] = notifier
}
// GetActiveAlerts returns a snapshot of the active alerts keyed by alert ID.
// The map is a fresh copy; the *Alert values are shared with the manager.
func (am *AlertManager) GetActiveAlerts() map[string]*Alert {
	am.mu.RLock()
	defer am.mu.RUnlock()

	snapshot := make(map[string]*Alert, len(am.activeAlerts))
	for id := range am.activeAlerts {
		snapshot[id] = am.activeAlerts[id]
	}
	return snapshot
}
// ResolveAlert marks the alert with alertID as resolved and removes it from
// the active set. Status, EndsAt and UpdatedAt are set, with EndsAt and
// UpdatedAt both equal to now. The same *Alert value is modified, so pointers
// obtained earlier see the resolved fields. Unknown IDs are ignored.
func (am *AlertManager) ResolveAlert(alertID string) {
	am.mu.Lock()
	defer am.mu.Unlock()

	alert, ok := am.activeAlerts[alertID]
	if !ok {
		return
	}

	resolvedAt := time.Now()
	alert.Status = AlertStatusResolved
	alert.EndsAt = &resolvedAt
	alert.UpdatedAt = resolvedAt
	delete(am.activeAlerts, alertID)
}
// Mock Notifier implementations
// EmailNotifier sends alerts via email
type EmailNotifier struct {
SMTPHost string
SMTPPort int
Username string
Password string
From string
To []string
}
func (n *EmailNotifier) Send(ctx context.Context, alert *Alert) error {
log.Printf("Sending email alert: %s", alert.Message)
// In a real implementation, this would send an actual email
return nil
}
func (n *EmailNotifier) Type() string {
return "email"
}
// SlackNotifier sends alerts to Slack
type SlackNotifier struct {
WebhookURL string
Channel string
}
func (n *SlackNotifier) Send(ctx context.Context, alert *Alert) error {
log.Printf("Sending Slack alert: %s", alert.Message)
// In a real implementation, this would send to Slack webhook
return nil
}
func (n *SlackNotifier) Type() string {
return "slack"
}
// WebhookNotifier sends alerts via webhook
type WebhookNotifier struct {
URL string
}
func (n *WebhookNotifier) Send(ctx context.Context, alert *Alert) error {
log.Printf("Sending webhook alert: %s", alert.Message)
// In a real implementation, this would send HTTP request
return nil
}
func (n *WebhookNotifier) Type() string {
return "webhook"
}