2023-04-04 21:32:04 +08:00
|
|
|
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
|
|
|
|
package poll
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
2023-07-24 04:28:44 +00:00
|
|
|
"fmt"
|
2023-04-04 21:32:04 +08:00
|
|
|
"sync"
|
2023-07-25 03:25:50 +00:00
|
|
|
"sync/atomic"
|
2023-04-04 21:32:04 +08:00
|
|
|
|
2025-07-03 16:55:53 +00:00
|
|
|
runnerv1 "code.forgejo.org/forgejo/actions-proto/runner/v1"
|
2024-07-31 00:02:21 -04:00
|
|
|
"connectrpc.com/connect"
|
2023-04-04 21:32:04 +08:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"golang.org/x/time/rate"
|
|
|
|
|
2025-09-05 07:29:38 +00:00
|
|
|
"code.forgejo.org/forgejo/runner/v11/internal/app/run"
|
|
|
|
"code.forgejo.org/forgejo/runner/v11/internal/pkg/client"
|
|
|
|
"code.forgejo.org/forgejo/runner/v11/internal/pkg/config"
|
2023-04-04 21:32:04 +08:00
|
|
|
)
|
|
|
|
|
2024-06-06 11:40:31 +02:00
|
|
|
// PollerID is the untyped string constant "PollerID".
// NOTE(review): it is not referenced in this file; presumably it is used as a
// key (e.g. for logging fields or context values) elsewhere — confirm.
const (
	PollerID = "PollerID"
)
|
|
|
|
|
2025-09-07 13:52:31 -06:00
|
|
|
// Poller fetches tasks and runs them until it is shut down.
//
//go:generate mockery --name Poller
type Poller interface {
	// Poll starts polling for tasks and blocks until polling has stopped.
	Poll()

	// Shutdown stops polling and waits for the tasks already running; once
	// ctx is done the running tasks are forced to stop.
	Shutdown(ctx context.Context) error
}
|
|
|
|
|
|
|
|
type poller struct {
|
2023-07-25 03:25:50 +00:00
|
|
|
client client.Client
|
2024-06-06 11:40:31 +02:00
|
|
|
runner run.RunnerInterface
|
2023-07-25 03:25:50 +00:00
|
|
|
cfg *config.Config
|
|
|
|
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
2024-06-06 11:40:31 +02:00
|
|
|
|
|
|
|
pollingCtx context.Context
|
|
|
|
shutdownPolling context.CancelFunc
|
|
|
|
|
|
|
|
jobsCtx context.Context
|
|
|
|
shutdownJobs context.CancelFunc
|
|
|
|
|
|
|
|
done chan any
|
2023-04-04 21:32:04 +08:00
|
|
|
}
|
|
|
|
|
2025-08-16 19:13:32 +02:00
|
|
|
func New(ctx context.Context, cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
|
|
|
|
return (&poller{}).init(ctx, cfg, client, runner)
|
2024-06-06 11:40:31 +02:00
|
|
|
}
|
|
|
|
|
2025-08-16 19:13:32 +02:00
|
|
|
func (p *poller) init(ctx context.Context, cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
|
|
|
|
pollingCtx, shutdownPolling := context.WithCancel(ctx)
|
2024-06-06 11:40:31 +02:00
|
|
|
|
2025-08-16 19:13:32 +02:00
|
|
|
jobsCtx, shutdownJobs := context.WithCancel(ctx)
|
2024-06-06 11:40:31 +02:00
|
|
|
|
|
|
|
done := make(chan any)
|
|
|
|
|
|
|
|
p.client = client
|
|
|
|
p.runner = runner
|
|
|
|
p.cfg = cfg
|
|
|
|
|
|
|
|
p.pollingCtx = pollingCtx
|
|
|
|
p.shutdownPolling = shutdownPolling
|
|
|
|
|
|
|
|
p.jobsCtx = jobsCtx
|
|
|
|
p.shutdownJobs = shutdownJobs
|
|
|
|
p.done = done
|
|
|
|
|
|
|
|
return p
|
2023-04-04 21:32:04 +08:00
|
|
|
}
|
|
|
|
|
2024-06-06 11:40:31 +02:00
|
|
|
func (p *poller) Poll() {
|
2023-04-06 10:57:36 +08:00
|
|
|
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
|
2023-04-04 21:32:04 +08:00
|
|
|
wg := &sync.WaitGroup{}
|
2023-04-06 10:57:36 +08:00
|
|
|
for i := 0; i < p.cfg.Runner.Capacity; i++ {
|
2023-04-04 21:32:04 +08:00
|
|
|
wg.Add(1)
|
2024-06-06 11:40:31 +02:00
|
|
|
go p.poll(i, wg, limiter)
|
2023-04-04 21:32:04 +08:00
|
|
|
}
|
|
|
|
wg.Wait()
|
2024-06-06 11:40:31 +02:00
|
|
|
|
|
|
|
// signal the poller is finished
|
|
|
|
close(p.done)
|
2023-04-04 21:32:04 +08:00
|
|
|
}
|
|
|
|
|
2024-06-06 11:40:31 +02:00
|
|
|
func (p *poller) Shutdown(ctx context.Context) error {
|
|
|
|
p.shutdownPolling()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-p.done:
|
|
|
|
log.Trace("all jobs are complete")
|
|
|
|
return nil
|
|
|
|
|
|
|
|
case <-ctx.Done():
|
fix: improve logging to diagnose mystery job terminations (#1048)
Additional logging to support #1044.
Manual testing only. Cases tested:
Cancel a job from Forgejo UI; this seems like the most likely missing piece in #1044 as two jobs were simultaneously marked as "Failed". There are codepaths in Forgejo that can set this state to both cancelled and failed, but the runner didn't provide log output indicating that's why a job was stopping:
```
time="2025-10-02T13:22:53-06:00" level=info msg="UpdateTask returned task result RESULT_CANCELLED for a task that was in local state RESULT_UNSPECIFIED - beginning local task termination" func="[ReportState]" file="[reporter.go:410]"
```
Host-based executor hits step timeout in exec, or, is cancelled. This occurred but only logged the `err` from `exec`, not the context error indicating whether it was a timeout or a cancellation:
```
[Test Action/job1] this step has been cancelled: ctx: context deadline exceeded, exec: RUN signal: killed
[Test Action/job1] this step has been cancelled: ctx: context canceled, exec: RUN signal: killed
```
Unable to `ReportState` due to Forgejo inaccessible. If the runner isn't able to update state to Forgejo a job could be considered a zombie; this would trigger one of the codepaths where the job would be marked as failed. If connectivity was later restored, then the runner could identify it was marked as failed and cancel the job context. (This combination doesn't seem likely, but, I think it's reasonable to consider these failures as warnings because there may be unexpected errors here that we're not aware of).
```
time="2025-10-02T13:27:19-06:00" level=warning msg="ReportState error: unavailable: 502 Bad Gateway" func="[RunDaemon]" file="[reporter.go:207]"
```
Runner shutdown logging; just changed up to `Info` level:
```
time="2025-10-02T13:31:36-06:00" level=info msg="forcing the jobs to shutdown" func="[Shutdown]" file="[poller.go:93]"
[Test Action/job1] ❌ Failure - Main sleep 120
[Test Action/job1] this step has been cancelled: ctx: context canceled, exec: RUN signal: killed
```
<!--start release-notes-assistant-->
<!--URL:https://code.forgejo.org/forgejo/runner-->
- bug fixes
- [PR](https://code.forgejo.org/forgejo/runner/pulls/1048): <!--number 1048 --><!--line 0 --><!--description Zml4OiBpbXByb3ZlIGxvZ2dpbmcgdG8gZGlhZ25vc2UgbXlzdGVyeSBqb2IgdGVybWluYXRpb25z-->fix: improve logging to diagnose mystery job terminations<!--description-->
<!--end release-notes-assistant-->
Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/1048
Reviewed-by: earl-warren <earl-warren@noreply.code.forgejo.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
2025-10-02 22:43:50 +00:00
|
|
|
log.Info("forcing the jobs to shutdown")
|
2024-06-06 11:40:31 +02:00
|
|
|
p.shutdownJobs()
|
|
|
|
<-p.done
|
fix: improve logging to diagnose mystery job terminations (#1048)
Additional logging to support #1044.
Manual testing only. Cases tested:
Cancel a job from Forgejo UI; this seems like the most likely missing piece in #1044 as two jobs were simultaneously marked as "Failed". There are codepaths in Forgejo that can set this state to both cancelled and failed, but the runner didn't provide log output indicating that's why a job was stopping:
```
time="2025-10-02T13:22:53-06:00" level=info msg="UpdateTask returned task result RESULT_CANCELLED for a task that was in local state RESULT_UNSPECIFIED - beginning local task termination" func="[ReportState]" file="[reporter.go:410]"
```
Host-based executor hits step timeout in exec, or, is cancelled. This occurred but only logged the `err` from `exec`, not the context error indicating whether it was a timeout or a cancellation:
```
[Test Action/job1] this step has been cancelled: ctx: context deadline exceeded, exec: RUN signal: killed
[Test Action/job1] this step has been cancelled: ctx: context canceled, exec: RUN signal: killed
```
Unable to `ReportState` due to Forgejo inaccessible. If the runner isn't able to update state to Forgejo a job could be considered a zombie; this would trigger one of the codepaths where the job would be marked as failed. If connectivity was later restored, then the runner could identify it was marked as failed and cancel the job context. (This combination doesn't seem likely, but, I think it's reasonable to consider these failures as warnings because there may be unexpected errors here that we're not aware of).
```
time="2025-10-02T13:27:19-06:00" level=warning msg="ReportState error: unavailable: 502 Bad Gateway" func="[RunDaemon]" file="[reporter.go:207]"
```
Runner shutdown logging; just changed up to `Info` level:
```
time="2025-10-02T13:31:36-06:00" level=info msg="forcing the jobs to shutdown" func="[Shutdown]" file="[poller.go:93]"
[Test Action/job1] ❌ Failure - Main sleep 120
[Test Action/job1] this step has been cancelled: ctx: context canceled, exec: RUN signal: killed
```
<!--start release-notes-assistant-->
<!--URL:https://code.forgejo.org/forgejo/runner-->
- bug fixes
- [PR](https://code.forgejo.org/forgejo/runner/pulls/1048): <!--number 1048 --><!--line 0 --><!--description Zml4OiBpbXByb3ZlIGxvZ2dpbmcgdG8gZGlhZ25vc2UgbXlzdGVyeSBqb2IgdGVybWluYXRpb25z-->fix: improve logging to diagnose mystery job terminations<!--description-->
<!--end release-notes-assistant-->
Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/1048
Reviewed-by: earl-warren <earl-warren@noreply.code.forgejo.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
2025-10-02 22:43:50 +00:00
|
|
|
log.Info("all jobs have been shutdown")
|
2024-06-06 11:40:31 +02:00
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *poller) poll(id int, wg *sync.WaitGroup, limiter *rate.Limiter) {
|
|
|
|
log.Infof("[poller %d] launched", id)
|
2023-04-04 21:32:04 +08:00
|
|
|
defer wg.Done()
|
|
|
|
for {
|
2024-06-06 11:40:31 +02:00
|
|
|
if err := limiter.Wait(p.pollingCtx); err != nil {
|
|
|
|
log.Infof("[poller %d] shutdown", id)
|
2023-07-24 07:07:53 +00:00
|
|
|
return
|
|
|
|
}
|
2024-06-06 11:40:31 +02:00
|
|
|
task, ok := p.fetchTask(p.pollingCtx)
|
2023-07-24 07:07:53 +00:00
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
2024-06-06 11:40:31 +02:00
|
|
|
p.runTaskWithRecover(p.jobsCtx, task)
|
2023-07-24 04:28:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-06-06 11:40:31 +02:00
|
|
|
func (p *poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
|
2023-07-24 04:28:44 +00:00
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
err := fmt.Errorf("panic: %v", r)
|
2023-07-24 07:07:53 +00:00
|
|
|
log.WithError(err).Error("panic in runTaskWithRecover")
|
2023-04-04 21:32:04 +08:00
|
|
|
}
|
2023-07-24 04:28:44 +00:00
|
|
|
}()
|
|
|
|
|
|
|
|
if err := p.runner.Run(ctx, task); err != nil {
|
|
|
|
log.WithError(err).Error("failed to run task")
|
2023-04-04 21:32:04 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-06-06 11:40:31 +02:00
|
|
|
func (p *poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
|
2023-04-06 10:57:36 +08:00
|
|
|
reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
|
2023-04-04 21:32:04 +08:00
|
|
|
defer cancel()
|
|
|
|
|
2023-07-25 03:25:50 +00:00
|
|
|
// Load the version value that was in the cache when the request was sent.
|
|
|
|
v := p.tasksVersion.Load()
|
|
|
|
resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{
|
|
|
|
TasksVersion: v,
|
|
|
|
}))
|
2023-04-04 21:32:04 +08:00
|
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
2024-06-06 11:40:31 +02:00
|
|
|
log.Trace("deadline exceeded")
|
2023-04-04 21:32:04 +08:00
|
|
|
err = nil
|
|
|
|
}
|
|
|
|
if err != nil {
|
2024-06-06 11:40:31 +02:00
|
|
|
if errors.Is(err, context.Canceled) {
|
|
|
|
log.WithError(err).Debugf("shutdown, fetch task canceled")
|
|
|
|
} else {
|
|
|
|
log.WithError(err).Error("failed to fetch task")
|
|
|
|
}
|
2023-04-04 21:32:04 +08:00
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
|
2023-07-25 03:25:50 +00:00
|
|
|
if resp == nil || resp.Msg == nil {
|
2023-04-04 21:32:04 +08:00
|
|
|
return nil, false
|
|
|
|
}
|
2023-07-25 03:25:50 +00:00
|
|
|
|
2025-07-11 07:10:41 +00:00
|
|
|
if resp.Msg.GetTasksVersion() > v {
|
|
|
|
p.tasksVersion.CompareAndSwap(v, resp.Msg.GetTasksVersion())
|
2023-07-25 03:25:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if resp.Msg.Task == nil {
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
|
|
|
|
// got a task, set `tasksVersion` to zero to focre query db in next request.
|
2025-07-11 07:10:41 +00:00
|
|
|
p.tasksVersion.CompareAndSwap(resp.Msg.GetTasksVersion(), 0)
|
2023-07-25 03:25:50 +00:00
|
|
|
|
2025-07-11 07:10:41 +00:00
|
|
|
return resp.Msg.GetTask(), true
|
2023-04-04 21:32:04 +08:00
|
|
|
}
|