mirror of
https://code.forgejo.org/forgejo/runner.git
synced 2025-10-05 19:30:59 +00:00
merge job results and job outputs to same Reporter lock
This commit is contained in:
parent
17ebf904bc
commit
c3e8134de9
5 changed files with 50 additions and 17 deletions
|
@ -8,6 +8,7 @@ import (
|
||||||
"code.forgejo.org/forgejo/runner/v11/act/common"
|
"code.forgejo.org/forgejo/runner/v11/act/common"
|
||||||
"code.forgejo.org/forgejo/runner/v11/act/container"
|
"code.forgejo.org/forgejo/runner/v11/act/container"
|
||||||
"code.forgejo.org/forgejo/runner/v11/act/model"
|
"code.forgejo.org/forgejo/runner/v11/act/model"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type jobInfo interface {
|
type jobInfo interface {
|
||||||
|
@ -191,7 +192,12 @@ func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success boo
|
||||||
jobResultMessage = "failed"
|
jobResultMessage = "failed"
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.WithField("jobResult", jobResult).Infof("\U0001F3C1 Job %s", jobResultMessage)
|
logger.
|
||||||
|
WithFields(logrus.Fields{
|
||||||
|
"jobResult": jobResult,
|
||||||
|
"jobOutputs": rc.Run.Job().Outputs,
|
||||||
|
}).
|
||||||
|
Infof("\U0001F3C1 Job %s", jobResultMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
func setJobOutputs(ctx context.Context, rc *RunContext) {
|
func setJobOutputs(ctx context.Context, rc *RunContext) {
|
||||||
|
|
|
@ -260,12 +260,20 @@ func TestJobExecutorNewJobExecutor(t *testing.T) {
|
||||||
|
|
||||||
mockLogger := mocks.NewFieldLogger(t)
|
mockLogger := mocks.NewFieldLogger(t)
|
||||||
mockLogger.On("Debugf", mock.Anything, mock.Anything).Return(0).Maybe()
|
mockLogger.On("Debugf", mock.Anything, mock.Anything).Return(0).Maybe()
|
||||||
mockLogger.On("WithField", "jobResult", mock.Anything).
|
mockLogger.On("Warningf", mock.Anything, mock.Anything).Return(0).Maybe()
|
||||||
|
mockLogger.On("WithField", mock.Anything, mock.Anything).Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe()
|
||||||
|
// When `WithFields()` is called with jobResult & jobOutputs field, add `setJobResults` to executorOrder.
|
||||||
|
mockLogger.On("WithFields",
|
||||||
|
mock.MatchedBy(func(fields logrus.Fields) bool {
|
||||||
|
_, okJobResult := fields["jobResult"]
|
||||||
|
_, okJobOutput := fields["jobOutputs"]
|
||||||
|
return okJobOutput && okJobResult
|
||||||
|
})).
|
||||||
Run(func(args mock.Arguments) {
|
Run(func(args mock.Arguments) {
|
||||||
executorOrder = append(executorOrder, "setJobResults")
|
executorOrder = append(executorOrder, "setJobResults")
|
||||||
}).
|
}).
|
||||||
Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe()
|
Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe()
|
||||||
mockLogger.On("WithField", mock.Anything, mock.Anything).Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe()
|
|
||||||
mockLogger.On("WithFields", mock.Anything).Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe()
|
mockLogger.On("WithFields", mock.Anything).Return(&logrus.Entry{Logger: &logrus.Logger{}}).Maybe()
|
||||||
|
|
||||||
ctx := common.WithLogger(common.WithJobErrorContainer(t.Context()), mockLogger)
|
ctx := common.WithLogger(common.WithJobErrorContainer(t.Context()), mockLogger)
|
||||||
|
|
|
@ -366,7 +366,6 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
|
||||||
}
|
}
|
||||||
|
|
||||||
execErr := executor(ctx)
|
execErr := executor(ctx)
|
||||||
_ = reporter.SetOutputs(job.Outputs)
|
|
||||||
return execErr
|
return execErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -130,6 +130,11 @@ func (r *Reporter) Fire(entry *log.Entry) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if v, ok := entry.Data["jobOutputs"]; ok {
|
||||||
|
_ = r.setOutputs(v.(map[string]string))
|
||||||
|
} else {
|
||||||
|
log.Panicf("received log entry with jobResult, but without jobOutputs -- outputs will be corrupted for this job")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if !r.duringSteps() {
|
if !r.duringSteps() {
|
||||||
r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
|
r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
|
||||||
|
@ -209,7 +214,7 @@ func (r *Reporter) logf(format string, a ...any) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reporter) GetOutputs() map[string]string {
|
func (r *Reporter) cloneOutputs() map[string]string {
|
||||||
outputs := make(map[string]string)
|
outputs := make(map[string]string)
|
||||||
r.outputs.Range(func(k, v any) bool {
|
r.outputs.Range(func(k, v any) bool {
|
||||||
if val, ok := v.(string); ok {
|
if val, ok := v.(string); ok {
|
||||||
|
@ -220,10 +225,9 @@ func (r *Reporter) GetOutputs() map[string]string {
|
||||||
return outputs
|
return outputs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reporter) SetOutputs(outputs map[string]string) error {
|
// Errors from setOutputs are logged into the reporter automatically; the `errors` return value is only used for unit
|
||||||
r.stateMu.Lock()
|
// tests.
|
||||||
defer r.stateMu.Unlock()
|
func (r *Reporter) setOutputs(outputs map[string]string) error {
|
||||||
|
|
||||||
var errs []error
|
var errs []error
|
||||||
recordError := func(format string, a ...any) {
|
recordError := func(format string, a ...any) {
|
||||||
r.logf(format, a...)
|
r.logf(format, a...)
|
||||||
|
@ -369,10 +373,9 @@ func (r *Reporter) ReportState() error {
|
||||||
|
|
||||||
r.stateMu.RLock()
|
r.stateMu.RLock()
|
||||||
state := proto.Clone(r.state).(*runnerv1.TaskState)
|
state := proto.Clone(r.state).(*runnerv1.TaskState)
|
||||||
|
outputs := r.cloneOutputs()
|
||||||
r.stateMu.RUnlock()
|
r.stateMu.RUnlock()
|
||||||
|
|
||||||
outputs := r.GetOutputs()
|
|
||||||
|
|
||||||
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
|
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
|
||||||
State: state,
|
State: state,
|
||||||
Outputs: outputs,
|
Outputs: outputs,
|
||||||
|
|
|
@ -78,7 +78,7 @@ func TestReporterSetOutputs(t *testing.T) {
|
||||||
reporter, _, _ := mockReporter(t)
|
reporter, _, _ := mockReporter(t)
|
||||||
|
|
||||||
expected := map[string]string{"a": "b", "c": "d"}
|
expected := map[string]string{"a": "b", "c": "d"}
|
||||||
assert.NoError(t, reporter.SetOutputs(expected))
|
assert.NoError(t, reporter.setOutputs(expected))
|
||||||
assertEqual(t, expected, &reporter.outputs)
|
assertEqual(t, expected, &reporter.outputs)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -93,7 +93,7 @@ func TestReporterSetOutputs(t *testing.T) {
|
||||||
"c": "ABCDEFG", // value too big
|
"c": "ABCDEFG", // value too big
|
||||||
"d": "e",
|
"d": "e",
|
||||||
}
|
}
|
||||||
err := reporter.SetOutputs(in)
|
err := reporter.setOutputs(in)
|
||||||
assert.ErrorContains(t, err, "ignore output because the length of the value for \"c\" is 7 (the maximum is 5)")
|
assert.ErrorContains(t, err, "ignore output because the length of the value for \"c\" is 7 (the maximum is 5)")
|
||||||
assert.ErrorContains(t, err, "ignore output because the key is longer than 5: \"0123456\"")
|
assert.ErrorContains(t, err, "ignore output because the key is longer than 5: \"0123456\"")
|
||||||
expected := map[string]string{"d": "e"}
|
expected := map[string]string{"d": "e"}
|
||||||
|
@ -104,11 +104,11 @@ func TestReporterSetOutputs(t *testing.T) {
|
||||||
reporter, _, _ := mockReporter(t)
|
reporter, _, _ := mockReporter(t)
|
||||||
|
|
||||||
first := map[string]string{"a": "b", "c": "d"}
|
first := map[string]string{"a": "b", "c": "d"}
|
||||||
assert.NoError(t, reporter.SetOutputs(first))
|
assert.NoError(t, reporter.setOutputs(first))
|
||||||
assertEqual(t, first, &reporter.outputs)
|
assertEqual(t, first, &reporter.outputs)
|
||||||
|
|
||||||
second := map[string]string{"c": "d", "e": "f"}
|
second := map[string]string{"c": "d", "e": "f"}
|
||||||
assert.ErrorContains(t, reporter.SetOutputs(second), "ignore output because a value already exists for the key \"c\"")
|
assert.ErrorContains(t, reporter.setOutputs(second), "ignore output because a value already exists for the key \"c\"")
|
||||||
|
|
||||||
expected := map[string]string{"a": "b", "c": "d", "e": "f"}
|
expected := map[string]string{"a": "b", "c": "d", "e": "f"}
|
||||||
assertEqual(t, expected, &reporter.outputs)
|
assertEqual(t, expected, &reporter.outputs)
|
||||||
|
@ -284,6 +284,23 @@ func TestReporter_Fire(t *testing.T) {
|
||||||
|
|
||||||
assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength)
|
assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("jobResult jobOutputs extracted from log entry", func(t *testing.T) {
|
||||||
|
reporter, _, _ := mockReporter(t)
|
||||||
|
|
||||||
|
dataStep0 := map[string]any{
|
||||||
|
"stage": "Post",
|
||||||
|
"stepNumber": 0,
|
||||||
|
"raw_output": true,
|
||||||
|
"jobResult": "success",
|
||||||
|
"jobOutputs": map[string]string{"key1": "value1"},
|
||||||
|
}
|
||||||
|
assert.NoError(t, reporter.Fire(&log.Entry{Message: "success!", Data: dataStep0}))
|
||||||
|
|
||||||
|
assert.EqualValues(t, runnerv1.Result_RESULT_SUCCESS, reporter.state.Result)
|
||||||
|
value, _ := reporter.outputs.Load("key1")
|
||||||
|
assert.EqualValues(t, "value1", value)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReporterReportState(t *testing.T) {
|
func TestReporterReportState(t *testing.T) {
|
||||||
|
@ -300,7 +317,7 @@ func TestReporterReportState(t *testing.T) {
|
||||||
outputValue1 := "VALUE1"
|
outputValue1 := "VALUE1"
|
||||||
outputKey2 := "KEY2"
|
outputKey2 := "KEY2"
|
||||||
outputValue2 := "VALUE2"
|
outputValue2 := "VALUE2"
|
||||||
reporter.SetOutputs(map[string]string{
|
reporter.setOutputs(map[string]string{
|
||||||
outputKey1: outputValue1,
|
outputKey1: outputValue1,
|
||||||
outputKey2: outputValue2,
|
outputKey2: outputValue2,
|
||||||
})
|
})
|
||||||
|
@ -315,7 +332,7 @@ func TestReporterReportState(t *testing.T) {
|
||||||
assert: func(t *testing.T, reporter *Reporter, ctx context.Context, err error) {
|
assert: func(t *testing.T, reporter *Reporter, ctx context.Context, err error) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
require.ErrorContains(t, err, "not all logs are submitted 1 remain")
|
require.ErrorContains(t, err, "not all logs are submitted 1 remain")
|
||||||
outputs := reporter.GetOutputs()
|
outputs := reporter.cloneOutputs()
|
||||||
assert.Equal(t, map[string]string{
|
assert.Equal(t, map[string]string{
|
||||||
"KEY2": "VALUE2",
|
"KEY2": "VALUE2",
|
||||||
}, outputs)
|
}, outputs)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue