-
Notifications
You must be signed in to change notification settings - Fork 21
/
generator.go
139 lines (112 loc) · 2.64 KB
/
generator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package katsubushi
import (
"errors"
"sync"
"time"
)
var nowFunc = time.Now
var nowMutex sync.RWMutex
func setNowFunc(f func() time.Time) {
nowMutex.Lock()
defer nowMutex.Unlock()
nowFunc = f
}
func now() time.Time {
nowMutex.RLock()
defer nowMutex.RUnlock()
return nowFunc()
}
// Epoch is katsubushi epoch time (2015-01-01 00:00:00 UTC)
// Generated ID includes elapsed time from Epoch.
var Epoch = time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)
// for bitshift
const (
WorkerIDBits = 10
SequenceBits = 12
workerIDMask = -1 ^ (-1 << WorkerIDBits)
sequenceMask = -1 ^ (-1 << SequenceBits)
)
var workerIDPool = []uint{}
var newGeneratorLock sync.Mutex
// errors
var (
ErrInvalidWorkerID = errors.New("invalid worker id")
ErrDuplicatedWorkerID = errors.New("duplicated worker")
)
func checkWorkerID(id uint) error {
if workerIDMask < id {
return ErrInvalidWorkerID
}
for _, otherID := range workerIDPool {
if id == otherID {
return ErrDuplicatedWorkerID
}
}
return nil
}
// Generator is an interface to generate unique ID.
type Generator interface {
NextID() (uint64, error)
WorkerID() uint
}
type generator struct {
workerID uint
lastTimestamp uint64
sequence uint
lock sync.Mutex
startedAt time.Time
offset time.Duration
}
// NewGenerator returns new generator.
func NewGenerator(workerID uint) (Generator, error) {
// To keep worker ID be unique.
newGeneratorLock.Lock()
defer newGeneratorLock.Unlock()
if err := checkWorkerID(workerID); err != nil {
return nil, err
}
// save as already used
workerIDPool = append(workerIDPool, workerID)
n := now()
return &generator{
workerID: workerID,
startedAt: n,
offset: n.Sub(Epoch),
}, nil
}
func (g *generator) WorkerID() uint {
return g.workerID
}
// NextID generate new ID.
func (g *generator) NextID() (uint64, error) {
g.lock.Lock()
defer g.lock.Unlock()
ts := g.timestamp()
// for rewind of server clock
if ts < g.lastTimestamp {
return 0, errors.New("system clock was rollbacked")
}
if ts == g.lastTimestamp {
g.sequence = (g.sequence + 1) & sequenceMask
if g.sequence == 0 {
// overflow
ts = g.waitUntilNextTick(ts)
}
} else {
g.sequence = 0
}
g.lastTimestamp = ts
return (g.lastTimestamp << (WorkerIDBits + SequenceBits)) | (uint64(g.workerID) << SequenceBits) | (uint64(g.sequence)), nil
}
func (g *generator) timestamp() uint64 {
d := now().Sub(g.startedAt) + g.offset
return uint64(d.Nanoseconds()) / uint64(time.Millisecond)
}
func (g *generator) waitUntilNextTick(ts uint64) uint64 {
next := g.timestamp()
for next <= ts {
next = g.timestamp()
time.Sleep(50 * time.Nanosecond)
}
return next
}