Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockservice: move session handling as part of the interface #563

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 59 additions & 86 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,23 @@ type BlockService interface {

// DeleteBlock deletes the given block from the blockservice.
DeleteBlock(ctx context.Context, o cid.Cid) error
}

// BoundedBlockService is a Blockservice bounded via strict multihash Allowlist.
type BoundedBlockService interface {
BlockService

Allowlist() verifcid.Allowlist
// NewSession creates a new session that allows for
// controlled exchange of wantlists to decrease the bandwidth overhead.
// If the current exchange is a [fetcher.SessionExchange], a new exchange
// session will be created. Otherwise, the current exchange will be used
// directly.
// Sessions are lazily setup, this is cheap.
NewSession(context.Context) BlockGetter

// ContextWithSession is creates a context with an embded session,
// future calls to [BlockService.GetBlock], [BlockService.GetBlocks] and [BlockService.NewSession]
// will be redirected to this same session instead.
// Sessions are lazily setup, this is cheap.
// It wont make a new session if one exists already in the context.
ContextWithSession(ctx context.Context) context.Context
}

var _ BoundedBlockService = (*blockService)(nil)

type blockService struct {
allowlist verifcid.Allowlist
blockstore blockstore.Blockstore
Expand Down Expand Up @@ -133,24 +139,25 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
return s.allowlist
}

