Skip to content

Commit 384d30d

Browse files
committed
Decouple Stream from StreamReader
1 parent fb865eb commit 384d30d

4 files changed

Lines changed: 109 additions & 37 deletions

File tree

src/Client.php

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,13 @@ public function handleConnectedSocks(Stream $stream, $host, $port, $timeout, $pr
203203
$resolver->reject(new Exception('Timeout while establishing socks session'));
204204
});
205205

206+
$reader = new StreamReader($stream);
207+
$stream->on('data', array($reader, 'write'));
208+
206209
if ($protocolVersion === '5' || $auth !== null) {
207-
$promise = $this->handleSocks5($stream, $host, $port, $auth);
210+
$promise = $this->handleSocks5($stream, $host, $port, $auth, $reader);
208211
} else {
209-
$promise = $this->handleSocks4($stream, $host, $port);
212+
$promise = $this->handleSocks4($stream, $host, $port, $reader);
210213
}
211214
$promise->then(function () use ($resolver, $stream) {
212215
$resolver->resolve($stream);
@@ -216,14 +219,20 @@ public function handleConnectedSocks(Stream $stream, $host, $port, $timeout, $pr
216219

217220
$loop = $this->loop;
218221
$deferred->then(
219-
function (Stream $stream) use ($timerTimeout, $loop) {
222+
function (Stream $stream) use ($timerTimeout, $loop, $reader) {
220223
$loop->cancelTimer($timerTimeout);
221224
$stream->removeAllListeners('end');
225+
226+
$stream->removeListener('data', array($reader, 'write'));
227+
222228
return $stream;
223229
},
224-
function ($error) use ($stream, $timerTimeout, $loop) {
230+
function ($error) use ($stream, $timerTimeout, $loop, $reader) {
225231
$loop->cancelTimer($timerTimeout);
226232
$stream->close();
233+
234+
$stream->removeListener('data', array($reader, 'write'));
235+
227236
return $error;
228237
}
229238
);
@@ -235,7 +244,7 @@ function ($error) use ($stream, $timerTimeout, $loop) {
235244
return $deferred->promise();
236245
}
237246

238-
protected function handleSocks4(Stream $stream, $host, $port)
247+
protected function handleSocks4(Stream $stream, $host, $port, StreamReader $reader)
239248
{
240249
// do not resolve hostname. only try to convert to IP
241250
$ip = ip2long($host);
@@ -250,7 +259,6 @@ protected function handleSocks4(Stream $stream, $host, $port)
250259

251260
$stream->write($data);
252261

253-
$reader = new StreamReader($stream);
254262
return $reader->readBinary(array(
255263
'null' => 'C',
256264
'status' => 'C',
@@ -263,7 +271,7 @@ protected function handleSocks4(Stream $stream, $host, $port)
263271
});
264272
}
265273

266-
protected function handleSocks5(Stream $stream, $host, $port, $auth=null)
274+
protected function handleSocks5(Stream $stream, $host, $port, $auth=null, StreamReader $reader)
267275
{
268276
// protocol version 5
269277
$data = pack('C', 0x05);
@@ -277,7 +285,7 @@ protected function handleSocks5(Stream $stream, $host, $port, $auth=null)
277285
$stream->write($data);
278286

279287
$that = $this;
280-
$reader = new StreamReader($stream);
288+
281289
return $reader->readBinary(array(
282290
'version' => 'C',
283291
'method' => 'C'

src/Server.php

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,10 @@ public function endConnection(Stream $stream)
137137

138138
private function handleSocks(Stream $stream)
139139
{
140-
$reader = new StreamReader($stream);
140+
$reader = new StreamReader();
141+
$stream->on('data', array($reader, 'write'));
142+
143+
$that = $this;
141144
$that = $this;
142145

143146
$auth = $this->auth;
@@ -148,28 +151,27 @@ private function handleSocks(Stream $stream)
148151
$protocolVersion = '5';
149152
}
150153

151-
return $reader->readByte()->then(function ($version) use ($stream, $that, $protocolVersion, $auth){
154+
return $reader->readByte()->then(function ($version) use ($stream, $that, $protocolVersion, $auth, $reader){
152155
if ($version === 0x04) {
153156
if ($protocolVersion === '5') {
154157
throw new UnexpectedValueException('SOCKS4 not allowed due to configuration');
155158
}
156-
return $that->handleSocks4($stream, $protocolVersion);
159+
return $that->handleSocks4($stream, $protocolVersion, $reader);
157160
} else if ($version === 0x05) {
158161
if ($protocolVersion !== null && $protocolVersion !== '5') {
159162
throw new UnexpectedValueException('SOCKS5 not allowed due to configuration');
160163
}
161-
return $that->handleSocks5($stream, $auth);
164+
return $that->handleSocks5($stream, $auth, $reader);
162165
}
163166
throw new UnexpectedValueException('Unexpected/unknown version number');
164167
});
165168
}
166169

167-
public function handleSocks4(Stream $stream, $protocolVersion)
170+
public function handleSocks4(Stream $stream, $protocolVersion, StreamReader $reader)
168171
{
169172
// suppliying hostnames is only allowed for SOCKS4a (or automatically detected version)
170173
$supportsHostname = ($protocolVersion === null || $protocolVersion === '4a');
171174

172-
$reader = new StreamReader($stream);
173175
$that = $this;
174176
return $reader->readByteAssert(0x01)->then(function () use ($reader) {
175177
return $reader->readBinary(array(
@@ -211,9 +213,8 @@ public function handleSocks4(Stream $stream, $protocolVersion)
211213
});
212214
}
213215

214-
public function handleSocks5(Stream $stream, $auth=null)
216+
public function handleSocks5(Stream $stream, $auth=null, StreamReader $reader)
215217
{
216-
$reader = new StreamReader($stream);
217218
$that = $this;
218219
return $reader->readByte()->then(function ($num) use ($reader) {
219220
// $num different authentication mechanisms offered

src/StreamReader.php

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,35 @@
99

1010
class StreamReader
1111
{
12-
private $stream;
12+
const RET_DONE = true;
13+
const RET_INCOMPLETE = null;
1314

14-
public function __construct(Stream $stream)
15+
private $buffer = '';
16+
private $queue = array();
17+
18+
public function write($data)
1519
{
16-
$this->stream = $stream;
20+
$this->buffer .= $data;
21+
22+
do {
23+
$current = reset($this->queue);
24+
25+
if ($current === false) {
26+
break;
27+
}
28+
29+
/* @var $current Closure */
30+
31+
$ret = $current($this->buffer);
32+
33+
if ($ret === self::RET_INCOMPLETE) {
34+
// current is incomplete, so wait for further data to arrive
35+
break;
36+
} else {
37+
// current is done, remove from list and continue with next
38+
array_shift($this->queue);
39+
}
40+
} while (true);
1741
}
1842

1943
public function readBinary($structure)
@@ -45,27 +69,16 @@ public function readBinary($structure)
4569
public function readLength($bytes)
4670
{
4771
$deferred = new Deferred();
48-
$oldsize = $this->stream->bufferSize;
49-
$this->stream->bufferSize = $bytes;
50-
51-
$buffer = '';
52-
53-
$fn = function ($data, Stream $stream) use (&$buffer, &$bytes, $deferred, $oldsize, &$fn) {
54-
$bytes -= strlen($data);
55-
$buffer .= $data;
5672

57-
$deferred->progress($data);
73+
$this->readBufferCallback(function (&$buffer) use ($bytes, $deferred) {
74+
if (strlen($buffer) >= $bytes) {
75+
$deferred->resolve(substr($buffer, 0, $bytes));
76+
$buffer = (string)substr($buffer, $bytes);
5877

59-
if ($bytes === 0) {
60-
$stream->bufferSize = $oldsize;
61-
$stream->removeListener('data', $fn);
62-
63-
$deferred->resolve($buffer);
64-
} else {
65-
$stream->bufferSize = $bytes;
78+
return StreamReader::RET_DONE;
6679
}
67-
};
68-
$this->stream->on('data', $fn);
80+
});
81+
6982
return $deferred->promise();
7083
}
7184

@@ -150,4 +163,27 @@ public function escape($bytes)
150163
}
151164
return $ret;
152165
}
166+
167+
public function readBufferCallback(/* callable */ $callable)
168+
{
169+
if (!is_callable($callable)) {
170+
throw new InvalidArgumentException('Given function must be callable');
171+
}
172+
173+
if ($this->queue) {
174+
$this->queue []= $callable;
175+
} else {
176+
$this->queue = array($callable);
177+
178+
if ($this->buffer !== '') {
179+
// this is the first element in the queue and the buffer is filled => trigger write procedure
180+
$this->write('');
181+
}
182+
}
183+
}
184+
185+
public function getBuffer()
186+
{
187+
return $this->buffer;
188+
}
153189
}

tests/StreamReaderTest.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
use Clue\React\Socks\StreamReader;
4+
5+
class StreamReaderTest extends TestCase
6+
{
7+
private $reader;
8+
9+
public function setUp()
10+
{
11+
$this->reader = new StreamReader();
12+
}
13+
14+
public function testA()
15+
{
16+
$that = $this;
17+
18+
$this->reader->readChar()->then($this->expectCallableOnce('h'));
19+
$this->reader->readChar()->then($this->expectCallableOnce('e'));
20+
$this->reader->readLength(4)->then($this->expectCallableOnce('llo '));
21+
$this->reader->readBinary(array('w'=>'C', 'o' => 'C'))->then($this->expectCallableOnce(array('w' => ord('w'), 'o' => ord('o'))));
22+
23+
$this->reader->write('hello world');
24+
25+
$this->assertEquals('rld', $this->reader->getBuffer());
26+
}
27+
}

0 commit comments

Comments
 (0)