From ba2f5445d26cf28a9e8b8d0fabe585d54e835d1d Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 12 Jan 2024 07:04:12 +0100 Subject: [PATCH 01/22] gateway: remove workaround for go bugs on ranges on empty files https://github.com/golang/go/commit/edfe07834905809d687b30632ccb849b84ebd4f2 was released in go1.20 and our `go.mod` file indicates: ``` go 1.20 ``` So this is not needed anymore. However none of this matters since c28c847582f0512d7f4a0e25b45aebae2ca7ca04 embedded and modified the ServeContent function in our codebase. So I copied my upstream fix there. --- gateway/handler_unixfs_file.go | 9 ---- gateway/serve_http_content.go | 91 +++++++++++++++++++--------------- 2 files changed, 52 insertions(+), 48 deletions(-) diff --git a/gateway/handler_unixfs_file.go b/gateway/handler_unixfs_file.go index 32f8332fd..1dbeef99b 100644 --- a/gateway/handler_unixfs_file.go +++ b/gateway/handler_unixfs_file.go @@ -28,15 +28,6 @@ func (i *handler) serveFile(ctx context.Context, w http.ResponseWriter, r *http. // Set Content-Disposition name := addContentDispositionHeader(w, r, rq.contentPath) - if fileSize == 0 { - // We override null files to 200 to avoid issues with fragment caching reverse proxies. - // Also whatever you are asking for, it's cheaper to just give you the complete file (nothing). - // TODO: remove this if clause once https://github.com/golang/go/issues/54794 is fixed in two latest releases of go - w.Header().Set("Content-Type", "text/plain") - w.WriteHeader(http.StatusOK) - return true - } - var content io.Reader = fileBytes // Calculate deterministic value for Content-Type HTTP header // (we prefer to do it here, rather than using implicit sniffing in http.ServeContent) diff --git a/gateway/serve_http_content.go b/gateway/serve_http_content.go index 2bb27ae04..9a8ee435f 100644 --- a/gateway/serve_http_content.go +++ b/gateway/serve_http_content.go @@ -45,6 +45,12 @@ func headerGetExact(h http.Header, key string) string { // 5. 
Does not require the name to be passed in for content sniffing // 6. content may be nil for HEAD requests func httpServeContent(w http.ResponseWriter, r *http.Request, modtime time.Time, size int64, content io.Reader) { + if size < 0 { + // Should never happen but just to be sure + http.Error(w, "negative content size computed", http.StatusInternalServerError) + return + } + setLastModified(w, modtime) done, rangeReq := checkPreconditions(w, r, modtime) if done { @@ -55,54 +61,61 @@ func httpServeContent(w http.ResponseWriter, r *http.Request, modtime time.Time, // handle Content-Range header. sendSize := size - if size >= 0 { - ranges, err := parseRange(rangeReq, size) - if err != nil { - if err == errNoOverlap { - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", size)) - } - http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) - return - } - if sumRangesSize(ranges) > size { - // The total number of bytes in all the ranges - // is larger than the size of the file by - // itself, so this is probably an attack, or a - // dumb client. Ignore the range request. + ranges, err := parseRange(rangeReq, size) + switch err { + case nil: + case errNoOverlap: + if size == 0 { + // Some clients add a Range header to all requests to + // limit the size of the response. If the file is empty, + // ignore the range header and respond with a 200 rather + // than a 416. ranges = nil + break } + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", size)) + fallthrough + default: + http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) + return + } + if sumRangesSize(ranges) > size { + // The total number of bytes in all the ranges + // is larger than the size of the file by + // itself, so this is probably an attack, or a + // dumb client. Ignore the range request. 
+ ranges = nil + } - // We only support a single range request, if more than one is submitted we just send back the first - if len(ranges) > 0 { - ra := ranges[0] - // RFC 7233, Section 4.1: - // "If a single part is being transferred, the server - // generating the 206 response MUST generate a - // Content-Range header field, describing what range - // of the selected representation is enclosed, and a - // payload consisting of the range. - // ... - // A server MUST NOT generate a multipart response to - // a request for a single range, since a client that - // does not request multiple parts might not support - // multipart responses." - - sendSize = ra.length - code = http.StatusPartialContent - w.Header().Set("Content-Range", ra.contentRange(size)) - } + // We only support a single range request, if more than one is submitted we just send back the first + if len(ranges) > 0 { + ra := ranges[0] + // RFC 7233, Section 4.1: + // "If a single part is being transferred, the server + // generating the 206 response MUST generate a + // Content-Range header field, describing what range + // of the selected representation is enclosed, and a + // payload consisting of the range. + // ... + // A server MUST NOT generate a multipart response to + // a request for a single range, since a client that + // does not request multiple parts might not support + // multipart responses." 
- w.Header().Set("Accept-Ranges", "bytes") - if w.Header().Get("Content-Encoding") == "" { - w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) - } + sendSize = ra.length + code = http.StatusPartialContent + w.Header().Set("Content-Range", ra.contentRange(size)) + } + + w.Header().Set("Accept-Ranges", "bytes") + if w.Header().Get("Content-Encoding") == "" { + w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) } w.WriteHeader(code) if r.Method != "HEAD" { - var sendContent io.Reader = content - io.CopyN(w, sendContent, sendSize) + io.CopyN(w, content, sendSize) } } From 651402f0c570c50036dc2824ef987b28e50dd59e Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 12 Jan 2024 10:33:57 +0100 Subject: [PATCH 02/22] ipld/unixfs/trickle: add Parallel to tests They don't share state, on my computer time to test went down from 20s to 10s with ~200% CPU usage. --- ipld/unixfs/importer/trickle/trickle_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ipld/unixfs/importer/trickle/trickle_test.go b/ipld/unixfs/importer/trickle/trickle_test.go index 70bc2dd13..e525cd9e8 100644 --- a/ipld/unixfs/importer/trickle/trickle_test.go +++ b/ipld/unixfs/importer/trickle/trickle_test.go @@ -27,8 +27,10 @@ const ( ) func runBothSubtests(t *testing.T, tfunc func(*testing.T, UseRawLeaves)) { - t.Run("leaves=ProtoBuf", func(t *testing.T) { tfunc(t, ProtoBufLeaves) }) - t.Run("leaves=Raw", func(t *testing.T) { tfunc(t, RawLeaves) }) + t.Parallel() + + t.Run("leaves=ProtoBuf", func(t *testing.T) { t.Parallel(); tfunc(t, ProtoBufLeaves) }) + t.Run("leaves=Raw", func(t *testing.T) { t.Parallel(); tfunc(t, RawLeaves) }) } func buildTestDag(ds ipld.DAGService, spl chunker.Splitter, rawLeaves UseRawLeaves) (*merkledag.ProtoNode, error) { @@ -611,6 +613,8 @@ func testMultipleAppends(t *testing.T, rawLeaves UseRawLeaves) { } func TestAppendSingleBytesToEmpty(t *testing.T) { + t.Parallel() + ds := mdtest.Mock() data := []byte("AB") From 
922c66c0e09239bab659fc74f83ae070c7999b82 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 12 Jan 2024 10:57:54 +0100 Subject: [PATCH 03/22] chunking: add t.Parallel to tests Time went down from 25s to 0.5s. I also removed `TestFuzzBuzhashChunking` since I rewrote it as a `testing.F` fuzz test and yet it wasn't able to find any new interesting input. --- chunker/buzhash_norace_test.go | 14 -------------- chunker/buzhash_test.go | 12 ++++++++++++ chunker/parse_test.go | 4 ++++ chunker/rabin_test.go | 4 ++++ chunker/splitting_test.go | 4 ++++ 5 files changed, 24 insertions(+), 14 deletions(-) delete mode 100644 chunker/buzhash_norace_test.go diff --git a/chunker/buzhash_norace_test.go b/chunker/buzhash_norace_test.go deleted file mode 100644 index 50dc0e5ce..000000000 --- a/chunker/buzhash_norace_test.go +++ /dev/null @@ -1,14 +0,0 @@ -//go:build !race - -package chunk - -import ( - "testing" -) - -func TestFuzzBuzhashChunking(t *testing.T) { - buf := make([]byte, 1024*1024*16) - for i := 0; i < 100; i++ { - testBuzhashChunking(t, buf) - } -} diff --git a/chunker/buzhash_test.go b/chunker/buzhash_test.go index fe6de4434..2eaf5ae32 100644 --- a/chunker/buzhash_test.go +++ b/chunker/buzhash_test.go @@ -9,6 +9,8 @@ import ( ) func testBuzhashChunking(t *testing.T, buf []byte) (chunkCount int) { + t.Parallel() + n, err := util.NewTimeSeededRand().Read(buf) if n < len(buf) { t.Fatalf("expected %d bytes, got %d", len(buf), n) @@ -89,3 +91,13 @@ func TestBuzhashBitsHashBias(t *testing.T) { } } } + +func FuzzBuzhashChunking(f *testing.F) { + f.Add(make([]byte, 1024*1024*16)) + f.Fuzz(func(t *testing.T, b []byte) { + if len(b) < buzMin { + return + } + testBuzhashChunking(t, b) + }) +} diff --git a/chunker/parse_test.go b/chunker/parse_test.go index 2a33d64de..6809476e9 100644 --- a/chunker/parse_test.go +++ b/chunker/parse_test.go @@ -11,6 +11,8 @@ const ( ) func TestParseRabin(t *testing.T) { + t.Parallel() + r := bytes.NewReader(randBuf(t, 1000)) _, err := FromString(r, 
"rabin-18-25-32") @@ -55,6 +57,8 @@ func TestParseRabin(t *testing.T) { } func TestParseSize(t *testing.T) { + t.Parallel() + r := bytes.NewReader(randBuf(t, 1000)) _, err := FromString(r, "size-0") diff --git a/chunker/rabin_test.go b/chunker/rabin_test.go index 79699e324..31f3464ee 100644 --- a/chunker/rabin_test.go +++ b/chunker/rabin_test.go @@ -11,6 +11,8 @@ import ( ) func TestRabinChunking(t *testing.T) { + t.Parallel() + data := make([]byte, 1024*1024*16) n, err := util.NewTimeSeededRand().Read(data) if n < len(data) { @@ -67,6 +69,8 @@ func chunkData(t *testing.T, newC newSplitter, data []byte) map[string]blocks.Bl } func testReuse(t *testing.T, cr newSplitter) { + t.Parallel() + data := make([]byte, 1024*1024*16) n, err := util.NewTimeSeededRand().Read(data) if n < len(data) { diff --git a/chunker/splitting_test.go b/chunker/splitting_test.go index 4a9f7f332..c6712446a 100644 --- a/chunker/splitting_test.go +++ b/chunker/splitting_test.go @@ -23,6 +23,8 @@ func copyBuf(buf []byte) []byte { } func TestSizeSplitterOverAllocate(t *testing.T) { + t.Parallel() + max := 1000 r := bytes.NewReader(randBuf(t, max)) chunksize := int64(1024 * 256) @@ -40,6 +42,7 @@ func TestSizeSplitterIsDeterministic(t *testing.T) { if testing.Short() { t.SkipNow() } + t.Parallel() test := func() { bufR := randBuf(t, 10000000) // crank this up to satisfy yourself. @@ -75,6 +78,7 @@ func TestSizeSplitterFillsChunks(t *testing.T) { if testing.Short() { t.SkipNow() } + t.Parallel() max := 10000000 b := randBuf(t, max) From 8109917301319b54da79ce750a2f55b020b20793 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 12 Jan 2024 10:07:46 +0100 Subject: [PATCH 04/22] blockservice: add `ContextWithSession` and `EmbedSessionInContext` This also includes cleanup for session code. 
--- CHANGELOG.md | 2 + blockservice/blockservice.go | 237 ++++++++++++++++-------------- blockservice/blockservice_test.go | 65 ++++++++ 3 files changed, 195 insertions(+), 109 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb3d34c3f..734c89ebf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes: ### Added +- `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context. + ### Changed ### Removed diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 423697d87..aa68b8c0f 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -144,29 +144,19 @@ func (s *blockService) Allowlist() verifcid.Allowlist { // If the current exchange is a SessionExchange, a new exchange // session will be created. Otherwise, the current exchange will be used // directly. +// Sessions are lazily setup, this is cheap. func NewSession(ctx context.Context, bs BlockService) *Session { - allowlist := verifcid.Allowlist(verifcid.DefaultAllowlist) + ses := grabSessionFromContext(ctx, bs) + if ses != nil { + return ses + } + + var allowlist verifcid.Allowlist = verifcid.DefaultAllowlist if bbs, ok := bs.(BoundedBlockService); ok { allowlist = bbs.Allowlist() } - exch := bs.Exchange() - if sessEx, ok := exch.(exchange.SessionExchange); ok { - return &Session{ - allowlist: allowlist, - sessCtx: ctx, - ses: nil, - sessEx: sessEx, - bs: bs.Blockstore(), - notifier: exch, - } - } - return &Session{ - allowlist: allowlist, - ses: exch, - sessCtx: ctx, - bs: bs.Blockstore(), - notifier: exch, - } + + return &Session{bs: bs, allowlist: allowlist, sesctx: ctx} } // AddBlock adds a particular block to the service, Putting it into the datastore. 
@@ -248,75 +238,80 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + if ses := grabSessionFromContext(ctx, s); ses != nil { + return ses.GetBlock(ctx, c) + } + ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - var f func() notifiableFetcher - if s.exchange != nil { - f = s.getExchange - } - - return getBlock(ctx, c, s.blockstore, s.allowlist, f) + return getBlock(ctx, c, s, s.allowlist, s.getExchangeFetcher) } -func (s *blockService) getExchange() notifiableFetcher { +// Look at what I have to do, no interface covariance :'( +func (s *blockService) getExchangeFetcher() exchange.Fetcher { return s.exchange } -func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) (blocks.Block, error) { +func getBlock(ctx context.Context, c cid.Cid, bs BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) (blocks.Block, error) { err := verifcid.ValidateCid(allowlist, c) // hash security if err != nil { return nil, err } - block, err := bs.Get(ctx, c) - if err == nil { + blockstore := bs.Blockstore() + + block, err := blockstore.Get(ctx, c) + switch { + case err == nil: return block, nil + case ipld.IsNotFound(err): + break + default: + return nil, err } - if ipld.IsNotFound(err) && fget != nil { - f := fget() // Don't load the exchange until we have to + fetch := fetchFactory() // lazily create session if needed + if fetch == nil { + logger.Debug("BlockService GetBlock: Not found") + return nil, err + } - // TODO be careful checking ErrNotFound. If the underlying - // implementation changes, this will break. 
- logger.Debug("BlockService: Searching") - blk, err := f.GetBlock(ctx, c) - if err != nil { - return nil, err - } - // also write in the blockstore for caching, inform the exchange that the block is available - err = bs.Put(ctx, blk) - if err != nil { - return nil, err - } - err = f.NotifyNewBlocks(ctx, blk) + logger.Debug("BlockService: Searching") + blk, err := fetch.GetBlock(ctx, c) + if err != nil { + return nil, err + } + // also write in the blockstore for caching, inform the exchange that the block is available + err = blockstore.Put(ctx, blk) + if err != nil { + return nil, err + } + if ex := bs.Exchange(); ex != nil { + err = ex.NotifyNewBlocks(ctx, blk) if err != nil { return nil, err } - logger.Debugf("BlockService.BlockFetched %s", c) - return blk, nil } - - logger.Debug("BlockService GetBlock: Not found") - return nil, err + logger.Debugf("BlockService.BlockFetched %s", c) + return blk, nil } // GetBlocks gets a list of blocks asynchronously and returns through // the returned channel. // NB: No guarantees are made about order. 
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { + if ses := grabSessionFromContext(ctx, s); ses != nil { + return ses.GetBlocks(ctx, ks) + } + ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() - var f func() notifiableFetcher - if s.exchange != nil { - f = s.getExchange - } - - return getBlocks(ctx, ks, s.blockstore, s.allowlist, f) + return getBlocks(ctx, ks, s, s.allowlist, s.getExchangeFetcher) } -func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) <-chan blocks.Block { +func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) <-chan blocks.Block { out := make(chan blocks.Block) go func() { @@ -344,6 +339,8 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo ks = ks2 } + bs := blockservice.Blockstore() + var misses []cid.Cid for _, c := range ks { hit, err := bs.Get(ctx, c) @@ -358,17 +355,18 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo } } - if len(misses) == 0 || fget == nil { + fetch := fetchFactory() // don't load exchange unless we have to + if len(misses) == 0 || fetch == nil { return } - f := fget() // don't load exchange unless we have to - rblocks, err := f.GetBlocks(ctx, misses) + rblocks, err := fetch.GetBlocks(ctx, misses) if err != nil { logger.Debugf("Error with GetBlocks: %s", err) return } + ex := blockservice.Exchange() var cache [1]blocks.Block // preallocate once for all iterations for { var b blocks.Block @@ -389,14 +387,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo return } - // inform the exchange that the blocks are available - cache[0] = b - err = f.NotifyNewBlocks(ctx, cache[:]...) 
- if err != nil { - logger.Errorf("could not tell the exchange about new blocks: %s", err) - return + if ex != nil { + // inform the exchange that the blocks are available + cache[0] = b + err = ex.NotifyNewBlocks(ctx, cache[:]...) + if err != nil { + logger.Errorf("could not tell the exchange about new blocks: %s", err) + return + } + cache[0] = nil // early gc } - cache[0] = nil // early gc select { case out <- b: @@ -428,54 +428,36 @@ func (s *blockService) Close() error { return s.exchange.Close() } -type notifier interface { - NotifyNewBlocks(context.Context, ...blocks.Block) error -} - // Session is a helper type to provide higher level access to bitswap sessions type Session struct { - allowlist verifcid.Allowlist - bs blockstore.Blockstore - ses exchange.Fetcher - sessEx exchange.SessionExchange - sessCtx context.Context - notifier notifier - lk sync.Mutex + createSession sync.Once + bs BlockService + ses exchange.Fetcher + sesctx context.Context + allowlist verifcid.Allowlist } -type notifiableFetcher interface { - exchange.Fetcher - notifier -} +// grabSession is used to lazily create sessions. 
+func (s *Session) grabSession() exchange.Fetcher { + s.createSession.Do(func() { + defer func() { + s.sesctx = nil // early gc + }() -type notifiableFetcherWrapper struct { - exchange.Fetcher - notifier -} - -func (s *Session) getSession() notifiableFetcher { - s.lk.Lock() - defer s.lk.Unlock() - if s.ses == nil { - s.ses = s.sessEx.NewSession(s.sessCtx) - } - - return notifiableFetcherWrapper{s.ses, s.notifier} -} + ex := s.bs.Exchange() + if ex == nil { + return + } + s.ses = ex // always fallback to non session fetches -func (s *Session) getExchange() notifiableFetcher { - return notifiableFetcherWrapper{s.ses, s.notifier} -} + sesEx, ok := ex.(exchange.SessionExchange) + if !ok { + return + } + s.ses = sesEx.NewSession(s.sesctx) + }) -func (s *Session) getFetcherFactory() func() notifiableFetcher { - if s.sessEx != nil { - return s.getSession - } - if s.ses != nil { - // Our exchange isn't session compatible, let's fallback to non sessions fetches - return s.getExchange - } - return nil + return s.ses } // GetBlock gets a block in the context of a request session @@ -483,7 +465,7 @@ func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - return getBlock(ctx, c, s.bs, s.allowlist, s.getFetcherFactory()) + return getBlock(ctx, c, s.bs, s.allowlist, s.grabSession) } // GetBlocks gets blocks in the context of a request session @@ -491,7 +473,44 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo ctx, span := internal.StartSpan(ctx, "Session.GetBlocks") defer span.End() - return getBlocks(ctx, ks, s.bs, s.allowlist, s.getFetcherFactory()) + return getBlocks(ctx, ks, s.bs, s.allowlist, s.grabSession) } var _ BlockGetter = (*Session)(nil) + +// ContextWithSession is a helper which creates a context with an embded session, +// future calls to [BlockGetter.GetBlock], 
[BlockGetter.GetBlocks] and [NewSession] with the same [BlockService] +// will be redirected to this same session instead. +// Sessions are lazily setup, this is cheap. +// It wont make a new session if one exists already in the context. +func ContextWithSession(ctx context.Context, bs BlockService) context.Context { + if grabSessionFromContext(ctx, bs) != nil { + return ctx + } + return EmbedSessionInContext(ctx, NewSession(ctx, bs)) +} + +// EmbedSessionInContext is like [NewSessionContext] but it allows to embed an existing session. +func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context { + // use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice. + return context.WithValue(ctx, ses.bs, ses) +} + +// grabSessionFromContext returns nil if the session was not found +// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety, +// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app. +// By having this private we allow consumers to follow the trace of where the blockservice is passed and used. 
+func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { + s := ctx.Value(bs) + if s == nil { + return nil + } + + ss, ok := s.(*Session) + if !ok { + // idk what to do here, that kinda sucks, giveup + return nil + } + + return ss +} diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index e36058040..6591529d2 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -288,3 +288,68 @@ func TestAllowlist(t *testing.T) { check(blockservice.GetBlock) check(NewSession(ctx, blockservice).GetBlock) } + +type fakeIsNewSessionCreateExchange struct { + ses exchange.Fetcher + newSessionWasCalled bool +} + +var _ exchange.SessionExchange = (*fakeIsNewSessionCreateExchange)(nil) + +func (*fakeIsNewSessionCreateExchange) Close() error { + return nil +} + +func (*fakeIsNewSessionCreateExchange) GetBlock(context.Context, cid.Cid) (blocks.Block, error) { + panic("should call on the session") +} + +func (*fakeIsNewSessionCreateExchange) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) { + panic("should call on the session") +} + +func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fetcher { + f.newSessionWasCalled = true + return f.ses +} + +func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error { + return nil +} + +func TestContextSession(t *testing.T) { + t.Parallel() + a := assert.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + bgen := butil.NewBlockGenerator() + block1 := bgen.Next() + block2 := bgen.Next() + + bs := blockstore.NewBlockstore(ds.NewMapDatastore()) + a.NoError(bs.Put(ctx, block1)) + a.NoError(bs.Put(ctx, block2)) + sesEx := &fakeIsNewSessionCreateExchange{ses: offline.Exchange(bs)} + + service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx) + + ctx = ContextWithSession(ctx, service) + + b, err := service.GetBlock(ctx, block1.Cid()) + 
a.NoError(err) + a.Equal(b.RawData(), block1.RawData()) + a.True(sesEx.newSessionWasCalled, "new session from context should be created") + sesEx.newSessionWasCalled = false + + bchan := service.GetBlocks(ctx, []cid.Cid{block2.Cid()}) + a.Equal((<-bchan).RawData(), block2.RawData()) + a.False(sesEx.newSessionWasCalled, "session should be reused in context") + + a.Equal( + NewSession(ctx, service), + NewSession(ContextWithSession(ctx, service), service), + "session must be deduped in all invocations on the same context", + ) +} From 992c55c4706d59d391cb19343338184c7271af6e Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 12 Jan 2024 12:10:56 +0100 Subject: [PATCH 05/22] gateway: add `WithContextHint` to `handler` and implement it on `BlocksBackend` This allows the `BlocksBackend` to inject a session in the context, it removes the problem where we keep resetting the bitswap session each time we call the `fetcher`. --- gateway/blocks_backend.go | 6 ++++++ gateway/gateway.go | 8 ++++++++ gateway/handler.go | 5 +++++ 3 files changed, 19 insertions(+) diff --git a/gateway/blocks_backend.go b/gateway/blocks_backend.go index 4faadb206..fe188ae71 100644 --- a/gateway/blocks_backend.go +++ b/gateway/blocks_backend.go @@ -684,6 +684,12 @@ func (bb *BlocksBackend) IsCached(ctx context.Context, p path.Path) bool { return has } +var _ WithContextHint = (*BlocksBackend)(nil) + +func (bb *BlocksBackend) WrapContextForRequest(ctx context.Context) context.Context { + return blockservice.ContextWithSession(ctx, bb.blockService) +} + func (bb *BlocksBackend) ResolvePath(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, error) { roots, lastSeg, remainder, err := bb.getPathRoots(ctx, path) if err != nil { diff --git a/gateway/gateway.go b/gateway/gateway.go index b2edbf20c..aa0d59f43 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -386,6 +386,14 @@ type IPFSBackend interface { GetDNSLinkRecord(context.Context, string) (path.Path, error) } +// 
WithContextHint allows an [IPFSBackend] to inject custom [context.Context] configurations. +// This should be considered optional, consumers might only make a best effort attempt at calling WrapContextForRequest on requests. +type WithContextHint interface { + // WrapContextForRequest allows the backend to add request scopped modifications to the context, like debug values or value caches. + // There are no promises on actual usage in consumers. + WrapContextForRequest(context.Context) context.Context +} + // cleanHeaderSet is an helper function that cleans a set of headers by // (1) canonicalizing, (2) de-duplicating and (3) sorting. func cleanHeaderSet(headers []string) []string { diff --git a/gateway/handler.go b/gateway/handler.go index 1299c7e59..38a242d00 100644 --- a/gateway/handler.go +++ b/gateway/handler.go @@ -152,6 +152,11 @@ func (i *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // the hour is a hard fallback, we don't expect it to happen, but just in case ctx, cancel := context.WithTimeout(r.Context(), time.Hour) defer cancel() + + if withCtxWrap, ok := i.backend.(WithContextHint); ok { + ctx = withCtxWrap.WrapContextForRequest(ctx) + } + r = r.WithContext(ctx) switch r.Method { From 3154a4642910df70a0e44ab8d557180b9c53ab52 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 12 Jan 2024 14:23:20 +0100 Subject: [PATCH 06/22] gateway: implement passthrough `WithContextHint` on `ipfsBackendWithMetrics` --- gateway/metrics.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/gateway/metrics.go b/gateway/metrics.go index 58ec88e71..32bb66568 100644 --- a/gateway/metrics.go +++ b/gateway/metrics.go @@ -179,6 +179,14 @@ func (b *ipfsBackendWithMetrics) GetDNSLinkRecord(ctx context.Context, fqdn stri } var _ IPFSBackend = (*ipfsBackendWithMetrics)(nil) +var _ WithContextHint = (*ipfsBackendWithMetrics)(nil) + +func (b *ipfsBackendWithMetrics) WrapContextForRequest(ctx context.Context) context.Context { + if withCtxWrap, ok := 
b.backend.(WithContextHint); ok { + return withCtxWrap.WrapContextForRequest(ctx) + } + return ctx +} func newHandlerWithMetrics(c *Config, backend IPFSBackend) *handler { i := &handler{ From ea04c77d44386b10e1d0aa8b691021f84f04c0c8 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 12 Jan 2024 15:04:17 +0100 Subject: [PATCH 07/22] blockservice: make ContextWithSession shortcut grabSessionFromContext inside newSession This was not wrong however it was confusing, grabSessionFromContext would show up twice in logs. --- blockservice/blockservice.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index aa68b8c0f..6bbe2dab0 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -151,6 +151,11 @@ func NewSession(ctx context.Context, bs BlockService) *Session { return ses } + return newSession(ctx, bs) +} + +// newSession is like [NewSession] but it does not attempt to reuse session from the existing context. +func newSession(ctx context.Context, bs BlockService) *Session { var allowlist verifcid.Allowlist = verifcid.DefaultAllowlist if bbs, ok := bs.(BoundedBlockService); ok { allowlist = bbs.Allowlist() @@ -487,7 +492,7 @@ func ContextWithSession(ctx context.Context, bs BlockService) context.Context { if grabSessionFromContext(ctx, bs) != nil { return ctx } - return EmbedSessionInContext(ctx, NewSession(ctx, bs)) + return EmbedSessionInContext(ctx, newSession(ctx, bs)) } // EmbedSessionInContext is like [NewSessionContext] but it allows to embed an existing session. From 2f67c04af2fc05a92b316e578c8c3c687a3c8ea4 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 15 Jan 2024 06:54:04 +0100 Subject: [PATCH 08/22] blockservice: don't store allowlist in Session This is accessible through the blockservice object. 
--- blockservice/blockservice.go | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 6bbe2dab0..6f0f78f8b 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -156,12 +156,7 @@ func NewSession(ctx context.Context, bs BlockService) *Session { // newSession is like [NewSession] but it does not attempt to reuse session from the existing context. func newSession(ctx context.Context, bs BlockService) *Session { - var allowlist verifcid.Allowlist = verifcid.DefaultAllowlist - if bbs, ok := bs.(BoundedBlockService); ok { - allowlist = bbs.Allowlist() - } - - return &Session{bs: bs, allowlist: allowlist, sesctx: ctx} + return &Session{bs: bs, sesctx: ctx} } // AddBlock adds a particular block to the service, Putting it into the datastore. @@ -250,7 +245,7 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - return getBlock(ctx, c, s, s.allowlist, s.getExchangeFetcher) + return getBlock(ctx, c, s, s.getExchangeFetcher) } // Look at what I have to do, no interface covariance :'( @@ -258,8 +253,8 @@ func (s *blockService) getExchangeFetcher() exchange.Fetcher { return s.exchange } -func getBlock(ctx context.Context, c cid.Cid, bs BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) (blocks.Block, error) { - err := verifcid.ValidateCid(allowlist, c) // hash security +func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func() exchange.Fetcher) (blocks.Block, error) { + err := verifcid.ValidateCid(grabAllowlistFromBlockservice(bs), c) // hash security if err != nil { return nil, err } @@ -313,15 +308,17 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block ctx, span := internal.StartSpan(ctx, 
"blockService.GetBlocks") defer span.End() - return getBlocks(ctx, ks, s, s.allowlist, s.getExchangeFetcher) + return getBlocks(ctx, ks, s, s.getExchangeFetcher) } -func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) <-chan blocks.Block { +func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fetchFactory func() exchange.Fetcher) <-chan blocks.Block { out := make(chan blocks.Block) go func() { defer close(out) + allowlist := grabAllowlistFromBlockservice(blockservice) + allValid := true for _, c := range ks { if err := verifcid.ValidateCid(allowlist, c); err != nil { @@ -439,7 +436,6 @@ type Session struct { bs BlockService ses exchange.Fetcher sesctx context.Context - allowlist verifcid.Allowlist } // grabSession is used to lazily create sessions. @@ -470,7 +466,7 @@ func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - return getBlock(ctx, c, s.bs, s.allowlist, s.grabSession) + return getBlock(ctx, c, s.bs, s.grabSession) } // GetBlocks gets blocks in the context of a request session @@ -478,7 +474,7 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo ctx, span := internal.StartSpan(ctx, "Session.GetBlocks") defer span.End() - return getBlocks(ctx, ks, s.bs, s.allowlist, s.grabSession) + return getBlocks(ctx, ks, s.bs, s.grabSession) } var _ BlockGetter = (*Session)(nil) @@ -519,3 +515,11 @@ func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { return ss } + +// grabAllowlistFromBlockservice never returns nil +func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { + if bbs, ok := bs.(BoundedBlockService); ok { + return bbs.Allowlist() + } + return verifcid.DefaultAllowlist +} From c7f33e6dbbde36f2b064ecc4340dee240725a2c8 Mon 
Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 15 Jan 2024 06:58:24 +0100 Subject: [PATCH 09/22] blockservice: add compiletime guard for BoundedBlockService on the blockservice struct --- blockservice/blockservice.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 6f0f78f8b..cf730a563 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -71,6 +71,8 @@ type BoundedBlockService interface { Allowlist() verifcid.Allowlist } +var _ BoundedBlockService = (*blockService)(nil) + type blockService struct { allowlist verifcid.Allowlist blockstore blockstore.Blockstore From d5be896cc15ff63437669779454e8c7f64a2311b Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 15 Jan 2024 07:42:36 +0100 Subject: [PATCH 10/22] blockservice: optimize getBlocks filter by not rescanning the already valid leading elements We would scan the valid leading elements twice. --- blockservice/blockservice.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index cf730a563..72ca44b84 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -321,18 +321,19 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet allowlist := grabAllowlistFromBlockservice(blockservice) - allValid := true - for _, c := range ks { + var lastAllValidIndex int + var c cid.Cid + for lastAllValidIndex, c = range ks { if err := verifcid.ValidateCid(allowlist, c); err != nil { - allValid = false break } } - if !allValid { + if lastAllValidIndex != len(ks) { // can't shift in place because we don't want to clobber callers. 
- ks2 := make([]cid.Cid, 0, len(ks)) - for _, c := range ks { + ks2 := make([]cid.Cid, lastAllValidIndex, len(ks)) + copy(ks2, ks[:lastAllValidIndex]) // fast path for already filtered elements + for _, c := range ks[lastAllValidIndex:] { // don't rescan already scanned elements // hash security if err := verifcid.ValidateCid(allowlist, c); err == nil { ks2 = append(ks2, c) From 02c77b433ae4a1c4dc88362835e3c6d0e3e0c73e Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 15 Jan 2024 12:42:32 +0100 Subject: [PATCH 11/22] blockservice: fix symbol name in docs --- blockservice/blockservice.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 72ca44b84..7733788ec 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -494,7 +494,7 @@ func ContextWithSession(ctx context.Context, bs BlockService) context.Context { return EmbedSessionInContext(ctx, newSession(ctx, bs)) } -// EmbedSessionInContext is like [NewSessionContext] but it allows to embed an existing session. +// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session. func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context { // use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice. 
return context.WithValue(ctx, ses.bs, ses) From b8ac21b0a97fc02b583a43b819a41ffdc51b2d72 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 15 Jan 2024 12:49:56 +0100 Subject: [PATCH 12/22] blockservice: stop using deprecated NewWriteThrough in tests --- blockservice/blockservice_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 6591529d2..53fd725f3 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -27,7 +27,7 @@ func TestWriteThroughWorks(t *testing.T) { } exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) exch := offline.Exchange(exchbstore) - bserv := NewWriteThrough(bstore, exch) + bserv := New(bstore, exch, WriteThrough()) bgen := butil.NewBlockGenerator() block := bgen.Next() @@ -62,7 +62,7 @@ func TestExchangeWrite(t *testing.T) { offline.Exchange(exchbstore), 0, } - bserv := NewWriteThrough(bstore, exch) + bserv := New(bstore, exch, WriteThrough()) bgen := butil.NewBlockGenerator() for name, fetcher := range map[string]BlockGetter{ @@ -136,7 +136,7 @@ func TestLazySessionInitialization(t *testing.T) { session := offline.Exchange(bstore2) exch := offline.Exchange(bstore3) sessionExch := &fakeSessionExchange{Interface: exch, session: session} - bservSessEx := NewWriteThrough(bstore, sessionExch) + bservSessEx := New(bstore, sessionExch, WriteThrough()) bgen := butil.NewBlockGenerator() block := bgen.Next() @@ -234,7 +234,7 @@ func TestNilExchange(t *testing.T) { block := bgen.Next() bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - bserv := NewWriteThrough(bs, nil) + bserv := New(bs, nil, WriteThrough()) sess := NewSession(ctx, bserv) _, err := sess.GetBlock(ctx, block.Cid()) if !ipld.IsNotFound(err) { From 0536783b09d58f016aefe238091981f397c31278 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 15 Jan 2024 14:16:36 +0100 Subject: [PATCH 13/22] blockservice: 
remove deprecated NewWriteThrough function --- CHANGELOG.md | 1 + blockservice/blockservice.go | 8 -------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 734c89ebf..12ad5df7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The following emojis are used to highlight certain changes: ### Added - `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context. +- `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously. ### Changed diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 7733788ec..353be00f8 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -119,14 +119,6 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) return service } -// NewWriteThrough creates a BlockService that guarantees writes will go -// through to the blockstore and are not skipped by cache checks. -// -// Deprecated: Use [New] with the [WriteThrough] option. -func NewWriteThrough(bs blockstore.Blockstore, exchange exchange.Interface) BlockService { - return New(bs, exchange, WriteThrough()) -} - // Blockstore returns the blockstore behind this blockservice. 
func (s *blockService) Blockstore() blockstore.Blockstore { return s.blockstore From 4c3a1f2f343637b615c26a27a55afa12dd5aaed5 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Thu, 18 Jan 2024 17:46:10 +0100 Subject: [PATCH 14/22] gateway: remove dead, unexported lazySeek --- gateway/lazyseek.go | 61 ------------------------- gateway/lazyseek_test.go | 98 ---------------------------------------- 2 files changed, 159 deletions(-) delete mode 100644 gateway/lazyseek.go delete mode 100644 gateway/lazyseek_test.go diff --git a/gateway/lazyseek.go b/gateway/lazyseek.go deleted file mode 100644 index e0ec44a82..000000000 --- a/gateway/lazyseek.go +++ /dev/null @@ -1,61 +0,0 @@ -package gateway - -import ( - "errors" - "fmt" - "io" -) - -// The HTTP server uses seek to determine the file size. Actually _seeking_ can -// be slow so we wrap the seeker in a _lazy_ seeker. -type lazySeeker struct { - reader io.ReadSeeker - - size int64 - offset int64 - realOffset int64 -} - -func (s *lazySeeker) Seek(offset int64, whence int) (int64, error) { - switch whence { - case io.SeekEnd: - return s.Seek(s.size+offset, io.SeekStart) - case io.SeekCurrent: - return s.Seek(s.offset+offset, io.SeekStart) - case io.SeekStart: - if offset < 0 { - return s.offset, errors.New("invalid seek offset") - } - s.offset = offset - return s.offset, nil - default: - return s.offset, fmt.Errorf("invalid whence: %d", whence) - } -} - -func (s *lazySeeker) Read(b []byte) (int, error) { - // If we're past the end, EOF. 
- if s.offset >= s.size { - return 0, io.EOF - } - - // actually seek - for s.offset != s.realOffset { - off, err := s.reader.Seek(s.offset, io.SeekStart) - if err != nil { - return 0, err - } - s.realOffset = off - } - off, err := s.reader.Read(b) - s.realOffset += int64(off) - s.offset += int64(off) - return off, err -} - -func (s *lazySeeker) Close() error { - if closer, ok := s.reader.(io.Closer); ok { - return closer.Close() - } - return nil -} diff --git a/gateway/lazyseek_test.go b/gateway/lazyseek_test.go deleted file mode 100644 index fdc6d2b34..000000000 --- a/gateway/lazyseek_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package gateway - -import ( - "errors" - "io" - "strings" - "testing" - - "github.com/stretchr/testify/require" -) - -type badSeeker struct { - io.ReadSeeker -} - -var errBadSeek = errors.New("bad seeker") - -func (bs badSeeker) Seek(offset int64, whence int) (int64, error) { - off, err := bs.ReadSeeker.Seek(0, io.SeekCurrent) - if err != nil { - panic(err) - } - return off, errBadSeek -} - -func TestLazySeekerError(t *testing.T) { - underlyingBuffer := strings.NewReader("fubar") - s := &lazySeeker{ - reader: badSeeker{underlyingBuffer}, - size: underlyingBuffer.Size(), - } - off, err := s.Seek(0, io.SeekEnd) - require.NoError(t, err) - require.Equal(t, s.size, off, "expected to seek to the end") - - // shouldn't have actually seeked. - b, err := io.ReadAll(s) - require.NoError(t, err) - require.Equal(t, 0, len(b), "expected to read nothing") - - // shouldn't need to actually seek. - off, err = s.Seek(0, io.SeekStart) - require.NoError(t, err) - require.Equal(t, int64(0), off, "expected to seek to the start") - - b, err = io.ReadAll(s) - require.NoError(t, err) - require.Equal(t, "fubar", string(b), "expected to read string") - - // should fail the second time. - off, err = s.Seek(0, io.SeekStart) - require.NoError(t, err) - require.Equal(t, int64(0), off, "expected to seek to the start") - - // right here... 
- b, err = io.ReadAll(s) - require.NotNil(t, err) - require.Equal(t, errBadSeek, err) - require.Equal(t, 0, len(b), "expected to read nothing") -} - -func TestLazySeeker(t *testing.T) { - underlyingBuffer := strings.NewReader("fubar") - s := &lazySeeker{ - reader: underlyingBuffer, - size: underlyingBuffer.Size(), - } - expectByte := func(b byte) { - t.Helper() - var buf [1]byte - n, err := io.ReadFull(s, buf[:]) - require.NoError(t, err) - require.Equal(t, 1, n, "expected to read one byte, read %d", n) - require.Equal(t, b, buf[0]) - } - expectSeek := func(whence int, off, expOff int64, expErr string) { - t.Helper() - n, err := s.Seek(off, whence) - if expErr == "" { - require.NoError(t, err) - } else { - require.EqualError(t, err, expErr) - } - require.Equal(t, expOff, n) - } - - expectSeek(io.SeekEnd, 0, s.size, "") - b, err := io.ReadAll(s) - require.NoError(t, err) - require.Equal(t, 0, len(b), "expected to read nothing") - expectSeek(io.SeekEnd, -1, s.size-1, "") - expectByte('r') - expectSeek(io.SeekStart, 0, 0, "") - expectByte('f') - expectSeek(io.SeekCurrent, 1, 2, "") - expectByte('b') - expectSeek(io.SeekCurrent, -100, 3, "invalid seek offset") -} From 3d57bce7998cad85517d1e1dc4602a0f55e59dee Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Wed, 24 Jan 2024 10:25:21 +0100 Subject: [PATCH 15/22] gateway: extract CORS to headers middleware --- CHANGELOG.md | 3 + examples/gateway/common/handler.go | 11 ++- gateway/README.md | 10 +-- gateway/errors_test.go | 4 +- gateway/gateway.go | 80 --------------------- gateway/gateway_test.go | 8 +-- gateway/handler.go | 8 --- gateway/handler_codec_test.go | 1 - gateway/headers.go | 112 +++++++++++++++++++++++++++++ gateway/hostname.go | 17 +---- gateway/utilities_test.go | 6 +- 11 files changed, 133 insertions(+), 127 deletions(-) create mode 100644 gateway/headers.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 12ad5df7e..4b16d4865 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,11 +18,14 @@ The following 
emojis are used to highlight certain changes: - `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context. - `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously. +- `gateway`: a new header configuration middleware has been added to replace the existing header configuration, which can be used more generically. ### Changed ### Removed +- 🛠 `gateway`: the header configuration `Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`. + ### Security ## [v0.17.0] diff --git a/examples/gateway/common/handler.go b/examples/gateway/common/handler.go index d21f38b64..5c4469aba 100644 --- a/examples/gateway/common/handler.go +++ b/examples/gateway/common/handler.go @@ -12,10 +12,6 @@ import ( func NewHandler(gwAPI gateway.IPFSBackend) http.Handler { conf := gateway.Config{ - // Initialize the headers. For this example, we do not add any special headers, - // only the required ones via gateway.AddAccessControlHeaders. - Headers: map[string][]string{}, - // If you set DNSLink to point at the CID from CAR, you can load it! NoDNSLink: false, @@ -58,9 +54,6 @@ func NewHandler(gwAPI gateway.IPFSBackend) http.Handler { }, } - // Add required access control headers to the configuration. - gateway.AddAccessControlHeaders(conf.Headers) - // Creates a mux to serve the gateway paths. This is not strictly necessary // and gwHandler could be used directly. However, on the next step we also want // to add prometheus metrics, hence needing the mux. @@ -86,6 +79,10 @@ func NewHandler(gwAPI gateway.IPFSBackend) http.Handler { // http.ServeMux which does not support CONNECT by default.
handler = withConnect(handler) + // Add headers middleware that applies any headers we define to all requests + // as well as a default CORS configuration. + handler = gateway.NewHeaders(nil).ApplyCors().Wrap(handler) + // Finally, wrap with the otelhttp handler. This will allow the tracing system // to work and for correct propagation of tracing headers. This step is optional // and only required if you want to use tracing. Note that OTel must be correctly diff --git a/gateway/README.md b/gateway/README.md index a434f9b36..60e2dbcfb 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -14,13 +14,7 @@ This example shows how you can start your own gateway, assuming you have an `IPF implementation. ```go -// Initialize your headers and apply the default headers. -headers := map[string][]string{} -gateway.AddAccessControlHeaders(headers) - -conf := gateway.Config{ - Headers: headers, -} +conf := gateway.Config{} // Initialize an IPFSBackend interface for both an online and offline versions. // The offline version should not make any network request for missing content. @@ -29,9 +23,11 @@ ipfsBackend := ... // Create http mux and setup path gateway handler. mux := http.NewServeMux() handler := gateway.NewHandler(conf, ipfsBackend) +handler = gateway.NewHeaders(nil).ApplyCors().Wrap(handler) mux.Handle("/ipfs/", handler) mux.Handle("/ipns/", handler) + // Start the server on :8080 and voilá! You have a basic IPFS gateway running // in http://localhost:8080. _ = http.ListenAndServe(":8080", mux) diff --git a/gateway/errors_test.go b/gateway/errors_test.go index cad7ae061..ca41b759b 100644 --- a/gateway/errors_test.go +++ b/gateway/errors_test.go @@ -43,7 +43,7 @@ func TestWebError(t *testing.T) { t.Parallel() // Create a handler to be able to test `webError`.
- config := &Config{Headers: map[string][]string{}} + config := &Config{} t.Run("429 Too Many Requests", func(t *testing.T) { t.Parallel() @@ -113,7 +113,7 @@ func TestWebError(t *testing.T) { t.Run("Error is sent as plain text when 'Accept' header contains 'text/html' and config.DisableHTMLErrors is true", func(t *testing.T) { t.Parallel() - config := &Config{Headers: map[string][]string{}, DisableHTMLErrors: true} + config := &Config{DisableHTMLErrors: true} w := httptest.NewRecorder() r := httptest.NewRequest(http.MethodGet, "/blah", nil) r.Header.Set("Accept", "something/else, text/html") diff --git a/gateway/gateway.go b/gateway/gateway.go index aa0d59f43..be9501281 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" "io" - "net/http" - "sort" "strconv" "strings" "time" @@ -20,11 +18,6 @@ import ( // Config is the configuration used when creating a new gateway handler. type Config struct { - // Headers is a map containing all the headers that should be sent by default - // in all requests. You can define custom headers, as well as add the recommended - // headers via AddAccessControlHeaders. - Headers map[string][]string - // DeserializedResponses configures this gateway to support returning data // in deserialized format. By default, the gateway will only support // trustless, verifiable [application/vnd.ipld.raw] and @@ -394,79 +387,6 @@ type WithContextHint interface { WrapContextForRequest(context.Context) context.Context } -// cleanHeaderSet is an helper function that cleans a set of headers by -// (1) canonicalizing, (2) de-duplicating and (3) sorting. -func cleanHeaderSet(headers []string) []string { - // Deduplicate and canonicalize. 
- m := make(map[string]struct{}, len(headers)) - for _, h := range headers { - m[http.CanonicalHeaderKey(h)] = struct{}{} - } - result := make([]string, 0, len(m)) - for k := range m { - result = append(result, k) - } - - // Sort - sort.Strings(result) - return result -} - -// AddAccessControlHeaders ensures safe default HTTP headers are used for -// controlling cross-origin requests. This function adds several values to the -// [Access-Control-Allow-Headers] and [Access-Control-Expose-Headers] entries -// to be exposed on GET and OPTIONS responses, including [CORS Preflight]. -// -// If the Access-Control-Allow-Origin entry is missing, a default value of '*' is -// added, indicating that browsers should allow requesting code from any -// origin to access the resource. -// -// If the Access-Control-Allow-Methods entry is missing a value, 'GET, HEAD, -// OPTIONS' is added, indicating that browsers may use them when issuing cross -// origin requests. -// -// [Access-Control-Allow-Headers]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Headers -// [Access-Control-Expose-Headers]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Expose-Headers -// [CORS Preflight]: https://developer.mozilla.org/en-US/docs/Glossary/Preflight_request -func AddAccessControlHeaders(headers map[string][]string) { - // Hard-coded headers. 
- const ACAHeadersName = "Access-Control-Allow-Headers" - const ACEHeadersName = "Access-Control-Expose-Headers" - const ACAOriginName = "Access-Control-Allow-Origin" - const ACAMethodsName = "Access-Control-Allow-Methods" - - if _, ok := headers[ACAOriginName]; !ok { - // Default to *all* - headers[ACAOriginName] = []string{"*"} - } - if _, ok := headers[ACAMethodsName]; !ok { - // Default to GET, HEAD, OPTIONS - headers[ACAMethodsName] = []string{ - http.MethodGet, - http.MethodHead, - http.MethodOptions, - } - } - - headers[ACAHeadersName] = cleanHeaderSet( - append([]string{ - "Content-Type", - "User-Agent", - "Range", - "X-Requested-With", - }, headers[ACAHeadersName]...)) - - headers[ACEHeadersName] = cleanHeaderSet( - append([]string{ - "Content-Length", - "Content-Range", - "X-Chunked-Output", - "X-Stream-Output", - "X-Ipfs-Path", - "X-Ipfs-Roots", - }, headers[ACEHeadersName]...)) -} - // RequestContextKey is a type representing a [context.Context] value key. type RequestContextKey string diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index 53f19ca08..4785ec759 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -352,8 +352,7 @@ func TestHeaders(t *testing.T) { headers := map[string][]string{} headers[headerACAO] = []string{expectedACAO} - ts := newTestServerWithConfig(t, backend, Config{ - Headers: headers, + ts := newTestServerWithConfigAndHeaders(t, backend, Config{ PublicGateways: map[string]*PublicGateway{ "subgw.example.com": { Paths: []string{"/ipfs", "/ipns"}, @@ -362,7 +361,7 @@ func TestHeaders(t *testing.T) { }, }, DeserializedResponses: true, - }) + }, headers) t.Logf("test server url: %s", ts.URL) testCORSPreflightRequest := func(t *testing.T, path, hostHeader string, requestOriginHeader string, code int) { @@ -532,7 +531,6 @@ func TestRedirects(t *testing.T) { backend.namesys["/ipns/example.com"] = newMockNamesysItem(path.FromCid(root), 0) ts := newTestServerWithConfig(t, backend, Config{ - Headers: 
map[string][]string{}, NoDNSLink: false, PublicGateways: map[string]*PublicGateway{ "example.com": { @@ -590,7 +588,6 @@ func TestDeserializedResponses(t *testing.T) { backend, root := newMockBackend(t, "fixtures.car") ts := newTestServerWithConfig(t, backend, Config{ - Headers: map[string][]string{}, NoDNSLink: false, PublicGateways: map[string]*PublicGateway{ "trustless.com": { @@ -670,7 +667,6 @@ func TestDeserializedResponses(t *testing.T) { backend.namesys["/ipns/trusted.com"] = newMockNamesysItem(path.FromCid(root), 0) ts := newTestServerWithConfig(t, backend, Config{ - Headers: map[string][]string{}, NoDNSLink: false, PublicGateways: map[string]*PublicGateway{ "trustless.com": { diff --git a/gateway/handler.go b/gateway/handler.go index 38a242d00..a64d22971 100644 --- a/gateway/handler.go +++ b/gateway/handler.go @@ -179,7 +179,6 @@ func (i *handler) optionsHandler(w http.ResponseWriter, r *http.Request) { // OPTIONS is a noop request that is used by the browsers to check if server accepts // cross-site XMLHttpRequest, which is indicated by the presence of CORS headers: // https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS#Preflighted_requests - addCustomHeaders(w, i.config.Headers) // return all custom headers (including CORS ones, if set) } // addAllowHeader sets Allow header with supported HTTP methods @@ -264,7 +263,6 @@ func (i *handler) getOrHeadHandler(w http.ResponseWriter, r *http.Request) { trace.SpanFromContext(r.Context()).SetAttributes(attribute.String("ResponseFormat", responseFormat)) i.requestTypeMetric.WithLabelValues(contentPath.Namespace(), responseFormat).Inc() - addCustomHeaders(w, i.config.Headers) // ok, _now_ write user's headers. w.Header().Set("X-Ipfs-Path", contentPath.String()) // Fail fast if unsupported request type was sent to a Trustless Gateway. 
@@ -340,12 +338,6 @@ func (i *handler) getOrHeadHandler(w http.ResponseWriter, r *http.Request) { } } -func addCustomHeaders(w http.ResponseWriter, headers map[string][]string) { - for k, v := range headers { - w.Header()[http.CanonicalHeaderKey(k)] = v - } -} - // isDeserializedResponsePossible returns true if deserialized responses // are allowed on the specified hostname, or globally. Host-specific rules // override global config. diff --git a/gateway/handler_codec_test.go b/gateway/handler_codec_test.go index d22579027..127e0bc8c 100644 --- a/gateway/handler_codec_test.go +++ b/gateway/handler_codec_test.go @@ -16,7 +16,6 @@ func TestDagJsonCborPreview(t *testing.T) { backend, root := newMockBackend(t, "fixtures.car") ts := newTestServerWithConfig(t, backend, Config{ - Headers: map[string][]string{}, NoDNSLink: false, PublicGateways: map[string]*PublicGateway{ "example.com": { diff --git a/gateway/headers.go b/gateway/headers.go new file mode 100644 index 000000000..66ad5d43a --- /dev/null +++ b/gateway/headers.go @@ -0,0 +1,112 @@ +package gateway + +import ( + "net/http" + "sort" +) + +// Headers is an HTTP middleware that sets the configured headers in all requests. +type Headers struct { + headers map[string][]string +} + +// NewHeaders creates a new [Headers] middleware that applies the given headers +// to all requests. If you call [Headers.ApplyCors], the default CORS configuration +// will also be applied, if any of the CORS headers is missing. +func NewHeaders(headers map[string][]string) *Headers { + h := &Headers{ + headers: map[string][]string{}, + } + + for k, v := range headers { + h.headers[http.CanonicalHeaderKey(k)] = v + } + + return h +} + +// ApplyCors applies safe default HTTP headers for controlling cross-origin +// requests. This function adds several values to the [Access-Control-Allow-Headers] +// and [Access-Control-Expose-Headers] entries to be exposed on GET and OPTIONS +// responses, including [CORS Preflight]. 
+// +// If the Access-Control-Allow-Origin entry is missing, a default value of '*' is +// added, indicating that browsers should allow requesting code from any +// origin to access the resource. +// +// If the Access-Control-Allow-Methods entry is missing a value, 'GET, HEAD, +// OPTIONS' is added, indicating that browsers may use them when issuing cross +// origin requests. +// +// [Access-Control-Allow-Headers]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Headers +// [Access-Control-Expose-Headers]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Expose-Headers +// [CORS Preflight]: https://developer.mozilla.org/en-US/docs/Glossary/Preflight_request +func (h *Headers) ApplyCors() *Headers { + // Hard-coded headers. + const ACAHeadersName = "Access-Control-Allow-Headers" + const ACEHeadersName = "Access-Control-Expose-Headers" + const ACAOriginName = "Access-Control-Allow-Origin" + const ACAMethodsName = "Access-Control-Allow-Methods" + + if _, ok := h.headers[ACAOriginName]; !ok { + // Default to *all* + h.headers[ACAOriginName] = []string{"*"} + } + if _, ok := h.headers[ACAMethodsName]; !ok { + // Default to GET, HEAD, OPTIONS + h.headers[ACAMethodsName] = []string{ + http.MethodGet, + http.MethodHead, + http.MethodOptions, + } + } + + h.headers[ACAHeadersName] = cleanHeaderSet( + append([]string{ + "Content-Type", + "User-Agent", + "Range", + "X-Requested-With", + }, h.headers[ACAHeadersName]...)) + + h.headers[ACEHeadersName] = cleanHeaderSet( + append([]string{ + "Content-Length", + "Content-Range", + "X-Chunked-Output", + "X-Stream-Output", + "X-Ipfs-Path", + "X-Ipfs-Roots", + }, h.headers[ACEHeadersName]...)) + + return h +} + +// Wrap wraps the given [http.Handler] with the headers middleware. 
+func (h *Headers) Wrap(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for k, v := range h.headers { + w.Header()[k] = v + } + + next.ServeHTTP(w, r) + }) +} + +// cleanHeaderSet is an helper function that cleans a set of headers by +// (1) canonicalizing, (2) de-duplicating and (3) sorting. +func cleanHeaderSet(headers []string) []string { + // Deduplicate and canonicalize. + m := make(map[string]struct{}, len(headers)) + for _, h := range headers { + m[http.CanonicalHeaderKey(h)] = struct{}{} + } + result := make([]string, 0, len(m)) + for k := range m { + result = append(result, k) + } + + // Sort + sort.Strings(result) + return result +} diff --git a/gateway/hostname.go b/gateway/hostname.go index 6b485f0b4..665cf1663 100644 --- a/gateway/hostname.go +++ b/gateway/hostname.go @@ -68,7 +68,7 @@ func NewHostnameHandler(c Config, backend IPFSBackend, next http.Handler) http.H return } if newURL != "" { - httpRedirectWithHeaders(w, r, newURL, http.StatusMovedPermanently, c.Headers) + http.Redirect(w, r, newURL, http.StatusMovedPermanently) return } } @@ -131,7 +131,7 @@ func NewHostnameHandler(c Config, backend IPFSBackend, next http.Handler) http.H if newURL != "" { // Redirect to deterministic CID to ensure CID // always gets the same Origin on the web - httpRedirectWithHeaders(w, r, newURL, http.StatusMovedPermanently, c.Headers) + http.Redirect(w, r, newURL, http.StatusMovedPermanently) return } } @@ -146,7 +146,7 @@ func NewHostnameHandler(c Config, backend IPFSBackend, next http.Handler) http.H } if newURL != "" { // Redirect to CID fixed inside of toSubdomainURL() - httpRedirectWithHeaders(w, r, newURL, http.StatusMovedPermanently, c.Headers) + http.Redirect(w, r, newURL, http.StatusMovedPermanently) return } } @@ -625,14 +625,3 @@ func (gws *hostnameGateways) knownSubdomainDetails(hostname string) (gw *PublicG // no match return nil, "", "", "", false } - -// httpRedirectWithHeaders applies 
custom headers before returning a redirect -// response to ensure consistency during transition from path to subdomain -// contexts. -func httpRedirectWithHeaders(w http.ResponseWriter, r *http.Request, url string, code int, headers map[string][]string) { - // ensure things like CORS are applied to redirect responses - // (https://github.com/ipfs/kubo/issues/9983#issuecomment-1599673976) - addCustomHeaders(w, headers) - - http.Redirect(w, r, url, code) -} diff --git a/gateway/utilities_test.go b/gateway/utilities_test.go index 85153f808..68db84041 100644 --- a/gateway/utilities_test.go +++ b/gateway/utilities_test.go @@ -232,19 +232,21 @@ func newTestServerAndNode(t *testing.T, ns mockNamesys, fixturesFile string) (*h func newTestServer(t *testing.T, backend IPFSBackend) *httptest.Server { return newTestServerWithConfig(t, backend, Config{ - Headers: map[string][]string{}, DeserializedResponses: true, }) } func newTestServerWithConfig(t *testing.T, backend IPFSBackend, config Config) *httptest.Server { - AddAccessControlHeaders(config.Headers) + return newTestServerWithConfigAndHeaders(t, backend, config, map[string][]string{}) +} +func newTestServerWithConfigAndHeaders(t *testing.T, backend IPFSBackend, config Config, headers map[string][]string) *httptest.Server { handler := NewHandler(config, backend) mux := http.NewServeMux() mux.Handle("/ipfs/", handler) mux.Handle("/ipns/", handler) handler = NewHostnameHandler(config, backend, mux) + handler = NewHeaders(headers).ApplyCors().Wrap(handler) ts := httptest.NewServer(handler) t.Cleanup(func() { ts.Close() }) From bf34cd0777d8813def8a5fc3312b7287501df9a7 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Thu, 25 Jan 2024 09:34:42 -0800 Subject: [PATCH 16/22] fix(gw): entity-bytes with negative indexes beyond file size (#523) * fix(gateway): bound negative indexes to size of file * fix: adjust negative to when from is negative too * chore: gateway-conformance@v0.5 
https://github.com/ipfs/gateway-conformance/releases/tag/v0.5.0 --------- Co-authored-by: Marcin Rataj --- .github/workflows/gateway-conformance.yml | 8 ++++---- CHANGELOG.md | 5 ++++- gateway/blocks_backend.go | 15 ++++++++++----- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/.github/workflows/gateway-conformance.yml b/.github/workflows/gateway-conformance.yml index f45e9aedf..c9c3eb072 100644 --- a/.github/workflows/gateway-conformance.yml +++ b/.github/workflows/gateway-conformance.yml @@ -16,18 +16,18 @@ jobs: steps: # 1. Download the gateway-conformance fixtures - name: Download gateway-conformance fixtures - uses: ipfs/gateway-conformance/.github/actions/extract-fixtures@v0.4 + uses: ipfs/gateway-conformance/.github/actions/extract-fixtures@v0.5 with: output: fixtures merged: true # 2. Build the car-gateway - name: Setup Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v4 with: go-version: 1.21.x - name: Checkout boxo - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: path: boxo - name: Build car-gateway @@ -40,7 +40,7 @@ jobs: # 4. Run the gateway-conformance tests - name: Run gateway-conformance tests - uses: ipfs/gateway-conformance/.github/actions/test@v0.4 + uses: ipfs/gateway-conformance/.github/actions/test@v0.5 with: gateway-url: http://127.0.0.1:8040 json: output.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b16d4865..773cd1807 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,10 @@ The following emojis are used to highlight certain changes: ### Removed -- 🛠 `gateway`: the header configuration `Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`.
+### Fixed + +- 🛠 `boxo/gateway`: when making a trustless CAR request with the "entity-bytes" parameter, using a negative index greater than the underlying entity length could trigger reading more data than intended +- 🛠 `boxo/gateway`: the header configuration `Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`. ### Security diff --git a/gateway/blocks_backend.go b/gateway/blocks_backend.go index fe188ae71..d85c2846b 100644 --- a/gateway/blocks_backend.go +++ b/gateway/blocks_backend.go @@ -508,6 +508,9 @@ func walkGatewaySimpleSelector(ctx context.Context, p path.ImmutablePath, params return err } from = fileLength + entityRange.From + if from < 0 { + from = 0 + } foundFileLength = true } @@ -521,13 +524,15 @@ func walkGatewaySimpleSelector(ctx context.Context, p path.ImmutablePath, params } to := *entityRange.To - if (*entityRange.To) < 0 && !foundFileLength { - fileLength, err = f.Seek(0, io.SeekEnd) - if err != nil { - return err + if (*entityRange.To) < 0 { + if !foundFileLength { + fileLength, err = f.Seek(0, io.SeekEnd) + if err != nil { + return err + } + foundFileLength = true } to = fileLength + *entityRange.To - foundFileLength = true } numToRead := 1 + to - from From fdfcfcc0708a39b06825e158e9bfa96b30cf42aa Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Thu, 25 Jan 2024 11:37:58 +0100 Subject: [PATCH 17/22] namesys: add WithMaxCacheTTL --- CHANGELOG.md | 1 + namesys/namesys.go | 19 +++++++++++++++++-- namesys/namesys_cache.go | 10 +++++++++- namesys/namesys_test.go | 38 ++++++++++++++++++++++++++++---------- 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 773cd1807..d0a64b4e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes: - `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context.
Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context. - `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously. - `gateway`: a new header configuration middleware has been added to replace the existing header configuration, which can be used more generically. +- `namesys` now has a `WithMaxCacheTTL` option, which allows you to define a maximum TTL that will be used for caching IPNS entries. ### Changed diff --git a/namesys/namesys.go b/namesys/namesys.go index 00b1f4d2d..7928dde40 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -48,8 +48,9 @@ type namesys struct { dnsResolver, ipnsResolver resolver ipnsPublisher Publisher - staticMap map[string]*cacheEntry - cache *lru.Cache[string, cacheEntry] + staticMap map[string]*cacheEntry + cache *lru.Cache[string, cacheEntry] + maxCacheTTL *time.Duration } var _ NameSystem = &namesys{} @@ -73,6 +74,20 @@ func WithCache(size int) Option { } } +// WithMaxCacheTTL configures the maximum cache TTL. By default, if the cache is +// enabled, the entry TTL will be used for caching. By setting this option, you +// can limit how long that TTL is. +// +// For example, if you configure a maximum cache TTL of 1 minute: +// - Entry TTL is 5 minutes -> Cache TTL is 1 minute +// - Entry TTL is 30 seconds -> Cache TTL is 30 seconds +func WithMaxCacheTTL(dur time.Duration) Option { + return func(n *namesys) error { + n.maxCacheTTL = &dur + return nil + } +} + // WithDNSResolver is an option that supplies a custom DNS resolver to use instead // of the system default. 
func WithDNSResolver(rslv madns.BasicResolver) Option { diff --git a/namesys/namesys_cache.go b/namesys/namesys_cache.go index fc8842e3b..531220b23 100644 --- a/namesys/namesys_cache.go +++ b/namesys/namesys_cache.go @@ -60,12 +60,20 @@ func (ns *namesys) cacheSet(name string, val path.Path, ttl time.Duration, lastM } } + // The cache TTL is capped at the configured maxCacheTTL. If not + // configured, the entry TTL will always be used. + cacheTTL := ttl + if ns.maxCacheTTL != nil && cacheTTL > *ns.maxCacheTTL { + cacheTTL = *ns.maxCacheTTL + } + cacheEOL := time.Now().Add(cacheTTL) + // Add automatically evicts previous entry, so it works for updating. ns.cache.Add(name, cacheEntry{ val: val, ttl: ttl, lastMod: lastMod, - cacheEOL: time.Now().Add(ttl), + cacheEOL: cacheEOL, }) } diff --git a/namesys/namesys_test.go b/namesys/namesys_test.go index 41fa0ce88..48310f025 100644 --- a/namesys/namesys_test.go +++ b/namesys/namesys_test.go @@ -146,20 +146,38 @@ func TestPublishWithTTL(t *testing.T) { "pk": record.PublicKeyValidator{}, }) - ns, err := NewNameSystem(routing, WithDatastore(dst), WithCache(128)) - require.NoError(t, err) - // CID is arbitrary. 
p, err := path.NewPath("/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn") require.NoError(t, err) - ttl := 1 * time.Second - eol := time.Now().Add(2 * time.Second) + ttl := 5 * time.Minute + eol := time.Now().Add(time.Hour) - err = ns.Publish(context.Background(), priv, p, PublishWithEOL(eol), PublishWithTTL(ttl)) - require.NoError(t, err) + t.Run("Without MaxCacheTTL", func(t *testing.T) { + ns, err := NewNameSystem(routing, WithDatastore(dst), WithCache(128)) + require.NoError(t, err) + + err = ns.Publish(context.Background(), priv, p, PublishWithEOL(eol), PublishWithTTL(ttl)) + require.NoError(t, err) + + entry, ok := ns.(*namesys).cache.Get(ipns.NameFromPeer(pid).String()) + require.True(t, ok) + require.Equal(t, ttl, entry.ttl) + require.LessOrEqual(t, time.Until(entry.cacheEOL), ttl) + }) + + t.Run("With MaxCacheTTL", func(t *testing.T) { + cacheTTL := 30 * time.Second + + ns, err := NewNameSystem(routing, WithDatastore(dst), WithCache(128), WithMaxCacheTTL(cacheTTL)) + require.NoError(t, err) - entry, ok := ns.(*namesys).cache.Get(ipns.NameFromPeer(pid).String()) - require.True(t, ok) - require.LessOrEqual(t, entry.cacheEOL.Sub(eol), 10*time.Millisecond) + err = ns.Publish(context.Background(), priv, p, PublishWithEOL(eol), PublishWithTTL(ttl)) + require.NoError(t, err) + + entry, ok := ns.(*namesys).cache.Get(ipns.NameFromPeer(pid).String()) + require.True(t, ok) + require.Equal(t, ttl, entry.ttl) + require.LessOrEqual(t, time.Until(entry.cacheEOL), cacheTTL) + }) } From 89bceff34bf108a46795d26938e6aa8c2f876fc4 Mon Sep 17 00:00:00 2001 From: GitHub Date: Wed, 31 Jan 2024 17:35:18 +0000 Subject: [PATCH 18/22] chore: Update .github/workflows/stale.yml [skip ci] From 79cb4e2886d72af06f3b102982520a743835c02f Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Tue, 6 Feb 2024 09:46:52 +0100 Subject: [PATCH 19/22] gateway: tests for simpler superfluous namespace handling (#572) --- gateway/gateway_test.go | 86 +++++++++++++++++++++++++++++++++++++++++ 
gateway/handler.go | 36 +---------------- 2 files changed, 87 insertions(+), 35 deletions(-) diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index 4785ec759..031a184a5 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -577,6 +577,92 @@ func TestRedirects(t *testing.T) { do(http.MethodGet) do(http.MethodHead) }) + + t.Run("Superfluous namespace", func(t *testing.T) { + t.Parallel() + + backend, root := newMockBackend(t, "fixtures.car") + backend.namesys["/ipns/dnslink-gateway.com"] = newMockNamesysItem(path.FromCid(root), 0) + backend.namesys["/ipns/dnslink-website.com"] = newMockNamesysItem(path.FromCid(root), 0) + + ts := newTestServerWithConfig(t, backend, Config{ + NoDNSLink: false, + PublicGateways: map[string]*PublicGateway{ + "dnslink-gateway.com": { + Paths: []string{"/ipfs", "/ipns"}, + NoDNSLink: false, + DeserializedResponses: true, + }, + "dnslink-website.com": { + Paths: []string{}, + NoDNSLink: false, + DeserializedResponses: true, + }, + "gateway.com": { + Paths: []string{"/ipfs"}, + UseSubdomains: false, + NoDNSLink: true, + DeserializedResponses: true, + }, + "subdomain-gateway.com": { + Paths: []string{"/ipfs", "/ipns"}, + UseSubdomains: true, + NoDNSLink: true, + DeserializedResponses: true, + }, + }, + DeserializedResponses: true, + }) + + for _, test := range []struct { + host string + path string + status int + location string + }{ + // Barebones gateway + {"", "/ipfs/ipfs/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusMovedPermanently, "/ipfs/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"}, + {"", "/ipfs/ipns/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusMovedPermanently, "/ipns/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"}, + {"", "/ipfs/ipns/dnslink.com", http.StatusMovedPermanently, "/ipns/dnslink.com"}, + + // DNSLink Gateway with /ipfs and /ipns enabled + {"dnslink-gateway.com", "/ipfs/ipfs/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusMovedPermanently, 
"/ipfs/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"}, + {"dnslink-gateway.com", "/ipfs/ipns/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusMovedPermanently, "/ipns/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"}, + {"dnslink-gateway.com", "/ipfs/ipns/dnslink.com", http.StatusMovedPermanently, "/ipns/dnslink.com"}, + + // DNSLink Gateway without /ipfs and /ipns + {"dnslink-website.com", "/ipfs/ipfs/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusNotFound, ""}, + {"dnslink-website.com", "/ipfs/ipns/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusNotFound, ""}, + {"dnslink-website.com", "/ipfs/ipns/dnslink.com", http.StatusNotFound, ""}, + + // Public gateway + {"gateway.com", "/ipfs/ipfs/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusMovedPermanently, "/ipfs/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"}, + {"gateway.com", "/ipfs/ipns/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusMovedPermanently, "/ipns/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"}, + {"gateway.com", "/ipfs/ipns/dnslink.com", http.StatusMovedPermanently, "/ipns/dnslink.com"}, + + // Subdomain gateway + {"subdomain-gateway.com", "/ipfs/ipfs/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusMovedPermanently, "/ipfs/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"}, + {"subdomain-gateway.com", "/ipfs/ipns/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR", http.StatusMovedPermanently, "/ipns/QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"}, + {"subdomain-gateway.com", "/ipfs/ipns/dnslink.com", http.StatusMovedPermanently, "/ipns/dnslink.com"}, + } { + testName := ts.URL + test.path + if test.host != "" { + testName += " " + test.host + } + + t.Run(testName, func(t *testing.T) { + req := mustNewRequest(t, http.MethodGet, ts.URL+test.path, nil) + req.Header.Set("Accept", "text/html") + if test.host != "" { + req.Host = test.host + } + resp := mustDoWithoutRedirect(t, req) + defer resp.Body.Close() + require.Equal(t, 
test.status, resp.StatusCode) + require.Equal(t, test.location, resp.Header.Get("Location")) + }) + } + }) } func TestDeserializedResponses(t *testing.T) { diff --git a/gateway/handler.go b/gateway/handler.go index a64d22971..6963bdebf 100644 --- a/gateway/handler.go +++ b/gateway/handler.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "html/template" "io" "mime" "net/http" @@ -43,26 +42,6 @@ var ( noModtime = time.Unix(0, 0) // disables Last-Modified header if passed as modtime ) -// HTML-based redirect for errors which can be recovered from, but we want -// to provide hint to people that they should fix things on their end. -var redirectTemplate = template.Must(template.New("redirect").Parse(` - - - - - - - -
{{.ErrorMsg}}
(if a redirect does not happen in 10 seconds, use "{{.SuggestedPath}}" instead)
- -`)) - -type redirectTemplateData struct { - RedirectURL string - SuggestedPath string - ErrorMsg string -} - // handler is a HTTP handler that serves IPFS objects (accessible by default at /ipfs/) // (it serves requests like GET /ipfs/QmVRzPKPzNtSrEzBFm2UZfxmPAgnaLke4DMcerbsGGSaFe/link) type handler struct { @@ -903,21 +882,8 @@ func (i *handler) handleSuperfluousNamespace(w http.ResponseWriter, r *http.Requ q, _ := url.ParseQuery(r.URL.RawQuery) intendedURL = intendedURL + "?" + q.Encode() } - // return HTTP 400 (Bad Request) with HTML error page that: - // - points at correct canonical path via header - // - displays human-readable error - // - redirects to intendedURL after a short delay - - w.WriteHeader(http.StatusBadRequest) - err = redirectTemplate.Execute(w, redirectTemplateData{ - RedirectURL: intendedURL, - SuggestedPath: intendedPath.String(), - ErrorMsg: fmt.Sprintf("invalid path: %q should be %q", r.URL.Path, intendedPath.String()), - }) - if err != nil { - _, _ = w.Write([]byte(fmt.Sprintf("error during body generation: %v", err))) - } + http.Redirect(w, r, intendedURL, http.StatusMovedPermanently) return true } From 39f4588f154831de8c551d0cc71270e7c8095aad Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Mon, 19 Feb 2024 11:16:29 +0100 Subject: [PATCH 20/22] routing/http/client: avoid race by not using global http.Client --- CHANGELOG.md | 1 + routing/http/client/client.go | 58 +++++++++++++++++++----------- routing/http/client/client_test.go | 3 +- 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0a64b4e7..f48638807 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ The following emojis are used to highlight certain changes: - 🛠 `boxo/gateway`: when making a trustless CAR request with the "entity-bytes" parameter, using a negative index greater than the underlying entity length could trigger reading more data than intended - 🛠 `boxo/gateway`: the header configuration
`Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`. +- 🛠 `routing/http/client`: the default HTTP client is no longer a global singleton. Therefore, using `WithUserAgent` won't modify the user agent of existing routing clients. This will also prevent potential race conditions. In addition, incompatible options will now return errors instead of silently failing. ### Security diff --git a/routing/http/client/client.go b/routing/http/client/client.go index efadc2732..16840cab5 100644 --- a/routing/http/client/client.go +++ b/routing/http/client/client.go @@ -28,15 +28,8 @@ import ( ) var ( - _ contentrouter.Client = &Client{} - logger = logging.Logger("routing/http/client") - defaultHTTPClient = &http.Client{ - Transport: &ResponseBodyLimitedTransport{ - RoundTripper: http.DefaultTransport, - LimitBytes: 1 << 20, - UserAgent: defaultUserAgent, - }, - } + _ contentrouter.Client = &Client{} + logger = logging.Logger("routing/http/client") ) const ( @@ -67,53 +60,75 @@ var defaultUserAgent = moduleVersion() var _ contentrouter.Client = &Client{} +func newDefaultHTTPClient(userAgent string) *http.Client { + return &http.Client{ + Transport: &ResponseBodyLimitedTransport{ + RoundTripper: http.DefaultTransport, + LimitBytes: 1 << 20, + UserAgent: userAgent, + }, + } +} + type httpClient interface { Do(req *http.Request) (*http.Response, error) } -type Option func(*Client) +type Option func(*Client) error func WithIdentity(identity crypto.PrivKey) Option { - return func(c *Client) { + return func(c *Client) error { c.identity = identity + return nil } } +// WithHTTPClient sets a custom HTTP Client to be used with [Client]. func WithHTTPClient(h httpClient) Option { - return func(c *Client) { + return func(c *Client) error { c.httpClient = h + return nil } } +// WithUserAgent sets a custom user agent to use with the HTTP Client. This modifies +// the underlying [http.Client].
Therefore, you should not use the same HTTP Client +// with multiple routing clients. +// +// This only works if using a [http.Client] with a [ResponseBodyLimitedTransport] +// set as its transport. Otherwise, an error will be returned. func WithUserAgent(ua string) Option { - return func(c *Client) { + return func(c *Client) error { if ua == "" { - return + return errors.New("empty user agent") } httpClient, ok := c.httpClient.(*http.Client) if !ok { - return + return errors.New("the http client of the Client must be a *http.Client") } transport, ok := httpClient.Transport.(*ResponseBodyLimitedTransport) if !ok { - return + return errors.New("the transport of the http client of the Client must be a *ResponseBodyLimitedTransport") } transport.UserAgent = ua + return nil } } func WithProviderInfo(peerID peer.ID, addrs []multiaddr.Multiaddr) Option { - return func(c *Client) { + return func(c *Client) error { c.peerID = peerID for _, a := range addrs { c.addrs = append(c.addrs, types.Multiaddr{Multiaddr: a}) } + return nil } } func WithStreamResultsRequired() Option { - return func(c *Client) { + return func(c *Client) error { c.accepts = mediaTypeNDJSON + return nil } } @@ -122,13 +137,16 @@ func WithStreamResultsRequired() Option { func New(baseURL string, opts ...Option) (*Client, error) { client := &Client{ baseURL: baseURL, - httpClient: defaultHTTPClient, + httpClient: newDefaultHTTPClient(defaultUserAgent), clock: clock.New(), accepts: strings.Join([]string{mediaTypeNDJSON, mediaTypeJSON}, ","), } for _, opt := range opts { - opt(client) + err := opt(client) + if err != nil { + return nil, err + } } if client.identity != nil && client.peerID.Size() != 0 && !client.peerID.MatchesPublicKey(client.identity.GetPublic()) { diff --git a/routing/http/client/client_test.go b/routing/http/client/client_test.go index 7edd77c10..590deed11 100644 --- a/routing/http/client/client_test.go +++ b/routing/http/client/client_test.go @@ -109,11 +109,10 @@ func makeTestDeps(t 
*testing.T, clientsOpts []Option, serverOpts []server.Option server := httptest.NewServer(recordingHandler) t.Cleanup(server.Close) serverAddr := "http://" + server.Listener.Addr().String() - recordingHTTPClient := &recordingHTTPClient{httpClient: defaultHTTPClient} + recordingHTTPClient := &recordingHTTPClient{httpClient: newDefaultHTTPClient(testUserAgent)} defaultClientOpts := []Option{ WithProviderInfo(peerID, addrs), WithIdentity(identity), - WithUserAgent(testUserAgent), WithHTTPClient(recordingHTTPClient), } c, err := New(serverAddr, append(defaultClientOpts, clientsOpts...)...) From 28f1c4037ef91bb0d00f6666a44ecdfd2b6a81cd Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Tue, 20 Feb 2024 09:35:20 +0100 Subject: [PATCH 21/22] docs: prepare changelog for v0.18.0 --- CHANGELOG.md | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f48638807..785b27904 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,23 +16,29 @@ The following emojis are used to highlight certain changes: ### Added +### Changed + +### Removed + +### Fixed + +### Security + +## [v0.18.0] + +### Added + - `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context. - `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously. - `gateway`: a new header configuration middleware has been added to replace the existing header configuration, which can be used more generically. - `namesys` now has a `WithMaxCacheTTL` option, which allows you to define a maximum TTL that will be used for caching IPNS entries. 
-### Changed - -### Removed - ### Fixed - 🛠 `boxo/gateway`: when making a trustless CAR request with the "entity-bytes" parameter, using a negative index greater than the underlying entity length could trigger reading more data than intended - 🛠 `boxo/gateway`: the header configuration `Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`. - 🛠 `routing/http/client`: the default HTTP client is no longer a global singleton. Therefore, using `WithUserAgent` won't modify the user agent of existing routing clients. This will also prevent potential race conditions. In addition, incompatible options will now return errors instead of silently failing. -### Security - ## [v0.17.0] ### Added @@ -58,7 +64,7 @@ The following emojis are used to highlight certain changes: ### Fixed * `boxo/gateway` - * a panic (which is recovered) could sporadically be triggered inside a CAR request, if the right [conditions were met](https://github.com/ipfs/boxo/pull/511). + * a panic (which is recovered) could sporadically be triggered inside a CAR request, if the right [conditions were met](https://github.com/ipfs/boxo/pull/511). * no longer emits `http: superfluous response.WriteHeader` warnings when an error happens. ## [v0.15.0] @@ -162,7 +168,7 @@ The following emojis are used to highlight certain changes: * 🛠 The `routing/http` package experienced following removals: * Server and client no longer support the experimental `Provide` method. - `ProvideBitswap` is still usable, but marked as deprecated. A protocol-agnostic + `ProvideBitswap` is still usable, but marked as deprecated. A protocol-agnostic provide mechanism is being worked on in [IPIP-378](https://github.com/ipfs/specs/pull/378). * Server no longer exports `FindProvidersPath` and `ProvidePath`. 
From 4f5b2c08df2f61b0ae1da53f7560c61dc6965be4 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Tue, 20 Feb 2024 09:36:09 +0100 Subject: [PATCH 22/22] chore: bump version to v0.18.0 --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index 24e6330eb..5775de3b2 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.17.0" + "version": "v0.18.0" }