// NewSession creates a new session that allows for
// controlled exchange of wantlists to decrease the bandwidth overhead.
// If the current exchange is a SessionExchange, a new exchange
// session will be created. Otherwise, the current exchange will be used
// directly.
// Sessions are lazily setup, this is cheap.
func NewSession(ctx context.Context, bs BlockService) *Session {
ses := grabSessionFromContext(ctx, bs)
func (s *blockService) NewSession(ctx context.Context) BlockGetter {
ses := s.grabSessionFromContext(ctx)
if ses != nil {
return ses
}

return newSession(ctx, bs)
return s.newSession(ctx)
}

// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
func newSession(ctx context.Context, bs BlockService) *Session {
return &Session{bs: bs, sesctx: ctx}
func (s *blockService) newSession(ctx context.Context) *session {
return &session{bs: s, sesctx: ctx}
}

func (s *blockService) ContextWithSession(ctx context.Context) context.Context {
if s.grabSessionFromContext(ctx) != nil {
return ctx
}
return context.WithValue(ctx, s, s.newSession(ctx))
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
Expand Down Expand Up @@ -232,30 +239,27 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
if ses := grabSessionFromContext(ctx, s); ses != nil {
if ses := s.grabSessionFromContext(ctx); ses != nil {
return ses.GetBlock(ctx, c)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

return getBlock(ctx, c, s, s.getExchangeFetcher)
return s.getBlock(ctx, c, s.getExchangeFetcher)
}

// Look at what I have to do, no interface covariance :'(
func (s *blockService) getExchangeFetcher() exchange.Fetcher {
return s.exchange
}

func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func() exchange.Fetcher) (blocks.Block, error) {
err := verifcid.ValidateCid(grabAllowlistFromBlockservice(bs), c) // hash security
func (s *blockService) getBlock(ctx context.Context, c cid.Cid, fetchFactory func() exchange.Fetcher) (blocks.Block, error) {
err := verifcid.ValidateCid(s.allowlist, c) // hash security
if err != nil {
return nil, err
}

blockstore := bs.Blockstore()

block, err := blockstore.Get(ctx, c)
block, err := s.blockstore.Get(ctx, c)
switch {
case err == nil:
return block, nil
Expand All @@ -277,12 +281,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
return nil, err
}
// also write in the blockstore for caching, inform the exchange that the block is available
err = blockstore.Put(ctx, blk)
err = s.blockstore.Put(ctx, blk)
if err != nil {
return nil, err
}
if ex := bs.Exchange(); ex != nil {
err = ex.NotifyNewBlocks(ctx, blk)
if s.exchange != nil {
err = s.exchange.NotifyNewBlocks(ctx, blk)
if err != nil {
return nil, err
}
Expand All @@ -295,28 +299,26 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
if ses := grabSessionFromContext(ctx, s); ses != nil {
if ses := s.grabSessionFromContext(ctx); ses != nil {
return ses.GetBlocks(ctx, ks)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

return getBlocks(ctx, ks, s, s.getExchangeFetcher)
return s.getBlocks(ctx, ks, s.getExchangeFetcher)
}

func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fetchFactory func() exchange.Fetcher) <-chan blocks.Block {
func (s *blockService) getBlocks(ctx context.Context, ks []cid.Cid, fetchFactory func() exchange.Fetcher) <-chan blocks.Block {
out := make(chan blocks.Block)

go func() {
defer close(out)

allowlist := grabAllowlistFromBlockservice(blockservice)

var lastAllValidIndex int
var c cid.Cid
for lastAllValidIndex, c = range ks {
if err := verifcid.ValidateCid(allowlist, c); err != nil {
if err := verifcid.ValidateCid(s.allowlist, c); err != nil {
break
}
}
Expand All @@ -327,7 +329,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
copy(ks2, ks[:lastAllValidIndex]) // fast path for already filtered elements
for _, c := range ks[lastAllValidIndex:] { // don't rescan already scanned elements
// hash security
if err := verifcid.ValidateCid(allowlist, c); err == nil {
if err := verifcid.ValidateCid(s.allowlist, c); err == nil {
ks2 = append(ks2, c)
} else {
logger.Errorf("unsafe CID (%s) passed to blockService.GetBlocks: %s", c, err)
Expand All @@ -336,11 +338,9 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
ks = ks2
}

bs := blockservice.Blockstore()

var misses []cid.Cid
for _, c := range ks {
hit, err := bs.Get(ctx, c)
hit, err := s.blockstore.Get(ctx, c)
if err != nil {
misses = append(misses, c)
continue
Expand All @@ -363,7 +363,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
return
}

ex := blockservice.Exchange()
var cache [1]blocks.Block // preallocate once for all iterations
for {
var b blocks.Block
Expand All @@ -378,16 +377,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
}

// write in the blockstore for caching
err = bs.Put(ctx, b)
err = s.blockstore.Put(ctx, b)
if err != nil {
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
return
}

if ex != nil {
if s.exchange != nil {
// inform the exchange that the blocks are available
cache[0] = b
err = ex.NotifyNewBlocks(ctx, cache[:]...)
err = s.exchange.NotifyNewBlocks(ctx, cache[:]...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
Expand Down Expand Up @@ -425,16 +424,16 @@ func (s *blockService) Close() error {
return s.exchange.Close()
}

// Session is a helper type to provide higher level access to bitswap sessions
type Session struct {
// session is a helper type to provide higher level access to bitswap sessions
type session struct {
createSession sync.Once
bs BlockService
bs *blockService
ses exchange.Fetcher
sesctx context.Context
}

// grabSession is used to lazily create sessions.
func (s *Session) grabSession() exchange.Fetcher {
func (s *session) grabSession() exchange.Fetcher {
s.createSession.Do(func() {
defer func() {
s.sesctx = nil // early gc
Expand All @@ -457,64 +456,38 @@ func (s *Session) grabSession() exchange.Fetcher {
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
func (s *session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

return getBlock(ctx, c, s.bs, s.grabSession)
return s.bs.getBlock(ctx, c, s.grabSession)
}

// GetBlocks gets blocks in the context of a request session
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
func (s *session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ctx, span := internal.StartSpan(ctx, "session.GetBlocks")
defer span.End()

return getBlocks(ctx, ks, s.bs, s.grabSession)
}

var _ BlockGetter = (*Session)(nil)

// ContextWithSession is a helper which creates a context with an embded session,
// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
// will be redirected to this same session instead.
// Sessions are lazily setup, this is cheap.
// It wont make a new session if one exists already in the context.
func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
if grabSessionFromContext(ctx, bs) != nil {
return ctx
}
return EmbedSessionInContext(ctx, newSession(ctx, bs))
return s.bs.getBlocks(ctx, ks, s.grabSession)
}

// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.
func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
return context.WithValue(ctx, ses.bs, ses)
}
var _ BlockGetter = (*session)(nil)

// grabSessionFromContext returns nil if the session was not found
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
// if this API is public it is too easy to forget to pass a [BlockService] or [session] object around in your app.
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
s := ctx.Value(bs)
func (s *blockService) grabSessionFromContext(ctx context.Context) *session {
ss := ctx.Value(s)
if s == nil {
return nil
}

ss, ok := s.(*Session)
sss, ok := ss.(*session)
if !ok {
// idk what to do here, that kinda sucks, giveup
return nil
}

return ss
}

// grabAllowlistFromBlockservice never returns nil
func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
if bbs, ok := bs.(BoundedBlockService); ok {
return bbs.Allowlist()
}
return verifcid.DefaultAllowlist
return sss
}
22 changes: 11 additions & 11 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestExchangeWrite(t *testing.T) {

for name, fetcher := range map[string]BlockGetter{
"blockservice": bserv,
"session": NewSession(context.Background(), bserv),
"session": bserv.NewSession(context.Background()),
} {
t.Run(name, func(t *testing.T) {
// GetBlock
Expand Down Expand Up @@ -133,9 +133,9 @@ func TestLazySessionInitialization(t *testing.T) {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bstore3 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
session := offline.Exchange(bstore2)
ses := offline.Exchange(bstore2)
exch := offline.Exchange(bstore3)
sessionExch := &fakeSessionExchange{Interface: exch, session: session}
sessionExch := &fakeSessionExchange{Interface: exch, session: ses}
bservSessEx := New(bstore, sessionExch, WriteThrough())
bgen := butil.NewBlockGenerator()

Expand All @@ -149,12 +149,12 @@ func TestLazySessionInitialization(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = session.NotifyNewBlocks(ctx, block2)
err = ses.NotifyNewBlocks(ctx, block2)
if err != nil {
t.Fatal(err)
}

bsession := NewSession(ctx, bservSessEx)
bsession := bservSessEx.NewSession(ctx).(*session)
if bsession.ses != nil {
t.Fatal("Session exchange should not instantiated session immediately")
}
Expand All @@ -175,7 +175,7 @@ func TestLazySessionInitialization(t *testing.T) {
if returnedBlock.Cid() != block2.Cid() {
t.Fatal("Got incorrect block")
}
if bsession.ses != session {
if bsession.ses != ses {
t.Fatal("Should have initialized session to fetch block")
}
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestNilExchange(t *testing.T) {

bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bserv := New(bs, nil, WriteThrough())
sess := NewSession(ctx, bserv)
sess := bserv.NewSession(ctx)
_, err := sess.GetBlock(ctx, block.Cid())
if !ipld.IsNotFound(err) {
t.Fatal("expected block to not be found")
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestAllowlist(t *testing.T) {

blockservice := New(bs, nil, WithAllowlist(verifcid.NewAllowlist(map[uint64]bool{multihash.BLAKE3: true})))
check(blockservice.GetBlock)
check(NewSession(ctx, blockservice).GetBlock)
check(blockservice.NewSession(ctx).GetBlock)
}

type fakeIsNewSessionCreateExchange struct {
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestContextSession(t *testing.T) {

service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx)

ctx = ContextWithSession(ctx, service)
ctx = service.ContextWithSession(ctx)

b, err := service.GetBlock(ctx, block1.Cid())
a.NoError(err)
Expand All @@ -348,8 +348,8 @@ func TestContextSession(t *testing.T) {
a.False(sesEx.newSessionWasCalled, "session should be reused in context")

a.Equal(
NewSession(ctx, service),
NewSession(ContextWithSession(ctx, service), service),
service.NewSession(ctx),
service.NewSession(service.ContextWithSession(ctx)),
"session must be deduped in all invocations on the same context",
)
}
Loading
Loading