mirror of
https://code.forgejo.org/forgejo/runner.git
synced 2025-09-15 18:57:01 +00:00
fix: if the Forgejo instance failed a job, cancel it (#986)
- when the Forgejo instance stops a job because it runs for too long, it will be reported back as failed and it must be stopped, exactly as if it was canceled from the web UI by the user. - add test coverage for Reporter.ReportState - extract Reporter.Outputs output of Reporter.ReportState Resolves https://code.forgejo.org/forgejo/runner/issues/980 --- Manual testing with a locally built Forgejo instance built to check every minute instead of every 30 minutes, with ```ini [actions] ENABLED = true ENDLESS_TASK_TIMEOUT = 1m ``` and the following workflow: ```yaml on: [push] jobs: test: runs-on: docker steps: - run: sleep 60 - run: sleep 61 - run: sleep 62 ``` waiting in front of the screen and watching the logs of the runner, I see it stops before `sleep 62` is complete. <!--start release-notes-assistant--> <!--URL:https://code.forgejo.org/forgejo/runner--> - bug fixes - [PR](https://code.forgejo.org/forgejo/runner/pulls/986): <!--number 986 --><!--line 0 --><!--description Zml4OiBpZiB0aGUgRm9yZ2VqbyBpbnN0YW5jZSBmYWlsZWQgYSBqb2IsIGNhbmNlbCBpdA==-->fix: if the Forgejo instance failed a job, cancel it<!--description--> <!--end release-notes-assistant--> Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/986 Reviewed-by: Mathieu Fenniak <mfenniak@noreply.code.forgejo.org> Co-authored-by: Earl Warren <contact@earl-warren.org> Co-committed-by: Earl Warren <contact@earl-warren.org>
This commit is contained in:
parent
6f212fbb5a
commit
f728f8a923
2 changed files with 120 additions and 8 deletions
|
@ -209,6 +209,17 @@ func (r *Reporter) logf(format string, a ...any) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *Reporter) GetOutputs() map[string]string {
|
||||
outputs := make(map[string]string)
|
||||
r.outputs.Range(func(k, v any) bool {
|
||||
if val, ok := v.(string); ok {
|
||||
outputs[k.(string)] = val
|
||||
}
|
||||
return true
|
||||
})
|
||||
return outputs
|
||||
}
|
||||
|
||||
func (r *Reporter) SetOutputs(outputs map[string]string) error {
|
||||
r.stateMu.Lock()
|
||||
defer r.stateMu.Unlock()
|
||||
|
@ -360,13 +371,7 @@ func (r *Reporter) ReportState() error {
|
|||
state := proto.Clone(r.state).(*runnerv1.TaskState)
|
||||
r.stateMu.RUnlock()
|
||||
|
||||
outputs := make(map[string]string)
|
||||
r.outputs.Range(func(k, v any) bool {
|
||||
if val, ok := v.(string); ok {
|
||||
outputs[k.(string)] = val
|
||||
}
|
||||
return true
|
||||
})
|
||||
outputs := r.GetOutputs()
|
||||
|
||||
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
|
||||
State: state,
|
||||
|
@ -380,7 +385,8 @@ func (r *Reporter) ReportState() error {
|
|||
r.outputs.Store(k, struct{}{})
|
||||
}
|
||||
|
||||
if resp.Msg.GetState().GetResult() == runnerv1.Result_RESULT_CANCELLED {
|
||||
switch resp.Msg.GetState().GetResult() {
|
||||
case runnerv1.Result_RESULT_CANCELLED, runnerv1.Result_RESULT_FAILURE:
|
||||
r.cancel()
|
||||
}
|
||||
|
||||
|
|
|
@ -286,6 +286,112 @@ func TestReporter_Fire(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestReporterReportState(t *testing.T) {
|
||||
for _, testCase := range []struct {
|
||||
name string
|
||||
fixture func(t *testing.T, reporter *Reporter, client *mocks.Client)
|
||||
assert func(t *testing.T, reporter *Reporter, ctx context.Context, err error)
|
||||
}{
|
||||
{
|
||||
name: "PartialOutputs",
|
||||
fixture: func(t *testing.T, reporter *Reporter, client *mocks.Client) {
|
||||
t.Helper()
|
||||
outputKey1 := "KEY1"
|
||||
outputValue1 := "VALUE1"
|
||||
outputKey2 := "KEY2"
|
||||
outputValue2 := "VALUE2"
|
||||
reporter.SetOutputs(map[string]string{
|
||||
outputKey1: outputValue1,
|
||||
outputKey2: outputValue2,
|
||||
})
|
||||
|
||||
client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||
t.Logf("Received UpdateTask: %s", req.Msg.String())
|
||||
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{
|
||||
SentOutputs: []string{outputKey1},
|
||||
}), nil
|
||||
})
|
||||
},
|
||||
assert: func(t *testing.T, reporter *Reporter, ctx context.Context, err error) {
|
||||
t.Helper()
|
||||
require.ErrorContains(t, err, "not all logs are submitted 1 remain")
|
||||
outputs := reporter.GetOutputs()
|
||||
assert.Equal(t, map[string]string{
|
||||
"KEY2": "VALUE2",
|
||||
}, outputs)
|
||||
assert.NoError(t, ctx.Err())
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "AllDone",
|
||||
fixture: func(t *testing.T, reporter *Reporter, client *mocks.Client) {
|
||||
t.Helper()
|
||||
client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||
t.Logf("Received UpdateTask: %s", req.Msg.String())
|
||||
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
|
||||
})
|
||||
},
|
||||
assert: func(t *testing.T, reporter *Reporter, ctx context.Context, err error) {
|
||||
t.Helper()
|
||||
require.NoError(t, err)
|
||||
assert.NoError(t, ctx.Err())
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Canceled",
|
||||
fixture: func(t *testing.T, reporter *Reporter, client *mocks.Client) {
|
||||
t.Helper()
|
||||
client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||
t.Logf("Received UpdateTask: %s", req.Msg.String())
|
||||
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{
|
||||
State: &runnerv1.TaskState{
|
||||
Result: runnerv1.Result_RESULT_CANCELLED,
|
||||
},
|
||||
}), nil
|
||||
})
|
||||
},
|
||||
assert: func(t *testing.T, reporter *Reporter, ctx context.Context, err error) {
|
||||
t.Helper()
|
||||
require.NoError(t, err)
|
||||
assert.ErrorIs(t, ctx.Err(), context.Canceled)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Failed",
|
||||
fixture: func(t *testing.T, reporter *Reporter, client *mocks.Client) {
|
||||
t.Helper()
|
||||
client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||
t.Logf("Received UpdateTask: %s", req.Msg.String())
|
||||
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{
|
||||
State: &runnerv1.TaskState{
|
||||
Result: runnerv1.Result_RESULT_FAILURE,
|
||||
},
|
||||
}), nil
|
||||
})
|
||||
},
|
||||
assert: func(t *testing.T, reporter *Reporter, ctx context.Context, err error) {
|
||||
t.Helper()
|
||||
require.NoError(t, err)
|
||||
assert.ErrorIs(t, ctx.Err(), context.Canceled)
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
client := mocks.NewClient(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
taskCtx, err := structpb.NewStruct(map[string]any{})
|
||||
require.NoError(t, err)
|
||||
reporter := NewReporter(common.WithDaemonContext(ctx, t.Context()), cancel, client, &runnerv1.Task{
|
||||
Context: taskCtx,
|
||||
}, time.Second)
|
||||
|
||||
testCase.fixture(t, reporter, client)
|
||||
err = reporter.ReportState()
|
||||
testCase.assert(t, reporter, ctx, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReporterReportLogLost(t *testing.T) {
|
||||
reporter, client, _ := mockReporter(t)
|
||||
reporter.logRows = stringToRows("A")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue