-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathAbstractQueue.php
415 lines (360 loc) · 13.9 KB
/
AbstractQueue.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
<?php
/**
* Defines the TraderInteractive\Mongo\Queue class.
*/
namespace TraderInteractive\Mongo;
use ArrayObject;
use MongoDB\BSON\ObjectID;
use MongoDB\BSON\UTCDateTime;
use MongoDB\Operation\FindOneAndUpdate;
/**
 * Abstraction of mongo db collection as priority queue.
 *
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
 * this purpose). Using a random priority achieves random get()
 *
 * Every stored document carries the fields `payload`, `earliestGet`, `priority`, `machineName` and `created`.
 * A message is considered "running" (hidden from get()) while its `earliestGet` is in the future.
 */
abstract class AbstractQueue
{
    /**
     * Maximum millisecond value to use for UTCDateTime creation.
     *
     * @var integer
     */
    const MONGO_INT32_MAX = PHP_INT_MAX;

    /**
     * Options passed to findOneAndUpdate() by get(): lowest priority value first, oldest first within a priority,
     * plain arrays instead of BSON objects, and the document as it looks after the update.
     *
     * @var array
     */
    const FIND_ONE_AND_UPDATE_OPTIONS = [
        'sort' => ['priority' => 1, 'created' => 1],
        'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
        'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
    ];

    /**
     * Default for the get() option `maxNumberOfMessages`.
     *
     * @var integer
     */
    const DEFAULT_MAX_NUMBER_OF_MESSAGES = 1;

    /**
     * Default for the get() option `runningResetDuration`, in milliseconds.
     *
     * @var integer
     */
    const DEFAULT_RUNNING_RESET_DURATION = 600000;

    /**
     * Default for the get() option `waitDurationInMillis`.
     *
     * @var integer
     */
    const DEFAULT_WAIT_DURATION_IN_MILLISECONDS = 3000;

    /**
     * Default for the get() option `pollDurationInMillis`.
     *
     * @var integer
     */
    const DEFAULT_POLL_DURATION_IN_MILLISECONDS = 200;

    /**
     * Defaults merged into the $options argument of get().
     *
     * @var array
     */
    const DEFAULT_GET_OPTIONS = [
        'maxNumberOfMessages' => self::DEFAULT_MAX_NUMBER_OF_MESSAGES,
        'runningResetDuration' => self::DEFAULT_RUNNING_RESET_DURATION,
        'waitDurationInMillis' => self::DEFAULT_WAIT_DURATION_IN_MILLISECONDS,
        'pollDurationInMillis' => self::DEFAULT_POLL_DURATION_IN_MILLISECONDS,
    ];

    /**
     * mongo collection to use for queue.
     *
     * @var \MongoDB\Collection
     */
    protected $collection;

