mirror of
https://code.forgejo.org/forgejo/runner.git
synced 2025-09-30 19:22:09 +00:00
- it is only used after calling readCache - add unit test; it reduces the number of test cases to be considered in handler
476 lines
13 KiB
Go
476 lines
13 KiB
Go
package artifactcache
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/julienschmidt/httprouter"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/timshannon/bolthold"
|
|
|
|
"code.forgejo.org/forgejo/runner/v11/act/common"
|
|
)
|
|
|
|
// urlBase is the path prefix shared by every route registered by
// StartHandler and used when building archive locations.
const urlBase = "/_apis/artifactcache"
|
|
|
|
var fatal = func(logger logrus.FieldLogger, err error) {
|
|
logger.Errorf("unrecoverable error in the cache: %v", err)
|
|
if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil {
|
|
logger.Errorf("unrecoverable error in the cache: failed to send the TERM signal to shutdown the daemon %v", err)
|
|
}
|
|
}
|
|
|
|
type Handler interface {
|
|
ExternalURL() string
|
|
Close() error
|
|
isClosed() bool
|
|
getCaches() caches
|
|
setCaches(caches caches)
|
|
find(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
upload(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
commit(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
get(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
|
clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params)
|
|
middleware(handler httprouter.Handle) httprouter.Handle
|
|
responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any)
|
|
}
|
|
|
|
type handler struct {
|
|
caches caches
|
|
router *httprouter.Router
|
|
listener net.Listener
|
|
server *http.Server
|
|
logger logrus.FieldLogger
|
|
|
|
outboundIP string
|
|
}
|
|
|
|
func StartHandler(dir, outboundIP string, port uint16, secret string, logger logrus.FieldLogger) (Handler, error) {
|
|
h := &handler{}
|
|
|
|
if logger == nil {
|
|
discard := logrus.New()
|
|
discard.Out = io.Discard
|
|
logger = discard
|
|
}
|
|
logger = logger.WithField("module", "artifactcache")
|
|
h.logger = logger
|
|
|
|
caches, err := newCaches(dir, secret, logger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
h.caches = caches
|
|
|
|
if outboundIP != "" {
|
|
h.outboundIP = outboundIP
|
|
} else if ip := common.GetOutboundIP(); ip == nil {
|
|
return nil, fmt.Errorf("unable to determine outbound IP address")
|
|
} else {
|
|
h.outboundIP = ip.String()
|
|
}
|
|
|
|
router := httprouter.New()
|
|
router.GET(urlBase+"/cache", h.middleware(h.find))
|
|
router.POST(urlBase+"/caches", h.middleware(h.reserve))
|
|
router.PATCH(urlBase+"/caches/:id", h.middleware(h.upload))
|
|
router.POST(urlBase+"/caches/:id", h.middleware(h.commit))
|
|
router.GET(urlBase+"/artifacts/:id", h.middleware(h.get))
|
|
router.POST(urlBase+"/clean", h.middleware(h.clean))
|
|
|
|
h.router = router
|
|
|
|
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) // listen on all interfaces
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
server := &http.Server{
|
|
ReadHeaderTimeout: 2 * time.Second,
|
|
Handler: router,
|
|
}
|
|
go func() {
|
|
if err := server.Serve(listener); err != nil && errors.Is(err, net.ErrClosed) {
|
|
logger.Errorf("http serve: %v", err)
|
|
}
|
|
}()
|
|
h.listener = listener
|
|
h.server = server
|
|
|
|
return h, nil
|
|
}
|
|
|
|
func (h *handler) ExternalURL() string {
|
|
port := strconv.Itoa(h.listener.Addr().(*net.TCPAddr).Port)
|
|
|
|
// TODO: make the external url configurable if necessary
|
|
return fmt.Sprintf("http://%s", net.JoinHostPort(h.outboundIP, port))
|
|
}
|
|
|
|
func (h *handler) Close() error {
|
|
if h == nil {
|
|
return nil
|
|
}
|
|
var retErr error
|
|
if h.server != nil {
|
|
err := h.server.Close()
|
|
if err != nil {
|
|
retErr = err
|
|
}
|
|
h.server = nil
|
|
}
|
|
if h.listener != nil {
|
|
err := h.listener.Close()
|
|
if errors.Is(err, net.ErrClosed) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
retErr = err
|
|
}
|
|
h.listener = nil
|
|
}
|
|
return retErr
|
|
}
|
|
|
|
func (h *handler) isClosed() bool {
|
|
return h.listener == nil && h.server == nil
|
|
}
|
|
|
|
func (h *handler) getCaches() caches {
|
|
return h.caches
|
|
}
|
|
|
|
func (h *handler) setCaches(caches caches) {
|
|
h.caches = caches
|
|
}
|
|
|
|
// GET /_apis/artifactcache/cache
|
|
func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
keys := strings.Split(r.URL.Query().Get("keys"), ",")
|
|
// cache keys are case insensitive
|
|
for i, key := range keys {
|
|
keys[i] = strings.ToLower(key)
|
|
}
|
|
version := r.URL.Query().Get("version")
|
|
|
|
db, err := h.caches.openDB()
|
|
if err != nil {
|
|
h.responseFatalJSON(w, r, err)
|
|
return
|
|
}
|
|
defer db.Close()
|
|
|
|
cache, err := findCache(db, repo, keys, version, rundata.WriteIsolationKey)
|
|
if err != nil {
|
|
h.responseFatalJSON(w, r, err)
|
|
return
|
|
}
|
|
// If read was scoped to WriteIsolationKey and didn't find anything, we can fallback to the non-isolated cache read
|
|
if cache == nil && rundata.WriteIsolationKey != "" {
|
|
cache, err = findCache(db, repo, keys, version, "")
|
|
if err != nil {
|
|
h.responseFatalJSON(w, r, err)
|
|
return
|
|
}
|
|
}
|
|
if cache == nil {
|
|
h.responseJSON(w, r, 204)
|
|
return
|
|
}
|
|
|
|
if ok, err := h.caches.exist(cache.ID); err != nil {
|
|
h.responseJSON(w, r, 500, err)
|
|
return
|
|
} else if !ok {
|
|
_ = db.Delete(cache.ID, cache)
|
|
h.responseJSON(w, r, 204)
|
|
return
|
|
}
|
|
archiveLocation := fmt.Sprintf("%s/%s%s/artifacts/%d", r.Header.Get("Forgejo-Cache-Host"), r.Header.Get("Forgejo-Cache-RunId"), urlBase, cache.ID)
|
|
h.responseJSON(w, r, 200, map[string]any{
|
|
"result": "hit",
|
|
"archiveLocation": archiveLocation,
|
|
"cacheKey": cache.Key,
|
|
})
|
|
}
|
|
|
|
// POST /_apis/artifactcache/caches
|
|
func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
api := &Request{}
|
|
if err := json.NewDecoder(r.Body).Decode(api); err != nil {
|
|
h.responseJSON(w, r, 400, err)
|
|
return
|
|
}
|
|
// cache keys are case insensitive
|
|
api.Key = strings.ToLower(api.Key)
|
|
|
|
cache := api.ToCache()
|
|
db, err := h.caches.openDB()
|
|
if err != nil {
|
|
h.responseFatalJSON(w, r, err)
|
|
return
|
|
}
|
|
defer db.Close()
|
|
|
|
now := time.Now().Unix()
|
|
cache.CreatedAt = now
|
|
cache.UsedAt = now
|
|
cache.Repo = repo
|
|
cache.WriteIsolationKey = rundata.WriteIsolationKey
|
|
if err := insertCache(db, cache); err != nil {
|
|
h.responseJSON(w, r, 500, err)
|
|
return
|
|
}
|
|
h.responseJSON(w, r, 200, map[string]any{
|
|
"cacheId": cache.ID,
|
|
})
|
|
}
|
|
|
|
// PATCH /_apis/artifactcache/caches/:id
|
|
func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
id, err := strconv.ParseUint(params.ByName("id"), 10, 64)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 400, err)
|
|
return
|
|
}
|
|
|
|
cache, err := h.caches.readCache(id, repo)
|
|
if err != nil {
|
|
if errors.Is(err, bolthold.ErrNotFound) {
|
|
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
|
return
|
|
}
|
|
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
|
|
return
|
|
}
|
|
|
|
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
|
|
h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey))
|
|
return
|
|
}
|
|
|
|
if cache.Complete {
|
|
h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key))
|
|
return
|
|
}
|
|
start, _, err := parseContentRange(r.Header.Get("Content-Range"))
|
|
if err != nil {
|
|
h.responseJSON(w, r, 400, fmt.Errorf("cache parseContentRange(%s): %w", r.Header.Get("Content-Range"), err))
|
|
return
|
|
}
|
|
if err := h.caches.write(cache.ID, start, r.Body); err != nil {
|
|
h.responseJSON(w, r, 500, fmt.Errorf("cache storage.Write: %w", err))
|
|
return
|
|
}
|
|
if err := h.caches.useCache(id); err != nil {
|
|
h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err))
|
|
return
|
|
}
|
|
h.responseJSON(w, r, 200)
|
|
}
|
|
|
|
// POST /_apis/artifactcache/caches/:id
|
|
func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
id, err := strconv.ParseUint(params.ByName("id"), 10, 64)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 400, err)
|
|
return
|
|
}
|
|
|
|
cache, err := h.caches.readCache(id, repo)
|
|
if err != nil {
|
|
if errors.Is(err, bolthold.ErrNotFound) {
|
|
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
|
return
|
|
}
|
|
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
|
|
return
|
|
}
|
|
|
|
if cache.WriteIsolationKey != rundata.WriteIsolationKey {
|
|
h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey))
|
|
return
|
|
}
|
|
|
|
if cache.Complete {
|
|
h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key))
|
|
return
|
|
}
|
|
|
|
size, err := h.caches.commit(cache.ID, cache.Size)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 500, fmt.Errorf("commit(%v): %w", cache.ID, err))
|
|
return
|
|
}
|
|
// write real size back to cache, it may be different from the current value when the request doesn't specify it.
|
|
cache.Size = size
|
|
|
|
db, err := h.caches.openDB()
|
|
if err != nil {
|
|
h.responseFatalJSON(w, r, err)
|
|
return
|
|
}
|
|
defer db.Close()
|
|
|
|
cache.Complete = true
|
|
if err := db.Update(cache.ID, cache); err != nil {
|
|
h.responseJSON(w, r, 500, err)
|
|
return
|
|
}
|
|
|
|
h.responseJSON(w, r, 200)
|
|
}
|
|
|
|
// GET /_apis/artifactcache/artifacts/:id
|
|
func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
repo, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
|
|
id, err := strconv.ParseUint(params.ByName("id"), 10, 64)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 400, err)
|
|
return
|
|
}
|
|
|
|
cache, err := h.caches.readCache(id, repo)
|
|
if err != nil {
|
|
if errors.Is(err, bolthold.ErrNotFound) {
|
|
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
|
return
|
|
}
|
|
h.responseFatalJSON(w, r, fmt.Errorf("cache Get: %w", err))
|
|
return
|
|
}
|
|
|
|
// reads permitted against caches w/ the same isolation key, or no isolation key
|
|
if cache.WriteIsolationKey != rundata.WriteIsolationKey && cache.WriteIsolationKey != "" {
|
|
h.responseJSON(w, r, 403, fmt.Errorf("cache authorized for write isolation %q, but attempting to operate on %q", rundata.WriteIsolationKey, cache.WriteIsolationKey))
|
|
return
|
|
}
|
|
|
|
if err := h.caches.useCache(id); err != nil {
|
|
h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err))
|
|
return
|
|
}
|
|
h.caches.serve(w, r, id)
|
|
}
|
|
|
|
// POST /_apis/artifactcache/clean
|
|
func (h *handler) clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
|
rundata := runDataFromHeaders(r)
|
|
_, err := h.caches.validateMac(rundata)
|
|
if err != nil {
|
|
h.responseJSON(w, r, 403, err)
|
|
return
|
|
}
|
|
// TODO: don't support force deleting cache entries
|
|
// see: https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries
|
|
|
|
h.responseJSON(w, r, 200)
|
|
}
|
|
|
|
func (h *handler) middleware(handler httprouter.Handle) httprouter.Handle {
|
|
return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
h.logger.Debugf("%s %s", r.Method, r.RequestURI)
|
|
handler(w, r, params)
|
|
go h.caches.gcCache()
|
|
}
|
|
}
|
|
|
|
func (h *handler) responseFatalJSON(w http.ResponseWriter, r *http.Request, err error) {
|
|
h.responseJSON(w, r, 500, err)
|
|
fatal(h.logger, err)
|
|
}
|
|
|
|
func (h *handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) {
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
var data []byte
|
|
if len(v) == 0 || v[0] == nil {
|
|
data, _ = json.Marshal(struct{}{})
|
|
} else if err, ok := v[0].(error); ok {
|
|
h.logger.Errorf("%v %v: %v", r.Method, r.RequestURI, err)
|
|
data, _ = json.Marshal(map[string]any{
|
|
"error": err.Error(),
|
|
})
|
|
} else {
|
|
data, _ = json.Marshal(v[0])
|
|
}
|
|
w.WriteHeader(code)
|
|
_, _ = w.Write(data)
|
|
}
|
|
|
|
// parseContentRange extracts the first and last byte positions from a
// Content-Range header value such as "bytes 11-22/*". Everything after the
// "/" is ignored.
//
// It returns an error when either position is missing or is not an unsigned
// decimal number, or when the last position is smaller than the first one;
// such a range is invalid and must not be accepted as an upload offset.
func parseContentRange(s string) (uint64, uint64, error) {
	// support the format like "bytes 11-22/*" only
	s, _, _ = strings.Cut(strings.TrimPrefix(s, "bytes "), "/")
	// A missing "-" leaves s2 empty, which is rejected by ParseUint below.
	s1, s2, _ := strings.Cut(s, "-")

	start, err := strconv.ParseUint(s1, 10, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parse %q: %w", s, err)
	}
	stop, err := strconv.ParseUint(s2, 10, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parse %q: %w", s, err)
	}
	if stop < start {
		return 0, 0, fmt.Errorf("parse %q: last byte position %d is before first byte position %d", s, stop, start)
	}
	return start, stop, nil
}
|
|
|
|
// RunData holds the values a request carries in its Forgejo-Cache-* headers,
// as collected by runDataFromHeaders.
type RunData struct {
	// RepositoryFullName is the value of the Forgejo-Cache-Repo header.
	RepositoryFullName string
	// RunNumber is the value of the Forgejo-Cache-RunNumber header.
	RunNumber string
	// Timestamp is the value of the Forgejo-Cache-Timestamp header.
	Timestamp string
	// RepositoryMAC is the value of the Forgejo-Cache-MAC header.
	RepositoryMAC string
	// WriteIsolationKey is the value of the Forgejo-Cache-WriteIsolationKey
	// header; it is empty when the header is absent.
	WriteIsolationKey string
}
|
|
|
|
func runDataFromHeaders(r *http.Request) RunData {
|
|
return RunData{
|
|
RepositoryFullName: r.Header.Get("Forgejo-Cache-Repo"),
|
|
RunNumber: r.Header.Get("Forgejo-Cache-RunNumber"),
|
|
Timestamp: r.Header.Get("Forgejo-Cache-Timestamp"),
|
|
RepositoryMAC: r.Header.Get("Forgejo-Cache-MAC"),
|
|
WriteIsolationKey: r.Header.Get("Forgejo-Cache-WriteIsolationKey"),
|
|
}
|
|
}
|