diff --git a/act/common/executor_test.go b/act/common/executor_test.go index 75dca49f..721d487f 100644 --- a/act/common/executor_test.go +++ b/act/common/executor_test.go @@ -3,6 +3,7 @@ package common import ( "context" "fmt" + "sync/atomic" "testing" "time" @@ -79,37 +80,48 @@ func TestNewParallelExecutor(t *testing.T) { ctx := t.Context() - count := 0 - activeCount := 0 - maxCount := 0 - emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error { - count++ + var count atomic.Int32 + var activeCount atomic.Int32 + var maxCount atomic.Int32 - activeCount++ - if activeCount > maxCount { - maxCount = activeCount + emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error { + count.Add(1) + + currentActive := activeCount.Add(1) + + // maxCount = max(maxCount, currentActive) -- but concurrent-safe by using CompareAndSwap. + for { + currentMax := maxCount.Load() + if currentActive <= currentMax { + break + } + if maxCount.CompareAndSwap(currentMax, currentActive) { + break + } + // If CompareAndSwap failed, retry due to concurrent update by another goroutine. } + time.Sleep(2 * time.Second) - activeCount-- + activeCount.Add(-1) return nil }) err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx) - assert.Equal(3, count, "should run all 3 executors") - assert.Equal(2, maxCount, "should run at most 2 executors in parallel") + assert.Equal(int32(3), count.Load(), "should run all 3 executors") + assert.Equal(int32(2), maxCount.Load(), "should run at most 2 executors in parallel") assert.Nil(err) // Reset to test running the executor with 0 parallelism - count = 0 - activeCount = 0 - maxCount = 0 + count.Store(0) + activeCount.Store(0) + maxCount.Store(0) errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx) - assert.Equal(3, count, "should run all 3 executors") - assert.Equal(1, maxCount, "should run at most 1 executors in parallel") + assert.Equal(int32(3), count.Load(), "should run all 3 executors") + assert.Equal(int32(1), maxCount.Load(), "should run at most 1 executors in parallel") assert.Nil(errSingle) } @@ -119,13 +131,13 @@ func TestNewParallelExecutorFailed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - count := 0 + var count atomic.Int32 errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error { - count++ + count.Add(1) return fmt.Errorf("fake error") }) err := NewParallelExecutor(1, errorWorkflow)(ctx) - assert.Equal(1, count) + assert.Equal(int32(1), count.Load()) assert.ErrorIs(context.Canceled, err) } @@ -137,16 +149,16 @@ func TestNewParallelExecutorCanceled(t *testing.T) { errExpected := fmt.Errorf("fake error") - count := 0 + var count atomic.Int32 successWorkflow := NewPipelineExecutor(func(ctx context.Context) error { - count++ + count.Add(1) return nil }) errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error { - count++ + count.Add(1) return errExpected }) err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx) - assert.Equal(3, count) + assert.Equal(int32(3), count.Load()) assert.Error(errExpected, err) }