Mirror of https://code.forgejo.org/forgejo/runner.git, synced 2025-08-11 17:50:58 +00:00
fix: deep evaluate matrix strategy (#964)
* fix: deep evaluate matrix strategy
* Try to make linter happy.
* Apply PR feedback, fix insert directive, more tests
* Fix: logic error

Co-authored-by: Casey Lee <cplee@nektos.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
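The underlying bug, as the title and the hunk below suggest: a workflow matrix can be defined through expressions (for example matrix: ${{ fromJSON(needs.job1.outputs.matrix) }}, or expression values nested inside include entries), and the runner previously expanded the matrix straight from the raw YAML, so those ${{ ... }} strings were never evaluated. The fix deep-evaluates the raw matrix node before expansion. An annotated restatement of the lines this commit adds (code and names are taken from the diff below; the comments are explanatory, not the author's):

if job.Strategy != nil {
	// A run context with no matrix selected yet, used only for its evaluator.
	strategyRc := runner.newRunContext(run, nil)
	// Walk the raw YAML matrix node and evaluate every ${{ ... }} expression
	// in place ("deep" evaluation) before the matrix is expanded.
	if err := strategyRc.NewExpressionEvaluator().EvaluateYamlNode(&job.Strategy.RawMatrix); err != nil {
		log.Errorf("Error while evaluating matrix: %v", err)
	}
}
matrixes := job.GetMatrixes() // expands the now-evaluated matrix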
parent d20651be76
commit 5e65159a3d
8 changed files with 236 additions and 44 deletions
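Besides the new matrix evaluation, the hunk below restructures NewPlanExecutor: the body of the per-stage loop moves inside a closure appended to stagePipeline, so each stage's matrix is evaluated when the stage actually starts executing rather than when the plan is first built, presumably so that expressions in the matrix can see the results of earlier stages. The s := i and stage := plan.Stages[i] copies pin each closure to its own stage. A minimal standalone demonstration of why those copies were needed under the loop-variable semantics of that era (before Go 1.22, closures created in a loop shared the loop variable):

package main

import "fmt"

func main() {
	fns := make([]func(), 0, 3)
	for i := 0; i < 3; i++ {
		i := i // shadow copy, the same idiom as s := i in the diff
		fns = append(fns, func() { fmt.Println(i) })
	}
	for _, fn := range fns {
		fn() // prints 0, 1, 2; without the shadow copy: 3, 3, 3
	}
}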
@@ -115,59 +115,71 @@ func New(runnerConfig *Config) (Runner, error) {
 func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
 	maxJobNameLen := 0
 
-	pipeline := make([]common.Executor, 0)
-	for s, stage := range plan.Stages {
-		stageExecutor := make([]common.Executor, 0)
-		for r, run := range stage.Runs {
-			job := run.Job()
-			matrixes := job.GetMatrixes()
-			maxParallel := 4
-			if job.Strategy != nil {
-				maxParallel = job.Strategy.MaxParallel
-			}
-
-			if len(matrixes) < maxParallel {
-				maxParallel = len(matrixes)
-			}
-
-			b := 0
-			for i, matrix := range matrixes {
-				rc := runner.newRunContext(run, matrix)
-				rc.JobName = rc.Name
-				if len(matrixes) > 1 {
-					rc.Name = fmt.Sprintf("%s-%d", rc.Name, i+1)
-				}
-				if len(rc.String()) > maxJobNameLen {
-					maxJobNameLen = len(rc.String())
-				}
-				stageExecutor = append(stageExecutor, func(ctx context.Context) error {
-					jobName := fmt.Sprintf("%-*s", maxJobNameLen, rc.String())
-					return rc.Executor().Finally(func(ctx context.Context) error {
-						isLastRunningContainer := func(currentStage int, currentRun int) bool {
-							return currentStage == len(plan.Stages)-1 && currentRun == len(stage.Runs)-1
-						}
-
-						if runner.config.AutoRemove && isLastRunningContainer(s, r) {
-							log.Infof("Cleaning up container for job %s", rc.JobName)
-							if err := rc.stopJobContainer()(ctx); err != nil {
-								log.Errorf("Error while cleaning container: %v", err)
-							}
-						}
-
-						return nil
-					})(common.WithJobErrorContainer(WithJobLogger(ctx, jobName, rc.Config.Secrets, rc.Config.InsecureSecrets)))
-				})
-				b++
-				if b == maxParallel {
-					pipeline = append(pipeline, common.NewParallelExecutor(stageExecutor...))
-					stageExecutor = make([]common.Executor, 0)
-					b = 0
-				}
-			}
-		}
-	}
-
-	return common.NewPipelineExecutor(pipeline...).Then(handleFailure(plan))
+	stagePipeline := make([]common.Executor, 0)
+	for i := range plan.Stages {
+		s := i
+		stage := plan.Stages[i]
+		stagePipeline = append(stagePipeline, func(ctx context.Context) error {
+			pipeline := make([]common.Executor, 0)
+			stageExecutor := make([]common.Executor, 0)
+			for r, run := range stage.Runs {
+				job := run.Job()
+				if job.Strategy != nil {
+					strategyRc := runner.newRunContext(run, nil)
+					if err := strategyRc.NewExpressionEvaluator().EvaluateYamlNode(&job.Strategy.RawMatrix); err != nil {
+						log.Errorf("Error while evaluating matrix: %v", err)
+					}
+				}
+				matrixes := job.GetMatrixes()
+				maxParallel := 4
+				if job.Strategy != nil {
+					maxParallel = job.Strategy.MaxParallel
+				}
+
+				if len(matrixes) < maxParallel {
+					maxParallel = len(matrixes)
+				}
+
+				b := 0
+				for i, matrix := range matrixes {
+					rc := runner.newRunContext(run, matrix)
+					rc.JobName = rc.Name
+					if len(matrixes) > 1 {
+						rc.Name = fmt.Sprintf("%s-%d", rc.Name, i+1)
+					}
+					if len(rc.String()) > maxJobNameLen {
+						maxJobNameLen = len(rc.String())
+					}
+					stageExecutor = append(stageExecutor, func(ctx context.Context) error {
+						jobName := fmt.Sprintf("%-*s", maxJobNameLen, rc.String())
+						return rc.Executor().Finally(func(ctx context.Context) error {
+							isLastRunningContainer := func(currentStage int, currentRun int) bool {
+								return currentStage == len(plan.Stages)-1 && currentRun == len(stage.Runs)-1
+							}
+
+							if runner.config.AutoRemove && isLastRunningContainer(s, r) {
+								log.Infof("Cleaning up container for job %s", rc.JobName)
+								if err := rc.stopJobContainer()(ctx); err != nil {
+									log.Errorf("Error while cleaning container: %v", err)
+								}
+							}
+
+							return nil
+						})(common.WithJobErrorContainer(WithJobLogger(ctx, jobName, rc.Config.Secrets, rc.Config.InsecureSecrets)))
+					})
+					b++
+					if b == maxParallel {
+						pipeline = append(pipeline, common.NewParallelExecutor(stageExecutor...))
+						stageExecutor = make([]common.Executor, 0)
+						b = 0
+					}
+				}
+			}
+			return common.NewPipelineExecutor(pipeline...)(ctx)
+		})
+	}
+
+	return common.NewPipelineExecutor(stagePipeline...).Then(handleFailure(plan))
 }
 
 func handleFailure(plan *model.Plan) common.Executor {
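The b counter in the executor code above batches the matrix jobs: executors accumulate in stageExecutor and are flushed into a common.NewParallelExecutor group once maxParallel of them (default 4, capped at the number of matrix combinations) have been collected, and the groups then run one after another through common.NewPipelineExecutor. A simplified, hypothetical sketch of that batch-then-sequence pattern, with plain functions standing in for executors (the real code builds deferred executors and has slightly different flushing details):

package main

import (
	"fmt"
	"sync"
)

// runBatched runs tasks in groups of at most maxParallel, one group
// after another, mirroring the parallel-groups-inside-a-pipeline shape above.
func runBatched(tasks []func(), maxParallel int) {
	for start := 0; start < len(tasks); start += maxParallel {
		end := start + maxParallel
		if end > len(tasks) {
			end = len(tasks)
		}
		var wg sync.WaitGroup
		for _, t := range tasks[start:end] {
			wg.Add(1)
			t := t
			go func() { defer wg.Done(); t() }()
		}
		wg.Wait() // one group finishes before the next starts
	}
}

func main() {
	tasks := make([]func(), 5)
	for i := range tasks {
		i := i
		tasks[i] = func() { fmt.Println("job", i) }
	}
	runBatched(tasks, 4) // jobs 0-3 run together, then job 4
}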