multi-stream.md

March 29, 2021


:MultiStream

An object consisting of multiple streams that can be refined or muxed.

The idea behind a MultiStream is being able to mux and demux streams when needed.
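To make "muxing" concrete, here is a minimal sketch that uses only Node's built-in `stream` module. It is not Scramjet's implementation: `muxStreams` and `collect` are hypothetical helpers, and the sketch assumes object-mode sources whose relative ordering does not matter.

```javascript
const { PassThrough, Readable } = require("node:stream");

// Hypothetical helper (not part of Scramjet): pipe every source into one
// object-mode PassThrough and end it once all sources have ended.
function muxStreams(sources) {
    const out = new PassThrough({ objectMode: true });
    let open = sources.length;
    if (open === 0) out.end();

    for (const source of sources) {
        // { end: false } keeps the output open until the last source finishes.
        source.pipe(out, { end: false });
        source.once("end", () => {
            open -= 1;
            if (open === 0) out.end();
        });
        source.once("error", (err) => out.destroy(err));
    }
    return out;
}

// Hypothetical helper: gather all chunks of a stream into an array.
async function collect(stream) {
    const chunks = [];
    for await (const chunk of stream) chunks.push(chunk);
    return chunks;
}

// Usage: chunks from both sources arrive on one stream, in no guaranteed order.
collect(muxStreams([Readable.from([1, 2]), Readable.from([3, 4])]))
    .then((chunks) => console.log(chunks.length)); // four chunks in total
```

Demuxing is the reverse operation: one stream is split back into several, which is what a list of streams held by a MultiStream represents.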

Usage:

new MultiStream([...streams])
 .mux();

new MultiStream(function*(){ yield* streams; })
 .map(stream => stream.filter(myFilter))
 .mux();

Kind: static class
Test: test/methods/multi-stream-constructor.js

new MultiStream(streams, [options])

Creates an instance of MultiStream with the specified stream list

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| streams | `Array.<stream.Readable>` \| `AsyncGenerator.<Readable>` \| `Generator.<Readable>` |  | the list of readable streams (other objects will be filtered out!) |
| [options] | `object` | `{}` | Optional options for the super object. ;) |

multiStream.streams : Array

Array of all streams

Kind: instance property of MultiStream

multiStream.source : DataStream

Source of the MultiStream.

This is nulled when the stream ends and is used to control the

Kind: instance property of MultiStream

multiStream.length : number

Returns the current number of streams

Kind: instance property of MultiStream

multiStream.map(aFunc, rFunc) : Promise.<MultiStream> ↺

Returns new MultiStream with the streams returned by the transform.

Runs the callback for every stream and creates a new MultiStream consisting of the streams returned by the callback.

Kind: instance method of MultiStream
Chainable
Returns: Promise.<MultiStream> - the mapped instance
Test: test/methods/multi-stream-map.js

| Param | Type | Description |
| --- | --- | --- |
| aFunc | `MultiMapCallback` | Add callback (normally you need only this) |
| rFunc | `MultiMapCallback` | Remove callback, called when the stream is removed |
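The return type is a Promise because the callback may itself be asynchronous. The sketch below illustrates that pattern on a plain array of Node streams. `mapStreams`, `double` and `demoMap` are hypothetical names, and the code is a stand-in built on the standard library rather than the library's own source.

```javascript
const { Readable, Transform } = require("node:stream");

// Hypothetical stand-in: run a possibly asynchronous callback on every
// stream and resolve once all callbacks have produced their new stream.
function mapStreams(streams, aFunc) {
    return Promise.all(
        streams.map((stream) => Promise.resolve(stream).then(aFunc))
    );
}

// A small object-mode transform that doubles every numeric chunk.
const double = () => new Transform({
    objectMode: true,
    transform(chunk, _encoding, done) {
        done(null, chunk * 2);
    },
});

// Map each source through the transform, then read every mapped stream.
async function demoMap() {
    const mapped = await mapStreams(
        [Readable.from([1, 2]), Readable.from([3])],
        (stream) => stream.pipe(double())
    );
    const results = [];
    for (const stream of mapped) {
        const chunks = [];
        for await (const chunk of stream) chunks.push(chunk);
        results.push(chunks);
    }
    return results;
}

demoMap().then((results) => console.log(results)); // [ [ 2, 4 ], [ 6 ] ]
```

Because the result is a Promise, a caller needs to await it (or use `.then`) before calling further methods on the mapped instance.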

multiStream.find() : DataStream

Calls Array.prototype.find on the streams

Kind: instance method of MultiStream
Returns: DataStream - found DataStream

| Param | Type | Description |
| --- | --- | --- |
| ...args | `Array.<any>` | arguments for |

multiStream.filter(func) : MultiStream ↺

Filters the stream list and returns a new MultiStream with only the streams for which the Function returned true

Kind: instance method of MultiStream
Chainable
Returns: MultiStream - the filtered instance
Test: test/methods/multi-stream-filter.js

| Param | Type | Description |
| --- | --- | --- |
| func | `function` | Filter run in Promise::then (so you can return a promise or a boolean) |
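Running the filter "in Promise::then" means the predicate may answer synchronously or asynchronously. A minimal sketch of that idea on a plain array, assuming a hypothetical helper `filterStreams` that is not part of Scramjet:

```javascript
const { PassThrough, Readable } = require("node:stream");

// Hypothetical helper: call the predicate through a promise chain so that
// both `true` and `Promise.resolve(true)` are accepted as answers.
async function filterStreams(streams, func) {
    const verdicts = await Promise.all(
        streams.map((stream) => Promise.resolve(stream).then(func))
    );
    return streams.filter((_stream, index) => Boolean(verdicts[index]));
}

// Usage: keep only object-mode streams, once with an async predicate and
// once with a synchronous one.
const objectStream = Readable.from([1]);   // Readable.from defaults to object mode
const byteStream = new PassThrough();      // byte mode by default

filterStreams([objectStream, byteStream], async (s) => s.readableObjectMode)
    .then((kept) => console.log(kept.length)); // 1

filterStreams([objectStream, byteStream], (s) => s.readableObjectMode)
    .then((kept) => console.log(kept.length)); // 1
```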

multiStream.mux([comparator], [ClassType]) : DataStream

Muxes the streams into a single one

Kind: instance method of MultiStream
Returns: DataStream - The resulting DataStream
Test: test/methods/multi-stream-mux.js
Todo

  • For now using comparator will not affect the mergesort.
  • Sorting requires all the streams to be constantly flowing; if any single one drains, the muxed stream drains too, even if data may still be available on other streams.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [comparator] | `function` |  | Should return -1, 0 or 1 depending on the desired order. If passed, the chunks will be added in sorted order. |
| [ClassType] | `function` | `DataStream` | the class to be outputted |
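The comparator contract can be illustrated with a generic k-way merge. This is a sketch under assumptions, not the library's algorithm: `mergeSorted` is a hypothetical helper, and it assumes every source already emits its chunks in comparator order.

```javascript
const { Readable } = require("node:stream");

// Hypothetical k-way merge: keeps one pending "head" chunk per source and
// always yields the head that the comparator sorts first.
async function* mergeSorted(sources, comparator) {
    const iterators = sources.map((source) => source[Symbol.asyncIterator]());
    const heads = await Promise.all(iterators.map((iterator) => iterator.next()));
    // Sources that are already exhausted are dropped up front.
    const active = heads
        .map((head, index) => ({ head, iterator: iterators[index] }))
        .filter((entry) => !entry.head.done);

    while (active.length > 0) {
        let best = 0;
        for (let i = 1; i < active.length; i++) {
            if (comparator(active[i].head.value, active[best].head.value) < 0) best = i;
        }
        const entry = active[best];
        yield entry.head.value;
        // Nothing more can be emitted until this source delivers its next chunk.
        entry.head = await entry.iterator.next();
        if (entry.head.done) active.splice(best, 1);
    }
}

// Usage: merge two ascending sources with a numeric comparator.
async function demoMerge() {
    const merged = [];
    const sources = [Readable.from([1, 4, 7]), Readable.from([2, 3, 9])];
    for await (const chunk of mergeSorted(sources, (a, b) => a - b)) merged.push(chunk);
    return merged;
}

demoMerge().then((merged) => console.log(merged)); // [ 1, 2, 3, 4, 7, 9 ]
```

The `await` inside the loop shows why a sorted merge needs every source to keep flowing: output stalls whenever the source that supplied the last chunk has not yet produced its next one.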

multiStream.add(stream)

Adds a stream to the MultiStream

If the MultiStream was muxed, filtered or mapped, the added stream will undergo the same transforms and conditions as if it had been added in the constructor.

Kind: instance method of MultiStream
Test: test/methods/multi-stream-add.js

| Param | Type | Description |
| --- | --- | --- |
| stream | `Readable` | [description] |

multiStream.remove(stream)

Removes a stream from the MultiStream

If the MultiStream was muxed, filtered or mapped, the stream will also be removed from the resulting streams.

Kind: instance method of MultiStream
Test: test/methods/multi-stream-remove.js

| Param | Type | Description |
| --- | --- | --- |
| stream | `Readable` | [description] |

multiStream.route([policy], [count]) : MultiStream

Re-routes streams to a new MultiStream of specified size

Kind: instance method of MultiStream
Returns: MultiStream - [description]
Todo

  • NYT: not yet tested
  • NYD: not yet documented

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [policy] | `function` | `Affinity.RoundRobin` | [description] |
| [count] | `number` | `os.cpus().length` | [description] |

multiStream.smap(transform) ↺

Map stream synchronously

Kind: instance method of MultiStream
Chainable

| Param | Type | Description |
| --- | --- | --- |
| transform | `function` | mapping function run on every stream (SYNCHRONOUS!) |

multiStream.cluster(clusterFunc, [options]) ↺

Distributes processing to multiple forked subprocesses.

Kind: instance method of MultiStream
Chainable

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| clusterFunc | `function` \| `string` |  | a cluster callback with all operations working similarly to DataStream::use |
| [options] | `DistributeOptions` | `{}` |  |

MultiStream:from(streams, [StreamClass]) : MultiStream

Constructs a MultiStream from any number of stream-like objects

Kind: static method of MultiStream

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| streams | `Array.<(Array\|Iterable.<any>\|AsyncGeneratorFunction\|GeneratorFunction\|AsyncFunction\|function()\|string\|Readable)>` |  | the array of input streamlike elements |
| [StreamClass] | `function` | `DataStream` |  |
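As a rough illustration of what "stream-like" can mean, the sketch below normalises a few of the listed input kinds into Node `Readable` streams using the standard `Readable.from`. `toReadable` is a hypothetical helper, not the library's conversion logic. It covers only arrays, iterables, generator functions and existing streams, and deliberately leaves out the string case, whose handling is not described here.

```javascript
const { Readable } = require("node:stream");

// Hypothetical helper: turn a stream-like input into a Readable.
function toReadable(input) {
    // Existing streams are passed through untouched.
    if (input instanceof Readable) return input;
    // (Async) generator functions are called to obtain an iterable first.
    if (typeof input === "function") return Readable.from(input());
    // Arrays and other (async) iterables are wrapped directly.
    return Readable.from(input);
}

// Usage: three different input kinds become three readable streams.
async function demoFrom() {
    const inputs = [
        [1, 2],
        function* () { yield 3; },
        Readable.from([4]),
    ];
    const chunks = [];
    for (const stream of inputs.map(toReadable)) {
        for await (const chunk of stream) chunks.push(chunk);
    }
    return chunks;
}

demoFrom().then((chunks) => console.log(chunks)); // [ 1, 2, 3, 4 ]
```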