Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add queue to gateway when first cache
Browse files Browse the repository at this point in the history
wzshiming committed Jan 20, 2025
1 parent a355adf commit 1d33c0e
Showing 1 changed file with 79 additions and 6 deletions.
85 changes: 79 additions & 6 deletions gateway/manifest.go
Original file line number Diff line number Diff line change
@@ -9,21 +9,23 @@ import (
"net/http"
"net/url"
"strconv"
"time"

"github.com/daocloud/crproxy/internal/utils"
"github.com/daocloud/crproxy/queue/model"
"github.com/daocloud/crproxy/token"
"github.com/docker/distribution/registry/api/errcode"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
)

func (c *Gateway) worker(ctx context.Context) {
for {
info, _, finish, ok := c.queue.GetOrWaitWithDone(ctx.Done())
info, weight, finish, ok := c.queue.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}

sc, err := c.cacheManifest(&info)
sc, err := c.cacheManifest(&info, weight)
if err != nil {
c.manifestCache.PutError(&info, err, sc)
}
@@ -63,7 +65,53 @@ func (c *Gateway) cacheManifestResponse(rw http.ResponseWriter, r *http.Request,
utils.ServeError(rw, r, errcode.ErrorCodeUnknown, 0)
}

func (c *Gateway) cacheManifest(info *PathInfo) (int, error) {
func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) error {
mr, err := c.queueClient.Create(ctx, msg, weight)
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}

if mr.Status == model.StatusPending {
chMr, err := c.queueClient.Watch(ctx, mr.MessageID)
if err != nil {
return fmt.Errorf("failed to watch message: %w", err)
}
c.logger.Info("Watching message in queue", "msg", msg)
watiQueue:
for {
select {
case m, ok := <-chMr:
if !ok {
if m.Status != model.StatusPending {
break watiQueue
}

time.Sleep(1 * time.Second)
chMr, err = c.queueClient.Watch(ctx, mr.MessageID)
if err != nil {
return fmt.Errorf("failed to re-watch message: %w", err)
}
} else {
mr = m
}

case <-ctx.Done():
return ctx.Err()
}
}
}

switch mr.Status {
case model.StatusCompleted:
return nil
case model.StatusFailed:
return fmt.Errorf("%q Queue Error: %s", msg, mr.Data.Error)
default:
return fmt.Errorf("unexpected status %q for message %q", mr.Status, msg)
}
}

func (c *Gateway) cacheManifest(info *PathInfo, weight int) (int, error) {
ctx := context.Background()
u := &url.URL{
Scheme: "https",
@@ -127,16 +175,41 @@ func (c *Gateway) cacheManifest(info *PathInfo) (int, error) {
}

err = c.cache.RelinkManifest(ctx, info.Host, info.Image, info.Manifests, digest)
if err != nil {
c.logger.Warn("failed relink manifest", "url", u.String(), "error", err)
} else {
if err == nil {
c.logger.Info("relink manifest", "url", u.String())
c.manifestCache.Put(info, cacheValue{
Digest: digest,
})
return 0, nil
}

if c.queueClient != nil {
msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests)
err = c.waitingQueue(ctx, msg, weight)
if err != nil {
c.logger.Error("waiting queue", "msg", msg, "digest", digest, "weight", weight, "error", err)
} else {
c.manifestCache.Put(info, cacheValue{
Digest: digest,
})
return 0, nil
}
}
u.Path = fmt.Sprintf("/v2/%s/manifests/%s", info.Image, digest)
} else {
if c.queueClient != nil {
msg := fmt.Sprintf("%s/%s@%s", info.Host, info.Image, info.Manifests)
digest := info.Manifests
err := c.waitingQueue(ctx, msg, weight)
if err != nil {
c.logger.Error("waiting queue", "msg", msg, "digest", digest, "weight", weight, "error", err)
} else {
c.manifestCache.Put(info, cacheValue{
Digest: digest,
})
return 0, nil
}
}
}

forwardReq, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)

0 comments on commit 1d33c0e

Please sign in to comment.