
Commit 1fc39a3

clean up processor crud

agouin committed Dec 6, 2024
1 parent e0a6883
Showing 5 changed files with 101 additions and 69 deletions.
6 changes: 4 additions & 2 deletions gturbine/gtshred/benchmark_test.go
@@ -36,7 +36,8 @@ func BenchmarkShredProcessing(b *testing.B) {
 	}
 
 	// Create processor with noop callback
-	p := NewProcessor(context.Background(), &noopCallback{}, time.Minute)
+	p := NewProcessor(&noopCallback{}, time.Minute)
+	go p.RunBackgroundCleanup(context.Background())
 
 	// Reset timer before main benchmark loop
 	b.ResetTimer()
@@ -88,7 +89,8 @@ func BenchmarkShredReconstruction(b *testing.B) {
 	for _, pattern := range patterns {
 		b.Run(pattern.name, func(b *testing.B) {
 			// Create processor
-			p := NewProcessor(context.Background(), &noopCallback{}, time.Minute)
+			p := NewProcessor(&noopCallback{}, time.Minute)
+			go p.RunBackgroundCleanup(context.Background())
 
 			b.ResetTimer()
 
3 changes: 2 additions & 1 deletion gturbine/gtshred/process_shred_test.go
@@ -86,7 +86,8 @@ func TestProcessorShredding(t *testing.T) {
		t.Run(tc.name, func(t *testing.T) {
			var cb = new(testProcessorCallback)
 
-			p := NewProcessor(context.Background(), cb, time.Minute)
+			p := NewProcessor(cb, time.Minute)
+			go p.RunBackgroundCleanup(context.Background())
 
			block := makeRandomBlock(tc.blockSize)
			group, err := NewShredGroup(block, TestHeight, DefaultDataShreds, DefaultRecoveryShreds, DefaultChunkSize)
155 changes: 91 additions & 64 deletions gturbine/gtshred/processor.go
@@ -45,88 +45,53 @@ type ProcessorCallback interface {
 }
 
 // NewProcessor creates a new Processor with the given callback and cleanup interval.
-func NewProcessor(ctx context.Context, cb ProcessorCallback, cleanupInterval time.Duration) *Processor {
-	p := &Processor{
+func NewProcessor(cb ProcessorCallback, cleanupInterval time.Duration) *Processor {
+	return &Processor{
 		cb:              cb,
 		groups:          make(map[string]*ShredGroupWithTimestamp),
 		completedBlocks: make(map[string]time.Time),
 		cleanupInterval: cleanupInterval,
 	}
+}
 
-	// Start cleanup goroutine
-	go func() {
-		ticker := time.NewTicker(cleanupInterval)
-		defer ticker.Stop()
-
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case now := <-ticker.C:
-				p.cleanupStaleGroups(now)
-			}
-		}
-	}()
-
-	return p
+// RunBackgroundCleanup starts a cleanup loop that runs at the cleanup interval.
+// This should be run as a goroutine.
+func (p *Processor) RunBackgroundCleanup(ctx context.Context) {
+	ticker := time.NewTicker(p.cleanupInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case now := <-ticker.C:
+			p.cleanupStaleGroups(now)
+		}
+	}
 }
 
-// CollectShred processes an incoming data shred
+// CollectShred processes an incoming data shred.
 func (p *Processor) CollectShred(shred *gturbine.Shred) error {
 	if shred == nil {
 		return fmt.Errorf("nil shred")
 	}
 
-	p.completedBlocksMu.RLock()
-	// Skip shreds from already processed blocks
-	_, completed := p.completedBlocks[string(shred.BlockHash)]
-	p.completedBlocksMu.RUnlock()
-	if completed {
+	if p.isCompleted(shred.BlockHash) {
 		return nil
 	}
 
-	// Take read lock on groups to check if group exists, and get it if it does.
-	p.groupsMu.RLock()
-	group, ok := p.groups[shred.GroupID]
-	p.groupsMu.RUnlock()
-
+	group, ok := p.getGroup(shred.GroupID)
 	if !ok {
-		// TODO use existing shredgroups if they have already been allocated to save memory
-		// If the group doesn't exist, create it and add the shred
-		group := &ShredGroupWithTimestamp{
-			ShredGroup: &ShredGroup{
-				DataShreds:          make([]*gturbine.Shred, shred.TotalDataShreds),
-				RecoveryShreds:      make([]*gturbine.Shred, shred.TotalRecoveryShreds),
-				TotalDataShreds:     shred.TotalDataShreds,
-				TotalRecoveryShreds: shred.TotalRecoveryShreds,
-				GroupID:             shred.GroupID,
-				BlockHash:           shred.BlockHash,
-				Height:              shred.Height,
-				OriginalSize:        shred.FullDataSize,
-			},
-			Timestamp: time.Now(), // Record the time the group was created consumer side.
-		}
-
-		group.DataShreds[shred.Index] = shred
-
-		// Take write lock to add the group
-		p.groupsMu.Lock()
-		p.groups[shred.GroupID] = group
-		p.groupsMu.Unlock()
-
-		return nil
+		return p.initGroup(shred)
 	}
 
 	group.mu.Lock()
 	defer group.mu.Unlock()
 
-	// After locking the group, check if the block has already been completed
-	p.completedBlocksMu.RLock()
-	// Skip shreds from already processed blocks
-	_, completed = p.completedBlocks[string(group.BlockHash)]
-	p.completedBlocksMu.RUnlock()
-
-	if completed {
+	// After locking the group, check if the block has already been completed.
+	if p.isCompleted(group.BlockHash) {
 		return nil
 	}
 
@@ -149,18 +114,14 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
			return fmt.Errorf("failed to process block: %w", err)
		}
 
-		p.groupsMu.Lock()
-		delete(p.groups, group.GroupID)
-		p.groupsMu.Unlock()
-
+		p.deleteGroup(shred.GroupID)
		// then mark the block as completed at time.Now()
-		p.completedBlocksMu.Lock()
-		p.completedBlocks[string(shred.BlockHash)] = time.Now()
-		p.completedBlocksMu.Unlock()
+		p.setCompleted(shred.BlockHash)
	}
	return nil
 }
 
+// cleanupStaleGroups removes groups that have been inactive for longer than the cleanup interval.
 func (p *Processor) cleanupStaleGroups(now time.Time) {
	var deleteHashes []string
 
@@ -209,3 +170,69 @@ func (p *Processor) cleanupStaleGroups(now time.Time) {
		p.groupsMu.Unlock()
	}
 }
+
+// initGroup creates a new group and adds the first shred to it.
+func (p *Processor) initGroup(shred *gturbine.Shred) error {
+	now := time.Now()
+	group := &ShredGroup{
+		DataShreds:          make([]*gturbine.Shred, shred.TotalDataShreds),
+		RecoveryShreds:      make([]*gturbine.Shred, shred.TotalRecoveryShreds),
+		TotalDataShreds:     shred.TotalDataShreds,
+		TotalRecoveryShreds: shred.TotalRecoveryShreds,
+		GroupID:             shred.GroupID,
+		BlockHash:           shred.BlockHash,
+		Height:              shred.Height,
+		OriginalSize:        shred.FullDataSize,
+	}
+
+	group.DataShreds[shred.Index] = shred
+
+	p.groupsMu.Lock()
+
+	if _, ok := p.groups[shred.GroupID]; ok {
+		// If a group already exists, return early to avoid overwriting
+		p.groupsMu.Unlock()
+
+		// Collect the shred into the existing group
+		return p.CollectShred(shred)
+	}
+
+	defer p.groupsMu.Unlock()
+
+	p.groups[shred.GroupID] = &ShredGroupWithTimestamp{
+		ShredGroup: group,
+		Timestamp:  now,
+	}
+
+	return nil
+}
+
+// getGroup returns the group with the given ID, if it exists.
+func (p *Processor) getGroup(groupID string) (*ShredGroupWithTimestamp, bool) {
+	p.groupsMu.RLock()
+	defer p.groupsMu.RUnlock()
+	group, ok := p.groups[groupID]
+	return group, ok
+}
+
+// deleteGroup removes the group with the given ID from the processor.
+func (p *Processor) deleteGroup(groupID string) {
+	p.groupsMu.Lock()
+	defer p.groupsMu.Unlock()
+	delete(p.groups, groupID)
+}
+
+// setCompleted marks a block as completed.
+func (p *Processor) setCompleted(blockHash []byte) {
+	p.completedBlocksMu.Lock()
+	defer p.completedBlocksMu.Unlock()
+	p.completedBlocks[string(blockHash)] = time.Now()
+}
+
+// isCompleted checks if a block has been marked as completed.
+func (p *Processor) isCompleted(blockHash []byte) bool {
+	p.completedBlocksMu.RLock()
+	defer p.completedBlocksMu.RUnlock()
+	_, ok := p.completedBlocks[string(blockHash)]
+	return ok
+}
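With this refactor, construction and cleanup are decoupled: NewProcessor no longer takes a context or spawns a hidden goroutine, and the caller starts the cleanup loop explicitly, tying its lifetime to whatever context is passed to RunBackgroundCleanup. A minimal usage sketch of the new API follows; the cb value (any ProcessorCallback implementation) and the shreds channel of incoming *gturbine.Shred values are illustrative assumptions, not part of this commit:

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancelling ctx stops the cleanup loop

	p := gtshred.NewProcessor(cb, time.Minute)
	go p.RunBackgroundCleanup(ctx)

	for shred := range shreds { // hypothetical source of *gturbine.Shred
		if err := p.CollectShred(shred); err != nil {
			// nil shreds and failed block processing surface here
			log.Printf("collect shred: %v", err)
		}
	}

The updated tests below follow the same two-step pattern: construct the processor, then start RunBackgroundCleanup in a goroutine.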
3 changes: 2 additions & 1 deletion gturbine/gtshred/processor_test.go
@@ -10,7 +10,8 @@ func TestProcessorMemoryCleanup(t *testing.T) {
	// Create processor with short cleanup interval for testing
	var cb = new(testProcessorCallback)
	cleanupInterval := 100 * time.Millisecond
-	p := NewProcessor(context.Background(), cb, cleanupInterval)
+	p := NewProcessor(cb, cleanupInterval)
+	go p.RunBackgroundCleanup(context.Background())
 
	// Create a test block and shred group
	block := []byte("test block data")
3 changes: 2 additions & 1 deletion gturbine/turbine_test.go
@@ -66,7 +66,8 @@ func newTestNode(t *testing.T, basePort int) *testNode {
 
	cb := &testBlockHandler{}
 
-	processor := gtshred.NewProcessor(context.Background(), cb, time.Minute)
+	processor := gtshred.NewProcessor(cb, time.Minute)
+	go processor.RunBackgroundCleanup(context.Background())
 
	shredHandler := &testShredHandler{}
	node := &testNode{
