Skip to content

Commit 9f621b6

Browse files
committed
Add QueueInterface
1 parent 3454cd0 commit 9f621b6

2 files changed

Lines changed: 136 additions & 1 deletion

File tree

src/Queue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
1212
* this purpose). Using a random priority achieves random get()
1313
*/
14-
final class Queue
14+
final class Queue implements QueueInterface
1515
{
1616
const MONGO_INT32_MAX = 2147483647;//2147483648 can overflow in php mongo without using the MongoInt64
1717

src/QueueInterface.php

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
<?php
2+
3+
namespace DominionEnterprises\Mongo;
4+
5+
/**
6+
* Abstraction of mongo db collection as priority queue.
7+
*
8+
* Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
9+
* this purpose). Using a random priority achieves random get()
10+
*/
11+
interface QueueInterface
12+
{
13+
/**
14+
* Ensure an index for the get() method.
15+
*
16+
* @param array $beforeSort fields in get() call to index before the sort field.
17+
* @param array $afterSort fields in get() call to index after the sort field.
18+
*
19+
* @return void
20+
*
21+
* @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
22+
* @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
23+
*/
24+
public function ensureGetIndex(array $beforeSort = [], array $afterSort = []);
25+
26+
/**
27+
* Ensure an index for the count() method.
28+
*
29+
* @param array $fields fields in count() call to index in same format as \MongoCollection::ensureIndex()
30+
* @param bool $includeRunning whether to include the running field in the index
31+
*
32+
* @return void
33+
*
34+
* @throws \InvalidArgumentException $includeRunning was not a boolean
35+
* @throws \InvalidArgumentException key in $fields was not a string
36+
* @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
37+
*/
38+
public function ensureCountIndex(array $fields, $includeRunning);
39+
40+
/**
41+
* Get a non running message from the queue.
42+
*
43+
* @param array $query in same format as \MongoCollection::find() where top level fields do not contain operators.
44+
* Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}
45+
* @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
46+
* retreived again.
47+
* @param int $waitDurationInMillis millisecond duration to wait for a message.
48+
* @param int $pollDurationInMillis millisecond duration to wait between polls.
49+
*
50+
* @return array|null the message or null if one is not found
51+
*
52+
* @throws \InvalidArgumentException $runningResetDuration, $waitDurationInMillis or $pollDurationInMillis was not
53+
* an int
54+
* @throws \InvalidArgumentException key in $query was not a string
55+
*/
56+
public function get(array $query, $runningResetDuration, $waitDurationInMillis = 3000, $pollDurationInMillis = 200);
57+
58+
/**
59+
* Count queue messages.
60+
*
61+
* @param array $query in same format as \MongoCollection::find() where top level fields do not contain operators.
62+
* Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}
63+
* @param bool|null $running query a running message or not or all
64+
*
65+
* @return int the count
66+
*
67+
* @throws \InvalidArgumentException $running was not null and not a bool
68+
* @throws \InvalidArgumentException key in $query was not a string
69+
*/
70+
public function count(array $query, $running = null);
71+
72+
/**
73+
* Acknowledge a message was processed and remove from queue.
74+
*
75+
* @param array $message message received from get()
76+
*
77+
* @return void
78+
*
79+
* @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
80+
*/
81+
public function ack(array $message);
82+
83+
/**
84+
* Atomically acknowledge and send a message to the queue.
85+
*
86+
* @param array $message the message to ack received from get()
87+
* @param array $payload the data to store in the message to send. Data is handled same way
88+
* as \MongoCollection::insert()
89+
* @param int $earliestGet earliest unix timestamp the message can be retreived.
90+
* @param float $priority priority for order out of get(). 0 is higher priority than 1
91+
* @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
92+
*
93+
* @return void
94+
*
95+
* @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
96+
* @throws \InvalidArgumentException $earliestGet was not an int
97+
* @throws \InvalidArgumentException $priority was not a float
98+
* @throws \InvalidArgumentException $priority is NaN
99+
* @throws \InvalidArgumentException $newTimestamp was not a bool
100+
*/
101+
public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true);
102+
103+
/**
104+
* Requeue message to the queue. Same as ackSend() with the same message.
105+
*
106+
* @param array $message message received from get().
107+
* @param int $earliestGet earliest unix timestamp the message can be retreived.
108+
* @param float $priority priority for order out of get(). 0 is higher priority than 1
109+
* @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
110+
*
111+
* @return void
112+
*
113+
* @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
114+
* @throws \InvalidArgumentException $earliestGet was not an int
115+
* @throws \InvalidArgumentException $priority was not a float
116+
* @throws \InvalidArgumentException priority is NaN
117+
* @throws \InvalidArgumentException $newTimestamp was not a bool
118+
*/
119+
public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true);
120+
121+
/**
122+
* Send a message to the queue.
123+
*
124+
* @param array $payload the data to store in the message. Data is handled same way as \MongoCollection::insert()
125+
* @param int $earliestGet earliest unix timestamp the message can be retreived.
126+
* @param float $priority priority for order out of get(). 0 is higher priority than 1
127+
*
128+
* @return void
129+
*
130+
* @throws \InvalidArgumentException $earliestGet was not an int
131+
* @throws \InvalidArgumentException $priority was not a float
132+
* @throws \InvalidArgumentException $priority is NaN
133+
*/
134+
public function send(array $payload, $earliestGet = 0, $priority = 0.0);
135+
}

0 commit comments

Comments
 (0)