Files
SEEN/backend/DRAGONFLY_INTEGRATION.md
T
2026-04-10 12:06:24 +02:00

512 lines
14 KiB
Markdown

# Dragonfly DB Integration
## Overview
SEEN uses **Dragonfly DB** as a high-performance, Redis-compatible in-memory cache. Dragonfly provides superior performance and memory efficiency compared to traditional Redis, making it ideal for caching session data, catalog queries, and real-time download progress.
## Why Dragonfly?
- **25x faster** than Redis on multi-core systems
- **Lower memory footprint** (up to 30% less memory usage)
- **100% Redis API compatible** - drop-in replacement
- **Multi-threaded architecture** - better CPU utilization
- **Built-in snapshot support** for persistence
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Cache Manager │
│ ┌──────────────┬──────────────┬──────────────────┐ │
│ │ Session │ Catalog │ Download │ │
│ │ Cache │ Cache │ Cache │ │
│ └──────────────┴──────────────┴──────────────────┘ │
│ │ │
│ Cache Service │
│ │ │
│ Dragonfly Client │
└─────────────────────────────────────────────────────────┘
Dragonfly DB
```
## Components
### 1. Cache Manager (`cache/manager.go`)
Central orchestrator for all caching operations:
```go
manager, err := cache.NewManager(ctx, cfg.Cache, log)
if err != nil {
log.Fatal("dragonfly startup failed", zap.Error(err))
}
defer manager.Close()
// Access specialized caches
sessionCache := manager.Session()
catalogCache := manager.Catalog()
downloadCache := manager.Download()
```
**Features:**
- Automatic connection management
- Background cleanup tasks
- Health checks
- Cache statistics
- User data invalidation
### 2. Cache Service (`cache/service.go`)
Low-level cache operations:
```go
service := manager.Service()
// Store data
err := service.Set(ctx, "key", data, 5*time.Minute)
// Retrieve data
var result MyType
err := service.Get(ctx, "key", &result)
// Delete data
err := service.Delete(ctx, "key1", "key2")
// Atomic operations
count, err := service.Increment(ctx, "counter")
ok, err := service.SetNX(ctx, "lock:resource", "owner", 30*time.Second)
```
### 3. Session Cache (`cache/session_cache.go`)
Manages user sessions and authentication:
```go
sessionCache := manager.Session()
// Store session
session := cache.SessionData{
SessionID: "session-123",
UserID: "user-456",
RefreshToken: "token",
ExpiresAt: time.Now().Add(24 * time.Hour),
}
err := sessionCache.SetSession(ctx, session)
// Retrieve session
session, err := sessionCache.GetSession(ctx, "session-123")
// Cache user data
user := cache.UserData{
ID: "user-456",
Email: "user@example.com",
DisplayName: "User Name",
Role: "user",
}
err := sessionCache.SetUser(ctx, user)
// Rate limiting
allowed, err := sessionCache.RateLimitCheck(ctx, "user-456", "api-call", 100, time.Minute)
// Distributed locks
lockID, acquired, err := sessionCache.AcquireLock(ctx, "resource", 30*time.Second)
if acquired {
defer sessionCache.ReleaseLock(ctx, "resource", lockID)
// Do work
}
```
### 4. Catalog Cache (`cache/catalog_cache.go`)
Caches catalog queries and user lists:
```go
catalogCache := manager.Catalog()
// Cache dashboard
err := catalogCache.SetDashboard(ctx, userID, dashboardData)
err := catalogCache.GetDashboard(ctx, userID, &dashboardData)
// Cache discover sections
err := catalogCache.SetDiscover(ctx, "sci-fi", "movie", 1, sections)
err := catalogCache.GetDiscover(ctx, "sci-fi", "movie", 1, &sections)
// Cache search results
err := catalogCache.SetSearch(ctx, "neon", "", "all", results)
err := catalogCache.GetSearch(ctx, "neon", "", "all", &results)
// Cache watch later
err := catalogCache.SetWatchLater(ctx, userID, items)
err := catalogCache.GetWatchLater(ctx, userID, &items)
// Cache continue watching
err := catalogCache.SetContinueWatching(ctx, userID, items)
err := catalogCache.GetContinueWatching(ctx, userID, &items)
// Invalidate user catalog
err := catalogCache.InvalidateUserCatalog(ctx, userID)
```
### 5. Download Cache (`cache/download_cache.go`)
Real-time download progress tracking:
```go
downloadCache := manager.Download()
// Store download progress
progress := cache.DownloadProgress{
JobID: "job-123",
Status: "downloading",
ProgressPercent: 45,
BytesTotal: 1000000000,
BytesDownloaded: 450000000,
DownloadSpeedMbps: 15.3,
EtaSeconds: 120,
}
err := downloadCache.SetProgress(ctx, progress)
// Retrieve progress
progress, err := downloadCache.GetProgress(ctx, "job-123")
// Update specific fields
err := downloadCache.IncrementDownloadedBytes(ctx, "job-123", 1024*1024)
err := downloadCache.SetDownloadSpeed(ctx, "job-123", 20.5)
// Get active downloads
activeDownloads, err := downloadCache.GetActiveDownloads(ctx)
// Bulk operations
err := downloadCache.BulkSetProgress(ctx, []cache.DownloadProgress{...})
// Cleanup stale data
err := downloadCache.CleanupStaleProgress(ctx, 1*time.Hour)
```
### 6. Key Builder (`cache/keys.go`)
Consistent key naming:
```go
kb := cache.NewKeyBuilder("seen")
// Build keys
sessionKey := kb.SessionKey("session-123")
// Result: "seen:session:session-123"
userKey := kb.UserKey("user-456")
// Result: "seen:user:user-456"
dashboardKey := kb.CatalogDashboardKey("user-456")
// Result: "seen:catalog:dashboard:user-456"
downloadKey := kb.DownloadJobKey("job-123")
// Result: "seen:download:job:job-123"
```
## Cache TTLs
Default time-to-live values:
```go
const (
TTLSession = 24 * time.Hour // User sessions
TTLUser = 15 * time.Minute // User profile data
TTLCatalog = 5 * time.Minute // Catalog queries
TTLDownload = 30 * time.Second // Download progress
TTLRateLimit = 1 * time.Minute // Rate limit counters
TTLLock = 30 * time.Second // Distributed locks
TTLSearch = 10 * time.Minute // Search results
TTLRecommendation = 1 * time.Hour // Recommendations
)
```
## Configuration
Environment variables:
```bash
SEEN_CACHE_ADDR=dragonfly:6379
SEEN_CACHE_PASSWORD=
SEEN_CACHE_DB=0
```
Docker Compose:
```yaml
dragonfly:
image: docker.dragonflydb.io/dragonflydb/dragonfly:latest
container_name: seen-dragonfly
command: ["--logtostderr", "--proactor_threads=2"]
ports:
- '6379:6379'
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 10
```
## Usage Examples
### Example 1: Caching Dashboard Data
```go
func (h *CatalogHandler) Dashboard(c *gin.Context) {
userID := getUserID(c)
// Try cache first
var dashboard DashboardPayload
err := h.cacheManager.Catalog().GetDashboard(c.Request.Context(), userID, &dashboard)
if err == nil {
c.JSON(http.StatusOK, dashboard)
return
}
// Cache miss - fetch from database
dashboard = h.service.Dashboard()
// Store in cache
_ = h.cacheManager.Catalog().SetDashboard(c.Request.Context(), userID, dashboard)
c.JSON(http.StatusOK, dashboard)
}
```
### Example 2: Session Management
```go
func (s *AuthService) CreateSession(ctx context.Context, userID string) (*Session, error) {
session := &Session{
ID: uuid.New().String(),
UserID: userID,
RefreshToken: generateToken(),
ExpiresAt: time.Now().Add(24 * time.Hour),
}
// Store in database
if err := s.repo.CreateSession(ctx, session); err != nil {
return nil, err
}
// Cache session data
sessionData := cache.SessionData{
SessionID: session.ID,
UserID: session.UserID,
RefreshToken: session.RefreshToken,
ExpiresAt: session.ExpiresAt,
}
_ = s.cacheManager.Session().SetSession(ctx, sessionData)
return session, nil
}
```
### Example 3: Real-time Download Progress
```go
func (w *DownloadWorker) UpdateProgress(ctx context.Context, jobID string, downloaded int64) {
// Update progress in cache for real-time updates
err := w.cacheManager.Download().IncrementDownloadedBytes(ctx, jobID, downloaded)
if err != nil {
w.log.Error("failed to update download progress", zap.Error(err))
}
// Periodically persist to database
if downloaded%10485760 == 0 { // Every 10MB
progress, _ := w.cacheManager.Download().GetProgress(ctx, jobID)
_ = w.repo.UpdateJob(ctx, jobID, progress)
}
}
```
### Example 4: Rate Limiting
```go
func RateLimitMiddleware(cacheManager *cache.Manager) gin.HandlerFunc {
return func(c *gin.Context) {
userID := getUserID(c)
allowed, err := cacheManager.Session().RateLimitCheck(
c.Request.Context(),
userID,
"api-request",
100, // 100 requests
time.Minute, // per minute
)
if err != nil || !allowed {
c.JSON(http.StatusTooManyRequests, gin.H{
"error": "rate limit exceeded",
})
c.Abort()
return
}
c.Next()
}
}
```
### Example 5: Distributed Locking
```go
func (s *DownloadService) StartDownload(ctx context.Context, jobID string) error {
// Acquire lock to prevent duplicate processing
lockID, acquired, err := s.cacheManager.Session().AcquireLock(
ctx,
fmt.Sprintf("download:%s", jobID),
30*time.Second,
)
if err != nil {
return err
}
if !acquired {
return fmt.Errorf("download already in progress")
}
defer s.cacheManager.Session().ReleaseLock(ctx, fmt.Sprintf("download:%s", jobID), lockID)
// Process download
return s.processDownload(ctx, jobID)
}
```
## Background Tasks
The cache manager runs automatic cleanup tasks:
```go
// Runs every 5 minutes
func (m *Manager) runCleanupTasks(ctx context.Context) {
// Remove stale download progress (older than 1 hour)
m.download.CleanupStaleProgress(ctx, 1*time.Hour)
// Log cache statistics
stats, _ := m.Stats(ctx)
m.log.Debug("cache stats", zap.Any("stats", stats))
}
```
## Health Checks
```go
// Basic connectivity check
err := cacheManager.Ping(ctx)
// Comprehensive health check (read/write test)
err := cacheManager.HealthCheck(ctx)
// Get cache statistics
stats, err := cacheManager.Stats(ctx)
// Returns: dbSize, memory usage, keyspace info
```
## Cache Invalidation
```go
// Invalidate all user data
err := cacheManager.InvalidateUser(ctx, userID)
// Invalidate specific caches
err := cacheManager.Catalog().InvalidateUserCatalog(ctx, userID)
err := cacheManager.Download().InvalidateUserDownloads(ctx, userID)
err := cacheManager.Session().InvalidateUserSessions(ctx, userID)
// Delete by pattern
err := cacheManager.DeleteKeysByPattern(ctx, "seen:catalog:*")
```
## Performance Tips
1. **Use appropriate TTLs** - Balance freshness vs. cache hit rate
2. **Batch operations** - Use `MSet` and `MGet` for multiple keys
3. **Avoid scanning** - Use specific keys instead of pattern matching
4. **Monitor cache hit rate** - Adjust TTLs based on metrics
5. **Use compression** - For large objects, compress before caching
## Monitoring
```bash
# Connect to Dragonfly CLI
docker exec -it seen-dragonfly redis-cli
# Check cache size
DBSIZE
# View memory usage
INFO memory
# List all keys (development only!)
KEYS seen:*
# Monitor commands in real-time
MONITOR
# Get cache statistics
INFO stats
```
## Best Practices
1. **Always set TTLs** - Prevent memory leaks
2. **Handle cache misses gracefully** - Always have a fallback
3. **Use namespaces** - Organize keys with prefixes
4. **Invalidate on updates** - Keep cache consistent
5. **Monitor performance** - Track hit rates and latency
6. **Use atomic operations** - For counters and locks
7. **Compress large values** - Reduce memory usage
8. **Batch operations** - Reduce network round trips
## Troubleshooting
### Cache not connecting
```bash
# Check Dragonfly is running
docker ps | grep dragonfly
# Check logs
docker logs seen-dragonfly
# Test connection
docker exec -it seen-dragonfly redis-cli ping
```
### High memory usage
```bash
# Check memory stats
docker exec -it seen-dragonfly redis-cli INFO memory
# Find large keys
docker exec -it seen-dragonfly redis-cli --bigkeys
# Clear cache (development only!)
docker exec -it seen-dragonfly redis-cli FLUSHDB
```
### Slow performance
- Check network latency
- Monitor CPU usage on Dragonfly container
- Review TTL settings
- Check for large values
- Consider increasing `proactor_threads`
## Summary
The Dragonfly integration provides:
- ✅ High-performance caching (25x faster than Redis)
- ✅ Session management with automatic expiry
- ✅ Catalog query caching
- ✅ Real-time download progress tracking
- ✅ Rate limiting
- ✅ Distributed locking
- ✅ Automatic cleanup tasks
- ✅ Health monitoring
- ✅ Comprehensive API
All cache operations are production-ready and fully integrated into the SEEN backend!