through2
May 7, 2026
Tiny utilities for inserting transformation logic into Node.js streams and Web Streams pipelines.
```sh
npm install through2
```
```javascript
import { transform } from 'through2'

readableStream
  .pipe(transform(async (chunk) => chunk.toString().toUpperCase()))
  .pipe(writableStream)
```
Contents

- Why
- Migrating from v4
- API
- Recipes
- Node-style streams (`from 'through2'`)
- Web Streams (`from 'through2/web'`)
- License
Why
Writing a Transform stream usually means subclassing, wiring up _transform, choosing objectMode, and minding backpressure. through2 wraps a function instead:
```javascript
// Without through2
import { Transform } from 'node:stream'

class Upper extends Transform {
  _transform (chunk, _enc, cb) {
    this.push(chunk.toString().toUpperCase())
    cb()
  }
}
input.pipe(new Upper()).pipe(output)

// With through2
import { transform } from 'through2'

input.pipe(transform(async (chunk) => chunk.toString().toUpperCase())).pipe(output)
```
Same idea for the modern, cross-runtime Web Streams API:
```javascript
import { transform } from 'through2/web'

await response.body
  .pipeThrough(transform(async (chunk) => { /* ... */ }))
  .pipeTo(destination)
```
Migrating from v4
v5 is a major version bump. Headline changes:
- ESM only.
- ESM only. `require('through2')` no longer works. Convert callers to `import` or pin to `through2@4`.
- Named exports added. `transform`, `objectTransform`, and `transformer` are the preferred surface in new code.
- Default export still works. `through2(fn)`, `through2.obj(fn)`, and `through2.ctor(fn)` produce equivalent stream instances. Once your callers are converted to ESM imports, the call-site syntax and runtime behaviour are unchanged (modulo the `instanceof` caveat below).
- Async functions and async generators are now accepted as transform functions, in addition to the classic `(chunk, enc, cb)` callback form. See Transform function styles.
- `through2/web` subpath added for Web Streams (`TransformStream`) pipelines.
- `readable-stream@4` (was `@3`) is the underlying dependency.
- `transformer` / `.ctor` returns a factory function, not a true constructor. The returned instance is still a `Transform`, but `instanceof YourFactory` no longer holds. Use `instanceof Transform` instead.
Mapping for legacy code:
| v4 / legacy | v5 named export |
|---|---|
| `through2(fn)` | `transform(fn)` |
| `through2.obj(fn)` | `objectTransform(fn)` |
| `through2.ctor(fn)` | `transformer(fn)` |
API
Two import paths, mirroring the two stream worlds:
| Import | Stream API | Returns | Use when |
|---|---|---|---|
| `from 'through2'` | Node-style streams | `stream.Transform` | You're working with `Readable`/`Writable`/`Transform`-shaped streams (`.pipe(...)`) |
| `from 'through2/web'` | Web Streams (WHATWG) | `TransformStream` | You're working with `ReadableStream`/`WritableStream`-shaped streams (`.pipeThrough(...)`) |
The two entries differ in the stream API they target, not the runtime they run on. Either entry can run in Node.js, browsers, Deno, Bun, or Cloudflare Workers. Pick the one that matches the streams you're piping with.
- `from 'through2'` uses the Node-style streams API (`Readable`/`Writable`/`Transform`, `.pipe()`, callback-driven `_transform`). It depends on `readable-stream`; browser bundlers pick up that package's `browser` field automatically and ship its self-contained shim, so no Node-builtin polyfill is needed.
- `from 'through2/web'` uses the WHATWG Web Streams API (`TransformStream`, `.pipeThrough()`). Zero runtime dependencies; relies only on `TransformStream` being a global (it is in modern browsers, Node.js >= 18, Deno, Bun, Workers).
Transform function styles
Every transform-creating export accepts the same three function styles, auto-dispatched by inspecting the function's kind:
```javascript
// 1. Classic Node-style callback (use `this.push()` and the callback)
transform(function (chunk, encoding, callback) {
  this.push(chunk)
  callback()
})

// 2. Async function (resolved value is pushed; `undefined` skips)
transform(async (chunk) => chunk.toString().toUpperCase())

// 3. Async generator (1-to-many; full pipeline coroutine)
transform(async function * (source) {
  for await (const chunk of source) {
    yield chunk
    yield chunk
  }
})
```
A flush function may be passed as the trailing argument; it follows the same dispatch rules.
Note on async: the async function form does not use `this.push`. To emit zero or many chunks per input, use the async generator form. To emit one chunk per input (or skip), return the value (or `undefined`).
Recipes
```javascript
import { objectTransform } from 'through2'
```
Map
One in, one out. Async function form; the resolved value is pushed.
```javascript
objectTransform(async (item) => doSomething(item))
```
Filter
Return undefined to drop a chunk.
```javascript
objectTransform(async (item) => predicate(item) ? item : undefined)
```
FlatMap
One in, many out. Async generator form.
```javascript
objectTransform(async function * (source) {
  for await (const item of source) {
    for (const x of expand(item)) yield x
  }
})
```
Batch
Collect a fixed-size batch, emit at size or at flush.
```javascript
objectTransform(async function * (source) {
  let batch = []
  for await (const item of source) {
    batch.push(item)
    if (batch.length >= 100) { yield batch; batch = [] }
  }
  if (batch.length) yield batch
})
```
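Because `node:stream/promises`' `pipeline()` also accepts a bare async generator function as a stage, batching logic like this can be exercised without any stream class at all — a handy way to unit-test a recipe before wiring it into through2. A self-contained run (batch size 3 instead of 100, to keep the output small):

```javascript
import { Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'

const batches = []
await pipeline(
  Readable.from([1, 2, 3, 4, 5, 6, 7]), // object-mode source
  async function * (source) {           // batch stage: emit every 3 items
    let batch = []
    for await (const item of source) {
      batch.push(item)
      if (batch.length >= 3) { yield batch; batch = [] }
    }
    if (batch.length) yield batch       // flush the remainder
  },
  async function (source) {             // sink: collect the batches
    for await (const b of source) batches.push(b)
  }
)
console.log(batches) // [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7 ] ]
```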
Tap
Side effect, pass through unchanged.
```javascript
objectTransform(async (item) => { observe(item); return item })
```
Parse newline-delimited input
Byte chunks in, line strings out. The async generator buffers across chunk boundaries.
```javascript
objectTransform(async function * (source) {
  let buf = ''
  for await (const chunk of source) {
    buf += chunk.toString()
    const lines = buf.split('\n')
    buf = lines.pop()
    for (const line of lines) yield line
  }
  if (buf) yield buf
})
```
For a runnable end-to-end demo combining NDJSON parsing, level filtering, formatting, and a tally, see example-ndjson.js:
```sh
cat app.log | node example-ndjson.js warn
```
Node-style streams (from 'through2')
```javascript
import { transform, objectTransform, transformer } from 'through2'
```
transform()
```
transform([options], transformFn[, flushFn]) -> stream.Transform
```
Returns a stream.Transform. options is forwarded to the underlying Transform constructor. If transformFn is omitted, a passthrough is returned.
```javascript
fs.createReadStream('in.txt')
  .pipe(transform(function (chunk, _enc, cb) {
    for (let i = 0; i < chunk.length; i++) {
      if (chunk[i] === 97) chunk[i] = 122 // swap 'a' for 'z'
    }
    this.push(chunk)
    cb()
  }))
  .pipe(fs.createWriteStream('out.txt'))
```
objectTransform()
```
objectTransform([options], transformFn[, flushFn]) -> stream.Transform
```
Like transform, with objectMode: true enabled by default. Most async/async-generator use cases want this.
transformer()
```
transformer([options], transformFn[, flushFn]) -> (overrideOptions?) -> stream.Transform
```
Returns a factory function. Calling it (with or without new) produces a fresh Transform instance with the configured behaviour. Per-call options merge on top of the configured defaults; the merged options are exposed as this.options inside the transform function.
```javascript
const Counter = transformer({ objectMode: true }, function (chunk, _enc, cb) {
  this.count = (this.count || 0) + 1
  this.push(chunk)
  cb()
})

const a = Counter()
const b = new Counter({ highWaterMark: 32 }) // override per-call
```
Composing pipelines
For anything beyond a quick demo, prefer node:stream/promises's pipeline() over chained .pipe(). It propagates errors, awaits completion, and destroys all streams on failure (chained .pipe() silently leaves streams hanging on error).
```javascript
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { objectTransform } from 'through2'

await pipeline(
  createReadStream('in.ndjson'),
  objectTransform(async function * (source) {
    let buf = ''
    for await (const chunk of source) {
      buf += chunk.toString()
      const lines = buf.split('\n')
      buf = lines.pop()
      for (const line of lines) yield JSON.parse(line)
    }
  }),
  objectTransform(async (record) => record.active ? record : undefined),
  objectTransform(async (record) => JSON.stringify(record) + '\n'),
  createWriteStream('out.ndjson')
)
```
pipeline() accepts any mix of through2-built transforms and other Readable/Writable/Transform instances. Use it whenever the pipeline can fail or you need to know when it's done.
A runnable, more elaborate version of this NDJSON pipeline lives in example-ndjson.js (parses arbitrary structured logs from stdin, filters by level, pretty-prints to stdout, summarises on stderr).
Default export (legacy)
The default export is transform with .obj and .ctor attached for back-compatibility:
```javascript
import through2 from 'through2'

through2(fn)      // === transform(fn)
through2.obj(fn)  // === objectTransform(fn)
through2.ctor(fn) // === transformer(fn)
```
Web Streams (from 'through2/web')
```javascript
import { transform } from 'through2/web'
```
transform() (web)
```
transform([transformFn][, flushFn]) -> TransformStream
```
Returns a TransformStream. The classic-style function takes (chunk, controller) and uses controller.enqueue(). Async and async-generator forms work the same as in the Node-style streams entry.
```javascript
// 1-to-many fan-out (controller form, stateless)
const tagged = transform((chunk, controller) => {
  controller.enqueue({ kind: 'raw', value: chunk })
  controller.enqueue({ kind: 'upper', value: chunk.toString().toUpperCase() })
})

// Splitter: cross-chunk state is needed (a line can span chunks). The async
// generator form lets the buffer be a local variable; the classic controller
// form would need closure state plus a flush handler.
const splitter = transform(async function * (source) {
  let buf = ''
  for await (const chunk of source) {
    buf += chunk.toString()
    const lines = buf.split('\n')
    buf = lines.pop()
    for (const line of lines) yield line
  }
  if (buf) yield buf
})

// Async (1-to-1) with flush
const withTrailer = transform(
  async (chunk) => chunk.toString().toUpperCase(),
  (controller) => controller.enqueue('END')
)

// Pipe a fetch response through a transform
await response.body
  .pipeThrough(transform(async (chunk) => chunk))
  .pipeTo(destinationWritableStream)
```
Implementation notes
At the time of writing, Chromium hasn't shipped the cleanup hook that TransformStream would use to tear down an in-flight async generator on cancel. Without a workaround, calling reader.cancel() on a browser-side pipeline would leave your async function * suspended and its finally block would never run. The web entry handles this for you:
- Backpressure works in both directions through `pipeThrough`.
- Cancelling the consumer or aborting the producer cleans up your generator (its `finally` runs) and errors the other side.
- Same behaviour in browsers, Node.js, Deno, Bun, and Cloudflare Workers.
One thing to note: `transform(asyncGenFn)` returns a `{ readable, writable }` pair rather than a `TransformStream` instance. `pipeThrough` and `pipeTo` accept it identically.
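That a plain `{ readable, writable }` pair is enough is a property of `pipeThrough` itself, not of through2: the spec only requires the pair's shape. It can be checked with a hand-built pair over the native `TransformStream` (a global in Node.js >= 18, browsers, Deno, Bun, and Workers):

```javascript
// Destructure a native TransformStream into a bare { readable, writable }
// pair and pipe through it -- pipeThrough accepts the pair identically.
const upper = new TransformStream({
  transform (chunk, controller) { controller.enqueue(chunk.toUpperCase()) }
})
const pair = { readable: upper.readable, writable: upper.writable }

const source = new ReadableStream({
  start (controller) {
    controller.enqueue('hello')
    controller.close()
  }
})

const results = []
await source.pipeThrough(pair).pipeTo(new WritableStream({
  write (chunk) { results.push(chunk) }
}))
console.log(results) // [ 'HELLO' ]
```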
License
through2 is Copyright (c) Rod Vagg and additional contributors and licensed under the MIT license. See the included LICENSE file for more details.