mirror of
https://code.forgejo.org/forgejo/runner.git
synced 2025-09-15 18:57:01 +00:00
fix: cache: call fatal() on errors that are not recoverable
- responseFatalJSON(w, r, err) replaces responseJSON(w, r, 500, err) and calls fatal() when the following fail because they are not recoverable. There may be other non-recoverable errors but it is difficult to be 100% sure they cannot be engineered by the caller of the API for DoS purposes. - openDB - findCache - cache.Repo != repo - wrap errors in - openDB() - it was missing - readCache() - it was missing - useCache() - it was missing - findCache() - some had identical messages - in gc - replace logger.Warnf with h.fatal - differentiate errors that have identical messages - call fatal if openDB fails instead of returning
This commit is contained in:
parent
36ca627f2e
commit
37f634fd31
2 changed files with 49 additions and 28 deletions
|
@ -177,7 +177,8 @@ func (h *handler) isClosed() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) openDB() (*bolthold.Store, error) {
|
func (h *handler) openDB() (*bolthold.Store, error) {
|
||||||
return bolthold.Open(filepath.Join(h.dir, "bolt.db"), 0o644, &bolthold.Options{
|
file := filepath.Join(h.dir, "bolt.db")
|
||||||
|
db, err := bolthold.Open(file, 0o644, &bolthold.Options{
|
||||||
Encoder: json.Marshal,
|
Encoder: json.Marshal,
|
||||||
Decoder: json.Unmarshal,
|
Decoder: json.Unmarshal,
|
||||||
Options: &bbolt.Options{
|
Options: &bbolt.Options{
|
||||||
|
@ -186,6 +187,10 @@ func (h *handler) openDB() (*bolthold.Store, error) {
|
||||||
FreelistType: bbolt.DefaultOptions.FreelistType,
|
FreelistType: bbolt.DefaultOptions.FreelistType,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Open(%s): %v", file, err)
|
||||||
|
}
|
||||||
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GET /_apis/artifactcache/cache
|
// GET /_apis/artifactcache/cache
|
||||||
|
@ -206,21 +211,21 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter
|
||||||
|
|
||||||
db, err := h.openDB()
|
db, err := h.openDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseFatalJSON(w, r, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
cache, err := findCache(db, repo, keys, version, rundata.WriteIsolationKey)
|
cache, err := findCache(db, repo, keys, version, rundata.WriteIsolationKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseFatalJSON(w, r, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// If read was scoped to WriteIsolationKey and didn't find anything, we can fallback to the non-isolated cache read
|
// If read was scoped to WriteIsolationKey and didn't find anything, we can fallback to the non-isolated cache read
|
||||||
if cache == nil && rundata.WriteIsolationKey != "" {
|
if cache == nil && rundata.WriteIsolationKey != "" {
|
||||||
cache, err = findCache(db, repo, keys, version, "")
|
cache, err = findCache(db, repo, keys, version, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseFatalJSON(w, r, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -265,7 +270,7 @@ func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprou
|
||||||
cache := api.ToCache()
|
cache := api.ToCache()
|
||||||
db, err := h.openDB()
|
db, err := h.openDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseFatalJSON(w, r, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
@ -305,13 +310,13 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout
|
||||||
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
h.responseJSON(w, r, 500, fmt.Errorf("cache Get: %w", err))
|
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Should not happen
|
|
||||||
if cache.Repo != repo {
|
if cache.Repo != repo {
|
||||||
h.responseJSON(w, r, 500, fmt.Errorf("cache repo is not valid"))
|
// can only happen if the cache is corrupted
|
||||||
|
h.responseFatalJSON(w, r, fmt.Errorf("cache repo is not valid"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
|
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
|
||||||
|
@ -360,13 +365,13 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
|
||||||
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
h.responseJSON(w, r, 500, fmt.Errorf("cache Get: %w", err))
|
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Should not happen
|
|
||||||
if cache.Repo != repo {
|
if cache.Repo != repo {
|
||||||
h.responseJSON(w, r, 500, fmt.Errorf("cache repo is not valid"))
|
// can only happen if the cache is corrupted
|
||||||
|
h.responseFatalJSON(w, r, fmt.Errorf("cache repo is not valid"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
|
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
|
||||||
|
@ -381,7 +386,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
|
||||||
|
|
||||||
size, err := h.storage.Commit(cache.ID, cache.Size)
|
size, err := h.storage.Commit(cache.ID, cache.Size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseJSON(w, r, 500, fmt.Errorf("commit(%v): %w", cache.ID, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// write real size back to cache, it may be different from the current value when the request doesn't specify it.
|
// write real size back to cache, it may be different from the current value when the request doesn't specify it.
|
||||||
|
@ -389,7 +394,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
|
||||||
|
|
||||||
db, err := h.openDB()
|
db, err := h.openDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.responseJSON(w, r, 500, err)
|
h.responseFatalJSON(w, r, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
@ -424,13 +429,13 @@ func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter.
|
||||||
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
h.responseJSON(w, r, 500, fmt.Errorf("cache Get: %w", err))
|
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Should not happen
|
|
||||||
if cache.Repo != repo {
|
if cache.Repo != repo {
|
||||||
h.responseJSON(w, r, 500, fmt.Errorf("cache repo is not valid"))
|
// can only happen if the cache is corrupted
|
||||||
|
h.responseFatalJSON(w, r, fmt.Errorf("cache repo is not valid"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// reads permitted against caches w/ the same isolation key, or no isolation key
|
// reads permitted against caches w/ the same isolation key, or no isolation key
|
||||||
|
@ -481,7 +486,7 @@ func findCache(db *bolthold.Store, repo string, keys []string, version, writeIso
|
||||||
And("Complete").Eq(true).
|
And("Complete").Eq(true).
|
||||||
SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) {
|
SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("find cache: %w", err)
|
return nil, fmt.Errorf("find cache entry equal to %s: %w", prefix, err)
|
||||||
}
|
}
|
||||||
return cache, nil
|
return cache, nil
|
||||||
}
|
}
|
||||||
|
@ -500,7 +505,7 @@ func findCache(db *bolthold.Store, repo string, keys []string, version, writeIso
|
||||||
if errors.Is(err, bolthold.ErrNotFound) {
|
if errors.Is(err, bolthold.ErrNotFound) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("find cache: %w", err)
|
return nil, fmt.Errorf("find cache entry starting with %s: %w", prefix, err)
|
||||||
}
|
}
|
||||||
return cache, nil
|
return cache, nil
|
||||||
}
|
}
|
||||||
|
@ -526,7 +531,7 @@ func (h *handler) readCache(id uint64) (*Cache, error) {
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
cache := &Cache{}
|
cache := &Cache{}
|
||||||
if err := db.Get(id, cache); err != nil {
|
if err := db.Get(id, cache); err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("readCache: Get(%v): %w", id, err)
|
||||||
}
|
}
|
||||||
return cache, nil
|
return cache, nil
|
||||||
}
|
}
|
||||||
|
@ -539,10 +544,13 @@ func (h *handler) useCache(id uint64) error {
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
cache := &Cache{}
|
cache := &Cache{}
|
||||||
if err := db.Get(id, cache); err != nil {
|
if err := db.Get(id, cache); err != nil {
|
||||||
return err
|
return fmt.Errorf("useCache: Get(%v): %w", id, err)
|
||||||
}
|
}
|
||||||
cache.UsedAt = time.Now().Unix()
|
cache.UsedAt = time.Now().Unix()
|
||||||
return db.Update(cache.ID, cache)
|
if err := db.Update(cache.ID, cache); err != nil {
|
||||||
|
return fmt.Errorf("useCache: Update(%v): %v", cache.ID, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -574,6 +582,7 @@ func (h *handler) gcCache() {
|
||||||
|
|
||||||
db, err := h.openDB()
|
db, err := h.openDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
fatal(h.logger, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
@ -584,12 +593,12 @@ func (h *handler) gcCache() {
|
||||||
Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
|
Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
|
||||||
And("Complete").Eq(false),
|
And("Complete").Eq(false),
|
||||||
); err != nil {
|
); err != nil {
|
||||||
h.logger.Warnf("find caches: %v", err)
|
fatal(h.logger, fmt.Errorf("gc caches not completed: %v", err))
|
||||||
} else {
|
} else {
|
||||||
for _, cache := range caches {
|
for _, cache := range caches {
|
||||||
h.storage.Remove(cache.ID)
|
h.storage.Remove(cache.ID)
|
||||||
if err := db.Delete(cache.ID, cache); err != nil {
|
if err := db.Delete(cache.ID, cache); err != nil {
|
||||||
h.logger.Warnf("delete cache: %v", err)
|
h.logger.Errorf("delete cache: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
h.logger.Infof("deleted cache: %+v", cache)
|
h.logger.Infof("deleted cache: %+v", cache)
|
||||||
|
@ -601,7 +610,7 @@ func (h *handler) gcCache() {
|
||||||
if err := db.Find(&caches, bolthold.
|
if err := db.Find(&caches, bolthold.
|
||||||
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
|
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
|
||||||
); err != nil {
|
); err != nil {
|
||||||
h.logger.Warnf("find caches: %v", err)
|
fatal(h.logger, fmt.Errorf("gc caches old not used: %v", err))
|
||||||
} else {
|
} else {
|
||||||
for _, cache := range caches {
|
for _, cache := range caches {
|
||||||
h.storage.Remove(cache.ID)
|
h.storage.Remove(cache.ID)
|
||||||
|
@ -618,7 +627,7 @@ func (h *handler) gcCache() {
|
||||||
if err := db.Find(&caches, bolthold.
|
if err := db.Find(&caches, bolthold.
|
||||||
Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
|
Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
|
||||||
); err != nil {
|
); err != nil {
|
||||||
h.logger.Warnf("find caches: %v", err)
|
fatal(h.logger, fmt.Errorf("gc caches too old: %v", err))
|
||||||
} else {
|
} else {
|
||||||
for _, cache := range caches {
|
for _, cache := range caches {
|
||||||
h.storage.Remove(cache.ID)
|
h.storage.Remove(cache.ID)
|
||||||
|
@ -637,7 +646,7 @@ func (h *handler) gcCache() {
|
||||||
bolthold.Where("Complete").Eq(true),
|
bolthold.Where("Complete").Eq(true),
|
||||||
"Key", "Version",
|
"Key", "Version",
|
||||||
); err != nil {
|
); err != nil {
|
||||||
h.logger.Warnf("find aggregate caches: %v", err)
|
fatal(h.logger, fmt.Errorf("gc aggregate caches: %v", err))
|
||||||
} else {
|
} else {
|
||||||
for _, result := range results {
|
for _, result := range results {
|
||||||
if result.Count() <= 1 {
|
if result.Count() <= 1 {
|
||||||
|
@ -663,6 +672,11 @@ func (h *handler) gcCache() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *handler) responseFatalJSON(w http.ResponseWriter, r *http.Request, err error) {
|
||||||
|
h.responseJSON(w, r, 500, err)
|
||||||
|
fatal(h.logger, err)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) {
|
func (h *handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) {
|
||||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||||
var data []byte
|
var data []byte
|
||||||
|
|
|
@ -16,10 +16,13 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"code.forgejo.org/forgejo/runner/v9/testutils"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/timshannon/bolthold"
|
"github.com/timshannon/bolthold"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -59,6 +62,10 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestHandler(t *testing.T) {
|
func TestHandler(t *testing.T) {
|
||||||
|
defer testutils.MockVariable(&fatal, func(_ logrus.FieldLogger, err error) {
|
||||||
|
t.Fatalf("unexpected call to fatal(%v)", err)
|
||||||
|
})()
|
||||||
|
|
||||||
dir := filepath.Join(t.TempDir(), "artifactcache")
|
dir := filepath.Join(t.TempDir(), "artifactcache")
|
||||||
handler, err := StartHandler(dir, "", 0, "secret", nil)
|
handler, err := StartHandler(dir, "", 0, "secret", nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue