mirror of
https://code.forgejo.org/forgejo/runner.git
synced 2025-08-11 17:50:58 +00:00
implement proxy server
This commit is contained in:
parent
11006f4ef3
commit
4b5ffd768f
2 changed files with 49 additions and 371 deletions
|
@ -4,20 +4,17 @@ import (
|
||||||
"crypto/hmac"
|
"crypto/hmac"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"regexp"
|
"net/http/httputil"
|
||||||
"strconv"
|
"net/url"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/julienschmidt/httprouter"
|
"github.com/julienschmidt/httprouter"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/timshannon/bolthold"
|
|
||||||
|
|
||||||
"github.com/nektos/act/pkg/common"
|
"github.com/nektos/act/pkg/common"
|
||||||
)
|
)
|
||||||
|
@ -35,13 +32,12 @@ type Handler struct {
|
||||||
outboundIP string
|
outboundIP string
|
||||||
|
|
||||||
cacheServerHost string
|
cacheServerHost string
|
||||||
cacheServerPort uint16
|
|
||||||
|
|
||||||
repositoryName string
|
repositoryName string
|
||||||
cacheKey string
|
repositorySecret string
|
||||||
}
|
}
|
||||||
|
|
||||||
func StartHandler(repoName string, targetHost string, targetPort uint16, outboundIP string, port uint16, cacheKey string, logger logrus.FieldLogger) (*Handler, error) {
|
func StartHandler(repoName string, targetHost string, outboundIP string, port uint16, cacheSecret string, logger logrus.FieldLogger) (*Handler, error) {
|
||||||
h := &Handler{}
|
h := &Handler{}
|
||||||
|
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
|
@ -53,7 +49,11 @@ func StartHandler(repoName string, targetHost string, targetPort uint16, outboun
|
||||||
h.logger = logger
|
h.logger = logger
|
||||||
|
|
||||||
h.repositoryName = repoName
|
h.repositoryName = repoName
|
||||||
h.cacheKey = h.cacheKey
|
repoSecret, err := calculateMAC(repoName, cacheSecret)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to decode cacheSecret")
|
||||||
|
}
|
||||||
|
h.repositorySecret = repoSecret
|
||||||
|
|
||||||
if outboundIP != "" {
|
if outboundIP != "" {
|
||||||
h.outboundIP = outboundIP
|
h.outboundIP = outboundIP
|
||||||
|
@ -63,13 +63,17 @@ func StartHandler(repoName string, targetHost string, targetPort uint16, outboun
|
||||||
h.outboundIP = ip.String()
|
h.outboundIP = ip.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.cacheServerHost = targetHost
|
||||||
|
|
||||||
|
proxy, err := h.newReverseProxy(targetHost)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to set up proxy to target host")
|
||||||
|
}
|
||||||
|
|
||||||
router := httprouter.New()
|
router := httprouter.New()
|
||||||
router.GET(urlBase+"/cache", h.middleware(h.find))
|
router.HandlerFunc("GET", urlBase, proxyRequestHandler(proxy))
|
||||||
router.POST(urlBase+"/caches", h.middleware(h.reserve))
|
router.HandlerFunc("POST", urlBase, proxyRequestHandler(proxy))
|
||||||
router.PATCH(urlBase+"/caches/:id", h.middleware(h.upload))
|
router.HandlerFunc("PATCH", urlBase, proxyRequestHandler(proxy))
|
||||||
router.POST(urlBase+"/caches/:id", h.middleware(h.commit))
|
|
||||||
router.GET(urlBase+"/artifacts/:id", h.middleware(h.get))
|
|
||||||
router.POST(urlBase+"/clean", h.middleware(h.clean))
|
|
||||||
|
|
||||||
h.router = router
|
h.router = router
|
||||||
|
|
||||||
|
@ -92,6 +96,27 @@ func StartHandler(repoName string, targetHost string, targetPort uint16, outboun
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func proxyRequestHandler(proxy *httputil.ReverseProxy) func(http.ResponseWriter, *http.Request) {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
proxy.ServeHTTP(w, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) newReverseProxy(targetHost string) (*httputil.ReverseProxy, error) {
|
||||||
|
url, err := url.Parse(targetHost)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
proxy := httputil.NewSingleHostReverseProxy(url)
|
||||||
|
proxy.Rewrite = func(r *httputil.ProxyRequest) { h.injectAuth(r) }
|
||||||
|
return proxy, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) injectAuth(r *httputil.ProxyRequest) {
|
||||||
|
r.Out.SetBasicAuth(h.repositoryName, h.repositorySecret)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Handler) ExternalURL() string {
|
func (h *Handler) ExternalURL() string {
|
||||||
// TODO: make the external url configurable if necessary
|
// TODO: make the external url configurable if necessary
|
||||||
return fmt.Sprintf("http://%s:%d",
|
return fmt.Sprintf("http://%s:%d",
|
||||||
|
@ -124,316 +149,13 @@ func (h *Handler) Close() error {
|
||||||
return retErr
|
return retErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) calculateMAC() string {
|
func calculateMAC(repoName string, cacheSecret string) (string, error) {
|
||||||
mac := hmac.New(sha256.New, []byte(h.cacheKey))
|
sec, err := hex.DecodeString(cacheSecret)
|
||||||
mac.Write([]byte(h.repositoryName))
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
mac := hmac.New(sha256.New, sec)
|
||||||
|
mac.Write([]byte(repoName))
|
||||||
macBytes := mac.Sum(nil)
|
macBytes := mac.Sum(nil)
|
||||||
return hex.EncodeToString(macBytes)
|
return hex.EncodeToString(macBytes), nil
|
||||||
}
|
|
||||||
|
|
||||||
// GET /_apis/artifactcache/cache
|
|
||||||
func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
|
||||||
keys := strings.Split(r.URL.Query().Get("keys"), ",")
|
|
||||||
// cache keys are case insensitive
|
|
||||||
for i, key := range keys {
|
|
||||||
keys[i] = strings.ToLower(key)
|
|
||||||
}
|
|
||||||
version := r.URL.Query().Get("version")
|
|
||||||
|
|
||||||
db, err := h.openDB()
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
|
|
||||||
cache, err := h.findCache(db, keys, version)
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if cache == nil {
|
|
||||||
h.responseJSON(w, r, 204)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if ok, err := h.storage.Exist(cache.ID); err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
} else if !ok {
|
|
||||||
_ = db.Delete(cache.ID, cache)
|
|
||||||
h.responseJSON(w, r, 204)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.responseJSON(w, r, 200, map[string]any{
|
|
||||||
"result": "hit",
|
|
||||||
"archiveLocation": fmt.Sprintf("%s%s/artifacts/%d", h.ExternalURL(), urlBase, cache.ID),
|
|
||||||
"cacheKey": cache.Key,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// POST /_apis/artifactcache/caches
|
|
||||||
func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
|
||||||
api := &Request{}
|
|
||||||
if err := json.NewDecoder(r.Body).Decode(api); err != nil {
|
|
||||||
h.responseJSON(w, r, 400, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// cache keys are case insensitive
|
|
||||||
api.Key = strings.ToLower(api.Key)
|
|
||||||
|
|
||||||
cache := api.ToCache()
|
|
||||||
cache.FillKeyVersionHash()
|
|
||||||
db, err := h.openDB()
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
|
|
||||||
if !errors.Is(err, bolthold.ErrNotFound) {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
h.responseJSON(w, r, 400, fmt.Errorf("already exist"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now().Unix()
|
|
||||||
cache.CreatedAt = now
|
|
||||||
cache.UsedAt = now
|
|
||||||
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// write back id to db
|
|
||||||
if err := db.Update(cache.ID, cache); err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.responseJSON(w, r, 200, map[string]any{
|
|
||||||
"cacheId": cache.ID,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// PATCH /_apis/artifactcache/caches/:id
|
|
||||||
func (h *Handler) upload(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
||||||
id, err := strconv.ParseInt(params.ByName("id"), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 400, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
cache := &Cache{}
|
|
||||||
db, err := h.openDB()
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
if err := db.Get(id, cache); err != nil {
|
|
||||||
if errors.Is(err, bolthold.ErrNotFound) {
|
|
||||||
h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if cache.Complete {
|
|
||||||
h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
db.Close()
|
|
||||||
start, _, err := parseContentRange(r.Header.Get("Content-Range"))
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 400, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := h.storage.Write(cache.ID, start, r.Body); err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
}
|
|
||||||
h.useCache(id)
|
|
||||||
h.responseJSON(w, r, 200)
|
|
||||||
}
|
|
||||||
|
|
||||||
// POST /_apis/artifactcache/caches/:id
|
|
||||||
func (h *Handler) commit(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
||||||
id, err := strconv.ParseInt(params.ByName("id"), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 400, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
cache := &Cache{}
|
|
||||||
db, err := h.openDB()
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
if err := db.Get(id, cache); err != nil {
|
|
||||||
if errors.Is(err, bolthold.ErrNotFound) {
|
|
||||||
h.responseJSON(w, r, 400, fmt.Errorf("cache %d: not reserved", id))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if cache.Complete {
|
|
||||||
h.responseJSON(w, r, 400, fmt.Errorf("cache %v %q: already complete", cache.ID, cache.Key))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
db.Close()
|
|
||||||
|
|
||||||
size, err := h.storage.Commit(cache.ID, cache.Size)
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// write real size back to cache, it may be different from the current value when the request doesn't specify it.
|
|
||||||
cache.Size = size
|
|
||||||
|
|
||||||
db, err = h.openDB()
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
|
|
||||||
cache.Complete = true
|
|
||||||
if err := db.Update(cache.ID, cache); err != nil {
|
|
||||||
h.responseJSON(w, r, 500, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
h.responseJSON(w, r, 200)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GET /_apis/artifactcache/artifacts/:id
|
|
||||||
func (h *Handler) get(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
||||||
id, err := strconv.ParseInt(params.ByName("id"), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
h.responseJSON(w, r, 400, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.useCache(id)
|
|
||||||
h.storage.Serve(w, r, uint64(id))
|
|
||||||
}
|
|
||||||
|
|
||||||
// POST /_apis/artifactcache/clean
|
|
||||||
func (h *Handler) clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
|
||||||
// TODO: don't support force deleting cache entries
|
|
||||||
// see: https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries
|
|
||||||
|
|
||||||
h.responseJSON(w, r, 200)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Handler) middleware(handler httprouter.Handle) httprouter.Handle {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
|
||||||
h.logger.Debugf("%s %s", r.Method, r.RequestURI)
|
|
||||||
handler(w, r, params)
|
|
||||||
go h.gcCache()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if not found, return (nil, nil) instead of an error.
|
|
||||||
func (h *Handler) findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) {
|
|
||||||
if len(keys) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
key := keys[0] // the first key is for exact match.
|
|
||||||
|
|
||||||
cache := &Cache{
|
|
||||||
Key: key,
|
|
||||||
Version: version,
|
|
||||||
}
|
|
||||||
cache.FillKeyVersionHash()
|
|
||||||
|
|
||||||
if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
|
|
||||||
if !errors.Is(err, bolthold.ErrNotFound) {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else if cache.Complete {
|
|
||||||
return cache, nil
|
|
||||||
}
|
|
||||||
stop := fmt.Errorf("stop")
|
|
||||||
|
|
||||||
for _, prefix := range keys[1:] {
|
|
||||||
found := false
|
|
||||||
prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix))
|
|
||||||
re, err := regexp.Compile(prefixPattern)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := db.ForEach(bolthold.Where("Key").RegExp(re).And("Version").Eq(version).SortBy("CreatedAt").Reverse(), func(v *Cache) error {
|
|
||||||
if !strings.HasPrefix(v.Key, prefix) {
|
|
||||||
return stop
|
|
||||||
}
|
|
||||||
if v.Complete {
|
|
||||||
cache = v
|
|
||||||
found = true
|
|
||||||
return stop
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
if !errors.Is(err, stop) {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if found {
|
|
||||||
return cache, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Handler) useCache(id int64) {
|
|
||||||
db, err := h.openDB()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
cache := &Cache{}
|
|
||||||
if err := db.Get(id, cache); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
cache.UsedAt = time.Now().Unix()
|
|
||||||
_ = db.Update(cache.ID, cache)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) {
|
|
||||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
||||||
var data []byte
|
|
||||||
if len(v) == 0 || v[0] == nil {
|
|
||||||
data, _ = json.Marshal(struct{}{})
|
|
||||||
} else if err, ok := v[0].(error); ok {
|
|
||||||
h.logger.Errorf("%v %v: %v", r.Method, r.RequestURI, err)
|
|
||||||
data, _ = json.Marshal(map[string]any{
|
|
||||||
"error": err.Error(),
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
data, _ = json.Marshal(v[0])
|
|
||||||
}
|
|
||||||
w.WriteHeader(code)
|
|
||||||
_, _ = w.Write(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseContentRange(s string) (int64, int64, error) {
|
|
||||||
// support the format like "bytes 11-22/*" only
|
|
||||||
s, _, _ = strings.Cut(strings.TrimPrefix(s, "bytes "), "/")
|
|
||||||
s1, s2, _ := strings.Cut(s, "-")
|
|
||||||
|
|
||||||
start, err := strconv.ParseInt(s1, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, fmt.Errorf("parse %q: %w", s, err)
|
|
||||||
}
|
|
||||||
stop, err := strconv.ParseInt(s2, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, fmt.Errorf("parse %q: %w", s, err)
|
|
||||||
}
|
|
||||||
return start, stop, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,44 +0,0 @@
|
||||||
package cacheproxy
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/sha256"
|
|
||||||
"fmt"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Request struct {
|
|
||||||
Key string `json:"key" `
|
|
||||||
Version string `json:"version"`
|
|
||||||
Size int64 `json:"cacheSize"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Request) ToCache() *Cache {
|
|
||||||
if c == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
ret := &Cache{
|
|
||||||
Key: c.Key,
|
|
||||||
Version: c.Version,
|
|
||||||
Size: c.Size,
|
|
||||||
}
|
|
||||||
if c.Size == 0 {
|
|
||||||
// So the request comes from old versions of actions, like `actions/cache@v2`.
|
|
||||||
// It doesn't send cache size. Set it to -1 to indicate that.
|
|
||||||
ret.Size = -1
|
|
||||||
}
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
type Cache struct {
|
|
||||||
ID uint64 `json:"id" boltholdKey:"ID"`
|
|
||||||
Key string `json:"key" boltholdIndex:"Key"`
|
|
||||||
Version string `json:"version" boltholdIndex:"Version"`
|
|
||||||
KeyVersionHash string `json:"keyVersionHash" boltholdUnique:"KeyVersionHash"`
|
|
||||||
Size int64 `json:"cacheSize"`
|
|
||||||
Complete bool `json:"complete"`
|
|
||||||
UsedAt int64 `json:"usedAt" boltholdIndex:"UsedAt"`
|
|
||||||
CreatedAt int64 `json:"createdAt" boltholdIndex:"CreatedAt"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cache) FillKeyVersionHash() {
|
|
||||||
c.KeyVersionHash = fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%s:%s", c.Key, c.Version))))
|
|
||||||
}
|
|
Loading…
Add table
Add a link
Reference in a new issue