From 9024839d720133d907ebf9b02b462f2de6aacb3e Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sat, 26 Oct 2024 08:32:12 +0200 Subject: [PATCH] refactor(share): GetShare -> GetSamples --- blob/service_test.go | 7 +- nodebuilder/share/mocks/api.go | 30 ++++---- nodebuilder/share/share.go | 14 +++- share/availability/light/availability.go | 70 +++++++------------ share/availability/light/availability_test.go | 66 +++++++++++------ share/eds/accessor.go | 1 + share/eds/validation.go | 7 +- share/shwap/getter.go | 6 +- share/shwap/getters/cascade.go | 21 ++---- share/shwap/getters/cascade_test.go | 2 +- share/shwap/getters/mock/getter.go | 14 ++-- share/shwap/getters/testing.go | 46 +++++++----- share/shwap/p2p/bitswap/getter.go | 65 ++++++++--------- share/shwap/p2p/bitswap/sample_block.go | 4 +- share/shwap/p2p/bitswap/sample_block_test.go | 5 +- share/shwap/p2p/shrex/shrex_getter/shrex.go | 4 +- share/shwap/sample.go | 13 ++-- share/shwap/sample_id.go | 27 ++++++- share/shwap/sample_id_test.go | 16 ++++- store/file/ods.go | 7 +- store/file/ods_q4.go | 7 +- store/getter.go | 34 +++++---- 22 files changed, 269 insertions(+), 197 deletions(-) diff --git a/blob/service_test.go b/blob/service_test.go index 2cbfcf7b03..9b47b74cb8 100644 --- a/blob/service_test.go +++ b/blob/service_test.go @@ -902,10 +902,9 @@ func createService(ctx context.Context, t testing.TB, shares []libshare.Share) * nd, err := eds.NamespaceData(ctx, accessor, ns) return nd, err }) - shareGetter.EXPECT().GetShare(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes(). - DoAndReturn(func(ctx context.Context, h *header.ExtendedHeader, row, col int) (libshare.Share, error) { - s, err := accessor.Sample(ctx, row, col) - return s.Share, err + shareGetter.EXPECT().GetSamples(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(ctx context.Context, h *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) { + return smpls, nil }) // create header and put it into the store diff --git a/nodebuilder/share/mocks/api.go b/nodebuilder/share/mocks/api.go index cccc81a452..84866bf784 100644 --- a/nodebuilder/share/mocks/api.go +++ b/nodebuilder/share/mocks/api.go @@ -53,6 +53,21 @@ func (mr *MockModuleMockRecorder) GetEDS(arg0, arg1 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEDS", reflect.TypeOf((*MockModule)(nil).GetEDS), arg0, arg1) } +// GetNamespaceData mocks base method. +func (m *MockModule) GetNamespaceData(arg0 context.Context, arg1 uint64, arg2 share0.Namespace) (shwap.NamespaceData, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNamespaceData", arg0, arg1, arg2) + ret0, _ := ret[0].(shwap.NamespaceData) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetNamespaceData indicates an expected call of GetNamespaceData. +func (mr *MockModuleMockRecorder) GetNamespaceData(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNamespaceData", reflect.TypeOf((*MockModule)(nil).GetNamespaceData), arg0, arg1, arg2) +} + // GetRange mocks base method. func (m *MockModule) GetRange(arg0 context.Context, arg1 uint64, arg2, arg3 int) (*share.GetRangeResult, error) { m.ctrl.T.Helper() @@ -83,21 +98,6 @@ func (mr *MockModuleMockRecorder) GetShare(arg0, arg1, arg2, arg3 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShare", reflect.TypeOf((*MockModule)(nil).GetShare), arg0, arg1, arg2, arg3) } -// GetSharesByNamespace mocks base method. -func (m *MockModule) GetSharesByNamespace(arg0 context.Context, arg1 uint64, arg2 share0.Namespace) (shwap.NamespaceData, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetSharesByNamespace", arg0, arg1, arg2) - ret0, _ := ret[0].(shwap.NamespaceData) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetSharesByNamespace indicates an expected call of GetSharesByNamespace. -func (mr *MockModuleMockRecorder) GetSharesByNamespace(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSharesByNamespace", reflect.TypeOf((*MockModule)(nil).GetSharesByNamespace), arg0, arg1, arg2) -} - // SharesAvailable mocks base method. func (m *MockModule) SharesAvailable(arg0 context.Context, arg1 uint64) error { m.ctrl.T.Helper() diff --git a/nodebuilder/share/share.go b/nodebuilder/share/share.go index 8a3efcc757..9a904b48fd 100644 --- a/nodebuilder/share/share.go +++ b/nodebuilder/share/share.go @@ -112,12 +112,24 @@ type module struct { hs headerServ.Module } +// TODO(@Wondertan): break func (m module) GetShare(ctx context.Context, height uint64, row, col int) (libshare.Share, error) { header, err := m.hs.GetByHeight(ctx, height) if err != nil { return libshare.Share{}, err } - return m.getter.GetShare(ctx, header, row, col) + + idx, err := shwap.SampleIndexFromCoordinates(row, col, len(header.DAH.RowRoots)) + if err != nil { + return libshare.Share{}, err + } + + smpls, err := m.getter.GetSamples(ctx, header, []shwap.SampleIndex{idx}) + if err != nil { + return libshare.Share{}, err + } + + return smpls[0].Share, nil } func (m module) GetEDS(ctx context.Context, height uint64) (*rsmt2d.ExtendedDataSquare, error) { diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go index 8fd0449065..f48ea51916 100644 --- a/share/availability/light/availability.go +++ b/share/availability/light/availability.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "sync" - "time" "github.com/ipfs/boxo/blockstore" "github.com/ipfs/go-datastore" @@ -114,59 +113,34 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header return nil } - var ( - mutex sync.Mutex - failedSamples []Sample - wg sync.WaitGroup - ) - - log.Debugw("starting sampling session", "height", header.Height()) - - // remove one second from the deadline to ensure we have enough time to process the results - samplingCtx, cancel := context.WithCancel(ctx) - if deadline, ok := ctx.Deadline(); ok { - samplingCtx, cancel = context.WithDeadline(ctx, deadline.Add(-time.Second)) - } - defer cancel() - - // Concurrently sample shares - for _, s := range samples.Remaining { - wg.Add(1) - go func(s Sample) { - defer wg.Done() - _, err := la.getter.GetShare(samplingCtx, header, s.Row, s.Col) - mutex.Lock() - defer mutex.Unlock() - if err != nil { - log.Debugw("error fetching share", "height", header.Height(), "row", s.Row, "col", s.Col) - failedSamples = append(failedSamples, s) - } else { - samples.Available = append(samples.Available, s) - } - }(s) - } - wg.Wait() + log.Debugw("starting sampling session", "root", dah.String()) - // Update remaining samples with failed ones - samples.Remaining = failedSamples + idxs := make([]shwap.SampleIndex, len(samples.Available)) + for i, s := range samples.Available { + idx, err := shwap.SampleIndexFromCoordinates(int(s.Row), int(s.Col), len(dah.RowRoots)) + if err != nil { + return err + } - // Store the updated sampling result - updatedData, err := json.Marshal(samples) - if err != nil { - return err - } - la.dsLk.Lock() - err = la.ds.Put(ctx, key, updatedData) - la.dsLk.Unlock() - if err != nil { - return fmt.Errorf("store sampling result: %w", err) + idxs[i] = idx } + smpls, err := la.getter.GetSamples(ctx, header, idxs) if errors.Is(ctx.Err(), context.Canceled) { // Availability did not complete due to context cancellation, return context error instead of // share.ErrNotAvailable return ctx.Err() } + if len(smpls) == 0 { + return share.ErrNotAvailable + } + + var failedSamples []Sample + for i, smpl := range smpls { + if smpl.IsEmpty() { + failedSamples = append(failedSamples, samples.Available[i]) + } + } // if any of the samples failed, return an error if len(failedSamples) > 0 { @@ -210,7 +184,11 @@ func (la *ShareAvailability) Prune(ctx context.Context, h *header.ExtendedHeader // delete stored samples for _, sample := range result.Available { - blk, err := bitswap.NewEmptySampleBlock(h.Height(), sample.Row, sample.Col, len(h.DAH.RowRoots)) + idx, err := shwap.SampleIndexFromCoordinates(sample.Row, sample.Col, len(h.DAH.RowRoots)) + if err != nil { + return err + } + blk, err := bitswap.NewEmptySampleBlock(h.Height(), idx, len(h.DAH.RowRoots)) if err != nil { return fmt.Errorf("marshal sample ID: %w", err) } diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 5ec6e8e39e..d7e08e0490 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -21,11 +21,13 @@ import ( "github.com/stretchr/testify/require" libshare "github.com/celestiaorg/go-square/v2/share" + "github.com/celestiaorg/nmt" "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/edstest" "github.com/celestiaorg/celestia-node/share/shwap" "github.com/celestiaorg/celestia-node/share/shwap/getters/mock" @@ -38,22 +40,32 @@ func TestSharesAvailableSuccess(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eds := edstest.RandEDS(t, 16) - roots, err := share.NewAxisRoots(eds) + square := edstest.RandEDS(t, 16) + roots, err := share.NewAxisRoots(square) require.NoError(t, err) eh := headertest.RandExtendedHeaderWithRoot(t, roots) getter := mock.NewMockGetter(gomock.NewController(t)) getter.EXPECT(). - GetShare(gomock.Any(), eh, gomock.Any(), gomock.Any()). + GetSamples(gomock.Any(), eh, gomock.Any()). DoAndReturn( - func(_ context.Context, _ *header.ExtendedHeader, row, col int) (libshare.Share, error) { - rawSh := eds.GetCell(uint(row), uint(col)) - sh, err := libshare.NewShare(rawSh) - if err != nil { - return libshare.Share{}, err + func(_ context.Context, hdr *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) { + acc := eds.Rsmt2D{ExtendedDataSquare: square} + smpls := make([]shwap.Sample, len(indices)) + for i, idx := range indices { + rowIdx, colIdx, err := idx.Coordinates(len(hdr.DAH.RowRoots)) + if err != nil { + return nil, err + } + + smpl, err := acc.Sample(ctx, rowIdx, colIdx) + if err != nil { + return nil, err + } + + smpls[i] = smpl } - return *sh, nil + return smpls, nil }). AnyTimes() @@ -87,8 +99,8 @@ func TestSharesAvailableSkipSampled(t *testing.T) { // Create a getter that always returns ErrNotFound getter := mock.NewMockGetter(gomock.NewController(t)) getter.EXPECT(). - GetShare(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(libshare.Share{}, shrex.ErrNotFound). + GetSamples(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, shrex.ErrNotFound). AnyTimes() ds := datastore.NewMapDatastore() @@ -147,8 +159,8 @@ func TestSharesAvailableFailed(t *testing.T) { // Getter doesn't have the eds, so it should fail for all samples getter.EXPECT(). - GetShare(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(libshare.Share{}, shrex.ErrNotFound). + GetSamples(gomock.Any(), gomock.Any(), gomock.Any()). + Return(make([]shwap.Sample, avail.params.SampleAmount), shrex.ErrNotFound). AnyTimes() err = avail.SharesAvailable(ctx, eh) require.ErrorIs(t, err, share.ErrNotAvailable) @@ -185,7 +197,7 @@ func TestSharesAvailableFailed(t *testing.T) { // onceGetter should have no more samples stored after the call successfulGetter.checkOnce(t) - require.ElementsMatch(t, failed.Remaining, successfulGetter.sampledList()) + require.ElementsMatch(t, failed.Remaining, len(successfulGetter.sampled)) } func TestParallelAvailability(t *testing.T) { @@ -213,7 +225,7 @@ func TestParallelAvailability(t *testing.T) { }() } wg.Wait() - require.Len(t, successfulGetter.sampledList(), int(avail.params.SampleAmount)) + require.Len(t, len(successfulGetter.sampled), int(avail.params.SampleAmount)) // Verify that the sampling result is stored with all samples marked as available resultData, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots)) @@ -249,14 +261,24 @@ func (g onceGetter) checkOnce(t *testing.T) { } } -func (g onceGetter) sampledList() []Sample { - g.Lock() - defer g.Unlock() - samples := make([]Sample, 0, len(g.sampled)) - for s := range g.sampled { - samples = append(samples, s) +func (m onceGetter) GetSamples(_ context.Context, hdr *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) { + m.Lock() + defer m.Unlock() + + smpls := make([]shwap.Sample, 0, len(indices)) + for _, idx := range indices { + rowIdx, colIdx, err := idx.Coordinates(len(hdr.DAH.RowRoots)) + if err != nil { + return nil, err + } + + s := Sample{Row: rowIdx, Col: colIdx} + if _, ok := m.sampled[s]; ok { + delete(m.sampled, s) + smpls = append(smpls, shwap.Sample{Proof: &nmt.Proof{}}) + } } - return samples + return smpls, nil } func (g onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (libshare.Share, error) { diff --git a/share/eds/accessor.go b/share/eds/accessor.go index 07eb6db542..81f55fcf64 100644 --- a/share/eds/accessor.go +++ b/share/eds/accessor.go @@ -25,6 +25,7 @@ type Accessor interface { // Sample returns share and corresponding proof for row and column indices. Implementation can // choose which axis to use for proof. Chosen axis for proof should be indicated in the returned // Sample. + // TODO(@Wondertan): change to SampleIndex Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) // AxisHalf returns half of shares axis of the given type and index. Side is determined by // implementation. Implementations should indicate the side in the returned AxisHalf. diff --git a/share/eds/validation.go b/share/eds/validation.go index 845a5bac77..0d513911d9 100644 --- a/share/eds/validation.go +++ b/share/eds/validation.go @@ -35,7 +35,12 @@ func (f validation) Size(ctx context.Context) int { } func (f validation) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { - _, err := shwap.NewSampleID(1, rowIdx, colIdx, f.Size(ctx)) + idx, err := shwap.SampleIndexFromCoordinates(rowIdx, colIdx, f.Size(ctx)) + if err != nil { + return shwap.Sample{}, err + } + + _, err = shwap.NewSampleID(1, idx, f.Size(ctx)) if err != nil { return shwap.Sample{}, fmt.Errorf("sample validation: %w", err) } diff --git a/share/shwap/getter.go b/share/shwap/getter.go index 21ac2ec49f..f88a10debe 100644 --- a/share/shwap/getter.go +++ b/share/shwap/getter.go @@ -29,8 +29,10 @@ var ( // //go:generate mockgen -destination=getters/mock/getter.go -package=mock . Getter type Getter interface { - // GetShare gets a Share by coordinates in EDS. - GetShare(ctx context.Context, header *header.ExtendedHeader, row, col int) (libshare.Share, error) + // GetSamples gets samples by their indices. + // Returns Sample slice with requested number of samples in the requested order. + // May return partial response with some samples being empty if they weren't found. + GetSamples(ctx context.Context, header *header.ExtendedHeader, indices []SampleIndex) ([]Sample, error) // GetEDS gets the full EDS identified by the given extended header. GetEDS(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) diff --git a/share/shwap/getters/cascade.go b/share/shwap/getters/cascade.go index 962adffd3c..3518ce8eea 100644 --- a/share/shwap/getters/cascade.go +++ b/share/shwap/getters/cascade.go @@ -41,24 +41,15 @@ func NewCascadeGetter(getters []shwap.Getter) *CascadeGetter { } } -// GetShare gets a share from any of registered shwap.Getters in cascading order. -func (cg *CascadeGetter) GetShare( - ctx context.Context, header *header.ExtendedHeader, row, col int, -) (libshare.Share, error) { - ctx, span := tracer.Start(ctx, "cascade/get-share", trace.WithAttributes( - attribute.Int("row", row), - attribute.Int("col", col), +// GetSamples gets samples from any of registered shwap.Getters in cascading order. +func (cg *CascadeGetter) GetSamples(ctx context.Context, hdr *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) { + ctx, span := tracer.Start(ctx, "cascade/get-samples", trace.WithAttributes( + attribute.Int("amount", len(indices)), )) defer span.End() - upperBound := len(header.DAH.RowRoots) - if row >= upperBound || col >= upperBound { - err := shwap.ErrOutOfBounds - span.RecordError(err) - return libshare.Share{}, err - } - get := func(ctx context.Context, get shwap.Getter) (libshare.Share, error) { - return get.GetShare(ctx, header, row, col) + get := func(ctx context.Context, get shwap.Getter) ([]shwap.Sample, error) { + return get.GetSamples(ctx, hdr, indices) } return cascadeGetters(ctx, cg.getters, get) diff --git a/share/shwap/getters/cascade_test.go b/share/shwap/getters/cascade_test.go index a23568006f..39e9999fa1 100644 --- a/share/shwap/getters/cascade_test.go +++ b/share/shwap/getters/cascade_test.go @@ -30,7 +30,7 @@ func TestCascadeGetter(t *testing.T) { getter := NewCascadeGetter(getters) t.Run("GetShare", func(t *testing.T) { for _, eh := range headers { - sh, err := getter.GetShare(ctx, eh, 0, 0) + sh, err := getter.GetSamples(ctx, eh, []shwap.SampleIndex{0}) assert.NoError(t, err) assert.NotEmpty(t, sh) } diff --git a/share/shwap/getters/mock/getter.go b/share/shwap/getters/mock/getter.go index 856802d75d..235b5a1fc7 100644 --- a/share/shwap/getters/mock/getter.go +++ b/share/shwap/getters/mock/getter.go @@ -68,17 +68,17 @@ func (mr *MockGetterMockRecorder) GetNamespaceData(arg0, arg1, arg2 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNamespaceData", reflect.TypeOf((*MockGetter)(nil).GetNamespaceData), arg0, arg1, arg2) } -// GetShare mocks base method. -func (m *MockGetter) GetShare(arg0 context.Context, arg1 *header.ExtendedHeader, arg2, arg3 int) (share.Share, error) { +// GetSamples mocks base method. +func (m *MockGetter) GetSamples(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 []shwap.SampleIndex) ([]shwap.Sample, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetShare", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(share.Share) + ret := m.ctrl.Call(m, "GetSamples", arg0, arg1, arg2) + ret0, _ := ret[0].([]shwap.Sample) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetShare indicates an expected call of GetShare. -func (mr *MockGetterMockRecorder) GetShare(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +// GetSamples indicates an expected call of GetSamples. +func (mr *MockGetterMockRecorder) GetSamples(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShare", reflect.TypeOf((*MockGetter)(nil).GetShare), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSamples", reflect.TypeOf((*MockGetter)(nil).GetSamples), arg0, arg1, arg2) } diff --git a/share/shwap/getters/testing.go b/share/shwap/getters/testing.go index a8fdd53ee6..3d7338a2fb 100644 --- a/share/shwap/getters/testing.go +++ b/share/shwap/getters/testing.go @@ -14,43 +14,51 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/edstest" "github.com/celestiaorg/celestia-node/share/shwap" ) // TestGetter provides a testing SingleEDSGetter and the root of the EDS it holds. func TestGetter(t *testing.T) (shwap.Getter, *header.ExtendedHeader) { - eds := edstest.RandEDS(t, 8) - roots, err := share.NewAxisRoots(eds) + square := edstest.RandEDS(t, 8) + roots, err := share.NewAxisRoots(square) eh := headertest.RandExtendedHeaderWithRoot(t, roots) require.NoError(t, err) return &SingleEDSGetter{ - EDS: eds, + EDS: eds.Rsmt2D{ExtendedDataSquare: square}, }, eh } // SingleEDSGetter contains a single EDS where data is retrieved from. // Its primary use is testing, and GetNamespaceData is not supported. type SingleEDSGetter struct { - EDS *rsmt2d.ExtendedDataSquare + EDS eds.Rsmt2D } -// GetShare gets a share from a kept EDS if exist and if the correct root is given. -func (seg *SingleEDSGetter) GetShare( - _ context.Context, - header *header.ExtendedHeader, - row, col int, -) (libshare.Share, error) { - err := seg.checkRoots(header.DAH) +// GetSamples get samples from a kept EDS if exist and if the correct root is given. +func (seg *SingleEDSGetter) GetSamples(ctx context.Context, hdr *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) { + err := seg.checkRoots(hdr.DAH) if err != nil { - return libshare.Share{}, err + return nil, err } - rawSh := seg.EDS.GetCell(uint(row), uint(col)) - sh, err := libshare.NewShare(rawSh) - if err != nil { - return libshare.Share{}, err + + smpls := make([]shwap.Sample, len(indices)) + for i, idx := range indices { + rowIdx, colIdx, err := idx.Coordinates(len(hdr.DAH.RowRoots)) + if err != nil { + return nil, err + } + + smpl, err := seg.EDS.Sample(ctx, rowIdx, colIdx) + if err != nil { + return nil, err + } + + smpls[i] = smpl } - return *sh, nil + + return smpls, nil } // GetEDS returns a kept EDS if the correct root is given. @@ -62,7 +70,7 @@ func (seg *SingleEDSGetter) GetEDS( if err != nil { return nil, err } - return seg.EDS, nil + return seg.EDS.ExtendedDataSquare, nil } // GetNamespaceData returns NamespacedShares from a kept EDS if the correct root is given. @@ -72,7 +80,7 @@ func (seg *SingleEDSGetter) GetNamespaceData(context.Context, *header.ExtendedHe } func (seg *SingleEDSGetter) checkRoots(roots *share.AxisRoots) error { - dah, err := da.NewDataAvailabilityHeader(seg.EDS) + dah, err := da.NewDataAvailabilityHeader(seg.EDS.ExtendedDataSquare) if err != nil { return err } diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index c3ffde7965..e7374c6a30 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -66,27 +66,24 @@ func (g *Getter) Stop() { g.cancel() } -// GetShares uses [SampleBlock] and [Fetch] to get and verify samples for given coordinates. -// TODO(@Wondertan): Rework API to get coordinates as a single param to make it ergonomic. -func (g *Getter) GetShares( +// GetSamples uses [SampleBlock] and [Fetch] to get and verify samples for given coordinates. +func (g *Getter) GetSamples( ctx context.Context, hdr *header.ExtendedHeader, - rowIdxs, colIdxs []int, -) ([]libshare.Share, error) { - if len(rowIdxs) != len(colIdxs) { - return nil, fmt.Errorf("row indecies and col indices must be same length") + indices []shwap.SampleIndex, +) ([]shwap.Sample, error) { + if len(indices) == 0 { + return nil, fmt.Errorf("no sample indicies to fetch") } - if len(rowIdxs) == 0 { - return nil, fmt.Errorf("empty coordinates") - } - - ctx, span := tracer.Start(ctx, "get-shares") + ctx, span := tracer.Start(ctx, "get-samples", trace.WithAttributes( + attribute.Int("amount", len(indices)), + )) defer span.End() - blks := make([]Block, len(rowIdxs)) - for i, rowIdx := range rowIdxs { - sid, err := NewEmptySampleBlock(hdr.Height(), rowIdx, colIdxs[i], len(hdr.DAH.RowRoots)) + blks := make([]Block, len(indices)) + for i, idx := range indices { + sid, err := NewEmptySampleBlock(hdr.Height(), idx, len(hdr.DAH.RowRoots)) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, "NewEmptySampleBlock") @@ -99,36 +96,32 @@ func (g *Getter) GetShares( ses := g.session(ctx, hdr) err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore), WithFetcher(ses)) if err != nil { + // handle partial fetches + var fetched int + smpls := make([]shwap.Sample, len(blks)) + for i, blk := range blks { + if smpl := blk.(*SampleBlock).Container; !smpl.IsEmpty() { + smpls[i] = smpl + fetched++ + } + } + span.RecordError(err) span.SetStatus(codes.Error, "Fetch") + if fetched > 0 { + span.SetAttributes(attribute.Int("fetched", fetched)) + return smpls, err + } return nil, err } - shares := make([]libshare.Share, len(blks)) + smpls := make([]shwap.Sample, len(blks)) for i, blk := range blks { - shares[i] = blk.(*SampleBlock).Container.Share + smpls[i] = blk.(*SampleBlock).Container } span.SetStatus(codes.Ok, "") - return shares, nil -} - -// GetShare uses [GetShare] to fetch and verify single share by the given coordinates. -func (g *Getter) GetShare( - ctx context.Context, - hdr *header.ExtendedHeader, - row, col int, -) (libshare.Share, error) { - shrs, err := g.GetShares(ctx, hdr, []int{row}, []int{col}) - if err != nil { - return libshare.Share{}, err - } - - if len(shrs) != 1 { - return libshare.Share{}, fmt.Errorf("expected 1 share row, got %d", len(shrs)) - } - - return shrs[0], nil + return smpls, nil } // GetEDS uses [RowBlock] and [Fetch] to get half of the first EDS quadrant(ODS) and diff --git a/share/shwap/p2p/bitswap/sample_block.go b/share/shwap/p2p/bitswap/sample_block.go index 062369c0be..e094f58c49 100644 --- a/share/shwap/p2p/bitswap/sample_block.go +++ b/share/shwap/p2p/bitswap/sample_block.go @@ -39,8 +39,8 @@ type SampleBlock struct { } // NewEmptySampleBlock constructs a new empty SampleBlock. -func NewEmptySampleBlock(height uint64, rowIdx, colIdx, edsSize int) (*SampleBlock, error) { - id, err := shwap.NewSampleID(height, rowIdx, colIdx, edsSize) +func NewEmptySampleBlock(height uint64, idx shwap.SampleIndex, edsSize int) (*SampleBlock, error) { + id, err := shwap.NewSampleID(height, idx, edsSize) if err != nil { return nil, err } diff --git a/share/shwap/p2p/bitswap/sample_block_test.go b/share/shwap/p2p/bitswap/sample_block_test.go index 2a28e7e4c9..6c12edcb8d 100644 --- a/share/shwap/p2p/bitswap/sample_block_test.go +++ b/share/shwap/p2p/bitswap/sample_block_test.go @@ -9,6 +9,7 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds/edstest" + "github.com/celestiaorg/celestia-node/share/shwap" ) func TestSample_FetchRoundtrip(t *testing.T) { @@ -24,7 +25,9 @@ func TestSample_FetchRoundtrip(t *testing.T) { blks := make([]Block, 0, width*width) for x := 0; x < width; x++ { for y := 0; y < width; y++ { - blk, err := NewEmptySampleBlock(1, x, y, len(root.RowRoots)) + idx, err := shwap.SampleIndexFromCoordinates(x, y, width) + require.NoError(t, err) + blk, err := NewEmptySampleBlock(1, idx, len(root.RowRoots)) require.NoError(t, err) blks = append(blks, blk) } diff --git a/share/shwap/p2p/shrex/shrex_getter/shrex.go b/share/shwap/p2p/shrex/shrex_getter/shrex.go index a6cc1b88fa..8f1fae589c 100644 --- a/share/shwap/p2p/shrex/shrex_getter/shrex.go +++ b/share/shwap/p2p/shrex/shrex_getter/shrex.go @@ -146,8 +146,8 @@ func (sg *Getter) Stop(ctx context.Context) error { return sg.archivalPeerManager.Stop(ctx) } -func (sg *Getter) GetShare(context.Context, *header.ExtendedHeader, int, int) (libshare.Share, error) { - return libshare.Share{}, fmt.Errorf("getter/shrex: GetShare %w", shwap.ErrOperationNotSupported) +func (sg *Getter) GetSamples(context.Context, *header.ExtendedHeader, []shwap.SampleIndex) ([]shwap.Sample, error) { + return nil, fmt.Errorf("getter/shrex: GetShare %w", shwap.ErrOperationNotSupported) } func (sg *Getter) GetEDS(ctx context.Context, header *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { diff --git a/share/shwap/sample.go b/share/shwap/sample.go index ab07d2f5de..06fa84ab04 100644 --- a/share/shwap/sample.go +++ b/share/shwap/sample.go @@ -31,8 +31,13 @@ type Sample struct { // SampleFromShares creates a Sample from a list of shares, using the specified proof type and // the share index to be included in the sample. -func SampleFromShares(shares []libshare.Share, proofType rsmt2d.Axis, axisIdx, shrIdx int) (Sample, error) { - tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(len(shares)/2), uint(axisIdx)) +func SampleFromShares(shares []libshare.Share, proofType rsmt2d.Axis, idx SampleIndex) (Sample, error) { + rowIdx, colIdx, err := idx.Coordinates(len(shares)) + if err != nil { + return Sample{}, err + } + + tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(len(shares)/2), uint(rowIdx)) for _, shr := range shares { err := tree.Push(shr.ToBytes()) if err != nil { @@ -40,13 +45,13 @@ func SampleFromShares(shares []libshare.Share, proofType rsmt2d.Axis, axisIdx, s } } - proof, err := tree.ProveRange(shrIdx, shrIdx+1) + proof, err := tree.ProveRange(colIdx, colIdx+1) if err != nil { return Sample{}, err } return Sample{ - Share: shares[shrIdx], + Share: shares[colIdx], Proof: &proof, ProofType: proofType, }, nil diff --git a/share/shwap/sample_id.go b/share/shwap/sample_id.go index b03bbacfda..f2f3c9c2f4 100644 --- a/share/shwap/sample_id.go +++ b/share/shwap/sample_id.go @@ -10,6 +10,26 @@ import ( // bytes for the ShareIndex. const SampleIDSize = RowIDSize + 2 +type SampleIndex int + +func SampleIndexFromCoordinates(rowIdx, colIdx, edsSize int) (SampleIndex, error) { + if rowIdx < 0 || colIdx < 0 { + return 0, fmt.Errorf("negative row or col index: %w", ErrInvalidID) + } + if rowIdx >= edsSize || colIdx >= edsSize { + return 0, fmt.Errorf("SampleIndex %d || %d > %d: %w", rowIdx, colIdx, edsSize, ErrOutOfBounds) + } + return SampleIndex(rowIdx*edsSize + colIdx), nil +} + +func (s SampleIndex) Coordinates(squareSize int) (int, int, error) { + if int(s) > squareSize*squareSize { + return 0, 0, fmt.Errorf("SampleIndex %d > %d: %w", s, squareSize*squareSize, ErrOutOfBounds) + } + + return int(s) / squareSize, int(s) % squareSize, nil +} + // SampleID uniquely identifies a specific sample within a row of an Extended Data Square (EDS). type SampleID struct { RowID // Embeds RowID to incorporate block height and row index. @@ -18,7 +38,12 @@ type SampleID struct { // NewSampleID constructs a new SampleID using the provided block height, sample index, and EDS // size. It calculates the row and share index based on the sample index and EDS size. -func NewSampleID(height uint64, rowIdx, colIdx, edsSize int) (SampleID, error) { +func NewSampleID(height uint64, idx SampleIndex, edsSize int) (SampleID, error) { + rowIdx, colIdx, err := idx.Coordinates(edsSize) + if err != nil { + return SampleID{}, err + } + sid := SampleID{ RowID: RowID{ EdsID: EdsID{ diff --git a/share/shwap/sample_id_test.go b/share/shwap/sample_id_test.go index 125d536854..2344c932ea 100644 --- a/share/shwap/sample_id_test.go +++ b/share/shwap/sample_id_test.go @@ -11,7 +11,7 @@ import ( func TestSampleID(t *testing.T) { edsSize := 4 - id, err := NewSampleID(1, 1, 1, edsSize) + id, err := NewSampleID(1, 1, edsSize) require.NoError(t, err) data, err := id.MarshalBinary() @@ -29,7 +29,7 @@ func TestSampleID(t *testing.T) { func TestSampleIDReaderWriter(t *testing.T) { edsSize := 4 - id, err := NewSampleID(1, 1, 1, edsSize) + id, err := NewSampleID(1, 1, edsSize) require.NoError(t, err) buf := bytes.NewBuffer(nil) @@ -44,3 +44,15 @@ func TestSampleIDReaderWriter(t *testing.T) { require.EqualValues(t, id, sidOut) } + +func TestSampleIndex(t *testing.T) { + edsSize := 16 + + idxIn := SampleIndex(13 * 16) + row, col, err := idxIn.Coordinates(edsSize) + require.NoError(t, err) + + idxOut, err := SampleIndexFromCoordinates(row, col, edsSize) + require.NoError(t, err) + assert.Equal(t, idxIn, idxOut) +} diff --git a/store/file/ods.go b/store/file/ods.go index 3ec5082b83..a4f1313a6e 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -246,7 +246,12 @@ func (o *ODS) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, err return shwap.Sample{}, fmt.Errorf("reading axis: %w", err) } - return shwap.SampleFromShares(axis, axisType, axisIdx, shrIdx) + idx, err := shwap.SampleIndexFromCoordinates(rowIdx, shrIdx, o.Size(ctx)) + if err != nil { + return shwap.Sample{}, err + } + + return shwap.SampleFromShares(axis, axisType, idx) } // AxisHalf returns half of shares axis of the given type and index. Side is determined by diff --git a/store/file/ods_q4.go b/store/file/ods_q4.go index 06b255cae9..cea6492e84 100644 --- a/store/file/ods_q4.go +++ b/store/file/ods_q4.go @@ -132,7 +132,12 @@ func (odsq4 *ODSQ4) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sampl if err != nil { return shwap.Sample{}, fmt.Errorf("extending shares: %w", err) } - return shwap.SampleFromShares(shares, rsmt2d.Row, rowIdx, colIdx) + + idx, err := shwap.SampleIndexFromCoordinates(rowIdx, colIdx, odsq4.Size(ctx)) + if err != nil { + return shwap.Sample{}, err + } + return shwap.SampleFromShares(shares, rsmt2d.Row, idx) } func (odsq4 *ODSQ4) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { diff --git a/store/getter.go b/store/getter.go index 54ce70e4ae..cfacede23d 100644 --- a/store/getter.go +++ b/store/getter.go @@ -24,26 +24,32 @@ func NewGetter(store *Store) *Getter { return &Getter{store: store} } -func (g *Getter) GetShare(ctx context.Context, h *header.ExtendedHeader, row, col int) (libshare.Share, error) { - acc, err := g.store.GetByHeight(ctx, h.Height()) +func (g *Getter) GetSamples(ctx context.Context, hdr *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) { + acc, err := g.store.GetByHeight(ctx, hdr.Height()) if err != nil { if errors.Is(err, ErrNotFound) { - return libshare.Share{}, shwap.ErrNotFound + return nil, shwap.ErrNotFound } - return libshare.Share{}, fmt.Errorf("get accessor from store:%w", err) + return nil, fmt.Errorf("get accessor from store:%w", err) } - logger := log.With( - "height", h.Height(), - "row", row, - "col", col, - ) - defer utils.CloseAndLog(logger, "getter/sample", acc) + defer utils.CloseAndLog(log.With("height", hdr.Height()), "getter/sample", acc) - sample, err := acc.Sample(ctx, row, col) - if err != nil { - return libshare.Share{}, fmt.Errorf("get sample from accessor:%w", err) + smpls := make([]shwap.Sample, len(indices)) + for i, idx := range indices { + rowIdx, colIdx, err := idx.Coordinates(len(hdr.DAH.RowRoots)) + if err != nil { + return nil, err + } + + smpl, err := acc.Sample(ctx, rowIdx, colIdx) + if err != nil { + return nil, fmt.Errorf("get sample from accessor:%w", err) + } + + smpls[i] = smpl } - return sample.Share, nil + + return smpls, nil } func (g *Getter) GetEDS(ctx context.Context, h *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) {