-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsharder.go
90 lines (74 loc) · 1.73 KB
/
sharder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main
import (
"crypto/sha1"
"fmt"
"path"
)
// Sharder assigns keys to a fixed number of shards and manages one lock per
// shard.
type Sharder interface {
	// NumShards reports how many shards keys are distributed over.
	NumShards() uint16

	// ShardOf returns the shard that key is assigned to.
	ShardOf(key string) uint16

	// LockPath returns the path of the lock file that belongs to shard.
	LockPath(shard uint16) string

	// Acquire takes the lock for shard and reports any failure to do so.
	Acquire(shard uint16) error

	// Release gives up the lock for shard and reports any failure to do so.
	Release(shard uint16) error
}
// NewSharder returns a Sharder that spreads keys over numShards shards, builds
// its lock file paths below basePath and opens those files through fs. The
// returned Sharder holds no locks initially.
//
// numShards should be non-zero: ShardOf reduces the key hash modulo numShards.
func NewSharder(numShards uint16, fs FileSystem, basePath string) Sharder {
	return &sharder{
		numShards: numShards,
		basePath:  basePath,
		fs:        fs,
		locks:     make(map[uint16]File),
	}
}
// sharder is the Sharder implementation returned by NewSharder. Every lock it
// holds is represented by the lock file it opened for that shard.
type sharder struct {
	// Configuration, treated as read-only after construction.
	numShards uint16 // number of shards keys are spread over
	basePath  string // directory below which the lock files are placed

	fs    FileSystem      // used to open the per-shard lock files
	locks map[uint16]File // lock files currently held, keyed by shard
}
// NumShards returns the shard count this sharder was constructed with.
func (s *sharder) NumShards() uint16 {
	return s.numShards
}
// LockPath returns the slash-separated path of the lock file for shard:
// ".shards/<shard>" joined onto the base path and cleaned by path.Join.
func (s *sharder) LockPath(shard uint16) string {
	relative := fmt.Sprintf(".shards/%d", shard)
	return path.Join(s.basePath, relative)
}
// ShardOf maps key to a shard: the first two bytes of the key's SHA-1 digest
// are read as a little-endian 16-bit number, which is then reduced modulo the
// shard count.
//
// The shard count must be non-zero; a zero count makes the modulo panic with
// an integer divide by zero.
func (s *sharder) ShardOf(key string) uint16 {
	digest := sha1.Sum([]byte(key))
	lo, hi := uint16(digest[0]), uint16(digest[1])
	return (hi<<8 | lo) % s.numShards
}
// Acquire takes the lock for shard by opening the shard's lock file and
// locking it. The locked file is remembered, so acquiring a shard that is
// already held returns nil without touching the file system.
//
// An error from opening or locking the file is returned as is. When locking
// fails the file is closed again and the shard is not recorded as held.
func (s *sharder) Acquire(shard uint16) error {
	// Already holding this shard: nothing to do.
	if _, held := s.locks[shard]; held {
		return nil
	}

	file, err := s.fs.OpenFile(s.LockPath(shard), fileFlags, fileMode)
	if err != nil {
		return err
	}

	if lockErr := file.Lock(); lockErr != nil {
		// Do not keep a handle around for a lock we never obtained.
		file.Close()
		return lockErr
	}

	// Remember the locked file so that Release can find it.
	s.locks[shard] = file
	return nil
}
// Release gives up the lock for shard. Releasing a shard that is not held
// returns nil.
//
// If unlocking the lock file fails, that error is returned and the shard stays
// recorded as held with its file left open. Otherwise the file is closed and
// the shard is forgotten.
func (s *sharder) Release(shard uint16) error {
	file, held := s.locks[shard]
	if !held {
		// Nothing recorded for this shard.
		return nil
	}

	err := file.Unlock()
	if err != nil {
		return err
	}

	// Unlocked: drop the handle and the bookkeeping entry.
	file.Close()
	delete(s.locks, shard)
	return nil
}