1
0
Fork 0
mirror of https://code.forgejo.org/forgejo/runner.git synced 2025-09-30 19:22:09 +00:00
forgejo-runner/act/artifactcache/caches.go
Mathieu Fenniak d79d043696
fix: allow GC & cache operations to operate concurrently (#1040)
Fixes #1039.

Rather than opening and closing the Bolt DB instance constantly, the cache now maintains one open `*bolthold.Store` for its lifetime, allowing GC, cache read, and cache write operations to occur concurrently.

The major risk is this change is, "is it safe to use one Bolt instance across goroutines concurrently?"  [Bolt does document its concurrency requirements](https://github.com/boltdb/bolt?tab=readme-ov-file#transactions), and an analysis of our DB interactions looks to me like it introduces very little risk.

Most of the cache operations perform multiple touches to the database; for example `useCache` performs a read to fetch a cache object, and then an update to set its `UsedAt` timestamp.  If we wanted to ensure consistency in these operations, they should use a Bolt ReadWrite transaction -- but concurrent access would just be setting the field to the same value anyway.

The `gcCache` is the complex operation where a transaction might be warranted -- but doing so would also cause the same bug that #1039 indicates.  I believe it is safe to run without a transaction because it is protected by an application-level mutex (to prevent multiple concurrent GCs), it is the only code that performs deletes from the database -- these should guarantee that all its delete attempts are successful.  In the event of unexpected failure to do the DB write, `gcCache` deletes from the storage before deleting from the DB, so it should just attempt to cleanup again next run.

<!--start release-notes-assistant-->
<!--URL:https://code.forgejo.org/forgejo/runner-->
- bug fixes
  - [PR](https://code.forgejo.org/forgejo/runner/pulls/1040): <!--number 1040 --><!--line 0 --><!--description Zml4OiBhbGxvdyBHQyAmIGNhY2hlIG9wZXJhdGlvbnMgdG8gb3BlcmF0ZSBjb25jdXJyZW50bHk=-->fix: allow GC & cache operations to operate concurrently<!--description-->
<!--end release-notes-assistant-->

Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/1040
Reviewed-by: earl-warren <earl-warren@noreply.code.forgejo.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
2025-09-30 19:12:45 +00:00

324 lines
8.1 KiB
Go

package artifactcache
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"regexp"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
"github.com/timshannon/bolthold"
"go.etcd.io/bbolt"
)
//go:generate mockery --inpackage --name caches
type caches interface {
getDB() *bolthold.Store
validateMac(rundata RunData) (string, error)
readCache(id uint64, repo string) (*Cache, error)
useCache(id uint64) error
setgcAt(at time.Time)
gcCache()
close()
serve(w http.ResponseWriter, r *http.Request, id uint64)
commit(id uint64, size int64) (int64, error)
exist(id uint64) (bool, error)
write(id, offset uint64, reader io.Reader) error
}
type cachesImpl struct {
dir string
storage *Storage
logger logrus.FieldLogger
secret string
db *bolthold.Store
gcing atomic.Bool
gcAt time.Time
}
func newCaches(dir, secret string, logger logrus.FieldLogger) (caches, error) {
c := &cachesImpl{
secret: secret,
}
c.logger = logger
if dir == "" {
home, err := os.UserHomeDir()
if err != nil {
return nil, err
}
dir = filepath.Join(home, ".cache", "actcache")
}
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
}
c.dir = dir
storage, err := NewStorage(filepath.Join(dir, "cache"))
if err != nil {
return nil, err
}
c.storage = storage
file := filepath.Join(c.dir, "bolt.db")
db, err := bolthold.Open(file, 0o644, &bolthold.Options{
Encoder: json.Marshal,
Decoder: json.Unmarshal,
Options: &bbolt.Options{
Timeout: 5 * time.Second,
NoGrowSync: bbolt.DefaultOptions.NoGrowSync,
FreelistType: bbolt.DefaultOptions.FreelistType,
},
})
if err != nil {
return nil, fmt.Errorf("Open(%s): %w", file, err)
}
c.db = db
c.gcCache()
return c, nil
}
func (c *cachesImpl) close() {
if c.db != nil {
c.db.Close()
c.db = nil
}
}
func (c *cachesImpl) getDB() *bolthold.Store {
return c.db
}
var findCacheWithIsolationKeyFallback = func(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
cache, err := findCache(db, repo, keys, version, writeIsolationKey)
if err != nil {
return nil, err
}
// If read was scoped to WriteIsolationKey and didn't find anything, we can fallback to the non-isolated cache read
if cache == nil && writeIsolationKey != "" {
cache, err = findCache(db, repo, keys, version, "")
if err != nil {
return nil, err
}
}
return cache, nil
}
// if not found, return (nil, nil) instead of an error.
func findCache(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
cache := &Cache{}
for _, prefix := range keys {
// if a key in the list matches exactly, don't return partial matches
if err := db.FindOne(cache,
bolthold.Where("Repo").Eq(repo).Index("Repo").
And("Key").Eq(prefix).
And("Version").Eq(version).
And("WriteIsolationKey").Eq(writeIsolationKey).
And("Complete").Eq(true).
SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) {
if err != nil {
return nil, fmt.Errorf("find cache entry equal to %s: %w", prefix, err)
}
return cache, nil
}
prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix))
re, err := regexp.Compile(prefixPattern)
if err != nil {
continue
}
if err := db.FindOne(cache,
bolthold.Where("Repo").Eq(repo).Index("Repo").
And("Key").RegExp(re).
And("Version").Eq(version).
And("WriteIsolationKey").Eq(writeIsolationKey).
And("Complete").Eq(true).
SortBy("CreatedAt").Reverse()); err != nil {
if errors.Is(err, bolthold.ErrNotFound) {
continue
}
return nil, fmt.Errorf("find cache entry starting with %s: %w", prefix, err)
}
return cache, nil
}
return nil, nil
}
func insertCache(db *bolthold.Store, cache *Cache) error {
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
return fmt.Errorf("insert cache: %w", err)
}
// write back id to db
if err := db.Update(cache.ID, cache); err != nil {
return fmt.Errorf("write back id to db: %w", err)
}
return nil
}
func (c *cachesImpl) readCache(id uint64, repo string) (*Cache, error) {
db := c.getDB()
cache := &Cache{}
if err := db.Get(id, cache); err != nil {
return nil, fmt.Errorf("readCache: Get(%v): %w", id, err)
}
if cache.Repo != repo {
return nil, fmt.Errorf("readCache: Get(%v): cache.Repo %s != repo %s", id, cache.Repo, repo)
}
return cache, nil
}
func (c *cachesImpl) useCache(id uint64) error {
db := c.getDB()
cache := &Cache{}
if err := db.Get(id, cache); err != nil {
return fmt.Errorf("useCache: Get(%v): %w", id, err)
}
cache.UsedAt = time.Now().Unix()
if err := db.Update(cache.ID, cache); err != nil {
return fmt.Errorf("useCache: Update(%v): %v", cache.ID, err)
}
return nil
}
func (c *cachesImpl) serve(w http.ResponseWriter, r *http.Request, id uint64) {
c.storage.Serve(w, r, id)
}
func (c *cachesImpl) commit(id uint64, size int64) (int64, error) {
return c.storage.Commit(id, size)
}
func (c *cachesImpl) exist(id uint64) (bool, error) {
return c.storage.Exist(id)
}
func (c *cachesImpl) write(id, offset uint64, reader io.Reader) error {
return c.storage.Write(id, offset, reader)
}
const (
keepUsed = 30 * 24 * time.Hour
keepUnused = 7 * 24 * time.Hour
keepTemp = 5 * time.Minute
keepOld = 5 * time.Minute
)
func (c *cachesImpl) setgcAt(at time.Time) {
c.gcAt = at
}
func (c *cachesImpl) gcCache() {
if c.gcing.Load() {
return
}
if !c.gcing.CompareAndSwap(false, true) {
return
}
defer c.gcing.Store(false)
if time.Since(c.gcAt) < time.Hour {
c.logger.Debugf("skip gc: %v", c.gcAt.String())
return
}
c.gcAt = time.Now()
c.logger.Debugf("gc: %v", c.gcAt.String())
db := c.getDB()
// Remove the caches which are not completed for a while, they are most likely to be broken.
var caches []*Cache
if err := db.Find(&caches, bolthold.
Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
And("Complete").Eq(false),
); err != nil {
fatal(c.logger, fmt.Errorf("gc caches not completed: %v", err))
} else {
for _, cache := range caches {
c.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
c.logger.Errorf("delete cache: %v", err)
continue
}
c.logger.Infof("deleted cache: %+v", cache)
}
}
// Remove the old caches which have not been used recently.
caches = caches[:0]
if err := db.Find(&caches, bolthold.
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
); err != nil {
fatal(c.logger, fmt.Errorf("gc caches old not used: %v", err))
} else {
for _, cache := range caches {
c.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
c.logger.Warnf("delete cache: %v", err)
continue
}
c.logger.Infof("deleted cache: %+v", cache)
}
}
// Remove the old caches which are too old.
caches = caches[:0]
if err := db.Find(&caches, bolthold.
Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
); err != nil {
fatal(c.logger, fmt.Errorf("gc caches too old: %v", err))
} else {
for _, cache := range caches {
c.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
c.logger.Warnf("delete cache: %v", err)
continue
}
c.logger.Infof("deleted cache: %+v", cache)
}
}
// Remove the old caches with the same key and version, keep the latest one.
// Also keep the olds which have been used recently for a while in case of the cache is still in use.
if results, err := db.FindAggregate(
&Cache{},
bolthold.Where("Complete").Eq(true),
"Key", "Version",
); err != nil {
fatal(c.logger, fmt.Errorf("gc aggregate caches: %v", err))
} else {
for _, result := range results {
if result.Count() <= 1 {
continue
}
result.Sort("CreatedAt")
caches = caches[:0]
result.Reduction(&caches)
for _, cache := range caches[:len(caches)-1] {
if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld {
// Keep it since it has been used recently, even if it's old.
// Or it could break downloading in process.
continue
}
c.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
c.logger.Warnf("delete cache: %v", err)
continue
}
c.logger.Infof("deleted cache: %+v", cache)
}
}
}
}