    /**
     * Ensure an index for the get() method.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $completeFields = ['earliestGet' => 1];

        $this->verifySort($beforeSort, 'beforeSort', $completeFields);

        $completeFields['priority'] = 1;
        $completeFields['created'] = 1;

        $this->verifySort($afterSort, 'afterSort', $completeFields);

        //for the main query in get()
        $this->ensureIndex($completeFields);
    }

    /**
     * Ensure an index for the count() method.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields         fields in count() call to index in same format as
     *                              \MongoDB\Collection::createIndex()
     * @param bool  $includeRunning whether to include the running field in the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    final public function ensureCountIndex(array $fields, bool $includeRunning)
    {
        $completeFields = [];

        if ($includeRunning) {
            $completeFields['earliestGet'] = 1;
        }

        $this->verifySort($fields, 'fields', $completeFields);

        $this->ensureIndex($completeFields);
    }

    /**
     * Get a non running message from the queue.
     *
     * The queue is polled until either at least one message has been retrieved or the wait duration has elapsed.
     * Once a message has been retrieved, further messages are collected only while they are immediately available.
     *
     * @param array $query   in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                       operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                       invalid {$and: [{...}, {...}]}
     * @param array $options Associative array of get options.
     *                           runningResetDuration => integer
     *                               The duration (in milliseconds) that the received messages are hidden from
     *                               subsequent retrieve requests after being retrieved by a get() request.
     *                           waitDurationInMillis => integer
     *                               The duration (in milliseconds) for which the call will wait for a message to
     *                               arrive in the queue before returning. If a message is available, the call will
     *                               return sooner than waitDurationInMillis.
     *                           pollDurationInMillis => integer
     *                               The millisecond duration to wait between polls. A value of 0 (or less) polls
     *                               without pausing.
     *                           maxNumberOfMessages => integer
     *                               The maximum number of messages to return with get(). All of the messages are not
     *                               necessarily returned.
     *
     * @return array Array of messages.
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function get(array $query, array $options = []) : array
    {
        //validate and prefix the payload criteria once, so a bad key fails before any polling or waiting
        $payloadQuery = $this->buildPayloadQuery([], $query);

        $options += static::DEFAULT_GET_OPTIONS;
        $end = $this->calculateEndTime($options['waitDurationInMillis']);
        $sleepTime = $this->calculateSleepTime($options['pollDurationInMillis']);

        $messages = new ArrayObject();
        while (count($messages) < $options['maxNumberOfMessages']) {
            //both timestamps are refreshed on every attempt: the cutoff so that messages sent while we were
            //waiting become visible, the reset time so that the hidden period starts at retrieval
            $completeQuery = ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]]
                + $payloadQuery;
            $update = ['$set' => ['earliestGet' => $this->calculateEarliestGet($options['runningResetDuration'])]];

            if ($this->tryFindOneAndUpdate($completeQuery, $update, $messages)) {
                continue;
            }

            //nothing (more) available right now: hand back what we have instead of waiting for a full batch
            if (count($messages) > 0) {
                break;
            }

            $remainingMicros = ($end - microtime(true)) * 1000000;
            if ($remainingMicros <= 0) {
                break;
            }

            //never sleep past the deadline
            usleep((int)min($sleepTime, $remainingMicros));
        }

        return $messages->getArrayCopy();
    }

    /**
     * Count queue messages.
     *
     * @param array     $query   in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                           operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                           invalid {$and: [{...}, {...}]}
     * @param bool|null $running true counts only running messages, false only non running messages and null counts
     *                           all messages
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function count(array $query, bool $running = null) : int
    {
        $totalQuery = [];

        if ($running !== null) {
            //running messages have an earliestGet in the future
            $operator = $running ? '$gt' : '$lte';
            $totalQuery['earliestGet'] = [$operator => new UTCDateTime((int)(microtime(true) * 1000))];
        }

        return $this->collection->countDocuments($this->buildPayloadQuery($totalQuery, $query));
    }

    /**
     * Acknowledge a message was processed and remove from queue.
     *
     * @param Message $message message received from get()
     *
     * @return void
     */
    final public function ack(Message $message)
    {
        $this->collection->deleteOne(['_id' => $message->getId()]);
    }

    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * Implemented as a single upsert on the message id, so the document is overwritten in place (or created if it
     * no longer exists).
     *
     * @param Message $message message received from get().
     *
     * @return void
     */
    final public function requeue(Message $message)
    {
        $this->collection->updateOne(
            ['_id' => $message->getId()],
            ['$set' => $this->buildMessageFields($message)],
            ['upsert' => true]
        );
    }

    /**
     * Send a message to the queue.
     *
     * @param Message $message The message to send.
     *
     * @return void
     */
    final public function send(Message $message)
    {
        $this->collection->insertOne(['_id' => $message->getId()] + $this->buildMessageFields($message));
    }

    /**
     * Build the stored fields (everything except _id) for the given message.
     *
     * @param Message $message The message to convert.
     *
     * @return array
     */
    private function buildMessageFields(Message $message) : array
    {
        return [
            'payload' => $message->getPayload(),
            'earliestGet' => $message->getEarliestGet(),
            'priority' => $message->getPriority(),
            'machineName' => gethostname(),
            'created' => new UTCDateTime(),
        ];
    }

    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        if ($this->isIndexIncludedInExistingIndex($index)) {
            return;
        }

        for ($attempt = 0; $attempt < 5; ++$attempt) {
            if ($this->tryCreateIndex($index)) {
                return;
            }
        }

