mirror of
https://code.forgejo.org/forgejo/runner.git
synced 2025-08-11 17:50:58 +00:00
feat: run jobs in parallel (#1003)
* feat: run jobs in parallel This changes fixes and restructures the parallel execution of jobs. The previous changes limiting the parallel execution did break this and allowed only one job in parallel. While we run #CPU jobs in parallel now, the jobs added per job-matrix add to this. So we might over-commit to the capacity, but at least it is limited. * fix: correctly build job pipeline The job pipeline should just append all required pipeline steps. The parallelism will be handled by the ParallelExecutor and we shouldn't handle it during building the pipelines. Also this adds a test, that the ParallelExecutor does run a limited amount of parallel goroutines. * test: correct test implementation Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
parent
3f59bd2bae
commit
0cd9ec8998
3 changed files with 35 additions and 31 deletions
|
@ -121,8 +121,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
|
|||
stage := plan.Stages[i]
|
||||
stagePipeline = append(stagePipeline, func(ctx context.Context) error {
|
||||
pipeline := make([]common.Executor, 0)
|
||||
stageExecutor := make([]common.Executor, 0)
|
||||
for r, run := range stage.Runs {
|
||||
stageExecutor := make([]common.Executor, 0)
|
||||
job := run.Job()
|
||||
if job.Strategy != nil {
|
||||
strategyRc := runner.newRunContext(run, nil)
|
||||
|
@ -140,7 +140,6 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
|
|||
maxParallel = len(matrixes)
|
||||
}
|
||||
|
||||
b := 0
|
||||
for i, matrix := range matrixes {
|
||||
rc := runner.newRunContext(run, matrix)
|
||||
rc.JobName = rc.Name
|
||||
|
@ -167,15 +166,10 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
|
|||
return nil
|
||||
})(common.WithJobErrorContainer(WithJobLogger(ctx, jobName, rc.Config.Secrets, rc.Config.InsecureSecrets)))
|
||||
})
|
||||
b++
|
||||
if b == maxParallel {
|
||||
pipeline = append(pipeline, common.NewParallelExecutor(stageExecutor...))
|
||||
stageExecutor = make([]common.Executor, 0)
|
||||
b = 0
|
||||
}
|
||||
}
|
||||
pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))
|
||||
}
|
||||
return common.NewPipelineExecutor(pipeline...)(ctx)
|
||||
return common.NewParallelExecutor(runtime.NumCPU(), pipeline...)(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue