Skip to content

Commit 7856445

Browse files
committed
Merge pull request #12 from clue/reader
Refactor StreamReader
2 parents fb865eb + 54559a6 commit 7856445

4 files changed

Lines changed: 156 additions & 71 deletions

File tree

src/Client.php

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,13 @@ public function handleConnectedSocks(Stream $stream, $host, $port, $timeout, $pr
203203
$resolver->reject(new Exception('Timeout while establishing socks session'));
204204
});
205205

206+
$reader = new StreamReader($stream);
207+
$stream->on('data', array($reader, 'write'));
208+
206209
if ($protocolVersion === '5' || $auth !== null) {
207-
$promise = $this->handleSocks5($stream, $host, $port, $auth);
210+
$promise = $this->handleSocks5($stream, $host, $port, $auth, $reader);
208211
} else {
209-
$promise = $this->handleSocks4($stream, $host, $port);
212+
$promise = $this->handleSocks4($stream, $host, $port, $reader);
210213
}
211214
$promise->then(function () use ($resolver, $stream) {
212215
$resolver->resolve($stream);
@@ -216,14 +219,20 @@ public function handleConnectedSocks(Stream $stream, $host, $port, $timeout, $pr
216219

217220
$loop = $this->loop;
218221
$deferred->then(
219-
function (Stream $stream) use ($timerTimeout, $loop) {
222+
function (Stream $stream) use ($timerTimeout, $loop, $reader) {
220223
$loop->cancelTimer($timerTimeout);
221224
$stream->removeAllListeners('end');
225+
226+
$stream->removeListener('data', array($reader, 'write'));
227+
222228
return $stream;
223229
},
224-
function ($error) use ($stream, $timerTimeout, $loop) {
230+
function ($error) use ($stream, $timerTimeout, $loop, $reader) {
225231
$loop->cancelTimer($timerTimeout);
226232
$stream->close();
233+
234+
$stream->removeListener('data', array($reader, 'write'));
235+
227236
return $error;
228237
}
229238
);
@@ -235,7 +244,7 @@ function ($error) use ($stream, $timerTimeout, $loop) {
235244
return $deferred->promise();
236245
}
237246

238-
protected function handleSocks4(Stream $stream, $host, $port)
247+
protected function handleSocks4(Stream $stream, $host, $port, StreamReader $reader)
239248
{
240249
// do not resolve hostname. only try to convert to IP
241250
$ip = ip2long($host);
@@ -250,7 +259,6 @@ protected function handleSocks4(Stream $stream, $host, $port)
250259

251260
$stream->write($data);
252261

253-
$reader = new StreamReader($stream);
254262
return $reader->readBinary(array(
255263
'null' => 'C',
256264
'status' => 'C',
@@ -263,7 +271,7 @@ protected function handleSocks4(Stream $stream, $host, $port)
263271
});
264272
}
265273

266-
protected function handleSocks5(Stream $stream, $host, $port, $auth=null)
274+
protected function handleSocks5(Stream $stream, $host, $port, $auth=null, StreamReader $reader)
267275
{
268276
// protocol version 5
269277
$data = pack('C', 0x05);
@@ -277,7 +285,7 @@ protected function handleSocks5(Stream $stream, $host, $port, $auth=null)
277285
$stream->write($data);
278286

279287
$that = $this;
280-
$reader = new StreamReader($stream);
288+
281289
return $reader->readBinary(array(
282290
'version' => 'C',
283291
'method' => 'C'

src/Server.php

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,10 @@ public function endConnection(Stream $stream)
137137

138138
private function handleSocks(Stream $stream)
139139
{
140-
$reader = new StreamReader($stream);
140+
$reader = new StreamReader();
141+
$stream->on('data', array($reader, 'write'));
142+
143+
$that = $this;
141144
$that = $this;
142145

143146
$auth = $this->auth;
@@ -148,28 +151,27 @@ private function handleSocks(Stream $stream)
148151
$protocolVersion = '5';
149152
}
150153

151-
return $reader->readByte()->then(function ($version) use ($stream, $that, $protocolVersion, $auth){
154+
return $reader->readByte()->then(function ($version) use ($stream, $that, $protocolVersion, $auth, $reader){
152155
if ($version === 0x04) {
153156
if ($protocolVersion === '5') {
154157
throw new UnexpectedValueException('SOCKS4 not allowed due to configuration');
155158
}
156-
return $that->handleSocks4($stream, $protocolVersion);
159+
return $that->handleSocks4($stream, $protocolVersion, $reader);
157160
} else if ($version === 0x05) {
158161
if ($protocolVersion !== null && $protocolVersion !== '5') {
159162
throw new UnexpectedValueException('SOCKS5 not allowed due to configuration');
160163
}
161-
return $that->handleSocks5($stream, $auth);
164+
return $that->handleSocks5($stream, $auth, $reader);
162165
}
163166
throw new UnexpectedValueException('Unexpected/unknown version number');
164167
});
165168
}
166169

167-
public function handleSocks4(Stream $stream, $protocolVersion)
170+
public function handleSocks4(Stream $stream, $protocolVersion, StreamReader $reader)
168171
{
169172
// suppliying hostnames is only allowed for SOCKS4a (or automatically detected version)
170173
$supportsHostname = ($protocolVersion === null || $protocolVersion === '4a');
171174

172-
$reader = new StreamReader($stream);
173175
$that = $this;
174176
return $reader->readByteAssert(0x01)->then(function () use ($reader) {
175177
return $reader->readBinary(array(
@@ -211,9 +213,8 @@ public function handleSocks4(Stream $stream, $protocolVersion)
211213
});
212214
}
213215

214-
public function handleSocks5(Stream $stream, $auth=null)
216+
public function handleSocks5(Stream $stream, $auth=null, StreamReader $reader)
215217
{
216-
$reader = new StreamReader($stream);
217218
$that = $this;
218219
return $reader->readByte()->then(function ($num) use ($reader) {
219220
// $num different authentication mechanisms offered

src/StreamReader.php

Lines changed: 49 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,35 @@
99

1010
class StreamReader
1111
{
12-
private $stream;
12+
const RET_DONE = true;
13+
const RET_INCOMPLETE = null;
1314

14-
public function __construct(Stream $stream)
15+
private $buffer = '';
16+
private $queue = array();
17+
18+
public function write($data)
1519
{
16-
$this->stream = $stream;
20+
$this->buffer .= $data;
21+
22+
do {
23+
$current = reset($this->queue);
24+
25+
if ($current === false) {
26+
break;
27+
}
28+
29+
/* @var $current Closure */
30+
31+
$ret = $current($this->buffer);
32+
33+
if ($ret === self::RET_INCOMPLETE) {
34+
// current is incomplete, so wait for further data to arrive
35+
break;
36+
} else {
37+
// current is done, remove from list and continue with next
38+
array_shift($this->queue);
39+
}
40+
} while (true);
1741
}
1842

1943
public function readBinary($structure)
@@ -45,27 +69,16 @@ public function readBinary($structure)
4569
public function readLength($bytes)
4670
{
4771
$deferred = new Deferred();
48-
$oldsize = $this->stream->bufferSize;
49-
$this->stream->bufferSize = $bytes;
50-
51-
$buffer = '';
52-
53-
$fn = function ($data, Stream $stream) use (&$buffer, &$bytes, $deferred, $oldsize, &$fn) {
54-
$bytes -= strlen($data);
55-
$buffer .= $data;
56-
57-
$deferred->progress($data);
5872

59-
if ($bytes === 0) {
60-
$stream->bufferSize = $oldsize;
61-
$stream->removeListener('data', $fn);
73+
$this->readBufferCallback(function (&$buffer) use ($bytes, $deferred) {
74+
if (strlen($buffer) >= $bytes) {
75+
$deferred->resolve(substr($buffer, 0, $bytes));
76+
$buffer = (string)substr($buffer, $bytes);
6277

63-
$deferred->resolve($buffer);
64-
} else {
65-
$stream->bufferSize = $bytes;
78+
return StreamReader::RET_DONE;
6679
}
67-
};
68-
$this->stream->on('data', $fn);
80+
});
81+
6982
return $deferred->promise();
7083
}
7184

@@ -78,10 +91,6 @@ public function readByte()
7891
});
7992
}
8093

81-
public function readNull(){
82-
return $this->readByteAssert(0x00);
83-
}
84-
8594
public function readByteAssert($expect)
8695
{
8796
return $this->readByte()->then(function ($byte) use ($expect) {
@@ -92,11 +101,6 @@ public function readByteAssert($expect)
92101
});
93102
}
94103

95-
public function readChar()
96-
{
97-
return $this->readLength(1);
98-
}
99-
100104
public function readStringNull()
101105
{
102106
$deferred = new Deferred();
@@ -118,36 +122,26 @@ public function readStringNull()
118122
return $deferred->promise();
119123
}
120124

121-
public function readAssert($byteSequence)
125+
public function readBufferCallback(/* callable */ $callable)
122126
{
123-
$deferred = new Deferred();
124-
$pos = 0;
127+
if (!is_callable($callable)) {
128+
throw new InvalidArgumentException('Given function must be callable');
129+
}
125130

126-
$that = $this;
127-
$this->readLength(strlen($byteSequence))->then(function ($data) use ($deferred) {
128-
$deferred->resolve($data);
129-
}, null, function ($part) use ($byteSequence, &$pos, $deferred, $that) {
130-
$len = strlen($part);
131-
$expect = substr($byteSequence, $pos, $len);
132-
133-
if ($part === $expect) {
134-
$pos += $len;
135-
} else {
136-
$deferred->reject(new UnexpectedValueException('Expected "'.$that->escape($expect).'", but got "'.$that->escape($part).'"'));
131+
if ($this->queue) {
132+
$this->queue []= $callable;
133+
} else {
134+
$this->queue = array($callable);
135+
136+
if ($this->buffer !== '') {
137+
// this is the first element in the queue and the buffer is filled => trigger write procedure
138+
$this->write('');
137139
}
138-
});
139-
return $deferred->promise();
140+
}
140141
}
141142

142-
public function escape($bytes)
143+
public function getBuffer()
143144
{
144-
$ret = '';
145-
for ($i = 0, $l = strlen($bytes); $i < $l; ++$i) {
146-
if ($i !== 0) {
147-
$ret .= ' ';
148-
}
149-
$ret .= sprintf('0x%02X', ord($bytes[$i]));
150-
}
151-
return $ret;
145+
return $this->buffer;
152146
}
153147
}

tests/StreamReaderTest.php

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
use Clue\React\Socks\StreamReader;
4+
5+
class StreamReaderTest extends TestCase
6+
{
7+
private $reader;
8+
9+
public function setUp()
10+
{
11+
$this->reader = new StreamReader();
12+
}
13+
14+
public function testReadByteAssertCorrect()
15+
{
16+
$this->reader->readByteAssert(0x01)->then($this->expectCallableOnce(0x01));
17+
18+
$this->reader->write("\x01");
19+
}
20+
21+
public function testReadByteAssertInvalid()
22+
{
23+
$this->reader->readByteAssert(0x02)->then(null, $this->expectCallableOnce());
24+
25+
$this->reader->write("\x03");
26+
}
27+
28+
public function testReadStringNull()
29+
{
30+
$this->reader->readStringNull()->then($this->expectCallableOnce('hello'));
31+
32+
$this->reader->write("hello\x00");
33+
}
34+
35+
public function testReadStringLength()
36+
{
37+
$this->reader->readLength(5)->then($this->expectCallableOnce('hello'));
38+
39+
$this->reader->write('he');
40+
$this->reader->write('ll');
41+
$this->reader->write('o ');
42+
43+
$this->assertEquals(' ', $this->reader->getBuffer());
44+
}
45+
46+
public function testReadBuffered()
47+
{
48+
$this->reader->write('hello');
49+
50+
$this->reader->readLength(5)->then($this->expectCallableOnce('hello'));
51+
52+
$this->assertEquals('', $this->reader->getBuffer());
53+
}
54+
55+
public function testSequence()
56+
{
57+
$this->reader->readByte()->then($this->expectCallableOnce(ord('h')));
58+
$this->reader->readByteAssert(ord('e'))->then($this->expectCallableOnce(ord('e')));
59+
$this->reader->readLength(4)->then($this->expectCallableOnce('llo '));
60+
$this->reader->readBinary(array('w'=>'C', 'o' => 'C'))->then($this->expectCallableOnce(array('w' => ord('w'), 'o' => ord('o'))));
61+
62+
$this->reader->write('hello world');
63+
64+
$this->assertEquals('rld', $this->reader->getBuffer());
65+
}
66+
67+
/**
68+
* @expectedException InvalidArgumentException
69+
*/
70+
public function testInvalidStructure()
71+
{
72+
$this->reader->readBinary(array('invalid' => 'y'));
73+
}
74+
75+
/**
76+
* @expectedException InvalidArgumentException
77+
*/
78+
public function testInvalidCallback()
79+
{
80+
$this->reader->readBufferCallback(array());
81+
}
82+
}

0 commit comments

Comments
 (0)