1414use React \Promise \Deferred ;
1515use Clue \Redis \Protocol \Model \ErrorReply ;
1616use Clue \Redis \Protocol \Model \ModelInterface ;
17+ use Clue \Redis \Protocol \Model \MultiBulkReply ;
1718
1819class StreamingClient extends EventEmitter implements Client
1920{
@@ -24,6 +25,9 @@ class StreamingClient extends EventEmitter implements Client
2425 private $ ending = false ;
2526 private $ closed = false ;
2627
28+ private $ subscribed = 0 ;
29+ private $ psubscribed = 0 ;
30+
2731 public function __construct (Stream $ stream , ParserInterface $ parser = null , SerializerInterface $ serializer = null )
2832 {
2933 if ($ parser === null || $ serializer === null ) {
@@ -85,6 +89,21 @@ public function handleMessage(ModelInterface $message)
8589 {
8690 $ this ->emit ('data ' , array ($ message , $ this ));
8791
92+ if (/*($this->subscribed !== 0 || $this->psubscribed !== 0) &&*/ $ message instanceof MultiBulkReply) {
93+ $ array = $ message ->getValueNative ();
94+ $ first = array_shift ($ array );
95+
96+ // pub/sub events are to be forwarded
97+ if (in_array ($ first , array ('message ' , 'subscribe ' , 'unsubscribe ' , 'pmessage ' , 'psubscribe ' , 'punsubscribe ' ))) {
98+ $ this ->emit ($ first , $ array );
99+ }
100+
101+ // pub/sub message events should not be processed as request responses
102+ if (in_array ($ first , array ('message ' , 'pmessage ' ))) {
103+ return ;
104+ }
105+ }
106+
88107 if (!$ this ->requests ) {
89108 throw new UnderflowException ('Unexpected reply received, no matching request found ' );
90109 }
0 commit comments