|
2 | 2 |
|
3 | 3 | namespace Clue\React\Redis; |
4 | 4 |
|
5 | | -use Clue\React\Redis\StreamingClient; |
6 | 5 | use Clue\Redis\Protocol\Factory as ProtocolFactory; |
7 | 6 | use React\EventLoop\LoopInterface; |
8 | 7 | use React\Promise; |
| 8 | +use React\Promise\Deferred; |
9 | 9 | use React\Socket\ConnectionInterface; |
10 | 10 | use React\Socket\Connector; |
11 | 11 | use React\Socket\ConnectorInterface; |
@@ -50,9 +50,20 @@ public function createClient($target) |
50 | 50 | return Promise\reject($e); |
51 | 51 | } |
52 | 52 |
|
53 | | - $protocol = $this->protocol; |
| 53 | + $connecting = $this->connector->connect($parts['authority']); |
| 54 | + $deferred = new Deferred(function ($_, $reject) use ($connecting) { |
| 55 | + // connection cancelled, start with rejecting attempt, then clean up |
| 56 | + $reject(new \RuntimeException('Connection to database server cancelled')); |
| 57 | + |
| 58 | + // either close successful connection or cancel pending connection attempt |
| 59 | + $connecting->then(function (ConnectionInterface $connection) { |
| 60 | + $connection->close(); |
| 61 | + }); |
| 62 | + $connecting->cancel(); |
| 63 | + }); |
54 | 64 |
|
55 | | - $promise = $this->connector->connect($parts['authority'])->then(function (ConnectionInterface $stream) use ($protocol) { |
| 65 | + $protocol = $this->protocol; |
| 66 | + $promise = $connecting->then(function (ConnectionInterface $stream) use ($protocol) { |
56 | 67 | return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer()); |
57 | 68 | }); |
58 | 69 |
|
@@ -84,7 +95,9 @@ function ($error) use ($client) { |
84 | 95 | }); |
85 | 96 | } |
86 | 97 |
|
87 | | - return $promise; |
| 98 | + $promise->then(array($deferred, 'resolve'), array($deferred, 'reject')); |
| 99 | + |
| 100 | + return $deferred->promise(); |
88 | 101 | } |
89 | 102 |
|
90 | 103 | /** |
|
0 commit comments