|
1 | 1 | <?php |
2 | 2 |
|
3 | 3 | use React\Stream\Stream; |
4 | | - |
5 | 4 | use React\Stream\ReadableStream; |
6 | | - |
7 | 5 | use Clue\React\Redis\Factory; |
8 | | - |
9 | 6 | use Clue\React\Redis\StreamingClient; |
| 7 | +use React\Promise\Deferred; |
| 8 | +use Clue\React\Block; |
10 | 9 |
|
11 | 10 | class FunctionalTest extends TestCase |
12 | 11 | { |
@@ -135,15 +134,21 @@ public function testPubSub() |
135 | 134 | $consumer = $this->createClient(); |
136 | 135 | $producer = $this->createClient(); |
137 | 136 |
|
138 | | - $that = $this; |
| 137 | + $channel = 'channel:test:' . mt_rand(); |
139 | 138 |
|
140 | | - $producer->publish('channel:test', 'nobody sees this')->then($this->expectCallableOnce(0)); |
| 139 | + // consumer receives a single message |
| 140 | + $deferred = new Deferred(); |
| 141 | + $consumer->on('message', $this->expectCallableOnce()); |
| 142 | + $consumer->on('message', array($deferred, 'resolve')); |
| 143 | + $consumer->subscribe($channel)->then($this->expectCallableOnce()); |
| 144 | + $this->waitFor($consumer); |
141 | 145 |
|
| 146 | + // producer sends a single message |
| 147 | + $producer->publish($channel, 'hello world')->then($this->expectCallableOnce()); |
142 | 148 | $this->waitFor($producer); |
143 | 149 |
|
144 | | - $consumer->subscribe('channel:test')->then(function () { |
145 | | - // ? |
146 | | - }); |
| 150 | + // expect "message" event to take no longer than 0.1s |
| 151 | + Block\await($deferred->promise(), self::$loop, 0.1); |
147 | 152 | } |
148 | 153 |
|
149 | 154 | public function testClose() |
@@ -186,24 +191,7 @@ public function testInvalidServerRepliesWithDuplicateMessages() |
186 | 191 | */ |
187 | 192 | protected function createClient() |
188 | 193 | { |
189 | | - $client = null; |
190 | | - $exception = null; |
191 | | - |
192 | | - self::$factory->createClient()->then(function ($c) use (&$client) { |
193 | | - $client = $c; |
194 | | - }, function($error) use (&$exception) { |
195 | | - $exception = $error; |
196 | | - }); |
197 | | - |
198 | | - while ($client === null && $exception === null) { |
199 | | - self::$loop->tick(); |
200 | | - } |
201 | | - |
202 | | - if ($exception !== null) { |
203 | | - throw $exception; |
204 | | - } |
205 | | - |
206 | | - return $client; |
| 194 | + return Block\await(self::$factory->createClient(), self::$loop); |
207 | 195 | } |
208 | 196 |
|
209 | 197 | protected function createClientResponse($response) |
|
0 commit comments