From 115882c15526e5bf26d702d83d3f170da586a165 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 12 Jun 2024 19:24:27 +0200 Subject: [PATCH] generalize protobuf message and respective refactorings --- share/shwap/p2p/bitswap/block.go | 31 + share/shwap/p2p/bitswap/block_fetch.go | 77 ++- share/shwap/p2p/bitswap/cid.go | 20 - share/shwap/p2p/bitswap/pb/bitswap.pb.go | 600 ++---------------- share/shwap/p2p/bitswap/pb/bitswap.proto | 18 +- share/shwap/p2p/bitswap/row_block.go | 28 +- .../p2p/bitswap/row_namespace_data_block.go | 28 +- share/shwap/p2p/bitswap/sample_block.go | 28 +- 8 files changed, 159 insertions(+), 671 deletions(-) diff --git a/share/shwap/p2p/bitswap/block.go b/share/shwap/p2p/bitswap/block.go index 9bd870c9fa..7afc083451 100644 --- a/share/shwap/p2p/bitswap/block.go +++ b/share/shwap/p2p/bitswap/block.go @@ -1,10 +1,15 @@ package bitswap import ( + "fmt" + + "github.com/gogo/protobuf/proto" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" logger "github.com/ipfs/go-log/v2" + bitswappb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb" + "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" @@ -23,6 +28,7 @@ type Block interface { // CID returns Shwap ID of the Block formatted as CID. CID() cid.Cid // BlockFromEDS extract Bitswap Block out of the EDS. + // TODO: Split into MarshalBinary and Populate BlockFromEDS(*rsmt2d.ExtendedDataSquare) (blocks.Block, error) // IsEmpty reports whether the Block been populated with Shwap container. @@ -32,3 +38,28 @@ type Block interface { // Population involves data validation against the Root. PopulateFn(*share.Root) PopulateFn } + +// toBlock converts given protobuf container into Bitswap Block. +func toBlock(cid cid.Cid, container proto.Marshaler) (blocks.Block, error) { + containerData, err := container.Marshal() + if err != nil { + return nil, fmt.Errorf("marshaling container: %w", err) + } + + blkProto := bitswappb.Block{ + Cid: cid.Bytes(), + Container: containerData, + } + + blkData, err := blkProto.Marshal() + if err != nil { + return nil, fmt.Errorf("marshaling Block protobuf: %w", err) + } + + blk, err := blocks.NewBlockWithCid(blkData, cid) + if err != nil { + return nil, fmt.Errorf("assembling Bitswap block: %w", err) + } + + return blk, nil +} diff --git a/share/shwap/p2p/bitswap/block_fetch.go b/share/shwap/p2p/bitswap/block_fetch.go index 758e535278..2c18adffd6 100644 --- a/share/shwap/p2p/bitswap/block_fetch.go +++ b/share/shwap/p2p/bitswap/block_fetch.go @@ -9,6 +9,8 @@ import ( "github.com/ipfs/boxo/exchange" "github.com/ipfs/go-cid" + bitswappb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb" + "github.com/celestiaorg/celestia-node/share" ) @@ -47,7 +49,8 @@ func Fetch(ctx context.Context, fetcher exchange.Fetcher, root *share.Root, blks } // uncommon duplicate case: concurrent fetching of the same block, // so we have to populate it ourselves instead of the hasher, - err := blk.PopulateFn(root)(bitswapBlk.RawData()) + populateFn := blk.PopulateFn(root) + _, err := populate(populateFn, bitswapBlk.RawData()) if err != nil { // this means verification succeeded in the hasher but failed here // this case should never happen in practice @@ -63,6 +66,42 @@ func Fetch(ctx context.Context, fetcher exchange.Fetcher, root *share.Root, blks return ctx.Err() } +// populate populates the data into a Block via PopulateFn +// If populateFn is nil -- gets it from the global populatorFns. +func populate(populate PopulateFn, data []byte) ([]byte, error) { + var blk bitswappb.Block + err := blk.Unmarshal(data) + if err != nil { + return nil, fmt.Errorf("unmarshalling block: %w", err) + } + cid, err := cid.Cast(blk.Cid) + if err != nil { + return nil, fmt.Errorf("casting cid: %w", err) + } + // get ID out of CID validating it + id, err := extractCID(cid) + if err != nil { + return nil, fmt.Errorf("validating cid: %w", err) + } + + if populate == nil { + // get registered PopulateFn and use it to check data validity and + // pass it to Fetch caller + val, ok := populatorFns.LoadAndDelete(cid) + if !ok { + return nil, fmt.Errorf("no populator registered") + } + populate = val.(PopulateFn) + } + + err = populate(blk.Container) + if err != nil { + return nil, fmt.Errorf("verifying data: %w", err) + } + + return id, nil +} + // populatorFns exist to communicate between Fetch and hasher. // // Fetch registers PopulateFNs that hasher then uses to validate and populate Block responses coming @@ -86,41 +125,9 @@ type hasher struct { } func (h *hasher) Write(data []byte) (int, error) { - if len(data) == 0 { - errMsg := "hasher: empty message" - log.Error(errMsg) - return 0, fmt.Errorf("shwap/bitswap: %s", errMsg) - } - - // cut off the first tag type byte out of protobuf data - const pbTypeOffset = 1 - cidData := data[pbTypeOffset:] - - cid, err := readCID(cidData) - if err != nil { - err = fmt.Errorf("hasher: reading cid: %w", err) - log.Error(err) - return 0, fmt.Errorf("shwap/bitswap: %w", err) - } - // get ID out of CID and validate it - id, err := extractCID(cid) - if err != nil { - err = fmt.Errorf("hasher: validating cid: %w", err) - log.Error(err) - return 0, fmt.Errorf("shwap/bitswap: %w", err) - } - // get registered PopulateFn and use it to check data validity and - // pass it to Fetch caller - val, ok := populatorFns.LoadAndDelete(cid) - if !ok { - errMsg := "hasher: no verifier registered" - log.Error(errMsg) - return 0, fmt.Errorf("shwap/bitswap: %s", errMsg) - } - populate := val.(PopulateFn) - err = populate(data) + id, err := populate(nil, data) if err != nil { - err = fmt.Errorf("hasher: verifying data: %w", err) + err = fmt.Errorf("hasher: %w", err) log.Error(err) return 0, fmt.Errorf("shwap/bitswap: %w", err) } diff --git a/share/shwap/p2p/bitswap/cid.go b/share/shwap/p2p/bitswap/cid.go index bc72755854..53129393da 100644 --- a/share/shwap/p2p/bitswap/cid.go +++ b/share/shwap/p2p/bitswap/cid.go @@ -2,7 +2,6 @@ package bitswap import ( "encoding" - "encoding/binary" "fmt" "github.com/ipfs/go-cid" @@ -23,25 +22,6 @@ func (a allowlist) IsAllowed(code uint64) bool { return ok } -// readCID reads out cid out of bytes -func readCID(data []byte) (cid.Cid, error) { - cidLen, ln := binary.Uvarint(data) - if ln <= 0 || len(data) < ln+int(cidLen) { - return cid.Undef, fmt.Errorf("invalid data length") - } - // extract CID out of data - // we do this on the raw data to: - // * Avoid complicating hasher with generalized bytes -> type unmarshalling - // * Avoid type allocations - cidRaw := data[ln : ln+int(cidLen)] - castCid, err := cid.Cast(cidRaw) - if err != nil { - return cid.Undef, fmt.Errorf("casting cid: %w", err) - } - - return castCid, nil -} - // extractCID retrieves Shwap ID out of the CID. func extractCID(cid cid.Cid) ([]byte, error) { if err := validateCID(cid); err != nil { diff --git a/share/shwap/p2p/bitswap/pb/bitswap.pb.go b/share/shwap/p2p/bitswap/pb/bitswap.pb.go index d159661361..a84077e9ef 100644 --- a/share/shwap/p2p/bitswap/pb/bitswap.pb.go +++ b/share/shwap/p2p/bitswap/pb/bitswap.pb.go @@ -5,7 +5,6 @@ package pb import ( fmt "fmt" - pb "github.com/celestiaorg/celestia-node/share/shwap/pb" proto "github.com/gogo/protobuf/proto" io "io" math "math" @@ -23,23 +22,23 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -type RowBlock struct { - RowCid []byte `protobuf:"bytes,1,opt,name=row_cid,json=rowCid,proto3" json:"row_cid,omitempty"` - Row *pb.Row `protobuf:"bytes,2,opt,name=row,proto3" json:"row,omitempty"` +type Block struct { + Cid []byte `protobuf:"bytes,1,opt,name=cid,proto3" json:"cid,omitempty"` + Container []byte `protobuf:"bytes,2,opt,name=container,proto3" json:"container,omitempty"` } -func (m *RowBlock) Reset() { *m = RowBlock{} } -func (m *RowBlock) String() string { return proto.CompactTextString(m) } -func (*RowBlock) ProtoMessage() {} -func (*RowBlock) Descriptor() ([]byte, []int) { +func (m *Block) Reset() { *m = Block{} } +func (m *Block) String() string { return proto.CompactTextString(m) } +func (*Block) ProtoMessage() {} +func (*Block) Descriptor() ([]byte, []int) { return fileDescriptor_09fd4e2ff1d5ce94, []int{0} } -func (m *RowBlock) XXX_Unmarshal(b []byte) error { +func (m *Block) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *RowBlock) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *Block) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_RowBlock.Marshal(b, m, deterministic) + return xxx_messageInfo_Block.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -49,140 +48,34 @@ func (m *RowBlock) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } -func (m *RowBlock) XXX_Merge(src proto.Message) { - xxx_messageInfo_RowBlock.Merge(m, src) +func (m *Block) XXX_Merge(src proto.Message) { + xxx_messageInfo_Block.Merge(m, src) } -func (m *RowBlock) XXX_Size() int { +func (m *Block) XXX_Size() int { return m.Size() } -func (m *RowBlock) XXX_DiscardUnknown() { - xxx_messageInfo_RowBlock.DiscardUnknown(m) +func (m *Block) XXX_DiscardUnknown() { + xxx_messageInfo_Block.DiscardUnknown(m) } -var xxx_messageInfo_RowBlock proto.InternalMessageInfo +var xxx_messageInfo_Block proto.InternalMessageInfo -func (m *RowBlock) GetRowCid() []byte { +func (m *Block) GetCid() []byte { if m != nil { - return m.RowCid + return m.Cid } return nil } -func (m *RowBlock) GetRow() *pb.Row { +func (m *Block) GetContainer() []byte { if m != nil { - return m.Row - } - return nil -} - -type SampleBlock struct { - SampleCid []byte `protobuf:"bytes,1,opt,name=sample_cid,json=sampleCid,proto3" json:"sample_cid,omitempty"` - Sample *pb.Sample `protobuf:"bytes,2,opt,name=sample,proto3" json:"sample,omitempty"` -} - -func (m *SampleBlock) Reset() { *m = SampleBlock{} } -func (m *SampleBlock) String() string { return proto.CompactTextString(m) } -func (*SampleBlock) ProtoMessage() {} -func (*SampleBlock) Descriptor() ([]byte, []int) { - return fileDescriptor_09fd4e2ff1d5ce94, []int{1} -} -func (m *SampleBlock) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *SampleBlock) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_SampleBlock.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *SampleBlock) XXX_Merge(src proto.Message) { - xxx_messageInfo_SampleBlock.Merge(m, src) -} -func (m *SampleBlock) XXX_Size() int { - return m.Size() -} -func (m *SampleBlock) XXX_DiscardUnknown() { - xxx_messageInfo_SampleBlock.DiscardUnknown(m) -} - -var xxx_messageInfo_SampleBlock proto.InternalMessageInfo - -func (m *SampleBlock) GetSampleCid() []byte { - if m != nil { - return m.SampleCid - } - return nil -} - -func (m *SampleBlock) GetSample() *pb.Sample { - if m != nil { - return m.Sample - } - return nil -} - -type RowNamespaceDataBlock struct { - RowNamespaceDataCid []byte `protobuf:"bytes,1,opt,name=row_namespace_data_cid,json=rowNamespaceDataCid,proto3" json:"row_namespace_data_cid,omitempty"` - Data *pb.RowNamespaceData `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` -} - -func (m *RowNamespaceDataBlock) Reset() { *m = RowNamespaceDataBlock{} } -func (m *RowNamespaceDataBlock) String() string { return proto.CompactTextString(m) } -func (*RowNamespaceDataBlock) ProtoMessage() {} -func (*RowNamespaceDataBlock) Descriptor() ([]byte, []int) { - return fileDescriptor_09fd4e2ff1d5ce94, []int{2} -} -func (m *RowNamespaceDataBlock) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *RowNamespaceDataBlock) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_RowNamespaceDataBlock.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *RowNamespaceDataBlock) XXX_Merge(src proto.Message) { - xxx_messageInfo_RowNamespaceDataBlock.Merge(m, src) -} -func (m *RowNamespaceDataBlock) XXX_Size() int { - return m.Size() -} -func (m *RowNamespaceDataBlock) XXX_DiscardUnknown() { - xxx_messageInfo_RowNamespaceDataBlock.DiscardUnknown(m) -} - -var xxx_messageInfo_RowNamespaceDataBlock proto.InternalMessageInfo - -func (m *RowNamespaceDataBlock) GetRowNamespaceDataCid() []byte { - if m != nil { - return m.RowNamespaceDataCid - } - return nil -} - -func (m *RowNamespaceDataBlock) GetData() *pb.RowNamespaceData { - if m != nil { - return m.Data + return m.Container } return nil } func init() { - proto.RegisterType((*RowBlock)(nil), "bitswap.RowBlock") - proto.RegisterType((*SampleBlock)(nil), "bitswap.SampleBlock") - proto.RegisterType((*RowNamespaceDataBlock)(nil), "bitswap.RowNamespaceDataBlock") + proto.RegisterType((*Block)(nil), "bitswap.Block") } func init() { @@ -190,29 +83,21 @@ func init() { } var fileDescriptor_09fd4e2ff1d5ce94 = []byte{ - // 296 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x91, 0xc1, 0x4a, 0xc3, 0x40, - 0x10, 0x86, 0xbb, 0x2a, 0xad, 0x4e, 0xf5, 0x52, 0xd1, 0x96, 0xa2, 0x4b, 0x29, 0x08, 0x05, 0xb1, - 0x81, 0xf6, 0x01, 0xc4, 0xea, 0xd9, 0xc3, 0xf6, 0xa4, 0x97, 0xb2, 0x49, 0x96, 0x26, 0x98, 0x74, - 0x96, 0xdd, 0x95, 0xc5, 0xb7, 0xf0, 0xb1, 0x3c, 0xf6, 0xe8, 0x51, 0x92, 0x17, 0x91, 0xcd, 0xa6, - 0xda, 0x0a, 0xde, 0xfe, 0x99, 0xf9, 0xe7, 0xfb, 0x93, 0x59, 0x18, 0xe9, 0x84, 0x2b, 0x11, 0xe8, - 0xc4, 0x72, 0x19, 0xc8, 0x89, 0x0c, 0xc2, 0xd4, 0xe8, 0x4a, 0x87, 0x1b, 0x39, 0x96, 0x0a, 0x0d, - 0x76, 0x5a, 0x75, 0xd9, 0xef, 0xef, 0xac, 0x84, 0x5e, 0x78, 0xd3, 0xf0, 0x0e, 0x0e, 0x19, 0xda, - 0x59, 0x86, 0xd1, 0x4b, 0xa7, 0x0b, 0x2d, 0x85, 0x76, 0x11, 0xa5, 0x71, 0x8f, 0x0c, 0xc8, 0xe8, - 0x98, 0x35, 0x15, 0xda, 0xfb, 0x34, 0xee, 0x5c, 0xc0, 0xbe, 0x42, 0xdb, 0xdb, 0x1b, 0x90, 0x51, - 0x7b, 0x02, 0x63, 0xbf, 0xcf, 0xd0, 0x32, 0xd7, 0x1e, 0xce, 0xa1, 0x3d, 0xe7, 0xb9, 0xcc, 0x84, - 0xa7, 0x5c, 0x02, 0xe8, 0xaa, 0xdc, 0x02, 0x1d, 0xf9, 0x8e, 0x63, 0x5d, 0x41, 0xd3, 0x17, 0x35, - 0xee, 0xa4, 0xc6, 0x79, 0x04, 0xab, 0x87, 0xc3, 0x37, 0x38, 0x63, 0x68, 0x1f, 0x79, 0x2e, 0xb4, - 0xe4, 0x91, 0x78, 0xe0, 0x86, 0x7b, 0xfc, 0x14, 0xce, 0xdd, 0x47, 0xae, 0x36, 0x93, 0x45, 0xcc, - 0x0d, 0xdf, 0x8a, 0x3a, 0x55, 0x7f, 0xd6, 0x5c, 0xe8, 0x35, 0x1c, 0x38, 0x5b, 0x1d, 0xd9, 0xfd, - 0xfd, 0x83, 0x1d, 0x27, 0xab, 0x4c, 0xb3, 0xa7, 0x8f, 0x82, 0x92, 0x75, 0x41, 0xc9, 0x57, 0x41, - 0xc9, 0x7b, 0x49, 0x1b, 0xeb, 0x92, 0x36, 0x3e, 0x4b, 0xda, 0x78, 0xbe, 0x5d, 0xa6, 0x26, 0x79, - 0x0d, 0xc7, 0x11, 0xe6, 0x41, 0x24, 0x32, 0xa1, 0x4d, 0xca, 0x51, 0x2d, 0x7f, 0xf4, 0xcd, 0x0a, - 0x63, 0x77, 0xe7, 0xff, 0x1e, 0x28, 0x6c, 0x56, 0x47, 0x9f, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, - 0x7d, 0xd6, 0xa6, 0x3b, 0xc5, 0x01, 0x00, 0x00, -} - -func (m *RowBlock) Marshal() (dAtA []byte, err error) { + // 171 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x28, 0xce, 0x48, 0x2c, + 0x4a, 0xd5, 0x2f, 0xce, 0x28, 0x4f, 0x2c, 0xd0, 0x2f, 0x30, 0x2a, 0xd0, 0x4f, 0xca, 0x2c, 0x29, + 0x06, 0xb3, 0x93, 0x60, 0x4c, 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c, 0x21, 0x76, 0x28, 0x57, 0xc9, + 0x9c, 0x8b, 0xd5, 0x29, 0x27, 0x3f, 0x39, 0x5b, 0x48, 0x80, 0x8b, 0x39, 0x39, 0x33, 0x45, 0x82, + 0x51, 0x81, 0x51, 0x83, 0x27, 0x08, 0xc4, 0x14, 0x92, 0xe1, 0xe2, 0x4c, 0xce, 0xcf, 0x2b, 0x49, + 0xcc, 0xcc, 0x4b, 0x2d, 0x92, 0x60, 0x02, 0x8b, 0x23, 0x04, 0x9c, 0x22, 0x4f, 0x3c, 0x92, 0x63, + 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, + 0x63, 0xb8, 0xf1, 0x58, 0x8e, 0x21, 0xca, 0x3e, 0x3d, 0xb3, 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, + 0x3f, 0x57, 0x3f, 0x39, 0x35, 0x27, 0xb5, 0xb8, 0x24, 0x33, 0x31, 0xbf, 0x28, 0x1d, 0xce, 0xd6, + 0xcd, 0xcb, 0x4f, 0x01, 0x39, 0x12, 0x97, 0x53, 0x93, 0xd8, 0xc0, 0x6e, 0x34, 0x06, 0x04, 0x00, + 0x00, 0xff, 0xff, 0xe7, 0x9c, 0x32, 0xc5, 0xcf, 0x00, 0x00, 0x00, +} + +func (m *Block) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -222,116 +107,27 @@ func (m *RowBlock) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *RowBlock) MarshalTo(dAtA []byte) (int, error) { +func (m *Block) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *RowBlock) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *Block) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if m.Row != nil { - { - size, err := m.Row.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintBitswap(dAtA, i, uint64(size)) - } + if len(m.Container) > 0 { + i -= len(m.Container) + copy(dAtA[i:], m.Container) + i = encodeVarintBitswap(dAtA, i, uint64(len(m.Container))) i-- dAtA[i] = 0x12 } - if len(m.RowCid) > 0 { - i -= len(m.RowCid) - copy(dAtA[i:], m.RowCid) - i = encodeVarintBitswap(dAtA, i, uint64(len(m.RowCid))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *SampleBlock) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *SampleBlock) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *SampleBlock) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Sample != nil { - { - size, err := m.Sample.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintBitswap(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } - if len(m.SampleCid) > 0 { - i -= len(m.SampleCid) - copy(dAtA[i:], m.SampleCid) - i = encodeVarintBitswap(dAtA, i, uint64(len(m.SampleCid))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *RowNamespaceDataBlock) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *RowNamespaceDataBlock) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *RowNamespaceDataBlock) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Data != nil { - { - size, err := m.Data.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintBitswap(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } - if len(m.RowNamespaceDataCid) > 0 { - i -= len(m.RowNamespaceDataCid) - copy(dAtA[i:], m.RowNamespaceDataCid) - i = encodeVarintBitswap(dAtA, i, uint64(len(m.RowNamespaceDataCid))) + if len(m.Cid) > 0 { + i -= len(m.Cid) + copy(dAtA[i:], m.Cid) + i = encodeVarintBitswap(dAtA, i, uint64(len(m.Cid))) i-- dAtA[i] = 0xa } @@ -349,54 +145,20 @@ func encodeVarintBitswap(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } -func (m *RowBlock) Size() (n int) { +func (m *Block) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = len(m.RowCid) + l = len(m.Cid) if l > 0 { n += 1 + l + sovBitswap(uint64(l)) } - if m.Row != nil { - l = m.Row.Size() - n += 1 + l + sovBitswap(uint64(l)) - } - return n -} - -func (m *SampleBlock) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.SampleCid) + l = len(m.Container) if l > 0 { n += 1 + l + sovBitswap(uint64(l)) } - if m.Sample != nil { - l = m.Sample.Size() - n += 1 + l + sovBitswap(uint64(l)) - } - return n -} - -func (m *RowNamespaceDataBlock) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.RowNamespaceDataCid) - if l > 0 { - n += 1 + l + sovBitswap(uint64(l)) - } - if m.Data != nil { - l = m.Data.Size() - n += 1 + l + sovBitswap(uint64(l)) - } return n } @@ -406,127 +168,7 @@ func sovBitswap(x uint64) (n int) { func sozBitswap(x uint64) (n int) { return sovBitswap(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (m *RowBlock) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBitswap - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: RowBlock: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: RowBlock: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field RowCid", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBitswap - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthBitswap - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthBitswap - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.RowCid = append(m.RowCid[:0], dAtA[iNdEx:postIndex]...) - if m.RowCid == nil { - m.RowCid = []byte{} - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Row", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBitswap - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthBitswap - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthBitswap - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Row == nil { - m.Row = &pb.Row{} - } - if err := m.Row.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipBitswap(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthBitswap - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *SampleBlock) Unmarshal(dAtA []byte) error { +func (m *Block) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -549,15 +191,15 @@ func (m *SampleBlock) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: SampleBlock: wiretype end group for non-group") + return fmt.Errorf("proto: Block: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: SampleBlock: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: Block: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field SampleCid", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Cid", wireType) } var byteLen int for shift := uint(0); ; shift += 7 { @@ -584,100 +226,14 @@ func (m *SampleBlock) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.SampleCid = append(m.SampleCid[:0], dAtA[iNdEx:postIndex]...) - if m.SampleCid == nil { - m.SampleCid = []byte{} + m.Cid = append(m.Cid[:0], dAtA[iNdEx:postIndex]...) + if m.Cid == nil { + m.Cid = []byte{} } iNdEx = postIndex case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Sample", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBitswap - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthBitswap - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthBitswap - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Sample == nil { - m.Sample = &pb.Sample{} - } - if err := m.Sample.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipBitswap(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthBitswap - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *RowNamespaceDataBlock) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBitswap - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: RowNamespaceDataBlock: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: RowNamespaceDataBlock: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field RowNamespaceDataCid", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Container", wireType) } var byteLen int for shift := uint(0); ; shift += 7 { @@ -704,45 +260,9 @@ func (m *RowNamespaceDataBlock) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.RowNamespaceDataCid = append(m.RowNamespaceDataCid[:0], dAtA[iNdEx:postIndex]...) - if m.RowNamespaceDataCid == nil { - m.RowNamespaceDataCid = []byte{} - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBitswap - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthBitswap - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthBitswap - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Data == nil { - m.Data = &pb.RowNamespaceData{} - } - if err := m.Data.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err + m.Container = append(m.Container[:0], dAtA[iNdEx:postIndex]...) + if m.Container == nil { + m.Container = []byte{} } iNdEx = postIndex default: diff --git a/share/shwap/p2p/bitswap/pb/bitswap.proto b/share/shwap/p2p/bitswap/pb/bitswap.proto index 8f2fb78748..aa06fbf5b5 100644 --- a/share/shwap/p2p/bitswap/pb/bitswap.proto +++ b/share/shwap/p2p/bitswap/pb/bitswap.proto @@ -2,19 +2,7 @@ syntax = "proto3"; package bitswap; option go_package = "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb"; -import "share/shwap/pb/shwap.proto"; - -message RowBlock { - bytes row_cid = 1; - shwap.Row row = 2; -} - -message SampleBlock { - bytes sample_cid = 1; - shwap.Sample sample = 2; -} - -message RowNamespaceDataBlock { - bytes row_namespace_data_cid = 1; - shwap.RowNamespaceData data = 2; +message Block { + bytes cid = 1; + bytes container = 2; } diff --git a/share/shwap/p2p/bitswap/row_block.go b/share/shwap/p2p/bitswap/row_block.go index b6eb37c8d3..60f03f0eda 100644 --- a/share/shwap/p2p/bitswap/row_block.go +++ b/share/shwap/p2p/bitswap/row_block.go @@ -11,7 +11,7 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/shwap" - bitswappb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb" + shwappb "github.com/celestiaorg/celestia-node/share/shwap/pb" ) const ( @@ -70,21 +70,9 @@ func (rb *RowBlock) CID() cid.Cid { func (rb *RowBlock) BlockFromEDS(eds *rsmt2d.ExtendedDataSquare) (blocks.Block, error) { row := shwap.RowFromEDS(eds, rb.ID.RowIndex, shwap.Left) - - cid := rb.CID() - rowBlk := bitswappb.RowBlock{ - RowCid: cid.Bytes(), - Row: row.ToProto(), - } - - blkData, err := rowBlk.Marshal() + blk, err := toBlock(rb.CID(), row.ToProto()) if err != nil { - return nil, fmt.Errorf("marshaling RowBlock: %w", err) - } - - blk, err := blocks.NewBlockWithCid(blkData, cid) - if err != nil { - return nil, fmt.Errorf("assembling Bitswap block: %w", err) + return nil, fmt.Errorf("converting Row to Bitswap block: %w", err) } return blk, nil @@ -103,18 +91,16 @@ func (rb *RowBlock) PopulateFn(root *share.Root) PopulateFn { if !rb.IsEmpty() { return nil } - var rowBlk bitswappb.RowBlock - if err := rowBlk.Unmarshal(data); err != nil { - return fmt.Errorf("unmarshaling RowBlock: %w", err) + var row shwappb.Row + if err := row.Unmarshal(data); err != nil { + return fmt.Errorf("unmarshaling Row: %w", err) } - cntr := shwap.RowFromProto(rowBlk.Row) + cntr := shwap.RowFromProto(&row) if err := cntr.Validate(root, rb.ID.RowIndex); err != nil { return fmt.Errorf("validating Row: %w", err) } rb.container.Store(&cntr) - - // NOTE: We don't have to validate the ID here, as it is verified in the hasher. return nil } } diff --git a/share/shwap/p2p/bitswap/row_namespace_data_block.go b/share/shwap/p2p/bitswap/row_namespace_data_block.go index ebd5300c98..79d5be0191 100644 --- a/share/shwap/p2p/bitswap/row_namespace_data_block.go +++ b/share/shwap/p2p/bitswap/row_namespace_data_block.go @@ -7,11 +7,12 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + shwappb "github.com/celestiaorg/celestia-node/share/shwap/pb" + "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/shwap" - bitswapb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb" ) const ( @@ -80,20 +81,9 @@ func (rndb *RowNamespaceDataBlock) BlockFromEDS(eds *rsmt2d.ExtendedDataSquare) return nil, err } - cid := rndb.CID() - rndBlk := bitswapb.RowNamespaceDataBlock{ - RowNamespaceDataCid: cid.Bytes(), - Data: rnd.ToProto(), - } - - blkData, err := rndBlk.Marshal() + blk, err := toBlock(rndb.CID(), rnd.ToProto()) if err != nil { - return nil, fmt.Errorf("marshaling RowNamespaceDataBlock: %w", err) - } - - blk, err := blocks.NewBlockWithCid(blkData, cid) - if err != nil { - return nil, fmt.Errorf("assembling Bitswap block: %w", err) + return nil, fmt.Errorf("converting RowNamespaceData to Bitswap block: %w", err) } return blk, nil @@ -112,18 +102,16 @@ func (rndb *RowNamespaceDataBlock) PopulateFn(root *share.Root) PopulateFn { if !rndb.IsEmpty() { return nil } - var rndBlk bitswapb.RowNamespaceDataBlock - if err := rndBlk.Unmarshal(data); err != nil { - return fmt.Errorf("unmarshaling RowNamespaceDataBlock: %w", err) + var rnd shwappb.RowNamespaceData + if err := rnd.Unmarshal(data); err != nil { + return fmt.Errorf("unmarshaling RowNamespaceData: %w", err) } - cntr := shwap.RowNamespaceDataFromProto(rndBlk.Data) + cntr := shwap.RowNamespaceDataFromProto(&rnd) if err := cntr.Validate(root, rndb.ID.DataNamespace, rndb.ID.RowIndex); err != nil { return fmt.Errorf("validating RowNamespaceData: %w", err) } rndb.container.Store(&cntr) - - // NOTE: We don't have to validate the ID here, as it is verified in the hasher. return nil } } diff --git a/share/shwap/p2p/bitswap/sample_block.go b/share/shwap/p2p/bitswap/sample_block.go index ae98c58bc1..a9c574edba 100644 --- a/share/shwap/p2p/bitswap/sample_block.go +++ b/share/shwap/p2p/bitswap/sample_block.go @@ -7,11 +7,12 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + shwappb "github.com/celestiaorg/celestia-node/share/shwap/pb" + "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/shwap" - bitswappb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb" ) const ( @@ -75,20 +76,9 @@ func (sb *SampleBlock) BlockFromEDS(eds *rsmt2d.ExtendedDataSquare) (blocks.Bloc return nil, err } - cid := sb.CID() - smplBlk := bitswappb.SampleBlock{ - SampleCid: cid.Bytes(), - Sample: smpl.ToProto(), - } - - blkData, err := smplBlk.Marshal() + blk, err := toBlock(sb.CID(), smpl.ToProto()) if err != nil { - return nil, fmt.Errorf("marshaling SampleBlock: %w", err) - } - - blk, err := blocks.NewBlockWithCid(blkData, cid) - if err != nil { - return nil, fmt.Errorf("assembling Bitswap block: %w", err) + return nil, fmt.Errorf("converting Sample to Bitswap block: %w", err) } return blk, nil @@ -107,18 +97,16 @@ func (sb *SampleBlock) PopulateFn(root *share.Root) PopulateFn { if !sb.IsEmpty() { return nil } - var sampleBlk bitswappb.SampleBlock - if err := sampleBlk.Unmarshal(data); err != nil { - return fmt.Errorf("unmarshaling SampleBlock: %w", err) + var sample shwappb.Sample + if err := sample.Unmarshal(data); err != nil { + return fmt.Errorf("unmarshaling Sample: %w", err) } - cntr := shwap.SampleFromProto(sampleBlk.Sample) + cntr := shwap.SampleFromProto(&sample) if err := cntr.Validate(root, sb.ID.RowIndex, sb.ID.ShareIndex); err != nil { return fmt.Errorf("validating Sample: %w", err) } sb.container.Store(&cntr) - - // NOTE: We don't have to validate the ID here, as it is verified in the hasher. return nil } }