From 25879b92f4e4666c696e2da0a10c17fe3c349590 Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Sat, 16 Aug 2025 19:27:17 +0200 Subject: [PATCH] feat: log a descriptive message when a job exceeds the config timeout - change the argument from string to error to differentiate a timeout error - when there is a timeout, display a message more descriptive than "context deadline" - always set the StoppedAt state value instead of only if the result was unspecified: it is the last state update. --- internal/app/run/runner.go | 6 +- internal/pkg/report/reporter.go | 32 +++++++--- internal/pkg/report/reporter_test.go | 94 +++++++++++++++++++++++++++- 3 files changed, 115 insertions(+), 17 deletions(-) diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go index 25de8caa..b2fcf4fc 100644 --- a/internal/app/run/runner.go +++ b/internal/app/run/runner.go @@ -170,11 +170,7 @@ func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error { reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg.Runner.ReportInterval) var runErr error defer func() { - lastWords := "" - if runErr != nil { - lastWords = runErr.Error() - } - _ = reporter.Close(lastWords) + _ = reporter.Close(runErr) }() reporter.RunDaemon() runErr = r.run(ctx, task, reporter) diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index ed7e81a4..a02f0e72 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -236,13 +236,24 @@ func (r *Reporter) SetOutputs(outputs map[string]string) error { return errors.Join(errs...) } -func (r *Reporter) Close(lastWords string) error { +const ( + closeTimeoutMessage = "The runner cancelled the job because it exceeds the maximum run time" + closeCancelledMessage = "Cancelled" +) + +func (r *Reporter) Close(runErr error) error { r.closed = true r.stateMu.Lock() - if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { - if lastWords == "" { - lastWords = "Early termination" + var lastWords string + if errors.Is(runErr, context.DeadlineExceeded) { + lastWords = closeTimeoutMessage + r.state.Result = runnerv1.Result_RESULT_CANCELLED + } else if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { + if runErr == nil { + lastWords = closeCancelledMessage + } else { + lastWords = runErr.Error() } for _, v := range r.state.Steps { if v.Result == runnerv1.Result_RESULT_UNSPECIFIED { @@ -250,17 +261,18 @@ func (r *Reporter) Close(lastWords string) error { } } r.state.Result = runnerv1.Result_RESULT_FAILURE - r.logRows = append(r.logRows, &runnerv1.LogRow{ - Time: timestamppb.Now(), - Content: lastWords, - }) - r.state.StoppedAt = timestamppb.Now() - } else if lastWords != "" { + } else if runErr != nil { + lastWords = runErr.Error() + r.state.Result = runnerv1.Result_RESULT_FAILURE + } + + if lastWords != "" { r.logRows = append(r.logRows, &runnerv1.LogRow{ Time: timestamppb.Now(), Content: lastWords, }) } + r.state.StoppedAt = timestamppb.Now() r.stateMu.Unlock() return retry.Do(func() error { diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 23a95547..40af3e30 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -21,6 +21,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "code.forgejo.org/forgejo/runner/v9/internal/pkg/client/mocks" + "code.forgejo.org/forgejo/runner/v9/internal/pkg/common" "code.forgejo.org/forgejo/runner/v9/testutils" ) @@ -51,11 +52,11 @@ func mockReporter(t *testing.T) (*Reporter, *mocks.Client, func()) { ctx, cancel := context.WithCancel(context.Background()) taskCtx, err := structpb.NewStruct(map[string]any{}) require.NoError(t, err) - reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{ + reporter := NewReporter(common.WithDaemonContext(ctx, t.Context()), cancel, client, &runnerv1.Task{ Context: taskCtx, }, time.Second) close := func() { - assert.NoError(t, reporter.Close("")) + assert.NoError(t, reporter.Close(nil)) } return reporter, client, close } @@ -407,3 +408,92 @@ func TestReporterReportLog(t *testing.T) { }) } } + +func TestReporterClose(t *testing.T) { + mockReporterCloser := func(message *string, result *runnerv1.Result) *Reporter { + reporter, client, _ := mockReporter(t) + if message != nil { + client.On("UpdateLog", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + t.Logf("UpdateLogRequest: %s", req.Msg.String()) + assert.Equal(t, (*message)+"\n", rowsToString(req.Msg.Rows)) + resp := &runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + 1, + } + t.Logf("UpdateLogResponse: %s", resp.String()) + return connect_go.NewResponse(resp), nil + }) + } + + if result != nil { + client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + t.Logf("Received UpdateTask: %s", req.Msg.String()) + assert.Equal(t, result.String(), req.Msg.State.Result.String()) + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }) + } + return reporter + } + + for _, testCase := range []struct { + name string + err error + expectedMessage string + result runnerv1.Result + expectedResult runnerv1.Result + }{ + { + name: "ResultSuccessAndNilErrorIsResultSuccess", + err: nil, + expectedMessage: "", + result: runnerv1.Result_RESULT_SUCCESS, + expectedResult: runnerv1.Result_RESULT_SUCCESS, + }, + { + name: "ResultUnspecifiedAndErrorIsResultFailure", + err: errors.New("ERROR_MESSAGE"), + expectedMessage: "ERROR_MESSAGE", + result: runnerv1.Result_RESULT_UNSPECIFIED, + expectedResult: runnerv1.Result_RESULT_FAILURE, + }, + { + name: "ResultUnspecifiedAndNilErrorIsResultFailure", + err: nil, + expectedMessage: closeCancelledMessage, + result: runnerv1.Result_RESULT_UNSPECIFIED, + expectedResult: runnerv1.Result_RESULT_FAILURE, + }, + { + name: "ResultSuccessAndErrorIsResultFailure", + err: errors.New("ERROR_MESSAGE"), + expectedMessage: "ERROR_MESSAGE", + result: runnerv1.Result_RESULT_SUCCESS, + expectedResult: runnerv1.Result_RESULT_FAILURE, + }, + { + name: "Timeout", + err: context.DeadlineExceeded, + expectedMessage: closeTimeoutMessage, + result: runnerv1.Result_RESULT_UNSPECIFIED, + expectedResult: runnerv1.Result_RESULT_CANCELLED, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + var message *string + if testCase.expectedMessage != "" { + message = &testCase.expectedMessage + } + reporter := mockReporterCloser(message, &testCase.expectedResult) + + // cancel() verifies Close can operate after the context is cancelled + // because it uses the daemon context instead + reporter.cancel() + reporter.state.Result = testCase.result + reporter.state.Steps = []*runnerv1.StepState{ + { + Result: runnerv1.Result_RESULT_UNSPECIFIED, + }, + } + require.NoError(t, reporter.Close(testCase.err)) + }) + } +}