ReactPHP PromiseStream Component is a link between promise-land and stream-land
From Stream To Promise
One of the patterns that are used to deal with streams is spooling: we need the entire resource data available before we start processing it. One approach is to collect each chunk of data received from the stream:
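The spooling approach can be sketched as follows. This is a minimal illustration, not the article's original listing: it assumes the react/stream package is installed through Composer (the `vendor/autoload.php` path is an assumption) and uses an in-memory `ThroughStream` as a stand-in for a file stream so that no event loop is required.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Stream\ThroughStream;

// In-memory stand-in for a file stream (an assumption made for this sketch).
$stream = new ThroughStream();

$spool = '';

// Append every chunk to the spool as it arrives.
$stream->on('data', function ($chunk) use (&$spool) {
    $spool .= $chunk;
});

// The complete data is only available once the stream has ended.
$stream->on('end', function () use (&$spool) {
    echo 'Received: ' . $spool . PHP_EOL;
});

$stream->write('Hello, ');
$stream->write('world!');
$stream->end();
```

Note that any code that needs the full data has to live inside the `end` callback.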
But imagine that we have some client code that wants to process data from a file. It doesn’t care about streams; it only needs to receive the entire contents of the file. With this approach, that code has to be called inside the callback for the end event of the stream, so the client code must know about streams, events, and callbacks. Sometimes that is impossible. Consider this pretty simple example:
We have two separate classes:
- Processor for processing data from the file
- Provider for collecting this data from the file
Once the data is completely collected we need to pass it to the Processor, but how? How can we encapsulate this stream logic and provide the client code only with the data from the stream? The answer is promises.
We can hide the spooling logic behind a promise. Once all the data has been read from the file, we resolve the promise with this data.
The \React\Promise\Stream\buffer() function creates a promise that resolves with the stream data. All data chunks from the stream are concatenated, and once the stream closes the promise resolves with the result. Now, let’s rewrite the previous example:
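A possible shape for the two classes is sketched below. The class bodies and method names (`getData()`, `process()`) are assumptions made for illustration, and an in-memory `ThroughStream` replaces the file stream so the sketch runs without an event loop. It assumes react/stream and react/promise-stream are installed through Composer.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Promise\PromiseInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\ThroughStream;
use function React\Promise\Stream\buffer;

// Hypothetical Provider: hides the stream behind a promise.
final class Provider
{
    private $stream;

    public function __construct(ReadableStreamInterface $stream)
    {
        $this->stream = $stream;
    }

    public function getData(): PromiseInterface
    {
        // Resolves with the concatenated chunks once the stream closes.
        return buffer($this->stream);
    }
}

// Hypothetical Processor: receives plain data and knows nothing about streams.
final class Processor
{
    public function process(string $data): string
    {
        return strtoupper($data);
    }
}

$stream = new ThroughStream(); // stand-in for a file stream
$provider = new Provider($stream);
$processor = new Processor();

$output = null;
$provider->getData()
    ->then([$processor, 'process'])
    ->then(function (string $result) use (&$output) {
        $output = $result;
        echo $result . PHP_EOL;
    });

$stream->write('foo');
$stream->end('bar');
```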
Now we can easily pass the data from the stream via promises. The Processor knows nothing about the source of the data, and the Provider only cares about collecting the entire resource data. The promise becomes the glue between these two classes. Promises also allow us to create chains of callbacks (like we did with the pipe() method for streams), where each promise receives the data from the previous one:
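A chain of this kind might look like the sketch below. The input text and the in-memory `ThroughStream` are assumptions chosen for illustration; each `then()` receives the value returned by the previous step.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Stream\ThroughStream;
use function React\Promise\Stream\buffer;

$stream = new ThroughStream(); // stand-in for a file stream

$slug = null;

buffer($stream)
    // Strip surrounding whitespace from the collected data.
    ->then('trim')
    // Replace every space with a dash.
    ->then(function (string $text) {
        return str_replace(' ', '-', $text);
    })
    // Lowercase all characters.
    ->then('strtolower')
    // Finally, output the result.
    ->then(function (string $text) use (&$slug) {
        $slug = $text;
        echo $text . PHP_EOL;
    });

$stream->write('  Hello Promise ');
$stream->end('Stream World  ');
```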
In the example above, we collect the data from the stream, then return a promise that resolves with this data. Then this data is trimmed, then we replace all spaces with dashes and lowercase all characters. And at last, we output this data.
If you need to deal with chunks of data rather than the concatenated content, you can use the \React\Promise\Stream\all() function. It accepts a stream instance (readable or writable) and the name of an event. Each time the specified event occurs, the function collects its data. The promise resolves with an array containing whatever each event emitted, with null for events that didn’t pass any data.
By default, it collects data chunks from the data event, but you can specify the name of the event you are interested in as the second argument:
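Both forms of the call are sketched below, assuming an in-memory `ThroughStream` as the source. The first call relies on the default `data` event; the second passes the event name explicitly and listens for `end`, which carries no payload.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Stream\ThroughStream;
use function React\Promise\Stream\all;

$stream = new ThroughStream(); // stand-in for any readable stream

$chunks = null;
$ends = null;

// Default: collect the payload of every "data" event.
all($stream)->then(function (array $collected) use (&$chunks) {
    $chunks = $collected;
});

// Explicit event name: collect whatever the "end" event passes.
all($stream, 'end')->then(function (array $collected) use (&$ends) {
    $ends = $collected;
});

$stream->write('foo');
$stream->write('bar');
$stream->end();

print_r($chunks); // the two chunks, in the order they were written
var_dump($ends);
```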
With chunks, the previous example looks as follows:
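A chunk-based variant of the two classes could be sketched like this. As before, the class bodies and the method names `getChunks()` and `process()` are hypothetical, and a `ThroughStream` stands in for the file stream.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Promise\PromiseInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\ThroughStream;
use function React\Promise\Stream\all;

// Hypothetical Provider: exposes the chunks of a stream as a promise.
final class Provider
{
    private $stream;

    public function __construct(ReadableStreamInterface $stream)
    {
        $this->stream = $stream;
    }

    public function getChunks(): PromiseInterface
    {
        // Resolves with an array of chunks once the stream closes.
        return all($this->stream);
    }
}

// Hypothetical Processor: works on an array of chunks.
final class Processor
{
    public function process(array $chunks): int
    {
        foreach ($chunks as $index => $chunk) {
            echo $index . ': ' . $chunk . PHP_EOL;
        }

        return count($chunks);
    }
}

$stream = new ThroughStream();
$provider = new Provider($stream);
$processor = new Processor();

$count = null;
$provider->getChunks()
    ->then([$processor, 'process'])
    ->then(function (int $processed) use (&$count) {
        $count = $processed;
    });

$stream->write('first chunk');
$stream->write('second chunk');
$stream->end();
```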
Provider wraps the stream in a promise with the all() function. Then, in Processor, this promise resolves with an array of data chunks from the stream.
Resolving and rejection
- The promise will resolve with an array once the stream closes.
- If the stream is already closed the promise resolves with an empty array.
- If the stream emits an error the promise rejects.
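Two of the rules above can be checked with a short sketch. It assumes in-memory `ThroughStream` instances and emits the `error` event by hand, which a real stream would do on its own when reading fails.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Stream\ThroughStream;
use function React\Promise\Stream\all;

// Rule: an already closed stream resolves with an empty array.
$closedStream = new ThroughStream();
$closedStream->close();

$fromClosed = null;
all($closedStream)->then(function (array $chunks) use (&$fromClosed) {
    $fromClosed = $chunks;
});

// Rule: an error event rejects the promise.
$failingStream = new ThroughStream();

$reason = null;
all($failingStream)->then(null, function (\Exception $error) use (&$reason) {
    $reason = $error;
    echo 'Rejected: ' . $error->getMessage() . PHP_EOL;
});

// Emitted manually here to simulate a failing stream.
$failingStream->emit('error', [new \RuntimeException('Disk failure')]);
```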
Let’s update our Provider so that it can return both the data and an error from the stream. To return the error we can use the \React\Promise\Stream\first() function. It creates a promise that resolves once the given event fires for the first time. Once the error event is emitted the stream closes, so there will be no more error events. That’s why the first() function is exactly what we need here:
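One way such a Provider could look is sketched below. The class body is an assumption, as are the temporary file used as input and the use of `React\EventLoop\Loop::get()` (available in react/event-loop 1.2 and later) to obtain the loop instance.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
use React\Stream\ReadableResourceStream;
use function React\Promise\Stream\buffer;
use function React\Promise\Stream\first;

// Hypothetical Provider exposing both the data and the first error.
final class Provider
{
    private $stream;

    public function __construct(string $path, LoopInterface $loop)
    {
        $this->stream = new ReadableResourceStream(fopen($path, 'r'), $loop);
    }

    public function getData(): PromiseInterface
    {
        return buffer($this->stream);
    }

    public function getError(): PromiseInterface
    {
        // Resolves with the exception passed to the first "error" event.
        return first($this->stream, 'error');
    }
}

// Temporary input file, created only for this sketch.
$path = tempnam(sys_get_temp_dir(), 'provider');
file_put_contents($path, "Hello from the file\n");

$loop = Loop::get();
$provider = new Provider($path, $loop);

$contents = null;
$provider->getData()->then(function (string $data) use (&$contents) {
    $contents = $data;
});

$provider->getError()->then(
    function (\Exception $error) {
        echo 'Error: ' . $error->getMessage() . PHP_EOL;
    },
    function () {
        // The stream closed without emitting an error; nothing to report.
    }
);

$loop->run();
unlink($path);

echo $contents;
```

The second callback on `getError()` matters: when the stream closes without an error, the promise returned by `first()` rejects, so the rejection should be handled.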
Via its constructor, Provider accepts a path to a file and an instance of the event loop. It has two simple methods that both return promises: getData() returns a promise which resolves with the contents of the stream, and getError() returns a promise which resolves with an exception when an error occurs. Then we can simply pass the promise from the getError() method to a logger:
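Handing an error promise to a logger might look like the following sketch. The `Logger` class and its `logFrom()` method are hypothetical, and the `error` event is emitted manually on an in-memory `ThroughStream` to simulate a read failure.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Promise\PromiseInterface;
use React\Stream\ThroughStream;
use function React\Promise\Stream\first;

// Hypothetical logger that accepts a promise resolving with an exception.
final class Logger
{
    public $messages = [];

    public function logFrom(PromiseInterface $error): void
    {
        $error->then(
            function (\Exception $exception) {
                $this->messages[] = $exception->getMessage();
                echo '[error] ' . $exception->getMessage() . PHP_EOL;
            },
            function () {
                // The stream closed without an error; nothing to log.
            }
        );
    }
}

$stream = new ThroughStream(); // stand-in for the Provider's file stream
$logger = new Logger();

// Equivalent to passing the result of a getError() method to the logger.
$logger->logFrom(first($stream, 'error'));

// Simulate a failing stream.
$stream->emit('error', [new \RuntimeException('Cannot read file')]);
```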
The first() function returns a promise which rejects if:
- the stream emits an error, unless you’re waiting for the error event, in which case it will resolve.
- the stream closes, unless you’re waiting for the close event, in which case it will resolve.
- the stream is already closed.
- it is canceled.
From Promise To Stream
When a promise resolves with a stream, we can extract, or unwrap, this stream. For readable streams we can use the \React\Promise\Stream\unwrapReadable() function. It returns an instance of a stream that implements ReadableStreamInterface. Consider this stream a proxy for the future promise resolution:
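The proxy idea can be sketched as follows. To keep the sketch deterministic, an in-memory `ThroughStream` is substituted for a stream reading from the standard input, and a `Deferred` from react/promise plays the role of the pending promise; both are assumptions made for illustration.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Promise\Deferred;
use React\Stream\ThroughStream;
use function React\Promise\Stream\unwrapReadable;

$deferred = new Deferred();

// The proxy stream is usable immediately, although the promise is still pending.
$stream = unwrapReadable($deferred->promise());

$received = [];
$stream->on('data', function ($chunk) use (&$received) {
    $received[] = $chunk;
    echo $chunk . PHP_EOL;
});

// Later on, the promise resolves with the actual readable stream.
$source = new ThroughStream(); // stand-in for a stream reading from STDIN
$deferred->resolve($source);

// Data arriving on the source is forwarded to the proxy stream.
$source->write('Hello');
```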
In the example above when a readable stream receives data from the standard input (console), this data is piped to the resulting stream:
If the promise is either rejected or fulfilled with anything but an instance of ReadableStreamInterface, then the resulting stream will emit an error event and close:
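The failure case can be sketched by resolving a pending promise with a plain string instead of a stream. The `Deferred` and the string value are assumptions chosen for illustration.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Promise\Deferred;
use function React\Promise\Stream\unwrapReadable;

$deferred = new Deferred();
$stream = unwrapReadable($deferred->promise());

$error = null;
$closed = false;

// Listeners are attached while the promise is still pending.
$stream->on('error', function (\Exception $exception) use (&$error) {
    $error = $exception;
    echo 'Error: ' . $exception->getMessage() . PHP_EOL;
});

$stream->on('close', function () use (&$closed) {
    $closed = true;
    echo 'Stream closed' . PHP_EOL;
});

// Fulfilling with something that is not a readable stream triggers the error.
$deferred->resolve('not a stream');
```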
To receive the error event, the promise should be pending when calling the unwrapReadable() function. If the promise is already settled and does not resolve with an instance of ReadableStreamInterface, you will not be able to receive the error event.
You can close() the resulting stream at any time. This will either try to cancel() the pending promise or try to close() the underlying stream.
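Closing the proxy while the promise is still pending can be sketched with a promise that records whether its canceller was invoked. The promise below never resolves by itself; it exists only for this illustration.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Promise\Promise;
use function React\Promise\Stream\unwrapReadable;

$cancelled = false;

// A promise that stays pending; the second argument is its canceller.
$promise = new Promise(
    function () {
        // Intentionally never resolves in this sketch.
    },
    function () use (&$cancelled) {
        $cancelled = true;
    }
);

$stream = unwrapReadable($promise);

// Closing the proxy stream cancels the pending promise.
$stream->close();

var_dump($cancelled);
```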
The \React\Promise\Stream\unwrapWritable() function can be used to unwrap a promise which resolves with a WritableStreamInterface. It returns an instance of WritableStreamInterface which acts as a proxy for the future promise resolution. Any data you write to this resulting stream will be piped to the internal writable stream:
In this simple example, the promise resolves with an instance of a writable stream, which writes data to the console. Then we create a proxy for this stream. When we write data to the proxy, it is also written to the unwrapped stream:
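The proxy behaviour described above can be sketched as follows. Instead of a stream bound to the console, this sketch assumes an in-memory `ThroughStream` whose `data` handler echoes whatever is written to it, and a `Deferred` acts as the pending promise.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use React\Promise\Deferred;
use React\Stream\ThroughStream;
use function React\Promise\Stream\unwrapWritable;

// Stand-in for a writable console stream: echoes everything written to it.
$target = new ThroughStream();

$written = [];
$target->on('data', function ($chunk) use (&$written) {
    $written[] = $chunk;
    echo $chunk;
});

$deferred = new Deferred();
$proxy = unwrapWritable($deferred->promise());

// Written before the promise resolves: kept in the proxy's buffer.
$proxy->write('Hello ');

// Once the promise resolves, the buffered data is flushed to the target.
$deferred->resolve($target);

// Written after resolution: forwarded to the target.
$proxy->write('world!');
```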
unwrapWritable() follows the same rules as unwrapReadable():
- If the promise is either rejected or fulfilled with anything but an instance of WritableStreamInterface, then the resulting stream will emit an error event and close.
- To receive the error event, the promise should be pending when calling unwrapWritable().
- You can close() the resulting stream at any time. This will either try to cancel() the pending promise or try to close() the underlying stream.
You can find examples from this article on GitHub.
This article is a part of the ReactPHP Series.