forgejo-runner/act/runner/job_executor.go
Mathieu Fenniak ce6502e7b6
chore: fix 'false positive' data race detection in Id/Number default init (#867)
A step's `ID` and `Number` are potentially initialized in different goroutines during matrix evaluations; this change ensures they are initialized before execution fans out to multiple goroutines. There doesn't appear to be any functional impact of this data race for end users.

Where `Number` was previously initialized, a runtime check (returning an internal error on mismatch) was added to ensure the behavior remains the same.
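
For context, a minimal sketch of the pattern the race detector flagged and of the fix (this is not the runner's actual code; `Step`, `runMatrixRacy`, and `runMatrixFixed` are simplified stand-ins): default values for shared step fields must be assigned before the matrix fan-out spawns the goroutines that read them.

```
package main

import "sync"

// Simplified stand-in for a workflow step; the real model.Step has many more fields.
type Step struct {
	ID     string
	Number int
}

// Racy variant: each matrix goroutine lazily fills in defaults on the *shared*
// steps, so two goroutines may write Step.ID / Step.Number concurrently.
func runMatrixRacy(steps []*Step) {
	var wg sync.WaitGroup
	for j := 0; j < 2; j++ { // two matrix entries share the same step models
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i, s := range steps {
				if s.ID == "" {
					s.ID = "step" // concurrent write: reported by the race detector
				}
				s.Number = i // concurrent write: reported by the race detector
			}
		}()
	}
	wg.Wait()
}

// Fixed variant: defaults are assigned once, before execution fans out, so the
// goroutines only ever read the fields.
func runMatrixFixed(steps []*Step) {
	for i, s := range steps {
		if s.ID == "" {
			s.ID = "step"
		}
		s.Number = i
	}
	var wg sync.WaitGroup
	for j := 0; j < 2; j++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for _, s := range steps {
				_ = s.ID // read-only access is safe without synchronization
			}
		}()
	}
	wg.Wait()
}

func main() {
	runMatrixRacy([]*Step{{}, {}})  // `go run -race` flags this
	runMatrixFixed([]*Step{{}, {}}) // no race reported
}
```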

`ID` data race:
```
==================
WARNING: DATA RACE
Read at 0x00c0001ff348 by goroutine 77:
  code.forgejo.org/forgejo/runner/v9/act/runner.newJobExecutor()
      /.../forgejo-runner/act/runner/job_executor.go:64 +0x424
  code.forgejo.org/forgejo/runner/v9/act/runner.(*RunContext).Executor()
      /.../forgejo-runner/act/runner/run_context.go:931 +0x2c6
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.1()
      /.../forgejo-runner/act/runner/runner.go:218 +0x150
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2.1()
      /.../forgejo-runner/act/common/executor.go:107 +0x61
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2.gowrap1()
      /.../forgejo-runner/act/common/executor.go:109 +0x4f

Previous write at 0x00c0001ff348 by goroutine 76:
  code.forgejo.org/forgejo/runner/v9/act/runner.newJobExecutor()
      /.../forgejo-runner/act/runner/job_executor.go:65 +0x4cc
  code.forgejo.org/forgejo/runner/v9/act/runner.(*RunContext).Executor()
      /.../forgejo-runner/act/runner/run_context.go:931 +0x2c6
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.1()
      /.../forgejo-runner/act/runner/runner.go:218 +0x150
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2.1()
      /.../forgejo-runner/act/common/executor.go:107 +0x61
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2.gowrap1()
      /.../forgejo-runner/act/common/executor.go:109 +0x4f

Goroutine 77 (running) created at:
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2()
      /.../forgejo-runner/act/common/executor.go:105 +0x144
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.3.1()
      /.../forgejo-runner/act/common/executor.go:107 +0x61
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.3.gowrap1()
      /.../forgejo-runner/act/common/executor.go:109 +0x4f

Goroutine 76 (running) created at:
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2()
      /.../forgejo-runner/act/common/executor.go:105 +0x144
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.3.1()
      /.../forgejo-runner/act/common/executor.go:107 +0x61
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.3.gowrap1()
      /.../forgejo-runner/act/common/executor.go:109 +0x4f
==================
```

`Number` data race:
```
==================
WARNING: DATA RACE
Write at 0x00c0001ff340 by goroutine 77:
  code.forgejo.org/forgejo/runner/v9/act/runner.newJobExecutor()
      /.../forgejo-runner/act/runner/job_executor.go:67 +0x536
  code.forgejo.org/forgejo/runner/v9/act/runner.(*RunContext).Executor()
      /.../forgejo-runner/act/runner/run_context.go:931 +0x2c6
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.1()
      /.../forgejo-runner/act/runner/runner.go:218 +0x150
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2.1()
      /.../forgejo-runner/act/common/executor.go:107 +0x61
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2.gowrap1()
      /.../forgejo-runner/act/common/executor.go:109 +0x4f

Previous write at 0x00c0001ff340 by goroutine 76:
  code.forgejo.org/forgejo/runner/v9/act/runner.newJobExecutor()
      /.../forgejo-runner/act/runner/job_executor.go:67 +0x536
  code.forgejo.org/forgejo/runner/v9/act/runner.(*RunContext).Executor()
      /.../forgejo-runner/act/runner/run_context.go:931 +0x2c6
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.1()
      /.../forgejo-runner/act/runner/runner.go:218 +0x150
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2.1()
      /.../forgejo-runner/act/common/executor.go:107 +0x61
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2.gowrap1()
      /.../forgejo-runner/act/common/executor.go:109 +0x4f

Goroutine 77 (running) created at:
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2()
      /.../forgejo-runner/act/common/executor.go:105 +0x144
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.3.1()
      /.../forgejo-runner/act/common/executor.go:107 +0x61
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.3.gowrap1()
      /.../forgejo-runner/act/common/executor.go:109 +0x4f

Goroutine 76 (running) created at:
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.2()
      /.../forgejo-runner/act/common/executor.go:105 +0x144
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.3.1()
      /.../forgejo-runner/act/common/executor.go:107 +0x61
  code.forgejo.org/forgejo/runner/v9/act/runner.(*runnerImpl).NewPlanExecutor.func1.NewParallelExecutor.3.gowrap1()
      /.../forgejo-runner/act/common/executor.go:109 +0x4f
==================
```

<!--start release-notes-assistant-->
<!--URL:https://code.forgejo.org/forgejo/runner-->
- other
  - [PR](https://code.forgejo.org/forgejo/runner/pulls/867): <!--number 867 --><!--line 0 --><!--description Y2hvcmU6IGZpeCAnZmFsc2UgcG9zaXRpdmUnIGRhdGEgcmFjZSBkZXRlY3Rpb24gaW4gSWQvTnVtYmVyIGRlZmF1bHQgaW5pdA==-->chore: fix 'false positive' data race detection in Id/Number default init<!--description-->
<!--end release-notes-assistant-->

Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/867
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
2025-08-20 19:56:03 +00:00


package runner

import (
	"context"
	"fmt"
	"time"

	"code.forgejo.org/forgejo/runner/v9/act/common"
	"code.forgejo.org/forgejo/runner/v9/act/container"
	"code.forgejo.org/forgejo/runner/v9/act/model"
)

type jobInfo interface {
	matrix() map[string]any
	steps() []*model.Step
	startContainer() common.Executor
	stopContainer() common.Executor
	closeContainer() common.Executor
	interpolateOutputs() common.Executor
	result(result string)
}

const cleanupTimeout = 30 * time.Minute

func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executor {
	steps := make([]common.Executor, 0)
	preSteps := make([]common.Executor, 0)
	var postExecutor common.Executor

	steps = append(steps, func(ctx context.Context) error {
		logger := common.Logger(ctx)
		if len(info.matrix()) > 0 {
			logger.Infof("\U0001F9EA Matrix: %v", info.matrix())
		}
		return nil
	})

	infoSteps := info.steps()

	if len(infoSteps) == 0 {
		return common.NewDebugExecutor("No steps found")
	}

	preSteps = append(preSteps, func(ctx context.Context) error {
		// Have to be skipped for some Tests
		if rc.Run == nil {
			return nil
		}
		rc.ExprEval = rc.NewExpressionEvaluator(ctx)
		// evaluate environment variables since they can contain
		// GitHub's special environment variables.
		for k, v := range rc.GetEnv() {
			rc.Env[k] = rc.ExprEval.Interpolate(ctx, v)
		}
		return nil
	})

	for i, stepModel := range infoSteps {
		if stepModel == nil {
			return common.NewErrorExecutor(fmt.Errorf("invalid Step %v: missing run or uses key", i))
		} else if stepModel.Number != i {
			return common.NewErrorExecutor(fmt.Errorf("internal error: invalid Step: Number expected %v, was actually %v", i, stepModel.Number))
		}

		step, err := sf.newStep(stepModel, rc)
		if err != nil {
			return common.NewErrorExecutor(err)
		}

		preExec := step.pre()
		preSteps = append(preSteps, useStepLogger(rc, stepModel, stepStagePre, func(ctx context.Context) error {
			logger := common.Logger(ctx)
			preErr := preExec(ctx)
			if preErr != nil {
				logger.Errorf("%v", preErr)
				common.SetJobError(ctx, preErr)
			} else if ctx.Err() != nil {
				logger.Errorf("%v", ctx.Err())
				common.SetJobError(ctx, ctx.Err())
			}
			return preErr
		}))

		stepExec := step.main()
		steps = append(steps, useStepLogger(rc, stepModel, stepStageMain, func(ctx context.Context) error {
			logger := common.Logger(ctx)
			err := stepExec(ctx)
			if err != nil {
				logger.Errorf("%v", err)
				common.SetJobError(ctx, err)
			} else if ctx.Err() != nil {
				logger.Errorf("%v", ctx.Err())
				common.SetJobError(ctx, ctx.Err())
			}
			return nil
		}))

		postExec := useStepLogger(rc, stepModel, stepStagePost, step.post())
		if postExecutor != nil {
			// run the post executor in reverse order
			postExecutor = postExec.Finally(postExecutor)
		} else {
			postExecutor = postExec
		}
	}

	postExecutor = postExecutor.Finally(func(ctx context.Context) error {
		jobError := common.JobError(ctx)
		// Fresh context to ensure job result output works even if prev. context was a cancelled job
		ctx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute)
		defer cancel()

		setJobResult(ctx, info, rc, jobError == nil)
		setJobOutputs(ctx, rc)

		var err error
		{
			// Separate timeout for cleanup tasks; logger is cleared so that cleanup logs go to runner, not job
			ctx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
			defer cancel()
			logger := common.Logger(ctx)
			logger.Debugf("Cleaning up container for job %s", rc.jobContainerName())
			if err = info.stopContainer()(ctx); err != nil {
				logger.Errorf("Error while stop job container %s: %v", rc.jobContainerName(), err)
			}
			if !rc.IsHostEnv(ctx) && rc.getNetworkCreated(ctx) {
				networkName := rc.getNetworkName(ctx)
				logger.Debugf("Cleaning up network %s for job %s", networkName, rc.jobContainerName())
				if err := container.NewDockerNetworkRemoveExecutor(networkName)(ctx); err != nil {
					logger.Errorf("Error while cleaning network %s: %v", networkName, err)
				}
			}
		}
		return err
	})

	pipeline := make([]common.Executor, 0)
	pipeline = append(pipeline, preSteps...)
	pipeline = append(pipeline, steps...)

	return common.NewPipelineExecutor(info.startContainer(), common.NewPipelineExecutor(pipeline...).
		Finally(func(ctx context.Context) error { //nolint:contextcheck
			var cancel context.CancelFunc
			if ctx.Err() == context.Canceled {
				// in case of an aborted run, we still should execute the
				// post steps to allow cleanup.
				ctx, cancel = context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), cleanupTimeout)
				defer cancel()
			}
			return postExecutor(ctx)
		}).
		Finally(info.interpolateOutputs()).
		Finally(info.closeContainer()))
}

func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {
	logger := common.Logger(ctx)

	// As we're reading the matrix build's status (`rc.Run.Job().Result`), it's possible for it to change in another
	// goroutine running `setJobResult` and invoking `.result(...)`. Prevent concurrent execution of `setJobResult`...
	rc.Run.Job().ResultMutex.Lock()
	defer rc.Run.Job().ResultMutex.Unlock()

	jobResult := "success"
	// we have only one result for a whole matrix build, so we need
	// to keep an existing result state if we run a matrix
	if len(info.matrix()) > 0 && rc.Run.Job().Result != "" {
		jobResult = rc.Run.Job().Result
	}

	if !success {
		jobResult = "failure"
	}

	info.result(jobResult)
	if rc.caller != nil {
		// set reusable workflow job result
		rc.caller.runContext.result(jobResult)
	}

	jobResultMessage := "succeeded"
	if jobResult != "success" {
		jobResultMessage = "failed"
	}

	logger.WithField("jobResult", jobResult).Infof("\U0001F3C1 Job %s", jobResultMessage)
}

func setJobOutputs(ctx context.Context, rc *RunContext) {
	if rc.caller != nil {
		// map outputs for reusable workflows
		callerOutputs := make(map[string]string)

		ee := rc.NewExpressionEvaluator(ctx)
		for k, v := range rc.Run.Workflow.WorkflowCallConfig().Outputs {
			callerOutputs[k] = ee.Interpolate(ctx, ee.Interpolate(ctx, v.Value))
		}

		rc.caller.runContext.Run.Job().Outputs = callerOutputs
	}
}

func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, executor common.Executor) common.Executor {
	return func(ctx context.Context) error {
		ctx = withStepLogger(ctx, stepModel.Number, stepModel.ID, rc.ExprEval.Interpolate(ctx, stepModel.String()), stage.String())

		rawLogger := common.Logger(ctx).WithField("raw_output", true)
		logWriter := common.NewLineWriter(rc.commandHandler(ctx), func(s string) bool {
			if rc.Config.LogOutput {
				rawLogger.Infof("%s", s)
			} else {
				rawLogger.Debugf("%s", s)
			}
			return true
		})

		oldout, olderr := rc.JobContainer.ReplaceLogWriter(logWriter, logWriter)
		defer rc.JobContainer.ReplaceLogWriter(oldout, olderr)

		return executor(ctx)
	}
}