Additional logging to support #1044. Manual testing only.

Cases tested:

**Cancel a job from the Forgejo UI.** This seems like the most likely missing piece in #1044, as two jobs were simultaneously marked as "Failed". There are codepaths in Forgejo that can set this state to both cancelled and failed, but the runner didn't provide log output indicating that's why a job was stopping:

```
time="2025-10-02T13:22:53-06:00" level=info msg="UpdateTask returned task result RESULT_CANCELLED for a task that was in local state RESULT_UNSPECIFIED - beginning local task termination" func="[ReportState]" file="[reporter.go:410]"
```

**The host-based executor hits a step timeout in exec, or is cancelled.** This occurred but only logged the `err` from `exec`, not the context error indicating whether it was a timeout or a cancellation:

```
[Test Action/job1] this step has been cancelled: ctx: context deadline exceeded, exec: RUN signal: killed
[Test Action/job1] this step has been cancelled: ctx: context canceled, exec: RUN signal: killed
```

**Unable to `ReportState` because Forgejo is inaccessible.** If the runner isn't able to update state in Forgejo, a job could be considered a zombie; this would trigger one of the codepaths where the job would be marked as failed. If connectivity were later restored, the runner could identify that the job was marked as failed and cancel the job context. (This combination doesn't seem likely, but it's reasonable to log these failures as warnings because there may be unexpected errors here that we're not aware of.)

```
time="2025-10-02T13:27:19-06:00" level=warning msg="ReportState error: unavailable: 502 Bad Gateway" func="[RunDaemon]" file="[reporter.go:207]"
```

**Runner shutdown logging**; just raised to `Info` level:

```
time="2025-10-02T13:31:36-06:00" level=info msg="forcing the jobs to shutdown" func="[Shutdown]" file="[poller.go:93]"
[Test Action/job1] ❌ Failure - Main sleep 120
[Test Action/job1] this step has been cancelled: ctx: context canceled, exec: RUN signal: killed
```

Release notes:

- bug fixes
  - [PR 1048](https://code.forgejo.org/forgejo/runner/pulls/1048): fix: improve logging to diagnose mystery job terminations

Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/1048
Reviewed-by: earl-warren <earl-warren@noreply.code.forgejo.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
521 lines
13 KiB
Go
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package report

import (
	"context"
	"errors"
	"fmt"
	"regexp"
	"strings"
	"sync"
	"time"

	runnerv1 "code.forgejo.org/forgejo/actions-proto/runner/v1"
	"code.forgejo.org/forgejo/runner/v11/act/runner"
	"connectrpc.com/connect"
	retry "github.com/avast/retry-go/v4"
	log "github.com/sirupsen/logrus"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/timestamppb"

	"code.forgejo.org/forgejo/runner/v11/internal/pkg/client"
	"code.forgejo.org/forgejo/runner/v11/internal/pkg/common"
)

var (
	outputKeyMaxLength   = 255
	outputValueMaxLength = 1024 * 1024
)

type Reporter struct {
	ctx    context.Context
	cancel context.CancelFunc

	closed  bool
	client  client.Client
	clientM sync.Mutex

	logOffset      int
	logRows        []*runnerv1.LogRow
	masker         *masker
	reportInterval time.Duration

	state   *runnerv1.TaskState
	stateMu sync.RWMutex
	outputs sync.Map

	debugOutputEnabled  bool
	stopCommandEndToken string
	issuedLocalCancel   bool
}

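// NewReporter builds a Reporter for a single task. It seeds the masker with
// the task token, runtime token, and all secrets so they never appear in
// uploaded logs, and enables debug output when the ACTIONS_STEP_DEBUG secret
// is set to "true".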
func NewReporter(ctx context.Context, cancel context.CancelFunc, c client.Client, task *runnerv1.Task, reportInterval time.Duration) *Reporter {
	masker := newMasker()
	if v := task.Context.Fields["token"].GetStringValue(); v != "" {
		masker.add(v)
	}
	if v := client.BackwardCompatibleContext(task, "runtime_token"); v != "" {
		masker.add(v)
	}
	for _, v := range task.Secrets {
		masker.add(v)
	}

	rv := &Reporter{
		// ctx & cancel are related: the daemon context allows the reporter
		// to continue to operate even after the context is canceled, to
		// conclude the conversation
		ctx:            common.DaemonContext(ctx),
		cancel:         cancel,
		client:         c,
		masker:         masker,
		reportInterval: reportInterval,
		state: &runnerv1.TaskState{
			Id: task.Id,
		},
	}

	if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {
		rv.debugOutputEnabled = true
	}

	return rv
}

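// ResetSteps initializes the task state with l empty step states so that step
// results and log ranges can later be recorded by step index.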
func (r *Reporter) ResetSteps(l int) {
	r.stateMu.Lock()
	defer r.stateMu.Unlock()
	for i := range l {
		r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
			Id: int64(i),
		})
	}
}

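// Levels implements the logrus.Hook interface; the reporter subscribes to
// every log level.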
func (r *Reporter) Levels() []log.Level {
	return log.AllLevels
}

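// appendIfNotNil appends v to s only when v is non-nil, keeping nil log rows
// out of the buffer.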
func appendIfNotNil[T any](s []*T, v *T) []*T {
	if v != nil {
		return append(s, v)
	}
	return s
}

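// Fire implements the logrus.Hook interface. It routes each log entry into
// the task state: entries outside the "Main" stage update the overall job
// result and outputs, raw step output is appended to that step's log range,
// and step results are recorded as they complete.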
func (r *Reporter) Fire(entry *log.Entry) error {
	r.stateMu.Lock()
	defer r.stateMu.Unlock()

	log.WithFields(entry.Data).Trace(entry.Message)

	timestamp := entry.Time
	if r.state.StartedAt == nil {
		r.state.StartedAt = timestamppb.New(timestamp)
	}

	stage := entry.Data["stage"]

	if stage != "Main" {
		if v, ok := entry.Data["jobResult"]; ok {
			if jobResult, ok := r.parseResult(v); ok {
				r.state.Result = jobResult
				r.state.StoppedAt = timestamppb.New(timestamp)
				for _, s := range r.state.Steps {
					if s.Result == runnerv1.Result_RESULT_UNSPECIFIED {
						s.Result = runnerv1.Result_RESULT_CANCELLED
						if jobResult == runnerv1.Result_RESULT_SKIPPED {
							s.Result = runnerv1.Result_RESULT_SKIPPED
						}
					}
				}
			}
			if r.state.Result == runnerv1.Result_RESULT_SUCCESS {
				if v, ok := entry.Data["jobOutputs"]; ok {
					_ = r.setOutputs(v.(map[string]string))
				} else {
					log.Panicf("received log entry with successful jobResult, but without jobOutputs -- outputs will be corrupted for this job")
				}
			}
		}
		if !r.duringSteps() {
			r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
		}
		return nil
	}

	var step *runnerv1.StepState
	if v, ok := entry.Data["stepNumber"]; ok {
		if v, ok := v.(int); ok && len(r.state.Steps) > v {
			step = r.state.Steps[v]
		}
	}
	if step == nil {
		if !r.duringSteps() {
			r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
		}
		return nil
	}

	if step.StartedAt == nil {
		step.StartedAt = timestamppb.New(timestamp)
	}
	if v, ok := entry.Data["raw_output"]; ok {
		if rawOutput, ok := v.(bool); ok && rawOutput {
			if row := r.parseLogRow(entry); row != nil {
				if step.LogLength == 0 {
					step.LogIndex = int64(r.logOffset + len(r.logRows))
				}
				step.LogLength++
				r.logRows = append(r.logRows, row)
			}
		}
	} else if !r.duringSteps() {
		r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
	}
	if v := runner.GetOuterStepResult(entry); v != nil {
		if stepResult, ok := r.parseResult(v); ok {
			if step.LogLength == 0 {
				step.LogIndex = int64(r.logOffset + len(r.logRows))
			}
			step.Result = stepResult
			step.StoppedAt = timestamppb.New(timestamp)
		}
	}

	return nil
}

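// RunDaemon periodically flushes buffered logs and the task state to the
// Forgejo instance, rescheduling itself every reportInterval until the
// reporter is closed.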
func (r *Reporter) RunDaemon() {
	if r.closed {
		return
	}
	if r.ctx.Err() != nil {
		// This shouldn't happen because DaemonContext is used for `r.ctx` which should outlive any running job.
		log.Warnf("Terminating RunDaemon on an active job due to error: %v", r.ctx.Err())
		return
	}

	err := r.ReportLog(false)
	if err != nil {
		log.Warnf("ReportLog error: %v", err)
	}
	err = r.ReportState()
	if err != nil {
		log.Warnf("ReportState error: %v", err)
	}

	time.AfterFunc(r.reportInterval, r.RunDaemon)
}

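// Logf appends a formatted job-level log row, unless a step is currently
// producing output.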
func (r *Reporter) Logf(format string, a ...any) {
	r.stateMu.Lock()
	defer r.stateMu.Unlock()

	r.logf(format, a...)
}

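// logf is the unsynchronized variant of Logf; the caller must hold stateMu.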
func (r *Reporter) logf(format string, a ...any) {
	if !r.duringSteps() {
		r.logRows = append(r.logRows, &runnerv1.LogRow{
			Time:    timestamppb.Now(),
			Content: fmt.Sprintf(format, a...),
		})
	}
}

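// cloneOutputs returns a copy of the outputs that are still pending upload;
// outputs already acknowledged by the server are stored as struct{}{} and
// skipped.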
func (r *Reporter) cloneOutputs() map[string]string {
	outputs := make(map[string]string)
	r.outputs.Range(func(k, v any) bool {
		if val, ok := v.(string); ok {
			outputs[k.(string)] = val
		}
		return true
	})
	return outputs
}

// Errors from setOutputs are logged into the reporter automatically; the returned error is only used for unit
// tests.
func (r *Reporter) setOutputs(outputs map[string]string) error {
	var errs []error
	recordError := func(format string, a ...any) {
		r.logf(format, a...)
		errs = append(errs, fmt.Errorf(format, a...))
	}
	for k, v := range outputs {
		if len(k) > outputKeyMaxLength {
			recordError("ignore output because the key is longer than %d: %q", outputKeyMaxLength, k)
			continue
		}
		if l := len(v); l > outputValueMaxLength {
			recordError("ignore output because the length of the value for %q is %d (the maximum is %d)", k, l, outputValueMaxLength)
			continue
		}
		if _, ok := r.outputs.Load(k); ok {
			recordError("ignore output because a value already exists for the key %q", k)
			continue
		}
		r.outputs.Store(k, v)
	}
	return errors.Join(errs...)
}

const (
	closeTimeoutMessage   = "The runner cancelled the job because it exceeds the maximum run time"
	closeCancelledMessage = "Cancelled"
)

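// Close finalizes the task: it records a last log row explaining why the job
// ended (timeout, cancellation, or error), marks any unfinished steps as
// cancelled, and retries until all logs and the final state are delivered.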
func (r *Reporter) Close(runErr error) error {
	r.closed = true

	setStepCancel := func() {
		for _, v := range r.state.Steps {
			if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
				v.Result = runnerv1.Result_RESULT_CANCELLED
			}
		}
	}

	r.stateMu.Lock()
	var lastWords string
	if errors.Is(runErr, context.DeadlineExceeded) {
		lastWords = closeTimeoutMessage
		r.state.Result = runnerv1.Result_RESULT_CANCELLED
		setStepCancel()
	} else if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
		if runErr == nil {
			lastWords = closeCancelledMessage
		} else {
			lastWords = runErr.Error()
		}
		r.state.Result = runnerv1.Result_RESULT_FAILURE
		setStepCancel()
	} else if runErr != nil {
		lastWords = runErr.Error()
	}

	if lastWords != "" {
		r.logRows = append(r.logRows, &runnerv1.LogRow{
			Time:    timestamppb.Now(),
			Content: lastWords,
		})
	}
	r.state.StoppedAt = timestamppb.Now()
	r.stateMu.Unlock()

	return retry.Do(func() error {
		if err := r.ReportLog(true); err != nil {
			return err
		}
		return r.ReportState()
	}, retry.Context(r.ctx))
}

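// ErrRetry signals that a report could not be completed yet (for example,
// logs or outputs are still pending acknowledgement) and the operation should
// be retried.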
type ErrRetry struct {
	message string
}

func (err ErrRetry) Error() string {
	return err.message
}

func (err ErrRetry) Is(target error) bool {
	_, ok := target.(*ErrRetry)
	return ok
}

const (
	errRetryNeedMoreRows = "need more rows to figure out if multiline secrets must be masked"
	errRetrySendAll      = "not all logs are submitted %d remain"
)

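// NewErrRetry builds an *ErrRetry with a formatted message.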
func NewErrRetry(message string, args ...any) error {
	return &ErrRetry{message: fmt.Sprintf(message, args...)}
}

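// ReportLog uploads buffered log rows, after masking secrets, and trims the
// buffer to the index acknowledged by the server. It returns ErrRetry when
// the masker needs more rows, or when noMore is true and unacknowledged rows
// remain, so that the caller retries.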
func (r *Reporter) ReportLog(noMore bool) error {
	r.clientM.Lock()
	defer r.clientM.Unlock()

	r.stateMu.RLock()
	rows := r.logRows
	r.stateMu.RUnlock()

	if len(rows) == 0 {
		return nil
	}

	if needMore := r.masker.replace(rows, noMore); needMore {
		return NewErrRetry(errRetryNeedMoreRows)
	}

	resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
		TaskId: r.state.Id,
		Index:  int64(r.logOffset),
		Rows:   rows,
		NoMore: noMore,
	}))
	if err != nil {
		return err
	}

	ack := int(resp.Msg.GetAckIndex())
	if ack < r.logOffset {
		return fmt.Errorf("submitted logs are lost %d < %d", ack, r.logOffset)
	}

	r.stateMu.Lock()
	r.logRows = r.logRows[ack-r.logOffset:]
	r.logOffset = ack
	r.stateMu.Unlock()

	if noMore && len(r.logRows) > 0 {
		return NewErrRetry(errRetrySendAll, len(r.logRows))
	}

	return nil
}

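// ReportState uploads the current task state and pending outputs. If the
// server reports the task as cancelled or failed while it is still running
// locally, the job context is cancelled to begin local termination.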
func (r *Reporter) ReportState() error {
	r.clientM.Lock()
	defer r.clientM.Unlock()

	r.stateMu.RLock()
	state := proto.Clone(r.state).(*runnerv1.TaskState)
	outputs := r.cloneOutputs()
	r.stateMu.RUnlock()

	resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
		State:   state,
		Outputs: outputs,
	}))
	if err != nil {
		return err
	}

	for _, k := range resp.Msg.GetSentOutputs() {
		r.outputs.Store(k, struct{}{})
	}

	localResultState := state.GetResult()
	remoteResultState := resp.Msg.GetState().GetResult()
	switch remoteResultState {
	case runnerv1.Result_RESULT_CANCELLED, runnerv1.Result_RESULT_FAILURE:
		// issuedLocalCancel is just used to deduplicate this log message if our local state doesn't catch up with our
		// remote state as quickly as the report-interval, which would cause this message to repeat in the logs.
		if !r.issuedLocalCancel && remoteResultState != localResultState {
			log.Infof("UpdateTask returned task result %v for a task that was in local state %v - beginning local task termination",
				remoteResultState, localResultState)
			r.issuedLocalCancel = true
		}
		r.cancel()
	}

	var noSent []string
	r.outputs.Range(func(k, v any) bool {
		if _, ok := v.(string); ok {
			noSent = append(noSent, k.(string))
		}
		return true
	})
	if len(noSent) > 0 {
		return NewErrRetry(errRetrySendAll, len(noSent))
	}

	return nil
}

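// duringSteps reports whether execution is currently inside the job's steps,
// i.e. the first step has started and the last has not finished; job-level
// log rows are suppressed during that window.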
func (r *Reporter) duringSteps() bool {
	if steps := r.state.Steps; len(steps) == 0 {
		return false
	} else if first := steps[0]; first.Result == runnerv1.Result_RESULT_UNSPECIFIED && first.LogLength == 0 {
		return false
	} else if last := steps[len(steps)-1]; last.Result != runnerv1.Result_RESULT_UNSPECIFIED {
		return false
	}
	return true
}

var stringToResult = map[string]runnerv1.Result{
	"success":   runnerv1.Result_RESULT_SUCCESS,
	"failure":   runnerv1.Result_RESULT_FAILURE,
	"skipped":   runnerv1.Result_RESULT_SKIPPED,
	"cancelled": runnerv1.Result_RESULT_CANCELLED,
}

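// parseResult converts a jobResult string or a stepResult fmt.Stringer into a
// runnerv1.Result; the second return value is false for unknown results.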
func (r *Reporter) parseResult(result any) (runnerv1.Result, bool) {
	str := ""
	if v, ok := result.(string); ok { // for jobResult
		str = v
	} else if v, ok := result.(fmt.Stringer); ok { // for stepResult
		str = v.String()
	}

	ret, ok := stringToResult[str]
	return ret, ok
}

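// cmdRegex matches workflow commands of the form ::command parameters::value.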
var cmdRegex = regexp.MustCompile(`^::([^ :]+)( .*)?::(.*)$`)

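// handleCommand interprets a workflow command. It returns the content to log,
// or nil when the row should be swallowed (e.g. add-mask, stop-commands).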
func (r *Reporter) handleCommand(originalContent, command, parameters, value string) *string {
	if r.stopCommandEndToken != "" && command != r.stopCommandEndToken {
		return &originalContent
	}

	switch command {
	case "add-mask":
		r.masker.add(value)
		return nil
	case "debug":
		if r.debugOutputEnabled {
			return &value
		}
		return nil
	case "notice", "warning", "error":
		// Not implemented yet, so just return the original content.
		return &originalContent
	case "group":
		// Rewriting into ##[] syntax which the frontend understands
		content := "##[group]" + value
		return &content
	case "endgroup":
		// Ditto
		content := "##[endgroup]"
		return &content
	case "stop-commands":
		r.stopCommandEndToken = value
		return nil
	case r.stopCommandEndToken:
		r.stopCommandEndToken = ""
		return nil
	}
	return &originalContent
}

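// parseLogRow converts a log entry into a LogRow: trailing newlines are
// trimmed, workflow commands are processed, and the content is coerced to
// valid UTF-8. It returns nil when the entry should not produce a log row.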
func (r *Reporter) parseLogRow(entry *log.Entry) *runnerv1.LogRow {
	content := strings.TrimRightFunc(entry.Message, func(r rune) bool { return r == '\r' || r == '\n' })

	matches := cmdRegex.FindStringSubmatch(content)
	if matches != nil {
		if output := r.handleCommand(content, matches[1], matches[2], matches[3]); output != nil {
			content = *output
		} else {
			return nil
		}
	}

	return &runnerv1.LogRow{
		Time:    timestamppb.New(entry.Time),
		Content: strings.ToValidUTF8(content, "?"),
	}
}