88
99class Buffer extends EventEmitter
1010{
11- private $ loop ;
12- private $ socket ;
11+ protected $ loop ;
12+ protected $ socket ;
13+
1314 private $ listening = false ;
1415 private $ outgoing = array ();
1516 private $ writable = true ;
@@ -29,32 +30,27 @@ public function send($data, $remoteAddress = null)
2930 $ this ->outgoing []= array ($ data , $ remoteAddress );
3031
3132 if (!$ this ->listening ) {
32- $ this ->loop -> addWriteStream ( $ this -> socket , array ( $ this , ' handleWrite ' ) );
33+ $ this ->handleResume ( );
3334 $ this ->listening = true ;
3435 }
3536 }
3637
37- public function handleWrite ()
38+ public function onWritable ()
3839 {
3940 list ($ data , $ remoteAddress ) = array_shift ($ this ->outgoing );
4041
41- if ($ remoteAddress === null ) {
42- // do not use fwrite() as it obeys the stream buffer size and
43- // packets are not to be split at 8kb
44- $ ret = @stream_socket_sendto ($ this ->socket , $ data );
45- } else {
46- $ ret = @stream_socket_sendto ($ this ->socket , $ data , 0 , $ remoteAddress );
42+ try {
43+ $ this ->handleWrite ($ data , $ remoteAddress );
4744 }
48-
49- if ($ ret < 0 ) {
50- $ error = error_get_last ();
51- $ message = 'Unable to send packet: ' . trim ($ error ['message ' ]);
52- $ this ->emit ('error ' , array (new Exception ($ message )));
45+ catch (Exception $ e ) {
46+ $ this ->emit ('error ' , array ($ e , $ this ));
5347 }
5448
5549 if (!$ this ->outgoing ) {
56- $ this ->loop ->removeWriteStream ($ this ->socket );
57- $ this ->listening = false ;
50+ if ($ this ->listening ) {
51+ $ this ->handlePause ();
52+ $ this ->listening = false ;
53+ }
5854
5955 if (!$ this ->writable ) {
6056 $ this ->close ();
@@ -71,7 +67,7 @@ public function close()
7167 $ this ->emit ('close ' , array ($ this ));
7268
7369 if ($ this ->listening ) {
74- $ this ->loop -> removeWriteStream ( $ this -> socket );
70+ $ this ->handlePause ( );
7571 $ this ->listening = false ;
7672 }
7773
@@ -89,8 +85,34 @@ public function end()
8985
9086 $ this ->writable = false ;
9187
92- if (!$ this ->listening ) {
88+ if (!$ this ->outgoing ) {
9389 $ this ->close ();
9490 }
9591 }
92+
93+ protected function handlePause ()
94+ {
95+ $ this ->loop ->removeWriteStream ($ this ->socket );
96+ }
97+
98+ protected function handleResume ()
99+ {
100+ $ this ->loop ->addWriteStream ($ this ->socket , array ($ this , 'onWritable ' ));
101+ }
102+
103+ protected function handleWrite ($ data , $ remoteAddress )
104+ {
105+ if ($ remoteAddress === null ) {
106+ // do not use fwrite() as it obeys the stream buffer size and
107+ // packets are not to be split at 8kb
108+ $ ret = @stream_socket_sendto ($ this ->socket , $ data );
109+ } else {
110+ $ ret = @stream_socket_sendto ($ this ->socket , $ data , 0 , $ remoteAddress );
111+ }
112+
113+ if ($ ret < 0 ) {
114+ $ error = error_get_last ();
115+ throw new Exception ('Unable to send packet: ' . trim ($ error ['message ' ]));
116+ }
117+ }
96118}
0 commit comments