diff --git a/internal/app/cmd/cmd.go b/internal/app/cmd/cmd.go index c3a98a88..2f9bd3e4 100644 --- a/internal/app/cmd/cmd.go +++ b/internal/app/cmd/cmd.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/cobra" + "code.forgejo.org/forgejo/runner/v9/internal/pkg/common" "code.forgejo.org/forgejo/runner/v9/internal/pkg/config" "code.forgejo.org/forgejo/runner/v9/internal/pkg/ver" ) @@ -45,7 +46,7 @@ func Execute(ctx context.Context) { Use: "daemon", Short: "Run as a runner daemon", Args: cobra.MaximumNArgs(1), - RunE: runDaemon(ctx, &configFile), + RunE: runDaemon(common.WithDaemonContext(ctx, ctx), &configFile), } rootCmd.AddCommand(daemonCmd) diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go index 0a9815ed..22fd8c26 100644 --- a/internal/app/cmd/daemon.go +++ b/internal/app/cmd/daemon.go @@ -114,7 +114,7 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, } } - poller := poll.New(cfg, cli, runner) + poller := poll.New(ctx, cfg, cli, runner) go poller.Poll() diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index 4ebe4405..5a4061f8 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -42,14 +42,14 @@ type poller struct { done chan any } -func New(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { - return (&poller{}).init(cfg, client, runner) +func New(ctx context.Context, cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { + return (&poller{}).init(ctx, cfg, client, runner) } -func (p *poller) init(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { - pollingCtx, shutdownPolling := context.WithCancel(context.Background()) +func (p *poller) init(ctx context.Context, cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { + pollingCtx, shutdownPolling := context.WithCancel(ctx) - jobsCtx, shutdownJobs := context.WithCancel(context.Background()) + jobsCtx, shutdownJobs := context.WithCancel(ctx) done := make(chan any) diff --git a/internal/app/poll/poller_test.go b/internal/app/poll/poller_test.go index 0c5febfe..7c8b7633 100644 --- a/internal/app/poll/poller_test.go +++ b/internal/app/poll/poller_test.go @@ -118,7 +118,7 @@ func setTrace(t *testing.T) { } func TestPoller_New(t *testing.T) { - p := New(&config.Config{}, &mockClient{}, &mockRunner{}) + p := New(t.Context(), &config.Config{}, &mockClient{}, &mockRunner{}) assert.NotNil(t, p) } @@ -172,6 +172,7 @@ func TestPoller_Runner(t *testing.T) { } p := &mockPoller{} p.init( + t.Context(), &config.Config{ Runner: configRunner, }, @@ -239,6 +240,7 @@ func TestPoller_Fetch(t *testing.T) { } p := &mockPoller{} p.init( + t.Context(), &config.Config{ Runner: configRunner, }, diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go index 25de8caa..b2fcf4fc 100644 --- a/internal/app/run/runner.go +++ b/internal/app/run/runner.go @@ -170,11 +170,7 @@ func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error { reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg.Runner.ReportInterval) var runErr error defer func() { - lastWords := "" - if runErr != nil { - lastWords = runErr.Error() - } - _ = reporter.Close(lastWords) + _ = reporter.Close(runErr) }() reporter.RunDaemon() runErr = r.run(ctx, task, reporter) diff --git a/internal/pkg/common/daemon_context.go b/internal/pkg/common/daemon_context.go new file mode 100644 index 00000000..8cb2f70a --- /dev/null +++ b/internal/pkg/common/daemon_context.go @@ -0,0 +1,26 @@ +// Copyright 2025 The Forgejo Authors +// SPDX-License-Identifier: MIT + +package common + +import ( + "context" +) + +type daemonContextKey string + +const daemonContextKeyVal = daemonContextKey("daemon") + +func DaemonContext(ctx context.Context) context.Context { + val := ctx.Value(daemonContextKeyVal) + if val != nil { + if daemon, ok := val.(context.Context); ok { + return daemon + } + } + return context.Background() +} + +func WithDaemonContext(ctx, daemon context.Context) context.Context { + return context.WithValue(ctx, daemonContextKeyVal, daemon) +} diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index 2545faf3..a02f0e72 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -20,6 +20,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "code.forgejo.org/forgejo/runner/v9/internal/pkg/client" + "code.forgejo.org/forgejo/runner/v9/internal/pkg/common" ) var ( @@ -61,7 +62,10 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, c client.Client } rv := &Reporter{ - ctx: ctx, + // ctx & cancel are related: the daemon context allows the reporter + // to continue to operate even after the context is canceled, to + // conclude the converation + ctx: common.DaemonContext(ctx), cancel: cancel, client: c, masker: masker, @@ -232,13 +236,24 @@ func (r *Reporter) SetOutputs(outputs map[string]string) error { return errors.Join(errs...) } -func (r *Reporter) Close(lastWords string) error { +const ( + closeTimeoutMessage = "The runner cancelled the job because it exceeds the maximum run time" + closeCancelledMessage = "Cancelled" +) + +func (r *Reporter) Close(runErr error) error { r.closed = true r.stateMu.Lock() - if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { - if lastWords == "" { - lastWords = "Early termination" + var lastWords string + if errors.Is(runErr, context.DeadlineExceeded) { + lastWords = closeTimeoutMessage + r.state.Result = runnerv1.Result_RESULT_CANCELLED + } else if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { + if runErr == nil { + lastWords = closeCancelledMessage + } else { + lastWords = runErr.Error() } for _, v := range r.state.Steps { if v.Result == runnerv1.Result_RESULT_UNSPECIFIED { @@ -246,17 +261,18 @@ func (r *Reporter) Close(lastWords string) error { } } r.state.Result = runnerv1.Result_RESULT_FAILURE - r.logRows = append(r.logRows, &runnerv1.LogRow{ - Time: timestamppb.Now(), - Content: lastWords, - }) - r.state.StoppedAt = timestamppb.Now() - } else if lastWords != "" { + } else if runErr != nil { + lastWords = runErr.Error() + r.state.Result = runnerv1.Result_RESULT_FAILURE + } + + if lastWords != "" { r.logRows = append(r.logRows, &runnerv1.LogRow{ Time: timestamppb.Now(), Content: lastWords, }) } + r.state.StoppedAt = timestamppb.Now() r.stateMu.Unlock() return retry.Do(func() error { diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 23a95547..40af3e30 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -21,6 +21,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "code.forgejo.org/forgejo/runner/v9/internal/pkg/client/mocks" + "code.forgejo.org/forgejo/runner/v9/internal/pkg/common" "code.forgejo.org/forgejo/runner/v9/testutils" ) @@ -51,11 +52,11 @@ func mockReporter(t *testing.T) (*Reporter, *mocks.Client, func()) { ctx, cancel := context.WithCancel(context.Background()) taskCtx, err := structpb.NewStruct(map[string]any{}) require.NoError(t, err) - reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{ + reporter := NewReporter(common.WithDaemonContext(ctx, t.Context()), cancel, client, &runnerv1.Task{ Context: taskCtx, }, time.Second) close := func() { - assert.NoError(t, reporter.Close("")) + assert.NoError(t, reporter.Close(nil)) } return reporter, client, close } @@ -407,3 +408,92 @@ func TestReporterReportLog(t *testing.T) { }) } } + +func TestReporterClose(t *testing.T) { + mockReporterCloser := func(message *string, result *runnerv1.Result) *Reporter { + reporter, client, _ := mockReporter(t) + if message != nil { + client.On("UpdateLog", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + t.Logf("UpdateLogRequest: %s", req.Msg.String()) + assert.Equal(t, (*message)+"\n", rowsToString(req.Msg.Rows)) + resp := &runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + 1, + } + t.Logf("UpdateLogResponse: %s", resp.String()) + return connect_go.NewResponse(resp), nil + }) + } + + if result != nil { + client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + t.Logf("Received UpdateTask: %s", req.Msg.String()) + assert.Equal(t, result.String(), req.Msg.State.Result.String()) + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }) + } + return reporter + } + + for _, testCase := range []struct { + name string + err error + expectedMessage string + result runnerv1.Result + expectedResult runnerv1.Result + }{ + { + name: "ResultSuccessAndNilErrorIsResultSuccess", + err: nil, + expectedMessage: "", + result: runnerv1.Result_RESULT_SUCCESS, + expectedResult: runnerv1.Result_RESULT_SUCCESS, + }, + { + name: "ResultUnspecifiedAndErrorIsResultFailure", + err: errors.New("ERROR_MESSAGE"), + expectedMessage: "ERROR_MESSAGE", + result: runnerv1.Result_RESULT_UNSPECIFIED, + expectedResult: runnerv1.Result_RESULT_FAILURE, + }, + { + name: "ResultUnspecifiedAndNilErrorIsResultFailure", + err: nil, + expectedMessage: closeCancelledMessage, + result: runnerv1.Result_RESULT_UNSPECIFIED, + expectedResult: runnerv1.Result_RESULT_FAILURE, + }, + { + name: "ResultSuccessAndErrorIsResultFailure", + err: errors.New("ERROR_MESSAGE"), + expectedMessage: "ERROR_MESSAGE", + result: runnerv1.Result_RESULT_SUCCESS, + expectedResult: runnerv1.Result_RESULT_FAILURE, + }, + { + name: "Timeout", + err: context.DeadlineExceeded, + expectedMessage: closeTimeoutMessage, + result: runnerv1.Result_RESULT_UNSPECIFIED, + expectedResult: runnerv1.Result_RESULT_CANCELLED, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + var message *string + if testCase.expectedMessage != "" { + message = &testCase.expectedMessage + } + reporter := mockReporterCloser(message, &testCase.expectedResult) + + // cancel() verifies Close can operate after the context is cancelled + // because it uses the daemon context instead + reporter.cancel() + reporter.state.Result = testCase.result + reporter.state.Steps = []*runnerv1.StepState{ + { + Result: runnerv1.Result_RESULT_UNSPECIFIED, + }, + } + require.NoError(t, reporter.Close(testCase.err)) + }) + } +}