From 8bc36fb69a857b551369a26445bb615616e70aaf Mon Sep 17 00:00:00 2001 From: Kwonunn Date: Thu, 24 Oct 2024 15:04:04 +0200 Subject: [PATCH] create cache proxy based on cache server --- act/cacheproxy/handler.go | 427 ++++++++++++++++++++++++++++++++++++++ act/cacheproxy/model.go | 44 ++++ 2 files changed, 471 insertions(+) create mode 100644 act/cacheproxy/handler.go create mode 100644 act/cacheproxy/model.go diff --git a/act/cacheproxy/handler.go b/act/cacheproxy/handler.go new file mode 100644 index 00000000..89234c49 --- /dev/null +++ b/act/cacheproxy/handler.go @@ -0,0 +1,427 @@ +package cacheproxy + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "regexp" + "strconv" + "strings" + "time" + + "github.com/julienschmidt/httprouter" + "github.com/sirupsen/logrus" + "github.com/timshannon/bolthold" + + "github.com/nektos/act/pkg/common" +) + +const ( + urlBase = "/_apis/artifactcache" +) + +type Handler struct { + router *httprouter.Router + listener net.Listener + server *http.Server + logger logrus.FieldLogger + + outboundIP string + + cacheServerHost string + cacheServerPort uint16 + + repositoryName string +} + +func StartHandler(repoName string, targetHost string, targetPort uint16, outboundIP string, port uint16, logger logrus.FieldLogger) (*Handler, error) { + h := &Handler{} + + if logger == nil { + discard := logrus.New() + discard.Out = io.Discard + logger = discard + } + logger = logger.WithField("module", "artifactcache") + h.logger = logger + + h.repositoryName = repoName + + if outboundIP != "" { + h.outboundIP = outboundIP + } else if ip := common.GetOutboundIP(); ip == nil { + return nil, fmt.Errorf("unable to determine outbound IP address") + } else { + h.outboundIP = ip.String() + } + + router := httprouter.New() + router.GET(urlBase+"/cache", h.middleware(h.find)) + router.POST(urlBase+"/caches", h.middleware(h.reserve)) + router.PATCH(urlBase+"/caches/:id", h.middleware(h.upload)) + router.POST(urlBase+"/caches/:id", h.middleware(h.commit)) + router.GET(urlBase+"/artifacts/:id", h.middleware(h.get)) + router.POST(urlBase+"/clean", h.middleware(h.clean)) + + h.router = router + + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) // listen on all interfaces + if err != nil { + return nil, err + } + server := &http.Server{ + ReadHeaderTimeout: 2 * time.Second, + Handler: router, + } + go func() { + if err := server.Serve(listener); err != nil && errors.Is(err, net.ErrClosed) { + logger.Errorf("http serve: %v", err) + } + }() + h.listener = listener + h.server = server + + return h, nil +} + +func (h *Handler) ExternalURL() string { + // TODO: make the external url configurable if necessary + return fmt.Sprintf("http://%s:%d", + h.outboundIP, + h.listener.Addr().(*net.TCPAddr).Port) +} + +func (h *Handler) Close() error { + if h == nil { + return nil + } + var retErr error + if h.server != nil { + err := h.server.Close() + if err != nil { + retErr = err + } + h.server = nil + } + if h.listener != nil { + err := h.listener.Close() + if errors.Is(err, net.ErrClosed) { + err = nil + } + if err != nil { + retErr = err + } + h.listener = nil + } + return retErr +} + +// GET /_apis/artifactcache/cache +func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + keys := strings.Split(r.URL.Query().Get("keys"), ",") + // cache keys are case insensitive + for i, key := range keys { + keys[i] = strings.ToLower(key) + } + version := r.URL.Query().Get("version") + + db, err := h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + + cache, err := h.findCache(db, keys, version) + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + if cache == nil { + h.responseJSON(w, r, 204) + return + } + + if ok, err := h.storage.Exist(cache.ID); err != nil { + h.responseJSON(w, r, 500, err) + return + } else if !ok { + _ = db.Delete(cache.ID, cache) + h.responseJSON(w, r, 204) + return + } + h.responseJSON(w, r, 200, map[string]any{ + "result": "hit", + "archiveLocation": fmt.Sprintf("%s%s/artifacts/%d", h.ExternalURL(), urlBase, cache.ID), + "cacheKey": cache.Key, + }) +} + +// POST /_apis/artifactcache/caches +func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + api := &Request{} + if err := json.NewDecoder(r.Body).Decode(api); err != nil { + h.responseJSON(w, r, 400, err) + return + } + // cache keys are case insensitive + api.Key = strings.ToLower(api.Key) + + cache := api.ToCache() + cache.FillKeyVersionHash() + db, err := h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { + if !errors.Is(err, bolthold.ErrNotFound) { + h.responseJSON(w, r, 500, err) + return + } + } else { + h.responseJSON(w, r, 400, fmt.Errorf("already exist")) + return + } + + now := time.Now().Unix() + cache.CreatedAt = now + cache.UsedAt = now + if err := db.Insert(bolthold.NextSequence(), cache); err != nil { + h.responseJSON(w, r, 500, err) + return + } + // write back id to db + if err := db.Update(cache.ID, cache); err != nil { + h.responseJSON(w, r, 500, err) + return + } + h.responseJSON(w, r, 200, map[string]any{ + "cacheId": cache.ID, + }) +} + +// PATCH /_apis/artifactcache/caches/:id +func (h *Handler) upload(w http.ResponseWriter, r *http.Request, params httprouter.Params) { + id, err := strconv.ParseInt(params.ByName("id"), 10, 64) + if err != nil { + h.responseJSON(w, r, 400, err) + return + } + + cache := &Cache{} + db, err := h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + if err := db.Get(id, cache); err != nil { + if errors.Is(err, bolthold.ErrNotFound) { + h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id)) + return + } + h.responseJSON(w, r, 500, err) + return + } + + if cache.Complete { + h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key)) + return + } + db.Close() + start, _, err := parseContentRange(r.Header.Get("Content-Range")) + if err != nil { + h.responseJSON(w, r, 400, err) + return + } + if err := h.storage.Write(cache.ID, start, r.Body); err != nil { + h.responseJSON(w, r, 500, err) + } + h.useCache(id) + h.responseJSON(w, r, 200) +} + +// POST /_apis/artifactcache/caches/:id +func (h *Handler) commit(w http.ResponseWriter, r *http.Request, params httprouter.Params) { + id, err := strconv.ParseInt(params.ByName("id"), 10, 64) + if err != nil { + h.responseJSON(w, r, 400, err) + return + } + + cache := &Cache{} + db, err := h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + if err := db.Get(id, cache); err != nil { + if errors.Is(err, bolthold.ErrNotFound) { + h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id)) + return + } + h.responseJSON(w, r, 500, err) + return + } + + if cache.Complete { + h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key)) + return + } + + db.Close() + + size, err := h.storage.Commit(cache.ID, cache.Size) + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + // write real size back to cache, it may be different from the current value when the request doesn't specify it. + cache.Size = size + + db, err = h.openDB() + if err != nil { + h.responseJSON(w, r, 500, err) + return + } + defer db.Close() + + cache.Complete = true + if err := db.Update(cache.ID, cache); err != nil { + h.responseJSON(w, r, 500, err) + return + } + + h.responseJSON(w, r, 200) +} + +// GET /_apis/artifactcache/artifacts/:id +func (h *Handler) get(w http.ResponseWriter, r *http.Request, params httprouter.Params) { + id, err := strconv.ParseInt(params.ByName("id"), 10, 64) + if err != nil { + h.responseJSON(w, r, 400, err) + return + } + h.useCache(id) + h.storage.Serve(w, r, uint64(id)) +} + +// POST /_apis/artifactcache/clean +func (h *Handler) clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + // TODO: don't support force deleting cache entries + // see: https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries + + h.responseJSON(w, r, 200) +} + +func (h *Handler) middleware(handler httprouter.Handle) httprouter.Handle { + return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) { + h.logger.Debugf("%s %s", r.Method, r.RequestURI) + handler(w, r, params) + go h.gcCache() + } +} + +// if not found, return (nil, nil) instead of an error. +func (h *Handler) findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) { + if len(keys) == 0 { + return nil, nil + } + key := keys[0] // the first key is for exact match. + + cache := &Cache{ + Key: key, + Version: version, + } + cache.FillKeyVersionHash() + + if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { + if !errors.Is(err, bolthold.ErrNotFound) { + return nil, err + } + } else if cache.Complete { + return cache, nil + } + stop := fmt.Errorf("stop") + + for _, prefix := range keys[1:] { + found := false + prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix)) + re, err := regexp.Compile(prefixPattern) + if err != nil { + continue + } + if err := db.ForEach(bolthold.Where("Key").RegExp(re).And("Version").Eq(version).SortBy("CreatedAt").Reverse(), func(v *Cache) error { + if !strings.HasPrefix(v.Key, prefix) { + return stop + } + if v.Complete { + cache = v + found = true + return stop + } + return nil + }); err != nil { + if !errors.Is(err, stop) { + return nil, err + } + } + if found { + return cache, nil + } + } + return nil, nil +} + +func (h *Handler) useCache(id int64) { + db, err := h.openDB() + if err != nil { + return + } + defer db.Close() + cache := &Cache{} + if err := db.Get(id, cache); err != nil { + return + } + cache.UsedAt = time.Now().Unix() + _ = db.Update(cache.ID, cache) +} + +func (h *Handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + var data []byte + if len(v) == 0 || v[0] == nil { + data, _ = json.Marshal(struct{}{}) + } else if err, ok := v[0].(error); ok { + h.logger.Errorf("%v %v: %v", r.Method, r.RequestURI, err) + data, _ = json.Marshal(map[string]any{ + "error": err.Error(), + }) + } else { + data, _ = json.Marshal(v[0]) + } + w.WriteHeader(code) + _, _ = w.Write(data) +} + +func parseContentRange(s string) (int64, int64, error) { + // support the format like "bytes 11-22/*" only + s, _, _ = strings.Cut(strings.TrimPrefix(s, "bytes "), "/") + s1, s2, _ := strings.Cut(s, "-") + + start, err := strconv.ParseInt(s1, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parse %q: %w", s, err) + } + stop, err := strconv.ParseInt(s2, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parse %q: %w", s, err) + } + return start, stop, nil +} diff --git a/act/cacheproxy/model.go b/act/cacheproxy/model.go new file mode 100644 index 00000000..de2fd9ea --- /dev/null +++ b/act/cacheproxy/model.go @@ -0,0 +1,44 @@ +package cacheproxy + +import ( + "crypto/sha256" + "fmt" +) + +type Request struct { + Key string `json:"key" ` + Version string `json:"version"` + Size int64 `json:"cacheSize"` +} + +func (c *Request) ToCache() *Cache { + if c == nil { + return nil + } + ret := &Cache{ + Key: c.Key, + Version: c.Version, + Size: c.Size, + } + if c.Size == 0 { + // So the request comes from old versions of actions, like `actions/cache@v2`. + // It doesn't send cache size. Set it to -1 to indicate that. + ret.Size = -1 + } + return ret +} + +type Cache struct { + ID uint64 `json:"id" boltholdKey:"ID"` + Key string `json:"key" boltholdIndex:"Key"` + Version string `json:"version" boltholdIndex:"Version"` + KeyVersionHash string `json:"keyVersionHash" boltholdUnique:"KeyVersionHash"` + Size int64 `json:"cacheSize"` + Complete bool `json:"complete"` + UsedAt int64 `json:"usedAt" boltholdIndex:"UsedAt"` + CreatedAt int64 `json:"createdAt" boltholdIndex:"CreatedAt"` +} + +func (c *Cache) FillKeyVersionHash() { + c.KeyVersionHash = fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%s:%s", c.Key, c.Version)))) +}