Fixes #1039. Rather than opening and closing the Bolt DB instance constantly, the cache now maintains one open `*bolthold.Store` for its lifetime, allowing GC, cache read, and cache write operations to occur concurrently.

The major risk in this change is: "is it safe to use one Bolt instance across goroutines concurrently?" [Bolt does document its concurrency requirements](https://github.com/boltdb/bolt?tab=readme-ov-file#transactions), and an analysis of our DB interactions suggests the change introduces very little risk. Most cache operations touch the database more than once; for example, `useCache` performs a read to fetch a cache object and then an update to set its `UsedAt` timestamp. If we wanted to guarantee consistency for these operations, they would have to use a Bolt read-write transaction -- but concurrent access would just be setting the field to the same value anyway.

`gcCache` is the complex operation where a transaction might be warranted -- but using one would also cause the same bug that #1039 describes. I believe it is safe to run without a transaction because it is protected by an application-level mutex (preventing multiple concurrent GCs) and it is the only code that performs deletes from the database; together these should guarantee that all of its delete attempts succeed. In the event of an unexpected DB write failure, `gcCache` deletes from storage before deleting from the DB, so it will simply attempt the cleanup again on the next run.

- bug fixes
  - [PR](https://code.forgejo.org/forgejo/runner/pulls/1040): fix: allow GC & cache operations to operate concurrently

Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/1040
Reviewed-by: earl-warren <earl-warren@noreply.code.forgejo.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
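For comparison, here is a rough sketch (not part of this change) of what a fully transactional `useCache` could look like if read-then-update consistency were required. The `useCacheTx` name is hypothetical, and the sketch assumes bolthold's `Bolt()`, `TxGet`, and `TxUpdate` helpers:

```go
// Hypothetical sketch only: the shipped useCache deliberately uses separate
// Get/Update calls, since concurrent callers would write the same UsedAt
// value anyway.
func (c *cachesImpl) useCacheTx(id uint64) error {
	db := c.getDB()
	// Run the read and the write inside one bbolt read-write transaction,
	// using bolthold's Tx* variants against the underlying bolt handle.
	return db.Bolt().Update(func(tx *bbolt.Tx) error {
		cache := &Cache{}
		if err := db.TxGet(tx, id, cache); err != nil {
			return fmt.Errorf("useCacheTx: TxGet(%v): %w", id, err)
		}
		cache.UsedAt = time.Now().Unix()
		if err := db.TxUpdate(tx, cache.ID, cache); err != nil {
			return fmt.Errorf("useCacheTx: TxUpdate(%v): %w", cache.ID, err)
		}
		return nil
	})
}
```

As noted above, the simpler non-transactional form was kept because the only field written concurrently is `UsedAt`, and every writer would set it to effectively the same value.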
package artifactcache

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"regexp"
	"sync/atomic"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/timshannon/bolthold"
	"go.etcd.io/bbolt"
)

//go:generate mockery --inpackage --name caches
type caches interface {
	getDB() *bolthold.Store
	validateMac(rundata RunData) (string, error)
	readCache(id uint64, repo string) (*Cache, error)
	useCache(id uint64) error
	setgcAt(at time.Time)
	gcCache()
	close()

	serve(w http.ResponseWriter, r *http.Request, id uint64)
	commit(id uint64, size int64) (int64, error)
	exist(id uint64) (bool, error)
	write(id, offset uint64, reader io.Reader) error
}

type cachesImpl struct {
	dir     string
	storage *Storage
	logger  logrus.FieldLogger
	secret  string

	db *bolthold.Store

	gcing atomic.Bool
	gcAt  time.Time
}

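// newCaches creates the cache directory and storage, and opens the single
// long-lived bolthold.Store that is shared by all cache and GC operations
// for the lifetime of the instance.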
func newCaches(dir, secret string, logger logrus.FieldLogger) (caches, error) {
	c := &cachesImpl{
		secret: secret,
	}

	c.logger = logger

	if dir == "" {
		home, err := os.UserHomeDir()
		if err != nil {
			return nil, err
		}
		dir = filepath.Join(home, ".cache", "actcache")
	}
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, err
	}

	c.dir = dir

	storage, err := NewStorage(filepath.Join(dir, "cache"))
	if err != nil {
		return nil, err
	}
	c.storage = storage

	file := filepath.Join(c.dir, "bolt.db")
	db, err := bolthold.Open(file, 0o644, &bolthold.Options{
		Encoder: json.Marshal,
		Decoder: json.Unmarshal,
		Options: &bbolt.Options{
			Timeout:      5 * time.Second,
			NoGrowSync:   bbolt.DefaultOptions.NoGrowSync,
			FreelistType: bbolt.DefaultOptions.FreelistType,
		},
	})
	if err != nil {
		return nil, fmt.Errorf("Open(%s): %w", file, err)
	}
	c.db = db

	c.gcCache()

	return c, nil
}

func (c *cachesImpl) close() {
	if c.db != nil {
		c.db.Close()
		c.db = nil
	}
}

func (c *cachesImpl) getDB() *bolthold.Store {
	return c.db
}

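// findCacheWithIsolationKeyFallback looks up a cache entry scoped to the run's
// WriteIsolationKey and, when nothing is found there, falls back to the shared
// (non-isolated) entries.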
var findCacheWithIsolationKeyFallback = func(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
	cache, err := findCache(db, repo, keys, version, writeIsolationKey)
	if err != nil {
		return nil, err
	}
	// If the read was scoped to a WriteIsolationKey and didn't find anything, fall back to the non-isolated cache read.
	if cache == nil && writeIsolationKey != "" {
		cache, err = findCache(db, repo, keys, version, "")
		if err != nil {
			return nil, err
		}
	}
	return cache, nil
}

// findCache looks up a complete cache entry for the given keys (exact match
// first, then prefix match); if nothing is found it returns (nil, nil)
// instead of an error.
func findCache(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
	cache := &Cache{}
	for _, prefix := range keys {
		// if a key in the list matches exactly, don't return partial matches
		if err := db.FindOne(cache,
			bolthold.Where("Repo").Eq(repo).Index("Repo").
				And("Key").Eq(prefix).
				And("Version").Eq(version).
				And("WriteIsolationKey").Eq(writeIsolationKey).
				And("Complete").Eq(true).
				SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) {
			if err != nil {
				return nil, fmt.Errorf("find cache entry equal to %s: %w", prefix, err)
			}
			return cache, nil
		}
		prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix))
		re, err := regexp.Compile(prefixPattern)
		if err != nil {
			continue
		}
		if err := db.FindOne(cache,
			bolthold.Where("Repo").Eq(repo).Index("Repo").
				And("Key").RegExp(re).
				And("Version").Eq(version).
				And("WriteIsolationKey").Eq(writeIsolationKey).
				And("Complete").Eq(true).
				SortBy("CreatedAt").Reverse()); err != nil {
			if errors.Is(err, bolthold.ErrNotFound) {
				continue
			}
			return nil, fmt.Errorf("find cache entry starting with %s: %w", prefix, err)
		}
		return cache, nil
	}
	return nil, nil
}

func insertCache(db *bolthold.Store, cache *Cache) error {
	if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
		return fmt.Errorf("insert cache: %w", err)
	}
	// write back id to db
	if err := db.Update(cache.ID, cache); err != nil {
		return fmt.Errorf("write back id to db: %w", err)
	}
	return nil
}

func (c *cachesImpl) readCache(id uint64, repo string) (*Cache, error) {
	db := c.getDB()
	cache := &Cache{}
	if err := db.Get(id, cache); err != nil {
		return nil, fmt.Errorf("readCache: Get(%v): %w", id, err)
	}
	if cache.Repo != repo {
		return nil, fmt.Errorf("readCache: Get(%v): cache.Repo %s != repo %s", id, cache.Repo, repo)
	}

	return cache, nil
}

func (c *cachesImpl) useCache(id uint64) error {
	db := c.getDB()
	cache := &Cache{}
	if err := db.Get(id, cache); err != nil {
		return fmt.Errorf("useCache: Get(%v): %w", id, err)
	}
	cache.UsedAt = time.Now().Unix()
	if err := db.Update(cache.ID, cache); err != nil {
		return fmt.Errorf("useCache: Update(%v): %v", cache.ID, err)
	}
	return nil
}

func (c *cachesImpl) serve(w http.ResponseWriter, r *http.Request, id uint64) {
	c.storage.Serve(w, r, id)
}

func (c *cachesImpl) commit(id uint64, size int64) (int64, error) {
	return c.storage.Commit(id, size)
}

func (c *cachesImpl) exist(id uint64) (bool, error) {
	return c.storage.Exist(id)
}

func (c *cachesImpl) write(id, offset uint64, reader io.Reader) error {
	return c.storage.Write(id, offset, reader)
}

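// Retention windows used by gcCache when deciding which entries to remove.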
const (
	keepUsed   = 30 * 24 * time.Hour
	keepUnused = 7 * 24 * time.Hour
	keepTemp   = 5 * time.Minute
	keepOld    = 5 * time.Minute
)

func (c *cachesImpl) setgcAt(at time.Time) {
	c.gcAt = at
}

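// gcCache removes expired and superseded cache entries. The gcing atomic flag
// acts as an application-level mutex so that only one GC pass runs at a time,
// and storage is always removed before the corresponding DB record so that a
// failed DB delete can be retried on the next run.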
func (c *cachesImpl) gcCache() {
	if c.gcing.Load() {
		return
	}
	if !c.gcing.CompareAndSwap(false, true) {
		return
	}
	defer c.gcing.Store(false)

	if time.Since(c.gcAt) < time.Hour {
		c.logger.Debugf("skip gc: %v", c.gcAt.String())
		return
	}
	c.gcAt = time.Now()
	c.logger.Debugf("gc: %v", c.gcAt.String())

	db := c.getDB()

	// Remove caches that have not been completed for a while; they are most likely broken.
	var caches []*Cache
	if err := db.Find(&caches, bolthold.
		Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
		And("Complete").Eq(false),
	); err != nil {
		fatal(c.logger, fmt.Errorf("gc caches not completed: %v", err))
	} else {
		for _, cache := range caches {
			c.storage.Remove(cache.ID)
			if err := db.Delete(cache.ID, cache); err != nil {
				c.logger.Errorf("delete cache: %v", err)
				continue
			}
			c.logger.Infof("deleted cache: %+v", cache)
		}
	}

	// Remove caches that have not been used recently.
	caches = caches[:0]
	if err := db.Find(&caches, bolthold.
		Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
	); err != nil {
		fatal(c.logger, fmt.Errorf("gc caches old not used: %v", err))
	} else {
		for _, cache := range caches {
			c.storage.Remove(cache.ID)
			if err := db.Delete(cache.ID, cache); err != nil {
				c.logger.Warnf("delete cache: %v", err)
				continue
			}
			c.logger.Infof("deleted cache: %+v", cache)
		}
	}

	// Remove caches that were created too long ago, regardless of recent use.
	caches = caches[:0]
	if err := db.Find(&caches, bolthold.
		Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
	); err != nil {
		fatal(c.logger, fmt.Errorf("gc caches too old: %v", err))
	} else {
		for _, cache := range caches {
			c.storage.Remove(cache.ID)
			if err := db.Delete(cache.ID, cache); err != nil {
				c.logger.Warnf("delete cache: %v", err)
				continue
			}
			c.logger.Infof("deleted cache: %+v", cache)
		}
	}

	// Remove older caches that share the same key and version, keeping only the latest one.
	// Entries that have been used recently are kept for a while in case they are still in use.
	if results, err := db.FindAggregate(
		&Cache{},
		bolthold.Where("Complete").Eq(true),
		"Key", "Version",
	); err != nil {
		fatal(c.logger, fmt.Errorf("gc aggregate caches: %v", err))
	} else {
		for _, result := range results {
			if result.Count() <= 1 {
				continue
			}
			result.Sort("CreatedAt")
			caches = caches[:0]
			result.Reduction(&caches)
			for _, cache := range caches[:len(caches)-1] {
				if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld {
					// Keep it: it has been used recently, even though it is old,
					// and deleting it could break a download in progress.
					continue
				}
				c.storage.Remove(cache.ID)
				if err := db.Delete(cache.ID, cache); err != nil {
					c.logger.Warnf("delete cache: %v", err)
					continue
				}
				c.logger.Infof("deleted cache: %+v", cache)
			}
		}
	}
}