From e0c38333af43ecde2ea6a476e098d213a4103916 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Thu, 5 Dec 2024 15:10:50 -0700 Subject: [PATCH] use map w mu --- gturbine/shredding/processor.go | 41 +++++++++++-------------------- gturbine/shredding/shred_group.go | 2 +- 2 files changed, 15 insertions(+), 28 deletions(-) diff --git a/gturbine/shredding/processor.go b/gturbine/shredding/processor.go index b46625a..7d5c08c 100644 --- a/gturbine/shredding/processor.go +++ b/gturbine/shredding/processor.go @@ -16,7 +16,8 @@ const ( ) type Processor struct { - groups sync.Map // string -> *ShredGroup + groups map[string]*ShredGroup + mu sync.Mutex cb ProcessorCallback } @@ -36,7 +37,9 @@ func (p *Processor) CollectDataShred(shred *gturbine.Shred) error { return fmt.Errorf("nil shred") } - value, ok := p.groups.Load(shred.GroupID) + p.mu.Lock() + defer p.mu.Unlock() + group, ok := p.groups[shred.GroupID] if !ok { group := &ShredGroup{ DataShreds: make([]*gturbine.Shred, shred.TotalDataShreds), @@ -49,13 +52,11 @@ func (p *Processor) CollectDataShred(shred *gturbine.Shred) error { OriginalSize: shred.FullDataSize, } group.DataShreds[shred.Index] = shred - p.groups.Store(shred.GroupID, group) + + p.groups[shred.GroupID] = group return nil } - // Get or create the shred group - group := value.(*ShredGroup) - full, err := group.CollectDataShred(shred) if err != nil { return fmt.Errorf("failed to collect data shred: %w", err) @@ -72,7 +73,7 @@ func (p *Processor) CollectDataShred(shred *gturbine.Shred) error { if err := p.cb.ProcessBlock(shred.Height, block); err != nil { return fmt.Errorf("failed to process block: %w", err) } - p.DeleteGroup(group.GroupID) + delete(p.groups, group.GroupID) } return nil } @@ -83,7 +84,10 @@ func (p *Processor) CollectRecoveryShred(shred *gturbine.Shred) error { return fmt.Errorf("nil shred") } - value, ok := p.groups.Load(shred.GroupID) + p.mu.Lock() + defer p.mu.Unlock() + + group, ok := p.groups[shred.GroupID] if !ok { group := &ShredGroup{ DataShreds: make([]*gturbine.Shred, shred.TotalDataShreds), @@ -96,13 +100,10 @@ func (p *Processor) CollectRecoveryShred(shred *gturbine.Shred) error { OriginalSize: shred.FullDataSize, } group.RecoveryShreds[shred.Index] = shred - p.groups.Store(shred.GroupID, group) + p.groups[shred.GroupID] = group return nil } - // Get or create the shred group - group := value.(*ShredGroup) - full, err := group.CollectRecoveryShred(shred) if err != nil { return fmt.Errorf("failed to collect recovery shred: %w", err) @@ -119,21 +120,7 @@ func (p *Processor) CollectRecoveryShred(shred *gturbine.Shred) error { if err := p.cb.ProcessBlock(shred.Height, block); err != nil { return fmt.Errorf("failed to process block: %w", err) } - p.DeleteGroup(group.GroupID) + delete(p.groups, group.GroupID) } return err } - -// GetGroup retrieves a shred group by its ID -func (p *Processor) GetGroup(groupID string) (*ShredGroup, bool) { - value, exists := p.groups.Load(groupID) - if !exists { - return nil, false - } - return value.(*ShredGroup), true -} - -// DeleteGroup removes a shred group -func (p *Processor) DeleteGroup(groupID string) { - p.groups.Delete(groupID) -} diff --git a/gturbine/shredding/shred_group.go b/gturbine/shredding/shred_group.go index cdd44ca..94e52da 100644 --- a/gturbine/shredding/shred_group.go +++ b/gturbine/shredding/shred_group.go @@ -20,7 +20,7 @@ type ShredGroup struct { OriginalSize int } -// FromBlock creates a new ShredGroup from a block of data +// NewShredGroup creates a new ShredGroup from a block of data func NewShredGroup(block []byte, height uint64, dataShreds, recoveryShreds int, chunkSize uint32) (*ShredGroup, error) { if len(block) == 0 { return nil, fmt.Errorf("empty block")