[feat] Turbine #24

Draft · wants to merge 4 commits into base: main
13 changes: 4 additions & 9 deletions gerasure/gereedsolomon/compliance_test.go
@@ -6,37 +6,32 @@ import (
 	"github.com/gordian-engine/gordian/gerasure"
 	"github.com/gordian-engine/gordian/gerasure/gerasuretest"
 	"github.com/gordian-engine/gordian/gerasure/gereedsolomon"
-	"github.com/klauspost/reedsolomon"
 )
 
 func TestReconstructionCompliance(t *testing.T) {
 	gerasuretest.TestFixedRateErasureReconstructionCompliance(
 		t,
 		func(origData []byte, nData, nParity int) (gerasure.Encoder, gerasure.Reconstructor) {
-			rs, err := reedsolomon.New(nData, nParity)
+			enc, err := gereedsolomon.NewEncoder(nData, nParity)
 			if err != nil {
 				panic(err)
 			}
 
 			// We don't know the shard size until we encode.
 			// (Or at least I don't see how to get that from the reedsolomon package.)
-			allShards, err := rs.Split(origData)
+			allShards, err := enc.Encode(nil, origData)
 			if err != nil {
 				panic(err)
 			}
 			shardSize := len(allShards[0])
 
-			enc := gereedsolomon.NewEncoder(rs)
-
-			// Separate reedsolomon encoder for the reconstructor.
-			rrs, err := reedsolomon.New(nData, nParity)
+			rcons, err := gereedsolomon.NewReconstructor(nData, nParity, shardSize)
 			if err != nil {
 				panic(err)
 			}
 
-			r := gereedsolomon.NewReconstructor(rrs, shardSize)
-
-			return enc, r
+			return enc, rcons
 		},
 	)
 }
20 changes: 18 additions & 2 deletions gerasure/gereedsolomon/encoder.go
@@ -15,8 +15,24 @@ type Encoder struct {
 
 // NewEncoder returns a new Encoder.
 // The options within the given reedsolomon.Encoder determine the number of shards.
-func NewEncoder(rs reedsolomon.Encoder) Encoder {
-	return Encoder{rs: rs}
+func NewEncoder(dataShreds, parityShreds int, opts ...reedsolomon.Option) (*Encoder, error) {
+	if dataShreds <= 0 {
+		panic(fmt.Errorf(
+			"BUG: attempted to create reed-solomon encoder with dataShreds <= 0, got %d",
+			dataShreds,
+		))
+	}
+	if parityShreds <= 0 {
+		panic(fmt.Errorf(
+			"BUG: attempted to create reed-solomon encoder with parityShreds <= 0, got %d",
+			parityShreds,
+		))
+	}
+	rs, err := reedsolomon.New(dataShreds, parityShreds, opts...)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create reed-solomon encoder: %w", err)
+	}
+	return &Encoder{rs: rs}, nil
 }
 
 // Encode satisfies [gerasure.Encoder].
21 changes: 19 additions & 2 deletions gerasure/gereedsolomon/reconstructor.go
@@ -28,7 +28,24 @@ type Reconstructor struct {
 // NewReconstructor returns a new Reconstructor.
 // The options within the given reedsolomon.Encoder determine the number of shards.
 // The shardSize and totalDataSize must be discovered out of band;
-func NewReconstructor(rs reedsolomon.Encoder, shardSize int) *Reconstructor {
+func NewReconstructor(dataShards, parityShards, shardSize int, opts ...reedsolomon.Option) (*Reconstructor, error) {
+	if dataShards <= 0 {
+		panic(fmt.Errorf(
+			"BUG: attempted to create reed-solomon reconstructor with dataShards <= 0, got %d",
+			dataShards,
+		))
+	}
+	if parityShards <= 0 {
+		panic(fmt.Errorf(
+			"BUG: attempted to create reed-solomon reconstructor with parityShards <= 0, got %d",
+			parityShards,
+		))
+	}
+	rs, err := reedsolomon.New(dataShards, parityShards, opts...)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create reed-solomon reconstructor: %w", err)
+	}
+
 	// All reedsolomon.Encoder instances are guaranteed to satisfy reedsolomon.Extensions.
 	// Calling AllocAligned is supposed to result in better throughput
 	// when actually encoding and decoding.
@@ -47,7 +64,7 @@ func NewReconstructor(rs reedsolomon.Encoder, shardSize int) *Reconstructor {
 		allShards: allShards,
 
 		shardSize: shardSize,
-	}
+	}, nil
 }
 
 // ReconstructData satisfies [gerasure.Reconstructor].
2 changes: 1 addition & 1 deletion go.mod
@@ -16,6 +16,7 @@ require (
 	github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c
 	golang.org/x/crypto v0.27.0
 	golang.org/x/tools v0.22.0
+	google.golang.org/protobuf v1.34.2
 )
 
 require (
@@ -141,7 +142,6 @@ require (
 	golang.org/x/text v0.18.0 // indirect
 	golang.org/x/time v0.6.0 // indirect
 	gonum.org/v1/gonum v0.15.1 // indirect
-	google.golang.org/protobuf v1.34.2 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	lukechampine.com/blake3 v1.2.2 // indirect
 )
78 changes: 78 additions & 0 deletions gturbine/README.md
@@ -0,0 +1,78 @@
# GTurbine 🌪️

GTurbine is a high-performance block propagation protocol designed for distributed consensus systems. It uses a structured network topology and Reed-Solomon erasure coding to efficiently propagate blocks across large validator networks while minimizing bandwidth requirements.

*Because flooding blocks to every node is **so** 2019...*

## Overview

GTurbine implements a multi-layer propagation strategy inspired by Solana's Turbine protocol. Rather than having proposers flood blocks to every validator, GTurbine orchestrates a structured propagation flow:

1. **Block Creation** (Proposer):
- Proposer reaps transactions from mempool
- Assembles them into a new block
- Calculates block header and metadata

2. **Block Shredding** (Proposer):
- Splits block into fixed-size data shreds
- Applies Reed-Solomon erasure coding
- Generates recovery shreds for fault tolerance
- Tags all shreds with unique group ID and metadata

3. **Tree Organization** (Network-wide):
- Validators self-organize into propagation layers
- Each validator knows its upstream source and downstream targets
- Layer assignments are deterministic and stake-weighted
- Tree structure changes periodically to prevent targeted attacks

4. **Initial Propagation** (Proposer → Layer 1):
- Proposer distributes different shreds to each Layer 1 validator
- Each Layer 1 validator receives unique subset of block data
- Distribution uses UDP for low latency

5. **Cascade Propagation** (Layer N → Layer N+1):
- Each validator forwards received shreds to assigned downstream nodes
- Propagation continues until leaves of tree are reached
- Different paths carry different shreds

6. **Block Reconstruction** (All Nodes):
- Validators collect shreds from upstream nodes
   - Once the minimum threshold of shreds (data + recovery) has been received
- Reed-Solomon decoding recovers any missing pieces
- Original block is reconstructed and verified

This structured approach replaces the O(n) fan-out a proposer needs in a flood-based system with a bounded per-validator fanout: each node retransmits only its assigned shreds to a fixed set of downstream peers, and a block reaches all n validators in O(log n) propagation hops. By combining erasure coding with tree-based propagation, GTurbine achieves reliable block distribution while minimizing network congestion and single-node bandwidth requirements.
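
As a concrete (if simplified) illustration of steps 2 and 6, the sketch below shreds a block with Reed-Solomon coding and then reconstructs it after losing some shreds. It calls the [klauspost/reedsolomon](https://github.com/klauspost/reedsolomon) library directly rather than the gtshred/gtencoding wrappers, and the shred counts and block contents are arbitrary examples:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	const dataShreds, parityShreds = 4, 2 // illustrative counts only

	block := bytes.Repeat([]byte("example block data "), 64)

	enc, err := reedsolomon.New(dataShreds, parityShreds)
	if err != nil {
		panic(err)
	}

	// Step 2: split the block into fixed-size data shreds and
	// compute the recovery (parity) shreds.
	shards, err := enc.Split(block) // data shreds plus empty parity shreds
	if err != nil {
		panic(err)
	}
	if err := enc.Encode(shards); err != nil { // fill in the parity shreds
		panic(err)
	}

	// Simulate losing up to parityShreds shreds in transit.
	shards[1] = nil // a data shred
	shards[4] = nil // a parity shred

	// Step 6: any dataShreds of the surviving shreds are enough
	// to rebuild the missing ones and reassemble the block.
	if err := enc.Reconstruct(shards); err != nil {
		panic(err)
	}
	var out bytes.Buffer
	if err := enc.Join(&out, shards, len(block)); err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(out.Bytes(), block)) // true
}
```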

## Architecture

### Core Components

```
gturbine/
├── gtbuilder/ - Tree construction and management
├── gtencoding/ - Erasure coding and shred serialization
├── gtnetwork/ - Network transport and routing
├── gtshred/ - Block shredding and reconstruction
└── turbine.go - Core interfaces and types
```

### Key Features

- **Efficient Erasure Coding**: Uses the battle-tested [klauspost/reedsolomon](https://github.com/klauspost/reedsolomon) library
- **Flexible Tree Structure**: Configurable fanout and layer organization (see the usage sketch after this list)
- **UDP Transport**: Low-latency propagation with erasure coding for reliability
- **Safe Partial Recovery**: Reconstruct blocks with minimum required shreds
- **Built-in Verification**: Integrity checking at both shred and block levels
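
To show how the tree side fits together (steps 3–5 of the overview and the flexible tree structure noted above), here is a small usage sketch of the gtbuilder package added in this PR. It mirrors the calls made in gtbuilder's benchmarks; the fanout of 3, the 12-validator set, and the last two BuildTree arguments (a seed-like value and 0, copied from those benchmarks) are illustrative assumptions, not documented API semantics:

```go
package main

import (
	"fmt"

	"github.com/gordian-engine/gordian/gturbine/gtbuilder"
)

func main() {
	// A toy validator set identified by indices 0..11.
	indices := make([]uint64, 12)
	for i := range indices {
		indices[i] = uint64(i)
	}

	// Build a propagation tree with a small fanout.
	builder := gtbuilder.NewTreeBuilder(3)

	// The trailing arguments follow gtbuilder/benchmark_test.go.
	tree, err := builder.BuildTree(indices, 1, 0)
	if err != nil {
		panic(err)
	}

	// Ask which downstream validators the first root-layer validator
	// should forward its shreds to (step 5 of the overview).
	root := tree.Root.Validators[0]
	fmt.Println(gtbuilder.GetChildren(tree, root))
}
```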

## Acknowledgments

GTurbine was made possible by:
- [Solana's Turbine Protocol](https://docs.solana.com/cluster/turbine-block-propagation)
- Academic work on reliable multicast protocols
- The incredible [klauspost/reedsolomon](https://github.com/klauspost/reedsolomon) library

## Support

Found a bug? Have a feature request? Open an issue!

*Remember: In distributed systems, eventual consistency is better than eventual insanity* 😉
131 changes: 131 additions & 0 deletions gturbine/gtbuilder/benchmark_test.go
@@ -0,0 +1,131 @@
package gtbuilder

import (
"testing"
)

func BenchmarkTreeBuilder(b *testing.B) {
sizes := []struct {
name string
valCount int
fanout uint32
}{
{"100-Validators-200-Fanout", 100, 200},
{"500-Validators-200-Fanout", 500, 200},
{"1000-Validators-200-Fanout", 1000, 200},
{"2000-Validators-200-Fanout", 2000, 200},
{"500-Validators-100-Fanout", 500, 100},
{"500-Validators-400-Fanout", 500, 400},
}

for _, size := range sizes {
// Create indices array
indices := make([]uint64, size.valCount)
for i := range indices {
indices[i] = uint64(i)
}

b.Run(size.name, func(b *testing.B) {
builder := NewTreeBuilder(size.fanout)
b.ResetTimer()

for i := 0; i < b.N; i++ {
// Make copy of indices since they get modified
indicesCopy := make([]uint64, len(indices))
copy(indicesCopy, indices)

tree, err := builder.BuildTree(indicesCopy, uint64(i), 0)
if err != nil {
b.Fatal(err)
}
if tree == nil {
b.Fatal("expected non-nil tree")
}
}
})
}
}

func BenchmarkFindLayerPosition(b *testing.B) {
sizes := []struct {
name string
valCount int
searchPct float64 // percentage through validator set to search
}{
{"500-Val-First-10pct", 500, 0.1},
{"500-Val-Middle", 500, 0.5},
{"500-Val-Last-10pct", 500, 0.9},
{"2000-Val-First-10pct", 2000, 0.1},
{"2000-Val-Middle", 2000, 0.5},
{"2000-Val-Last-10pct", 2000, 0.9},
}

for _, size := range sizes {
indices := make([]uint64, size.valCount)
for i := range indices {
indices[i] = uint64(i)
}

builder := NewTreeBuilder(200)
tree, _ := builder.BuildTree(indices, 1, 0)
searchIdx := uint64(float64(size.valCount) * size.searchPct)

b.Run(size.name, func(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
layer, idx := FindLayerPosition(tree, searchIdx)
if layer == nil || idx == -1 {
b.Fatal("failed to find validator")
}
}
})
}
}

func BenchmarkGetChildren(b *testing.B) {
sizes := []struct {
name string
valCount int
fanout uint32
}{
{"500-Val-200-Fanout-Root", 500, 200},
{"500-Val-200-Fanout-Mid", 500, 200},
{"2000-Val-200-Fanout-Root", 2000, 200},
{"2000-Val-200-Fanout-Mid", 2000, 200},
}

for _, size := range sizes {
indices := make([]uint64, size.valCount)
for i := range indices {
indices[i] = uint64(i)
}

builder := NewTreeBuilder(size.fanout)
tree, _ := builder.BuildTree(indices, 1, 0)

// Test with root validator and middle layer validator
rootIdx := tree.Root.Validators[0]
midIdx := tree.Root.Children[0].Validators[0]

b.Run(size.name+"-Root", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
children := GetChildren(tree, rootIdx)
if len(children) == 0 {
b.Fatal("expected children")
}
}
})

b.Run(size.name+"-Mid", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
children := GetChildren(tree, midIdx)
if children == nil {
b.Fatal("expected valid children slice")
}
}
})
}
}
48 changes: 48 additions & 0 deletions gturbine/gtbuilder/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package gtbuilder

import (
"github.com/gordian-engine/gordian/gturbine"
)

// FindLayerPosition finds a validator's layer and index in the tree
func FindLayerPosition(tree *gturbine.Tree, valIndex uint64) (*gturbine.Layer, int) {
if tree == nil {
return nil, -1
}

layer := tree.Root
for layer != nil {
for i, idx := range layer.Validators {
if idx == valIndex {
return layer, i
}
}
if len(layer.Children) > 0 {
layer = layer.Children[0]
} else {
layer = nil
}
}
return nil, -1
}

// GetChildren returns the validator indices that should receive forwarded shreds
func GetChildren(tree *gturbine.Tree, valIndex uint64) []uint64 {
layer, idx := FindLayerPosition(tree, valIndex)
if layer == nil || len(layer.Children) == 0 {
return nil
}

	// Distribute the child layer proportionally: this validator's position in its
	// own layer determines which contiguous slice of the child layer it forwards to.
childLayer := layer.Children[0]
startIdx := (idx * len(childLayer.Validators)) / len(layer.Validators)
endIdx := ((idx + 1) * len(childLayer.Validators)) / len(layer.Validators)
if startIdx == endIdx {
endIdx = startIdx + 1
}
if endIdx > len(childLayer.Validators) {
endIdx = len(childLayer.Validators)
}

return childLayer.Validators[startIdx:endIdx]
}