From 39dd708768f1309971aa4c7c6145a6fc8bee5afc Mon Sep 17 00:00:00 2001
From: Earl Warren
Date: Thu, 4 Sep 2025 18:58:14 +0200
Subject: [PATCH 1/9] chore: cosmetic fixes

---
 act/artifactcache/handler_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/act/artifactcache/handler_test.go b/act/artifactcache/handler_test.go
index cc0b443c..8fc317b8 100644
--- a/act/artifactcache/handler_test.go
+++ b/act/artifactcache/handler_test.go
@@ -63,7 +63,7 @@ func TestHandler(t *testing.T) {
 	base := fmt.Sprintf("%s%s", handler.ExternalURL(), urlBase)
 
 	defer func() {
-		t.Run("inpect db", func(t *testing.T) {
+		t.Run("inspect db", func(t *testing.T) {
 			db, err := handler.openDB()
 			require.NoError(t, err)
 			defer db.Close()

From 98552f9b9961d6457b0024d0aa88e4fb60d8b12e Mon Sep 17 00:00:00 2001
From: Earl Warren
Date: Thu, 4 Sep 2025 16:58:24 +0200
Subject: [PATCH 2/9] feat: cache-server: shutdown on TERM

In the same way the daemon can be shut down by either INT or TERM, the
cache server can now be shut down by TERM as well; previously it could
only be shut down with INT.
---
 internal/app/cmd/cache-server.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/internal/app/cmd/cache-server.go b/internal/app/cmd/cache-server.go
index 36346d6e..37bee855 100644
--- a/internal/app/cmd/cache-server.go
+++ b/internal/app/cmd/cache-server.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"os"
 	"os/signal"
+	"syscall"
 
 	"code.forgejo.org/forgejo/runner/v11/internal/pkg/config"
 
@@ -73,7 +74,7 @@ func runCacheServer(ctx context.Context, configFile *string, cacheArgs *cacheSer
 	log.Infof("cache server is listening on %v", cacheHandler.ExternalURL())
 
 	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt)
+	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
 	<-c
 
 	return nil

From 36ca627f2e0a415a7d47170830d9d8e14034f1e3 Mon Sep 17 00:00:00 2001
From: Earl Warren
Date: Thu, 4 Sep 2025 16:55:26 +0200
Subject: [PATCH 3/9] feat: cache: fatal() helper to gracefully terminate the runner

In case of an error that is not recoverable (e.g. failing to open the
bolthold database), the cache can call fatal() to log the error and
send a TERM signal that will gracefully shut down the daemon.
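An illustrative sketch (not part of the patch itself): fatal() logs the
error and sends SIGTERM to its own process, and the daemon's signal
handler (extended to TERM for the cache server in PATCH 2/9) turns that
into a graceful shutdown. The fatal helper below mirrors the one added
here; the main() loop is only an assumed stand-in for the daemon's
signal loop, and the sketch is Unix-only because it relies on
syscall.Kill.

    package main

    import (
    	"errors"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )

    // fatal mirrors the helper added by this patch: log the unrecoverable
    // error, then send SIGTERM to the current process.
    func fatal(err error) {
    	log.Printf("unrecoverable error in the cache: %v", err)
    	if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil {
    		log.Printf("failed to send the TERM signal: %v", err)
    	}
    }

    func main() {
    	// The daemon (and, after PATCH 2/9, the cache server) waits on INT or TERM.
    	c := make(chan os.Signal, 1)
    	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

    	// Somewhere in the cache an unrecoverable error occurs.
    	go fatal(errors.New("failed to open the bolthold database"))

    	// The SIGTERM sent by fatal() is delivered here and triggers shutdown.
    	<-c
    	log.Println("shutting down")
    }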
--- act/artifactcache/handler.go | 8 ++++++ act/artifactcache/handler_test.go | 43 +++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/act/artifactcache/handler.go b/act/artifactcache/handler.go index ac7355d0..429b661c 100644 --- a/act/artifactcache/handler.go +++ b/act/artifactcache/handler.go @@ -13,6 +13,7 @@ import ( "strconv" "strings" "sync/atomic" + "syscall" "time" "github.com/julienschmidt/httprouter" @@ -27,6 +28,13 @@ const ( urlBase = "/_apis/artifactcache" ) +var fatal = func(logger logrus.FieldLogger, err error) { + logger.Errorf("unrecoverable error in the cache: %v", err) + if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil { + logger.Errorf("unrecoverable error in the cache: failed to send the TERM signal to shutdown the daemon %v", err) + } +} + type Handler interface { ExternalURL() string Close() error diff --git a/act/artifactcache/handler_test.go b/act/artifactcache/handler_test.go index 8fc317b8..5970ea1a 100644 --- a/act/artifactcache/handler_test.go +++ b/act/artifactcache/handler_test.go @@ -4,11 +4,15 @@ import ( "bytes" "crypto/rand" "encoding/json" + "errors" "fmt" "io" "net/http" + "os" + "os/signal" "path/filepath" "strings" + "syscall" "testing" "time" @@ -1032,3 +1036,42 @@ func TestHandler_ExternalURL(t *testing.T) { assert.True(t, handler.isClosed()) }) } + +var ( + settleTime = 100 * time.Millisecond + fatalWaitingTime = 30 * time.Second +) + +func waitSig(t *testing.T, c <-chan os.Signal, sig os.Signal) { + t.Helper() + + // Sleep multiple times to give the kernel more tries to + // deliver the signal. + start := time.Now() + timer := time.NewTimer(settleTime / 10) + defer timer.Stop() + for time.Since(start) < fatalWaitingTime { + select { + case s := <-c: + if s == sig { + return + } + t.Fatalf("signal was %v, want %v", s, sig) + case <-timer.C: + timer.Reset(settleTime / 10) + } + } + t.Fatalf("timeout after %v waiting for %v", fatalWaitingTime, sig) +} + +func TestHandler_fatal(t *testing.T) { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGTERM) + defer signal.Stop(c) + + discard := logrus.New() + discard.Out = io.Discard + fatal(discard, errors.New("fatal error")) + + waitSig(t, c, syscall.SIGTERM) +} From 37f634fd31a00534d0ee438bc640f07589afe98c Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Mon, 1 Sep 2025 13:37:57 +0200 Subject: [PATCH 4/9] fix: cache: call fatal() on errors that are not recoverable - responseFatalJSON(w, r, err) replaces responseJSON(w, r, 500, err) and calls fatal() when the following fail because they are not recoverable. There may be other non-recoverable errors but it is difficult to be 100% sure they cannot be engineered by the caller of the API for DoS purposes. 
- openDB - findCache - cache.Repo != repo - wrap errors in - openDB() - it was missing - readCache() - it was missing - useCache() - it was missing - findCache() - some had identical messages - in gc - replace logger.Warnf with h.fatal - differentiate errors that have identical messages - call fatal if openDB fails instead of returning --- act/artifactcache/handler.go | 66 +++++++++++++++++++------------ act/artifactcache/handler_test.go | 11 +++++- 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/act/artifactcache/handler.go b/act/artifactcache/handler.go index 429b661c..a325d68b 100644 --- a/act/artifactcache/handler.go +++ b/act/artifactcache/handler.go @@ -177,7 +177,8 @@ func (h *handler) isClosed() bool { } func (h *handler) openDB() (*bolthold.Store, error) { - return bolthold.Open(filepath.Join(h.dir, "bolt.db"), 0o644, &bolthold.Options{ + file := filepath.Join(h.dir, "bolt.db") + db, err := bolthold.Open(file, 0o644, &bolthold.Options{ Encoder: json.Marshal, Decoder: json.Unmarshal, Options: &bbolt.Options{ @@ -186,6 +187,10 @@ func (h *handler) openDB() (*bolthold.Store, error) { FreelistType: bbolt.DefaultOptions.FreelistType, }, }) + if err != nil { + return nil, fmt.Errorf("Open(%s): %v", file, err) + } + return db, nil } // GET /_apis/artifactcache/cache @@ -206,21 +211,21 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter db, err := h.openDB() if err != nil { - h.responseJSON(w, r, 500, err) + h.responseFatalJSON(w, r, err) return } defer db.Close() cache, err := findCache(db, repo, keys, version, rundata.WriteIsolationKey) if err != nil { - h.responseJSON(w, r, 500, err) + h.responseFatalJSON(w, r, err) return } // If read was scoped to WriteIsolationKey and didn't find anything, we can fallback to the non-isolated cache read if cache == nil && rundata.WriteIsolationKey != "" { cache, err = findCache(db, repo, keys, version, "") if err != nil { - h.responseJSON(w, r, 500, err) + h.responseFatalJSON(w, r, err) return } } @@ -265,7 +270,7 @@ func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprou cache := api.ToCache() db, err := h.openDB() if err != nil { - h.responseJSON(w, r, 500, err) + h.responseFatalJSON(w, r, err) return } defer db.Close() @@ -305,13 +310,13 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id)) return } - h.responseJSON(w, r, 500, fmt.Errorf("cache Get: %w", err)) + h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err)) return } - // Should not happen if cache.Repo != repo { - h.responseJSON(w, r, 500, fmt.Errorf("cache repo is not valid")) + // can only happen if the cache is corrupted + h.responseFatalJSON(w, r, fmt.Errorf("cache repo is not valid")) return } if cache.WriteIsolationKey != rundata.WriteIsolationKey { @@ -360,13 +365,13 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id)) return } - h.responseJSON(w, r, 500, fmt.Errorf("cache Get: %w", err)) + h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err)) return } - // Should not happen if cache.Repo != repo { - h.responseJSON(w, r, 500, fmt.Errorf("cache repo is not valid")) + // can only happen if the cache is corrupted + h.responseFatalJSON(w, r, fmt.Errorf("cache repo is not valid")) return } if cache.WriteIsolationKey != rundata.WriteIsolationKey { @@ -381,7 +386,7 @@ func (h *handler) commit(w 
http.ResponseWriter, r *http.Request, params httprout size, err := h.storage.Commit(cache.ID, cache.Size) if err != nil { - h.responseJSON(w, r, 500, err) + h.responseJSON(w, r, 500, fmt.Errorf("commit(%v): %w", cache.ID, err)) return } // write real size back to cache, it may be different from the current value when the request doesn't specify it. @@ -389,7 +394,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout db, err := h.openDB() if err != nil { - h.responseJSON(w, r, 500, err) + h.responseFatalJSON(w, r, err) return } defer db.Close() @@ -424,13 +429,13 @@ func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter. h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id)) return } - h.responseJSON(w, r, 500, fmt.Errorf("cache Get: %w", err)) + h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err)) return } - // Should not happen if cache.Repo != repo { - h.responseJSON(w, r, 500, fmt.Errorf("cache repo is not valid")) + // can only happen if the cache is corrupted + h.responseFatalJSON(w, r, fmt.Errorf("cache repo is not valid")) return } // reads permitted against caches w/ the same isolation key, or no isolation key @@ -481,7 +486,7 @@ func findCache(db *bolthold.Store, repo string, keys []string, version, writeIso And("Complete").Eq(true). SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) { if err != nil { - return nil, fmt.Errorf("find cache: %w", err) + return nil, fmt.Errorf("find cache entry equal to %s: %w", prefix, err) } return cache, nil } @@ -500,7 +505,7 @@ func findCache(db *bolthold.Store, repo string, keys []string, version, writeIso if errors.Is(err, bolthold.ErrNotFound) { continue } - return nil, fmt.Errorf("find cache: %w", err) + return nil, fmt.Errorf("find cache entry starting with %s: %w", prefix, err) } return cache, nil } @@ -526,7 +531,7 @@ func (h *handler) readCache(id uint64) (*Cache, error) { defer db.Close() cache := &Cache{} if err := db.Get(id, cache); err != nil { - return nil, err + return nil, fmt.Errorf("readCache: Get(%v): %w", id, err) } return cache, nil } @@ -539,10 +544,13 @@ func (h *handler) useCache(id uint64) error { defer db.Close() cache := &Cache{} if err := db.Get(id, cache); err != nil { - return err + return fmt.Errorf("useCache: Get(%v): %w", id, err) } cache.UsedAt = time.Now().Unix() - return db.Update(cache.ID, cache) + if err := db.Update(cache.ID, cache); err != nil { + return fmt.Errorf("useCache: Update(%v): %v", cache.ID, err) + } + return nil } const ( @@ -574,6 +582,7 @@ func (h *handler) gcCache() { db, err := h.openDB() if err != nil { + fatal(h.logger, err) return } defer db.Close() @@ -584,12 +593,12 @@ func (h *handler) gcCache() { Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()). And("Complete").Eq(false), ); err != nil { - h.logger.Warnf("find caches: %v", err) + fatal(h.logger, fmt.Errorf("gc caches not completed: %v", err)) } else { for _, cache := range caches { h.storage.Remove(cache.ID) if err := db.Delete(cache.ID, cache); err != nil { - h.logger.Warnf("delete cache: %v", err) + h.logger.Errorf("delete cache: %v", err) continue } h.logger.Infof("deleted cache: %+v", cache) @@ -601,7 +610,7 @@ func (h *handler) gcCache() { if err := db.Find(&caches, bolthold. 
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()), ); err != nil { - h.logger.Warnf("find caches: %v", err) + fatal(h.logger, fmt.Errorf("gc caches old not used: %v", err)) } else { for _, cache := range caches { h.storage.Remove(cache.ID) @@ -618,7 +627,7 @@ func (h *handler) gcCache() { if err := db.Find(&caches, bolthold. Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()), ); err != nil { - h.logger.Warnf("find caches: %v", err) + fatal(h.logger, fmt.Errorf("gc caches too old: %v", err)) } else { for _, cache := range caches { h.storage.Remove(cache.ID) @@ -637,7 +646,7 @@ func (h *handler) gcCache() { bolthold.Where("Complete").Eq(true), "Key", "Version", ); err != nil { - h.logger.Warnf("find aggregate caches: %v", err) + fatal(h.logger, fmt.Errorf("gc aggregate caches: %v", err)) } else { for _, result := range results { if result.Count() <= 1 { @@ -663,6 +672,11 @@ func (h *handler) gcCache() { } } +func (h *handler) responseFatalJSON(w http.ResponseWriter, r *http.Request, err error) { + h.responseJSON(w, r, 500, err) + fatal(h.logger, err) +} + func (h *handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) { w.Header().Set("Content-Type", "application/json; charset=utf-8") var data []byte diff --git a/act/artifactcache/handler_test.go b/act/artifactcache/handler_test.go index 5970ea1a..0c7160d6 100644 --- a/act/artifactcache/handler_test.go +++ b/act/artifactcache/handler_test.go @@ -16,10 +16,13 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "code.forgejo.org/forgejo/runner/v9/testutils" + "github.com/sirupsen/logrus" "github.com/timshannon/bolthold" "go.etcd.io/bbolt" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -59,6 +62,10 @@ var ( ) func TestHandler(t *testing.T) { + defer testutils.MockVariable(&fatal, func(_ logrus.FieldLogger, err error) { + t.Fatalf("unexpected call to fatal(%v)", err) + })() + dir := filepath.Join(t.TempDir(), "artifactcache") handler, err := StartHandler(dir, "", 0, "secret", nil) require.NoError(t, err) From 6c4e705f97d9bcbd277e0bb3b230f0287b934925 Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Fri, 5 Sep 2025 11:17:57 +0200 Subject: [PATCH 5/9] chore: cache: split caches implementation out of handler - create the caches interface and matching cachesImpl - move the cache logic out of handler - openDB - readCache - useCache - gcCache - access to the storage struct - serve - commit - exist - write - add getCaches / setCaches to the handler interface so it can be used by tests. The caches test should be implemented independently in the future but this is a different kind of cleanup. 
- no functional change, minimal refactor --- act/artifactcache/caches.go | 305 ++++++++++++++++++++++++++++++ act/artifactcache/handler.go | 300 +++-------------------------- act/artifactcache/handler_test.go | 10 +- act/artifactcache/mac.go | 4 +- act/artifactcache/mac_test.go | 12 +- 5 files changed, 349 insertions(+), 282 deletions(-) create mode 100644 act/artifactcache/caches.go diff --git a/act/artifactcache/caches.go b/act/artifactcache/caches.go new file mode 100644 index 00000000..2f5fc0cf --- /dev/null +++ b/act/artifactcache/caches.go @@ -0,0 +1,305 @@ +package artifactcache + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "regexp" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" + "github.com/timshannon/bolthold" + "go.etcd.io/bbolt" +) + +type caches interface { + openDB() (*bolthold.Store, error) + validateMac(rundata RunData) (string, error) + readCache(id uint64) (*Cache, error) + useCache(id uint64) error + setgcAt(at time.Time) + gcCache() + + serve(w http.ResponseWriter, r *http.Request, id uint64) + commit(id uint64, size int64) (int64, error) + exist(id uint64) (bool, error) + write(id, offset uint64, reader io.Reader) error +} + +type cachesImpl struct { + dir string + storage *Storage + logger logrus.FieldLogger + secret string + + gcing atomic.Bool + gcAt time.Time +} + +func newCaches(dir, secret string, logger logrus.FieldLogger) (caches, error) { + c := &cachesImpl{ + secret: secret, + } + + c.logger = logger + + if dir == "" { + home, err := os.UserHomeDir() + if err != nil { + return nil, err + } + dir = filepath.Join(home, ".cache", "actcache") + } + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, err + } + + c.dir = dir + + storage, err := NewStorage(filepath.Join(dir, "cache")) + if err != nil { + return nil, err + } + c.storage = storage + + c.gcCache() + + return c, nil +} + +func (c *cachesImpl) openDB() (*bolthold.Store, error) { + file := filepath.Join(c.dir, "bolt.db") + db, err := bolthold.Open(file, 0o644, &bolthold.Options{ + Encoder: json.Marshal, + Decoder: json.Unmarshal, + Options: &bbolt.Options{ + Timeout: 5 * time.Second, + NoGrowSync: bbolt.DefaultOptions.NoGrowSync, + FreelistType: bbolt.DefaultOptions.FreelistType, + }, + }) + if err != nil { + return nil, fmt.Errorf("Open(%s): %w", file, err) + } + return db, nil +} + +// if not found, return (nil, nil) instead of an error. +func findCache(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) { + cache := &Cache{} + for _, prefix := range keys { + // if a key in the list matches exactly, don't return partial matches + if err := db.FindOne(cache, + bolthold.Where("Repo").Eq(repo).Index("Repo"). + And("Key").Eq(prefix). + And("Version").Eq(version). + And("WriteIsolationKey").Eq(writeIsolationKey). + And("Complete").Eq(true). + SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) { + if err != nil { + return nil, fmt.Errorf("find cache entry equal to %s: %w", prefix, err) + } + return cache, nil + } + prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix)) + re, err := regexp.Compile(prefixPattern) + if err != nil { + continue + } + if err := db.FindOne(cache, + bolthold.Where("Repo").Eq(repo).Index("Repo"). + And("Key").RegExp(re). + And("Version").Eq(version). + And("WriteIsolationKey").Eq(writeIsolationKey). + And("Complete").Eq(true). 
+ SortBy("CreatedAt").Reverse()); err != nil { + if errors.Is(err, bolthold.ErrNotFound) { + continue + } + return nil, fmt.Errorf("find cache entry starting with %s: %w", prefix, err) + } + return cache, nil + } + return nil, nil +} + +func insertCache(db *bolthold.Store, cache *Cache) error { + if err := db.Insert(bolthold.NextSequence(), cache); err != nil { + return fmt.Errorf("insert cache: %w", err) + } + // write back id to db + if err := db.Update(cache.ID, cache); err != nil { + return fmt.Errorf("write back id to db: %w", err) + } + return nil +} + +func (c *cachesImpl) readCache(id uint64) (*Cache, error) { + db, err := c.openDB() + if err != nil { + return nil, err + } + defer db.Close() + cache := &Cache{} + if err := db.Get(id, cache); err != nil { + return nil, fmt.Errorf("readCache: Get(%v): %w", id, err) + } + return cache, nil +} + +func (c *cachesImpl) useCache(id uint64) error { + db, err := c.openDB() + if err != nil { + return err + } + defer db.Close() + cache := &Cache{} + if err := db.Get(id, cache); err != nil { + return fmt.Errorf("useCache: Get(%v): %w", id, err) + } + cache.UsedAt = time.Now().Unix() + if err := db.Update(cache.ID, cache); err != nil { + return fmt.Errorf("useCache: Update(%v): %v", cache.ID, err) + } + return nil +} + +func (c *cachesImpl) serve(w http.ResponseWriter, r *http.Request, id uint64) { + c.storage.Serve(w, r, id) +} + +func (c *cachesImpl) commit(id uint64, size int64) (int64, error) { + return c.storage.Commit(id, size) +} + +func (c *cachesImpl) exist(id uint64) (bool, error) { + return c.storage.Exist(id) +} + +func (c *cachesImpl) write(id, offset uint64, reader io.Reader) error { + return c.storage.Write(id, offset, reader) +} + +const ( + keepUsed = 30 * 24 * time.Hour + keepUnused = 7 * 24 * time.Hour + keepTemp = 5 * time.Minute + keepOld = 5 * time.Minute +) + +func (c *cachesImpl) setgcAt(at time.Time) { + c.gcAt = at +} + +func (c *cachesImpl) gcCache() { + if c.gcing.Load() { + return + } + if !c.gcing.CompareAndSwap(false, true) { + return + } + defer c.gcing.Store(false) + + if time.Since(c.gcAt) < time.Hour { + c.logger.Debugf("skip gc: %v", c.gcAt.String()) + return + } + c.gcAt = time.Now() + c.logger.Debugf("gc: %v", c.gcAt.String()) + + db, err := c.openDB() + if err != nil { + fatal(c.logger, err) + return + } + defer db.Close() + + // Remove the caches which are not completed for a while, they are most likely to be broken. + var caches []*Cache + if err := db.Find(&caches, bolthold. + Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()). + And("Complete").Eq(false), + ); err != nil { + fatal(c.logger, fmt.Errorf("gc caches not completed: %v", err)) + } else { + for _, cache := range caches { + c.storage.Remove(cache.ID) + if err := db.Delete(cache.ID, cache); err != nil { + c.logger.Errorf("delete cache: %v", err) + continue + } + c.logger.Infof("deleted cache: %+v", cache) + } + } + + // Remove the old caches which have not been used recently. + caches = caches[:0] + if err := db.Find(&caches, bolthold. + Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()), + ); err != nil { + fatal(c.logger, fmt.Errorf("gc caches old not used: %v", err)) + } else { + for _, cache := range caches { + c.storage.Remove(cache.ID) + if err := db.Delete(cache.ID, cache); err != nil { + c.logger.Warnf("delete cache: %v", err) + continue + } + c.logger.Infof("deleted cache: %+v", cache) + } + } + + // Remove the old caches which are too old. + caches = caches[:0] + if err := db.Find(&caches, bolthold. 
+ Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()), + ); err != nil { + fatal(c.logger, fmt.Errorf("gc caches too old: %v", err)) + } else { + for _, cache := range caches { + c.storage.Remove(cache.ID) + if err := db.Delete(cache.ID, cache); err != nil { + c.logger.Warnf("delete cache: %v", err) + continue + } + c.logger.Infof("deleted cache: %+v", cache) + } + } + + // Remove the old caches with the same key and version, keep the latest one. + // Also keep the olds which have been used recently for a while in case of the cache is still in use. + if results, err := db.FindAggregate( + &Cache{}, + bolthold.Where("Complete").Eq(true), + "Key", "Version", + ); err != nil { + fatal(c.logger, fmt.Errorf("gc aggregate caches: %v", err)) + } else { + for _, result := range results { + if result.Count() <= 1 { + continue + } + result.Sort("CreatedAt") + caches = caches[:0] + result.Reduction(&caches) + for _, cache := range caches[:len(caches)-1] { + if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld { + // Keep it since it has been used recently, even if it's old. + // Or it could break downloading in process. + continue + } + c.storage.Remove(cache.ID) + if err := db.Delete(cache.ID, cache); err != nil { + c.logger.Warnf("delete cache: %v", err) + continue + } + c.logger.Infof("deleted cache: %+v", cache) + } + } + } +} diff --git a/act/artifactcache/handler.go b/act/artifactcache/handler.go index a325d68b..a4985ea3 100644 --- a/act/artifactcache/handler.go +++ b/act/artifactcache/handler.go @@ -7,19 +7,14 @@ import ( "io" "net" "net/http" - "os" - "path/filepath" - "regexp" "strconv" "strings" - "sync/atomic" "syscall" "time" "github.com/julienschmidt/httprouter" "github.com/sirupsen/logrus" "github.com/timshannon/bolthold" - "go.etcd.io/bbolt" "code.forgejo.org/forgejo/runner/v11/act/common" ) @@ -39,7 +34,8 @@ type Handler interface { ExternalURL() string Close() error isClosed() bool - openDB() (*bolthold.Store, error) + getCaches() caches + setCaches(caches caches) find(w http.ResponseWriter, r *http.Request, params httprouter.Params) reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params) upload(w http.ResponseWriter, r *http.Request, params httprouter.Params) @@ -47,32 +43,21 @@ type Handler interface { get(w http.ResponseWriter, r *http.Request, params httprouter.Params) clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params) middleware(handler httprouter.Handle) httprouter.Handle - readCache(id uint64) (*Cache, error) - useCache(id uint64) error - setgcAt(at time.Time) - gcCache() responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) } type handler struct { - dir string - storage *Storage + caches caches router *httprouter.Router listener net.Listener server *http.Server logger logrus.FieldLogger - secret string - - gcing atomic.Bool - gcAt time.Time outboundIP string } func StartHandler(dir, outboundIP string, port uint16, secret string, logger logrus.FieldLogger) (Handler, error) { - h := &handler{ - secret: secret, - } + h := &handler{} if logger == nil { discard := logrus.New() @@ -82,24 +67,11 @@ func StartHandler(dir, outboundIP string, port uint16, secret string, logger log logger = logger.WithField("module", "artifactcache") h.logger = logger - if dir == "" { - home, err := os.UserHomeDir() - if err != nil { - return nil, err - } - dir = filepath.Join(home, ".cache", "actcache") - } - if err := os.MkdirAll(dir, 0o755); err != nil { - return nil, err - } - - h.dir = dir - - storage, err := NewStorage(filepath.Join(dir, 
"cache")) + caches, err := newCaches(dir, secret, logger) if err != nil { return nil, err } - h.storage = storage + h.caches = caches if outboundIP != "" { h.outboundIP = outboundIP @@ -119,8 +91,6 @@ func StartHandler(dir, outboundIP string, port uint16, secret string, logger log h.router = router - h.gcCache() - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) // listen on all interfaces if err != nil { return nil, err @@ -176,27 +146,18 @@ func (h *handler) isClosed() bool { return h.listener == nil && h.server == nil } -func (h *handler) openDB() (*bolthold.Store, error) { - file := filepath.Join(h.dir, "bolt.db") - db, err := bolthold.Open(file, 0o644, &bolthold.Options{ - Encoder: json.Marshal, - Decoder: json.Unmarshal, - Options: &bbolt.Options{ - Timeout: 5 * time.Second, - NoGrowSync: bbolt.DefaultOptions.NoGrowSync, - FreelistType: bbolt.DefaultOptions.FreelistType, - }, - }) - if err != nil { - return nil, fmt.Errorf("Open(%s): %v", file, err) - } - return db, nil +func (h *handler) getCaches() caches { + return h.caches +} + +func (h *handler) setCaches(caches caches) { + h.caches = caches } // GET /_apis/artifactcache/cache func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter.Params) { rundata := runDataFromHeaders(r) - repo, err := h.validateMac(rundata) + repo, err := h.caches.validateMac(rundata) if err != nil { h.responseJSON(w, r, 403, err) return @@ -209,7 +170,7 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter } version := r.URL.Query().Get("version") - db, err := h.openDB() + db, err := h.caches.openDB() if err != nil { h.responseFatalJSON(w, r, err) return @@ -234,7 +195,7 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter return } - if ok, err := h.storage.Exist(cache.ID); err != nil { + if ok, err := h.caches.exist(cache.ID); err != nil { h.responseJSON(w, r, 500, err) return } else if !ok { @@ -253,7 +214,7 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter // POST /_apis/artifactcache/caches func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params) { rundata := runDataFromHeaders(r) - repo, err := h.validateMac(rundata) + repo, err := h.caches.validateMac(rundata) if err != nil { h.responseJSON(w, r, 403, err) return @@ -268,7 +229,7 @@ func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprou api.Key = strings.ToLower(api.Key) cache := api.ToCache() - db, err := h.openDB() + db, err := h.caches.openDB() if err != nil { h.responseFatalJSON(w, r, err) return @@ -292,7 +253,7 @@ func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprou // PATCH /_apis/artifactcache/caches/:id func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprouter.Params) { rundata := runDataFromHeaders(r) - repo, err := h.validateMac(rundata) + repo, err := h.caches.validateMac(rundata) if err != nil { h.responseJSON(w, r, 403, err) return @@ -304,7 +265,7 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout return } - cache, err := h.readCache(id) + cache, err := h.caches.readCache(id) if err != nil { if errors.Is(err, bolthold.ErrNotFound) { h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id)) @@ -333,11 +294,11 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout h.responseJSON(w, r, 400, fmt.Errorf("cache parseContentRange(%s): %w", r.Header.Get("Content-Range"), err)) 
return } - if err := h.storage.Write(cache.ID, start, r.Body); err != nil { + if err := h.caches.write(cache.ID, start, r.Body); err != nil { h.responseJSON(w, r, 500, fmt.Errorf("cache storage.Write: %w", err)) return } - if err := h.useCache(id); err != nil { + if err := h.caches.useCache(id); err != nil { h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err)) return } @@ -347,7 +308,7 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout // POST /_apis/artifactcache/caches/:id func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprouter.Params) { rundata := runDataFromHeaders(r) - repo, err := h.validateMac(rundata) + repo, err := h.caches.validateMac(rundata) if err != nil { h.responseJSON(w, r, 403, err) return @@ -359,7 +320,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout return } - cache, err := h.readCache(id) + cache, err := h.caches.readCache(id) if err != nil { if errors.Is(err, bolthold.ErrNotFound) { h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id)) @@ -384,7 +345,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout return } - size, err := h.storage.Commit(cache.ID, cache.Size) + size, err := h.caches.commit(cache.ID, cache.Size) if err != nil { h.responseJSON(w, r, 500, fmt.Errorf("commit(%v): %w", cache.ID, err)) return @@ -392,7 +353,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout // write real size back to cache, it may be different from the current value when the request doesn't specify it. cache.Size = size - db, err := h.openDB() + db, err := h.caches.openDB() if err != nil { h.responseFatalJSON(w, r, err) return @@ -411,7 +372,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout // GET /_apis/artifactcache/artifacts/:id func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter.Params) { rundata := runDataFromHeaders(r) - repo, err := h.validateMac(rundata) + repo, err := h.caches.validateMac(rundata) if err != nil { h.responseJSON(w, r, 403, err) return @@ -423,7 +384,7 @@ func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter. return } - cache, err := h.readCache(id) + cache, err := h.caches.readCache(id) if err != nil { if errors.Is(err, bolthold.ErrNotFound) { h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id)) @@ -444,17 +405,17 @@ func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter. return } - if err := h.useCache(id); err != nil { + if err := h.caches.useCache(id); err != nil { h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err)) return } - h.storage.Serve(w, r, id) + h.caches.serve(w, r, id) } // POST /_apis/artifactcache/clean func (h *handler) clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { rundata := runDataFromHeaders(r) - _, err := h.validateMac(rundata) + _, err := h.caches.validateMac(rundata) if err != nil { h.responseJSON(w, r, 403, err) return @@ -469,206 +430,7 @@ func (h *handler) middleware(handler httprouter.Handle) httprouter.Handle { return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) { h.logger.Debugf("%s %s", r.Method, r.RequestURI) handler(w, r, params) - go h.gcCache() - } -} - -// if not found, return (nil, nil) instead of an error. 
-func findCache(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) { - cache := &Cache{} - for _, prefix := range keys { - // if a key in the list matches exactly, don't return partial matches - if err := db.FindOne(cache, - bolthold.Where("Repo").Eq(repo).Index("Repo"). - And("Key").Eq(prefix). - And("Version").Eq(version). - And("WriteIsolationKey").Eq(writeIsolationKey). - And("Complete").Eq(true). - SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) { - if err != nil { - return nil, fmt.Errorf("find cache entry equal to %s: %w", prefix, err) - } - return cache, nil - } - prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix)) - re, err := regexp.Compile(prefixPattern) - if err != nil { - continue - } - if err := db.FindOne(cache, - bolthold.Where("Repo").Eq(repo).Index("Repo"). - And("Key").RegExp(re). - And("Version").Eq(version). - And("WriteIsolationKey").Eq(writeIsolationKey). - And("Complete").Eq(true). - SortBy("CreatedAt").Reverse()); err != nil { - if errors.Is(err, bolthold.ErrNotFound) { - continue - } - return nil, fmt.Errorf("find cache entry starting with %s: %w", prefix, err) - } - return cache, nil - } - return nil, nil -} - -func insertCache(db *bolthold.Store, cache *Cache) error { - if err := db.Insert(bolthold.NextSequence(), cache); err != nil { - return fmt.Errorf("insert cache: %w", err) - } - // write back id to db - if err := db.Update(cache.ID, cache); err != nil { - return fmt.Errorf("write back id to db: %w", err) - } - return nil -} - -func (h *handler) readCache(id uint64) (*Cache, error) { - db, err := h.openDB() - if err != nil { - return nil, err - } - defer db.Close() - cache := &Cache{} - if err := db.Get(id, cache); err != nil { - return nil, fmt.Errorf("readCache: Get(%v): %w", id, err) - } - return cache, nil -} - -func (h *handler) useCache(id uint64) error { - db, err := h.openDB() - if err != nil { - return err - } - defer db.Close() - cache := &Cache{} - if err := db.Get(id, cache); err != nil { - return fmt.Errorf("useCache: Get(%v): %w", id, err) - } - cache.UsedAt = time.Now().Unix() - if err := db.Update(cache.ID, cache); err != nil { - return fmt.Errorf("useCache: Update(%v): %v", cache.ID, err) - } - return nil -} - -const ( - keepUsed = 30 * 24 * time.Hour - keepUnused = 7 * 24 * time.Hour - keepTemp = 5 * time.Minute - keepOld = 5 * time.Minute -) - -func (h *handler) setgcAt(at time.Time) { - h.gcAt = at -} - -func (h *handler) gcCache() { - if h.gcing.Load() { - return - } - if !h.gcing.CompareAndSwap(false, true) { - return - } - defer h.gcing.Store(false) - - if time.Since(h.gcAt) < time.Hour { - h.logger.Debugf("skip gc: %v", h.gcAt.String()) - return - } - h.gcAt = time.Now() - h.logger.Debugf("gc: %v", h.gcAt.String()) - - db, err := h.openDB() - if err != nil { - fatal(h.logger, err) - return - } - defer db.Close() - - // Remove the caches which are not completed for a while, they are most likely to be broken. - var caches []*Cache - if err := db.Find(&caches, bolthold. - Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()). - And("Complete").Eq(false), - ); err != nil { - fatal(h.logger, fmt.Errorf("gc caches not completed: %v", err)) - } else { - for _, cache := range caches { - h.storage.Remove(cache.ID) - if err := db.Delete(cache.ID, cache); err != nil { - h.logger.Errorf("delete cache: %v", err) - continue - } - h.logger.Infof("deleted cache: %+v", cache) - } - } - - // Remove the old caches which have not been used recently. 
- caches = caches[:0] - if err := db.Find(&caches, bolthold. - Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()), - ); err != nil { - fatal(h.logger, fmt.Errorf("gc caches old not used: %v", err)) - } else { - for _, cache := range caches { - h.storage.Remove(cache.ID) - if err := db.Delete(cache.ID, cache); err != nil { - h.logger.Warnf("delete cache: %v", err) - continue - } - h.logger.Infof("deleted cache: %+v", cache) - } - } - - // Remove the old caches which are too old. - caches = caches[:0] - if err := db.Find(&caches, bolthold. - Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()), - ); err != nil { - fatal(h.logger, fmt.Errorf("gc caches too old: %v", err)) - } else { - for _, cache := range caches { - h.storage.Remove(cache.ID) - if err := db.Delete(cache.ID, cache); err != nil { - h.logger.Warnf("delete cache: %v", err) - continue - } - h.logger.Infof("deleted cache: %+v", cache) - } - } - - // Remove the old caches with the same key and version, keep the latest one. - // Also keep the olds which have been used recently for a while in case of the cache is still in use. - if results, err := db.FindAggregate( - &Cache{}, - bolthold.Where("Complete").Eq(true), - "Key", "Version", - ); err != nil { - fatal(h.logger, fmt.Errorf("gc aggregate caches: %v", err)) - } else { - for _, result := range results { - if result.Count() <= 1 { - continue - } - result.Sort("CreatedAt") - caches = caches[:0] - result.Reduction(&caches) - for _, cache := range caches[:len(caches)-1] { - if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld { - // Keep it since it has been used recently, even if it's old. - // Or it could break downloading in process. - continue - } - h.storage.Remove(cache.ID) - if err := db.Delete(cache.ID, cache); err != nil { - h.logger.Warnf("delete cache: %v", err) - continue - } - h.logger.Infof("deleted cache: %+v", cache) - } - } + go h.caches.gcCache() } } diff --git a/act/artifactcache/handler_test.go b/act/artifactcache/handler_test.go index 0c7160d6..a1495015 100644 --- a/act/artifactcache/handler_test.go +++ b/act/artifactcache/handler_test.go @@ -75,7 +75,7 @@ func TestHandler(t *testing.T) { defer func() { t.Run("inspect db", func(t *testing.T) { - db, err := handler.openDB() + db, err := handler.getCaches().openDB() require.NoError(t, err) defer db.Close() require.NoError(t, db.Bolt().View(func(tx *bbolt.Tx) error { @@ -986,17 +986,17 @@ func TestHandler_gcCache(t *testing.T) { }, } - db, err := handler.openDB() + db, err := handler.getCaches().openDB() require.NoError(t, err) for _, c := range cases { require.NoError(t, insertCache(db, c.Cache)) } require.NoError(t, db.Close()) - handler.setgcAt(time.Time{}) // ensure gcCache will not skip - handler.gcCache() + handler.getCaches().setgcAt(time.Time{}) // ensure gcCache will not skip + handler.getCaches().gcCache() - db, err = handler.openDB() + db, err = handler.getCaches().openDB() require.NoError(t, err) for i, v := range cases { t.Run(fmt.Sprintf("%d_%s", i, v.Cache.Key), func(t *testing.T) { diff --git a/act/artifactcache/mac.go b/act/artifactcache/mac.go index 5b8ca3fb..aa9a7c54 100644 --- a/act/artifactcache/mac.go +++ b/act/artifactcache/mac.go @@ -14,13 +14,13 @@ import ( var ErrValidation = errors.New("validation error") -func (h *handler) validateMac(rundata RunData) (string, error) { +func (c *cachesImpl) validateMac(rundata RunData) (string, error) { // TODO: allow configurable max age if !validateAge(rundata.Timestamp) { return "", ErrValidation } - expectedMAC := ComputeMac(h.secret, 
rundata.RepositoryFullName, rundata.RunNumber, rundata.Timestamp, rundata.WriteIsolationKey) + expectedMAC := ComputeMac(c.secret, rundata.RepositoryFullName, rundata.RunNumber, rundata.Timestamp, rundata.WriteIsolationKey) if hmac.Equal([]byte(expectedMAC), []byte(rundata.RepositoryMAC)) { return rundata.RepositoryFullName, nil } diff --git a/act/artifactcache/mac_test.go b/act/artifactcache/mac_test.go index f51461e3..91607386 100644 --- a/act/artifactcache/mac_test.go +++ b/act/artifactcache/mac_test.go @@ -9,7 +9,7 @@ import ( ) func TestMac(t *testing.T) { - handler := &handler{ + cache := &cachesImpl{ secret: "secret for testing", } @@ -18,7 +18,7 @@ func TestMac(t *testing.T) { run := "1" ts := strconv.FormatInt(time.Now().Unix(), 10) - mac := ComputeMac(handler.secret, name, run, ts, "") + mac := ComputeMac(cache.secret, name, run, ts, "") rundata := RunData{ RepositoryFullName: name, RunNumber: run, @@ -26,7 +26,7 @@ func TestMac(t *testing.T) { RepositoryMAC: mac, } - repoName, err := handler.validateMac(rundata) + repoName, err := cache.validateMac(rundata) require.NoError(t, err) require.Equal(t, name, repoName) }) @@ -36,7 +36,7 @@ func TestMac(t *testing.T) { run := "1" ts := "9223372036854775807" // This should last us for a while... - mac := ComputeMac(handler.secret, name, run, ts, "") + mac := ComputeMac(cache.secret, name, run, ts, "") rundata := RunData{ RepositoryFullName: name, RunNumber: run, @@ -44,7 +44,7 @@ func TestMac(t *testing.T) { RepositoryMAC: mac, } - _, err := handler.validateMac(rundata) + _, err := cache.validateMac(rundata) require.Error(t, err) }) @@ -60,7 +60,7 @@ func TestMac(t *testing.T) { RepositoryMAC: "this is not the right mac :D", } - repoName, err := handler.validateMac(rundata) + repoName, err := cache.validateMac(rundata) require.Error(t, err) require.Equal(t, "", repoName) }) From c28a98082b6162ce3338696adbcc56680bc18248 Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Fri, 5 Sep 2025 15:00:38 +0200 Subject: [PATCH 6/9] chore: cache: move repo != cache.Repo in readCache - it only is used after calling readCache - add unit test it reduces the number of testcase to be considered in handler --- act/artifactcache/caches.go | 8 +++-- act/artifactcache/caches_test.go | 54 ++++++++++++++++++++++++++++++++ act/artifactcache/handler.go | 21 ++----------- 3 files changed, 63 insertions(+), 20 deletions(-) create mode 100644 act/artifactcache/caches_test.go diff --git a/act/artifactcache/caches.go b/act/artifactcache/caches.go index 2f5fc0cf..014a6011 100644 --- a/act/artifactcache/caches.go +++ b/act/artifactcache/caches.go @@ -20,7 +20,7 @@ import ( type caches interface { openDB() (*bolthold.Store, error) validateMac(rundata RunData) (string, error) - readCache(id uint64) (*Cache, error) + readCache(id uint64, repo string) (*Cache, error) useCache(id uint64) error setgcAt(at time.Time) gcCache() @@ -139,7 +139,7 @@ func insertCache(db *bolthold.Store, cache *Cache) error { return nil } -func (c *cachesImpl) readCache(id uint64) (*Cache, error) { +func (c *cachesImpl) readCache(id uint64, repo string) (*Cache, error) { db, err := c.openDB() if err != nil { return nil, err @@ -149,6 +149,10 @@ func (c *cachesImpl) readCache(id uint64) (*Cache, error) { if err := db.Get(id, cache); err != nil { return nil, fmt.Errorf("readCache: Get(%v): %w", id, err) } + if cache.Repo != repo { + return nil, fmt.Errorf("readCache: Get(%v): cache.Repo %s != repo %s", id, cache.Repo, repo) + } + return cache, nil } diff --git a/act/artifactcache/caches_test.go 
b/act/artifactcache/caches_test.go new file mode 100644 index 00000000..a08a9af7 --- /dev/null +++ b/act/artifactcache/caches_test.go @@ -0,0 +1,54 @@ +package artifactcache + +import ( + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/timshannon/bolthold" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCacheReadWrite(t *testing.T) { + caches, err := newCaches(t.TempDir(), "secret", logrus.New()) + require.NoError(t, err) + t.Run("NotFound", func(t *testing.T) { + found, err := caches.readCache(456, "repo") + assert.Nil(t, found) + assert.ErrorIs(t, err, bolthold.ErrNotFound) + }) + + repo := "repository" + cache := &Cache{ + Repo: repo, + Key: "key", + Version: "version", + Size: 444, + } + now := time.Now().Unix() + cache.CreatedAt = now + cache.UsedAt = now + cache.Repo = repo + + t.Run("Insert", func(t *testing.T) { + db, err := caches.openDB() + require.NoError(t, err) + defer db.Close() + assert.NoError(t, insertCache(db, cache)) + }) + + t.Run("Found", func(t *testing.T) { + found, err := caches.readCache(cache.ID, cache.Repo) + require.NoError(t, err) + assert.Equal(t, cache.ID, found.ID) + }) + + t.Run("InvalidRepo", func(t *testing.T) { + invalidRepo := "INVALID REPO" + found, err := caches.readCache(cache.ID, invalidRepo) + assert.Nil(t, found) + assert.ErrorContains(t, err, invalidRepo) + }) +} diff --git a/act/artifactcache/handler.go b/act/artifactcache/handler.go index a4985ea3..d8a56e19 100644 --- a/act/artifactcache/handler.go +++ b/act/artifactcache/handler.go @@ -265,7 +265,7 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout return } - cache, err := h.caches.readCache(id) + cache, err := h.caches.readCache(id, repo) if err != nil { if errors.Is(err, bolthold.ErrNotFound) { h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id)) @@ -275,11 +275,6 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout return } - if cache.Repo != repo { - // can only happen if the cache is corrupted - h.responseFatalJSON(w, r, fmt.Errorf("cache repo is not valid")) - return - } if cache.WriteIsolationKey != rundata.WriteIsolationKey { h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey)) return @@ -320,7 +315,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout return } - cache, err := h.caches.readCache(id) + cache, err := h.caches.readCache(id, repo) if err != nil { if errors.Is(err, bolthold.ErrNotFound) { h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id)) @@ -330,11 +325,6 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout return } - if cache.Repo != repo { - // can only happen if the cache is corrupted - h.responseFatalJSON(w, r, fmt.Errorf("cache repo is not valid")) - return - } if cache.WriteIsolationKey != rundata.WriteIsolationKey { h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey)) return @@ -384,7 +374,7 @@ func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter. 
return } - cache, err := h.caches.readCache(id) + cache, err := h.caches.readCache(id, repo) if err != nil { if errors.Is(err, bolthold.ErrNotFound) { h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id)) @@ -394,11 +384,6 @@ func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter. return } - if cache.Repo != repo { - // can only happen if the cache is corrupted - h.responseFatalJSON(w, r, fmt.Errorf("cache repo is not valid")) - return - } // reads permitted against caches w/ the same isolation key, or no isolation key if cache.WriteIsolationKey != rundata.WriteIsolationKey && cache.WriteIsolationKey != "" { h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey)) From c48accfb518dab0305dd935d9663724399b9c1d6 Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Fri, 5 Sep 2025 15:03:52 +0200 Subject: [PATCH 7/9] chore: cache: generate mocks for act/artifactcache/caches.go --- Makefile | 4 +- act/artifactcache/caches.go | 1 + act/artifactcache/mock_caches.go | 230 ++++++++++++++++++++++++++++ internal/pkg/client/mocks/Client.go | 2 +- 4 files changed, 234 insertions(+), 3 deletions(-) create mode 100644 act/artifactcache/mock_caches.go diff --git a/Makefile b/Makefile index 1b4890ec..06029b7f 100644 --- a/Makefile +++ b/Makefile @@ -62,7 +62,7 @@ else endif endif -GO_PACKAGES_TO_VET ?= $(filter-out code.forgejo.org/forgejo/runner/v11/internal/pkg/client/mocks,$(shell $(GO) list ./...)) +GO_PACKAGES_TO_VET ?= $(filter-out code.forgejo.org/forgejo/runner/v11/internal/pkg/client/mocks code.forgejo.org/forgejo/runner/v11/act/artifactcache/mock_caches.go,$(shell $(GO) list ./...)) TAGS ?= LDFLAGS ?= -X "code.forgejo.org/forgejo/runner/v11/internal/pkg/ver.version=v$(RELEASE_VERSION)" @@ -120,7 +120,7 @@ vet: .PHONY: generate generate: - $(GO) generate ./internal/... + $(GO) generate ./... install: $(GOFILES) $(GO) install -v -tags '$(TAGS)' -ldflags '$(EXTLDFLAGS)-s -w $(LDFLAGS)' diff --git a/act/artifactcache/caches.go b/act/artifactcache/caches.go index 014a6011..16b3204d 100644 --- a/act/artifactcache/caches.go +++ b/act/artifactcache/caches.go @@ -17,6 +17,7 @@ import ( "go.etcd.io/bbolt" ) +//go:generate mockery --inpackage --name caches type caches interface { openDB() (*bolthold.Store, error) validateMac(rundata RunData) (string, error) diff --git a/act/artifactcache/mock_caches.go b/act/artifactcache/mock_caches.go new file mode 100644 index 00000000..9d484f80 --- /dev/null +++ b/act/artifactcache/mock_caches.go @@ -0,0 +1,230 @@ +// Code generated by mockery v2.53.5. DO NOT EDIT. 
+ +package artifactcache + +import ( + http "net/http" + + bolthold "github.com/timshannon/bolthold" + + io "io" + + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// mockCaches is an autogenerated mock type for the caches type +type mockCaches struct { + mock.Mock +} + +// commit provides a mock function with given fields: id, size +func (_m *mockCaches) commit(id uint64, size int64) (int64, error) { + ret := _m.Called(id, size) + + if len(ret) == 0 { + panic("no return value specified for commit") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(uint64, int64) (int64, error)); ok { + return rf(id, size) + } + if rf, ok := ret.Get(0).(func(uint64, int64) int64); ok { + r0 = rf(id, size) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(uint64, int64) error); ok { + r1 = rf(id, size) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// exist provides a mock function with given fields: id +func (_m *mockCaches) exist(id uint64) (bool, error) { + ret := _m.Called(id) + + if len(ret) == 0 { + panic("no return value specified for exist") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(uint64) (bool, error)); ok { + return rf(id) + } + if rf, ok := ret.Get(0).(func(uint64) bool); ok { + r0 = rf(id) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(uint64) error); ok { + r1 = rf(id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// gcCache provides a mock function with no fields +func (_m *mockCaches) gcCache() { + _m.Called() +} + +// openDB provides a mock function with no fields +func (_m *mockCaches) openDB() (*bolthold.Store, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for openDB") + } + + var r0 *bolthold.Store + var r1 error + if rf, ok := ret.Get(0).(func() (*bolthold.Store, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() *bolthold.Store); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*bolthold.Store) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// readCache provides a mock function with given fields: id, repo +func (_m *mockCaches) readCache(id uint64, repo string) (*Cache, error) { + ret := _m.Called(id, repo) + + if len(ret) == 0 { + panic("no return value specified for readCache") + } + + var r0 *Cache + var r1 error + if rf, ok := ret.Get(0).(func(uint64, string) (*Cache, error)); ok { + return rf(id, repo) + } + if rf, ok := ret.Get(0).(func(uint64, string) *Cache); ok { + r0 = rf(id, repo) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*Cache) + } + } + + if rf, ok := ret.Get(1).(func(uint64, string) error); ok { + r1 = rf(id, repo) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// serve provides a mock function with given fields: w, r, id +func (_m *mockCaches) serve(w http.ResponseWriter, r *http.Request, id uint64) { + _m.Called(w, r, id) +} + +// setgcAt provides a mock function with given fields: at +func (_m *mockCaches) setgcAt(at time.Time) { + _m.Called(at) +} + +// useCache provides a mock function with given fields: id +func (_m *mockCaches) useCache(id uint64) error { + ret := _m.Called(id) + + if len(ret) == 0 { + panic("no return value specified for useCache") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// validateMac provides a mock function with given fields: 
rundata +func (_m *mockCaches) validateMac(rundata RunData) (string, error) { + ret := _m.Called(rundata) + + if len(ret) == 0 { + panic("no return value specified for validateMac") + } + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(RunData) (string, error)); ok { + return rf(rundata) + } + if rf, ok := ret.Get(0).(func(RunData) string); ok { + r0 = rf(rundata) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(RunData) error); ok { + r1 = rf(rundata) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// write provides a mock function with given fields: id, offset, reader +func (_m *mockCaches) write(id uint64, offset uint64, reader io.Reader) error { + ret := _m.Called(id, offset, reader) + + if len(ret) == 0 { + panic("no return value specified for write") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64, uint64, io.Reader) error); ok { + r0 = rf(id, offset, reader) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// newMockCaches creates a new instance of mockCaches. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func newMockCaches(t interface { + mock.TestingT + Cleanup(func()) +}, +) *mockCaches { + mock := &mockCaches{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/pkg/client/mocks/Client.go b/internal/pkg/client/mocks/Client.go index f2ea5fe2..3b3ab6cb 100644 --- a/internal/pkg/client/mocks/Client.go +++ b/internal/pkg/client/mocks/Client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package mocks From 5f0b036e34966fc3eafa46e2df94cbf1ef58e2fc Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Fri, 5 Sep 2025 17:21:53 +0200 Subject: [PATCH 8/9] chore: cache: move findCacheWithIsolationKeyFallback out of handler.find --- act/artifactcache/caches.go | 15 +++++++++++++++ act/artifactcache/handler.go | 10 +--------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/act/artifactcache/caches.go b/act/artifactcache/caches.go index 16b3204d..6d499c8a 100644 --- a/act/artifactcache/caches.go +++ b/act/artifactcache/caches.go @@ -90,6 +90,21 @@ func (c *cachesImpl) openDB() (*bolthold.Store, error) { return db, nil } +var findCacheWithIsolationKeyFallback = func(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) { + cache, err := findCache(db, repo, keys, version, writeIsolationKey) + if err != nil { + return nil, err + } + // If read was scoped to WriteIsolationKey and didn't find anything, we can fallback to the non-isolated cache read + if cache == nil && writeIsolationKey != "" { + cache, err = findCache(db, repo, keys, version, "") + if err != nil { + return nil, err + } + } + return cache, nil +} + // if not found, return (nil, nil) instead of an error. 
func findCache(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) { cache := &Cache{} diff --git a/act/artifactcache/handler.go b/act/artifactcache/handler.go index d8a56e19..0b574397 100644 --- a/act/artifactcache/handler.go +++ b/act/artifactcache/handler.go @@ -177,19 +177,11 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter } defer db.Close() - cache, err := findCache(db, repo, keys, version, rundata.WriteIsolationKey) + cache, err := findCacheWithIsolationKeyFallback(db, repo, keys, version, rundata.WriteIsolationKey) if err != nil { h.responseFatalJSON(w, r, err) return } - // If read was scoped to WriteIsolationKey and didn't find anything, we can fallback to the non-isolated cache read - if cache == nil && rundata.WriteIsolationKey != "" { - cache, err = findCache(db, repo, keys, version, "") - if err != nil { - h.responseFatalJSON(w, r, err) - return - } - } if cache == nil { h.responseJSON(w, r, 204) return From c7e2db2559271722ea1976ea5a5dea9b4f5c1b4f Mon Sep 17 00:00:00 2001 From: Earl Warren Date: Fri, 5 Sep 2025 17:24:36 +0200 Subject: [PATCH 9/9] chore: cache: handler: test coverage for fatal errors --- act/artifactcache/handler_test.go | 154 +++++++++++++++++++++++++++++- 1 file changed, 153 insertions(+), 1 deletion(-) diff --git a/act/artifactcache/handler_test.go b/act/artifactcache/handler_test.go index a1495015..136f1a87 100644 --- a/act/artifactcache/handler_test.go +++ b/act/artifactcache/handler_test.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptest" "os" "os/signal" "path/filepath" @@ -16,8 +17,10 @@ import ( "testing" "time" - "code.forgejo.org/forgejo/runner/v9/testutils" + "code.forgejo.org/forgejo/runner/v11/testutils" + "github.com/julienschmidt/httprouter" "github.com/sirupsen/logrus" + "github.com/stretchr/testify/mock" "github.com/timshannon/bolthold" "go.etcd.io/bbolt" @@ -903,6 +906,155 @@ func uploadCacheNormally(t *testing.T, base, key, version, writeIsolationKey str } } +func TestHandlerAPIFatalErrors(t *testing.T) { + for _, testCase := range []struct { + name string + prepare func(message string) func() + caches func(t *testing.T, message string) caches + call func(t *testing.T, handler Handler, w http.ResponseWriter) + }{ + { + name: "find", + prepare: func(message string) func() { + return testutils.MockVariable(&findCacheWithIsolationKeyFallback, func(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) { + return nil, errors.New(message) + }) + }, + caches: func(t *testing.T, message string) caches { + caches, err := newCaches(t.TempDir(), "secret", logrus.New()) + require.NoError(t, err) + return caches + }, + call: func(t *testing.T, handler Handler, w http.ResponseWriter) { + keyOne := "ONE" + req, err := http.NewRequest("GET", fmt.Sprintf("http://example.com/cache?keys=%s", keyOne), nil) + require.NoError(t, err) + req.Header.Set("Forgejo-Cache-Repo", cacheRepo) + req.Header.Set("Forgejo-Cache-RunNumber", cacheRunnum) + req.Header.Set("Forgejo-Cache-Timestamp", cacheTimestamp) + req.Header.Set("Forgejo-Cache-MAC", cacheMac) + req.Header.Set("Forgejo-Cache-Host", "http://example.com") + handler.find(w, req, nil) + }, + }, + { + name: "find open", + caches: func(t *testing.T, message string) caches { + caches := newMockCaches(t) + caches.On("validateMac", RunData{}).Return(cacheRepo, nil) + caches.On("openDB", mock.Anything, mock.Anything).Return(nil, errors.New(message)) + return caches + }, + 
call: func(t *testing.T, handler Handler, w http.ResponseWriter) { + req, err := http.NewRequest("GET", "example.com/cache", nil) + require.NoError(t, err) + handler.find(w, req, nil) + }, + }, + { + name: "reserve", + caches: func(t *testing.T, message string) caches { + caches := newMockCaches(t) + caches.On("validateMac", RunData{}).Return(cacheRepo, nil) + caches.On("openDB", mock.Anything, mock.Anything).Return(nil, errors.New(message)) + return caches + }, + call: func(t *testing.T, handler Handler, w http.ResponseWriter) { + body, err := json.Marshal(&Request{}) + require.NoError(t, err) + req, err := http.NewRequest("POST", "example.com/caches", bytes.NewReader(body)) + require.NoError(t, err) + handler.reserve(w, req, nil) + }, + }, + { + name: "upload", + caches: func(t *testing.T, message string) caches { + caches := newMockCaches(t) + caches.On("validateMac", RunData{}).Return(cacheRepo, nil) + caches.On("readCache", mock.Anything, mock.Anything).Return(nil, errors.New(message)) + return caches + }, + call: func(t *testing.T, handler Handler, w http.ResponseWriter) { + id := 1234 + req, err := http.NewRequest("PATCH", fmt.Sprintf("http://example.com/caches/%d", id), nil) + require.NoError(t, err) + handler.upload(w, req, httprouter.Params{ + httprouter.Param{Key: "id", Value: fmt.Sprintf("%d", id)}, + }) + }, + }, + { + name: "commit", + caches: func(t *testing.T, message string) caches { + caches := newMockCaches(t) + caches.On("validateMac", RunData{}).Return(cacheRepo, nil) + caches.On("readCache", mock.Anything, mock.Anything).Return(nil, errors.New(message)) + return caches + }, + call: func(t *testing.T, handler Handler, w http.ResponseWriter) { + id := 1234 + req, err := http.NewRequest("POST", fmt.Sprintf("http://example.com/caches/%d", id), nil) + require.NoError(t, err) + handler.commit(w, req, httprouter.Params{ + httprouter.Param{Key: "id", Value: fmt.Sprintf("%d", id)}, + }) + }, + }, + { + name: "get", + caches: func(t *testing.T, message string) caches { + caches := newMockCaches(t) + caches.On("validateMac", RunData{}).Return(cacheRepo, nil) + caches.On("readCache", mock.Anything, mock.Anything).Return(nil, errors.New(message)) + return caches + }, + call: func(t *testing.T, handler Handler, w http.ResponseWriter) { + id := 1234 + req, err := http.NewRequest("GET", fmt.Sprintf("http://example.com/artifacts/%d", id), nil) + require.NoError(t, err) + handler.get(w, req, httprouter.Params{ + httprouter.Param{Key: "id", Value: fmt.Sprintf("%d", id)}, + }) + }, + }, + // it currently is a noop + //{ + //name: "clean", + //} + } { + t.Run(testCase.name, func(t *testing.T) { + message := "ERROR MESSAGE" + if testCase.prepare != nil { + defer testCase.prepare(message)() + } + + fatalMessage := "" + defer testutils.MockVariable(&fatal, func(_ logrus.FieldLogger, err error) { + fatalMessage = err.Error() + })() + + assertFatalMessage := func(t *testing.T, expected string) { + t.Helper() + assert.Contains(t, fatalMessage, expected) + } + + dir := filepath.Join(t.TempDir(), "artifactcache") + handler, err := StartHandler(dir, "", 0, "secret", nil) + require.NoError(t, err) + + fatalMessage = "" + + handler.setCaches(testCase.caches(t, message)) + + w := httptest.NewRecorder() + testCase.call(t, handler, w) + require.Equal(t, 500, w.Code) + assertFatalMessage(t, message) + }) + } +} + func TestHandler_gcCache(t *testing.T) { dir := filepath.Join(t.TempDir(), "artifactcache") handler, err := StartHandler(dir, "", 0, "", nil)