Skip to content

Commit 7a4fc08

Browse files
committed
Use only earliestGet to avoid collection updates
1 parent b4c31ca commit 7a4fc08

2 files changed

Lines changed: 17 additions & 56 deletions

File tree

src/Queue.php

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public function __construct($collectionOrUrl, $db = null, $collection = null)
7676
public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
7777
{
7878
//using general rule: equality, sort, range or more equality tests in that order for index
79-
$completeFields = ['running' => 1];
79+
$completeFields = ['earliestGet' => 1];
8080

8181
self::verifySort($beforeSort, 'beforeSort', $completeFields);
8282

@@ -85,13 +85,8 @@ public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
8585

8686
self::verifySort($afterSort, 'afterSort', $completeFields);
8787

88-
$completeFields['earliestGet'] = 1;
89-
9088
//for the main query in get()
9189
$this->ensureIndex($completeFields);
92-
93-
//for the stuck messages query in get()
94-
$this->ensureIndex(['running' => 1, 'resetTimestamp' => 1]);
9590
}
9691

9792
/**
@@ -117,7 +112,7 @@ public function ensureCountIndex(array $fields, $includeRunning)
117112
$completeFields = [];
118113

119114
if ($includeRunning) {
120-
$completeFields['running'] = 1;
115+
$completeFields['earliestGet'] = 1;
121116
}
122117

123118
self::verifySort($fields, 'fields', $completeFields);
@@ -160,13 +155,7 @@ public function get(array $query, $runningResetDuration, $waitDurationInMillis =
160155
$pollDurationInMillis = 0;
161156
}
162157

163-
//reset stuck messages
164-
$this->collection->updateMany(
165-
['running' => true, 'resetTimestamp' => ['$lte' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))]],
166-
['$set' => ['running' => false]]
167-
);
168-
169-
$completeQuery = ['running' => false];
158+
$completeQuery = ['earliestGet' => ['$lte' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))]];
170159
foreach ($query as $key => $value) {
171160
if (!is_string($key)) {
172161
throw new \InvalidArgumentException('key in $query was not a string');
@@ -175,8 +164,6 @@ public function get(array $query, $runningResetDuration, $waitDurationInMillis =
175164
$completeQuery["payload.{$key}"] = $value;
176165
}
177166

178-
$completeQuery['earliestGet'] = ['$lte' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))];
179-
180167
$resetTimestamp = time() + $runningResetDuration;
181168
//ints overflow to floats
182169
if (!is_int($resetTimestamp)) {
@@ -185,7 +172,7 @@ public function get(array $query, $runningResetDuration, $waitDurationInMillis =
185172

186173
$resetTimestamp = min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
187174

188-
$update = ['$set' => ['resetTimestamp' => new \MongoDB\BSON\UTCDateTime($resetTimestamp), 'running' => true]];
175+
$update = ['$set' => ['earliestGet' => new \MongoDB\BSON\UTCDateTime($resetTimestamp)]];
189176
$options = ['sort' => ['priority' => 1, 'created' => 1]];
190177

191178
//ints overflow to floats, should be fine
@@ -242,8 +229,9 @@ public function count(array $query, $running = null)
242229

243230
$totalQuery = [];
244231

245-
if ($running !== null) {
246-
$totalQuery['running'] = $running;
232+
if ($running === true || $running === false) {
233+
$key = $running ? '$gt' : '$lte';
234+
$totalQuery['earliestGet'] = [$key => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))];
247235
}
248236

249237
foreach ($query as $key => $value) {
@@ -330,8 +318,6 @@ public function ackSend(array $message, array $payload, $earliestGet = 0, $prior
330318

331319
$toSet = [
332320
'payload' => $payload,
333-
'running' => false,
334-
'resetTimestamp' => new \MongoDB\BSON\UTCDateTime(self::MONGO_INT32_MAX),
335321
'earliestGet' => new \MongoDB\BSON\UTCDateTime($earliestGet),
336322
'priority' => $priority,
337323
];
@@ -399,8 +385,6 @@ public function send(array $payload, $earliestGet = 0, $priority = 0.0)
399385

400386
$message = [
401387
'payload' => $payload,
402-
'running' => false,
403-
'resetTimestamp' => new \MongoDB\BSON\UTCDateTime(self::MONGO_INT32_MAX),
404388
'earliestGet' => new \MongoDB\BSON\UTCDateTime($earliestGet),
405389
'priority' => $priority,
406390
'created' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000)),

tests/QueueTest.php

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -64,29 +64,24 @@ public function ensureGetIndex()
6464
$this->queue->ensureGetIndex(['another.sub' => 1]);
6565

6666
$indexes = iterator_to_array($this->collection->listIndexes());
67-
$this->assertSame(4, count($indexes));
67+
$this->assertSame(3, count($indexes));
6868

6969
$expectedOne = [
70-
'running' => 1,
70+
'earliestGet' => 1,
7171
'payload.type' => 1,
7272
'priority' => 1,
7373
'created' => 1,
7474
'payload.boo' => -1,
75-
'earliestGet' => 1
7675
];
7776
$this->assertSame($expectedOne, $indexes[1]['key']);
7877

79-
$expectedTwo = ['running' => 1, 'resetTimestamp' => 1];
80-
$this->assertSame($expectedTwo, $indexes[2]['key']);
81-
82-
$expectedThree = [
83-
'running' => 1,
78+
$expectedTwo = [
79+
'earliestGet' => 1,
8480
'payload.another.sub' => 1,
8581
'priority' => 1,
8682
'created' => 1,
87-
'earliestGet' => 1
8883
];
89-
$this->assertSame($expectedThree, $indexes[3]['key']);
84+
$this->assertSame($expectedTwo, $indexes[2]['key']);
9085
}
9186

9287
/**
@@ -176,7 +171,7 @@ public function ensureCountIndex()
176171
$expectedOne = ['payload.type' => 1, 'payload.boo' => -1];
177172
$this->assertSame($expectedOne, $indexes[1]['key']);
178173

179-
$expectedTwo = ['running' => 1, 'payload.another.sub' => 1];
174+
$expectedTwo = ['earliestGet' => 1, 'payload.another.sub' => 1];
180175
$this->assertSame($expectedTwo, $indexes[2]['key']);
181176
}
182177

@@ -472,22 +467,19 @@ public function resetStuck()
472467
//sets to running
473468
$this->collection->updateOne(
474469
['payload.key' => 0],
475-
['$set' => ['running' => true, 'resetTimestamp' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))]]
470+
['$set' => ['earliestGet' => new \MongoDB\BSON\UTCDateTime(time() * 1000)]]
476471
);
477472
$this->collection->updateOne(
478473
['payload.key' => 1],
479-
['$set' => ['running' => true, 'resetTimestamp' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))]]
474+
['$set' => ['earliestGet' => new \MongoDB\BSON\UTCDateTime(time() * 1000)]]
480475
);
481476

482-
$this->assertSame(2, $this->collection->count(['running' => true]));
483-
484-
//sets resetTimestamp on messageOne
485-
$this->queue->get($messageOne, 0, 0);
477+
$this->assertSame(2, $this->collection->count(['earliestGet' => ['$lte' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))]]));
486478

487479
//resets and gets messageOne
488480
$this->assertNotNull($this->queue->get($messageOne, PHP_INT_MAX, 0));
489481

490-
$this->assertSame(1, $this->collection->count(['running' => false]));
482+
$this->assertSame(1, $this->collection->count(['earliestGet' => ['$lte' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))]]));
491483
}
492484

493485
/**
@@ -670,8 +662,6 @@ public function ackSendWithHighEarliestGet()
670662

671663
$expected = [
672664
'payload' => [],
673-
'running' => false,
674-
'resetTimestamp' => Queue::MONGO_INT32_MAX,
675665
'earliestGet' => Queue::MONGO_INT32_MAX,
676666
'priority' => 0.0,
677667
];
@@ -682,7 +672,6 @@ public function ackSendWithHighEarliestGet()
682672
$this->assertGreaterThan(time() - 10, $message['created']->toDateTime()->getTimestamp());
683673

684674
unset($message['_id'], $message['created']);
685-
$message['resetTimestamp'] = (int)$message['resetTimestamp']->__toString();
686675
$message['earliestGet'] = (int)$message['earliestGet']->__toString();
687676

688677
$this->assertSame($expected, $message);
@@ -759,8 +748,6 @@ public function send()
759748

760749
$expected = [
761750
'payload' => $payload,
762-
'running' => false,
763-
'resetTimestamp' => (new UTCDateTime(Queue::MONGO_INT32_MAX))->toDateTime()->getTimestamp(),
764751
'earliestGet' => 34,
765752
'priority' => 0.8,
766753
];
@@ -771,7 +758,6 @@ public function send()
771758
$this->assertGreaterThan(time() - 10, $message['created']->toDateTime()->getTimestamp());
772759

773760
unset($message['_id'], $message['created']);
774-
$message['resetTimestamp'] = $message['resetTimestamp']->toDateTime()->getTimestamp();
775761
$message['earliestGet'] = $message['earliestGet']->toDateTime()->getTimestamp();
776762

777763
$this->assertSame($expected, $message);
@@ -817,8 +803,6 @@ public function sendWithHighEarliestGet()
817803

818804
$expected = [
819805
'payload' => [],
820-
'running' => false,
821-
'resetTimestamp' => (new UTCDateTime(Queue::MONGO_INT32_MAX))->toDateTime()->getTimestamp(),
822806
'earliestGet' => (new UTCDateTime(Queue::MONGO_INT32_MAX))->toDateTime()->getTimestamp(),
823807
'priority' => 0.0,
824808
];
@@ -829,7 +813,6 @@ public function sendWithHighEarliestGet()
829813
$this->assertGreaterThan(time() - 10, $message['created']->toDateTime()->getTimestamp());
830814

831815
unset($message['_id'], $message['created']);
832-
$message['resetTimestamp'] = $message['resetTimestamp']->toDateTime()->getTimestamp();
833816
$message['earliestGet'] = $message['earliestGet']->toDateTime()->getTimestamp();
834817

835818
$this->assertSame($expected, $message);
@@ -845,8 +828,6 @@ public function sendWithLowEarliestGet()
845828

846829
$expected = [
847830
'payload' => [],
848-
'running' => false,
849-
'resetTimestamp' => (new UTCDateTime(Queue::MONGO_INT32_MAX))->toDateTime()->getTimestamp(),
850831
'earliestGet' => 0,
851832
'priority' => 0.0,
852833
];
@@ -857,7 +838,6 @@ public function sendWithLowEarliestGet()
857838
$this->assertGreaterThan(time() - 10, $message['created']->toDateTime()->getTimestamp());
858839

859840
unset($message['_id'], $message['created']);
860-
$message['resetTimestamp'] = $message['resetTimestamp']->toDateTime()->getTimestamp();
861841
$message['earliestGet'] = $message['earliestGet']->toDateTime()->getTimestamp();
862842

863843
$this->assertSame($expected, $message);
@@ -883,8 +863,6 @@ public function constructWithCollection()
883863

884864
$expected = [
885865
'payload' => $payload,
886-
'running' => false,
887-
'resetTimestamp' => (new UTCDateTime(Queue::MONGO_INT32_MAX))->toDateTime()->getTimestamp(),
888866
'earliestGet' => 34,
889867
'priority' => 0.8,
890868
];
@@ -897,7 +875,6 @@ public function constructWithCollection()
897875
$this->assertGreaterThan(time() - 10, $message['created']->toDateTime()->getTimestamp());
898876

899877
unset($message['_id'], $message['created']);
900-
$message['resetTimestamp'] = $message['resetTimestamp']->toDateTime()->getTimestamp();
901878
$message['earliestGet'] = $message['earliestGet']->toDateTime()->getTimestamp();
902879

903880
$this->assertSame($expected, $message);

0 commit comments

Comments
 (0)