
Commit

improve perf
agouin committed Dec 6, 2024
1 parent e2e3e70 commit 887aae7
Showing 2 changed files with 29 additions and 37 deletions.
33 changes: 25 additions & 8 deletions gturbine/gtshred/processor.go
@@ -71,7 +71,7 @@ func NewProcessor(ctx context.Context, cb ProcessorCallback, cleanupInterval tim
return p
}

// CollectDataShred processes an incoming data shred
// CollectShred processes an incoming data shred
func (p *Processor) CollectShred(shred *gturbine.Shred) error {
if shred == nil {
return fmt.Errorf("nil shred")
@@ -91,6 +91,7 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
p.groupsMu.RUnlock()

if !ok {
// TODO use existing shredgroups if they have already been allocated to save memory
// If the group doesn't exist, create it and add the shred
group := &ShredGroupWithTimestamp{
ShredGroup: &ShredGroup{
@@ -116,7 +117,20 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
return nil
}

full, err := group.CollectShred(shred)
group.mu.Lock()
defer group.mu.Unlock()

// After locking the group, check if the block has already been completed
p.completedBlocksMu.RLock()
// Skip shreds from already processed blocks
_, completed = p.completedBlocks[string(group.BlockHash)]
p.completedBlocksMu.RUnlock()

if completed {
return nil
}

full, err := group.collectShred(shred)
if err != nil {
return fmt.Errorf("failed to collect data shred: %w", err)
}
@@ -125,20 +139,24 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
if err != nil {
return fmt.Errorf("failed to create encoder: %w", err)
}
block, err := group.ReconstructBlock(encoder)

block, err := group.reconstructBlock(encoder)
if err != nil {
return fmt.Errorf("failed to reconstruct block: %w", err)
}

if err := p.cb.ProcessBlock(shred.Height, shred.BlockHash, block); err != nil {
return fmt.Errorf("failed to process block: %w", err)
}

p.groupsMu.Lock()
delete(p.groups, group.GroupID)
p.groupsMu.Unlock()

// then mark the block as completed at time.Now()
p.completedBlocksMu.Lock()
p.completedBlocks[string(shred.BlockHash)] = time.Now()

// Reset the group before removing it (allows GC to collect old shreds)
group.Reset()
delete(p.groups, group.GroupID)
p.completedBlocksMu.Unlock()
}
return nil
}
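
Note on the hunks above: CollectShred now takes the group's own lock before doing the erasure-coding work and re-checks the completed-blocks map under that lock, so shreds racing in for a block that has just finished return early instead of reconstructing it a second time. Below is a minimal, self-contained sketch of that double-check pattern; the names (doneSet, process, work) are illustrative and not taken from the repository.

package main

import (
	"fmt"
	"sync"
)

// doneSet is a hypothetical stand-in for the processor's completedBlocks map:
// it records keys that have already been fully processed.
type doneSet struct {
	mu   sync.RWMutex
	done map[string]bool
}

// completed reports whether key has already been processed.
func (d *doneSet) completed(key string) bool {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.done[key]
}

// markDone records key as processed.
func (d *doneSet) markDone(key string) {
	d.mu.Lock()
	d.done[key] = true
	d.mu.Unlock()
}

// process mirrors the shape of the reworked CollectShred: a cheap check before
// taking the per-group lock, and a second check after acquiring it, so a block
// finished by a concurrent goroutine is skipped rather than reconstructed twice.
func process(d *doneSet, groupMu *sync.Mutex, key string, work func()) {
	if d.completed(key) {
		return // fast path: block already processed
	}
	groupMu.Lock()
	defer groupMu.Unlock()
	if d.completed(key) {
		return // another goroutine finished it while we waited for the lock
	}
	work()
	d.markDone(key)
}

func main() {
	d := &doneSet{done: make(map[string]bool)}
	var mu sync.Mutex
	process(d, &mu, "block-1", func() { fmt.Println("reconstructing block-1") })
	process(d, &mu, "block-1", func() { fmt.Println("never printed") })
}
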
@@ -186,7 +204,6 @@ func (p *Processor) cleanupStaleGroups(now time.Time) {
// Take write lock once for all deletions
p.groupsMu.Lock()
for _, id := range deleteGroups {
p.groups[id].Reset() // TODO: is this necessary?
delete(p.groups, id)
}
p.groupsMu.Unlock()
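
The cleanupStaleGroups hunk drops the per-group Reset call: stale entries are gathered under a read lock, removed under a single write lock, and the old shred slices are left to the garbage collector. A small runnable sketch of that batch-cleanup pattern, under assumed names (cleanupStale, entries, maxAge) rather than the repository's:

package main

import (
	"fmt"
	"sync"
	"time"
)

// cleanupStale gathers stale keys under a read lock, then takes the write
// lock exactly once for all deletions. Entries are simply dropped; no
// per-entry reset is performed before deletion.
func cleanupStale(mu *sync.RWMutex, entries map[string]time.Time, now time.Time, maxAge time.Duration) {
	mu.RLock()
	var stale []string
	for id, ts := range entries {
		if now.Sub(ts) > maxAge {
			stale = append(stale, id)
		}
	}
	mu.RUnlock()

	mu.Lock()
	for _, id := range stale {
		delete(entries, id)
	}
	mu.Unlock()
}

func main() {
	var mu sync.RWMutex
	entries := map[string]time.Time{
		"old": time.Now().Add(-2 * time.Hour),
		"new": time.Now(),
	}
	cleanupStale(&mu, entries, time.Now(), time.Hour)
	fmt.Println(len(entries)) // 1: only "new" survives
}
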
33 changes: 4 additions & 29 deletions gturbine/gtshred/shred_group.go
@@ -137,11 +137,8 @@ func (g *ShredGroup) isFull() bool {
return valid >= g.TotalDataShreds
}

// ReconstructBlock attempts to reconstruct the original block from available shreds
func (g *ShredGroup) ReconstructBlock(encoder *gtencoding.Encoder) ([]byte, error) {
g.mu.Lock()
defer g.mu.Unlock()

// reconstructBlock attempts to reconstruct the original block from available shreds
func (g *ShredGroup) reconstructBlock(encoder *gtencoding.Encoder) ([]byte, error) {
// Extract data bytes for erasure coding
allBytes := make([][]byte, len(g.DataShreds)+len(g.RecoveryShreds))

@@ -192,8 +189,8 @@ func (g *ShredGroup) ReconstructBlock(encoder *gtencoding.Encoder) ([]byte, erro
return reconstructed, nil
}

// CollectDataShred adds a data shred to the group
func (g *ShredGroup) CollectShred(shred *gturbine.Shred) (bool, error) {
// collectShred adds a data shred to the group
func (g *ShredGroup) collectShred(shred *gturbine.Shred) (bool, error) {
if shred == nil {
return false, fmt.Errorf("nil shred")
}
@@ -209,9 +206,6 @@ func (g *ShredGroup) CollectShred(shred *gturbine.Shred) (bool, error) {
return false, fmt.Errorf("block hash mismatch")
}

g.mu.Lock()
defer g.mu.Unlock()

switch shred.Type {
case gturbine.DataShred:
// Validate shred index
@@ -233,22 +227,3 @@ func (g *ShredGroup) CollectShred(shred *gturbine.Shred) (bool, error) {

return g.isFull(), nil
}

// Reset clears the ShredGroup data while maintaining allocated memory
func (g *ShredGroup) Reset() {
g.mu.Lock()
defer g.mu.Unlock()

g.GroupID = uuid.New().String()
g.BlockHash = g.BlockHash[:0]
g.Height = 0
g.OriginalSize = 0

// Clear but keep underlying arrays
for i := range g.DataShreds {
g.DataShreds[i] = nil
}
for i := range g.RecoveryShreds {
g.RecoveryShreds[i] = nil
}
}
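
Taken together, the shred_group.go changes move locking up to the Processor: ReconstructBlock and CollectShred become the unexported, lock-free reconstructBlock and collectShred, called only while the processor holds the group's mutex, and Reset is deleted outright since completed groups are simply dropped from the map. A brief sketch of that caller-holds-the-lock convention, with illustrative names only (counter, Add, addLocked):

package main

import (
	"fmt"
	"sync"
)

// counter illustrates the convention: the exported entry point locks once,
// and unexported helpers assume the lock is already held.
type counter struct {
	mu sync.Mutex
	n  int
}

// Add is the exported entry point; it takes the lock exactly once.
func (c *counter) Add(delta int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.addLocked(delta)
}

// addLocked must only be called with c.mu held, mirroring how collectShred
// and reconstructBlock are now invoked under the group lock in the Processor.
func (c *counter) addLocked(delta int) int {
	c.n += delta
	return c.n
}

func main() {
	c := &counter{}
	fmt.Println(c.Add(2)) // 2
	fmt.Println(c.Add(3)) // 5
}
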
