1
0
Fork 0
mirror of https://code.forgejo.org/forgejo/runner.git synced 2025-09-05 18:40:59 +00:00

test: fix data race in TestNewParallelExecutor... tests (#860)

Tests were using `count` and similar variables without any concurrency safety and have been updated to use atomic operations.  This may have caused rare miscounts in tests as operations like `count++` are not thread-safe, but to my knowledge these have never been observed.

Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/860
Reviewed-by: Gusted <gusted@noreply.code.forgejo.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
This commit is contained in:
Mathieu Fenniak 2025-08-15 09:12:32 +00:00 committed by earl-warren
parent 815e7aed04
commit 13247246fc
No known key found for this signature in database
GPG key ID: F128CBE6AB3A7201

View file

@ -3,6 +3,7 @@ package common
import ( import (
"context" "context"
"fmt" "fmt"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -79,37 +80,48 @@ func TestNewParallelExecutor(t *testing.T) {
ctx := t.Context() ctx := t.Context()
count := 0 var count atomic.Int32
activeCount := 0 var activeCount atomic.Int32
maxCount := 0 var maxCount atomic.Int32
emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
count++
activeCount++ emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
if activeCount > maxCount { count.Add(1)
maxCount = activeCount
currentActive := activeCount.Add(1)
// maxCount = max(maxCount, currentActive) -- but concurrent-safe by using CompareAndSwap.
for {
currentMax := maxCount.Load()
if currentActive <= currentMax {
break
}
if maxCount.CompareAndSwap(currentMax, currentActive) {
break
}
// If CompareAndSwap failed, retry due to concurrent update by another goroutine.
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
activeCount-- activeCount.Add(-1)
return nil return nil
}) })
err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx) err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
assert.Equal(3, count, "should run all 3 executors") assert.Equal(int32(3), count.Load(), "should run all 3 executors")
assert.Equal(2, maxCount, "should run at most 2 executors in parallel") assert.Equal(int32(2), maxCount.Load(), "should run at most 2 executors in parallel")
assert.Nil(err) assert.Nil(err)
// Reset to test running the executor with 0 parallelism // Reset to test running the executor with 0 parallelism
count = 0 count.Store(0)
activeCount = 0 activeCount.Store(0)
maxCount = 0 maxCount.Store(0)
errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx) errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
assert.Equal(3, count, "should run all 3 executors") assert.Equal(int32(3), count.Load(), "should run all 3 executors")
assert.Equal(1, maxCount, "should run at most 1 executors in parallel") assert.Equal(int32(1), maxCount.Load(), "should run at most 1 executors in parallel")
assert.Nil(errSingle) assert.Nil(errSingle)
} }
@ -119,13 +131,13 @@ func TestNewParallelExecutorFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
count := 0 var count atomic.Int32
errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error { errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
count++ count.Add(1)
return fmt.Errorf("fake error") return fmt.Errorf("fake error")
}) })
err := NewParallelExecutor(1, errorWorkflow)(ctx) err := NewParallelExecutor(1, errorWorkflow)(ctx)
assert.Equal(1, count) assert.Equal(int32(1), count.Load())
assert.ErrorIs(context.Canceled, err) assert.ErrorIs(context.Canceled, err)
} }
@ -137,16 +149,16 @@ func TestNewParallelExecutorCanceled(t *testing.T) {
errExpected := fmt.Errorf("fake error") errExpected := fmt.Errorf("fake error")
count := 0 var count atomic.Int32
successWorkflow := NewPipelineExecutor(func(ctx context.Context) error { successWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
count++ count.Add(1)
return nil return nil
}) })
errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error { errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
count++ count.Add(1)
return errExpected return errExpected
}) })
err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx) err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx)
assert.Equal(3, count) assert.Equal(int32(3), count.Load())
assert.Error(errExpected, err) assert.Error(errExpected, err)
} }