From 1fc39a3c4d8a24abb921a340822e9c51b2ba63af Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Fri, 6 Dec 2024 10:54:14 -0700 Subject: [PATCH] clean up processor crud --- gturbine/gtshred/benchmark_test.go | 6 +- gturbine/gtshred/process_shred_test.go | 3 +- gturbine/gtshred/processor.go | 155 +++++++++++++++---------- gturbine/gtshred/processor_test.go | 3 +- gturbine/turbine_test.go | 3 +- 5 files changed, 101 insertions(+), 69 deletions(-) diff --git a/gturbine/gtshred/benchmark_test.go b/gturbine/gtshred/benchmark_test.go index d4b9a6f..4c98e6e 100644 --- a/gturbine/gtshred/benchmark_test.go +++ b/gturbine/gtshred/benchmark_test.go @@ -36,7 +36,8 @@ func BenchmarkShredProcessing(b *testing.B) { } // Create processor with noop callback - p := NewProcessor(context.Background(), &noopCallback{}, time.Minute) + p := NewProcessor(&noopCallback{}, time.Minute) + go p.RunBackgroundCleanup(context.Background()) // Reset timer before main benchmark loop b.ResetTimer() @@ -88,7 +89,8 @@ func BenchmarkShredReconstruction(b *testing.B) { for _, pattern := range patterns { b.Run(pattern.name, func(b *testing.B) { // Create processor - p := NewProcessor(context.Background(), &noopCallback{}, time.Minute) + p := NewProcessor(&noopCallback{}, time.Minute) + go p.RunBackgroundCleanup(context.Background()) b.ResetTimer() diff --git a/gturbine/gtshred/process_shred_test.go b/gturbine/gtshred/process_shred_test.go index ac72ddf..393c6ba 100644 --- a/gturbine/gtshred/process_shred_test.go +++ b/gturbine/gtshred/process_shred_test.go @@ -86,7 +86,8 @@ func TestProcessorShredding(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var cb = new(testProcessorCallback) - p := NewProcessor(context.Background(), cb, time.Minute) + p := NewProcessor(cb, time.Minute) + go p.RunBackgroundCleanup(context.Background()) block := makeRandomBlock(tc.blockSize) group, err := NewShredGroup(block, TestHeight, DefaultDataShreds, DefaultRecoveryShreds, DefaultChunkSize) diff --git a/gturbine/gtshred/processor.go b/gturbine/gtshred/processor.go index 6e28bf2..1c62858 100644 --- a/gturbine/gtshred/processor.go +++ b/gturbine/gtshred/processor.go @@ -45,88 +45,53 @@ type ProcessorCallback interface { } // NewProcessor creates a new Processor with the given callback and cleanup interval. -func NewProcessor(ctx context.Context, cb ProcessorCallback, cleanupInterval time.Duration) *Processor { - p := &Processor{ +func NewProcessor(cb ProcessorCallback, cleanupInterval time.Duration) *Processor { + return &Processor{ cb: cb, groups: make(map[string]*ShredGroupWithTimestamp), completedBlocks: make(map[string]time.Time), cleanupInterval: cleanupInterval, } +} - // Start cleanup goroutine - go func() { - ticker := time.NewTicker(cleanupInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case now := <-ticker.C: - p.cleanupStaleGroups(now) - } +// RunBackgroundCleanup starts a cleanup loop that runs at the cleanup interval. +// This should be run as a goroutine. +func (p *Processor) RunBackgroundCleanup(ctx context.Context) { + ticker := time.NewTicker(p.cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + p.cleanupStaleGroups(now) } - }() - - return p + } } -// CollectShred processes an incoming data shred +// CollectShred processes an incoming data shred. func (p *Processor) CollectShred(shred *gturbine.Shred) error { if shred == nil { return fmt.Errorf("nil shred") } - p.completedBlocksMu.RLock() // Skip shreds from already processed blocks - _, completed := p.completedBlocks[string(shred.BlockHash)] - p.completedBlocksMu.RUnlock() - if completed { + if p.isCompleted(shred.BlockHash) { return nil } - // Take read lock on groups to check if group exists, and get it if it does. - p.groupsMu.RLock() - group, ok := p.groups[shred.GroupID] - p.groupsMu.RUnlock() - + group, ok := p.getGroup(shred.GroupID) if !ok { - // TODO use existing shredgroups if they have already been allocated to save memory // If the group doesn't exist, create it and add the shred - group := &ShredGroupWithTimestamp{ - ShredGroup: &ShredGroup{ - DataShreds: make([]*gturbine.Shred, shred.TotalDataShreds), - RecoveryShreds: make([]*gturbine.Shred, shred.TotalRecoveryShreds), - TotalDataShreds: shred.TotalDataShreds, - TotalRecoveryShreds: shred.TotalRecoveryShreds, - GroupID: shred.GroupID, - BlockHash: shred.BlockHash, - Height: shred.Height, - OriginalSize: shred.FullDataSize, - }, - Timestamp: time.Now(), // Record the time the group was created consumer side. - } - - group.DataShreds[shred.Index] = shred - - // Take write lock to add the group - p.groupsMu.Lock() - p.groups[shred.GroupID] = group - p.groupsMu.Unlock() - - return nil + return p.initGroup(shred) } group.mu.Lock() defer group.mu.Unlock() - // After locking the group, check if the block has already been completed - p.completedBlocksMu.RLock() - // Skip shreds from already processed blocks - _, completed = p.completedBlocks[string(group.BlockHash)] - p.completedBlocksMu.RUnlock() - - if completed { + // After locking the group, check if the block has already been completed. + if p.isCompleted(group.BlockHash) { return nil } @@ -149,18 +114,14 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error { return fmt.Errorf("failed to process block: %w", err) } - p.groupsMu.Lock() - delete(p.groups, group.GroupID) - p.groupsMu.Unlock() - + p.deleteGroup(shred.GroupID) // then mark the block as completed at time.Now() - p.completedBlocksMu.Lock() - p.completedBlocks[string(shred.BlockHash)] = time.Now() - p.completedBlocksMu.Unlock() + p.setCompleted(shred.BlockHash) } return nil } +// cleanupStaleGroups removes groups that have been inactive for longer than the cleanup interval. func (p *Processor) cleanupStaleGroups(now time.Time) { var deleteHashes []string @@ -209,3 +170,69 @@ func (p *Processor) cleanupStaleGroups(now time.Time) { p.groupsMu.Unlock() } } + +// initGroup creates a new group and adds the first shred to it. +func (p *Processor) initGroup(shred *gturbine.Shred) error { + now := time.Now() + group := &ShredGroup{ + DataShreds: make([]*gturbine.Shred, shred.TotalDataShreds), + RecoveryShreds: make([]*gturbine.Shred, shred.TotalRecoveryShreds), + TotalDataShreds: shred.TotalDataShreds, + TotalRecoveryShreds: shred.TotalRecoveryShreds, + GroupID: shred.GroupID, + BlockHash: shred.BlockHash, + Height: shred.Height, + OriginalSize: shred.FullDataSize, + } + + group.DataShreds[shred.Index] = shred + + p.groupsMu.Lock() + + if _, ok := p.groups[shred.GroupID]; ok { + // If a group already exists, return early to avoid overwriting + p.groupsMu.Unlock() + + // Collect the shred into the existing group + return p.CollectShred(shred) + } + + defer p.groupsMu.Unlock() + + p.groups[shred.GroupID] = &ShredGroupWithTimestamp{ + ShredGroup: group, + Timestamp: now, + } + + return nil +} + +// getGroup returns the group with the given ID, if it exists. +func (p *Processor) getGroup(groupID string) (*ShredGroupWithTimestamp, bool) { + p.groupsMu.RLock() + defer p.groupsMu.RUnlock() + group, ok := p.groups[groupID] + return group, ok +} + +// deleteGroup removes the group with the given ID from the processor. +func (p *Processor) deleteGroup(groupID string) { + p.groupsMu.Lock() + defer p.groupsMu.Unlock() + delete(p.groups, groupID) +} + +// setCompleted marks a block as completed. +func (p *Processor) setCompleted(blockHash []byte) { + p.completedBlocksMu.Lock() + defer p.completedBlocksMu.Unlock() + p.completedBlocks[string(blockHash)] = time.Now() +} + +// isCompleted checks if a block has been marked as completed. +func (p *Processor) isCompleted(blockHash []byte) bool { + p.completedBlocksMu.RLock() + defer p.completedBlocksMu.RUnlock() + _, ok := p.completedBlocks[string(blockHash)] + return ok +} diff --git a/gturbine/gtshred/processor_test.go b/gturbine/gtshred/processor_test.go index e793d52..15e3293 100644 --- a/gturbine/gtshred/processor_test.go +++ b/gturbine/gtshred/processor_test.go @@ -10,7 +10,8 @@ func TestProcessorMemoryCleanup(t *testing.T) { // Create processor with short cleanup interval for testing var cb = new(testProcessorCallback) cleanupInterval := 100 * time.Millisecond - p := NewProcessor(context.Background(), cb, cleanupInterval) + p := NewProcessor(cb, cleanupInterval) + go p.RunBackgroundCleanup(context.Background()) // Create a test block and shred group block := []byte("test block data") diff --git a/gturbine/turbine_test.go b/gturbine/turbine_test.go index d5cba4d..07c0758 100644 --- a/gturbine/turbine_test.go +++ b/gturbine/turbine_test.go @@ -66,7 +66,8 @@ func newTestNode(t *testing.T, basePort int) *testNode { cb := &testBlockHandler{} - processor := gtshred.NewProcessor(context.Background(), cb, time.Minute) + processor := gtshred.NewProcessor(cb, time.Minute) + go processor.RunBackgroundCleanup(context.Background()) shredHandler := &testShredHandler{} node := &testNode{