mirror of
https://code.forgejo.org/forgejo/runner.git
synced 2025-09-30 19:22:09 +00:00
Fixes #1039. Rather than opening and closing the Bolt DB instance constantly, the cache now maintains one open `*bolthold.Store` for its lifetime, allowing GC, cache read, and cache write operations to occur concurrently. The major risk is this change is, "is it safe to use one Bolt instance across goroutines concurrently?" [Bolt does document its concurrency requirements](https://github.com/boltdb/bolt?tab=readme-ov-file#transactions), and an analysis of our DB interactions looks to me like it introduces very little risk. Most of the cache operations perform multiple touches to the database; for example `useCache` performs a read to fetch a cache object, and then an update to set its `UsedAt` timestamp. If we wanted to ensure consistency in these operations, they should use a Bolt ReadWrite transaction -- but concurrent access would just be setting the field to the same value anyway. The `gcCache` is the complex operation where a transaction might be warranted -- but doing so would also cause the same bug that #1039 indicates. I believe it is safe to run without a transaction because it is protected by an application-level mutex (to prevent multiple concurrent GCs), it is the only code that performs deletes from the database -- these should guarantee that all its delete attempts are successful. In the event of unexpected failure to do the DB write, `gcCache` deletes from the storage before deleting from the DB, so it should just attempt to cleanup again next run. <!--start release-notes-assistant--> <!--URL:https://code.forgejo.org/forgejo/runner--> - bug fixes - [PR](https://code.forgejo.org/forgejo/runner/pulls/1040): <!--number 1040 --><!--line 0 --><!--description Zml4OiBhbGxvdyBHQyAmIGNhY2hlIG9wZXJhdGlvbnMgdG8gb3BlcmF0ZSBjb25jdXJyZW50bHk=-->fix: allow GC & cache operations to operate concurrently<!--description--> <!--end release-notes-assistant--> Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/1040 Reviewed-by: earl-warren <earl-warren@noreply.code.forgejo.org> Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
460 lines
12 KiB
Go
460 lines
12 KiB
Go
package artifactcache
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/julienschmidt/httprouter"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/timshannon/bolthold"
|
|
|
|
"code.forgejo.org/forgejo/runner/v11/act/common"
|
|
)
|
|
|
|
const (
|
|
urlBase = "/_apis/artifactcache"
|
|
)
|
|
|
|
var fatal = func(logger logrus.FieldLogger, err error) {
|
|
logger.Errorf("unrecoverable error in the cache: %v", err)
|
|
if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil {
|
|
logger.Errorf("unrecoverable error in the cache: failed to send the TERM signal to shutdown the daemon %v", err)
|
|
}
|
|
}
|
|
|
|
type Handler interface {
|
|
ExternalURL() string
|
|
Close() error
|
|
isClosed() bool
|
|
getCaches() caches
|
|
setCaches(caches caches)
|
|
find(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
upload(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
commit(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
get(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params)
|
|
middleware(handler httprouter.Handle) httprouter.Handle
|
|
responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any)
|
|
}
|
|
|
|
type handler struct {
|
|
caches caches
|
|
router *httprouter.Router
|
|
listener net.Listener
|
|
server *http.Server
|
|
logger logrus.FieldLogger
|
|
|
|
outboundIP string
|
|
}
|
|
|
|
func StartHandler(dir, outboundIP string, port uint16, secret string, logger logrus.FieldLogger) (Handler, error) {
|
|
h := &handler{}
|
|
|
|
if logger == nil {
|
|
discard := logrus.New()
|
|
discard.Out = io.Discard
|
|
logger = discard
|
|
}
|
|
logger = logger.WithField("module", "artifactcache")
|
|
h.logger = logger
|
|
|
|
caches, err := newCaches(dir, secret, logger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
h.caches = caches
|
|
|
|
if outboundIP != "" {
|
|
h.outboundIP = outboundIP
|
|
} else if ip := common.GetOutboundIP(); ip == nil {
|
|
return nil, fmt.Errorf("unable to determine outbound IP address")
|
|
} else {
|
|
h.outboundIP = ip.String()
|
|
}
|
|
|
|
router := httprouter.New()
|
|
router.GET(urlBase+"/cache", h.middleware(h.find))
|
|
router.POST(urlBase+"/caches", h.middleware(h.reserve))
|
|
router.PATCH(urlBase+"/caches/:id", h.middleware(h.upload))
|
|
router.POST(urlBase+"/caches/:id", h.middleware(h.commit))
|
|
router.GET(urlBase+"/artifacts/:id", h.middleware(h.get))
|
|
router.POST(urlBase+"/clean", h.middleware(h.clean))
|
|
|
|
h.router = router
|
|
|
|
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) // listen on all interfaces
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
server := &http.Server{
|
|
ReadHeaderTimeout: 2 * time.Second,
|
|
Handler: router,
|
|
}
|
|
go func() {
|
|
if err := server.Serve(listener); err != nil && errors.Is(err, net.ErrClosed) {
|
|
logger.Errorf("http serve: %v", err)
|
|
}
|
|
}()
|
|
h.listener = listener
|
|
h.server = server
|
|
|
|
return h, nil
|
|
}
|
|
|
|
func (h *handler) ExternalURL() string {
|
|
port := strconv.Itoa(h.listener.Addr().(*net.TCPAddr).Port)
|
|
|
|
// TODO: make the external url configurable if necessary
|
|
return fmt.Sprintf("http://%s", net.JoinHostPort(h.outboundIP, port))
|
|
}
|
|
|
|
func (h *handler) Close() error {
|
|
if h == nil {
|
|
return nil
|
|
}
|
|
var retErr error
|
|
if h.caches != nil {
|
|
h.caches.close()
|
|
h.caches = nil
|
|
}
|
|
if h.server != nil {
|
|
err := h.server.Close()
|
|
if err != nil {
|
|
retErr = err
|
|
}
|
|
h.server = nil
|
|
}
|
|
if h.listener != nil {
|
|
err := h.listener.Close()
|
|
if errors.Is(err, net.ErrClosed) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
retErr = err
|
|
}
|
|
h.listener = nil
|
|
}
|
|
return retErr
|
|
}
|
|
|
|
func (h *handler) isClosed() bool {
|
|
return h.listener == nil && h.server == nil
|
|
}
|
|
|
|
func (h *handler) getCaches() caches {
|
|
return h.caches
|
|
}
|
|
|
|
func (h *handler) setCaches(caches caches) {
|
|
if h.caches != nil {
|
|
h.caches.close()
|
|
}
|
|
h.caches = caches
|
|
}
|
|
|
|
// GET /_apis/artifactcache/cache
|
|
func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
keys := strings.Split(r.URL.Query().Get("keys"), ",")
|
|
// cache keys are case insensitive
|
|
for i, key := range keys {
|
|
keys[i] = strings.ToLower(key)
|
|
}
|
|
version := r.URL.Query().Get("version")
|
|
|
|
db := h.caches.getDB()
|
|
|
|
cache, err := findCacheWithIsolationKeyFallback(db, repo, keys, version, rundata.WriteIsolationKey)
|
|
if err != nil {
|
|
h.responseFatalJSON(w, r, err)
|
|
return
|
|
}
|
|
if cache == nil {
|
|
h.responseJSON(w, r, 204)
|
|
return
|
|
}
|
|
|
|
if ok, err := h.caches.exist(cache.ID); err != nil {
|
|
h.responseJSON(w, r, 500, err)
|
|
return
|
|
} else if !ok {
|
|
_ = db.Delete(cache.ID, cache)
|
|
h.responseJSON(w, r, 204)
|
|
return
|
|
}
|
|
archiveLocation := fmt.Sprintf("%s/%s%s/artifacts/%d", r.Header.Get("Forgejo-Cache-Host"), r.Header.Get("Forgejo-Cache-RunId"), urlBase, cache.ID)
|
|
h.responseJSON(w, r, 200, map[string]any{
|
|
"result": "hit",
|
|
"archiveLocation": archiveLocation,
|
|
"cacheKey": cache.Key,
|
|
})
|
|
}
|
|
|
|
// POST /_apis/artifactcache/caches
|
|
func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
api := &Request{}
|
|
if err := json.NewDecoder(r.Body).Decode(api); err != nil {
|
|
h.responseJSON(w, r, 400, err)
|
|
return
|
|
}
|
|
// cache keys are case insensitive
|
|
api.Key = strings.ToLower(api.Key)
|
|
|
|
cache := api.ToCache()
|
|
db := h.caches.getDB()
|
|
|
|
now := time.Now().Unix()
|
|
cache.CreatedAt = now
|
|
cache.UsedAt = now
|
|
cache.Repo = repo
|
|
cache.WriteIsolationKey = rundata.WriteIsolationKey
|
|
if err := insertCache(db, cache); err != nil {
|
|
h.responseJSON(w, r, 500, err)
|
|
return
|
|
}
|
|
h.responseJSON(w, r, 200, map[string]any{
|
|
"cacheId": cache.ID,
|
|
})
|
|
}
|
|
|
|
// PATCH /_apis/artifactcache/caches/:id
|
|
func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
id, err := strconv.ParseUint(params.ByName("id"), 10, 64)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 400, err)
|
|
return
|
|
}
|
|
|
|
cache, err := h.caches.readCache(id, repo)
|
|
if err != nil {
|
|
if errors.Is(err, bolthold.ErrNotFound) {
|
|
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
|
return
|
|
}
|
|
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
|
|
return
|
|
}
|
|
|
|
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
|
|
h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey))
|
|
return
|
|
}
|
|
|
|
if cache.Complete {
|
|
h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key))
|
|
return
|
|
}
|
|
start, _, err := parseContentRange(r.Header.Get("Content-Range"))
|
|
if err != nil {
|
|
h.responseJSON(w, r, 400, fmt.Errorf("cache parseContentRange(%s): %w", r.Header.Get("Content-Range"), err))
|
|
return
|
|
}
|
|
if err := h.caches.write(cache.ID, start, r.Body); err != nil {
|
|
h.responseJSON(w, r, 500, fmt.Errorf("cache storage.Write: %w", err))
|
|
return
|
|
}
|
|
if err := h.caches.useCache(id); err != nil {
|
|
h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err))
|
|
return
|
|
}
|
|
h.responseJSON(w, r, 200)
|
|
}
|
|
|
|
// POST /_apis/artifactcache/caches/:id
|
|
func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
id, err := strconv.ParseUint(params.ByName("id"), 10, 64)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 400, err)
|
|
return
|
|
}
|
|
|
|
cache, err := h.caches.readCache(id, repo)
|
|
if err != nil {
|
|
if errors.Is(err, bolthold.ErrNotFound) {
|
|
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
|
return
|
|
}
|
|
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
|
|
return
|
|
}
|
|
|
|
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
|
|
h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey))
|
|
return
|
|
}
|
|
|
|
if cache.Complete {
|
|
h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key))
|
|
return
|
|
}
|
|
|
|
size, err := h.caches.commit(cache.ID, cache.Size)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 500, fmt.Errorf("commit(%v): %w", cache.ID, err))
|
|
return
|
|
}
|
|
// write real size back to cache, it may be different from the current value when the request doesn't specify it.
|
|
cache.Size = size
|
|
|
|
db := h.caches.getDB()
|
|
|
|
cache.Complete = true
|
|
if err := db.Update(cache.ID, cache); err != nil {
|
|
h.responseJSON(w, r, 500, err)
|
|
return
|
|
}
|
|
|
|
h.responseJSON(w, r, 200)
|
|
}
|
|
|
|
// GET /_apis/artifactcache/artifacts/:id
|
|
func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
id, err := strconv.ParseUint(params.ByName("id"), 10, 64)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 400, err)
|
|
return
|
|
}
|
|
|
|
cache, err := h.caches.readCache(id, repo)
|
|
if err != nil {
|
|
if errors.Is(err, bolthold.ErrNotFound) {
|
|
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
|
return
|
|
}
|
|
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
|
|
return
|
|
}
|
|
|
|
// reads permitted against caches w/ the same isolation key, or no isolation key
|
|
if cache.WriteIsolationKey != rundata.WriteIsolationKey && cache.WriteIsolationKey != "" {
|
|
h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey))
|
|
return
|
|
}
|
|
|
|
if err := h.caches.useCache(id); err != nil {
|
|
h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err))
|
|
return
|
|
}
|
|
h.caches.serve(w, r, id)
|
|
}
|
|
|
|
// POST /_apis/artifactcache/clean
|
|
func (h *handler) clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
_, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
// TODO: don't support force deleting cache entries
|
|
// see: https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries
|
|
|
|
h.responseJSON(w, r, 200)
|
|
}
|
|
|
|
func (h *handler) middleware(handler httprouter.Handle) httprouter.Handle {
|
|
return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
h.logger.Debugf("%s %s", r.Method, r.RequestURI)
|
|
handler(w, r, params)
|
|
go h.caches.gcCache()
|
|
}
|
|
}
|
|
|
|
func (h *handler) responseFatalJSON(w http.ResponseWriter, r *http.Request, err error) {
|
|
h.responseJSON(w, r, 500, err)
|
|
fatal(h.logger, err)
|
|
}
|
|
|
|
func (h *handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) {
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
var data []byte
|
|
if len(v) == 0 || v[0] == nil {
|
|
data, _ = json.Marshal(struct{}{})
|
|
} else if err, ok := v[0].(error); ok {
|
|
h.logger.Errorf("%v %v: %v", r.Method, r.RequestURI, err)
|
|
data, _ = json.Marshal(map[string]any{
|
|
"error": err.Error(),
|
|
})
|
|
} else {
|
|
data, _ = json.Marshal(v[0])
|
|
}
|
|
w.WriteHeader(code)
|
|
_, _ = w.Write(data)
|
|
}
|
|
|
|
func parseContentRange(s string) (uint64, uint64, error) {
|
|
// support the format like "bytes 11-22/*" only
|
|
s, _, _ = strings.Cut(strings.TrimPrefix(s, "bytes "), "/")
|
|
s1, s2, _ := strings.Cut(s, "-")
|
|
|
|
start, err := strconv.ParseUint(s1, 10, 64)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("parse %q: %w", s, err)
|
|
}
|
|
stop, err := strconv.ParseUint(s2, 10, 64)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("parse %q: %w", s, err)
|
|
}
|
|
return start, stop, nil
|
|
}
|
|
|
|
type RunData struct {
|
|
RepositoryFullName string
|
|
RunNumber string
|
|
Timestamp string
|
|
RepositoryMAC string
|
|
WriteIsolationKey string
|
|
}
|
|
|
|
func runDataFromHeaders(r *http.Request) RunData {
|
|
return RunData{
|
|
RepositoryFullName: r.Header.Get("Forgejo-Cache-Repo"),
|
|
RunNumber: r.Header.Get("Forgejo-Cache-RunNumber"),
|
|
Timestamp: r.Header.Get("Forgejo-Cache-Timestamp"),
|
|
RepositoryMAC: r.Header.Get("Forgejo-Cache-MAC"),
|
|
WriteIsolationKey: r.Header.Get("Forgejo-Cache-WriteIsolationKey"),
|
|
}
|
|
}
|