From 0cd9ec8998f9585e6c09dada45d93e801bc0402c Mon Sep 17 00:00:00 2001 From: Markus Wolf Date: Fri, 25 Feb 2022 19:47:16 +0100 Subject: [PATCH] feat: run jobs in parallel (#1003) * feat: run jobs in parallel This changes fixes and restructures the parallel execution of jobs. The previous changes limiting the parallel execution did break this and allowed only one job in parallel. While we run #CPU jobs in parallel now, the jobs added per job-matrix add to this. So we might over-commit to the capacity, but at least it is limited. * fix: correctly build job pipeline The job pipeline should just append all required pipeline steps. The parallelism will be handled by the ParallelExecutor and we shouldn't handle it during building the pipelines. Also this adds a test, that the ParallelExecutor does run a limited amount of parallel goroutines. * test: correct test implementation Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- act/common/executor.go | 34 ++++++++++++++++------------------ act/common/executor_test.go | 20 ++++++++++++++++---- act/runner/runner.go | 12 +++--------- 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/act/common/executor.go b/act/common/executor.go index 5e9b28dd..fb76d0b4 100644 --- a/act/common/executor.go +++ b/act/common/executor.go @@ -91,27 +91,33 @@ func NewErrorExecutor(err error) Executor { } // NewParallelExecutor creates a new executor from a parallel of other executors -func NewParallelExecutor(executors ...Executor) Executor { +func NewParallelExecutor(parallel int, executors ...Executor) Executor { return func(ctx context.Context) error { - errChan := make(chan error) + work := make(chan Executor, len(executors)) + errs := make(chan error, len(executors)) - for _, executor := range executors { - e := executor - go func() { - err := e.ChannelError(errChan)(ctx) - if err != nil { - log.Fatal(err) + for i := 0; i < parallel; i++ { + go func(work <-chan Executor, errs chan<- error) { + for executor := range work { + errs <- executor(ctx) } - }() + }(work, errs) } + for i := 0; i < len(executors); i++ { + work <- executors[i] + } + close(work) + // Executor waits all executors to cleanup these resources. var firstErr error for i := 0; i < len(executors); i++ { - if err := <-errChan; err != nil && firstErr == nil { + err := <-errs + if firstErr == nil { firstErr = err } } + if err := ctx.Err(); err != nil { return err } @@ -119,14 +125,6 @@ func NewParallelExecutor(executors ...Executor) Executor { } } -// ChannelError sends error to errChan rather than returning error -func (e Executor) ChannelError(errChan chan error) Executor { - return func(ctx context.Context) error { - errChan <- e(ctx) - return nil - } -} - // Then runs another executor if this executor succeeds func (e Executor) Then(then Executor) Executor { return func(ctx context.Context) error { diff --git a/act/common/executor_test.go b/act/common/executor_test.go index 17df3b7f..7f691e42 100644 --- a/act/common/executor_test.go +++ b/act/common/executor_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -79,14 +80,25 @@ func TestNewParallelExecutor(t *testing.T) { ctx := context.Background() count := 0 + activeCount := 0 + maxCount := 0 emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error { count++ + + activeCount++ + if activeCount > maxCount { + maxCount = activeCount + } + time.Sleep(2 * time.Second) + activeCount-- + return nil }) - err := NewParallelExecutor(emptyWorkflow, emptyWorkflow)(ctx) - assert.Equal(2, count) + err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx) + assert.Equal(3, count, "should run all 3 executors") + assert.Equal(2, maxCount, "should run at most 2 executors in parallel") assert.Nil(err) } @@ -101,7 +113,7 @@ func TestNewParallelExecutorFailed(t *testing.T) { count++ return fmt.Errorf("fake error") }) - err := NewParallelExecutor(errorWorkflow)(ctx) + err := NewParallelExecutor(1, errorWorkflow)(ctx) assert.Equal(1, count) assert.ErrorIs(context.Canceled, err) } @@ -123,7 +135,7 @@ func TestNewParallelExecutorCanceled(t *testing.T) { count++ return errExpected }) - err := NewParallelExecutor(errorWorkflow, successWorkflow, successWorkflow)(ctx) + err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx) assert.Equal(3, count) assert.Error(errExpected, err) } diff --git a/act/runner/runner.go b/act/runner/runner.go index 7b269226..d7020dd1 100644 --- a/act/runner/runner.go +++ b/act/runner/runner.go @@ -121,8 +121,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { stage := plan.Stages[i] stagePipeline = append(stagePipeline, func(ctx context.Context) error { pipeline := make([]common.Executor, 0) - stageExecutor := make([]common.Executor, 0) for r, run := range stage.Runs { + stageExecutor := make([]common.Executor, 0) job := run.Job() if job.Strategy != nil { strategyRc := runner.newRunContext(run, nil) @@ -140,7 +140,6 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { maxParallel = len(matrixes) } - b := 0 for i, matrix := range matrixes { rc := runner.newRunContext(run, matrix) rc.JobName = rc.Name @@ -167,15 +166,10 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { return nil })(common.WithJobErrorContainer(WithJobLogger(ctx, jobName, rc.Config.Secrets, rc.Config.InsecureSecrets))) }) - b++ - if b == maxParallel { - pipeline = append(pipeline, common.NewParallelExecutor(stageExecutor...)) - stageExecutor = make([]common.Executor, 0) - b = 0 - } } + pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...)) } - return common.NewPipelineExecutor(pipeline...)(ctx) + return common.NewParallelExecutor(runtime.NumCPU(), pipeline...)(ctx) }) }