-
Notifications
You must be signed in to change notification settings - Fork 1
/
memproxy.go
186 lines (148 loc) · 4.14 KB
/
memproxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package memproxy
import (
"context"
"time"
"unsafe"
)
// Memcache represents a generic Memcache interface
// implementations of this interface must be thread safe
type Memcache interface {
// Pipeline creates a Pipeline, a NON thread safe object
Pipeline(ctx context.Context, options ...PipelineOption) Pipeline
// Close shutdowns memcache client
Close() error
}
// LeaseGetResult is the response of LeaseGet, method Result MUST only be called Once.
// Calling Result more than once is undefined behaviour
type LeaseGetResult interface {
Result() (LeaseGetResponse, error)
}
// LeaseGetErrorResult for error only result
type LeaseGetErrorResult struct {
Error error
}
// Result ...
func (r LeaseGetErrorResult) Result() (LeaseGetResponse, error) {
return LeaseGetResponse{}, r.Error
}
// LeaseGetResultFunc for function implementation of LeaseGetResult
type LeaseGetResultFunc func() (LeaseGetResponse, error)
// Result ...
func (f LeaseGetResultFunc) Result() (LeaseGetResponse, error) {
return f()
}
// Pipeline represents a generic Pipeline
type Pipeline interface {
// LeaseGet should not be used directly, use the item or mmap package instead
LeaseGet(key string, options LeaseGetOptions) LeaseGetResult
LeaseSet(key string, data []byte, cas uint64, options LeaseSetOptions) func() (LeaseSetResponse, error)
Delete(key string, options DeleteOptions) func() (DeleteResponse, error)
// Execute flush commands to the network
Execute()
// Finish must be called after create a Pipeline, often by defer
Finish()
// LowerSession returns a lower priority session
LowerSession() Session
}
// SessionProvider for controlling delayed tasks, this object is Thread Safe
type SessionProvider interface {
New() Session
}
// CallbackFunc for session
type CallbackFunc struct {
Object unsafe.Pointer
Func func(obj unsafe.Pointer)
}
// Call ...
func (f CallbackFunc) Call() {
f.Func(f.Object)
}
// NewEmptyCallback creates CallbackFunc from empty args function
func NewEmptyCallback(fn func()) CallbackFunc {
return CallbackFunc{
Object: nil,
Func: func(_ unsafe.Pointer) {
fn()
},
}
}
// Session controlling session values & delayed tasks, this object is NOT Thread Safe
type Session interface {
AddNextCall(fn CallbackFunc)
AddDelayedCall(d time.Duration, fn CallbackFunc)
Execute()
GetLower() Session
}
// LeaseGetOptions lease get options
type LeaseGetOptions struct {
}
// LeaseGetStatus status of lease get
type LeaseGetStatus uint32
const (
// LeaseGetStatusFound returns Data
LeaseGetStatusFound LeaseGetStatus = iota + 1
// LeaseGetStatusLeaseGranted lease granted
LeaseGetStatusLeaseGranted
// LeaseGetStatusLeaseRejected lease rejected
LeaseGetStatusLeaseRejected
)
// LeaseGetResponse lease get response
type LeaseGetResponse struct {
Status LeaseGetStatus
CAS uint64
Data []byte
}
// LeaseSetOptions lease set options
type LeaseSetOptions struct {
TTL uint32
}
// LeaseSetStatus ...
type LeaseSetStatus uint32
const (
// LeaseSetStatusStored ...
LeaseSetStatusStored LeaseSetStatus = iota + 1
// LeaseSetStatusNotStored NOT stored because of key already been deleted or CAS has changed
LeaseSetStatusNotStored
)
// LeaseSetResponse lease set response
type LeaseSetResponse struct {
Status LeaseSetStatus
}
// DeleteOptions delete options
type DeleteOptions struct {
}
// DeleteResponse delete response
type DeleteResponse struct {
}
// ==============================================
// Pipeline Options
// ==============================================
// PipelineConfig ...
type PipelineConfig struct {
existingSess Session
}
// GetSession ...
func (c *PipelineConfig) GetSession(provider SessionProvider) Session {
if c.existingSess != nil {
return c.existingSess
}
return provider.New()
}
// ComputePipelineConfig ...
func ComputePipelineConfig(options []PipelineOption) *PipelineConfig {
conf := &PipelineConfig{
existingSess: nil,
}
for _, fn := range options {
fn(conf)
}
return conf
}
// PipelineOption ...
type PipelineOption func(conf *PipelineConfig)
// WithPipelineExistingSession ...
func WithPipelineExistingSession(sess Session) PipelineOption {
return func(conf *PipelineConfig) {
conf.existingSess = sess
}
}