Skip to content

Commit 0be54fe

Browse files
committed
Add ThroughStream and ReadableStream primitives
These are basic building blocks for building custom streams upon. The ThroughStream is essentially a filter which allows you to change the value or perform a passive action (like logging) by overriding the filter() method. Example: class FooStream extends ThroughStream { public function filter($data) { return foo.$data; } } This useful example prepends the string "foo" to any data it receives.
1 parent d091073 commit 0be54fe

2 files changed

Lines changed: 127 additions & 0 deletions

File tree

ReadableStream.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
namespace React\Stream;
4+
5+
use Evenement\EventEmitter;
6+
7+
class ReadableStream extends EventEmitter implements ReadableStreamInterface
8+
{
9+
public $closed = false;
10+
11+
public function isReadable()
12+
{
13+
return !$this->closed;
14+
}
15+
16+
public function pause()
17+
{
18+
}
19+
20+
public function resume()
21+
{
22+
}
23+
24+
public function pipe(WritableStreamInterface $dest, array $options = array())
25+
{
26+
Util::pipe($this, $dest, $options);
27+
28+
return $dest;
29+
}
30+
31+
public function close()
32+
{
33+
if ($this->closed) {
34+
return;
35+
}
36+
37+
$this->closed = true;
38+
$this->emit('end', array($this));
39+
$this->emit('close', array($this));
40+
$this->removeAllListeners();
41+
}
42+
}

ThroughStream.php

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php
2+
3+
namespace React\Stream;
4+
5+
use Evenement\EventEmitter;
6+
7+
class ThroughStream extends EventEmitter implements ReadableStreamInterface, WritableStreamInterface
8+
{
9+
private $closed = false;
10+
private $pipeSource;
11+
12+
public function __construct()
13+
{
14+
$this->on('pipe', array($this, 'handlePipeEvent'));
15+
}
16+
17+
public function handlePipeEvent($source)
18+
{
19+
$this->pipeSource = $source;
20+
}
21+
22+
public function filter($data)
23+
{
24+
return $data;
25+
}
26+
27+
public function write($data)
28+
{
29+
$this->emit('data', array($this->filter($data)));
30+
}
31+
32+
public function end($data = null)
33+
{
34+
if (null !== $data) {
35+
$this->write($data);
36+
}
37+
38+
$this->close();
39+
}
40+
41+
public function isReadable()
42+
{
43+
return !$this->closed;
44+
}
45+
46+
public function isWritable()
47+
{
48+
return !$this->closed;
49+
}
50+
51+
public function pause()
52+
{
53+
if ($this->pipeSource) {
54+
$this->pipeSource->pause();
55+
}
56+
}
57+
58+
public function resume()
59+
{
60+
if ($this->pipeSource) {
61+
$this->pipeSource->resume();
62+
}
63+
}
64+
65+
public function close()
66+
{
67+
if ($this->closed) {
68+
return;
69+
}
70+
71+
$this->closed = true;
72+
$this->pipeSource = null;
73+
74+
$this->emit('end');
75+
$this->emit('close');
76+
$this->removeAllListeners();
77+
}
78+
79+
public function pipe(WritableStreamInterface $dest, array $options = array())
80+
{
81+
Util::pipe($this, $dest, $options);
82+
83+
return $dest;
84+
}
85+
}

0 commit comments

Comments
 (0)