From 3549af881a5878ac07d442d60b7f110a859f3853 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 15 Mar 2024 22:56:02 +0400 Subject: [PATCH] feat(share/availability): persist random samples selection in availability call (#3239) This PR introduces persistence for sample selection in random sampling. It addresses the issue by storing all failed samples into the datastore, allowing them to be reloaded on the next sampling attempt. This ensures that if the availability call fails fully or partially during the last sampling attempt, the sampling retry will use the same preselected random coordinates of shares. The provided solution is backwards compatible with the previously stored empty byte slice on sampling success, allowing the change to be non-breaking for existing storage. Additionally, this PR includes basic refactoring to simplify concurrency logic in availability. It also ensures that errors returned by the call are aligned with the interface declaration in [availability.go](https://github.com/celestiaorg/celestia-node/blob/main/share/availability.go), enhancing code consistency and maintainability. 
Resolves https://github.com/celestiaorg/celestia-node/issues/2780 --- share/availability/light/availability.go | 97 +++++++++++-------- share/availability/light/availability_test.go | 82 +++++++++------- share/availability/light/sample.go | 34 ++++++- share/availability/light/sample_test.go | 4 +- share/availability/light/testing.go | 47 +++++++++ 5 files changed, 182 insertions(+), 82 deletions(-) diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go index 1d35542344..97046f4438 100644 --- a/share/availability/light/availability.go +++ b/share/availability/light/availability.go @@ -8,7 +8,6 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/autobatch" "github.com/ipfs/go-datastore/namespace" - ipldFormat "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "github.com/celestiaorg/celestia-node/header" @@ -67,26 +66,37 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header return nil } - // do not sample over Root that has already been sampled + // load snapshot of the last sampling errors from disk key := rootKey(dah) - la.dsLk.RLock() - exists, err := la.ds.Has(ctx, key) + last, err := la.ds.Get(ctx, key) la.dsLk.RUnlock() - if err != nil || exists { + + // Check for error cases + var samples []Sample + switch { + case err == nil && len(last) == 0: + // Availability has already been validated + return nil + case err != nil && !errors.Is(err, datastore.ErrNotFound): + // Other error occurred return err + case errors.Is(err, datastore.ErrNotFound): + // No sampling result found, select new samples + samples, err = SampleSquare(len(dah.RowRoots), int(la.params.SampleAmount)) + if err != nil { + return err + } + default: + // Sampling result found, unmarshal it + samples, err = decodeSamples(last) + if err != nil { + return err + } } - log.Debugw("validate availability", "root", dah.String()) - // We assume the caller of this method has already performed 
basic validation on the - // given dah/root. If for some reason this has not happened, the node should panic. if err := dah.ValidateBasic(); err != nil { - log.Errorw("availability validation cannot be performed on a malformed DataAvailabilityHeader", - "err", err) - panic(err) - } - samples, err := SampleSquare(len(dah.RowRoots), int(la.params.SampleAmount)) - if err != nil { + log.Errorw("DAH validation failed", "error", err) return err } @@ -94,49 +104,50 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header // functionality is optional and must be supported by the used share.Getter. ctx = getters.WithSession(ctx) + var ( + failedSamplesLock sync.Mutex + failedSamples []Sample + ) + log.Debugw("starting sampling session", "root", dah.String()) - errs := make(chan error, len(samples)) + var wg sync.WaitGroup for _, s := range samples { + wg.Add(1) go func(s Sample) { - log.Debugw("fetching share", "root", dah.String(), "row", s.Row, "col", s.Col) - _, err := la.getter.GetShare(ctx, header, s.Row, s.Col) + defer wg.Done() + // check if the sample is available + _, err := la.getter.GetShare(ctx, header, int(s.Row), int(s.Col)) if err != nil { log.Debugw("error fetching share", "root", dah.String(), "row", s.Row, "col", s.Col) - } - // we don't really care about Share bodies at this point - // it also means we now saved the Share in local storage - select { - case errs <- err: - case <-ctx.Done(): + failedSamplesLock.Lock() + failedSamples = append(failedSamples, s) + failedSamplesLock.Unlock() } }(s) } + wg.Wait() - for range samples { - var err error - select { - case err = <-errs: - case <-ctx.Done(): - err = ctx.Err() - } - - if err != nil { - if errors.Is(err, context.Canceled) { - return err - } - log.Errorw("availability validation failed", "root", dah.String(), "err", err.Error()) - if ipldFormat.IsNotFound(err) || errors.Is(err, context.DeadlineExceeded) { - return share.ErrNotAvailable - } - return err - } + if 
errors.Is(ctx.Err(), context.Canceled) { + // Availability did not complete due to context cancellation, return context error instead of share.ErrNotAvailable + return ctx.Err() } + // store the result of the sampling session + bs := encodeSamples(failedSamples) la.dsLk.Lock() - err = la.ds.Put(ctx, key, []byte{}) + err = la.ds.Put(ctx, key, bs) la.dsLk.Unlock() if err != nil { - log.Errorw("storing root of successful SharesAvailable request to disk", "err", err) + log.Errorw("Failed to store sampling result", "error", err) + } + + // if any of the samples failed, return an error + if len(failedSamples) > 0 { + log.Errorw("availability validation failed", + "root", dah.String(), + "failed_samples", failedSamples, + ) + return share.ErrNotAvailable } return nil } diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 2ace654d50..68da3698b5 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -6,7 +6,6 @@ import ( "strconv" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-node/header/headertest" @@ -26,16 +25,18 @@ func TestSharesAvailableCaches(t *testing.T) { // cache doesn't have dah yet has, err := avail.ds.Has(ctx, rootKey(dah)) - assert.NoError(t, err) - assert.False(t, has) + require.NoError(t, err) + require.False(t, has) err = avail.SharesAvailable(ctx, eh) - assert.NoError(t, err) + require.NoError(t, err) - // is now cached - has, err = avail.ds.Has(ctx, rootKey(dah)) - assert.NoError(t, err) - assert.True(t, has) + // is now stored success result + result, err := avail.ds.Get(ctx, rootKey(dah)) + require.NoError(t, err) + failed, err := decodeSamples(result) + require.NoError(t, err) + require.Empty(t, failed) } func TestSharesAvailableHitsCache(t *testing.T) { @@ -45,19 +46,16 @@ func TestSharesAvailableHitsCache(t *testing.T) { getter, _ := GetterWithRandSquare(t, 
16) avail := TestAvailability(getter) + // create new dah, that is not available by getter bServ := ipld.NewMemBlockservice() dah := availability_test.RandFillBS(t, 16, bServ) eh := headertest.RandExtendedHeaderWithRoot(t, dah) // blockstore doesn't actually have the dah err := avail.SharesAvailable(ctx, eh) - require.Error(t, err) - - // cache doesn't have dah yet, since it errored - has, err := avail.ds.Has(ctx, rootKey(dah)) - assert.NoError(t, err) - assert.False(t, has) + require.ErrorIs(t, err, share.ErrNotAvailable) + // put success result in cache err = avail.ds.Put(ctx, rootKey(dah), []byte{}) require.NoError(t, err) @@ -75,31 +73,47 @@ func TestSharesAvailableEmptyRoot(t *testing.T) { eh := headertest.RandExtendedHeaderWithRoot(t, share.EmptyRoot()) err := avail.SharesAvailable(ctx, eh) - assert.NoError(t, err) + require.NoError(t, err) } -func TestSharesAvailable(t *testing.T) { +func TestSharesAvailableFailed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - getter, dah := GetterWithRandSquare(t, 16) + getter, _ := GetterWithRandSquare(t, 16) avail := TestAvailability(getter) - err := avail.SharesAvailable(ctx, dah) - assert.NoError(t, err) -} - -func TestSharesAvailableFailed(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + // create new dah, that is not available by getter bServ := ipld.NewMemBlockservice() dah := availability_test.RandFillBS(t, 16, bServ) eh := headertest.RandExtendedHeaderWithRoot(t, dah) - getter, _ := GetterWithRandSquare(t, 16) - avail := TestAvailability(getter) + // blockstore doesn't actually have the dah, so it should fail err := avail.SharesAvailable(ctx, eh) - assert.Error(t, err) + require.ErrorIs(t, err, share.ErrNotAvailable) + + // cache should have failed results now + result, err := avail.ds.Get(ctx, rootKey(dah)) + require.NoError(t, err) + + failed, err := decodeSamples(result) + require.NoError(t, err) + require.Len(t, failed, 
int(avail.params.SampleAmount)) + + // ensure that retry persists the failed samples selection + // create new getter with only the failed samples available, and add them to the onceGetter + onceGetter := newOnceGetter() + onceGetter.AddSamples(failed) + + // replace getter with the new one + avail.getter = onceGetter + + // should be able to retrieve all the failed samples now + err = avail.SharesAvailable(ctx, eh) + require.NoError(t, err) + + // onceGetter should have no more samples stored after the call + require.Empty(t, onceGetter.available) } func TestShareAvailableOverMocknet_Light(t *testing.T) { @@ -114,7 +128,7 @@ func TestShareAvailableOverMocknet_Light(t *testing.T) { net.ConnectAll() err := nd.SharesAvailable(ctx, eh) - assert.NoError(t, err) + require.NoError(t, err) } func TestGetShare(t *testing.T) { @@ -127,8 +141,8 @@ func TestGetShare(t *testing.T) { for i := range make([]bool, n) { for j := range make([]bool, n) { sh, err := getter.GetShare(ctx, eh, i, j) - assert.NotNil(t, sh) - assert.NoError(t, err) + require.NotNil(t, sh) + require.NoError(t, err) } } } @@ -163,14 +177,14 @@ func TestService_GetSharesByNamespace(t *testing.T) { require.NoError(t, err) require.NoError(t, shares.Verify(root, randNamespace)) flattened := shares.Flatten() - assert.Len(t, flattened, tt.expectedShareCount) + require.Len(t, flattened, tt.expectedShareCount) for _, value := range flattened { - assert.Equal(t, randNamespace, share.GetNamespace(value)) + require.Equal(t, randNamespace, share.GetNamespace(value)) } if tt.expectedShareCount > 1 { // idx1 is always smaller than idx2 - assert.Equal(t, randShares[idx1], flattened[0]) - assert.Equal(t, randShares[idx2], flattened[1]) + require.Equal(t, randShares[idx1], flattened[0]) + require.Equal(t, randShares[idx2], flattened[1]) } }) t.Run("last two rows of a 4x4 square that have the same namespace have valid NMT proofs", func(t *testing.T) { diff --git a/share/availability/light/sample.go 
b/share/availability/light/sample.go index e66ff9aafe..e09a46a5fc 100644 --- a/share/availability/light/sample.go +++ b/share/availability/light/sample.go @@ -3,12 +3,14 @@ package light import ( crand "crypto/rand" + "encoding/binary" + "errors" "math/big" ) // Sample is a point in 2D space over square. type Sample struct { - Row, Col int + Row, Col uint16 } // SampleSquare randomly picks *num* unique points from the given *width* square @@ -66,11 +68,37 @@ func (ss *squareSampler) samples() []Sample { return samples } -func randInt(max int) int { +func randInt(max int) uint16 { n, err := crand.Int(crand.Reader, big.NewInt(int64(max))) if err != nil { panic(err) // won't panic as rand.Reader is endless } - return int(n.Int64()) + return uint16(n.Uint64()) +} + +// encodeSamples encodes a slice of samples into a byte slice using little endian encoding. +func encodeSamples(samples []Sample) []byte { + bs := make([]byte, 0, len(samples)*4) + for _, s := range samples { + bs = binary.LittleEndian.AppendUint16(bs, s.Row) + bs = binary.LittleEndian.AppendUint16(bs, s.Col) + } + return bs +} + +// decodeSamples decodes a byte slice into a slice of samples. 
+func decodeSamples(bs []byte) ([]Sample, error) { + if len(bs)%4 != 0 { + return nil, errors.New("invalid byte slice length") + } + + samples := make([]Sample, 0, len(bs)/4) + for i := 0; i < len(bs); i += 4 { + samples = append(samples, Sample{ + Row: binary.LittleEndian.Uint16(bs[i : i+2]), + Col: binary.LittleEndian.Uint16(bs[i+2 : i+4]), + }) + } + return samples, nil } diff --git a/share/availability/light/sample_test.go b/share/availability/light/sample_test.go index 7092b99e83..8d7656e688 100644 --- a/share/availability/light/sample_test.go +++ b/share/availability/light/sample_test.go @@ -21,8 +21,8 @@ func TestSampleSquare(t *testing.T) { assert.Len(t, ss, tt.samples) // check points are within width for _, s := range ss { - assert.Less(t, s.Row, tt.width) - assert.Less(t, s.Col, tt.width) + assert.Less(t, int(s.Row), tt.width) + assert.Less(t, int(s.Col), tt.width) } // checks samples are not equal for i, s1 := range ss { diff --git a/share/availability/light/testing.go b/share/availability/light/testing.go index 9efc9ff14a..b6251b4fbd 100644 --- a/share/availability/light/testing.go +++ b/share/availability/light/testing.go @@ -1,11 +1,15 @@ package light import ( + "context" + "sync" "testing" "github.com/ipfs/boxo/blockservice" "github.com/ipfs/go-datastore" + "github.com/celestiaorg/rsmt2d" + "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/share" @@ -58,3 +62,46 @@ func SubNetNode(sn *availability_test.SubNet) *availability_test.TestNode { sn.AddNode(nd) return nd } + +type onceGetter struct { + *sync.Mutex + available map[Sample]struct{} +} + +func newOnceGetter() onceGetter { + return onceGetter{ + Mutex: &sync.Mutex{}, + available: make(map[Sample]struct{}), + } +} + +func (m onceGetter) AddSamples(samples []Sample) { + m.Lock() + defer m.Unlock() + for _, s := range samples { + m.available[s] = struct{}{} + } +} + +func (m onceGetter) GetShare(_ 
context.Context, _ *header.ExtendedHeader, row, col int) (share.Share, error) { + m.Lock() + defer m.Unlock() + s := Sample{Row: uint16(row), Col: uint16(col)} + if _, ok := m.available[s]; ok { + delete(m.available, s) + return share.Share{}, nil + } + return share.Share{}, share.ErrNotAvailable +} + +func (m onceGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { + panic("not implemented") +} + +func (m onceGetter) GetSharesByNamespace( + _ context.Context, + _ *header.ExtendedHeader, + _ share.Namespace, +) (share.NamespacedShares, error) { + panic("not implemented") +}