In PHP streams represent a special resource type. The description given in php.net documentation:
Streams are the way of generalizing file, network, data compression, and other operations which share a common set of functions and uses. In its simplest definition, a stream is a resource object which exhibits streamable behavior. That is, it can be read from or written to in a linear fashion, and may be able to fseek() to arbitrary locations within the stream.
Every stream at a low level is simply an
EventEmitter, which implements some special methods. Depending on these methods the stream can be Readable, Writable or Duplex (both readable and writable). Readable streams allow to read the data from a source, while writable can be used to write some data to a destination. Duplex streams allow to read and to write data like TCP/IP connection does.
Accordingly, Stream Component defines the following three interfaces:
Every stream implementation implements
EventEmitterInterface which allows to listen to certain events. There are some common events for all types of streams, and some specific events for every certain type.
Read-only streams are implemented by
ReadableStreamInterface, which is also a readable side of duplex streams.
A readable stream is used only to read data from the source in a continuous fashion (for example, you cannot write to a downloading file). The source can be anything: a file on disk, a buffer in memory or even another stream. Use
ReadableResourceStream class to create a readable stream:
To create an instance
ReadableResourceStream you need to pass to the constructor a valid resource opened in a read mode and an object, which implements
Readable streams are a great solution when you have to deal with large files and don’t want to load them into the memory. For example, you have large log files and need programmatically gather some statistics from them. So, instead of this:
You can use something like this:
The code with ReactPHP looks too complex when compared with a one-line snippet with
file_get_contents, but it’s worth it. The problem with
file_get_contents is that we cannot start processing the received data until we read the whole file. With this approach, we can have problems with really large files or high traffic.
Note that this example uses
fopen()for simplicity and demo purposes only! This should not be used in a truly asynchronous application because the filesystem is inherently blocking and each call could potentially take several seconds. Read this tutorial in case you need to work asynchronously with the filesystem in ReactPHP ecosystem.
With streams, there is no need to keep the whole file in memory and we can process the data as soon it’s been read. Another use case can be live data streams, whose volume is not predetermined.
All available stream events have intuitive names. For example, every time a readable stream receives data from its source it fires
data event. If you want to process data from the stream you should listen to this event. When there is no more data available (the source stream has successfully reached the end) the
end event is fired:
Notice that we have used
fopen functon which creates a file handler, but there is no need to manually close the handler with
fclose. Behind the scenes, when the stream will end it will automatically close the handler. Here is the source code of
close event looks very similar to the
end event, it will be emitted once the stream closes. The difference is that the
end event always indicates a successful end, while
close means only a termination of the stream:
We can use
isReadable() method to check if a stream is in a readable state (not closed):
The output will be the following. On the
end event the stream is still readable, but on the
close event it is in a non-readable mode:
Reading from a stream can be paused and later continued with
resume() methods. When a stream is paused it stops emitting
data events. Under the hood
pause() method simply detaches the stream from the event loop. Here is an example of how to read from a file one byte per second:
The third argument of the
ReadableResourceStream constructor is
$readChunkSize. This parameter allows to control the maximum buffer size in bytes to read from the stream at a time.
Writable streams allow only to write data to the destination (for example, you cannot read from
STDOUT), they also represent a writable side of a duplex stream. Writable streams can be useful for logging some events or for taking user input data. These streams ensure that data chunks arrive in the correct order.
Writable streams are represented by
WritableResourceStream class which implements
WritableStreamInterface. To create a writable stream you need a resource opened in a writable mode and an instance of the event loop:
The process of writing data is very simple,
WritableResourceStream class has two methods:
write($data)to write some data into the stream
end($data = null)to successfully end the stream, you can optionally send some final data before ending.
This example outputs the content of the file to the console using a writable stream instead of
Notice that things happen in an asynchronous way. That means that data is not actually written when you call
write($data) method. It is placed in a buffer, and a listener is added to the event loop, so on the next tick, the data will be written. For example, when you don’t run the loop nothing will be written:
A writable stream has its own analog of the
isReadable() method. Until the stream is not ended
The code above outputs the following:
If we don’t
end() the stream it will stay writable. After stream is ended any further
end() calls have no effect:
write('Hello!') call will provide no output to the console since the stream is already ended.
close() can be used to force stream closing. Unlike the
end() method which takes care of the existing buffers
close() discards any buffer contents and closes the stream. Under the hood
end() method calls
Imagine that you are working with two streams with very different bandwidths. For example, you are uploading a local file to a slow server. The fast (local file) stream will emit data faster than the slow stream (socket on a web server) can consume it. In this situation, we have to keep the data in memory until the slow stream is ready to process it. For large files, it can become a problem. To avoid this
write($data) method returns
false when the buffer is full so we can stop writing. Then later the stream will emit
drain event which indicates that the buffer is now ready to accept more data and we can continue writing.
To demonstrate this we can use the third parameter of the
$writeBufferSoftLimit sets the maximum buffer size in bytes:
This code provides the following output:
Although a writable stream has
end() method there is no
end event. You can listen only to
Here is the output of the code above:
We can chain events with the
pipe(WritableStreamInterface $dest, array $options = array()) method of the
ReadableResourceStream. This method connects a readable stream to a writable one by piping all the data from the readable source into the given writable destination. We can rewrite an example with writing the output from a file to the console using
pipe() method like this:
Only one line of code
$readable->pipe($output); instead of listening to different events and manually processing the data flow:
Behind the scenes
pipe() method subscribes all the required listeners and calls the appropriate methods to provide a continuous flow of data between the streams so that the destination stream isn’t overwhelmed by the readable one. This method also returns an instance of the writable stream, so we can build a chain of piped duplex (both readable and writable) streams:
pipe() will call
end() method on the destination stream when the source stream emits
end event. To disable this behavior use the second
$options argument and set
This behavior only applies to the
end event. You should handle
error and manually emitted
close events yourself.
You don’t get what you write. It is sent to another source.
A duplex stream is one which is both readable and writable. It also may be a combination of two independent streams embedded in one. A concrete example of a duplex stream is a network socket or a file opened in a read-and-write mode:
Duplex streams are built on top of both
WritableStreamInterface, so they provide methods and emit events that are available in both interfaces. You can
pause() and emit the
data event and at the same time
write() and emit
You write something, it is transformed, then you read something.
ThroughStream can be used as a transfer stream. It implements
DuplexStreamInterface and simply passes any written data through to its readable end. It can be used to process data through the pipes. For example, we can use
ThroughStream to uppercase data from a file and then output it to the console:
You may consider
ThroughStream as a readable/writable filter that transforms input and produces output.
Combine together readable and writable streams into a duplex one.
CompositeStream implements the
DuplexStreamInterface and can be used to create a single duplex stream from two individual readable and writable streams:
This snippet reads the data from the
STDIN, prepends it with a string
You said: and then outputs it to the console:
When an error occurs while reading or writing the
error event will be emitted:
This event receives an instance of the
Exception for the occured error. For
DuplexStreamInterface you should take care for both sides of the stream because an error may occur while either reading or writing the stream.
ReactPHP Streams are very powerful tools when you need to create a stream instance from a stream resource. At the same time, they are a very low-level abstraction and you have to manage all the events and data flow by yourself. If you are writing low-level components streams may be a good choice for you. If not consider some higher-level components:
- react/socket if you want to accept incoming or establish outgoing plaintext TCP/IP or secure TLS socket connection streams.
- react/http if you want to receive an incoming HTTP request body streams.
- react/child-process if you want to communicate with child processes via process pipes such as STDIN, STDOUT, STDERR etc.
- react/filesystem if you want to read from/write to the filesystem.
You can find examples from this article on GitHub.
This article is a part of the ReactPHP Series.