        throw new \Exception('couldnt create index after 5 attempts');
    }

    /**
     * Add the caller supplied criteria to $initialQuery, prefixing every key with "payload.".
     *
     * @param array $initialQuery Query on the queue's own fields.
     * @param array $payloadQuery Criteria on the message payload, keyed by field name.
     *
     * @return array The combined query.
     *
     * @throws \InvalidArgumentException key in $payloadQuery was not a string
     */
    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
    {
        foreach ($payloadQuery as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $initialQuery["payload.{$key}"] = $value;
        }

        return $initialQuery;
    }

    /**
     * Convert the poll duration to microseconds for usleep().
     *
     * @param int $pollDurationInMillis Poll duration in milliseconds, negative values are treated as 0.
     *
     * @return int Microseconds, capped at PHP_INT_MAX.
     */
    private function calculateSleepTime(int $pollDurationInMillis) : int
    {
        $sleepTime = max($pollDurationInMillis, 0) * 1000;

        //ints overflow to floats and the duration was already forced to be non negative
        return is_int($sleepTime) ? $sleepTime : PHP_INT_MAX;
    }

    /**
     * Calculate the time until which a message retrieved now stays hidden.
     *
     * @param int $runningResetDuration Duration in milliseconds, counted from now.
     *
     * @return UTCDateTime now plus the duration, clamped to the range 0 .. MONGO_INT32_MAX milliseconds.
     */
    private function calculateEarliestGet(int $runningResetDuration) : UTCDateTime
    {
        //UTCDateTime is constructed from milliseconds, so the whole calculation is done in milliseconds
        $resetTimestamp = (int)(microtime(true) * 1000) + $runningResetDuration;

        //ints overflow to floats; "now" is positive so an overflow can only be past the upper bound
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = static::MONGO_INT32_MAX;
        }

        return new UTCDateTime(min(max(0, $resetTimestamp), static::MONGO_INT32_MAX));
    }

    /**
     * Try to claim one message and append it to $messages.
     *
     * @param array       $query    Filter selecting the candidate messages.
     * @param array       $update   Update applied to the claimed message.
     * @param ArrayObject $messages Collector the claimed message is appended to.
     *
     * @return bool true if a message was claimed, false if none matched.
     */
    private function tryFindOneAndUpdate(array $query, array $update, ArrayObject $messages) : bool
    {
        $document = $this->collection->findOneAndUpdate($query, $update, static::FIND_ONE_AND_UPDATE_OPTIONS);
        if ($document === null) {
            return false;
        }

        $messages[] = new Message(
            $document['_id'],
            $document['payload'],
            $document['earliestGet'],
            $document['priority']
        );

        return true;
    }

    /**
     * Check whether $index is a prefix of (or equal to) the key of any existing index.
     *
     * @param array $index Index specification to look for.
     *
     * @return bool
     */
    private function isIndexIncludedInExistingIndex(array $index) : bool
    {
        $fieldCount = count($index);

        foreach ($this->collection->listIndexes() as $existingIndex) {
            //identical comparison: same fields, same directions, same order
            if (array_slice($existingIndex['key'], 0, $fieldCount, true) === $index) {
                return true;
            }
        }

        return false;
    }

    /**
     * Try to create the index under a generated name, retrying with ever shorter names.
     *
     * @param array $index Index specification to create.
     *
     * @return bool true as soon as an index with the specification exists.
     */
    private function tryCreateIndex(array $index) : bool
    {
        for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
            if ($this->tryCreateNamedIndex($index, $name)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Try to create the index under the given name and report whether the specification exists afterwards.
     *
     * @param array  $index Index specification to create.
     * @param string $name  Name to create the index under.
     *
     * @return bool
     */
    private function tryCreateNamedIndex(array $index, string $name) : bool
    {
        //creating an index with same name and different spec does nothing.
        //creating an index with same spec and different name does nothing.
        //so we use any generated name, and then find the right spec after we have called,
        //and just go with that name.
        try {
            $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
        } catch (\MongoDB\Exception\Exception $e) {
            //this happens when the name was too long, let continue
        }

        return $this->indexExists($index);
    }

    /**
     * Check whether an index with exactly the given specification exists.
     *
     * @param array $index Index specification to look for.
     *
     * @return bool
     */
    private function indexExists(array $index) : bool
    {
        foreach ($this->collection->listIndexes() as $existingIndex) {
            if ($existingIndex['key'] === $index) {
                return true;
            }
        }

        return false;
    }

    /**
     * Helper method to validate keys and values for the given sort array
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with payload. prefix added to fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort is not 1 or -1
     */
    private function verifySort(array $sort, string $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            $this->throwIfTrue(!is_string($key), "key in \${$label} was not a string");
            $this->throwIfTrue(
                $value !== 1 && $value !== -1,
                "value of \${$label} is not 1 or -1 for ascending and descending"
            );

            $completeFields["payload.{$key}"] = $value;
        }
    }

    /**
     * Throw an exception of the given class if the condition holds.
     *
     * @param bool   $condition      Whether to throw.
     * @param string $message        Message for the exception.
     * @param string $exceptionClass Class of the exception to throw.
     *
     * @return void
     */
    private function throwIfTrue(
        bool $condition,
        string $message,
        string $exceptionClass = '\\InvalidArgumentException'
    ) {
        if ($condition) {
            throw new $exceptionClass($message);
        }
    }

    /**
     * Calculate the point in time at which get() stops waiting.
     *
     * @param int $waitDurationInMillis Wait duration in milliseconds.
     *
     * @return float Unix timestamp in seconds, comparable to microtime(true).
     */
    private function calculateEndTime(int $waitDurationInMillis) : float
    {
        return microtime(true) + ($waitDurationInMillis / 1000.0);
    }
}