From 2573ccaf19b25bc4420203ecae124e995d387292 Mon Sep 17 00:00:00 2001 From: Mathieu Fenniak Date: Sun, 14 Sep 2025 20:02:43 +0000 Subject: [PATCH] fix: send job outputs & job result to Forgejo in sync with each other (#995) Fixes #994. First commit ensures that the interpolateResults method is invoked before data is sent to the reporter. Second commit changes how data is sent to the reporter to include both the result and the job outputs. - bug fixes - [PR](https://code.forgejo.org/forgejo/runner/pulls/995): fix: send job outputs & job result to Forgejo in sync with each other Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/995 Reviewed-by: earl-warren Co-authored-by: Mathieu Fenniak Co-committed-by: Mathieu Fenniak --- act/runner/job_executor.go | 69 ++++--- act/runner/job_executor_test.go | 45 ++++- act/runner/mocks/FieldLogger.go | 264 +++++++++++++++++++++++++++ internal/app/run/runner.go | 1 - internal/pkg/report/reporter.go | 17 +- internal/pkg/report/reporter_test.go | 29 ++- 6 files changed, 374 insertions(+), 51 deletions(-) create mode 100644 act/runner/mocks/FieldLogger.go diff --git a/act/runner/job_executor.go b/act/runner/job_executor.go index cb50c84c..7a939930 100644 --- a/act/runner/job_executor.go +++ b/act/runner/job_executor.go @@ -8,6 +8,7 @@ import ( "code.forgejo.org/forgejo/runner/v11/act/common" "code.forgejo.org/forgejo/runner/v11/act/container" "code.forgejo.org/forgejo/runner/v11/act/model" + "github.com/sirupsen/logrus" ) type jobInfo interface { @@ -104,37 +105,40 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo } } - postExecutor = postExecutor.Finally(func(ctx context.Context) error { + setJobResults := func(ctx context.Context) error { jobError := common.JobError(ctx) // Fresh context to ensure job result output works even if prev. context was a cancelled job ctx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute) defer cancel() setJobResult(ctx, info, rc, jobError == nil) - setJobOutputs(ctx, rc) + return nil + } + + cleanupJob := func(_ context.Context) error { var err error - { - // Separate timeout for cleanup tasks; logger is cleared so that cleanup logs go to runner, not job - ctx, cancel := context.WithTimeout(context.Background(), cleanupTimeout) - defer cancel() - logger := common.Logger(ctx) - logger.Debugf("Cleaning up container for job %s", rc.jobContainerName()) - if err = info.stopContainer()(ctx); err != nil { - logger.Errorf("Error while stop job container %s: %v", rc.jobContainerName(), err) - } + // Separate timeout for cleanup tasks; logger is cleared so that cleanup logs go to runner, not job + ctx, cancel := context.WithTimeout(context.Background(), cleanupTimeout) + defer cancel() - if !rc.IsHostEnv(ctx) && rc.getNetworkCreated(ctx) { - networkName := rc.getNetworkName(ctx) - logger.Debugf("Cleaning up network %s for job %s", networkName, rc.jobContainerName()) - if err := container.NewDockerNetworkRemoveExecutor(networkName)(ctx); err != nil { - logger.Errorf("Error while cleaning network %s: %v", networkName, err) - } + logger := common.Logger(ctx) + logger.Debugf("Cleaning up container for job %s", rc.jobContainerName()) + if err = info.stopContainer()(ctx); err != nil { + logger.Errorf("Error while stop job container %s: %v", rc.jobContainerName(), err) + } + + if !rc.IsHostEnv(ctx) && rc.getNetworkCreated(ctx) { + networkName := rc.getNetworkName(ctx) + logger.Debugf("Cleaning up network %s for job %s", networkName, rc.jobContainerName()) + if err := container.NewDockerNetworkRemoveExecutor(networkName)(ctx); err != nil { + logger.Errorf("Error while cleaning network %s: %v", networkName, err) } } + return err - }) + } pipeline := make([]common.Executor, 0) pipeline = append(pipeline, preSteps...) @@ -152,6 +156,8 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo return postExecutor(ctx) }). Finally(info.interpolateOutputs()). + Finally(setJobResults). + Finally(cleanupJob). Finally(info.closeContainer())) } @@ -185,22 +191,27 @@ func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success boo jobResultMessage = "failed" } - logger.WithField("jobResult", jobResult).Infof("\U0001F3C1 Job %s", jobResultMessage) -} - -func setJobOutputs(ctx context.Context, rc *RunContext) { + jobOutputs := rc.Run.Job().Outputs if rc.caller != nil { - // map outputs for reusable workflows - callerOutputs := make(map[string]string) - + // Rewrite the job's outputs into the workflow_call outputs... + jobOutputs = make(map[string]string) ee := rc.NewExpressionEvaluator(ctx) - for k, v := range rc.Run.Workflow.WorkflowCallConfig().Outputs { - callerOutputs[k] = ee.Interpolate(ctx, ee.Interpolate(ctx, v.Value)) + jobOutputs[k] = ee.Interpolate(ctx, ee.Interpolate(ctx, v.Value)) } - - rc.caller.runContext.Run.Job().Outputs = callerOutputs + // When running as a daemon and receiving jobs from Forgejo, the next job (and any of it's `needs` outputs) will + // be provided by Forgejo based upon the data sent to the logger below. However, when running `forgejo-runner + // exec` with a reusable workflow, the next job will only be able to read outputs if those outputs are stored on + // the workflow -- that's what is accomplished here: + rc.caller.runContext.Run.Job().Outputs = jobOutputs } + + logger. + WithFields(logrus.Fields{ + "jobResult": jobResult, + "jobOutputs": jobOutputs, + }). + Infof("\U0001F3C1 Job %s", jobResultMessage) } func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, executor common.Executor) common.Executor { diff --git a/act/runner/job_executor_test.go b/act/runner/job_executor_test.go index e42bf610..fe40df0f 100644 --- a/act/runner/job_executor_test.go +++ b/act/runner/job_executor_test.go @@ -12,10 +12,14 @@ import ( "code.forgejo.org/forgejo/runner/v11/act/common" "code.forgejo.org/forgejo/runner/v11/act/container" "code.forgejo.org/forgejo/runner/v11/act/model" + "code.forgejo.org/forgejo/runner/v11/act/runner/mocks" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) +//go:generate mockery --srcpkg=github.com/sirupsen/logrus --name=FieldLogger + func TestJobExecutor(t *testing.T) { tables := []TestJobFileInfo{ {workdir, "uses-and-run-in-one-step", "push", "Invalid run/uses syntax for job:test step:Test", platforms, secrets}, @@ -127,8 +131,9 @@ func TestJobExecutorNewJobExecutor(t *testing.T) { executedSteps: []string{ "startContainer", "step1", - "stopContainer", "interpolateOutputs", + "setJobResults", + "stopContainer", "closeContainer", }, result: "success", @@ -144,8 +149,9 @@ func TestJobExecutorNewJobExecutor(t *testing.T) { executedSteps: []string{ "startContainer", "step1", - "stopContainer", "interpolateOutputs", + "setJobResults", + "stopContainer", "closeContainer", }, result: "failure", @@ -162,8 +168,9 @@ func TestJobExecutorNewJobExecutor(t *testing.T) { "startContainer", "pre1", "step1", - "stopContainer", "interpolateOutputs", + "setJobResults", + "stopContainer", "closeContainer", }, result: "success", @@ -180,8 +187,9 @@ func TestJobExecutorNewJobExecutor(t *testing.T) { "startContainer", "step1", "post1", - "stopContainer", "interpolateOutputs", + "setJobResults", + "stopContainer", "closeContainer", }, result: "success", @@ -199,8 +207,9 @@ func TestJobExecutorNewJobExecutor(t *testing.T) { "pre1", "step1", "post1", - "stopContainer", "interpolateOutputs", + "setJobResults", + "stopContainer", "closeContainer", }, result: "success", @@ -229,8 +238,9 @@ func TestJobExecutorNewJobExecutor(t *testing.T) { "step3", "post3", "post2", - "stopContainer", "interpolateOutputs", + "setJobResults", + "stopContainer", "closeContainer", }, result: "success", @@ -246,7 +256,27 @@ func TestJobExecutorNewJobExecutor(t *testing.T) { t.Run(tt.name, func(t *testing.T) { fmt.Printf("::group::%s\n", tt.name) - ctx := common.WithJobErrorContainer(t.Context()) + executorOrder := make([]string, 0) + + mockLogger := mocks.NewFieldLogger(t) + mockLogger.On("Debugf", mock.Anything, mock.Anything).Return(0).Maybe() + mockLogger.On("Warningf", mock.Anything, mock.Anything).Return(0).Maybe() + mockLogger.On("WithField", mock.Anything, mock.Anything).Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe() + // When `WithFields()` is called with jobResult & jobOutputs field, add `setJobResults` to executorOrder. + mockLogger.On("WithFields", + mock.MatchedBy(func(fields logrus.Fields) bool { + _, okJobResult := fields["jobResult"] + _, okJobOutput := fields["jobOutputs"] + return okJobOutput && okJobResult + })). + Run(func(args mock.Arguments) { + executorOrder = append(executorOrder, "setJobResults") + }). + Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe() + + mockLogger.On("WithFields", mock.Anything).Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe() + + ctx := common.WithLogger(common.WithJobErrorContainer(t.Context()), mockLogger) jim := &jobInfoMock{} sfm := &stepFactoryMock{} rc := &RunContext{ @@ -262,7 +292,6 @@ func TestJobExecutorNewJobExecutor(t *testing.T) { Config: &Config{}, } rc.ExprEval = rc.NewExpressionEvaluator(ctx) - executorOrder := make([]string, 0) jim.On("steps").Return(tt.steps) diff --git a/act/runner/mocks/FieldLogger.go b/act/runner/mocks/FieldLogger.go new file mode 100644 index 00000000..5cfa5ad2 --- /dev/null +++ b/act/runner/mocks/FieldLogger.go @@ -0,0 +1,264 @@ +// Code generated by mockery v2.53.5. DO NOT EDIT. + +package mocks + +import ( + logrus "github.com/sirupsen/logrus" + mock "github.com/stretchr/testify/mock" +) + +// FieldLogger is an autogenerated mock type for the FieldLogger type +type FieldLogger struct { + mock.Mock +} + +// Debug provides a mock function with given fields: args +func (_m *FieldLogger) Debug(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Debugf provides a mock function with given fields: format, args +func (_m *FieldLogger) Debugf(format string, args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, format) + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Debugln provides a mock function with given fields: args +func (_m *FieldLogger) Debugln(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Error provides a mock function with given fields: args +func (_m *FieldLogger) Error(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Errorf provides a mock function with given fields: format, args +func (_m *FieldLogger) Errorf(format string, args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, format) + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Errorln provides a mock function with given fields: args +func (_m *FieldLogger) Errorln(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Fatal provides a mock function with given fields: args +func (_m *FieldLogger) Fatal(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Fatalf provides a mock function with given fields: format, args +func (_m *FieldLogger) Fatalf(format string, args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, format) + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Fatalln provides a mock function with given fields: args +func (_m *FieldLogger) Fatalln(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Info provides a mock function with given fields: args +func (_m *FieldLogger) Info(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Infof provides a mock function with given fields: format, args +func (_m *FieldLogger) Infof(format string, args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, format) + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Infoln provides a mock function with given fields: args +func (_m *FieldLogger) Infoln(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Panic provides a mock function with given fields: args +func (_m *FieldLogger) Panic(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Panicf provides a mock function with given fields: format, args +func (_m *FieldLogger) Panicf(format string, args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, format) + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Panicln provides a mock function with given fields: args +func (_m *FieldLogger) Panicln(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Print provides a mock function with given fields: args +func (_m *FieldLogger) Print(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Printf provides a mock function with given fields: format, args +func (_m *FieldLogger) Printf(format string, args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, format) + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Println provides a mock function with given fields: args +func (_m *FieldLogger) Println(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Warn provides a mock function with given fields: args +func (_m *FieldLogger) Warn(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Warnf provides a mock function with given fields: format, args +func (_m *FieldLogger) Warnf(format string, args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, format) + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Warning provides a mock function with given fields: args +func (_m *FieldLogger) Warning(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Warningf provides a mock function with given fields: format, args +func (_m *FieldLogger) Warningf(format string, args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, format) + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Warningln provides a mock function with given fields: args +func (_m *FieldLogger) Warningln(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// Warnln provides a mock function with given fields: args +func (_m *FieldLogger) Warnln(args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// WithError provides a mock function with given fields: err +func (_m *FieldLogger) WithError(err error) *logrus.Entry { + ret := _m.Called(err) + + if len(ret) == 0 { + panic("no return value specified for WithError") + } + + var r0 *logrus.Entry + if rf, ok := ret.Get(0).(func(error) *logrus.Entry); ok { + r0 = rf(err) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*logrus.Entry) + } + } + + return r0 +} + +// WithField provides a mock function with given fields: key, value +func (_m *FieldLogger) WithField(key string, value interface{}) *logrus.Entry { + ret := _m.Called(key, value) + + if len(ret) == 0 { + panic("no return value specified for WithField") + } + + var r0 *logrus.Entry + if rf, ok := ret.Get(0).(func(string, interface{}) *logrus.Entry); ok { + r0 = rf(key, value) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*logrus.Entry) + } + } + + return r0 +} + +// WithFields provides a mock function with given fields: fields +func (_m *FieldLogger) WithFields(fields logrus.Fields) *logrus.Entry { + ret := _m.Called(fields) + + if len(ret) == 0 { + panic("no return value specified for WithFields") + } + + var r0 *logrus.Entry + if rf, ok := ret.Get(0).(func(logrus.Fields) *logrus.Entry); ok { + r0 = rf(fields) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*logrus.Entry) + } + } + + return r0 +} + +// NewFieldLogger creates a new instance of FieldLogger. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewFieldLogger(t interface { + mock.TestingT + Cleanup(func()) +}, +) *FieldLogger { + mock := &FieldLogger{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go index 3f2b0127..a1283d61 100644 --- a/internal/app/run/runner.go +++ b/internal/app/run/runner.go @@ -366,7 +366,6 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report. } execErr := executor(ctx) - _ = reporter.SetOutputs(job.Outputs) return execErr } diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index 3233cb2a..0d1ebadb 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -130,6 +130,11 @@ func (r *Reporter) Fire(entry *log.Entry) error { } } } + if v, ok := entry.Data["jobOutputs"]; ok { + _ = r.setOutputs(v.(map[string]string)) + } else { + log.Panicf("received log entry with jobResult, but without jobOutputs -- outputs will be corrupted for this job") + } } if !r.duringSteps() { r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry)) @@ -209,7 +214,7 @@ func (r *Reporter) logf(format string, a ...any) { } } -func (r *Reporter) GetOutputs() map[string]string { +func (r *Reporter) cloneOutputs() map[string]string { outputs := make(map[string]string) r.outputs.Range(func(k, v any) bool { if val, ok := v.(string); ok { @@ -220,10 +225,9 @@ func (r *Reporter) GetOutputs() map[string]string { return outputs } -func (r *Reporter) SetOutputs(outputs map[string]string) error { - r.stateMu.Lock() - defer r.stateMu.Unlock() - +// Errors from setOutputs are logged into the reporter automatically; the `errors` return value is only used for unit +// tests. +func (r *Reporter) setOutputs(outputs map[string]string) error { var errs []error recordError := func(format string, a ...any) { r.logf(format, a...) @@ -369,10 +373,9 @@ func (r *Reporter) ReportState() error { r.stateMu.RLock() state := proto.Clone(r.state).(*runnerv1.TaskState) + outputs := r.cloneOutputs() r.stateMu.RUnlock() - outputs := r.GetOutputs() - resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ State: state, Outputs: outputs, diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 65fcbfbe..a949c01b 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -78,7 +78,7 @@ func TestReporterSetOutputs(t *testing.T) { reporter, _, _ := mockReporter(t) expected := map[string]string{"a": "b", "c": "d"} - assert.NoError(t, reporter.SetOutputs(expected)) + assert.NoError(t, reporter.setOutputs(expected)) assertEqual(t, expected, &reporter.outputs) }) @@ -93,7 +93,7 @@ func TestReporterSetOutputs(t *testing.T) { "c": "ABCDEFG", // value too big "d": "e", } - err := reporter.SetOutputs(in) + err := reporter.setOutputs(in) assert.ErrorContains(t, err, "ignore output because the length of the value for \"c\" is 7 (the maximum is 5)") assert.ErrorContains(t, err, "ignore output because the key is longer than 5: \"0123456\"") expected := map[string]string{"d": "e"} @@ -104,11 +104,11 @@ func TestReporterSetOutputs(t *testing.T) { reporter, _, _ := mockReporter(t) first := map[string]string{"a": "b", "c": "d"} - assert.NoError(t, reporter.SetOutputs(first)) + assert.NoError(t, reporter.setOutputs(first)) assertEqual(t, first, &reporter.outputs) second := map[string]string{"c": "d", "e": "f"} - assert.ErrorContains(t, reporter.SetOutputs(second), "ignore output because a value already exists for the key \"c\"") + assert.ErrorContains(t, reporter.setOutputs(second), "ignore output because a value already exists for the key \"c\"") expected := map[string]string{"a": "b", "c": "d", "e": "f"} assertEqual(t, expected, &reporter.outputs) @@ -284,6 +284,23 @@ func TestReporter_Fire(t *testing.T) { assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength) }) + + t.Run("jobResult jobOutputs extracted from log entry", func(t *testing.T) { + reporter, _, _ := mockReporter(t) + + dataStep0 := map[string]any{ + "stage": "Post", + "stepNumber": 0, + "raw_output": true, + "jobResult": "success", + "jobOutputs": map[string]string{"key1": "value1"}, + } + assert.NoError(t, reporter.Fire(&log.Entry{Message: "success!", Data: dataStep0})) + + assert.EqualValues(t, runnerv1.Result_RESULT_SUCCESS, reporter.state.Result) + value, _ := reporter.outputs.Load("key1") + assert.EqualValues(t, "value1", value) + }) } func TestReporterReportState(t *testing.T) { @@ -300,7 +317,7 @@ func TestReporterReportState(t *testing.T) { outputValue1 := "VALUE1" outputKey2 := "KEY2" outputValue2 := "VALUE2" - reporter.SetOutputs(map[string]string{ + reporter.setOutputs(map[string]string{ outputKey1: outputValue1, outputKey2: outputValue2, }) @@ -315,7 +332,7 @@ func TestReporterReportState(t *testing.T) { assert: func(t *testing.T, reporter *Reporter, ctx context.Context, err error) { t.Helper() require.ErrorContains(t, err, "not all logs are submitted 1 remain") - outputs := reporter.GetOutputs() + outputs := reporter.cloneOutputs() assert.Equal(t, map[string]string{ "KEY2": "VALUE2", }, outputs)