mirror of
https://code.forgejo.org/forgejo/runner.git
synced 2025-09-15 18:57:01 +00:00
feat: wait for services to be healthy before starting a job (#805)
If a --health-cmd is defined for a container, block until its status is healthy or unhealthy. The timeout is determined by the server's internal logic, based on the delays defined by the associated --health-* options. If it blocks indefinitely, the job timeout will eventually cancel it. While waiting, the simplest solution would be to sleep 1 second until the container is healthy or unhealthy. To minimize log verbosity, the sleep interval is instead set to --health-interval and defaults to one second if it is not defined. This logic does not apply to host containers as they do not support services. They are assumed to always be healthy. If --health-cmd is set for the container running a job, the first step will start to run without waiting for the container to become healthy. There may be valid use cases for that but they are not the focus of this implementation. <!--start release-notes-assistant--> <!--URL:https://code.forgejo.org/forgejo/runner--> - features - [PR](https://code.forgejo.org/forgejo/runner/pulls/805): <!--number 805 --><!--line 0 --><!--description ZmVhdDogd2FpdCBmb3Igc2VydmljZXMgdG8gYmUgaGVhbHRoeSBiZWZvcmUgc3RhcnRpbmcgYSBqb2I=-->feat: wait for services to be healthy before starting a job<!--description--> <!--end release-notes-assistant--> Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/805 Co-authored-by: Earl Warren <contact@earl-warren.org> Co-committed-by: Earl Warren <contact@earl-warren.org>
This commit is contained in:
parent
8644cc9e07
commit
96891ab314
10 changed files with 240 additions and 2 deletions
|
@ -3,6 +3,7 @@ package container
|
|||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"code.forgejo.org/forgejo/runner/v9/act/common"
|
||||
"github.com/docker/go-connections/nat"
|
||||
|
@ -63,6 +64,7 @@ type Container interface {
|
|||
Remove() common.Executor
|
||||
Close() common.Executor
|
||||
ReplaceLogWriter(io.Writer, io.Writer) (io.Writer, io.Writer)
|
||||
IsHealthy(ctx context.Context) (time.Duration, error)
|
||||
}
|
||||
|
||||
// NewDockerBuildExecutorInput the input for the NewDockerBuildExecutor function
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Masterminds/semver"
|
||||
"github.com/docker/cli/cli/compose/loader"
|
||||
|
@ -191,6 +192,47 @@ func (cr *containerReference) Remove() common.Executor {
|
|||
).IfNot(common.Dryrun)
|
||||
}
|
||||
|
||||
func (cr *containerReference) inspect(ctx context.Context) (container.InspectResponse, error) {
|
||||
resp, err := cr.cli.ContainerInspect(ctx, cr.id)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("service %v: %s", cr.input.NetworkAliases, err)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (cr *containerReference) IsHealthy(ctx context.Context) (time.Duration, error) {
|
||||
resp, err := cr.inspect(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return cr.isHealthy(ctx, resp)
|
||||
}
|
||||
|
||||
func (cr *containerReference) isHealthy(ctx context.Context, resp container.InspectResponse) (time.Duration, error) {
|
||||
logger := common.Logger(ctx)
|
||||
if resp.Config == nil || resp.Config.Healthcheck == nil || resp.State == nil || resp.State.Health == nil || len(resp.Config.Healthcheck.Test) == 1 && strings.EqualFold(resp.Config.Healthcheck.Test[0], "NONE") {
|
||||
logger.Debugf("no container health check defined, hope for the best")
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
switch resp.State.Health.Status {
|
||||
case container.Starting:
|
||||
wait := resp.Config.Healthcheck.Interval
|
||||
if wait <= 0 {
|
||||
wait = time.Second
|
||||
}
|
||||
logger.Infof("service %v: container health check %s (%s) is starting, waiting %v", cr.input.NetworkAliases, cr.id, resp.Config.Image, wait)
|
||||
return wait, nil
|
||||
case container.Healthy:
|
||||
logger.Infof("service %v: container health check %s (%s) is healthy", cr.input.NetworkAliases, cr.id, resp.Config.Image)
|
||||
return 0, nil
|
||||
case container.Unhealthy:
|
||||
return 0, fmt.Errorf("service %v: container health check %s (%s) is not healthy", cr.input.NetworkAliases, cr.id, resp.Config.Image)
|
||||
default:
|
||||
return 0, fmt.Errorf("service %v: unexpected health status %s (%s) %v", cr.input.NetworkAliases, cr.id, resp.Config.Image, resp.State.Health.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func (cr *containerReference) ReplaceLogWriter(stdout, stderr io.Writer) (io.Writer, io.Writer) {
|
||||
out := cr.input.Stdout
|
||||
err := cr.input.Stderr
|
||||
|
|
|
@ -386,3 +386,79 @@ func TestMergeJobOptions(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDockerRun_isHealthy(t *testing.T) {
|
||||
cr := containerReference{
|
||||
id: "containerid",
|
||||
input: &NewContainerInput{
|
||||
NetworkAliases: []string{"servicename"},
|
||||
},
|
||||
}
|
||||
ctx := context.Background()
|
||||
makeInspectResponse := func(interval time.Duration, status container.HealthStatus, test []string) container.InspectResponse {
|
||||
return container.InspectResponse{
|
||||
Config: &container.Config{
|
||||
Image: "example.com/some/image",
|
||||
Healthcheck: &container.HealthConfig{
|
||||
Interval: interval,
|
||||
Test: test,
|
||||
},
|
||||
},
|
||||
ContainerJSONBase: &container.ContainerJSONBase{
|
||||
State: &container.State{
|
||||
Health: &container.Health{
|
||||
Status: status,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("IncompleteResponseOrNoHealthCheck", func(t *testing.T) {
|
||||
wait, err := cr.isHealthy(ctx, container.InspectResponse{})
|
||||
assert.Zero(t, wait)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// --no-healthcheck translates into a NONE test command
|
||||
resp := makeInspectResponse(0, container.NoHealthcheck, []string{"NONE"})
|
||||
wait, err = cr.isHealthy(ctx, resp)
|
||||
assert.Zero(t, wait)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("StartingUndefinedIntervalIsNotZero", func(t *testing.T) {
|
||||
resp := makeInspectResponse(0, container.Starting, nil)
|
||||
wait, err := cr.isHealthy(ctx, resp)
|
||||
assert.NotZero(t, wait)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("StartingWithInterval", func(t *testing.T) {
|
||||
expectedWait := time.Duration(42)
|
||||
resp := makeInspectResponse(expectedWait, container.Starting, nil)
|
||||
actualWait, err := cr.isHealthy(ctx, resp)
|
||||
assert.Equal(t, expectedWait, actualWait)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("Unhealthy", func(t *testing.T) {
|
||||
resp := makeInspectResponse(0, container.Unhealthy, nil)
|
||||
wait, err := cr.isHealthy(ctx, resp)
|
||||
assert.Zero(t, wait)
|
||||
assert.ErrorContains(t, err, "is not healthy")
|
||||
})
|
||||
|
||||
t.Run("Healthy", func(t *testing.T) {
|
||||
resp := makeInspectResponse(0, container.Healthy, nil)
|
||||
wait, err := cr.isHealthy(ctx, resp)
|
||||
assert.Zero(t, wait)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("UnknownStatus", func(t *testing.T) {
|
||||
resp := makeInspectResponse(0, container.NoHealthcheck, nil)
|
||||
wait, err := cr.isHealthy(ctx, resp)
|
||||
assert.Zero(t, wait)
|
||||
assert.ErrorContains(t, err, "unexpected")
|
||||
})
|
||||
}
|
||||
|
|
|
@ -493,6 +493,10 @@ func (e *HostEnvironment) GetRunnerContext(_ context.Context) map[string]interfa
|
|||
}
|
||||
}
|
||||
|
||||
func (e *HostEnvironment) IsHealthy(ctx context.Context) (time.Duration, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (e *HostEnvironment) ReplaceLogWriter(stdout, _ io.Writer) (io.Writer, io.Writer) {
|
||||
org := e.StdOut
|
||||
e.StdOut = stdout
|
||||
|
|
|
@ -610,6 +610,7 @@ func (rc *RunContext) startJobContainer() common.Executor {
|
|||
Mode: 0o666,
|
||||
Body: "",
|
||||
}),
|
||||
rc.waitForServiceContainers(),
|
||||
)(ctx)
|
||||
}
|
||||
}
|
||||
|
@ -744,6 +745,35 @@ func (rc *RunContext) startServiceContainers(_ string) common.Executor {
|
|||
}
|
||||
}
|
||||
|
||||
func waitForServiceContainer(ctx context.Context, c container.ExecutionsEnvironment) error {
|
||||
for {
|
||||
wait, err := c.IsHealthy(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if wait == time.Duration(0) {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-time.After(wait):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rc *RunContext) waitForServiceContainers() common.Executor {
|
||||
return func(ctx context.Context) error {
|
||||
execs := []common.Executor{}
|
||||
for _, c := range rc.ServiceContainers {
|
||||
execs = append(execs, func(ctx context.Context) error {
|
||||
return waitForServiceContainer(ctx, c)
|
||||
})
|
||||
}
|
||||
return common.NewParallelExecutor(len(execs), execs...)(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (rc *RunContext) stopServiceContainers() common.Executor {
|
||||
return func(ctx context.Context) error {
|
||||
execs := []common.Executor{}
|
||||
|
|
|
@ -3,12 +3,14 @@ package runner
|
|||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.forgejo.org/forgejo/runner/v9/act/container"
|
||||
"code.forgejo.org/forgejo/runner/v9/act/exprparser"
|
||||
|
@ -18,6 +20,7 @@ import (
|
|||
"github.com/docker/go-connections/nat"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
yaml "gopkg.in/yaml.v3"
|
||||
)
|
||||
|
@ -824,3 +827,44 @@ jobs:
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
type waitForServiceContainerMock struct {
|
||||
mock.Mock
|
||||
container.Container
|
||||
container.LinuxContainerEnvironmentExtensions
|
||||
}
|
||||
|
||||
func (o *waitForServiceContainerMock) IsHealthy(ctx context.Context) (time.Duration, error) {
|
||||
args := o.Called(ctx)
|
||||
return args.Get(0).(time.Duration), args.Error(1)
|
||||
}
|
||||
|
||||
func Test_waitForServiceContainer(t *testing.T) {
|
||||
t.Run("Wait", func(t *testing.T) {
|
||||
m := &waitForServiceContainerMock{}
|
||||
ctx := context.Background()
|
||||
mock.InOrder(
|
||||
m.On("IsHealthy", ctx).Return(1*time.Millisecond, nil).Once(),
|
||||
m.On("IsHealthy", ctx).Return(time.Duration(0), nil).Once(),
|
||||
)
|
||||
require.NoError(t, waitForServiceContainer(ctx, m))
|
||||
m.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("Cancel", func(t *testing.T) {
|
||||
m := &waitForServiceContainerMock{}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
m.On("IsHealthy", ctx).Return(1*time.Millisecond, nil).Once()
|
||||
require.NoError(t, waitForServiceContainer(ctx, m))
|
||||
m.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("Error", func(t *testing.T) {
|
||||
m := &waitForServiceContainerMock{}
|
||||
ctx := context.Background()
|
||||
m.On("IsHealthy", ctx).Return(time.Duration(0), errors.New("ERROR"))
|
||||
require.ErrorContains(t, waitForServiceContainer(ctx, m), "ERROR")
|
||||
m.AssertExpectations(t)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -322,6 +322,8 @@ func TestRunner_RunEvent(t *testing.T) {
|
|||
// services
|
||||
{workdir, "services", "push", "", platforms, secrets},
|
||||
{workdir, "services-with-container", "push", "", platforms, secrets},
|
||||
{workdir, "mysql-service-container-with-health-check", "push", "", platforms, secrets},
|
||||
{workdir, "mysql-service-container-premature-terminate", "push", "service [maindb]", platforms, secrets},
|
||||
}
|
||||
|
||||
for _, table := range tables {
|
||||
|
|
21
act/runner/testdata/mysql-service-container-premature-terminate/push.yml
vendored
Normal file
21
act/runner/testdata/mysql-service-container-premature-terminate/push.yml
vendored
Normal file
|
@ -0,0 +1,21 @@
|
|||
name: service-container
|
||||
on: push
|
||||
jobs:
|
||||
service-container-test:
|
||||
runs-on: ubuntu-latest
|
||||
container: code.forgejo.org/oci/mysql:8.4
|
||||
services:
|
||||
maindb:
|
||||
image: code.forgejo.org/oci/mysql:8.4
|
||||
# This container should immediately exit due to missing env variable for password config. ... [ERROR]
|
||||
# [Entrypoint]: Database is uninitialized and password option is not specified You need to specify one of the
|
||||
# following as an environment variable:
|
||||
# - MYSQL_ROOT_PASSWORD
|
||||
# - MYSQL_ALLOW_EMPTY_PASSWORD
|
||||
# - MYSQL_RANDOM_ROOT_PASSWORD
|
||||
#
|
||||
# This container should retain the same health check config as the mysql-service-container-with-health-check
|
||||
# case.
|
||||
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
|
||||
steps:
|
||||
- run: exit 100 # should never be hit since service will never be healthy
|
17
act/runner/testdata/mysql-service-container-with-health-check/push.yml
vendored
Normal file
17
act/runner/testdata/mysql-service-container-with-health-check/push.yml
vendored
Normal file
|
@ -0,0 +1,17 @@
|
|||
name: service-container
|
||||
on: push
|
||||
jobs:
|
||||
service-container-test:
|
||||
runs-on: ubuntu-latest
|
||||
container: code.forgejo.org/oci/mysql:8.4
|
||||
services:
|
||||
maindb:
|
||||
image: code.forgejo.org/oci/mysql:8.4
|
||||
env:
|
||||
MYSQL_DATABASE: dbname
|
||||
MYSQL_USER: dbuser
|
||||
MYSQL_PASSWORD: dbpass
|
||||
MYSQL_RANDOM_ROOT_PASSWORD: yes
|
||||
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
|
||||
steps:
|
||||
- run: mysql -u dbuser -D dbname -pdbpass -h maindb -e "create table T(id INT NOT NULL AUTO_INCREMENT, val VARCHAR(255), PRIMARY KEY (id))"
|
4
act/runner/testdata/services/push.yaml
vendored
4
act/runner/testdata/services/push.yaml
vendored
|
@ -6,7 +6,7 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
services:
|
||||
postgres:
|
||||
image: code.forgejo.org/oci/bitnami/postgresql:16
|
||||
image: code.forgejo.org/oci/postgres:16
|
||||
env:
|
||||
POSTGRES_USER: runner
|
||||
POSTGRES_PASSWORD: mysecretdbpass
|
||||
|
@ -15,7 +15,7 @@ jobs:
|
|||
--health-cmd pg_isready
|
||||
--health-interval 10s
|
||||
--health-timeout 5s
|
||||
--health-retries 5
|
||||
--health-retries 20
|
||||
ports:
|
||||
- 5432:5432
|
||||
steps:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue