1
0
Fork 0
mirror of https://code.forgejo.org/forgejo/runner.git synced 2025-09-15 18:57:01 +00:00

Merge tag 'nektos/v0.2.49'

Conflicts:
	cmd/input.go
	go.mod
	go.sum
	pkg/exprparser/interpreter.go
	pkg/model/workflow.go
	pkg/runner/expression.go
	pkg/runner/job_executor.go
	pkg/runner/runner.go
This commit is contained in:
Jason Song 2023-08-02 11:52:14 +08:00
commit fc7b3c5c43
43 changed files with 465 additions and 196 deletions

View file

@ -5,13 +5,13 @@ import (
"encoding/json"
"fmt"
"os"
"runtime"
"time"
docker_container "github.com/docker/docker/api/types/container"
log "github.com/sirupsen/logrus"
docker_container "github.com/docker/docker/api/types/container"
"github.com/nektos/act/pkg/common"
"github.com/nektos/act/pkg/container"
"github.com/nektos/act/pkg/model"
)
@ -34,6 +34,7 @@ type Config struct {
ForceRebuild bool // force rebuilding local docker image action
LogOutput bool // log the output from docker run
JSONLogger bool // use json or text logger
LogPrefixJobID bool // switches from the full job name to the job id
Env map[string]string // env for containers
Inputs map[string]string // manually passed action inputs
Secrets map[string]string // list of secrets
@ -128,15 +129,45 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
maxJobNameLen := 0
stagePipeline := make([]common.Executor, 0)
log.Debugf("Plan Stages: %v", plan.Stages)
for i := range plan.Stages {
stage := plan.Stages[i]
stagePipeline = append(stagePipeline, func(ctx context.Context) error {
pipeline := make([]common.Executor, 0)
for _, run := range stage.Runs {
log.Debugf("Stages Runs: %v", stage.Runs)
stageExecutor := make([]common.Executor, 0)
job := run.Job()
log.Debugf("Job.Name: %v", job.Name)
log.Debugf("Job.RawNeeds: %v", job.RawNeeds)
log.Debugf("Job.RawRunsOn: %v", job.RawRunsOn)
log.Debugf("Job.Env: %v", job.Env)
log.Debugf("Job.If: %v", job.If)
for step := range job.Steps {
if nil != job.Steps[step] {
log.Debugf("Job.Steps: %v", job.Steps[step].String())
}
}
log.Debugf("Job.TimeoutMinutes: %v", job.TimeoutMinutes)
log.Debugf("Job.Services: %v", job.Services)
log.Debugf("Job.Strategy: %v", job.Strategy)
log.Debugf("Job.RawContainer: %v", job.RawContainer)
log.Debugf("Job.Defaults.Run.Shell: %v", job.Defaults.Run.Shell)
log.Debugf("Job.Defaults.Run.WorkingDirectory: %v", job.Defaults.Run.WorkingDirectory)
log.Debugf("Job.Outputs: %v", job.Outputs)
log.Debugf("Job.Uses: %v", job.Uses)
log.Debugf("Job.With: %v", job.With)
// log.Debugf("Job.RawSecrets: %v", job.RawSecrets)
log.Debugf("Job.Result: %v", job.Result)
if job.Strategy != nil {
log.Debugf("Job.Strategy.FailFast: %v", job.Strategy.FailFast)
log.Debugf("Job.Strategy.MaxParallel: %v", job.Strategy.MaxParallel)
log.Debugf("Job.Strategy.FailFastString: %v", job.Strategy.FailFastString)
log.Debugf("Job.Strategy.MaxParallelString: %v", job.Strategy.MaxParallelString)
log.Debugf("Job.Strategy.RawMatrix: %v", job.Strategy.RawMatrix)
strategyRc := runner.newRunContext(ctx, run, nil)
if err := strategyRc.NewExpressionEvaluator(ctx).EvaluateYamlNode(ctx, &job.Strategy.RawMatrix); err != nil {
log.Errorf("Error while evaluating matrix: %v", err)
@ -147,6 +178,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
if m, err := job.GetMatrixes(); err != nil {
log.Errorf("Error while get job's matrix: %v", err)
} else {
log.Debugf("Job Matrices: %v", m)
log.Debugf("Runner Matrices: %v", runner.config.Matrix)
matrixes = selectMatrixes(m, runner.config.Matrix)
}
log.Debugf("Final matrix after applying user inclusions '%v'", matrixes)
@ -172,19 +205,22 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
}
stageExecutor = append(stageExecutor, func(ctx context.Context) error {
jobName := fmt.Sprintf("%-*s", maxJobNameLen, rc.String())
return rc.Executor()(common.WithJobErrorContainer(WithJobLogger(ctx, rc.Run.JobID, jobName, rc.Config, &rc.Masks, matrix)))
executor, err := rc.Executor()
if err != nil {
return err
}
return executor(common.WithJobErrorContainer(WithJobLogger(ctx, rc.Run.JobID, jobName, rc.Config, &rc.Masks, matrix)))
})
}
pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))
}
var ncpu int
info, err := container.GetHostInfo(ctx)
if err != nil {
log.Errorf("failed to obtain container engine info: %s", err)
ncpu = 1 // sane default?
} else {
ncpu = info.NCPU
ncpu := runtime.NumCPU()
if 1 > ncpu {
ncpu = 1
}
log.Debugf("Detected CPUs: %d", ncpu)
return common.NewParallelExecutor(ncpu, pipeline...)(ctx)
})
}