
fix: shut down the runner when the cache fails with a non-recoverable error (#935)

Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/935
Reviewed-by: Mathieu Fenniak <mfenniak@noreply.code.forgejo.org>
Reviewed-by: Michael Kriese <michael.kriese@gmx.de>
earl-warren 2025-09-07 16:03:31 +00:00
commit 09adcc47d2
10 changed files with 883 additions and 310 deletions
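
For orientation before the per-file hunks: this commit wires non-recoverable cache errors to a process-level shutdown. A package-level fatal hook in the artifact cache logs the error and sends SIGTERM to the runner process, the handler endpoints report backend failures through the new responseFatalJSON helper, and the cache-server command now traps SIGTERM alongside SIGINT so that signal results in a clean exit. The following is a condensed, self-contained sketch of that flow; the names match the diff below, but the main wrapper and the simulated error are illustrative only.

package main

import (
	"errors"
	"os"
	"os/signal"
	"syscall"

	"github.com/sirupsen/logrus"
)

// fatal mirrors the hook added to act/artifactcache: it is a package-level
// variable so tests can replace it instead of terminating the test process.
var fatal = func(logger logrus.FieldLogger, err error) {
	logger.Errorf("unrecoverable error in the cache: %v", err)
	// Ask the whole runner to shut down rather than keep serving a broken cache.
	if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil {
		logger.Errorf("failed to send the TERM signal to shutdown the daemon %v", err)
	}
}

func main() {
	logger := logrus.New()

	// The cache-server command now listens for SIGTERM as well as SIGINT,
	// so the signal sent by fatal() ends the daemon gracefully.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

	// Simulate a non-recoverable cache error, e.g. bolt.db cannot be opened.
	fatal(logger, errors.New("bolt.db cannot be opened"))

	<-c
	logger.Info("cache server is shutting down")
}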

View file

@ -62,7 +62,7 @@ else
endif
endif
GO_PACKAGES_TO_VET ?= $(filter-out code.forgejo.org/forgejo/runner/v11/internal/pkg/client/mocks,$(shell $(GO) list ./...))
GO_PACKAGES_TO_VET ?= $(filter-out code.forgejo.org/forgejo/runner/v11/internal/pkg/client/mocks code.forgejo.org/forgejo/runner/v11/act/artifactcache/mock_caches.go,$(shell $(GO) list ./...))
TAGS ?=
LDFLAGS ?= -X "code.forgejo.org/forgejo/runner/v11/internal/pkg/ver.version=v$(RELEASE_VERSION)"
@ -120,7 +120,7 @@ vet:
.PHONY: generate
generate:
$(GO) generate ./internal/...
$(GO) generate ./...
install: $(GOFILES)
$(GO) install -v -tags '$(TAGS)' -ldflags '$(EXTLDFLAGS)-s -w $(LDFLAGS)'

act/artifactcache/caches.go Normal file
View file

@ -0,0 +1,325 @@
package artifactcache
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"regexp"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
"github.com/timshannon/bolthold"
"go.etcd.io/bbolt"
)
//go:generate mockery --inpackage --name caches
type caches interface {
openDB() (*bolthold.Store, error)
validateMac(rundata RunData) (string, error)
readCache(id uint64, repo string) (*Cache, error)
useCache(id uint64) error
setgcAt(at time.Time)
gcCache()
serve(w http.ResponseWriter, r *http.Request, id uint64)
commit(id uint64, size int64) (int64, error)
exist(id uint64) (bool, error)
write(id, offset uint64, reader io.Reader) error
}
type cachesImpl struct {
dir string
storage *Storage
logger logrus.FieldLogger
secret string
gcing atomic.Bool
gcAt time.Time
}
func newCaches(dir, secret string, logger logrus.FieldLogger) (caches, error) {
c := &cachesImpl{
secret: secret,
}
c.logger = logger
if dir == "" {
home, err := os.UserHomeDir()
if err != nil {
return nil, err
}
dir = filepath.Join(home, ".cache", "actcache")
}
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
}
c.dir = dir
storage, err := NewStorage(filepath.Join(dir, "cache"))
if err != nil {
return nil, err
}
c.storage = storage
c.gcCache()
return c, nil
}
func (c *cachesImpl) openDB() (*bolthold.Store, error) {
file := filepath.Join(c.dir, "bolt.db")
db, err := bolthold.Open(file, 0o644, &bolthold.Options{
Encoder: json.Marshal,
Decoder: json.Unmarshal,
Options: &bbolt.Options{
Timeout: 5 * time.Second,
NoGrowSync: bbolt.DefaultOptions.NoGrowSync,
FreelistType: bbolt.DefaultOptions.FreelistType,
},
})
if err != nil {
return nil, fmt.Errorf("Open(%s): %w", file, err)
}
return db, nil
}
var findCacheWithIsolationKeyFallback = func(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
cache, err := findCache(db, repo, keys, version, writeIsolationKey)
if err != nil {
return nil, err
}
// If read was scoped to WriteIsolationKey and didn't find anything, we can fallback to the non-isolated cache read
if cache == nil && writeIsolationKey != "" {
cache, err = findCache(db, repo, keys, version, "")
if err != nil {
return nil, err
}
}
return cache, nil
}
// if not found, return (nil, nil) instead of an error.
func findCache(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
cache := &Cache{}
for _, prefix := range keys {
// if a key in the list matches exactly, don't return partial matches
if err := db.FindOne(cache,
bolthold.Where("Repo").Eq(repo).Index("Repo").
And("Key").Eq(prefix).
And("Version").Eq(version).
And("WriteIsolationKey").Eq(writeIsolationKey).
And("Complete").Eq(true).
SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) {
if err != nil {
return nil, fmt.Errorf("find cache entry equal to %s: %w", prefix, err)
}
return cache, nil
}
prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix))
re, err := regexp.Compile(prefixPattern)
if err != nil {
continue
}
if err := db.FindOne(cache,
bolthold.Where("Repo").Eq(repo).Index("Repo").
And("Key").RegExp(re).
And("Version").Eq(version).
And("WriteIsolationKey").Eq(writeIsolationKey).
And("Complete").Eq(true).
SortBy("CreatedAt").Reverse()); err != nil {
if errors.Is(err, bolthold.ErrNotFound) {
continue
}
return nil, fmt.Errorf("find cache entry starting with %s: %w", prefix, err)
}
return cache, nil
}
return nil, nil
}
func insertCache(db *bolthold.Store, cache *Cache) error {
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
return fmt.Errorf("insert cache: %w", err)
}
// write back id to db
if err := db.Update(cache.ID, cache); err != nil {
return fmt.Errorf("write back id to db: %w", err)
}
return nil
}
func (c *cachesImpl) readCache(id uint64, repo string) (*Cache, error) {
db, err := c.openDB()
if err != nil {
return nil, err
}
defer db.Close()
cache := &Cache{}
if err := db.Get(id, cache); err != nil {
return nil, fmt.Errorf("readCache: Get(%v): %w", id, err)
}
if cache.Repo != repo {
return nil, fmt.Errorf("readCache: Get(%v): cache.Repo %s != repo %s", id, cache.Repo, repo)
}
return cache, nil
}
func (c *cachesImpl) useCache(id uint64) error {
db, err := c.openDB()
if err != nil {
return err
}
defer db.Close()
cache := &Cache{}
if err := db.Get(id, cache); err != nil {
return fmt.Errorf("useCache: Get(%v): %w", id, err)
}
cache.UsedAt = time.Now().Unix()
if err := db.Update(cache.ID, cache); err != nil {
return fmt.Errorf("useCache: Update(%v): %v", cache.ID, err)
}
return nil
}
func (c *cachesImpl) serve(w http.ResponseWriter, r *http.Request, id uint64) {
c.storage.Serve(w, r, id)
}
func (c *cachesImpl) commit(id uint64, size int64) (int64, error) {
return c.storage.Commit(id, size)
}
func (c *cachesImpl) exist(id uint64) (bool, error) {
return c.storage.Exist(id)
}
func (c *cachesImpl) write(id, offset uint64, reader io.Reader) error {
return c.storage.Write(id, offset, reader)
}
const (
keepUsed = 30 * 24 * time.Hour
keepUnused = 7 * 24 * time.Hour
keepTemp = 5 * time.Minute
keepOld = 5 * time.Minute
)
func (c *cachesImpl) setgcAt(at time.Time) {
c.gcAt = at
}
func (c *cachesImpl) gcCache() {
if c.gcing.Load() {
return
}
if !c.gcing.CompareAndSwap(false, true) {
return
}
defer c.gcing.Store(false)
if time.Since(c.gcAt) < time.Hour {
c.logger.Debugf("skip gc: %v", c.gcAt.String())
return
}
c.gcAt = time.Now()
c.logger.Debugf("gc: %v", c.gcAt.String())
db, err := c.openDB()
if err != nil {
fatal(c.logger, err)
return
}
defer db.Close()
// Remove the caches which are not completed for a while, they are most likely to be broken.
var caches []*Cache
if err := db.Find(&caches, bolthold.
Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
And("Complete").Eq(false),
); err != nil {
fatal(c.logger, fmt.Errorf("gc caches not completed: %v", err))
} else {
for _, cache := range caches {
c.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
c.logger.Errorf("delete cache: %v", err)
continue
}
c.logger.Infof("deleted cache: %+v", cache)
}
}
// Remove the old caches which have not been used recently.
caches = caches[:0]
if err := db.Find(&caches, bolthold.
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
); err != nil {
fatal(c.logger, fmt.Errorf("gc caches old not used: %v", err))
} else {
for _, cache := range caches {
c.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
c.logger.Warnf("delete cache: %v", err)
continue
}
c.logger.Infof("deleted cache: %+v", cache)
}
}
// Remove the old caches which are too old.
caches = caches[:0]
if err := db.Find(&caches, bolthold.
Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
); err != nil {
fatal(c.logger, fmt.Errorf("gc caches too old: %v", err))
} else {
for _, cache := range caches {
c.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
c.logger.Warnf("delete cache: %v", err)
continue
}
c.logger.Infof("deleted cache: %+v", cache)
}
}
// Remove the old caches with the same key and version, keep the latest one.
// Also keep the olds which have been used recently for a while in case of the cache is still in use.
if results, err := db.FindAggregate(
&Cache{},
bolthold.Where("Complete").Eq(true),
"Key", "Version",
); err != nil {
fatal(c.logger, fmt.Errorf("gc aggregate caches: %v", err))
} else {
for _, result := range results {
if result.Count() <= 1 {
continue
}
result.Sort("CreatedAt")
caches = caches[:0]
result.Reduction(&caches)
for _, cache := range caches[:len(caches)-1] {
if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld {
// Keep it since it has been used recently, even if it's old.
// Or it could break downloading in process.
continue
}
c.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
c.logger.Warnf("delete cache: %v", err)
continue
}
c.logger.Infof("deleted cache: %+v", cache)
}
}
}
}
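
The restore-key matching in findCache above is easy to misread, so here is a small worked illustration with hypothetical data: keys are tried in order, an exact match for a key wins even over a newer prefix match, and the prefix fallback returns the most recently created complete entry for the same repo and version. The helper and the entries in the comments are invented for the example.

// Hypothetical helper illustrating findCache's matching order; the data in
// the comments below is invented for the example.
//
// Stored complete entries for repo "owner/repo", version "v1":
//   Key "go-mod-linux-amd64"      CreatedAt 09:00
//   Key "go-mod-linux-amd64-abc"  CreatedAt 10:00
//
// lookupExample(db, []string{"go-mod-linux-amd64", "go-mod-"})
//   -> the exact match on the first key returns the 09:00 entry, even
//      though the 10:00 entry would match it as a prefix.
// lookupExample(db, []string{"go-mod-windows", "go-mod-"})
//   -> no exact or prefix match for the first key; no exact match for
//      "go-mod-", but the "^go-mod-" prefix matches both entries and the
//      newest one (10:00) is returned.
func lookupExample(db *bolthold.Store, keys []string) (*Cache, error) {
	return findCache(db, "owner/repo", keys, "v1", "")
}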

View file

@ -0,0 +1,54 @@
package artifactcache
import (
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/timshannon/bolthold"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCacheReadWrite(t *testing.T) {
caches, err := newCaches(t.TempDir(), "secret", logrus.New())
require.NoError(t, err)
t.Run("NotFound", func(t *testing.T) {
found, err := caches.readCache(456, "repo")
assert.Nil(t, found)
assert.ErrorIs(t, err, bolthold.ErrNotFound)
})
repo := "repository"
cache := &Cache{
Repo: repo,
Key: "key",
Version: "version",
Size: 444,
}
now := time.Now().Unix()
cache.CreatedAt = now
cache.UsedAt = now
cache.Repo = repo
t.Run("Insert", func(t *testing.T) {
db, err := caches.openDB()
require.NoError(t, err)
defer db.Close()
assert.NoError(t, insertCache(db, cache))
})
t.Run("Found", func(t *testing.T) {
found, err := caches.readCache(cache.ID, cache.Repo)
require.NoError(t, err)
assert.Equal(t, cache.ID, found.ID)
})
t.Run("InvalidRepo", func(t *testing.T) {
invalidRepo := "INVALID REPO"
found, err := caches.readCache(cache.ID, invalidRepo)
assert.Nil(t, found)
assert.ErrorContains(t, err, invalidRepo)
})
}

View file

@ -7,18 +7,14 @@ import (
"io"
"net"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
"github.com/julienschmidt/httprouter"
"github.com/sirupsen/logrus"
"github.com/timshannon/bolthold"
"go.etcd.io/bbolt"
"code.forgejo.org/forgejo/runner/v11/act/common"
)
@ -27,11 +23,19 @@ const (
urlBase = "/_apis/artifactcache"
)
var fatal = func(logger logrus.FieldLogger, err error) {
logger.Errorf("unrecoverable error in the cache: %v", err)
if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil {
logger.Errorf("unrecoverable error in the cache: failed to send the TERM signal to shutdown the daemon %v", err)
}
}
type Handler interface {
ExternalURL() string
Close() error
isClosed() bool
openDB() (*bolthold.Store, error)
getCaches() caches
setCaches(caches caches)
find(w http.ResponseWriter, r *http.Request, params httprouter.Params)
reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params)
upload(w http.ResponseWriter, r *http.Request, params httprouter.Params)
@ -39,32 +43,21 @@ type Handler interface {
get(w http.ResponseWriter, r *http.Request, params httprouter.Params)
clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params)
middleware(handler httprouter.Handle) httprouter.Handle
readCache(id uint64) (*Cache, error)
useCache(id uint64) error
setgcAt(at time.Time)
gcCache()
responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any)
}
type handler struct {
dir string
storage *Storage
caches caches
router *httprouter.Router
listener net.Listener
server *http.Server
logger logrus.FieldLogger
secret string
gcing atomic.Bool
gcAt time.Time
outboundIP string
}
func StartHandler(dir, outboundIP string, port uint16, secret string, logger logrus.FieldLogger) (Handler, error) {
h := &handler{
secret: secret,
}
h := &handler{}
if logger == nil {
discard := logrus.New()
@ -74,24 +67,11 @@ func StartHandler(dir, outboundIP string, port uint16, secret string, logger log
logger = logger.WithField("module", "artifactcache")
h.logger = logger
if dir == "" {
home, err := os.UserHomeDir()
if err != nil {
return nil, err
}
dir = filepath.Join(home, ".cache", "actcache")
}
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
}
h.dir = dir
storage, err := NewStorage(filepath.Join(dir, "cache"))
caches, err := newCaches(dir, secret, logger)
if err != nil {
return nil, err
}
h.storage = storage
h.caches = caches
if outboundIP != "" {
h.outboundIP = outboundIP
@ -111,8 +91,6 @@ func StartHandler(dir, outboundIP string, port uint16, secret string, logger log
h.router = router
h.gcCache()
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) // listen on all interfaces
if err != nil {
return nil, err
@ -168,22 +146,18 @@ func (h *handler) isClosed() bool {
return h.listener == nil && h.server == nil
}
func (h *handler) openDB() (*bolthold.Store, error) {
return bolthold.Open(filepath.Join(h.dir, "bolt.db"), 0o644, &bolthold.Options{
Encoder: json.Marshal,
Decoder: json.Unmarshal,
Options: &bbolt.Options{
Timeout: 5 * time.Second,
NoGrowSync: bbolt.DefaultOptions.NoGrowSync,
FreelistType: bbolt.DefaultOptions.FreelistType,
},
})
func (h *handler) getCaches() caches {
return h.caches
}
func (h *handler) setCaches(caches caches) {
h.caches = caches
}
// GET /_apis/artifactcache/cache
func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
rundata := runDataFromHeaders(r)
repo, err := h.validateMac(rundata)
repo, err := h.caches.validateMac(rundata)
if err != nil {
h.responseJSON(w, r, 403, err)
return
@ -196,32 +170,24 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter
}
version := r.URL.Query().Get("version")
db, err := h.openDB()
db, err := h.caches.openDB()
if err != nil {
h.responseJSON(w, r, 500, err)
h.responseFatalJSON(w, r, err)
return
}
defer db.Close()
cache, err := findCache(db, repo, keys, version, rundata.WriteIsolationKey)
cache, err := findCacheWithIsolationKeyFallback(db, repo, keys, version, rundata.WriteIsolationKey)
if err != nil {
h.responseJSON(w, r, 500, err)
h.responseFatalJSON(w, r, err)
return
}
// If read was scoped to WriteIsolationKey and didn't find anything, we can fallback to the non-isolated cache read
if cache == nil && rundata.WriteIsolationKey != "" {
cache, err = findCache(db, repo, keys, version, "")
if err != nil {
h.responseJSON(w, r, 500, err)
return
}
}
if cache == nil {
h.responseJSON(w, r, 204)
return
}
if ok, err := h.storage.Exist(cache.ID); err != nil {
if ok, err := h.caches.exist(cache.ID); err != nil {
h.responseJSON(w, r, 500, err)
return
} else if !ok {
@ -240,7 +206,7 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter
// POST /_apis/artifactcache/caches
func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
rundata := runDataFromHeaders(r)
repo, err := h.validateMac(rundata)
repo, err := h.caches.validateMac(rundata)
if err != nil {
h.responseJSON(w, r, 403, err)
return
@ -255,9 +221,9 @@ func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprou
api.Key = strings.ToLower(api.Key)
cache := api.ToCache()
db, err := h.openDB()
db, err := h.caches.openDB()
if err != nil {
h.responseJSON(w, r, 500, err)
h.responseFatalJSON(w, r, err)
return
}
defer db.Close()
@ -279,7 +245,7 @@ func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprou
// PATCH /_apis/artifactcache/caches/:id
func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
rundata := runDataFromHeaders(r)
repo, err := h.validateMac(rundata)
repo, err := h.caches.validateMac(rundata)
if err != nil {
h.responseJSON(w, r, 403, err)
return
@ -291,21 +257,16 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout
return
}
cache, err := h.readCache(id)
cache, err := h.caches.readCache(id, repo)
if err != nil {
if errors.Is(err, bolthold.ErrNotFound) {
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
return
}
h.responseJSON(w, r, 500, fmt.Errorf("cache Get: %w", err))
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
return
}
// Should not happen
if cache.Repo != repo {
h.responseJSON(w, r, 500, fmt.Errorf("cache repo is not valid"))
return
}
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey))
return
@ -320,11 +281,11 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout
h.responseJSON(w, r, 400, fmt.Errorf("cache parseContentRange(%s): %w", r.Header.Get("Content-Range"), err))
return
}
if err := h.storage.Write(cache.ID, start, r.Body); err != nil {
if err := h.caches.write(cache.ID, start, r.Body); err != nil {
h.responseJSON(w, r, 500, fmt.Errorf("cache storage.Write: %w", err))
return
}
if err := h.useCache(id); err != nil {
if err := h.caches.useCache(id); err != nil {
h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err))
return
}
@ -334,7 +295,7 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout
// POST /_apis/artifactcache/caches/:id
func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
rundata := runDataFromHeaders(r)
repo, err := h.validateMac(rundata)
repo, err := h.caches.validateMac(rundata)
if err != nil {
h.responseJSON(w, r, 403, err)
return
@ -346,21 +307,16 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
return
}
cache, err := h.readCache(id)
cache, err := h.caches.readCache(id, repo)
if err != nil {
if errors.Is(err, bolthold.ErrNotFound) {
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
return
}
h.responseJSON(w, r, 500, fmt.Errorf("cache Get: %w", err))
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
return
}
// Should not happen
if cache.Repo != repo {
h.responseJSON(w, r, 500, fmt.Errorf("cache repo is not valid"))
return
}
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey))
return
@ -371,17 +327,17 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
return
}
size, err := h.storage.Commit(cache.ID, cache.Size)
size, err := h.caches.commit(cache.ID, cache.Size)
if err != nil {
h.responseJSON(w, r, 500, err)
h.responseJSON(w, r, 500, fmt.Errorf("commit(%v): %w", cache.ID, err))
return
}
// write real size back to cache, it may be different from the current value when the request doesn't specify it.
cache.Size = size
db, err := h.openDB()
db, err := h.caches.openDB()
if err != nil {
h.responseJSON(w, r, 500, err)
h.responseFatalJSON(w, r, err)
return
}
defer db.Close()
@ -398,7 +354,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
// GET /_apis/artifactcache/artifacts/:id
func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
rundata := runDataFromHeaders(r)
repo, err := h.validateMac(rundata)
repo, err := h.caches.validateMac(rundata)
if err != nil {
h.responseJSON(w, r, 403, err)
return
@ -410,38 +366,33 @@ func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter.
return
}
cache, err := h.readCache(id)
cache, err := h.caches.readCache(id, repo)
if err != nil {
if errors.Is(err, bolthold.ErrNotFound) {
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
return
}
h.responseJSON(w, r, 500, fmt.Errorf("cache Get: %w", err))
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
return
}
// Should not happen
if cache.Repo != repo {
h.responseJSON(w, r, 500, fmt.Errorf("cache repo is not valid"))
return
}
// reads permitted against caches w/ the same isolation key, or no isolation key
if cache.WriteIsolationKey != rundata.WriteIsolationKey && cache.WriteIsolationKey != "" {
h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey))
return
}
if err := h.useCache(id); err != nil {
if err := h.caches.useCache(id); err != nil {
h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err))
return
}
h.storage.Serve(w, r, id)
h.caches.serve(w, r, id)
}
// POST /_apis/artifactcache/clean
func (h *handler) clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
rundata := runDataFromHeaders(r)
_, err := h.validateMac(rundata)
_, err := h.caches.validateMac(rundata)
if err != nil {
h.responseJSON(w, r, 403, err)
return
@ -456,203 +407,13 @@ func (h *handler) middleware(handler httprouter.Handle) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
h.logger.Debugf("%s %s", r.Method, r.RequestURI)
handler(w, r, params)
go h.gcCache()
go h.caches.gcCache()
}
}
// if not found, return (nil, nil) instead of an error.
func findCache(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
cache := &Cache{}
for _, prefix := range keys {
// if a key in the list matches exactly, don't return partial matches
if err := db.FindOne(cache,
bolthold.Where("Repo").Eq(repo).Index("Repo").
And("Key").Eq(prefix).
And("Version").Eq(version).
And("WriteIsolationKey").Eq(writeIsolationKey).
And("Complete").Eq(true).
SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) {
if err != nil {
return nil, fmt.Errorf("find cache: %w", err)
}
return cache, nil
}
prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix))
re, err := regexp.Compile(prefixPattern)
if err != nil {
continue
}
if err := db.FindOne(cache,
bolthold.Where("Repo").Eq(repo).Index("Repo").
And("Key").RegExp(re).
And("Version").Eq(version).
And("WriteIsolationKey").Eq(writeIsolationKey).
And("Complete").Eq(true).
SortBy("CreatedAt").Reverse()); err != nil {
if errors.Is(err, bolthold.ErrNotFound) {
continue
}
return nil, fmt.Errorf("find cache: %w", err)
}
return cache, nil
}
return nil, nil
}
func insertCache(db *bolthold.Store, cache *Cache) error {
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
return fmt.Errorf("insert cache: %w", err)
}
// write back id to db
if err := db.Update(cache.ID, cache); err != nil {
return fmt.Errorf("write back id to db: %w", err)
}
return nil
}
func (h *handler) readCache(id uint64) (*Cache, error) {
db, err := h.openDB()
if err != nil {
return nil, err
}
defer db.Close()
cache := &Cache{}
if err := db.Get(id, cache); err != nil {
return nil, err
}
return cache, nil
}
func (h *handler) useCache(id uint64) error {
db, err := h.openDB()
if err != nil {
return err
}
defer db.Close()
cache := &Cache{}
if err := db.Get(id, cache); err != nil {
return err
}
cache.UsedAt = time.Now().Unix()
return db.Update(cache.ID, cache)
}
const (
keepUsed = 30 * 24 * time.Hour
keepUnused = 7 * 24 * time.Hour
keepTemp = 5 * time.Minute
keepOld = 5 * time.Minute
)
func (h *handler) setgcAt(at time.Time) {
h.gcAt = at
}
func (h *handler) gcCache() {
if h.gcing.Load() {
return
}
if !h.gcing.CompareAndSwap(false, true) {
return
}
defer h.gcing.Store(false)
if time.Since(h.gcAt) < time.Hour {
h.logger.Debugf("skip gc: %v", h.gcAt.String())
return
}
h.gcAt = time.Now()
h.logger.Debugf("gc: %v", h.gcAt.String())
db, err := h.openDB()
if err != nil {
return
}
defer db.Close()
// Remove the caches which are not completed for a while, they are most likely to be broken.
var caches []*Cache
if err := db.Find(&caches, bolthold.
Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
And("Complete").Eq(false),
); err != nil {
h.logger.Warnf("find caches: %v", err)
} else {
for _, cache := range caches {
h.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
h.logger.Warnf("delete cache: %v", err)
continue
}
h.logger.Infof("deleted cache: %+v", cache)
}
}
// Remove the old caches which have not been used recently.
caches = caches[:0]
if err := db.Find(&caches, bolthold.
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
); err != nil {
h.logger.Warnf("find caches: %v", err)
} else {
for _, cache := range caches {
h.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
h.logger.Warnf("delete cache: %v", err)
continue
}
h.logger.Infof("deleted cache: %+v", cache)
}
}
// Remove the old caches which are too old.
caches = caches[:0]
if err := db.Find(&caches, bolthold.
Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
); err != nil {
h.logger.Warnf("find caches: %v", err)
} else {
for _, cache := range caches {
h.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
h.logger.Warnf("delete cache: %v", err)
continue
}
h.logger.Infof("deleted cache: %+v", cache)
}
}
// Remove the old caches with the same key and version, keep the latest one.
// Also keep the olds which have been used recently for a while in case of the cache is still in use.
if results, err := db.FindAggregate(
&Cache{},
bolthold.Where("Complete").Eq(true),
"Key", "Version",
); err != nil {
h.logger.Warnf("find aggregate caches: %v", err)
} else {
for _, result := range results {
if result.Count() <= 1 {
continue
}
result.Sort("CreatedAt")
caches = caches[:0]
result.Reduction(&caches)
for _, cache := range caches[:len(caches)-1] {
if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld {
// Keep it since it has been used recently, even if it's old.
// Or it could break downloading in process.
continue
}
h.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
h.logger.Warnf("delete cache: %v", err)
continue
}
h.logger.Infof("deleted cache: %+v", cache)
}
}
}
func (h *handler) responseFatalJSON(w http.ResponseWriter, r *http.Request, err error) {
h.responseJSON(w, r, 500, err)
fatal(h.logger, err)
}
func (h *handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) {
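
Taken together, the hunks above split errors into two classes: request-scoped problems (bad MAC, unknown cache id, bad Content-Range) keep answering through responseJSON with an appropriate status, while backend failures (openDB or readCache errors) go through the new responseFatalJSON, which answers 500 and then calls fatal to stop the runner. A trimmed, illustrative sketch of that decision, not the full find handler:

// Illustrative only: a trimmed version of the decision made in find/upload/
// commit/get above, using the names introduced by this commit.
func (h *handler) findSketch(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	rundata := runDataFromHeaders(r)
	repo, err := h.caches.validateMac(rundata)
	if err != nil {
		h.responseJSON(w, r, 403, err) // request-scoped: reject this call only
		return
	}

	db, err := h.caches.openDB()
	if err != nil {
		h.responseFatalJSON(w, r, err) // backend is broken: 500, then shut the runner down
		return
	}
	defer db.Close()

	cache, err := findCacheWithIsolationKeyFallback(db, repo, []string{"some-key"}, "some-version", rundata.WriteIsolationKey)
	if err != nil {
		h.responseFatalJSON(w, r, err)
		return
	}
	if cache == nil {
		h.responseJSON(w, r, 204) // cache miss: a normal outcome
		return
	}
	h.responseJSON(w, r, 200, cache)
}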

View file

@ -4,18 +4,28 @@ import (
"bytes"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"code.forgejo.org/forgejo/runner/v11/testutils"
"github.com/julienschmidt/httprouter"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"
"github.com/timshannon/bolthold"
"go.etcd.io/bbolt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
@ -55,6 +65,10 @@ var (
)
func TestHandler(t *testing.T) {
defer testutils.MockVariable(&fatal, func(_ logrus.FieldLogger, err error) {
t.Fatalf("unexpected call to fatal(%v)", err)
})()
dir := filepath.Join(t.TempDir(), "artifactcache")
handler, err := StartHandler(dir, "", 0, "secret", nil)
require.NoError(t, err)
@ -63,8 +77,8 @@ func TestHandler(t *testing.T) {
base := fmt.Sprintf("%s%s", handler.ExternalURL(), urlBase)
defer func() {
t.Run("inpect db", func(t *testing.T) {
db, err := handler.openDB()
t.Run("inspect db", func(t *testing.T) {
db, err := handler.getCaches().openDB()
require.NoError(t, err)
defer db.Close()
require.NoError(t, db.Bolt().View(func(tx *bbolt.Tx) error {
@ -892,6 +906,155 @@ func uploadCacheNormally(t *testing.T, base, key, version, writeIsolationKey str
}
}
func TestHandlerAPIFatalErrors(t *testing.T) {
for _, testCase := range []struct {
name string
prepare func(message string) func()
caches func(t *testing.T, message string) caches
call func(t *testing.T, handler Handler, w http.ResponseWriter)
}{
{
name: "find",
prepare: func(message string) func() {
return testutils.MockVariable(&findCacheWithIsolationKeyFallback, func(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
return nil, errors.New(message)
})
},
caches: func(t *testing.T, message string) caches {
caches, err := newCaches(t.TempDir(), "secret", logrus.New())
require.NoError(t, err)
return caches
},
call: func(t *testing.T, handler Handler, w http.ResponseWriter) {
keyOne := "ONE"
req, err := http.NewRequest("GET", fmt.Sprintf("http://example.com/cache?keys=%s", keyOne), nil)
require.NoError(t, err)
req.Header.Set("Forgejo-Cache-Repo", cacheRepo)
req.Header.Set("Forgejo-Cache-RunNumber", cacheRunnum)
req.Header.Set("Forgejo-Cache-Timestamp", cacheTimestamp)
req.Header.Set("Forgejo-Cache-MAC", cacheMac)
req.Header.Set("Forgejo-Cache-Host", "http://example.com")
handler.find(w, req, nil)
},
},
{
name: "find open",
caches: func(t *testing.T, message string) caches {
caches := newMockCaches(t)
caches.On("validateMac", RunData{}).Return(cacheRepo, nil)
caches.On("openDB", mock.Anything, mock.Anything).Return(nil, errors.New(message))
return caches
},
call: func(t *testing.T, handler Handler, w http.ResponseWriter) {
req, err := http.NewRequest("GET", "example.com/cache", nil)
require.NoError(t, err)
handler.find(w, req, nil)
},
},
{
name: "reserve",
caches: func(t *testing.T, message string) caches {
caches := newMockCaches(t)
caches.On("validateMac", RunData{}).Return(cacheRepo, nil)
caches.On("openDB", mock.Anything, mock.Anything).Return(nil, errors.New(message))
return caches
},
call: func(t *testing.T, handler Handler, w http.ResponseWriter) {
body, err := json.Marshal(&Request{})
require.NoError(t, err)
req, err := http.NewRequest("POST", "example.com/caches", bytes.NewReader(body))
require.NoError(t, err)
handler.reserve(w, req, nil)
},
},
{
name: "upload",
caches: func(t *testing.T, message string) caches {
caches := newMockCaches(t)
caches.On("validateMac", RunData{}).Return(cacheRepo, nil)
caches.On("readCache", mock.Anything, mock.Anything).Return(nil, errors.New(message))
return caches
},
call: func(t *testing.T, handler Handler, w http.ResponseWriter) {
id := 1234
req, err := http.NewRequest("PATCH", fmt.Sprintf("http://example.com/caches/%d", id), nil)
require.NoError(t, err)
handler.upload(w, req, httprouter.Params{
httprouter.Param{Key: "id", Value: fmt.Sprintf("%d", id)},
})
},
},
{
name: "commit",
caches: func(t *testing.T, message string) caches {
caches := newMockCaches(t)
caches.On("validateMac", RunData{}).Return(cacheRepo, nil)
caches.On("readCache", mock.Anything, mock.Anything).Return(nil, errors.New(message))
return caches
},
call: func(t *testing.T, handler Handler, w http.ResponseWriter) {
id := 1234
req, err := http.NewRequest("POST", fmt.Sprintf("http://example.com/caches/%d", id), nil)
require.NoError(t, err)
handler.commit(w, req, httprouter.Params{
httprouter.Param{Key: "id", Value: fmt.Sprintf("%d", id)},
})
},
},
{
name: "get",
caches: func(t *testing.T, message string) caches {
caches := newMockCaches(t)
caches.On("validateMac", RunData{}).Return(cacheRepo, nil)
caches.On("readCache", mock.Anything, mock.Anything).Return(nil, errors.New(message))
return caches
},
call: func(t *testing.T, handler Handler, w http.ResponseWriter) {
id := 1234
req, err := http.NewRequest("GET", fmt.Sprintf("http://example.com/artifacts/%d", id), nil)
require.NoError(t, err)
handler.get(w, req, httprouter.Params{
httprouter.Param{Key: "id", Value: fmt.Sprintf("%d", id)},
})
},
},
// it currently is a noop
//{
//name: "clean",
//}
} {
t.Run(testCase.name, func(t *testing.T) {
message := "ERROR MESSAGE"
if testCase.prepare != nil {
defer testCase.prepare(message)()
}
fatalMessage := "<unset>"
defer testutils.MockVariable(&fatal, func(_ logrus.FieldLogger, err error) {
fatalMessage = err.Error()
})()
assertFatalMessage := func(t *testing.T, expected string) {
t.Helper()
assert.Contains(t, fatalMessage, expected)
}
dir := filepath.Join(t.TempDir(), "artifactcache")
handler, err := StartHandler(dir, "", 0, "secret", nil)
require.NoError(t, err)
fatalMessage = "<unset>"
handler.setCaches(testCase.caches(t, message))
w := httptest.NewRecorder()
testCase.call(t, handler, w)
require.Equal(t, 500, w.Code)
assertFatalMessage(t, message)
})
}
}
func TestHandler_gcCache(t *testing.T) {
dir := filepath.Join(t.TempDir(), "artifactcache")
handler, err := StartHandler(dir, "", 0, "", nil)
@ -975,17 +1138,17 @@ func TestHandler_gcCache(t *testing.T) {
},
}
db, err := handler.openDB()
db, err := handler.getCaches().openDB()
require.NoError(t, err)
for _, c := range cases {
require.NoError(t, insertCache(db, c.Cache))
}
require.NoError(t, db.Close())
handler.setgcAt(time.Time{}) // ensure gcCache will not skip
handler.gcCache()
handler.getCaches().setgcAt(time.Time{}) // ensure gcCache will not skip
handler.getCaches().gcCache()
db, err = handler.openDB()
db, err = handler.getCaches().openDB()
require.NoError(t, err)
for i, v := range cases {
t.Run(fmt.Sprintf("%d_%s", i, v.Cache.Key), func(t *testing.T) {
@ -1032,3 +1195,42 @@ func TestHandler_ExternalURL(t *testing.T) {
assert.True(t, handler.isClosed())
})
}
var (
settleTime = 100 * time.Millisecond
fatalWaitingTime = 30 * time.Second
)
func waitSig(t *testing.T, c <-chan os.Signal, sig os.Signal) {
t.Helper()
// Sleep multiple times to give the kernel more tries to
// deliver the signal.
start := time.Now()
timer := time.NewTimer(settleTime / 10)
defer timer.Stop()
for time.Since(start) < fatalWaitingTime {
select {
case s := <-c:
if s == sig {
return
}
t.Fatalf("signal was %v, want %v", s, sig)
case <-timer.C:
timer.Reset(settleTime / 10)
}
}
t.Fatalf("timeout after %v waiting for %v", fatalWaitingTime, sig)
}
func TestHandler_fatal(t *testing.T) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM)
defer signal.Stop(c)
discard := logrus.New()
discard.Out = io.Discard
fatal(discard, errors.New("fatal error"))
waitSig(t, c, syscall.SIGTERM)
}
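
The table-driven TestHandlerAPIFatalErrors above can be condensed into a single linear sketch. This is illustrative only and assumes the same test package and imports as handler_test.go, but it shows the moving parts: stub fatal so the test process is not terminated, program the mockery-generated mockCaches, inject it with setCaches, then assert both the 500 response and the recorded fatal error.

func TestFatalOpenDBSketch(t *testing.T) {
	// Keep the real fatal hook from killing the test process; record the error instead.
	var fatalErr error
	defer testutils.MockVariable(&fatal, func(_ logrus.FieldLogger, err error) {
		fatalErr = err
	})()

	// Program the generated mock: MAC validation succeeds, opening the
	// database fails with a non-recoverable error.
	caches := newMockCaches(t)
	caches.On("validateMac", RunData{}).Return(cacheRepo, nil)
	caches.On("openDB").Return(nil, errors.New("bolt.db is corrupted"))

	// Start a real handler, then swap in the mock.
	handler, err := StartHandler(filepath.Join(t.TempDir(), "artifactcache"), "", 0, "secret", nil)
	require.NoError(t, err)
	handler.setCaches(caches)

	// The request reaches openDB, fails, and is reported through
	// responseFatalJSON: a 500 response plus a call to fatal.
	w := httptest.NewRecorder()
	req, err := http.NewRequest("GET", "http://example.com/cache", nil)
	require.NoError(t, err)
	handler.find(w, req, nil)

	require.Equal(t, 500, w.Code)
	require.ErrorContains(t, fatalErr, "bolt.db is corrupted")
}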

View file

@ -14,13 +14,13 @@ import (
var ErrValidation = errors.New("validation error")
func (h *handler) validateMac(rundata RunData) (string, error) {
func (c *cachesImpl) validateMac(rundata RunData) (string, error) {
// TODO: allow configurable max age
if !validateAge(rundata.Timestamp) {
return "", ErrValidation
}
expectedMAC := ComputeMac(h.secret, rundata.RepositoryFullName, rundata.RunNumber, rundata.Timestamp, rundata.WriteIsolationKey)
expectedMAC := ComputeMac(c.secret, rundata.RepositoryFullName, rundata.RunNumber, rundata.Timestamp, rundata.WriteIsolationKey)
if hmac.Equal([]byte(expectedMAC), []byte(rundata.RepositoryMAC)) {
return rundata.RepositoryFullName, nil
}

View file

@ -9,7 +9,7 @@ import (
)
func TestMac(t *testing.T) {
handler := &handler{
cache := &cachesImpl{
secret: "secret for testing",
}
@ -18,7 +18,7 @@ func TestMac(t *testing.T) {
run := "1"
ts := strconv.FormatInt(time.Now().Unix(), 10)
mac := ComputeMac(handler.secret, name, run, ts, "")
mac := ComputeMac(cache.secret, name, run, ts, "")
rundata := RunData{
RepositoryFullName: name,
RunNumber: run,
@ -26,7 +26,7 @@ func TestMac(t *testing.T) {
RepositoryMAC: mac,
}
repoName, err := handler.validateMac(rundata)
repoName, err := cache.validateMac(rundata)
require.NoError(t, err)
require.Equal(t, name, repoName)
})
@ -36,7 +36,7 @@ func TestMac(t *testing.T) {
run := "1"
ts := "9223372036854775807" // This should last us for a while...
mac := ComputeMac(handler.secret, name, run, ts, "")
mac := ComputeMac(cache.secret, name, run, ts, "")
rundata := RunData{
RepositoryFullName: name,
RunNumber: run,
@ -44,7 +44,7 @@ func TestMac(t *testing.T) {
RepositoryMAC: mac,
}
_, err := handler.validateMac(rundata)
_, err := cache.validateMac(rundata)
require.Error(t, err)
})
@ -60,7 +60,7 @@ func TestMac(t *testing.T) {
RepositoryMAC: "this is not the right mac :D",
}
repoName, err := handler.validateMac(rundata)
repoName, err := cache.validateMac(rundata)
require.Error(t, err)
require.Equal(t, "", repoName)
})

View file

@ -0,0 +1,230 @@
// Code generated by mockery v2.53.5. DO NOT EDIT.
package artifactcache
import (
http "net/http"
bolthold "github.com/timshannon/bolthold"
io "io"
mock "github.com/stretchr/testify/mock"
time "time"
)
// mockCaches is an autogenerated mock type for the caches type
type mockCaches struct {
mock.Mock
}
// commit provides a mock function with given fields: id, size
func (_m *mockCaches) commit(id uint64, size int64) (int64, error) {
ret := _m.Called(id, size)
if len(ret) == 0 {
panic("no return value specified for commit")
}
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(uint64, int64) (int64, error)); ok {
return rf(id, size)
}
if rf, ok := ret.Get(0).(func(uint64, int64) int64); ok {
r0 = rf(id, size)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(uint64, int64) error); ok {
r1 = rf(id, size)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// exist provides a mock function with given fields: id
func (_m *mockCaches) exist(id uint64) (bool, error) {
ret := _m.Called(id)
if len(ret) == 0 {
panic("no return value specified for exist")
}
var r0 bool
var r1 error
if rf, ok := ret.Get(0).(func(uint64) (bool, error)); ok {
return rf(id)
}
if rf, ok := ret.Get(0).(func(uint64) bool); ok {
r0 = rf(id)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(uint64) error); ok {
r1 = rf(id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// gcCache provides a mock function with no fields
func (_m *mockCaches) gcCache() {
_m.Called()
}
// openDB provides a mock function with no fields
func (_m *mockCaches) openDB() (*bolthold.Store, error) {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for openDB")
}
var r0 *bolthold.Store
var r1 error
if rf, ok := ret.Get(0).(func() (*bolthold.Store, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() *bolthold.Store); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*bolthold.Store)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// readCache provides a mock function with given fields: id, repo
func (_m *mockCaches) readCache(id uint64, repo string) (*Cache, error) {
ret := _m.Called(id, repo)
if len(ret) == 0 {
panic("no return value specified for readCache")
}
var r0 *Cache
var r1 error
if rf, ok := ret.Get(0).(func(uint64, string) (*Cache, error)); ok {
return rf(id, repo)
}
if rf, ok := ret.Get(0).(func(uint64, string) *Cache); ok {
r0 = rf(id, repo)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*Cache)
}
}
if rf, ok := ret.Get(1).(func(uint64, string) error); ok {
r1 = rf(id, repo)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// serve provides a mock function with given fields: w, r, id
func (_m *mockCaches) serve(w http.ResponseWriter, r *http.Request, id uint64) {
_m.Called(w, r, id)
}
// setgcAt provides a mock function with given fields: at
func (_m *mockCaches) setgcAt(at time.Time) {
_m.Called(at)
}
// useCache provides a mock function with given fields: id
func (_m *mockCaches) useCache(id uint64) error {
ret := _m.Called(id)
if len(ret) == 0 {
panic("no return value specified for useCache")
}
var r0 error
if rf, ok := ret.Get(0).(func(uint64) error); ok {
r0 = rf(id)
} else {
r0 = ret.Error(0)
}
return r0
}
// validateMac provides a mock function with given fields: rundata
func (_m *mockCaches) validateMac(rundata RunData) (string, error) {
ret := _m.Called(rundata)
if len(ret) == 0 {
panic("no return value specified for validateMac")
}
var r0 string
var r1 error
if rf, ok := ret.Get(0).(func(RunData) (string, error)); ok {
return rf(rundata)
}
if rf, ok := ret.Get(0).(func(RunData) string); ok {
r0 = rf(rundata)
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func(RunData) error); ok {
r1 = rf(rundata)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// write provides a mock function with given fields: id, offset, reader
func (_m *mockCaches) write(id uint64, offset uint64, reader io.Reader) error {
ret := _m.Called(id, offset, reader)
if len(ret) == 0 {
panic("no return value specified for write")
}
var r0 error
if rf, ok := ret.Get(0).(func(uint64, uint64, io.Reader) error); ok {
r0 = rf(id, offset, reader)
} else {
r0 = ret.Error(0)
}
return r0
}
// newMockCaches creates a new instance of mockCaches. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func newMockCaches(t interface {
mock.TestingT
Cleanup(func())
},
) *mockCaches {
mock := &mockCaches{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View file

@ -8,6 +8,7 @@ import (
"fmt"
"os"
"os/signal"
"syscall"
"code.forgejo.org/forgejo/runner/v11/internal/pkg/config"
@ -73,7 +74,7 @@ func runCacheServer(ctx context.Context, configFile *string, cacheArgs *cacheSer
log.Infof("cache server is listening on %v", cacheHandler.ExternalURL())
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
<-c
return nil

View file

@ -1,4 +1,4 @@
// Code generated by mockery v2.53.4. DO NOT EDIT.
// Code generated by mockery v2.53.5. DO NOT EDIT.
package mocks