Skip to content

Commit

Permalink
use map w mu
Browse files Browse the repository at this point in the history
  • Loading branch information
agouin committed Dec 5, 2024
1 parent 4a6d3c3 commit e0c3833
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 28 deletions.
41 changes: 14 additions & 27 deletions gturbine/shredding/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ const (
)

type Processor struct {
groups sync.Map // string -> *ShredGroup
groups map[string]*ShredGroup
mu sync.Mutex
cb ProcessorCallback
}

Expand All @@ -36,7 +37,9 @@ func (p *Processor) CollectDataShred(shred *gturbine.Shred) error {
return fmt.Errorf("nil shred")
}

value, ok := p.groups.Load(shred.GroupID)
p.mu.Lock()
defer p.mu.Unlock()
group, ok := p.groups[shred.GroupID]
if !ok {
group := &ShredGroup{
DataShreds: make([]*gturbine.Shred, shred.TotalDataShreds),
Expand All @@ -49,13 +52,11 @@ func (p *Processor) CollectDataShred(shred *gturbine.Shred) error {
OriginalSize: shred.FullDataSize,
}
group.DataShreds[shred.Index] = shred
p.groups.Store(shred.GroupID, group)

p.groups[shred.GroupID] = group
return nil
}

// Get or create the shred group
group := value.(*ShredGroup)

full, err := group.CollectDataShred(shred)
if err != nil {
return fmt.Errorf("failed to collect data shred: %w", err)
Expand All @@ -72,7 +73,7 @@ func (p *Processor) CollectDataShred(shred *gturbine.Shred) error {
if err := p.cb.ProcessBlock(shred.Height, block); err != nil {
return fmt.Errorf("failed to process block: %w", err)
}
p.DeleteGroup(group.GroupID)
delete(p.groups, group.GroupID)
}
return nil
}
Expand All @@ -83,7 +84,10 @@ func (p *Processor) CollectRecoveryShred(shred *gturbine.Shred) error {
return fmt.Errorf("nil shred")
}

value, ok := p.groups.Load(shred.GroupID)
p.mu.Lock()
defer p.mu.Unlock()

group, ok := p.groups[shred.GroupID]
if !ok {
group := &ShredGroup{
DataShreds: make([]*gturbine.Shred, shred.TotalDataShreds),
Expand All @@ -96,13 +100,10 @@ func (p *Processor) CollectRecoveryShred(shred *gturbine.Shred) error {
OriginalSize: shred.FullDataSize,
}
group.RecoveryShreds[shred.Index] = shred
p.groups.Store(shred.GroupID, group)
p.groups[shred.GroupID] = group
return nil
}

// Get or create the shred group
group := value.(*ShredGroup)

full, err := group.CollectRecoveryShred(shred)
if err != nil {
return fmt.Errorf("failed to collect recovery shred: %w", err)
Expand All @@ -119,21 +120,7 @@ func (p *Processor) CollectRecoveryShred(shred *gturbine.Shred) error {
if err := p.cb.ProcessBlock(shred.Height, block); err != nil {
return fmt.Errorf("failed to process block: %w", err)
}
p.DeleteGroup(group.GroupID)
delete(p.groups, group.GroupID)
}
return err
}

// GetGroup retrieves a shred group by its ID
func (p *Processor) GetGroup(groupID string) (*ShredGroup, bool) {
value, exists := p.groups.Load(groupID)
if !exists {
return nil, false
}
return value.(*ShredGroup), true
}

// DeleteGroup removes a shred group
func (p *Processor) DeleteGroup(groupID string) {
p.groups.Delete(groupID)
}
2 changes: 1 addition & 1 deletion gturbine/shredding/shred_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type ShredGroup struct {
OriginalSize int
}

// FromBlock creates a new ShredGroup from a block of data
// NewShredGroup creates a new ShredGroup from a block of data
func NewShredGroup(block []byte, height uint64, dataShreds, recoveryShreds int, chunkSize uint32) (*ShredGroup, error) {
if len(block) == 0 {
return nil, fmt.Errorf("empty block")
Expand Down

0 comments on commit e0c3833

Please sign in to comment.