Files
Containr/internal/build/parallel.go
T
Tomas Dvorak 355a97bab4 overhaul
2026-04-14 18:04:48 +02:00

429 lines
10 KiB
Go

package build
import (
"context"
"fmt"
"sync"
"time"
"containr/internal/types"
)
// ParallelBuilder handles parallel build execution
type ParallelBuilder struct {
buildManager *BuildManager
cacheManager *CacheManager
maxWorkers int
}
// BuildJob represents a build job in the queue
type BuildJob struct {
ID string
Request *types.BuildRequest
Result chan *BuildResult
Priority int
CreatedAt time.Time
}
// BuildResult represents the result of a build job
type BuildResult struct {
Success bool
Response *types.BuildResponse
Error error
Duration time.Duration
CacheHit bool
BuildID string
}
// WorkerPool manages a pool of build workers
type WorkerPool struct {
jobs chan *BuildJob
workers []*Worker
wg sync.WaitGroup
quit chan bool
maxWorkers int
buildManager *BuildManager
cacheManager *CacheManager
}
// Worker represents a build worker
type Worker struct {
id int
jobChan chan *BuildJob
quit chan bool
buildManager *BuildManager
cacheManager *CacheManager
}
// NewParallelBuilder creates a new parallel build manager
func NewParallelBuilder(buildManager *BuildManager, cacheManager *CacheManager, maxWorkers int) *ParallelBuilder {
return &ParallelBuilder{
buildManager: buildManager,
cacheManager: cacheManager,
maxWorkers: maxWorkers,
}
}
// BuildWithCache attempts to build using cache first
func (pb *ParallelBuilder) BuildWithCache(ctx context.Context, req *types.BuildRequest) (*BuildResult, error) {
startTime := time.Now()
// Generate cache key
cacheKey, err := pb.cacheManager.GenerateCacheKey(ctx, req)
if err != nil {
return &BuildResult{
Success: false,
Error: fmt.Errorf("failed to generate cache key: %w", err),
Duration: time.Since(startTime),
CacheHit: false,
}, err
}
// Try to find cached result
if cachedEntry, err := pb.cacheManager.LookupCache(ctx, cacheKey); err == nil {
return &BuildResult{
Success: true,
Response: &types.BuildResponse{
ImageName: cachedEntry.ImageName,
ImageTag: req.ImageTag,
Size: cachedEntry.Size,
Digest: cachedEntry.Hash,
},
Duration: time.Since(startTime),
CacheHit: true,
}, nil
}
// No cache hit, proceed with build
response, err := pb.buildManager.Build(ctx, req)
duration := time.Since(startTime)
if err != nil {
return &BuildResult{
Success: false,
Error: err,
Duration: duration,
CacheHit: false,
}, err
}
// Store in cache for future builds
metadata := map[string]string{
"build_type": req.BuildType,
"runtime": cacheKey.Runtime,
"build_time": duration.String(),
}
if storeErr := pb.cacheManager.StoreCache(ctx, cacheKey, response.ImageName, metadata); storeErr != nil {
fmt.Printf("Warning: failed to store build cache: %v\n", storeErr)
}
return &BuildResult{
Success: true,
Response: response,
Duration: duration,
CacheHit: false,
}, nil
}
// NewWorkerPool creates a new worker pool for parallel builds
func NewWorkerPool(maxWorkers int, buildManager *BuildManager, cacheManager *CacheManager) *WorkerPool {
return &WorkerPool{
jobs: make(chan *BuildJob, 100), // Buffered channel
workers: make([]*Worker, maxWorkers),
quit: make(chan bool),
maxWorkers: maxWorkers,
buildManager: buildManager,
cacheManager: cacheManager,
}
}
// Start starts the worker pool
func (wp *WorkerPool) Start() {
for i := 0; i < wp.maxWorkers; i++ {
worker := &Worker{
id: i,
jobChan: wp.jobs,
quit: wp.quit,
buildManager: wp.buildManager,
cacheManager: wp.cacheManager,
}
wp.workers[i] = worker
wp.wg.Add(1)
go worker.start(&wp.wg)
}
}
// Stop stops the worker pool
func (wp *WorkerPool) Stop() {
close(wp.quit)
wp.wg.Wait()
}
// SubmitJob submits a build job to the worker pool
func (wp *WorkerPool) SubmitJob(ctx context.Context, job *BuildJob) error {
select {
case wp.jobs <- job:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// BuildParallel executes builds in parallel using the worker pool
func (pb *ParallelBuilder) BuildParallel(ctx context.Context, requests []*types.BuildRequest) ([]*BuildResult, error) {
if len(requests) == 0 {
return nil, fmt.Errorf("no build requests provided")
}
// Create worker pool
workerPool := NewWorkerPool(pb.maxWorkers, pb.buildManager, pb.cacheManager)
workerPool.Start()
defer workerPool.Stop()
// Create jobs
jobs := make([]*BuildJob, len(requests))
results := make([]*BuildResult, len(requests))
for i, req := range requests {
job := &BuildJob{
ID: fmt.Sprintf("build-%d-%d", time.Now().UnixNano(), i),
Request: req,
Result: make(chan *BuildResult, 1),
Priority: 0,
CreatedAt: time.Now(),
}
jobs[i] = job
// Submit job
if err := workerPool.SubmitJob(ctx, job); err != nil {
return nil, fmt.Errorf("failed to submit job %s: %w", job.ID, err)
}
}
// Collect results
var wg sync.WaitGroup
for i, job := range jobs {
wg.Add(1)
go func(index int, buildJob *BuildJob) {
defer wg.Done()
select {
case result := <-buildJob.Result:
results[index] = result
case <-ctx.Done():
results[index] = &BuildResult{
Success: false,
Error: ctx.Err(),
BuildID: buildJob.ID,
}
}
}(i, job)
}
wg.Wait()
return results, nil
}
// BuildBatch executes a batch of builds with optimized scheduling
func (pb *ParallelBuilder) BuildBatch(ctx context.Context, requests []*types.BuildRequest) (*BatchBuildResult, error) {
if len(requests) == 0 {
return nil, fmt.Errorf("no build requests provided")
}
startTime := time.Now()
// Group builds by cache key potential for optimization
cacheGroups := pb.groupByCachePotential(ctx, requests)
// Execute builds in optimized order
var allResults []*BuildResult
var totalCacheHits int
for _, group := range cacheGroups {
groupResults, err := pb.BuildParallel(ctx, group.Requests)
if err != nil {
return nil, err
}
allResults = append(allResults, groupResults...)
// Count cache hits
for _, result := range groupResults {
if result.CacheHit {
totalCacheHits++
}
}
}
// Map results back to original order
resultMap := make(map[string]*BuildResult)
for _, result := range allResults {
if result.Response != nil {
resultMap[result.Response.ImageName] = result
}
}
// Create final results array in original order
finalResults := make([]*BuildResult, len(requests))
for i, req := range requests {
imageName := fmt.Sprintf("%s:%s", req.ImageName, req.ImageTag)
if result, exists := resultMap[imageName]; exists {
finalResults[i] = result
} else {
finalResults[i] = &BuildResult{
Success: false,
Error: fmt.Errorf("build result not found for %s", imageName),
}
}
}
return &BatchBuildResult{
Results: finalResults,
TotalBuilds: len(requests),
Successful: pb.countSuccessful(finalResults),
Failed: pb.countFailed(finalResults),
CacheHits: totalCacheHits,
TotalDuration: time.Since(startTime),
AverageTime: time.Since(startTime) / time.Duration(len(requests)),
}, nil
}
// BatchBuildResult represents the result of a batch build
type BatchBuildResult struct {
Results []*BuildResult
TotalBuilds int
Successful int
Failed int
CacheHits int
TotalDuration time.Duration
AverageTime time.Duration
}
// CacheGroup represents a group of builds with similar cache potential
type CacheGroup struct {
Name string
Requests []*types.BuildRequest
Priority int
}
// groupByCachePotential groups builds by their cache optimization potential
func (pb *ParallelBuilder) groupByCachePotential(ctx context.Context, requests []*types.BuildRequest) []CacheGroup {
groups := make(map[string][]*types.BuildRequest)
for _, req := range requests {
cacheKey, err := pb.cacheManager.GenerateCacheKey(ctx, req)
if err != nil {
// Put in uncached group if we can't generate a key
groups["uncached"] = append(groups["uncached"], req)
continue
}
// Group by runtime and similar characteristics
groupKey := fmt.Sprintf("%s-%s", cacheKey.Runtime, pb.hashBuildArgs(cacheKey.BuildArgs))
groups[groupKey] = append(groups[groupKey], req)
}
// Convert to sorted groups with priorities
var sortedGroups []CacheGroup
for name, reqs := range groups {
priority := 0
if name == "uncached" {
priority = 999 // Lowest priority
} else if len(reqs) > 1 {
priority = 1 // High priority for groups with multiple builds
} else {
priority = 2 // Medium priority for single builds
}
sortedGroups = append(sortedGroups, CacheGroup{
Name: name,
Requests: reqs,
Priority: priority,
})
}
// Sort by priority (lower number = higher priority)
for i := 0; i < len(sortedGroups); i++ {
for j := i + 1; j < len(sortedGroups); j++ {
if sortedGroups[i].Priority > sortedGroups[j].Priority {
sortedGroups[i], sortedGroups[j] = sortedGroups[j], sortedGroups[i]
}
}
}
return sortedGroups
}
// hashBuildArgs creates a hash of build arguments for grouping
func (pb *ParallelBuilder) hashBuildArgs(args map[string]string) string {
if len(args) == 0 {
return "no-args"
}
// Simple hash for grouping - in production, use proper hashing
result := ""
for k, v := range args {
result += k + "=" + v + ";"
}
return result
}
func (pb *ParallelBuilder) countSuccessful(results []*BuildResult) int {
count := 0
for _, result := range results {
if result.Success {
count++
}
}
return count
}
func (pb *ParallelBuilder) countFailed(results []*BuildResult) int {
count := 0
for _, result := range results {
if !result.Success {
count++
}
}
return count
}
// Worker methods
func (w *Worker) start(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case job := <-w.jobChan:
// Process the job
result := w.processJob(job)
job.Result <- result
case <-w.quit:
return
}
}
}
func (w *Worker) processJob(job *BuildJob) *BuildResult {
ctx := context.Background()
// Use the parallel builder's cache-aware build method
pb := &ParallelBuilder{
buildManager: w.buildManager,
cacheManager: w.cacheManager,
}
result, err := pb.BuildWithCache(ctx, job.Request)
if err != nil {
return &BuildResult{
Success: false,
Error: err,
BuildID: job.ID,
}
}
result.BuildID = job.ID
return result
}