forked from jrallison/go-workers
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathenqueue.go
153 lines (128 loc) · 3.78 KB
/
enqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package workers
import (
"crypto/rand"
"encoding/json"
"fmt"
"io"
"time"
"github.com/gomodule/redigo/redis"
)
const (
NanoSecondPrecision = 1000000000.0
)
type EnqueueData struct {
Queue string `json:"queue,omitempty"`
Class string `json:"class"`
Args interface{} `json:"args"`
Jid string `json:"jid"`
EnqueuedAt float64 `json:"enqueued_at"`
EnqueueOptions
}
type EnqueueDataProxy EnqueueData
func (e EnqueueData) MarshalJSON() ([]byte, error) {
o := e.EnqueueOptions
if e.EnqueueOptions.RetryCount > 0 {
s := struct {
EnqueueDataProxy
Retry int `json:"retry,omitempty"`
RetryCount int `json:"retry_count"`
At float64 `json:"at,omitempty"`
}{EnqueueDataProxy(e), o.RetryCount, 0, o.At}
return json.Marshal(s)
}
return json.Marshal(struct {
EnqueueDataProxy
RetryCount int `json:"retry_count,omitempty"`
Retry bool `json:"retry,omitempty"`
At float64 `json:"at,omitempty"`
}{EnqueueDataProxy(e), o.RetryCount, o.Retry, o.At})
}
type EnqueueOptions struct {
RetryCount int `json:"retry_count,omitempty"`
Retry bool `json:"retry,omitempty"`
RetryMax int `json:"retry_max,omitempty"`
At float64 `json:"at,omitempty"`
RetryOptions RetryOptions `json:"retry_options,omitempty"`
ConnectionOptions map[string]string `json:"connection_options,omitempty"`
}
type RetryOptions struct {
Exp int `json:"exp"`
MinDelay int `json:"min_delay"`
MaxDelay int `json:"max_delay"`
MaxRand int `json:"max_rand"`
}
func generateJid() string {
// Return 12 random bytes as 24 character hex
b := make([]byte, 12)
_, err := io.ReadFull(rand.Reader, b)
if err != nil {
return ""
}
return fmt.Sprintf("%x", b)
}
func Enqueue(queue, class string, args interface{}) (string, error) {
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision()})
}
func EnqueueIn(queue, class string, in float64, args interface{}) (string, error) {
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision() + in})
}
func EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error) {
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: timeToSecondsWithNanoPrecision(at)})
}
func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) {
now := nowToSecondsWithNanoPrecision()
data := EnqueueData{
Queue: queue,
Class: class,
Args: args,
Jid: generateJid(),
EnqueuedAt: now,
EnqueueOptions: opts,
}
bytes, err := json.Marshal(data)
if err != nil {
return "", err
}
if now < opts.At {
err := enqueueAt(data.At, bytes)
return data.Jid, err
}
var conn redis.Conn
if len(opts.ConnectionOptions) == 0 {
conn = Config.Pool.Get()
} else {
conn = GetConnectionPool(opts.ConnectionOptions).Get()
}
defer conn.Close()
_, err = conn.Do("sadd", Config.Namespace+"queues", queue)
if err != nil {
return "", err
}
queue = Config.Namespace + "queue:" + queue
_, err = conn.Do("lpush", queue, bytes)
if err != nil {
return "", err
}
return data.Jid, nil
}
func enqueueAt(at float64, bytes []byte) error {
conn := Config.Pool.Get()
defer conn.Close()
_, err := conn.Do(
"zadd",
Config.Namespace+Config.ScheduleKey, at, bytes,
)
if err != nil {
return err
}
return nil
}
func timeToSecondsWithNanoPrecision(t time.Time) float64 {
return float64(t.UnixNano()) / NanoSecondPrecision
}
func durationToSecondsWithNanoPrecision(d time.Duration) float64 {
return float64(d.Nanoseconds()) / NanoSecondPrecision
}
func nowToSecondsWithNanoPrecision() float64 {
return timeToSecondsWithNanoPrecision(time.Now())
}