reader.md
October 8, 2016
EZ Streams core reader API
import * as ez from 'ez-streams'
`ez.reader.decorate(proto)`
Adds the EZ streams reader API to an object. Usually the object is a prototype, but it may be any object with a `read(_)` method.
You do not need to call this function if you create your readers with the `ez.devices` modules.
Returns `proto` for convenience.

`count = reader.forEach(_, fn, thisObj)`
Similar to `forEach` on arrays.
The `fn` function is called as `fn(_, elt, i)`.
This call is asynchronous. It returns the number of entries processed when the end of stream is reached.

`reader = reader.map(fn, thisObj)`
Similar to `map` on arrays.
The `fn` function is called as `fn(_, elt, i)`.
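As a rough illustration of the `map` contract, the sketch below models a reader with a synchronous `read()` that returns `undefined` at end of stream. This is a simplification for readability: in EZ streams `read(_)` is asynchronous, and `Reader`, `arrayReader`, `map` and `toArray` here are hypothetical stand-ins, not the library's implementation. The zero-based index passed to `fn` is also an assumption, by analogy with arrays.

```typescript
// Hypothetical synchronous stand-in for an EZ reader:
// read() returns the next entry, or undefined at end of stream.
interface Reader<T> {
  read(): T | undefined;
}

// Builds a reader over an in-memory array (illustration only).
function arrayReader<T>(items: T[]): Reader<T> {
  let pos = 0;
  return { read: () => (pos < items.length ? items[pos++] : undefined) };
}

// map: wraps a source reader. fn receives each entry and its index
// (assumed zero-based), and is only invoked when the mapped reader is read.
function map<T, U>(source: Reader<T>, fn: (elt: T, i: number) => U): Reader<U> {
  let index = 0;
  return {
    read: () => {
      const elt = source.read();
      return elt === undefined ? undefined : fn(elt, index++);
    },
  };
}

// Drains a reader into an array (fine for small streams only).
function toArray<T>(reader: Reader<T>): T[] {
  const collected: T[] = [];
  for (let v = reader.read(); v !== undefined; v = reader.read()) collected.push(v);
  return collected;
}

const labelled = toArray(
  map(arrayReader(["a", "b", "c"]), (elt, i) => `${i}:${elt}`)
);
// labelled is ["0:a", "1:b", "2:c"]
```

Because the mapped reader pulls from its source one entry at a time, nothing is transformed until a consumer actually reads.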
Returns another reader on which other operations may be chained.

`result = reader.every(_, fn, thisObj)`
Similar to `every` on arrays.
The `fn` function is called as `fn(_, elt)`.
Returns true at the end of stream if `fn` returned true on every entry.
Stops streaming and returns false as soon as `fn` returns false on an entry.

`result = reader.some(_, fn, thisObj)`
Similar to `some` on arrays.
The `fn` function is called as `fn(_, elt)`.
Returns false at the end of stream if `fn` returned false on every entry.
Stops streaming and returns true as soon as `fn` returns true on an entry.

`result = reader.reduce(_, fn, initial, thisObj)`
Similar to `reduce` on arrays.
The `fn` function is called as `fn(_, current, elt)` where `current` is `initial` on the first entry and the result of the previous `fn` call otherwise.
Returns the value returned by the last `fn` call.

`writer = reader.pipe(_, writer)`
Pipes from this reader to `writer`. Returns the writer for chaining.

`reader = reader.tee(writer)`
Branches another writer on the chain.
Returns another reader on which other operations may be chained.

`readers = reader.dup()`
Duplicates a reader and returns a pair of readers which can be read from independently.

`reader = reader.concat(reader1, reader2)`
Concatenates the reader with one or more readers.
Works like `array.concat`: you can pass the readers as separate arguments, or pass an array of readers.

`result = reader.toArray(_)`
Reads all entries and returns them as an array. Note that this call is an anti-pattern for streaming, but it may be useful when working with small streams.

`result = reader.readAll(_)`
Reads all entries and returns them as a single string or buffer. Returns undefined if nothing has been read. Note that this call is an anti-pattern for streaming, but it may be useful when working with small streams.

`reader = reader.transform(fn)`
Inserts an asynchronous transformation into the chain.
This API is more powerful than `map` because the transformation function can combine results, split them, etc.
The transformation function `fn` is called as `fn(_, reader, writer)` where `reader` is the stream to which `transform` is applied, and `writer` is a writer which is piped into the next element of the chain.
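To make the "combine and split" point concrete, here is a small sketch of a transformation that re-chunks arbitrary text chunks into lines. It is a synchronous model, not EZ streams code: `Reader`, `Writer`, `arrayReader`, `transform` and `lines` are hypothetical names, and this `transform` runs `fn` eagerly into a buffer, whereas the library runs it asynchronously as the stream is consumed.

```typescript
// Hypothetical synchronous stand-ins for EZ readers and writers.
interface Reader<T> {
  read(): T | undefined; // undefined signals end of stream
}
interface Writer<T> {
  write(val: T): void;
}

function arrayReader<T>(items: T[]): Reader<T> {
  let pos = 0;
  return { read: () => (pos < items.length ? items[pos++] : undefined) };
}

// Simplified transform: runs fn to completion, collecting what it writes,
// then exposes the collected entries as a new reader.
function transform<T, U>(
  source: Reader<T>,
  fn: (reader: Reader<T>, writer: Writer<U>) => void
): Reader<U> {
  const buffered: U[] = [];
  fn(source, { write: (val) => { buffered.push(val); } });
  return arrayReader(buffered);
}

// A transformation that map cannot express: one input chunk may yield
// several lines, and one line may span several input chunks.
function lines(reader: Reader<string>, writer: Writer<string>): void {
  let pending = "";
  for (let chunk = reader.read(); chunk !== undefined; chunk = reader.read()) {
    const parts = (pending + chunk).split("\n");
    pending = parts.pop() ?? ""; // last piece may be an incomplete line
    for (const line of parts) writer.write(line);
  }
  if (pending.length > 0) writer.write(pending);
}

const lineReader = transform(arrayReader(["ab\ncd", "e\nf"]), lines);
const lineList: string[] = [];
for (let l = lineReader.read(); l !== undefined; l = lineReader.read()) {
  lineList.push(l);
}
// lineList is ["ab", "cde", "f"]
```

Two input chunks become three output entries here, and `"cde"` is assembled from pieces of both chunks.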
Returns another reader on which other operations may be chained.

`result = reader.filter(fn, thisObj)`
Similar to `filter` on arrays.
The `fn` function is called as `fn(_, elt, i)`.
Returns another reader on which other operations may be chained.

`result = reader.until(fn, testVal, thisObj, stopArg)`
Cuts the stream when the `fn` condition becomes true.
The `fn` function is called as `fn(_, elt, i)`.
`stopArg` is an optional argument which is passed to `stop` when `fn` becomes true.
Returns another reader on which other operations may be chained.

`result = reader.while(fn, testVal, thisObj, stopArg)`
Cuts the stream when the `fn` condition becomes false.
This is different from `filter` in that the resulting stream ends when the condition becomes false, instead of just skipping the entries.
The `fn` function is called as `fn(_, elt, i)`.
`stopArg` is an optional argument which is passed to `stop` when `fn` becomes false.
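The difference between `while` and `filter` can be seen in a small synchronous model. The names below (`Reader`, `arrayReader`, `filter`, `takeWhile`, `toArray`) are hypothetical stand-ins rather than the library's code; `takeWhile` is used because `while` is a reserved word for a standalone function, and the sketch omits `testVal`, `thisObj` and the upstream `stop(stopArg)` call.

```typescript
// Hypothetical synchronous stand-in for an EZ reader.
interface Reader<T> {
  read(): T | undefined; // undefined signals end of stream
}

function arrayReader<T>(items: T[]): Reader<T> {
  let pos = 0;
  return { read: () => (pos < items.length ? items[pos++] : undefined) };
}

function toArray<T>(reader: Reader<T>): T[] {
  const collected: T[] = [];
  for (let v = reader.read(); v !== undefined; v = reader.read()) collected.push(v);
  return collected;
}

// filter: skips entries that fail the test and keeps reading.
function filter<T>(source: Reader<T>, fn: (elt: T, i: number) => boolean): Reader<T> {
  let index = 0;
  return {
    read: () => {
      for (let elt = source.read(); elt !== undefined; elt = source.read()) {
        if (fn(elt, index++)) return elt;
      }
      return undefined;
    },
  };
}

// takeWhile (models reader.while): ends the stream at the first failing entry.
function takeWhile<T>(source: Reader<T>, fn: (elt: T, i: number) => boolean): Reader<T> {
  let index = 0;
  let ended = false;
  return {
    read: () => {
      if (ended) return undefined;
      const elt = source.read();
      if (elt === undefined || !fn(elt, index++)) {
        ended = true; // the real API would also call stop(stopArg) upstream
        return undefined;
      }
      return elt;
    },
  };
}

const isSmall = (n: number) => n < 3;
const filteredEntries = toArray(filter(arrayReader([1, 2, 5, 1, 2]), isSmall));
const cutEntries = toArray(takeWhile(arrayReader([1, 2, 5, 1, 2]), isSmall));
// filteredEntries is [1, 2, 1, 2]; cutEntries is [1, 2]
```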
Returns another reader on which other operations may be chained.

`result = reader.limit(count, stopArg)`
Limits the stream to produce `count` results.
`stopArg` is an optional argument which is passed to `stop` when the limit is reached.
Returns another reader on which other operations may be chained.

`result = reader.skip(count)`
Skips the first `count` entries of the reader.
Returns another reader on which other operations may be chained.

`group = reader.fork(consumers)`
Forks the stream and passes the values to a set of consumers, as if each consumer had its own copy of the stream as input.
`consumers` is an array of functions with the following signature: `reader = consumer(source)`
Returns a `StreamGroup` on which other operations can be chained.

`group = reader.parallel(count, consumer)`
Parallelizes by distributing the values to a set of `count` identical consumers.
`count` is the number of consumers that will be created.
`consumer` is a function with the following signature: `reader = consumer(source)`
Returns a `StreamGroup` on which other operations can be chained.
Note: transformed entries may be delivered out of order.

`reader = reader.peekable()`
Returns a stream which has been extended with two methods to support lookahead.
The lookahead methods are:
`reader.peek(_)`: same as `read(_)` but does not consume the item.
`reader.unread(val)`: pushes `val` back so that it will be returned by the next `read(_)`.
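The lookahead behaviour described above can be sketched with a small push-back stack. This is a synchronous model under assumptions: `Reader`, `PeekableReader`, `arrayReader` and `peekable` are hypothetical names, and the real `peek(_)` and `read(_)` are asynchronous.

```typescript
// Hypothetical synchronous stand-in for an EZ reader.
interface Reader<T> {
  read(): T | undefined; // undefined signals end of stream
}

interface PeekableReader<T> extends Reader<T> {
  peek(): T | undefined;
  unread(val: T): void;
}

function arrayReader<T>(items: T[]): Reader<T> {
  let pos = 0;
  return { read: () => (pos < items.length ? items[pos++] : undefined) };
}

// Wraps a reader with a stack of pushed-back values that are served
// before the underlying source is consulted again.
function peekable<T>(source: Reader<T>): PeekableReader<T> {
  const pushedBack: T[] = [];
  const read = (): T | undefined =>
    pushedBack.length > 0 ? pushedBack.pop() : source.read();
  return {
    read,
    unread: (val) => { pushedBack.push(val); },
    peek: () => {
      const val = read();
      if (val !== undefined) pushedBack.push(val); // put it back: not consumed
      return val;
    },
  };
}

const lookahead = peekable(arrayReader([10, 20, 30]));
const peeked = lookahead.peek();      // 10, not consumed
const firstRead = lookahead.read();   // 10 again
lookahead.unread(99);                 // push a value back
const secondRead = lookahead.read();  // 99
const thirdRead = lookahead.read();   // 20
```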
`reader = reader.buffer(max)`
Returns a stream which is identical to the original one but in which up to `max` entries may have been buffered.

`stream = reader.nodify()`
Converts the reader into a native node Readable stream.

`reader = reader.nodeTransform(duplex)`
Pipes the reader into a node duplex stream. Returns another reader.

`cmp = reader1.compare(_, reader2)`
Compares `reader1` and `reader2`; returns 0 if they are equal.

`reader.stop(_, arg)`
Informs the source that its consumer(s) have stopped reading.
The source should override this method if it needs to free resources when the stream ends.
`arg` is an optional argument.
If `arg` is falsy and the reader has been forked (or teed) upstream, only this reader stops (silently).
If `arg` is true, readers that have been forked upstream are stopped silently (their `read` returns undefined).
Otherwise `arg` should be an error object which will be thrown when readers that have been forked upstream try to read.
The default `stop` function is a no-op.
Note: `stop` is only called if reading stops before reaching the end of the stream.
Sources should free their resources both on `stop` and on end-of-stream.
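The last point, freeing resources on both paths, can be sketched as follows. This is a synchronous model with hypothetical names (`Reader`, `resourceReader`, `limit`), not the library's implementation; in particular, this `limit` calls `stop` on the first read past the limit, which is a simplification.

```typescript
// Hypothetical synchronous stand-in for an EZ reader with a stop method.
interface Reader<T> {
  read(): T | undefined; // undefined signals end of stream
  stop(arg?: unknown): void;
}

// A source that owns a resource (for example a file handle) and records
// in `log` when, and why, it released it.
function resourceReader(items: number[], log: string[]): Reader<number> {
  let pos = 0;
  let released = false;
  const release = (why: string): void => {
    if (!released) {
      released = true; // idempotent: release at most once
      log.push(`released on ${why}`);
    }
  };
  return {
    read: () => {
      if (pos < items.length) return items[pos++];
      release("end-of-stream"); // normal completion path
      return undefined;
    },
    stop: () => release("stop"), // early termination path
  };
}

// Simplified limit: passes through `count` entries, then stops the source.
function limit<T>(source: Reader<T>, count: number, stopArg?: unknown): Reader<T> {
  let remaining = count;
  let stopped = false;
  const stopOnce = (arg?: unknown): void => {
    if (!stopped) {
      stopped = true;
      source.stop(arg);
    }
  };
  return {
    read: () => {
      if (remaining > 0) {
        remaining--;
        return source.read();
      }
      stopOnce(stopArg);
      return undefined;
    },
    stop: (arg) => stopOnce(arg),
  };
}

// Early stop: the consumer only wants two of the four entries.
const stopLog: string[] = [];
const limited = limit(resourceReader([1, 2, 3, 4], stopLog), 2);
const limitedEntries: number[] = [];
for (let v = limited.read(); v !== undefined; v = limited.read()) limitedEntries.push(v);

// Full read: stop is never called, the source releases at end of stream.
const endLog: string[] = [];
const fullReader = resourceReader([1, 2], endLog);
while (fullReader.read() !== undefined) { /* drain */ }
// stopLog is ["released on stop"]; endLog is ["released on end-of-stream"]
```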
StreamGroup API
`reader = group.dequeue()`
Dequeues values in the order in which they are delivered by the readers. Returns a stream on which other operations may be chained.

`reader = group.rr()`
Dequeues values in round robin fashion. Returns a stream on which other operations may be chained.

`reader = group.join(fn, thisObj)`
Combines the values read from the readers to produce a single value.
`fn` is called as `fn(_, values)` where `values` is the set of values produced by all the readers that are still active.
`fn` returns the value which will be read from the joined stream. `fn` must also reset to `undefined` the `values` entries that it has consumed. The next `read(_)` on the joined stream will fetch these values.
Note that the length of the `values` array will decrease every time an input stream is exhausted.
Returns a stream on which other operations may be chained.
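One way to read the `join` contract is as a merge of several readers, where `fn` picks one value per call and marks it as consumed. The sketch below is a synchronous model of that reading, with hypothetical names (`Reader`, `arrayReader`, `toArray`, `join`, `pickSmallest`); it is an interpretation of the description above, not the library's code.

```typescript
// Hypothetical synchronous stand-in for an EZ reader.
interface Reader<T> {
  read(): T | undefined; // undefined signals end of stream
}

function arrayReader<T>(items: T[]): Reader<T> {
  let pos = 0;
  return { read: () => (pos < items.length ? items[pos++] : undefined) };
}

function toArray<T>(reader: Reader<T>): T[] {
  const collected: T[] = [];
  for (let v = reader.read(); v !== undefined; v = reader.read()) collected.push(v);
  return collected;
}

// Simplified join: before each call to fn, refill the slots that fn reset to
// undefined, and drop the readers that are exhausted (so `values` shrinks).
function join<T, U>(
  readers: Reader<T>[],
  fn: (values: (T | undefined)[]) => U | undefined
): Reader<U> {
  let active = readers.slice();
  let values: (T | undefined)[] = active.map(() => undefined);
  return {
    read: () => {
      const nextActive: Reader<T>[] = [];
      const nextValues: (T | undefined)[] = [];
      active.forEach((r, k) => {
        const v = values[k] !== undefined ? values[k] : r.read();
        if (v !== undefined) {
          nextActive.push(r);
          nextValues.push(v);
        }
      });
      active = nextActive;
      values = nextValues;
      return values.length === 0 ? undefined : fn(values);
    },
  };
}

// fn for merging sorted inputs: return the smallest pending value and
// reset its slot so that the next read refills it.
const pickSmallest = (values: (number | undefined)[]): number => {
  let best = 0;
  for (let k = 1; k < values.length; k++) {
    if ((values[k] as number) < (values[best] as number)) best = k;
  }
  const picked = values[best] as number;
  values[best] = undefined; // mark as consumed
  return picked;
};

const merged = toArray(
  join([arrayReader([1, 4, 6]), arrayReader([2, 3, 7])], pickSmallest)
);
// merged is [1, 2, 3, 4, 6, 7]
```

Only the slot that `fn` reset is refilled on the next read, which is what lets the two sorted inputs advance at different rates.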