From 4b5ffd768f48aec38af5c1da68df73641c2bc9d8 Mon Sep 17 00:00:00 2001 From: Kwonunn Date: Fri, 25 Oct 2024 17:15:43 +0200 Subject: [PATCH] implement proxy server --- act/cacheproxy/handler.go | 376 +++++--------------------------------- act/cacheproxy/model.go | 44 ----- 2 files changed, 49 insertions(+), 371 deletions(-) delete mode 100644 act/cacheproxy/model.go diff --git a/act/cacheproxy/handler.go b/act/cacheproxy/handler.go index 69f36fff..40bab63a 100644 --- a/act/cacheproxy/handler.go +++ b/act/cacheproxy/handler.go @@ -4,20 +4,17 @@ import ( "crypto/hmac" "crypto/sha256" "encoding/hex" - "encoding/json" "errors" "fmt" "io" "net" "net/http" - "regexp" - "strconv" - "strings" + "net/http/httputil" + "net/url" "time" "github.com/julienschmidt/httprouter" "github.com/sirupsen/logrus" - "github.com/timshannon/bolthold" "github.com/nektos/act/pkg/common" ) @@ -35,13 +32,12 @@ type Handler struct { outboundIP string cacheServerHost string - cacheServerPort uint16 - repositoryName string - cacheKey string + repositoryName string + repositorySecret string } -func StartHandler(repoName string, targetHost string, targetPort uint16, outboundIP string, port uint16, cacheKey string, logger logrus.FieldLogger) (*Handler, error) { +func StartHandler(repoName string, targetHost string, outboundIP string, port uint16, cacheSecret string, logger logrus.FieldLogger) (*Handler, error) { h := &Handler{} if logger == nil { @@ -53,7 +49,11 @@ func StartHandler(repoName string, targetHost string, targetPort uint16, outboun h.logger = logger h.repositoryName = repoName - h.cacheKey = h.cacheKey + repoSecret, err := calculateMAC(repoName, cacheSecret) + if err != nil { + return nil, fmt.Errorf("unable to decode cacheSecret") + } + h.repositorySecret = repoSecret if outboundIP != "" { h.outboundIP = outboundIP @@ -63,13 +63,17 @@ func StartHandler(repoName string, targetHost string, targetPort uint16, outboun h.outboundIP = ip.String() } + h.cacheServerHost = targetHost + + proxy, err := h.newReverseProxy(targetHost) + if err != nil { + return nil, fmt.Errorf("unable to set up proxy to target host") + } + router := httprouter.New() - router.GET(urlBase+"/cache", h.middleware(h.find)) - router.POST(urlBase+"/caches", h.middleware(h.reserve)) - router.PATCH(urlBase+"/caches/:id", h.middleware(h.upload)) - router.POST(urlBase+"/caches/:id", h.middleware(h.commit)) - router.GET(urlBase+"/artifacts/:id", h.middleware(h.get)) - router.POST(urlBase+"/clean", h.middleware(h.clean)) + router.HandlerFunc("GET", urlBase, proxyRequestHandler(proxy)) + router.HandlerFunc("POST", urlBase, proxyRequestHandler(proxy)) + router.HandlerFunc("PATCH", urlBase, proxyRequestHandler(proxy)) h.router = router @@ -92,6 +96,27 @@ func StartHandler(repoName string, targetHost string, targetPort uint16, outboun return h, nil } +func proxyRequestHandler(proxy *httputil.ReverseProxy) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + proxy.ServeHTTP(w, r) + } +} + +func (h *Handler) newReverseProxy(targetHost string) (*httputil.ReverseProxy, error) { + url, err := url.Parse(targetHost) + if err != nil { + return nil, err + } + + proxy := httputil.NewSingleHostReverseProxy(url) + proxy.Rewrite = func(r *httputil.ProxyRequest) { h.injectAuth(r) } + return proxy, nil +} + +func (h *Handler) injectAuth(r *httputil.ProxyRequest) { + r.Out.SetBasicAuth(h.repositoryName, h.repositorySecret) +} + func (h *Handler) ExternalURL() string { // TODO: make the external url configurable if necessary return fmt.Sprintf("http://%s:%d", @@ -124,316 +149,13 @@ func (h *Handler) Close() error { return retErr } -func (h *Handler) calculateMAC() string { - mac := hmac.New(sha256.New, []byte(h.cacheKey)) - mac.Write([]byte(h.repositoryName)) +func calculateMAC(repoName string, cacheSecret string) (string, error) { + sec, err := hex.DecodeString(cacheSecret) + if err != nil { + return "", err + } + mac := hmac.New(sha256.New, sec) + mac.Write([]byte(repoName)) macBytes := mac.Sum(nil) - return hex.EncodeToString(macBytes) -} - -// GET /_apis/artifactcache/cache -func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - keys := strings.Split(r.URL.Query().Get("keys"), ",") - // cache keys are case insensitive - for i, key := range keys { - keys[i] = strings.ToLower(key) - } - version := r.URL.Query().Get("version") - - db, err := h.openDB() - if err != nil { - h.responseJSON(w, r, 500, err) - return - } - defer db.Close() - - cache, err := h.findCache(db, keys, version) - if err != nil { - h.responseJSON(w, r, 500, err) - return - } - if cache == nil { - h.responseJSON(w, r, 204) - return - } - - if ok, err := h.storage.Exist(cache.ID); err != nil { - h.responseJSON(w, r, 500, err) - return - } else if !ok { - _ = db.Delete(cache.ID, cache) - h.responseJSON(w, r, 204) - return - } - h.responseJSON(w, r, 200, map[string]any{ - "result": "hit", - "archiveLocation": fmt.Sprintf("%s%s/artifacts/%d", h.ExternalURL(), urlBase, cache.ID), - "cacheKey": cache.Key, - }) -} - -// POST /_apis/artifactcache/caches -func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - api := &Request{} - if err := json.NewDecoder(r.Body).Decode(api); err != nil { - h.responseJSON(w, r, 400, err) - return - } - // cache keys are case insensitive - api.Key = strings.ToLower(api.Key) - - cache := api.ToCache() - cache.FillKeyVersionHash() - db, err := h.openDB() - if err != nil { - h.responseJSON(w, r, 500, err) - return - } - defer db.Close() - if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { - if !errors.Is(err, bolthold.ErrNotFound) { - h.responseJSON(w, r, 500, err) - return - } - } else { - h.responseJSON(w, r, 400, fmt.Errorf("already exist")) - return - } - - now := time.Now().Unix() - cache.CreatedAt = now - cache.UsedAt = now - if err := db.Insert(bolthold.NextSequence(), cache); err != nil { - h.responseJSON(w, r, 500, err) - return - } - // write back id to db - if err := db.Update(cache.ID, cache); err != nil { - h.responseJSON(w, r, 500, err) - return - } - h.responseJSON(w, r, 200, map[string]any{ - "cacheId": cache.ID, - }) -} - -// PATCH /_apis/artifactcache/caches/:id -func (h *Handler) upload(w http.ResponseWriter, r *http.Request, params httprouter.Params) { - id, err := strconv.ParseInt(params.ByName("id"), 10, 64) - if err != nil { - h.responseJSON(w, r, 400, err) - return - } - - cache := &Cache{} - db, err := h.openDB() - if err != nil { - h.responseJSON(w, r, 500, err) - return - } - defer db.Close() - if err := db.Get(id, cache); err != nil { - if errors.Is(err, bolthold.ErrNotFound) { - h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id)) - return - } - h.responseJSON(w, r, 500, err) - return - } - - if cache.Complete { - h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key)) - return - } - db.Close() - start, _, err := parseContentRange(r.Header.Get("Content-Range")) - if err != nil { - h.responseJSON(w, r, 400, err) - return - } - if err := h.storage.Write(cache.ID, start, r.Body); err != nil { - h.responseJSON(w, r, 500, err) - } - h.useCache(id) - h.responseJSON(w, r, 200) -} - -// POST /_apis/artifactcache/caches/:id -func (h *Handler) commit(w http.ResponseWriter, r *http.Request, params httprouter.Params) { - id, err := strconv.ParseInt(params.ByName("id"), 10, 64) - if err != nil { - h.responseJSON(w, r, 400, err) - return - } - - cache := &Cache{} - db, err := h.openDB() - if err != nil { - h.responseJSON(w, r, 500, err) - return - } - defer db.Close() - if err := db.Get(id, cache); err != nil { - if errors.Is(err, bolthold.ErrNotFound) { - h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id)) - return - } - h.responseJSON(w, r, 500, err) - return - } - - if cache.Complete { - h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key)) - return - } - - db.Close() - - size, err := h.storage.Commit(cache.ID, cache.Size) - if err != nil { - h.responseJSON(w, r, 500, err) - return - } - // write real size back to cache, it may be different from the current value when the request doesn't specify it. - cache.Size = size - - db, err = h.openDB() - if err != nil { - h.responseJSON(w, r, 500, err) - return - } - defer db.Close() - - cache.Complete = true - if err := db.Update(cache.ID, cache); err != nil { - h.responseJSON(w, r, 500, err) - return - } - - h.responseJSON(w, r, 200) -} - -// GET /_apis/artifactcache/artifacts/:id -func (h *Handler) get(w http.ResponseWriter, r *http.Request, params httprouter.Params) { - id, err := strconv.ParseInt(params.ByName("id"), 10, 64) - if err != nil { - h.responseJSON(w, r, 400, err) - return - } - h.useCache(id) - h.storage.Serve(w, r, uint64(id)) -} - -// POST /_apis/artifactcache/clean -func (h *Handler) clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - // TODO: don't support force deleting cache entries - // see: https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries - - h.responseJSON(w, r, 200) -} - -func (h *Handler) middleware(handler httprouter.Handle) httprouter.Handle { - return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) { - h.logger.Debugf("%s %s", r.Method, r.RequestURI) - handler(w, r, params) - go h.gcCache() - } -} - -// if not found, return (nil, nil) instead of an error. -func (h *Handler) findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) { - if len(keys) == 0 { - return nil, nil - } - key := keys[0] // the first key is for exact match. - - cache := &Cache{ - Key: key, - Version: version, - } - cache.FillKeyVersionHash() - - if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { - if !errors.Is(err, bolthold.ErrNotFound) { - return nil, err - } - } else if cache.Complete { - return cache, nil - } - stop := fmt.Errorf("stop") - - for _, prefix := range keys[1:] { - found := false - prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix)) - re, err := regexp.Compile(prefixPattern) - if err != nil { - continue - } - if err := db.ForEach(bolthold.Where("Key").RegExp(re).And("Version").Eq(version).SortBy("CreatedAt").Reverse(), func(v *Cache) error { - if !strings.HasPrefix(v.Key, prefix) { - return stop - } - if v.Complete { - cache = v - found = true - return stop - } - return nil - }); err != nil { - if !errors.Is(err, stop) { - return nil, err - } - } - if found { - return cache, nil - } - } - return nil, nil -} - -func (h *Handler) useCache(id int64) { - db, err := h.openDB() - if err != nil { - return - } - defer db.Close() - cache := &Cache{} - if err := db.Get(id, cache); err != nil { - return - } - cache.UsedAt = time.Now().Unix() - _ = db.Update(cache.ID, cache) -} - -func (h *Handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) { - w.Header().Set("Content-Type", "application/json; charset=utf-8") - var data []byte - if len(v) == 0 || v[0] == nil { - data, _ = json.Marshal(struct{}{}) - } else if err, ok := v[0].(error); ok { - h.logger.Errorf("%v %v: %v", r.Method, r.RequestURI, err) - data, _ = json.Marshal(map[string]any{ - "error": err.Error(), - }) - } else { - data, _ = json.Marshal(v[0]) - } - w.WriteHeader(code) - _, _ = w.Write(data) -} - -func parseContentRange(s string) (int64, int64, error) { - // support the format like "bytes 11-22/*" only - s, _, _ = strings.Cut(strings.TrimPrefix(s, "bytes "), "/") - s1, s2, _ := strings.Cut(s, "-") - - start, err := strconv.ParseInt(s1, 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("parse %q: %w", s, err) - } - stop, err := strconv.ParseInt(s2, 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("parse %q: %w", s, err) - } - return start, stop, nil + return hex.EncodeToString(macBytes), nil } diff --git a/act/cacheproxy/model.go b/act/cacheproxy/model.go deleted file mode 100644 index de2fd9ea..00000000 --- a/act/cacheproxy/model.go +++ /dev/null @@ -1,44 +0,0 @@ -package cacheproxy - -import ( - "crypto/sha256" - "fmt" -) - -type Request struct { - Key string `json:"key" ` - Version string `json:"version"` - Size int64 `json:"cacheSize"` -} - -func (c *Request) ToCache() *Cache { - if c == nil { - return nil - } - ret := &Cache{ - Key: c.Key, - Version: c.Version, - Size: c.Size, - } - if c.Size == 0 { - // So the request comes from old versions of actions, like `actions/cache@v2`. - // It doesn't send cache size. Set it to -1 to indicate that. - ret.Size = -1 - } - return ret -} - -type Cache struct { - ID uint64 `json:"id" boltholdKey:"ID"` - Key string `json:"key" boltholdIndex:"Key"` - Version string `json:"version" boltholdIndex:"Version"` - KeyVersionHash string `json:"keyVersionHash" boltholdUnique:"KeyVersionHash"` - Size int64 `json:"cacheSize"` - Complete bool `json:"complete"` - UsedAt int64 `json:"usedAt" boltholdIndex:"UsedAt"` - CreatedAt int64 `json:"createdAt" boltholdIndex:"CreatedAt"` -} - -func (c *Cache) FillKeyVersionHash() { - c.KeyVersionHash = fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%s:%s", c.Key, c.Version)))) -}