WIP: Shwap prototype #3184
Changes from 1 commit
@@ -1,24 +1,22 @@
-package shwap
+package shwap_getter
 
 import (
 	"context"
 	"fmt"
-	"sync"
 
+	"github.com/celestiaorg/celestia-app/pkg/wrapper"
+	"github.com/celestiaorg/celestia-node/share/shwap"
+	"github.com/celestiaorg/rsmt2d"
 	"github.com/ipfs/boxo/blockstore"
 	"github.com/ipfs/boxo/exchange"
 	block "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
 
-	"github.com/celestiaorg/celestia-app/pkg/wrapper"
-	"github.com/celestiaorg/rsmt2d"
-
 	"github.com/celestiaorg/celestia-node/header"
 	"github.com/celestiaorg/celestia-node/share"
 )
 
 // TODO: GetRow method
 type Getter struct {
+	// TODO(@walldiss): why not blockservice?
 	fetch  exchange.SessionExchange
 	bstore blockstore.Blockstore
 }
@@ -59,57 +57,28 @@ func (g *Getter) GetShares(ctx context.Context, hdr *header.ExtendedHeader, smpl
 		return shares, nil
 	}
 
-	sids := make([]SampleID, len(smplIdxs))
+	cids := make([]cid.Cid, len(smplIdxs))
 	for i, shrIdx := range smplIdxs {
-		sid, err := NewSampleID(hdr.Height(), shrIdx, hdr.DAH)
+		sid, err := shwap.NewSampleID(hdr.Height(), shrIdx, hdr.DAH)
 		if err != nil {
 			return nil, err
 		}
-
-		sids[i] = sid
-	}
-
-	smplsMu := sync.Mutex{}
-	smpls := make(map[int]Sample, len(smplIdxs))
-	verifyFn := func(s Sample) error {
-		err := s.Verify(hdr.DAH)
-		if err != nil {
-			return err
-		}
-
-		smplIdx := int(s.SampleID.RowIndex)*len(hdr.DAH.RowRoots) + int(s.SampleID.ShareIndex)
-		smplsMu.Lock()
-		smpls[smplIdx] = s
-		smplsMu.Unlock()
-		return nil
-	}
-
-	cids := make([]cid.Cid, len(smplIdxs))
-	for i, sid := range sids {
-		sampleVerifiers.Add(sid, verifyFn)
-		cids[i] = sid.Cid()
-	}
-
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	ses := g.fetch.NewSession(ctx)
-	// must start getting only after verifiers are registered
-	blkCh, err := ses.GetBlocks(ctx, cids)
+		defer sid.Release()
+		cids[i] = sid.Cid()
+	}
+
+	blks, err := g.getBlocks(ctx, cids)
 	if err != nil {
-		return nil, fmt.Errorf("fetching blocks: %w", err)
+		return nil, fmt.Errorf("getting blocks: %w", err)
 	}
-	// GetBlocks handles ctx and closes blkCh, so we don't have to
-	blks := make([]block.Block, 0, len(smplIdxs))
-	for blk := range blkCh {
-		blks = append(blks, blk)
-	}
 	// only persist when all samples received
 	if len(blks) != len(smplIdxs) {
 		if ctx.Err() != nil {
 			return nil, ctx.Err()
 		}
 		return nil, fmt.Errorf("not all shares were found")
 	}
 
 	// ensure we persist samples/blks and make them available for Bitswap
 	err = g.bstore.PutMany(ctx, blks)
 	if err != nil {
@@ -122,12 +91,26 @@ func (g *Getter) GetShares(ctx context.Context, hdr *header.ExtendedHeader, smpl
 	}
 
-	// ensure we return shares in the requested order
-	shrs := make([]share.Share, len(smplIdxs))
-	for i, smplIdx := range smplIdxs {
-		shrs[i] = smpls[smplIdx].SampleShare
+	shrs := make(map[int]share.Share, len(blks))
+	for _, blk := range blks {
+		sample, err := shwap.SampleFromBlock(blk)
+		if err != nil {
+			return nil, fmt.Errorf("getting sample from block: %w", err)
+		}
+		shrIdx := int(sample.SampleID.RowIndex)*len(hdr.DAH.RowRoots) + int(sample.SampleID.ShareIndex)
+		shrs[shrIdx] = sample.SampleShare
 	}
 
+	// ensure we return shares in the requested order
+	ordered := make([]share.Share, len(shrs))
+	for i, shrIdx := range smplIdxs {
+		sh, ok := shrs[shrIdx]
+		if !ok {
+			return nil, fmt.Errorf("missing share for index %d", shrIdx)
+		}
+		ordered[i] = sh
+	}
 
-	return shrs, nil
+	return ordered, nil
 }
 
 // GetEDS
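Bitswap returns blocks in arrival order rather than request order, so the rewritten GetShares keys each decoded sample by its row-major position (rowIdx*width + shareIdx) and then rebuilds the caller's ordering; for a 4-wide square, row 2 share 3 maps to index 11. Below is a standalone sketch of that pattern, where plain byte slices stand in for the PR's Sample and Share types and all names are illustrative.

package main

import "fmt"

// flatIdx mirrors the diff's row-major index computation:
// int(RowIndex)*len(RowRoots) + int(ShareIndex).
func flatIdx(rowIdx, shareIdx, width int) int {
	return rowIdx*width + shareIdx
}

func main() {
	width := 4
	requested := []int{11, 2, 5} // the caller's order

	// Fetched results arrive in arbitrary order; key them by flat index.
	arrived := map[int][]byte{
		flatIdx(1, 1, width): []byte("share-5"),
		flatIdx(2, 3, width): []byte("share-11"),
		flatIdx(0, 2, width): []byte("share-2"),
	}

	// Rebuild the requested order, failing on any gap, like the
	// ordered-slice loop in GetShares does.
	ordered := make([][]byte, len(requested))
	for i, idx := range requested {
		sh, ok := arrived[idx]
		if !ok {
			panic(fmt.Sprintf("missing share for index %d", idx))
		}
		ordered[i] = sh
	}
	fmt.Printf("%s\n", ordered) // [share-11 share-2 share-5]
}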
@@ -138,58 +121,54 @@ func (g *Getter) GetEDS(ctx context.Context, hdr *header.ExtendedHeader) (*rsmt2
 	}
 
 	sqrLn := len(hdr.DAH.RowRoots)
-	rids := make([]RowID, sqrLn/2)
+	cids := make([]cid.Cid, sqrLn/2)
 	for i := 0; i < sqrLn/2; i++ {
-		rid, err := NewRowID(hdr.Height(), uint16(i), hdr.DAH)
+		rid, err := shwap.NewRowID(hdr.Height(), uint16(i), hdr.DAH)
 		if err != nil {
 			return nil, err
 		}
-
-		rids[i] = rid
+		defer rid.Release()
+		cids[i] = rid.Cid()
 	}
 
-	square, err := rsmt2d.NewExtendedDataSquare(
-		share.DefaultRSMT2DCodec(),
-		wrapper.NewConstructor(uint64(sqrLn/2)), uint(sqrLn),
-		share.Size,
-	)
+	blks, err := g.getBlocks(ctx, cids)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("getting blocks: %w", err)
 	}
 
-	verifyFn := func(row Row) error {
-		err := row.Verify(hdr.DAH)
-		if err != nil {
-			return err
-		}
+	if len(blks) != sqrLn/2 {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+		return nil, fmt.Errorf("not all rows were found")
+	}

[Review comment] This duplicates the length check inside getBlocks, so it can be removed.

-		for shrIdx, shr := range row.RowShares {
-			err = square.SetCell(uint(row.RowIndex), uint(shrIdx), shr) // no synchronization needed
-			if err != nil {
-				panic(err) // this should never happen and if it is... something is really wrong
-			}
-		}
-
-		return nil
-	}
-
-	cids := make([]cid.Cid, sqrLn/2)
-	for i, rid := range rids {
-		rowVerifiers.Add(rid, verifyFn)
-		cids[i] = rid.Cid()
-	}
-
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	ses := g.fetch.NewSession(ctx)
-	// must start getting only after verifiers are registered
-	blkCh, err := ses.GetBlocks(ctx, cids)
-	if err != nil {
-		return nil, fmt.Errorf("fetching blocks: %w", err)
-	}
-	// GetBlocks handles ctx by closing blkCh, so we don't have to
-	for range blkCh { //nolint:revive // it complains on empty block, but the code is functional
-		// we handle writes in verifyFn so just wait for as many results as possible
-	}
+	rows := make([]*shwap.Row, len(blks))
+	for _, blk := range blks {
+		row, err := shwap.RowFromBlock(blk)
+		if err != nil {
+			return nil, fmt.Errorf("getting row from block: %w", err)
+		}
+		if row.RowIndex >= uint16(sqrLn/2) {
+			// should never happen, because rows should be verified against root by the time they are returned
+			return nil, fmt.Errorf("row index out of bounds: %d", row.RowIndex)
+		}
+		rows[row.RowIndex] = row
+	}
+
+	shrs := make([]share.Share, 0, sqrLn*sqrLn)
+	for _, row := range rows {
+		shrs = append(shrs, row.RowShares...)
+	}
+
+	square, err := rsmt2d.ComputeExtendedDataSquare(
+		shrs,
+		share.DefaultRSMT2DCodec(),
+		wrapper.NewConstructor(uint64(sqrLn/2)),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("computing EDS: %w", err)
+	}
 
-	// and try to repair
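The rewritten GetEDS above drops the pre-allocate/SetCell/Repair flow and instead hands the collected row shares to rsmt2d.ComputeExtendedDataSquare, which erasure-codes the whole square in one call. A self-contained sketch of that API on toy data follows; it uses rsmt2d's default tree rather than celestia-app's NMT wrapper, so the codec/tree wiring here is an illustrative assumption, not the PR's exact configuration.

package main

import (
	"bytes"
	"fmt"

	"github.com/celestiaorg/rsmt2d"
)

func main() {
	// Four 64-byte shares form a 2x2 original square, which the
	// Reed-Solomon extension grows to 4x4.
	const shareSize = 64
	shares := make([][]byte, 4)
	for i := range shares {
		shares[i] = bytes.Repeat([]byte{byte(i + 1)}, shareSize)
	}

	// One call replaces the old SetCell-then-Repair sequence.
	square, err := rsmt2d.ComputeExtendedDataSquare(
		shares,
		rsmt2d.NewLeoRSCodec(),
		rsmt2d.NewDefaultTree,
	)
	if err != nil {
		panic(err)
	}
	fmt.Println("extended square width:", square.Width()) // 4
}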
@@ -218,56 +197,62 @@ func (g *Getter) GetSharesByNamespace(
 		return share.NamespacedShares{}, nil
 	}
 
-	dids := make([]DataID, 0, to-from)
+	cids := make([]cid.Cid, 0, to-from)
 	for rowIdx := from; rowIdx < to; rowIdx++ {
-		did, err := NewDataID(hdr.Height(), uint16(rowIdx), ns, hdr.DAH)
+		did, err := shwap.NewDataID(hdr.Height(), uint16(rowIdx), ns, hdr.DAH)
 		if err != nil {
 			return nil, err
 		}
-
-		dids = append(dids, did)
-	}
-
-	datas := make([]Data, len(dids))
-	verifyFn := func(d Data) error {
-		err := d.Verify(hdr.DAH)
-		if err != nil {
-			return err
-		}
-
-		nsStartIdx := dids[0].RowIndex
-		idx := d.RowIndex - nsStartIdx
-		datas[idx] = d
-		return nil
-	}
-
-	cids := make([]cid.Cid, len(dids))
-	for i, did := range dids {
-		dataVerifiers.Add(did, verifyFn)
-		cids[i] = did.Cid()
+		defer did.Release()
+		cids = append(cids, did.Cid())
 	}
 
+	blks, err := g.getBlocks(ctx, cids)
+	if err != nil {
+		return nil, fmt.Errorf("getting blocks: %w", err)
+	}
+
+	nShrs := make([]share.NamespacedRow, len(blks))
+	for _, blk := range blks {
+		data, err := shwap.DataFromBlock(blk)
+		if err != nil {
+			return nil, fmt.Errorf("getting row from block: %w", err)
+		}
+		if data.RowIndex < uint16(from) || data.RowIndex >= uint16(to) {
+			// should never happen, because rows should be verified against root by the time they are returned
+			return nil, fmt.Errorf("row index out of bounds: %d", data.RowIndex)
+		}

[Review comment] Why do we need this check if it can never happen?

+		nShrs[int(data.RowIndex)-from] = share.NamespacedRow{
+			Shares: data.DataShares,
+			Proof:  &data.DataProof,
+		}
+	}
+
+	return nShrs, nil
+}
+
+func (g *Getter) getBlocks(ctx context.Context, cids []cid.Cid) ([]block.Block, error) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	ses := g.fetch.NewSession(ctx)
 	// must start getting only after verifiers are registered
 	blkCh, err := ses.GetBlocks(ctx, cids)
 	if err != nil {
-		return nil, fmt.Errorf("fetching blocks:%w", err)
+		return nil, fmt.Errorf("fetching blocks: %w", err)
 	}
-	// GetBlocks handles ctx by closing blkCh, so we don't have to
-	for range blkCh { //nolint:revive // it complains on empty block, but the code is functional
-		// we handle writes in verifyFn so just wait for as many results as possible
-	}
+	// GetBlocks handles ctx and closes blkCh, so we don't have to
+	blks := make([]block.Block, 0, len(cids))
+	for blk := range blkCh {
+		blks = append(blks, blk)
+	}
 
-	nShrs := make([]share.NamespacedRow, 0, len(datas))
-	for _, row := range datas {
-		proof := row.DataProof
-		nShrs = append(nShrs, share.NamespacedRow{
-			Shares: row.DataShares,
-			Proof:  &proof,
-		})
-	}
+	// only persist when all samples received
+	if len(blks) != len(cids) {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+		return nil, fmt.Errorf("not all shares were found")
+	}

[Review comment] This error should say "blocks", because the helper is now generalized.

 
-	return nShrs, nil
+	return blks, nil
 }
[Review comment] Because we don't need to cache on disk in most cases.
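A minimal sketch of that point: blocks persisted with Blockstore.PutMany into a purely in-memory blockstore are immediately retrievable again (and hence servable to Bitswap peers) without a blockservice or any on-disk cache. The map-datastore wiring below is an assumption for illustration; the PR does not say which datastore backs bstore.

package main

import (
	"context"
	"fmt"

	"github.com/ipfs/boxo/blockstore"
	block "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
)

func main() {
	ctx := context.Background()

	// An in-memory blockstore: enough to make freshly fetched blocks
	// available again, with nothing cached on disk.
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))

	blk := block.NewBlock([]byte("fetched sample bytes"))
	if err := bs.PutMany(ctx, []block.Block{blk}); err != nil {
		panic(err)
	}

	has, err := bs.Has(ctx, blk.Cid())
	if err != nil {
		panic(err)
	}
	fmt.Println("stored:", has) // true
}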