From c3e8134de9182f20ade989681939b948236b3406 Mon Sep 17 00:00:00 2001 From: Mathieu Fenniak Date: Sat, 13 Sep 2025 13:37:16 -0600 Subject: [PATCH] merge job results and job outputs to same Reporter lock --- act/runner/job_executor.go | 8 +++++++- act/runner/job_executor_test.go | 12 ++++++++++-- internal/app/run/runner.go | 1 - internal/pkg/report/reporter.go | 17 +++++++++------- internal/pkg/report/reporter_test.go | 29 ++++++++++++++++++++++------ 5 files changed, 50 insertions(+), 17 deletions(-) diff --git a/act/runner/job_executor.go b/act/runner/job_executor.go index 7f018bf4..e5c8218a 100644 --- a/act/runner/job_executor.go +++ b/act/runner/job_executor.go @@ -8,6 +8,7 @@ import ( "code.forgejo.org/forgejo/runner/v11/act/common" "code.forgejo.org/forgejo/runner/v11/act/container" "code.forgejo.org/forgejo/runner/v11/act/model" + "github.com/sirupsen/logrus" ) type jobInfo interface { @@ -191,7 +192,12 @@ func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success boo jobResultMessage = "failed" } - logger.WithField("jobResult", jobResult).Infof("\U0001F3C1 Job %s", jobResultMessage) + logger. + WithFields(logrus.Fields{ + "jobResult": jobResult, + "jobOutputs": rc.Run.Job().Outputs, + }). + Infof("\U0001F3C1 Job %s", jobResultMessage) } func setJobOutputs(ctx context.Context, rc *RunContext) { diff --git a/act/runner/job_executor_test.go b/act/runner/job_executor_test.go index a6c70ce0..fe40df0f 100644 --- a/act/runner/job_executor_test.go +++ b/act/runner/job_executor_test.go @@ -260,12 +260,20 @@ func TestJobExecutorNewJobExecutor(t *testing.T) { mockLogger := mocks.NewFieldLogger(t) mockLogger.On("Debugf", mock.Anything, mock.Anything).Return(0).Maybe() - mockLogger.On("WithField", "jobResult", mock.Anything). + mockLogger.On("Warningf", mock.Anything, mock.Anything).Return(0).Maybe() + mockLogger.On("WithField", mock.Anything, mock.Anything).Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe() + // When `WithFields()` is called with jobResult & jobOutputs field, add `setJobResults` to executorOrder. + mockLogger.On("WithFields", + mock.MatchedBy(func(fields logrus.Fields) bool { + _, okJobResult := fields["jobResult"] + _, okJobOutput := fields["jobOutputs"] + return okJobOutput && okJobResult + })). Run(func(args mock.Arguments) { executorOrder = append(executorOrder, "setJobResults") }). Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe() - mockLogger.On("WithField", mock.Anything, mock.Anything).Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe() + mockLogger.On("WithFields", mock.Anything).Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe() ctx := common.WithLogger(common.WithJobErrorContainer(t.Context()), mockLogger) diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go index 3f2b0127..a1283d61 100644 --- a/internal/app/run/runner.go +++ b/internal/app/run/runner.go @@ -366,7 +366,6 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report. } execErr := executor(ctx) - _ = reporter.SetOutputs(job.Outputs) return execErr } diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index 3233cb2a..0d1ebadb 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -130,6 +130,11 @@ func (r *Reporter) Fire(entry *log.Entry) error { } } } + if v, ok := entry.Data["jobOutputs"]; ok { + _ = r.setOutputs(v.(map[string]string)) + } else { + log.Panicf("received log entry with jobResult, but without jobOutputs -- outputs will be corrupted for this job") + } } if !r.duringSteps() { r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry)) @@ -209,7 +214,7 @@ func (r *Reporter) logf(format string, a ...any) { } } -func (r *Reporter) GetOutputs() map[string]string { +func (r *Reporter) cloneOutputs() map[string]string { outputs := make(map[string]string) r.outputs.Range(func(k, v any) bool { if val, ok := v.(string); ok { @@ -220,10 +225,9 @@ func (r *Reporter) GetOutputs() map[string]string { return outputs } -func (r *Reporter) SetOutputs(outputs map[string]string) error { - r.stateMu.Lock() - defer r.stateMu.Unlock() - +// Errors from setOutputs are logged into the reporter automatically; the `errors` return value is only used for unit +// tests. +func (r *Reporter) setOutputs(outputs map[string]string) error { var errs []error recordError := func(format string, a ...any) { r.logf(format, a...) @@ -369,10 +373,9 @@ func (r *Reporter) ReportState() error { r.stateMu.RLock() state := proto.Clone(r.state).(*runnerv1.TaskState) + outputs := r.cloneOutputs() r.stateMu.RUnlock() - outputs := r.GetOutputs() - resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ State: state, Outputs: outputs, diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 65fcbfbe..a949c01b 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -78,7 +78,7 @@ func TestReporterSetOutputs(t *testing.T) { reporter, _, _ := mockReporter(t) expected := map[string]string{"a": "b", "c": "d"} - assert.NoError(t, reporter.SetOutputs(expected)) + assert.NoError(t, reporter.setOutputs(expected)) assertEqual(t, expected, &reporter.outputs) }) @@ -93,7 +93,7 @@ func TestReporterSetOutputs(t *testing.T) { "c": "ABCDEFG", // value too big "d": "e", } - err := reporter.SetOutputs(in) + err := reporter.setOutputs(in) assert.ErrorContains(t, err, "ignore output because the length of the value for \"c\" is 7 (the maximum is 5)") assert.ErrorContains(t, err, "ignore output because the key is longer than 5: \"0123456\"") expected := map[string]string{"d": "e"} @@ -104,11 +104,11 @@ func TestReporterSetOutputs(t *testing.T) { reporter, _, _ := mockReporter(t) first := map[string]string{"a": "b", "c": "d"} - assert.NoError(t, reporter.SetOutputs(first)) + assert.NoError(t, reporter.setOutputs(first)) assertEqual(t, first, &reporter.outputs) second := map[string]string{"c": "d", "e": "f"} - assert.ErrorContains(t, reporter.SetOutputs(second), "ignore output because a value already exists for the key \"c\"") + assert.ErrorContains(t, reporter.setOutputs(second), "ignore output because a value already exists for the key \"c\"") expected := map[string]string{"a": "b", "c": "d", "e": "f"} assertEqual(t, expected, &reporter.outputs) @@ -284,6 +284,23 @@ func TestReporter_Fire(t *testing.T) { assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength) }) + + t.Run("jobResult jobOutputs extracted from log entry", func(t *testing.T) { + reporter, _, _ := mockReporter(t) + + dataStep0 := map[string]any{ + "stage": "Post", + "stepNumber": 0, + "raw_output": true, + "jobResult": "success", + "jobOutputs": map[string]string{"key1": "value1"}, + } + assert.NoError(t, reporter.Fire(&log.Entry{Message: "success!", Data: dataStep0})) + + assert.EqualValues(t, runnerv1.Result_RESULT_SUCCESS, reporter.state.Result) + value, _ := reporter.outputs.Load("key1") + assert.EqualValues(t, "value1", value) + }) } func TestReporterReportState(t *testing.T) { @@ -300,7 +317,7 @@ func TestReporterReportState(t *testing.T) { outputValue1 := "VALUE1" outputKey2 := "KEY2" outputValue2 := "VALUE2" - reporter.SetOutputs(map[string]string{ + reporter.setOutputs(map[string]string{ outputKey1: outputValue1, outputKey2: outputValue2, }) @@ -315,7 +332,7 @@ func TestReporterReportState(t *testing.T) { assert: func(t *testing.T, reporter *Reporter, ctx context.Context, err error) { t.Helper() require.ErrorContains(t, err, "not all logs are submitted 1 remain") - outputs := reporter.GetOutputs() + outputs := reporter.cloneOutputs() assert.Equal(t, map[string]string{ "KEY2": "VALUE2", }, outputs)