[Medium] Implementing read streams in Node (2019)
This post was originally published on Medium in 2019 and has been imported to my personal blog archive.
Implementing read streams in Node
Hello, Node devs. This will be a quick one today. I was recently implementing a Duplex stream in Node.js for a use case at work. I had everything set up just like I wanted: my abstractions were in place for where the data was coming from, the consumer of the data was all ready to go, and, yes, I even had tests already written. Here is some very basic boilerplate for a Duplex stream that just saves the incoming data to a buffer:
const { Duplex } = require('stream');

class MyDuplexStream extends Duplex {
  constructor(options) {
    super(options);
    this.buffer = [];
  }

  _write(chunk, encoding, callback) {
    this.buffer.push(chunk);
    callback();
  }

  _read() {
    // This was my mistake - returning data instead of pushing it
    return this.buffer.shift();
  }
}
Some of you may have noticed the mistake already, but let’s give it a moment for everyone else.
When it came to writing, everything was peachy. I could go into debug mode and see my buffer being filled with glorious data. Obviously, the next step was to send the stream over to whatever would be reading it. Check.
const myStream = new MyDuplexStream();
// Write some data
myStream.write('Hello');
myStream.write('World');
// Pipe to another stream
myStream.pipe(destinationStream);
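(Here, destinationStream is just a stand-in for whatever writable I was piping into. If you want to follow along at home, a minimal throwaway destination, purely for illustration, could look something like this:)

const { Writable } = require('stream');

// A throwaway Writable that just logs whatever it receives,
// handy for checking whether any data actually arrives.
const destinationStream = new Writable({
  write(chunk, encoding, callback) {
    console.log('destination received:', chunk.toString());
    callback();
  }
});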
But wait: my stream was only being read from once, and the write stream I was sending it to wasn’t getting any of the data. Something was very wrong! At this point I had to commit the ultimate sin: I had to open the documentation.
There are two interesting parts of the documentation relevant to this topic. The first describes reading modes, which are part of what makes read streams in Node tick. You can read more about that in the docs, but I was confident that my stream was in flowing mode because I was piping it to a write stream, an action that automatically switches the readable side into flowing mode.
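As a quick sanity check, the readableFlowing property tells you which mode the readable side is in: null before any consumer is attached, false when paused, and true when flowing. A small sketch:

const myStream = new MyDuplexStream();
console.log(myStream.readableFlowing); // null - no consumer attached yet

myStream.pipe(destinationStream);
console.log(myStream.readableFlowing); // true - pipe() switched it into flowing mode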
Finally, I found my solution lurking in the depths of the documentation. It turns out that the _read method is not supposed to return a slice of data at all; instead, your implementation hands data to the stream by calling its push method. IMO this is poor API design, but there are probably internal complexities of streams that make it necessary (at least, I hope there are). Here, my good people of the Node community, is what I had to do instead:
class MyDuplexStream extends Duplex {
  constructor(options) {
    super(options);
    this.buffer = [];
  }

  _write(chunk, encoding, callback) {
    this.buffer.push(chunk);
    callback();
  }

  _read() {
    // Correct approach: use push() instead of return
    const chunk = this.buffer.shift();
    if (chunk) {
      this.push(chunk);
    } else {
      this.push(null); // Signal end of stream when no more data
    }
  }
}
And there you have it! The key insight is that _read() should call this.push(data) to emit data, not return it. This allows the stream to properly flow data to consumers.
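To see it working end to end, here is a small sketch of consuming the fixed stream. Note that with this particular _read(), the writes need to happen before a consumer is attached, because an empty buffer immediately signals end-of-stream:

const myStream = new MyDuplexStream();

// Fill the internal buffer first...
myStream.write('Hello');
myStream.write('World');

// ...then start consuming; each chunk is now emitted via push().
myStream.on('data', (chunk) => {
  console.log('read:', chunk.toString());
});

myStream.on('end', () => {
  console.log('done');
});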
Hopefully, this helps someone else who did not appropriately browse the documentation before writing their Readable/Duplex/Transform stream. Cheers, and Merry Christmas!