Skip to content

Commit

Permalink
Add a group ID to each shred for over-the-wire transmission
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzampolin committed Dec 5, 2024
1 parent 4a6f34b commit ea179ef
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 46 deletions.
85 changes: 45 additions & 40 deletions gturbine/shredding/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// Constants for error checking
const (
minChunkSize = 1024 // 1KB minimum
maxChunkSize = 1 << 20 // 1MB maximum chunk size
maxChunkSize = 1 << 20 // 1MB maximum chunk size
maxBlockSize = 128 * 1024 * 1024 // 128MB maximum block size (matches Solana)
)

Expand All @@ -24,7 +24,6 @@ type Processor struct {
}

func NewProcessor(chunkSize uint32, dataShreds, recoveryShreds int) (*Processor, error) {

if chunkSize < minChunkSize || chunkSize > maxChunkSize {
return nil, fmt.Errorf("invalid chunk size %d: must be between %d and %d", chunkSize, minChunkSize, maxChunkSize)
}
Expand Down Expand Up @@ -55,7 +54,6 @@ func NewProcessor(chunkSize uint32, dataShreds, recoveryShreds int) (*Processor,
}

func (p *Processor) ProcessBlock(block []byte, height uint64) (*ShredGroup, error) {

if len(block) == 0 {
return nil, fmt.Errorf("empty block")
}
Expand All @@ -66,34 +64,31 @@ func (p *Processor) ProcessBlock(block []byte, height uint64) (*ShredGroup, erro
return nil, fmt.Errorf("block too large for configured shred size: %d bytes exceeds max size %d", len(block), p.chunkSize*uint32(p.dataShreds))
}

// Calculate block hash for verification before any padding
// Calculate block hash for verification
blockHash := sha256.Sum256(block)

// Generate unique group ID
// Generate unique group ID for this block
groupID := make([]byte, 32)
if _, err := rand.Read(groupID); err != nil {
return nil, fmt.Errorf("failed to generate group ID: %w", err)
}

// Calculate how many shreds we actually need for this block
fullShreds := len(block) / int(p.chunkSize)
if len(block)%int(p.chunkSize) != 0 {
fullShreds++
}

// Create shreds of exact chunk size with padding
// Create fixed-size data chunks
dataBytes := make([][]byte, p.dataShreds)
bytesPerShred := int(p.chunkSize)

// Initialize all shreds to full chunk size with zeros
for i := 0; i < p.dataShreds; i++ {
dataBytes[i] = make([]byte, p.chunkSize)
dataBytes[i] = make([]byte, bytesPerShred)
}

// Copy data into shreds
remaining := len(block)
offset := 0
for i := 0; i < fullShreds; i++ {
for i := 0; i < p.dataShreds && remaining > 0; i++ {
toCopy := remaining
if toCopy > int(p.chunkSize) {
toCopy = int(p.chunkSize)
if toCopy > bytesPerShred {
toCopy = bytesPerShred
}
copy(dataBytes[i], block[offset:offset+toCopy])
offset += toCopy
Expand All @@ -114,39 +109,34 @@ func (p *Processor) ProcessBlock(block []byte, height uint64) (*ShredGroup, erro
Total: uint32(p.dataShreds),
Data: dataBytes[i],
BlockHash: blockHash[:],
GroupID: groupID, // Set the group ID for each shred
Height: height,
}
}

// Create recovery shreds
// Create recovery shreds
recoveryShreds := make([]*gturbine.Shred, len(recoveryBytes))
for i := range recoveryBytes {
recoveryShreds[i] = &gturbine.Shred{
Index: uint32(i),
Total: uint32(len(recoveryBytes)),
Data: recoveryBytes[i],
BlockHash: blockHash[:],
GroupID: groupID, // Set the group ID for each recovery shred too
Height: height,
}
}

lastShredSize := len(block) % int(p.chunkSize)
if lastShredSize == 0 && len(block) > 0 {
lastShredSize = int(p.chunkSize)
}

return &ShredGroup{
DataShreds: dataShreds,
RecoveryShreds: recoveryShreds,
GroupID: groupID,
BlockHash: blockHash[:],
OriginalSize: len(block),
LastShredSize: lastShredSize,
}, nil
}

func (p *Processor) ReassembleBlock(group *ShredGroup) ([]byte, error) {

if group == nil {
return nil, fmt.Errorf("nil shred group")
}
Expand All @@ -158,18 +148,33 @@ func (p *Processor) ReassembleBlock(group *ShredGroup) ([]byte, error) {
allBytes := make([][]byte, p.totalShreds)
availableShreds := 0
var refHeight uint64
var refGroupID []byte

// First find a valid reference height
// First find a valid reference height and group ID
for _, shred := range group.DataShreds {
if shred != nil && len(shred.Data) == int(p.chunkSize) {
refHeight = shred.Height
refGroupID = shred.GroupID
break
}
}

if refGroupID == nil {
return nil, fmt.Errorf("no valid shreds found to determine group ID")
}

// Process data shreds
for i, shred := range group.DataShreds {
if shred != nil && len(shred.Data) == int(p.chunkSize) && shred.Height == refHeight {
if shred != nil {
if len(shred.Data) != int(p.chunkSize) {
continue
}
if shred.Height != refHeight {
continue
}
if string(shred.GroupID) != string(refGroupID) {
continue // Skip shreds from different groups
}
allBytes[i] = make([]byte, p.chunkSize)
copy(allBytes[i], shred.Data)
availableShreds++
Expand All @@ -179,7 +184,16 @@ func (p *Processor) ReassembleBlock(group *ShredGroup) ([]byte, error) {
// Process recovery shreds
if group.RecoveryShreds != nil {
for i, shred := range group.RecoveryShreds {
if shred != nil && len(shred.Data) == int(p.chunkSize) && shred.Height == refHeight {
if shred != nil {
if len(shred.Data) != int(p.chunkSize) {
continue
}
if shred.Height != refHeight {
continue
}
if string(shred.GroupID) != string(refGroupID) {
continue // Skip shreds from different groups
}
allBytes[i+p.dataShreds] = make([]byte, p.chunkSize)
copy(allBytes[i+p.dataShreds], shred.Data)
availableShreds++
Expand All @@ -191,22 +205,16 @@ func (p *Processor) ReassembleBlock(group *ShredGroup) ([]byte, error) {
return nil, fmt.Errorf("insufficient shreds for reconstruction: have %d, need %d", availableShreds, p.dataShreds)
}

// Reconstruct missing/corrupted data
// Reconstruct missing data
if err := p.encoder.Reconstruct(allBytes); err != nil {
return nil, fmt.Errorf("failed to reconstruct data: %w", err)
}

// Calculate exact number of shreds needed for original size
fullShreds := group.OriginalSize / int(p.chunkSize)
if group.OriginalSize%int(p.chunkSize) != 0 {
fullShreds++
}

// Combine data shreds to form final block, respecting original size
// Combine data shreds to form final block
reconstructed := make([]byte, 0, group.OriginalSize)
remaining := group.OriginalSize

for i := 0; i < fullShreds && remaining > 0; i++ {
for i := 0; i < p.dataShreds && remaining > 0; i++ {
if allBytes[i] == nil {
return nil, fmt.Errorf("reconstruction failed: missing data for shard %d", i)
}
Expand All @@ -221,10 +229,8 @@ func (p *Processor) ReassembleBlock(group *ShredGroup) ([]byte, error) {

// Verify reconstructed block hash
computedHash := sha256.Sum256(reconstructed)

if string(computedHash[:]) != string(group.BlockHash) {
return nil, fmt.Errorf("block hash mismatch after reconstruction: original %x, got %x",
group.BlockHash, computedHash[:])
return nil, fmt.Errorf("block hash mismatch after reconstruction")
}

return reconstructed, nil
Expand All @@ -236,5 +242,4 @@ type ShredGroup struct {
GroupID []byte
BlockHash []byte
OriginalSize int
LastShredSize int // Size of actual data in last shred (0 means full)
}
13 changes: 7 additions & 6 deletions gturbine/turbine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ type Config struct {
ChunkSize uint32
}

// Shred represents a piece of a block
// Shred represents a piece of a block that can be sent over the network
type Shred struct {
Index uint32
Total uint32
Data []byte
BlockHash []byte
Height uint64
Index uint32 // Index of this shred within the block
Total uint32 // Total number of shreds for this block
Data []byte // The actual shred data
BlockHash []byte // Hash for data verification
GroupID []byte // ID for associating shreds from the same block
Height uint64 // Block height for chain reference
}

// Validator represents a node in the network
Expand Down

0 comments on commit ea179ef

Please sign in to comment.