From d114f3646ddf948935f33e50b27f62728f22b7f8 Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Sat, 16 Aug 2025 19:13:32 +0200 Subject: [PATCH 1/3] feat: insert the daemon context in the poller context The daemon context is needed when the context of a job or the poller is done. Otherwise it is no longer possible to send a conclusion report to Forgejo, short of creating a context.Background() which poses its own set of problems. - WithDaemonContext is used to store the daemon context - The poller uses the daemon context instead of context.Background --- internal/app/cmd/cmd.go | 3 ++- internal/app/cmd/daemon.go | 2 +- internal/app/poll/poller.go | 10 +++++----- internal/app/poll/poller_test.go | 4 +++- internal/pkg/common/daemon_context.go | 26 ++++++++++++++++++++++++++ 5 files changed, 37 insertions(+), 8 deletions(-) create mode 100644 internal/pkg/common/daemon_context.go diff --git a/internal/app/cmd/cmd.go b/internal/app/cmd/cmd.go index c3a98a88..2f9bd3e4 100644 --- a/internal/app/cmd/cmd.go +++ b/internal/app/cmd/cmd.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/cobra" + "code.forgejo.org/forgejo/runner/v9/internal/pkg/common" "code.forgejo.org/forgejo/runner/v9/internal/pkg/config" "code.forgejo.org/forgejo/runner/v9/internal/pkg/ver" ) @@ -45,7 +46,7 @@ func Execute(ctx context.Context) { Use: "daemon", Short: "Run as a runner daemon", Args: cobra.MaximumNArgs(1), - RunE: runDaemon(ctx, &configFile), + RunE: runDaemon(common.WithDaemonContext(ctx, ctx), &configFile), } rootCmd.AddCommand(daemonCmd) diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go index 0a9815ed..22fd8c26 100644 --- a/internal/app/cmd/daemon.go +++ b/internal/app/cmd/daemon.go @@ -114,7 +114,7 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, } } - poller := poll.New(cfg, cli, runner) + poller := poll.New(ctx, cfg, cli, runner) go poller.Poll() diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index 4ebe4405..5a4061f8 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -42,14 +42,14 @@ type poller struct { done chan any } -func New(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { - return (&poller{}).init(cfg, client, runner) +func New(ctx context.Context, cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { + return (&poller{}).init(ctx, cfg, client, runner) } -func (p *poller) init(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { - pollingCtx, shutdownPolling := context.WithCancel(context.Background()) +func (p *poller) init(ctx context.Context, cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller { + pollingCtx, shutdownPolling := context.WithCancel(ctx) - jobsCtx, shutdownJobs := context.WithCancel(context.Background()) + jobsCtx, shutdownJobs := context.WithCancel(ctx) done := make(chan any) diff --git a/internal/app/poll/poller_test.go b/internal/app/poll/poller_test.go index 0c5febfe..7c8b7633 100644 --- a/internal/app/poll/poller_test.go +++ b/internal/app/poll/poller_test.go @@ -118,7 +118,7 @@ func setTrace(t *testing.T) { } func TestPoller_New(t *testing.T) { - p := New(&config.Config{}, &mockClient{}, &mockRunner{}) + p := New(t.Context(), &config.Config{}, &mockClient{}, &mockRunner{}) assert.NotNil(t, p) } @@ -172,6 +172,7 @@ func TestPoller_Runner(t *testing.T) { } p := &mockPoller{} p.init( + t.Context(), &config.Config{ Runner: configRunner, }, @@ -239,6 +240,7 @@ func TestPoller_Fetch(t *testing.T) { } p := &mockPoller{} p.init( + t.Context(), &config.Config{ Runner: configRunner, }, diff --git a/internal/pkg/common/daemon_context.go b/internal/pkg/common/daemon_context.go new file mode 100644 index 00000000..8cb2f70a --- /dev/null +++ b/internal/pkg/common/daemon_context.go @@ -0,0 +1,26 @@ +// Copyright 2025 The Forgejo Authors +// SPDX-License-Identifier: MIT + +package common + +import ( + "context" +) + +type daemonContextKey string + +const daemonContextKeyVal = daemonContextKey("daemon") + +func DaemonContext(ctx context.Context) context.Context { + val := ctx.Value(daemonContextKeyVal) + if val != nil { + if daemon, ok := val.(context.Context); ok { + return daemon + } + } + return context.Background() +} + +func WithDaemonContext(ctx, daemon context.Context) context.Context { + return context.WithValue(ctx, daemonContextKeyVal, daemon) +} From bd2b7d2b32beba4e9cef6f16fa3fae323686fea4 Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Sat, 16 Aug 2025 19:25:50 +0200 Subject: [PATCH 2/3] fix: after canceling the context, the reporter cannot communicate ctx & cancel are related and when cancel is called, the context is no longer usable. Use the daemon context context to allow the reporter to continue to operate and conclude the converation with the Forgejo instance. --- internal/pkg/report/reporter.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index 2545faf3..ed7e81a4 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -20,6 +20,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "code.forgejo.org/forgejo/runner/v9/internal/pkg/client" + "code.forgejo.org/forgejo/runner/v9/internal/pkg/common" ) var ( @@ -61,7 +62,10 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, c client.Client } rv := &Reporter{ - ctx: ctx, + // ctx & cancel are related: the daemon context allows the reporter + // to continue to operate even after the context is canceled, to + // conclude the converation + ctx: common.DaemonContext(ctx), cancel: cancel, client: c, masker: masker, From 25879b92f4e4666c696e2da0a10c17fe3c349590 Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Sat, 16 Aug 2025 19:27:17 +0200 Subject: [PATCH 3/3] feat: log a descriptive message when a job exceeds the config timeout - change the argument from string to error to differentiate a timeout error - when there is a timeout, display a message more descriptive than "context deadline" - always set the StoppedAt state value instead of only if the result was unspecified: it is the last state update. --- internal/app/run/runner.go | 6 +- internal/pkg/report/reporter.go | 32 +++++++--- internal/pkg/report/reporter_test.go | 94 +++++++++++++++++++++++++++- 3 files changed, 115 insertions(+), 17 deletions(-) diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go index 25de8caa..b2fcf4fc 100644 --- a/internal/app/run/runner.go +++ b/internal/app/run/runner.go @@ -170,11 +170,7 @@ func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error { reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg.Runner.ReportInterval) var runErr error defer func() { - lastWords := "" - if runErr != nil { - lastWords = runErr.Error() - } - _ = reporter.Close(lastWords) + _ = reporter.Close(runErr) }() reporter.RunDaemon() runErr = r.run(ctx, task, reporter) diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index ed7e81a4..a02f0e72 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -236,13 +236,24 @@ func (r *Reporter) SetOutputs(outputs map[string]string) error { return errors.Join(errs...) } -func (r *Reporter) Close(lastWords string) error { +const ( + closeTimeoutMessage = "The runner cancelled the job because it exceeds the maximum run time" + closeCancelledMessage = "Cancelled" +) + +func (r *Reporter) Close(runErr error) error { r.closed = true r.stateMu.Lock() - if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { - if lastWords == "" { - lastWords = "Early termination" + var lastWords string + if errors.Is(runErr, context.DeadlineExceeded) { + lastWords = closeTimeoutMessage + r.state.Result = runnerv1.Result_RESULT_CANCELLED + } else if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { + if runErr == nil { + lastWords = closeCancelledMessage + } else { + lastWords = runErr.Error() } for _, v := range r.state.Steps { if v.Result == runnerv1.Result_RESULT_UNSPECIFIED { @@ -250,17 +261,18 @@ func (r *Reporter) Close(lastWords string) error { } } r.state.Result = runnerv1.Result_RESULT_FAILURE - r.logRows = append(r.logRows, &runnerv1.LogRow{ - Time: timestamppb.Now(), - Content: lastWords, - }) - r.state.StoppedAt = timestamppb.Now() - } else if lastWords != "" { + } else if runErr != nil { + lastWords = runErr.Error() + r.state.Result = runnerv1.Result_RESULT_FAILURE + } + + if lastWords != "" { r.logRows = append(r.logRows, &runnerv1.LogRow{ Time: timestamppb.Now(), Content: lastWords, }) } + r.state.StoppedAt = timestamppb.Now() r.stateMu.Unlock() return retry.Do(func() error { diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 23a95547..40af3e30 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -21,6 +21,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "code.forgejo.org/forgejo/runner/v9/internal/pkg/client/mocks" + "code.forgejo.org/forgejo/runner/v9/internal/pkg/common" "code.forgejo.org/forgejo/runner/v9/testutils" ) @@ -51,11 +52,11 @@ func mockReporter(t *testing.T) (*Reporter, *mocks.Client, func()) { ctx, cancel := context.WithCancel(context.Background()) taskCtx, err := structpb.NewStruct(map[string]any{}) require.NoError(t, err) - reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{ + reporter := NewReporter(common.WithDaemonContext(ctx, t.Context()), cancel, client, &runnerv1.Task{ Context: taskCtx, }, time.Second) close := func() { - assert.NoError(t, reporter.Close("")) + assert.NoError(t, reporter.Close(nil)) } return reporter, client, close } @@ -407,3 +408,92 @@ func TestReporterReportLog(t *testing.T) { }) } } + +func TestReporterClose(t *testing.T) { + mockReporterCloser := func(message *string, result *runnerv1.Result) *Reporter { + reporter, client, _ := mockReporter(t) + if message != nil { + client.On("UpdateLog", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + t.Logf("UpdateLogRequest: %s", req.Msg.String()) + assert.Equal(t, (*message)+"\n", rowsToString(req.Msg.Rows)) + resp := &runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + 1, + } + t.Logf("UpdateLogResponse: %s", resp.String()) + return connect_go.NewResponse(resp), nil + }) + } + + if result != nil { + client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + t.Logf("Received UpdateTask: %s", req.Msg.String()) + assert.Equal(t, result.String(), req.Msg.State.Result.String()) + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }) + } + return reporter + } + + for _, testCase := range []struct { + name string + err error + expectedMessage string + result runnerv1.Result + expectedResult runnerv1.Result + }{ + { + name: "ResultSuccessAndNilErrorIsResultSuccess", + err: nil, + expectedMessage: "", + result: runnerv1.Result_RESULT_SUCCESS, + expectedResult: runnerv1.Result_RESULT_SUCCESS, + }, + { + name: "ResultUnspecifiedAndErrorIsResultFailure", + err: errors.New("ERROR_MESSAGE"), + expectedMessage: "ERROR_MESSAGE", + result: runnerv1.Result_RESULT_UNSPECIFIED, + expectedResult: runnerv1.Result_RESULT_FAILURE, + }, + { + name: "ResultUnspecifiedAndNilErrorIsResultFailure", + err: nil, + expectedMessage: closeCancelledMessage, + result: runnerv1.Result_RESULT_UNSPECIFIED, + expectedResult: runnerv1.Result_RESULT_FAILURE, + }, + { + name: "ResultSuccessAndErrorIsResultFailure", + err: errors.New("ERROR_MESSAGE"), + expectedMessage: "ERROR_MESSAGE", + result: runnerv1.Result_RESULT_SUCCESS, + expectedResult: runnerv1.Result_RESULT_FAILURE, + }, + { + name: "Timeout", + err: context.DeadlineExceeded, + expectedMessage: closeTimeoutMessage, + result: runnerv1.Result_RESULT_UNSPECIFIED, + expectedResult: runnerv1.Result_RESULT_CANCELLED, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + var message *string + if testCase.expectedMessage != "" { + message = &testCase.expectedMessage + } + reporter := mockReporterCloser(message, &testCase.expectedResult) + + // cancel() verifies Close can operate after the context is cancelled + // because it uses the daemon context instead + reporter.cancel() + reporter.state.Result = testCase.result + reporter.state.Steps = []*runnerv1.StepState{ + { + Result: runnerv1.Result_RESULT_UNSPECIFIED, + }, + } + require.NoError(t, reporter.Close(testCase.err)) + }) + } +}