From 13247246fcffbd2cd7b3e3f91e96afdcf7133d5d Mon Sep 17 00:00:00 2001 From: Mathieu Fenniak Date: Fri, 15 Aug 2025 09:12:32 +0000 Subject: [PATCH] test: fix data race in TestNewParallelExecutor... tests (#860) Tests were using `count` and similar variables without any concurrency safety and have been updated to use atomic operations. This may have caused rare miscounts in tests as operations like `count++` are not thread-safe, but to my knowledge these have never been observed. Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/860 Reviewed-by: Gusted Co-authored-by: Mathieu Fenniak Co-committed-by: Mathieu Fenniak --- act/common/executor_test.go | 58 ++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/act/common/executor_test.go b/act/common/executor_test.go index 75dca49f..721d487f 100644 --- a/act/common/executor_test.go +++ b/act/common/executor_test.go @@ -3,6 +3,7 @@ package common import ( "context" "fmt" + "sync/atomic" "testing" "time" @@ -79,37 +80,48 @@ func TestNewParallelExecutor(t *testing.T) { ctx := t.Context() - count := 0 - activeCount := 0 - maxCount := 0 - emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error { - count++ + var count atomic.Int32 + var activeCount atomic.Int32 + var maxCount atomic.Int32 - activeCount++ - if activeCount > maxCount { - maxCount = activeCount + emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error { + count.Add(1) + + currentActive := activeCount.Add(1) + + // maxCount = max(maxCount, currentActive) -- but concurrent-safe by using CompareAndSwap. + for { + currentMax := maxCount.Load() + if currentActive <= currentMax { + break + } + if maxCount.CompareAndSwap(currentMax, currentActive) { + break + } + // If CompareAndSwap failed, retry due to concurrent update by another goroutine. } + time.Sleep(2 * time.Second) - activeCount-- + activeCount.Add(-1) return nil }) err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx) - assert.Equal(3, count, "should run all 3 executors") - assert.Equal(2, maxCount, "should run at most 2 executors in parallel") + assert.Equal(int32(3), count.Load(), "should run all 3 executors") + assert.Equal(int32(2), maxCount.Load(), "should run at most 2 executors in parallel") assert.Nil(err) // Reset to test running the executor with 0 parallelism - count = 0 - activeCount = 0 - maxCount = 0 + count.Store(0) + activeCount.Store(0) + maxCount.Store(0) errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx) - assert.Equal(3, count, "should run all 3 executors") - assert.Equal(1, maxCount, "should run at most 1 executors in parallel") + assert.Equal(int32(3), count.Load(), "should run all 3 executors") + assert.Equal(int32(1), maxCount.Load(), "should run at most 1 executors in parallel") assert.Nil(errSingle) } @@ -119,13 +131,13 @@ func TestNewParallelExecutorFailed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - count := 0 + var count atomic.Int32 errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error { - count++ + count.Add(1) return fmt.Errorf("fake error") }) err := NewParallelExecutor(1, errorWorkflow)(ctx) - assert.Equal(1, count) + assert.Equal(int32(1), count.Load()) assert.ErrorIs(context.Canceled, err) } @@ -137,16 +149,16 @@ func TestNewParallelExecutorCanceled(t *testing.T) { errExpected := fmt.Errorf("fake error") - count := 0 + var count atomic.Int32 successWorkflow := NewPipelineExecutor(func(ctx context.Context) error { - count++ + count.Add(1) return nil }) errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error { - count++ + count.Add(1) return errExpected }) err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx) - assert.Equal(3, count) + assert.Equal(int32(3), count.Load()) assert.Error(errExpected, err) }