@@ -20,6 +20,8 @@ class Client extends EventEmitter
2020 private $ stream ;
2121 private $ parser ;
2222 private $ serializer ;
23+ private $ requests = array ();
24+ private $ ending = false ;
2325
2426 public function __construct (Stream $ stream , ParserInterface $ parser = null , SerializerInterface $ serializer = null )
2527 {
@@ -46,7 +48,7 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
4648
4749 foreach ($ models as $ data ) {
4850 try {
49- $ that ->emit ( ' message ' , array ( $ data, $ that ) );
51+ $ that ->handleMessage ( $ data );
5052 }
5153 catch (UnderflowException $ error ) {
5254 $ that ->emit ('error ' , array ($ error ));
@@ -65,32 +67,73 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
6567 $ this ->serializer = $ serializer ;
6668 }
6769
68- /**
69- * Sends command with given $name and additial $args
70- *
71- * @param name $name
72- * @param array $args
73- */
74- public function sendRequest ($ name , array $ args = array ())
70+ public function __call ($ name , $ args )
71+ {
72+ $ request = new Deferred ();
73+
74+ if ($ this ->ending ) {
75+ $ request ->reject (new RuntimeException ('Connection closed ' ));
76+ } else {
77+ $ this ->stream ->write ($ this ->serializer ->getRequestMessage ($ name , $ args ));
78+ $ this ->requests []= $ request ;
79+ }
80+
81+ return $ request ->promise ();
82+ }
83+
84+ public function handleMessage (ModelInterface $ message )
85+ {
86+ $ this ->emit ('message ' , array ($ message , $ this ));
87+
88+ if (!$ this ->requests ) {
89+ throw new UnderflowException ('Unexpected reply received, no matching request found ' );
90+ }
91+
92+ $ request = array_shift ($ this ->requests );
93+ /* @var $request Deferred */
94+
95+ if ($ message instanceof ErrorReply) {
96+ $ request ->reject ($ message );
97+ } else {
98+ $ request ->resolve ($ message ->getValueNative ());
99+ }
100+
101+ if ($ this ->ending && !$ this ->isBusy ()) {
102+ $ this ->close ();
103+ }
104+ }
105+
106+ public function isBusy ()
75107 {
76- $ this ->stream -> write ( $ this -> serializer -> getRequestMessage ( $ name , $ args )) ;
108+ return !! $ this ->requests ;
77109 }
78110
79111 /**
80- * Sends given message model (request message)
112+ * end connection once all pending requests have been replied to
81113 *
82- * @param ModelInterface $message
114+ * @uses self::close() once all replies have been received
115+ * @see self::close() for closing the connection immediately
83116 */
84- public function sendMessage ( ModelInterface $ message )
117+ public function end ( )
85118 {
86- $ this ->stream ->write ($ message ->getMessageSerialized ($ this ->serializer ));
119+ $ this ->ending = true ;
120+
121+ if (!$ this ->isBusy ()) {
122+ $ this ->close ();
123+ }
87124 }
88125
89- /**
90- * Immediately terminate the connection and discard incoming and outgoing buffers
91- */
92126 public function close ()
93127 {
128+ $ this ->ending = true ;
129+
94130 $ this ->stream ->close ();
131+
132+ // reject all remaining requests in the queue
133+ while ($ this ->requests ) {
134+ $ request = array_shift ($ this ->requests );
135+ /* @var $request Request */
136+ $ request ->reject (new RuntimeException ('Connection closing ' ));
137+ }
95138 }
96139}
0 commit comments