mirror of
https://code.forgejo.org/forgejo/runner.git
synced 2025-09-05 18:40:59 +00:00
fix: report the job as failed when the [runner].timeout
expires (#870)
Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/870 Reviewed-by: Mathieu Fenniak <mfenniak@noreply.code.forgejo.org>
This commit is contained in:
commit
1f7fa31b1a
8 changed files with 157 additions and 26 deletions
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"code.forgejo.org/forgejo/runner/v9/internal/pkg/common"
|
||||
"code.forgejo.org/forgejo/runner/v9/internal/pkg/config"
|
||||
"code.forgejo.org/forgejo/runner/v9/internal/pkg/ver"
|
||||
)
|
||||
|
@ -45,7 +46,7 @@ func Execute(ctx context.Context) {
|
|||
Use: "daemon",
|
||||
Short: "Run as a runner daemon",
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
RunE: runDaemon(ctx, &configFile),
|
||||
RunE: runDaemon(common.WithDaemonContext(ctx, ctx), &configFile),
|
||||
}
|
||||
rootCmd.AddCommand(daemonCmd)
|
||||
|
||||
|
|
|
@ -114,7 +114,7 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command,
|
|||
}
|
||||
}
|
||||
|
||||
poller := poll.New(cfg, cli, runner)
|
||||
poller := poll.New(ctx, cfg, cli, runner)
|
||||
|
||||
go poller.Poll()
|
||||
|
||||
|
|
|
@ -42,14 +42,14 @@ type poller struct {
|
|||
done chan any
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
|
||||
return (&poller{}).init(cfg, client, runner)
|
||||
func New(ctx context.Context, cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
|
||||
return (&poller{}).init(ctx, cfg, client, runner)
|
||||
}
|
||||
|
||||
func (p *poller) init(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
|
||||
pollingCtx, shutdownPolling := context.WithCancel(context.Background())
|
||||
func (p *poller) init(ctx context.Context, cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
|
||||
pollingCtx, shutdownPolling := context.WithCancel(ctx)
|
||||
|
||||
jobsCtx, shutdownJobs := context.WithCancel(context.Background())
|
||||
jobsCtx, shutdownJobs := context.WithCancel(ctx)
|
||||
|
||||
done := make(chan any)
|
||||
|
||||
|
|
|
@ -118,7 +118,7 @@ func setTrace(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestPoller_New(t *testing.T) {
|
||||
p := New(&config.Config{}, &mockClient{}, &mockRunner{})
|
||||
p := New(t.Context(), &config.Config{}, &mockClient{}, &mockRunner{})
|
||||
assert.NotNil(t, p)
|
||||
}
|
||||
|
||||
|
@ -172,6 +172,7 @@ func TestPoller_Runner(t *testing.T) {
|
|||
}
|
||||
p := &mockPoller{}
|
||||
p.init(
|
||||
t.Context(),
|
||||
&config.Config{
|
||||
Runner: configRunner,
|
||||
},
|
||||
|
@ -239,6 +240,7 @@ func TestPoller_Fetch(t *testing.T) {
|
|||
}
|
||||
p := &mockPoller{}
|
||||
p.init(
|
||||
t.Context(),
|
||||
&config.Config{
|
||||
Runner: configRunner,
|
||||
},
|
||||
|
|
|
@ -170,11 +170,7 @@ func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
|
|||
reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg.Runner.ReportInterval)
|
||||
var runErr error
|
||||
defer func() {
|
||||
lastWords := ""
|
||||
if runErr != nil {
|
||||
lastWords = runErr.Error()
|
||||
}
|
||||
_ = reporter.Close(lastWords)
|
||||
_ = reporter.Close(runErr)
|
||||
}()
|
||||
reporter.RunDaemon()
|
||||
runErr = r.run(ctx, task, reporter)
|
||||
|
|
26
internal/pkg/common/daemon_context.go
Normal file
26
internal/pkg/common/daemon_context.go
Normal file
|
@ -0,0 +1,26 @@
|
|||
// Copyright 2025 The Forgejo Authors
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// daemonContextKey is the unexported type of the key under which the daemon
// context is stored in a context.Context. Using a dedicated type prevents
// collisions with keys defined by other packages.
type daemonContextKey string

// daemonContextKeyVal is the key used by WithDaemonContext and DaemonContext
// to store and retrieve the daemon context.
const daemonContextKeyVal = daemonContextKey("daemon")

// DaemonContext returns the daemon context that was attached to ctx, or to
// one of its ancestors, with WithDaemonContext. When no daemon context was
// attached (or a nil one was stored), it returns context.Background().
//
// The stored context is returned as is; it is not derived from ctx.
func DaemonContext(ctx context.Context) context.Context {
	// A type assertion on a nil interface value reports ok == false, so a
	// missing value needs no separate nil check.
	if daemon, ok := ctx.Value(daemonContextKeyVal).(context.Context); ok {
		return daemon
	}
	return context.Background()
}

// WithDaemonContext returns a copy of ctx that carries daemon, which can
// later be retrieved from the copy or any context derived from it by calling
// DaemonContext.
func WithDaemonContext(ctx, daemon context.Context) context.Context {
	return context.WithValue(ctx, daemonContextKeyVal, daemon)
}
|
|
@ -20,6 +20,7 @@ import (
|
|||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"code.forgejo.org/forgejo/runner/v9/internal/pkg/client"
|
||||
"code.forgejo.org/forgejo/runner/v9/internal/pkg/common"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -61,7 +62,10 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, c client.Client
|
|||
}
|
||||
|
||||
rv := &Reporter{
|
||||
ctx: ctx,
|
||||
// ctx & cancel are related: the daemon context allows the reporter
|
||||
// to continue to operate even after the context is canceled, to
|
||||
// conclude the conversation
|
||||
ctx: common.DaemonContext(ctx),
|
||||
cancel: cancel,
|
||||
client: c,
|
||||
masker: masker,
|
||||
|
@ -232,13 +236,24 @@ func (r *Reporter) SetOutputs(outputs map[string]string) error {
|
|||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
func (r *Reporter) Close(lastWords string) error {
|
||||
// Messages that Reporter.Close appends as the last row of the job log.
const (
|
||||
// closeTimeoutMessage is used when the run error is context.DeadlineExceeded.
closeTimeoutMessage = "The runner cancelled the job because it exceeds the maximum run time"
|
||||
// closeCancelledMessage is used when the job has no result yet and there is no run error.
closeCancelledMessage = "Cancelled"
|
||||
)
|
||||
|
||||
func (r *Reporter) Close(runErr error) error {
|
||||
r.closed = true
|
||||
|
||||
r.stateMu.Lock()
|
||||
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
||||
if lastWords == "" {
|
||||
lastWords = "Early termination"
|
||||
var lastWords string
|
||||
if errors.Is(runErr, context.DeadlineExceeded) {
|
||||
lastWords = closeTimeoutMessage
|
||||
r.state.Result = runnerv1.Result_RESULT_CANCELLED
|
||||
} else if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
||||
if runErr == nil {
|
||||
lastWords = closeCancelledMessage
|
||||
} else {
|
||||
lastWords = runErr.Error()
|
||||
}
|
||||
for _, v := range r.state.Steps {
|
||||
if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
||||
|
@ -246,17 +261,18 @@ func (r *Reporter) Close(lastWords string) error {
|
|||
}
|
||||
}
|
||||
r.state.Result = runnerv1.Result_RESULT_FAILURE
|
||||
r.logRows = append(r.logRows, &runnerv1.LogRow{
|
||||
Time: timestamppb.Now(),
|
||||
Content: lastWords,
|
||||
})
|
||||
r.state.StoppedAt = timestamppb.Now()
|
||||
} else if lastWords != "" {
|
||||
} else if runErr != nil {
|
||||
lastWords = runErr.Error()
|
||||
r.state.Result = runnerv1.Result_RESULT_FAILURE
|
||||
}
|
||||
|
||||
if lastWords != "" {
|
||||
r.logRows = append(r.logRows, &runnerv1.LogRow{
|
||||
Time: timestamppb.Now(),
|
||||
Content: lastWords,
|
||||
})
|
||||
}
|
||||
r.state.StoppedAt = timestamppb.Now()
|
||||
r.stateMu.Unlock()
|
||||
|
||||
return retry.Do(func() error {
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"google.golang.org/protobuf/types/known/structpb"
|
||||
|
||||
"code.forgejo.org/forgejo/runner/v9/internal/pkg/client/mocks"
|
||||
"code.forgejo.org/forgejo/runner/v9/internal/pkg/common"
|
||||
"code.forgejo.org/forgejo/runner/v9/testutils"
|
||||
)
|
||||
|
||||
|
@ -51,11 +52,11 @@ func mockReporter(t *testing.T) (*Reporter, *mocks.Client, func()) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
taskCtx, err := structpb.NewStruct(map[string]any{})
|
||||
require.NoError(t, err)
|
||||
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
|
||||
reporter := NewReporter(common.WithDaemonContext(ctx, t.Context()), cancel, client, &runnerv1.Task{
|
||||
Context: taskCtx,
|
||||
}, time.Second)
|
||||
close := func() {
|
||||
assert.NoError(t, reporter.Close(""))
|
||||
assert.NoError(t, reporter.Close(nil))
|
||||
}
|
||||
return reporter, client, close
|
||||
}
|
||||
|
@ -407,3 +408,92 @@ func TestReporterReportLog(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReporterClose(t *testing.T) {
|
||||
mockReporterCloser := func(message *string, result *runnerv1.Result) *Reporter {
|
||||
reporter, client, _ := mockReporter(t)
|
||||
if message != nil {
|
||||
client.On("UpdateLog", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
|
||||
t.Logf("UpdateLogRequest: %s", req.Msg.String())
|
||||
assert.Equal(t, (*message)+"\n", rowsToString(req.Msg.Rows))
|
||||
resp := &runnerv1.UpdateLogResponse{
|
||||
AckIndex: req.Msg.Index + 1,
|
||||
}
|
||||
t.Logf("UpdateLogResponse: %s", resp.String())
|
||||
return connect_go.NewResponse(resp), nil
|
||||
})
|
||||
}
|
||||
|
||||
if result != nil {
|
||||
client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||
t.Logf("Received UpdateTask: %s", req.Msg.String())
|
||||
assert.Equal(t, result.String(), req.Msg.State.Result.String())
|
||||
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
|
||||
})
|
||||
}
|
||||
return reporter
|
||||
}
|
||||
|
||||
for _, testCase := range []struct {
|
||||
name string
|
||||
err error
|
||||
expectedMessage string
|
||||
result runnerv1.Result
|
||||
expectedResult runnerv1.Result
|
||||
}{
|
||||
{
|
||||
name: "ResultSuccessAndNilErrorIsResultSuccess",
|
||||
err: nil,
|
||||
expectedMessage: "",
|
||||
result: runnerv1.Result_RESULT_SUCCESS,
|
||||
expectedResult: runnerv1.Result_RESULT_SUCCESS,
|
||||
},
|
||||
{
|
||||
name: "ResultUnspecifiedAndErrorIsResultFailure",
|
||||
err: errors.New("ERROR_MESSAGE"),
|
||||
expectedMessage: "ERROR_MESSAGE",
|
||||
result: runnerv1.Result_RESULT_UNSPECIFIED,
|
||||
expectedResult: runnerv1.Result_RESULT_FAILURE,
|
||||
},
|
||||
{
|
||||
name: "ResultUnspecifiedAndNilErrorIsResultFailure",
|
||||
err: nil,
|
||||
expectedMessage: closeCancelledMessage,
|
||||
result: runnerv1.Result_RESULT_UNSPECIFIED,
|
||||
expectedResult: runnerv1.Result_RESULT_FAILURE,
|
||||
},
|
||||
{
|
||||
name: "ResultSuccessAndErrorIsResultFailure",
|
||||
err: errors.New("ERROR_MESSAGE"),
|
||||
expectedMessage: "ERROR_MESSAGE",
|
||||
result: runnerv1.Result_RESULT_SUCCESS,
|
||||
expectedResult: runnerv1.Result_RESULT_FAILURE,
|
||||
},
|
||||
{
|
||||
name: "Timeout",
|
||||
err: context.DeadlineExceeded,
|
||||
expectedMessage: closeTimeoutMessage,
|
||||
result: runnerv1.Result_RESULT_UNSPECIFIED,
|
||||
expectedResult: runnerv1.Result_RESULT_CANCELLED,
|
||||
},
|
||||
} {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
var message *string
|
||||
if testCase.expectedMessage != "" {
|
||||
message = &testCase.expectedMessage
|
||||
}
|
||||
reporter := mockReporterCloser(message, &testCase.expectedResult)
|
||||
|
||||
// cancel() verifies Close can operate after the context is cancelled
|
||||
// because it uses the daemon context instead
|
||||
reporter.cancel()
|
||||
reporter.state.Result = testCase.result
|
||||
reporter.state.Steps = []*runnerv1.StepState{
|
||||
{
|
||||
Result: runnerv1.Result_RESULT_UNSPECIFIED,
|
||||
},
|
||||
}
|
||||
require.NoError(t, reporter.Close(testCase.err))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue