@@ -86,11 +86,12 @@ public function onConnection(Connection $connection)
8686
8787 $ line ('connect ' );
8888
89+ $ that = $ this ;
8990 $ loop = $ this ->loop ;
9091 $ this ->handleSocks ($ connection )->then (function ($ remote ) use ($ line , $ connection ){
9192 $ line ('tunnel successfully estabslished ' );
9293 $ connection ->emit ('ready ' ,array ($ remote ));
93- }, function ($ error ) use ($ connection , $ line , $ loop ) {
94+ }, function ($ error ) use ($ connection , $ line , $ that ) {
9495 if ($ error instanceof \Exception) {
9596 $ msg = $ error ->getMessage ();
9697 while ($ error ->getPrevious () !== null ) {
@@ -104,13 +105,8 @@ public function onConnection(Connection $connection)
104105 var_dump ($ error );
105106 }
106107
107- // shut down connection by pausing input data, flushing outgoing buffer and then exit
108- $ connection ->pause ();
109- $ connection ->end ();
110- // fall back to forcefully close connection in 3 seconds if buffer can not be flushed
111- $ loop ->addTimer (3.0 , array ($ connection ,'close ' ));
112-
113- throw $ error ;
108+ $ that ->endConnection ($ connection );
109+
114110// }, function ($progress) use ($line) {
115111// //$s = new StreamReader();
116112// $line('progress: './*$s->s*/($progress));
@@ -125,6 +121,38 @@ public function onConnection(Connection $connection)
125121 $ line ('disconnect ' );
126122 });
127123 }
124+
125+ /**
126+ * gracefully shutdown connection by flushing all remaining data and closing stream
127+ *
128+ * @param Stream $stream
129+ */
130+ public function endConnection (Stream $ stream )
131+ {
132+ $ tid = true ;
133+ $ loop = $ this ->loop ;
134+
135+ // cancel below timer in case connection is closed in time
136+ $ stream ->once ('close ' , function () use (&$ tid , $ loop ) {
137+ // close event called before the timer was set up, so everything is okay
138+ if ($ tid === true ) {
139+ // make sure to not start a useless timer
140+ $ tid = false ;
141+ } else {
142+ $ loop ->cancelTimer ($ tid );
143+ }
144+ });
145+
146+ // shut down connection by pausing input data, flushing outgoing buffer and then exit
147+ $ stream ->pause ();
148+ $ stream ->end ();
149+
150+ // check if connection is not already closed
151+ if ($ tid === true ) {
152+ // fall back to forcefully close connection in 3 seconds if buffer can not be flushed
153+ $ tid = $ loop ->addTimer (3.0 , array ($ stream ,'close ' ));
154+ }
155+ }
128156
129157 private function handleSocks (Stream $ stream )
130158 {
@@ -300,7 +328,8 @@ public function handleSocks5(Stream $stream, $auth=null)
300328 public function connectTarget (Stream $ stream , $ target )
301329 {
302330 $ stream ->emit ('target ' ,$ target );
303- return $ this ->connectionManager ->getConnection ($ target [0 ], $ target [1 ])->then (function (Stream $ remote ) use ($ stream ) {
331+ $ that = $ this ;
332+ return $ this ->connectionManager ->getConnection ($ target [0 ], $ target [1 ])->then (function (Stream $ remote ) use ($ stream , $ that ) {
304333 if (!$ stream ->isWritable ()) {
305334 $ remote ->close ();
306335 throw new UnexpectedValueException ('Remote connection successfully established after client connection closed ' );
@@ -310,18 +339,16 @@ public function connectTarget(Stream $stream, $target)
310339 $ remote ->pipe ($ stream , array ('end ' =>false ));
311340
312341 // remote end closes connection => stop reading from local end, try to flush buffer to local and disconnect local
313- $ remote ->on ('end ' , function () use ($ stream ) {
314- $ stream ->pause ( );
315- $ stream -> end ( );
342+ $ remote ->on ('end ' , function () use ($ stream, $ that ) {
343+ $ stream ->emit ( ' shutdown ' , array ( ' remote ' , null ) );
344+ $ that -> endConnection ( $ stream );
316345 });
317346
318347 // local end closes connection => stop reading from remote end, try to flush buffer to remote and disconnect remote
319- $ stream ->on ('end ' , function () use ($ remote ) {
320- $ remote ->pause ();
321- $ remote ->end ();
348+ $ stream ->on ('end ' , function () use ($ remote , $ that ) {
349+ $ that ->endConnection ($ remote );
322350 });
323351
324-
325352 // set bigger buffer size of 100k to improve performance
326353 $ stream ->bufferSize = $ remote ->bufferSize = 100 * 1024 * 1024 ;
327354
0 commit comments