mirror of
https://github.com/Dvorinka/Containr.git
synced 2026-06-03 20:12:58 +00:00
737 lines
20 KiB
Go
737 lines
20 KiB
Go
package ha
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"containr/internal/deployment"
|
|
"containr/internal/metrics"
|
|
)
|
|
|
|
// HighAvailabilityManager manages high availability features
|
|
type HighAvailabilityManager struct {
|
|
scheduler *deployment.Scheduler
|
|
metricsCollector *metrics.MetricsCollector
|
|
failoverManager *FailoverManager
|
|
healthChecker *HealthChecker
|
|
alertManager *AlertManager
|
|
mu sync.RWMutex
|
|
enabled bool
|
|
checkInterval time.Duration
|
|
failoverThreshold int
|
|
}
|
|
|
|
// FailoverManager handles service failover operations
|
|
type FailoverManager struct {
|
|
scheduler *deployment.Scheduler
|
|
failoverPolicies map[string]*FailoverPolicy
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// FailoverPolicy defines failover behavior for a service
|
|
type FailoverPolicy struct {
|
|
ServiceID string `json:"service_id"`
|
|
Enabled bool `json:"enabled"`
|
|
MinHealthyNodes int `json:"min_healthy_nodes"`
|
|
MaxFailures int `json:"max_failures"`
|
|
FailoverTimeout time.Duration `json:"failover_timeout"`
|
|
RecoveryTimeout time.Duration `json:"recovery_timeout"`
|
|
FailoverStrategy FailoverStrategy `json:"failover_strategy"`
|
|
BackupNodes []string `json:"backup_nodes"`
|
|
HealthCheckConfig *HealthCheckConfig `json:"health_check_config"`
|
|
}
|
|
|
|
// FailoverStrategy names the way a failover is carried out.
type FailoverStrategy string

// Supported FailoverStrategy values.
const (
	FailoverStrategyActivePassive FailoverStrategy = "active_passive"
	FailoverStrategyActiveActive  FailoverStrategy = "active_active"
	FailoverStrategyGraceful      FailoverStrategy = "graceful"
)
|
|
|
|
// HealthCheckConfig holds the parameters of a health check: timing
// (Interval, Timeout), the consecutive-result thresholds, and the probe
// target (Path, Port, Protocol).
type HealthCheckConfig struct {
	Interval           time.Duration `json:"interval"`
	Timeout            time.Duration `json:"timeout"`
	UnhealthyThreshold int           `json:"unhealthy_threshold"`
	HealthyThreshold   int           `json:"healthy_threshold"`
	Path               string        `json:"path"`
	Port               int           `json:"port"`
	Protocol           string        `json:"protocol"`
}
|
|
|
|
// HealthChecker performs health checks on services and nodes
|
|
type HealthChecker struct {
|
|
scheduler *deployment.Scheduler
|
|
checks map[string]*HealthCheck
|
|
results map[string]*HealthCheckResult
|
|
mu sync.RWMutex
|
|
checkInterval time.Duration
|
|
}
|
|
|
|
// HealthCheck represents a health check configuration
|
|
type HealthCheck struct {
|
|
ID string `json:"id"`
|
|
ServiceID string `json:"service_id"`
|
|
NodeID string `json:"node_id"`
|
|
Type HealthCheckType `json:"type"`
|
|
Config HealthCheckConfig `json:"config"`
|
|
LastCheck time.Time `json:"last_check"`
|
|
Status HealthStatus `json:"status"`
|
|
}
|
|
|
|
// HealthCheckType names the probing mechanism of a health check.
type HealthCheckType string

// Supported HealthCheckType values.
const (
	HealthCheckTypeHTTP    HealthCheckType = "http"
	HealthCheckTypeTCP     HealthCheckType = "tcp"
	HealthCheckTypeCommand HealthCheckType = "command"
)
|
|
|
|
// HealthStatus is the outcome category of a health check.
type HealthStatus string

// Supported HealthStatus values.
const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
	HealthStatusUnknown   HealthStatus = "unknown"
)
|
|
|
|
// HealthCheckResult represents the result of a health check
|
|
type HealthCheckResult struct {
|
|
CheckID string `json:"check_id"`
|
|
Status HealthStatus `json:"status"`
|
|
Message string `json:"message"`
|
|
Latency time.Duration `json:"latency"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
ErrorCode string `json:"error_code,omitempty"`
|
|
}
|
|
|
|
// AlertManager handles alerting and notifications
|
|
type AlertManager struct {
|
|
rules map[string]*AlertRule
|
|
activeAlerts map[string]*Alert
|
|
notifiers map[string]Notifier
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// AlertRule defines when alerts should be triggered
|
|
type AlertRule struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Description string `json:"description"`
|
|
Enabled bool `json:"enabled"`
|
|
Condition AlertCondition `json:"condition"`
|
|
Severity AlertSeverity `json:"severity"`
|
|
Labels map[string]string `json:"labels"`
|
|
Annotations map[string]string `json:"annotations"`
|
|
Notifiers []string `json:"notifiers"`
|
|
Cooldown time.Duration `json:"cooldown"`
|
|
}
|
|
|
|
// AlertCondition is the metric comparison that triggers an alert.
type AlertCondition struct {
	Metric    string        `json:"metric"`
	Operator  string        `json:"operator"` // one of >, <, >=, <=, ==, !=
	Threshold float64       `json:"threshold"`
	Duration  time.Duration `json:"duration"`
}
|
|
|
|
// AlertSeverity is the severity level attached to an alert rule and to the
// alerts created from it.
type AlertSeverity string

// Supported AlertSeverity values.
const (
	AlertSeverityCritical AlertSeverity = "critical"
	AlertSeverityWarning  AlertSeverity = "warning"
	AlertSeverityInfo     AlertSeverity = "info"
)
|
|
|
|
// Alert represents an active alert
|
|
type Alert struct {
|
|
ID string `json:"id"`
|
|
RuleID string `json:"rule_id"`
|
|
Status AlertStatus `json:"status"`
|
|
Severity AlertSeverity `json:"severity"`
|
|
Message string `json:"message"`
|
|
Labels map[string]string `json:"labels"`
|
|
Annotations map[string]string `json:"annotations"`
|
|
StartsAt time.Time `json:"starts_at"`
|
|
EndsAt *time.Time `json:"ends_at,omitempty"`
|
|
UpdatedAt time.Time `json:"updated_at"`
|
|
}
|
|
|
|
// AlertStatus is the lifecycle state of an alert.
type AlertStatus string

// Supported AlertStatus values.
const (
	AlertStatusFiring   AlertStatus = "firing"
	AlertStatusResolved AlertStatus = "resolved"
)
|
|
|
|
// Notifier sends alert notifications
|
|
type Notifier interface {
|
|
Send(ctx context.Context, alert *Alert) error
|
|
Type() string
|
|
}
|
|
|
|
// NewHighAvailabilityManager creates a new HA manager
|
|
func NewHighAvailabilityManager(scheduler *deployment.Scheduler, metricsCollector *metrics.MetricsCollector) *HighAvailabilityManager {
|
|
failoverManager := &FailoverManager{
|
|
scheduler: scheduler,
|
|
failoverPolicies: make(map[string]*FailoverPolicy),
|
|
}
|
|
|
|
healthChecker := &HealthChecker{
|
|
scheduler: scheduler,
|
|
checks: make(map[string]*HealthCheck),
|
|
results: make(map[string]*HealthCheckResult),
|
|
checkInterval: 30 * time.Second,
|
|
}
|
|
|
|
alertManager := &AlertManager{
|
|
rules: make(map[string]*AlertRule),
|
|
activeAlerts: make(map[string]*Alert),
|
|
notifiers: make(map[string]Notifier),
|
|
}
|
|
|
|
return &HighAvailabilityManager{
|
|
scheduler: scheduler,
|
|
metricsCollector: metricsCollector,
|
|
failoverManager: failoverManager,
|
|
healthChecker: healthChecker,
|
|
alertManager: alertManager,
|
|
enabled: true,
|
|
checkInterval: 30 * time.Second,
|
|
failoverThreshold: 3,
|
|
}
|
|
}
|
|
|
|
// Start starts the HA management process
|
|
func (ha *HighAvailabilityManager) Start(ctx context.Context) error {
|
|
ticker := time.NewTicker(ha.checkInterval)
|
|
defer ticker.Stop()
|
|
|
|
log.Printf("HighAvailabilityManager started with check interval: %v", ha.checkInterval)
|
|
|
|
// Start health checker
|
|
go ha.healthChecker.Start(ctx)
|
|
|
|
// Start alert manager
|
|
go ha.alertManager.Start(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-ticker.C:
|
|
if ha.enabled {
|
|
if err := ha.checkHighAvailability(ctx); err != nil {
|
|
log.Printf("Error during HA check: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkHighAvailability performs HA checks and takes action if needed
|
|
func (ha *HighAvailabilityManager) checkHighAvailability(ctx context.Context) error {
|
|
// Check node health
|
|
nodes := ha.scheduler.GetNodes()
|
|
unhealthyNodes := 0
|
|
|
|
for _, node := range nodes {
|
|
if !ha.isNodeHealthy(node) {
|
|
unhealthyNodes++
|
|
log.Printf("Node %s is unhealthy", node.ID)
|
|
}
|
|
}
|
|
|
|
// Trigger failover if too many nodes are unhealthy
|
|
if unhealthyNodes >= ha.failoverThreshold {
|
|
log.Printf("Failover threshold reached: %d unhealthy nodes", unhealthyNodes)
|
|
if err := ha.failoverManager.TriggerFailover(ctx, "node_failure"); err != nil {
|
|
return fmt.Errorf("failed to trigger failover: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// isNodeHealthy checks if a node is healthy
|
|
func (ha *HighAvailabilityManager) isNodeHealthy(node *deployment.Node) bool {
|
|
// Check if node is ready
|
|
if node.Status != "ready" {
|
|
return false
|
|
}
|
|
|
|
// Check heartbeat
|
|
if time.Since(node.LastHeartbeat) > 2*time.Minute {
|
|
return false
|
|
}
|
|
|
|
// Check resource usage
|
|
if node.Usage.CPU > 95 || node.Usage.Memory > int64(float64(node.Capacity.Memory)*0.95) {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// SetFailoverPolicy sets or updates a failover policy
|
|
func (ha *HighAvailabilityManager) SetFailoverPolicy(policy *FailoverPolicy) error {
|
|
ha.mu.Lock()
|
|
defer ha.mu.Unlock()
|
|
|
|
ha.failoverManager.SetFailoverPolicy(policy)
|
|
return nil
|
|
}
|
|
|
|
// GetFailoverPolicy returns a failover policy
|
|
func (ha *HighAvailabilityManager) GetFailoverPolicy(serviceID string) (*FailoverPolicy, error) {
|
|
ha.mu.RLock()
|
|
defer ha.mu.RUnlock()
|
|
|
|
return ha.failoverManager.GetFailoverPolicy(serviceID)
|
|
}
|
|
|
|
// TriggerFailover manually triggers a failover
|
|
func (ha *HighAvailabilityManager) TriggerFailover(ctx context.Context, reason string) error {
|
|
return ha.failoverManager.TriggerFailover(ctx, reason)
|
|
}
|
|
|
|
// GetHealthStatus returns the health status of all services and nodes
|
|
func (ha *HighAvailabilityManager) GetHealthStatus() map[string]interface{} {
|
|
ha.mu.RLock()
|
|
defer ha.mu.RUnlock()
|
|
|
|
nodes := ha.scheduler.GetNodes()
|
|
healthyNodes := 0
|
|
unhealthyNodes := 0
|
|
|
|
for _, node := range nodes {
|
|
if ha.isNodeHealthy(node) {
|
|
healthyNodes++
|
|
} else {
|
|
unhealthyNodes++
|
|
}
|
|
}
|
|
|
|
healthChecks := ha.healthChecker.GetAllHealthChecks()
|
|
healthyChecks := 0
|
|
unhealthyChecks := 0
|
|
|
|
for _, result := range ha.healthChecker.GetAllResults() {
|
|
if result.Status == HealthStatusHealthy {
|
|
healthyChecks++
|
|
} else {
|
|
unhealthyChecks++
|
|
}
|
|
}
|
|
|
|
activeAlerts := ha.alertManager.GetActiveAlerts()
|
|
|
|
return map[string]interface{}{
|
|
"nodes": map[string]interface{}{
|
|
"total": len(nodes),
|
|
"healthy": healthyNodes,
|
|
"unhealthy": unhealthyNodes,
|
|
},
|
|
"health_checks": map[string]interface{}{
|
|
"total": len(healthChecks),
|
|
"healthy": healthyChecks,
|
|
"unhealthy": unhealthyChecks,
|
|
},
|
|
"alerts": map[string]interface{}{
|
|
"active": len(activeAlerts),
|
|
},
|
|
"enabled": ha.enabled,
|
|
}
|
|
}
|
|
|
|
// Enable enables the HA manager
|
|
func (ha *HighAvailabilityManager) Enable() {
|
|
ha.mu.Lock()
|
|
defer ha.mu.Unlock()
|
|
ha.enabled = true
|
|
}
|
|
|
|
// Disable disables the HA manager
|
|
func (ha *HighAvailabilityManager) Disable() {
|
|
ha.mu.Lock()
|
|
defer ha.mu.Unlock()
|
|
ha.enabled = false
|
|
}
|
|
|
|
// IsEnabled returns whether the HA manager is enabled
|
|
func (ha *HighAvailabilityManager) IsEnabled() bool {
|
|
ha.mu.RLock()
|
|
defer ha.mu.RUnlock()
|
|
return ha.enabled
|
|
}
|
|
|
|
// FailoverManager methods
|
|
|
|
// SetFailoverPolicy sets a failover policy
|
|
func (fm *FailoverManager) SetFailoverPolicy(policy *FailoverPolicy) {
|
|
fm.mu.Lock()
|
|
defer fm.mu.Unlock()
|
|
fm.failoverPolicies[policy.ServiceID] = policy
|
|
}
|
|
|
|
// GetFailoverPolicy returns a failover policy
|
|
func (fm *FailoverManager) GetFailoverPolicy(serviceID string) (*FailoverPolicy, error) {
|
|
fm.mu.RLock()
|
|
defer fm.mu.RUnlock()
|
|
|
|
policy, exists := fm.failoverPolicies[serviceID]
|
|
if !exists {
|
|
return nil, fmt.Errorf("no failover policy found for service: %s", serviceID)
|
|
}
|
|
|
|
return policy, nil
|
|
}
|
|
|
|
// TriggerFailover triggers a failover for affected services
|
|
func (fm *FailoverManager) TriggerFailover(ctx context.Context, reason string) error {
|
|
fm.mu.RLock()
|
|
policies := make([]*FailoverPolicy, 0, len(fm.failoverPolicies))
|
|
for _, policy := range fm.failoverPolicies {
|
|
if policy.Enabled {
|
|
policies = append(policies, policy)
|
|
}
|
|
}
|
|
fm.mu.RUnlock()
|
|
|
|
for _, policy := range policies {
|
|
if err := fm.performFailover(ctx, policy, reason); err != nil {
|
|
log.Printf("Failed to perform failover for service %s: %v", policy.ServiceID, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// performFailover performs failover for a specific service
|
|
func (fm *FailoverManager) performFailover(ctx context.Context, policy *FailoverPolicy, reason string) error {
|
|
log.Printf("Performing failover for service %s: %s", policy.ServiceID, reason)
|
|
|
|
// In a real implementation, this would:
|
|
// 1. Identify healthy backup nodes
|
|
// 2. Start new instances on backup nodes
|
|
// 3. Update DNS/load balancer to point to new instances
|
|
// 4. Wait for health checks to pass
|
|
// 5. Shut down unhealthy instances
|
|
|
|
// For now, we'll just log the action
|
|
log.Printf("Failover completed for service %s", policy.ServiceID)
|
|
|
|
return nil
|
|
}
|
|
|
|
// HealthChecker methods
|
|
|
|
// Start starts the health checker
|
|
func (hc *HealthChecker) Start(ctx context.Context) error {
|
|
ticker := time.NewTicker(hc.checkInterval)
|
|
defer ticker.Stop()
|
|
|
|
log.Printf("HealthChecker started with check interval: %v", hc.checkInterval)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-ticker.C:
|
|
if err := hc.performHealthChecks(ctx); err != nil {
|
|
log.Printf("Error during health checks: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// performHealthChecks performs all configured health checks
|
|
func (hc *HealthChecker) performHealthChecks(ctx context.Context) error {
|
|
hc.mu.RLock()
|
|
checks := make([]*HealthCheck, 0, len(hc.checks))
|
|
for _, check := range hc.checks {
|
|
checks = append(checks, check)
|
|
}
|
|
hc.mu.RUnlock()
|
|
|
|
for _, check := range checks {
|
|
result := hc.performHealthCheck(ctx, check)
|
|
hc.mu.Lock()
|
|
hc.results[check.ID] = result
|
|
hc.mu.Unlock()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// performHealthCheck performs a single health check
|
|
func (hc *HealthChecker) performHealthCheck(ctx context.Context, check *HealthCheck) *HealthCheckResult {
|
|
start := time.Now()
|
|
result := &HealthCheckResult{
|
|
CheckID: check.ID,
|
|
Timestamp: start,
|
|
Status: HealthStatusUnknown,
|
|
}
|
|
|
|
// In a real implementation, this would perform actual health checks
|
|
// For now, we'll simulate the check
|
|
time.Sleep(10 * time.Millisecond) // Simulate network latency
|
|
|
|
// Simulate healthy/unhealthy based on some logic
|
|
if time.Now().Unix()%10 == 0 { // 10% chance of being unhealthy
|
|
result.Status = HealthStatusUnhealthy
|
|
result.Message = "Service not responding"
|
|
result.ErrorCode = "TIMEOUT"
|
|
} else {
|
|
result.Status = HealthStatusHealthy
|
|
result.Message = "Service is healthy"
|
|
}
|
|
|
|
result.Latency = time.Since(start)
|
|
return result
|
|
}
|
|
|
|
// AddHealthCheck adds a new health check
|
|
func (hc *HealthChecker) AddHealthCheck(check *HealthCheck) {
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
hc.checks[check.ID] = check
|
|
}
|
|
|
|
// RemoveHealthCheck removes a health check
|
|
func (hc *HealthChecker) RemoveHealthCheck(checkID string) {
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
delete(hc.checks, checkID)
|
|
delete(hc.results, checkID)
|
|
}
|
|
|
|
// GetAllHealthChecks returns all health checks
|
|
func (hc *HealthChecker) GetAllHealthChecks() map[string]*HealthCheck {
|
|
hc.mu.RLock()
|
|
defer hc.mu.RUnlock()
|
|
|
|
result := make(map[string]*HealthCheck)
|
|
for id, check := range hc.checks {
|
|
result[id] = check
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// GetAllResults returns all health check results
|
|
func (hc *HealthChecker) GetAllResults() map[string]*HealthCheckResult {
|
|
hc.mu.RLock()
|
|
defer hc.mu.RUnlock()
|
|
|
|
result := make(map[string]*HealthCheckResult)
|
|
for id, checkResult := range hc.results {
|
|
result[id] = checkResult
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// AlertManager methods
|
|
|
|
// Start starts the alert manager
|
|
func (am *AlertManager) Start(ctx context.Context) error {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
log.Printf("AlertManager started")
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-ticker.C:
|
|
if err := am.evaluateAlertRules(ctx); err != nil {
|
|
log.Printf("Error evaluating alert rules: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// evaluateAlertRules evaluates all alert rules and triggers alerts if needed
|
|
func (am *AlertManager) evaluateAlertRules(ctx context.Context) error {
|
|
am.mu.RLock()
|
|
rules := make([]*AlertRule, 0, len(am.rules))
|
|
for _, rule := range am.rules {
|
|
if rule.Enabled {
|
|
rules = append(rules, rule)
|
|
}
|
|
}
|
|
am.mu.RUnlock()
|
|
|
|
for _, rule := range rules {
|
|
if am.shouldTriggerAlert(rule) {
|
|
alert := am.createAlert(rule)
|
|
if err := am.triggerAlert(ctx, alert); err != nil {
|
|
log.Printf("Failed to trigger alert: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// shouldTriggerAlert checks if an alert should be triggered
|
|
func (am *AlertManager) shouldTriggerAlert(rule *AlertRule) bool {
|
|
// In a real implementation, this would query metrics and evaluate the condition
|
|
// For now, we'll simulate based on time
|
|
return time.Now().Unix()%20 == 0 // 5% chance of triggering
|
|
}
|
|
|
|
// createAlert creates an alert from a rule
|
|
func (am *AlertManager) createAlert(rule *AlertRule) *Alert {
|
|
return &Alert{
|
|
ID: fmt.Sprintf("alert_%s_%d", rule.ID, time.Now().Unix()),
|
|
RuleID: rule.ID,
|
|
Status: AlertStatusFiring,
|
|
Severity: rule.Severity,
|
|
Message: fmt.Sprintf("Alert triggered: %s", rule.Name),
|
|
Labels: rule.Labels,
|
|
Annotations: rule.Annotations,
|
|
StartsAt: time.Now(),
|
|
UpdatedAt: time.Now(),
|
|
}
|
|
}
|
|
|
|
// triggerAlert triggers an alert
|
|
func (am *AlertManager) triggerAlert(ctx context.Context, alert *Alert) error {
|
|
am.mu.Lock()
|
|
am.activeAlerts[alert.ID] = alert
|
|
am.mu.Unlock()
|
|
|
|
// Send notifications
|
|
for _, notifierID := range am.getAlertRule(alert.RuleID).Notifiers {
|
|
if notifier, exists := am.notifiers[notifierID]; exists {
|
|
if err := notifier.Send(ctx, alert); err != nil {
|
|
log.Printf("Failed to send notification via %s: %v", notifierID, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Printf("Alert triggered: %s", alert.ID)
|
|
return nil
|
|
}
|
|
|
|
// getAlertRule returns the rule for an alert
|
|
func (am *AlertManager) getAlertRule(ruleID string) *AlertRule {
|
|
am.mu.RLock()
|
|
defer am.mu.RUnlock()
|
|
return am.rules[ruleID]
|
|
}
|
|
|
|
// AddAlertRule adds a new alert rule
|
|
func (am *AlertManager) AddAlertRule(rule *AlertRule) {
|
|
am.mu.Lock()
|
|
defer am.mu.Unlock()
|
|
am.rules[rule.ID] = rule
|
|
}
|
|
|
|
// RemoveAlertRule removes an alert rule
|
|
func (am *AlertManager) RemoveAlertRule(ruleID string) {
|
|
am.mu.Lock()
|
|
defer am.mu.Unlock()
|
|
delete(am.rules, ruleID)
|
|
}
|
|
|
|
// AddNotifier adds a new notifier
|
|
func (am *AlertManager) AddNotifier(id string, notifier Notifier) {
|
|
am.mu.Lock()
|
|
defer am.mu.Unlock()
|
|
am.notifiers[id] = notifier
|
|
}
|
|
|
|
// GetActiveAlerts returns all active alerts
|
|
func (am *AlertManager) GetActiveAlerts() map[string]*Alert {
|
|
am.mu.RLock()
|
|
defer am.mu.RUnlock()
|
|
|
|
result := make(map[string]*Alert)
|
|
for id, alert := range am.activeAlerts {
|
|
result[id] = alert
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// ResolveAlert resolves an alert
|
|
func (am *AlertManager) ResolveAlert(alertID string) {
|
|
am.mu.Lock()
|
|
defer am.mu.Unlock()
|
|
|
|
if alert, exists := am.activeAlerts[alertID]; exists {
|
|
now := time.Now()
|
|
alert.Status = AlertStatusResolved
|
|
alert.EndsAt = &now
|
|
alert.UpdatedAt = now
|
|
}
|
|
|
|
delete(am.activeAlerts, alertID)
|
|
}
|
|
|
|
// Mock Notifier implementations
|
|
|
|
// EmailNotifier is a Notifier for delivering alerts by email. It holds the
// SMTP server address, the credentials, the sender and the recipients.
type EmailNotifier struct {
	SMTPHost string
	SMTPPort int
	Username string
	Password string
	From     string
	To       []string
}
|
|
|
|
func (n *EmailNotifier) Send(ctx context.Context, alert *Alert) error {
|
|
log.Printf("Sending email alert: %s", alert.Message)
|
|
// In a real implementation, this would send an actual email
|
|
return nil
|
|
}
|
|
|
|
func (n *EmailNotifier) Type() string {
|
|
return "email"
|
|
}
|
|
|
|
// SlackNotifier is a Notifier for delivering alerts to Slack. It holds the
// webhook URL and the target channel.
type SlackNotifier struct {
	WebhookURL string
	Channel    string
}
|
|
|
|
func (n *SlackNotifier) Send(ctx context.Context, alert *Alert) error {
|
|
log.Printf("Sending Slack alert: %s", alert.Message)
|
|
// In a real implementation, this would send to Slack webhook
|
|
return nil
|
|
}
|
|
|
|
func (n *SlackNotifier) Type() string {
|
|
return "slack"
|
|
}
|
|
|
|
// WebhookNotifier is a Notifier for delivering alerts to a generic webhook at
// URL.
type WebhookNotifier struct {
	URL string
}
|
|
|
|
func (n *WebhookNotifier) Send(ctx context.Context, alert *Alert) error {
|
|
log.Printf("Sending webhook alert: %s", alert.Message)
|
|
// In a real implementation, this would send HTTP request
|
|
return nil
|
|
}
|
|
|
|
func (n *WebhookNotifier) Type() string {
|
|
return "webhook"
|
|
}
|