Skip to content

Commit

Permalink
improve locking in store
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Apr 1, 2024
1 parent 6f5db44 commit 7ea43f8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
33 changes: 24 additions & 9 deletions share/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,25 @@ func NewStore(params *Parameters, basePath string) (*Store, error) {
return nil, err
}

// ensure blocks folder
if err := ensureFolder(basePath + blocksPath); err != nil {
return nil, fmt.Errorf("ensure blocks folder: %w", err)
// Ensure the blocks folder exists or is created.
blocksFolderPath := basePath + blocksPath
if err := ensureFolder(blocksFolderPath); err != nil {
log.Errorf("Failed to ensure the existence of the blocks folder at '%s': %s", blocksFolderPath, err)
return nil, fmt.Errorf("ensure blocks folder '%s': %w", blocksFolderPath, err)
}

// ensure heights folder
if err := ensureFolder(basePath + heightsPath); err != nil {
return nil, fmt.Errorf("ensure blocks folder: %w", err)
// Ensure the heights folder exists or is created.
heightsFolderPath := basePath + heightsPath
if err := ensureFolder(heightsFolderPath); err != nil {
log.Errorf("Failed to ensure the existence of the heights folder at '%s': %s", heightsFolderPath, err)
return nil, fmt.Errorf("ensure heights folder '%s': %w", heightsFolderPath, err)
}

// ensure empty heights file
if err := ensureFile(basePath + emptyHeightsFile); err != nil {
return nil, fmt.Errorf("ensure empty heights file: %w", err)
// Ensure the empty heights file exists or is created.
emptyHeightsFilePath := basePath + emptyHeightsFile
if err := ensureFile(emptyHeightsFilePath); err != nil {
log.Errorf("Failed to ensure the empty heights file at '%s': %s", emptyHeightsFilePath, err)
return nil, fmt.Errorf("ensure empty heights file '%s': %w", emptyHeightsFilePath, err)
}

recentBlocksCache, err := cache.NewFileCache("recent", 1)
Expand Down Expand Up @@ -355,6 +361,12 @@ func (s *Store) remove(height uint64) error {
return fmt.Errorf("removing from cache: %w", err)
}

// additionally lock by datahash to prevent concurrent access to the same underlying file
// using links from different heights
dlock := s.stripLock.byDatahash(f.DataHash())
dlock.Lock()
defer dlock.Unlock()

// remove hard link by height
heightPath := s.basepath + heightsPath + fmt.Sprintf("%d", height)
if err = os.Remove(heightPath); err != nil {
Expand Down Expand Up @@ -452,6 +464,9 @@ func (s *Store) storeEmptyHeights() error {
}
defer utils.CloseAndLog(log, "empty heights file", file)

s.emptyHeightsLock.RLock()
defer s.emptyHeightsLock.RUnlock()

encoder := gob.NewEncoder(file)
if err := encoder.Encode(s.emptyHeights); err != nil {
return fmt.Errorf("encoding empty heights: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions share/store/striplock.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package store

import (
"encoding/binary"
"sync"

"github.com/celestiaorg/celestia-node/share"
Expand Down Expand Up @@ -33,8 +32,9 @@ func (l *striplock) byHeight(height uint64) *sync.RWMutex {
}

func (l *striplock) byDatahash(datahash share.DataHash) *sync.RWMutex {
key := binary.LittleEndian.Uint16(datahash[len(datahash)-3:])
lkIdx := key % uint16(len(l.datahashes))
// Use the last 2 bytes of the datahash as hash to distribute the locks
last := uint16(datahash[len(datahash)-1]) | uint16(datahash[len(datahash)-2])<<8
lkIdx := last % uint16(len(l.datahashes))
return l.datahashes[lkIdx]
}

Expand Down

0 comments on commit 7ea43f8

Please sign in to comment.