Blends multiple streams together, emitting events from all of them concurrently.
merge takes multiple streams as arguments, and creates a stream that behaves like each of the argument streams, in parallel.
Marble diagram:
--1----2-----3--------4---
----a-----b----c---d------
merge
--1-a--2--b--3-c---d--4---
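The diagram above can be reproduced with a small plain-data model. This sketch does not call the library: it assumes each stream is represented as an array of { t, v } records (a value v at time t), and the helper name mergeTimelines is hypothetical.

```javascript
// Plain-data model of merge; it does not call the library.
// Each timeline is an array of { t, v }: an event with value v at time t.
function mergeTimelines(...timelines) {
  // Pool every event from every input, then order the pool by time.
  // Each event keeps its own timing, as in the marble diagram.
  return timelines.flat().sort((a, b) => a.t - b.t);
}

const numbers = [{ t: 2, v: 1 }, { t: 7, v: 2 }, { t: 13, v: 3 }, { t: 22, v: 4 }];
const letters = [{ t: 4, v: 'a' }, { t: 10, v: 'b' }, { t: 15, v: 'c' }, { t: 19, v: 'd' }];

const merged = mergeTimelines(numbers, letters).map((e) => e.v);
console.log(merged.join(' ')); // 1 a 2 b 3 c d 4
```

The times are the column positions of the events in the diagram, so the printed order matches its bottom row.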
Adds a Listener to the Stream.
Passes the input stream to a custom operator, to produce an output stream.
compose is a handy way of using an existing function in a chained style. Instead of writing outStream = f(inStream) you can write outStream = inStream.compose(f).
A function that takes a stream as input and returns a stream as well.
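The equivalence between the two styles can be illustrated with a stand-in class. This is a minimal sketch, not the library's implementation: TinyStream and the double operator are hypothetical names, and the stand-in wraps a plain array instead of emitting events over time.

```javascript
// Hypothetical stand-in for a stream: it wraps an array of events.
class TinyStream {
  constructor(events) {
    this.events = events;
  }
  map(project) {
    return new TinyStream(this.events.map(project));
  }
  // compose hands the current stream to the operator and returns its result.
  compose(operator) {
    return operator(this);
  }
}

// A custom operator: a function from stream to stream.
const double = (in$) => in$.map((x) => x * 2);

const input = new TinyStream([1, 2, 3]);
const direct = double(input);          // outStream = f(inStream)
const chained = input.compose(double); // outStream = inStream.compose(f)

console.log(direct.events, chained.events); // [ 2, 4, 6 ] [ 2, 4, 6 ]
```

The chained form is convenient when the operator sits in the middle of a longer chain of method calls.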
Ignores the first amount many events from the input stream, then starts forwarding events from the input stream to the output stream.
Marble diagram:
--a---b--c----d---e--
drop(3)
--------------d---e--
How many events to ignore from the input stream before forwarding all events from the input stream to the output stream.
Uses another stream to determine when to complete the current stream.
When the given other stream emits an event or completes, the output stream will complete. Before that happens, the output stream behaves like the input stream.
Marble diagram:
---1---2-----3--4----5----6---
endWhen( --------a--b--| )
---1---2-----3--4--|
Another stream whose first event or completion determines when the output stream of this operator completes.
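The cut-off rule can be sketched with plain data. This model does not call the library: it assumes a stream is represented as { events: [{ t, v }], completeAt }, with events sorted by time and completeAt set to Infinity for a stream that never completes. The name endWhenModel is hypothetical.

```javascript
// Plain-data model of endWhen; it does not call the library.
function endWhenModel(input, other) {
  const firstOtherEvent = other.events.length > 0 ? other.events[0].t : Infinity;
  // The output stops at whichever signal from `other` comes first:
  // its first event or its completion.
  const stopAt = Math.min(firstOtherEvent, other.completeAt);
  return {
    events: input.events.filter((e) => e.t < stopAt),
    completeAt: Math.min(stopAt, input.completeAt),
  };
}

const input = {
  events: [
    { t: 3, v: 1 }, { t: 7, v: 2 }, { t: 13, v: 3 },
    { t: 16, v: 4 }, { t: 21, v: 5 }, { t: 26, v: 6 },
  ],
  completeAt: Infinity,
};
const other = { events: [{ t: 19, v: 'a' }, { t: 22, v: 'b' }], completeAt: 25 };

const output = endWhenModel(input, other);
console.log(output.events.map((e) => e.v), output.completeAt); // [ 1, 2, 3, 4 ] 19
```

Only the first signal from the other stream matters; its later event b and its completion have no further effect.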
Flattens a "stream of streams", handling only one nested stream at a time (no concurrency).
If the input stream is a stream that emits streams, then this operator will return a flat output stream, one that emits regular events. The flattening happens without concurrency. It works like this: when the input stream emits a nested stream, flatten will start imitating that nested one. However, as soon as the next nested stream is emitted on the input stream, flatten will forget the previous nested one it was imitating, and will start imitating the new nested one.
Marble diagram:
--+--------+---------------
  \        \
   \       ----1----2---3--
   --a--b----c----d--------
           flatten
-----a--b------1----2---3--
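The "forget the previous nested stream" rule can be sketched with plain data. This model does not call the library: it assumes the outer stream is an array of { t, inner } records sorted by time, where inner is an array of { t, v } events on the same clock. The name flattenModel is hypothetical.

```javascript
// Plain-data model of flatten; it does not call the library.
function flattenModel(outer) {
  const output = [];
  outer.forEach((nested, i) => {
    // A nested stream is imitated from the moment it is emitted...
    const from = nested.t;
    // ...until the next nested stream is emitted on the outer stream.
    const until = i + 1 < outer.length ? outer[i + 1].t : Infinity;
    for (const e of nested.inner) {
      if (e.t >= from && e.t < until) output.push(e);
    }
  });
  return output;
}

const outer = [
  { t: 2, inner: [{ t: 5, v: 'a' }, { t: 8, v: 'b' }, { t: 13, v: 'c' }, { t: 18, v: 'd' }] },
  { t: 11, inner: [{ t: 15, v: 1 }, { t: 20, v: 2 }, { t: 24, v: 3 }] },
];

const flat = flattenModel(outer).map((e) => e.v);
console.log(flat); // [ 'a', 'b', 1, 2, 3 ]
```

Events c and d are dropped because the second nested stream had already replaced the first by the time they occurred.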
"Folds" the stream onto itself.
Combines events from the past throughout the entire execution of the input stream, allowing you to accumulate them together. It's essentially like Array.prototype.reduce. The returned stream is a MemoryStream, which means it is already remember()'d.
The output stream starts by emitting the seed which you give as argument. Then, when an event happens on the input stream, it is combined with that seed value through the accumulate function, and the output value is emitted on the output stream. fold remembers that output value as acc ("accumulator"), and then when a new input event t happens, acc will be combined with that to produce the new acc and so forth.
Marble diagram:
------1-----1--2----1----1------
fold((acc, x) => acc + x, 3)
3-----4-----5--7----8----9------
A function of type (acc: R, t: T) => R that takes the previous accumulated value acc and the incoming event from the input stream and produces the new accumulated value.
The initial accumulated value, of type R.
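The accumulation steps described above can be traced with a plain array. This is a sketch of the semantics only and does not call the library; foldModel is a hypothetical name, and the input array stands in for the sequence of events on the input stream.

```javascript
// Plain-array model of fold; it does not call the library.
function foldModel(events, accumulate, seed) {
  // The output starts with the seed, before any input event arrives.
  const emitted = [seed];
  let acc = seed;
  for (const t of events) {
    // Each input event is combined with the remembered accumulator,
    // and the result is both emitted and remembered.
    acc = accumulate(acc, t);
    emitted.push(acc);
  }
  return emitted;
}

const totals = foldModel([1, 1, 2, 1, 1], (acc, x) => acc + x, 3);
console.log(totals); // [ 3, 4, 5, 7, 8, 9 ]
```

Unlike Array.prototype.reduce, which yields only the final value, every intermediate accumulator appears in the output, matching the bottom row of the marble diagram.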
imitate changes this current Stream to emit the same events that the other given Stream does. This method returns nothing.
This method exists to allow one thing: circular dependency of streams. For instance, let's imagine that for some reason you need to create a circular dependency where stream first$ depends on stream second$ which in turn depends on first$:
import delay from 'xstream/extra/delay'
var first$ = second$.map(x => x * 10).take(3);
var second$ = first$.map(x => x + 1).startWith(1).compose(delay(100));
However, that is invalid JavaScript, because second$ is undefined on the first line. This is how imitate can help solve it:
import xs from 'xstream'
import delay from 'xstream/extra/delay'
var secondProxy$ = xs.create();
var first$ = secondProxy$.map(x => x * 10).take(3);
var second$ = first$.map(x => x + 1).startWith(1).compose(delay(100));
secondProxy$.imitate(second$);
We create secondProxy$ before the others, so it can be used in the declaration of first$. Then, after both first$ and second$ are defined, we hook secondProxy$ to second$ with imitate() to declare that they are "the same". imitate will not trigger the start of any stream, it just binds secondProxy$ and second$ together.
The following is an example where imitate() is important in Cycle.js applications. A parent component contains some child components. A child has an action stream which is given to the parent to define its state:
const childActionProxy$ = xs.create();
const parent = Parent({...sources, childAction$: childActionProxy$});
const childAction$ = parent.state$.map(s => s.child.action$).flatten();
childActionProxy$.imitate(childAction$);
Note, though, that imitate() does not support MemoryStreams. If we attempted to imitate a MemoryStream in a circular dependency, we would either get a race condition (where the symptom would be "nothing happens") or an infinite cyclic emission of values. It's useful to think about MemoryStreams as cells in a spreadsheet. It doesn't make any sense to define a spreadsheet cell A1 with a formula that depends on B1 and cell B1 defined with a formula that depends on A1.
If you find yourself wanting to use imitate() with a MemoryStream, you should rework your code around imitate() to use a Stream instead. Look for the stream in the circular dependency that represents an event stream, and that would be a candidate for creating a proxy Stream which then imitates the target Stream.
The other stream to imitate on the current one. Must not be a MemoryStream.
When the input stream completes, the output stream will emit the last event emitted by the input stream, and then will also complete.
Marble diagram:
--a---b--c--d----|
last()
-----------------d|
Transforms each event from the input Stream through a project function, to get a Stream that emits those transformed events.
Marble diagram:
--1---3--5-----7------
map(i => i * 10)
--10--30-50----70-----
A function of type (t: T) => U that takes event t of type T from the input Stream and produces an event of type U, to be emitted on the output Stream.
It's like map, but transforms each input event to always the same constant value on the output Stream.
Marble diagram:
--1---3--5-----7-----
mapTo(10)
--10--10-10----10----
A value to emit on the output Stream whenever the input Stream emits any value.
Returns an output stream that behaves like the input stream, but also remembers the most recent event that happens on the input stream, so that a newly added listener will immediately receive that memorised event.
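The effect on a late listener can be sketched with a small stand-in class. This is a model of the described behavior, not the library's implementation: RememberingSource and its emit method are hypothetical, and only the next callback of a listener is modelled.

```javascript
// Hypothetical stand-in that remembers its most recent event.
class RememberingSource {
  constructor() {
    this.listeners = [];
    this.hasValue = false;
    this.latest = undefined;
  }
  emit(value) {
    // Memorise the event, then broadcast it to the current listeners.
    this.hasValue = true;
    this.latest = value;
    this.listeners.forEach((listener) => listener.next(value));
  }
  addListener(listener) {
    this.listeners.push(listener);
    // A newly added listener immediately receives the memorised event.
    if (this.hasValue) listener.next(this.latest);
  }
}

const source = new RememberingSource();
const early = [];
const late = [];

source.addListener({ next: (v) => early.push(v) });
source.emit('a');
source.emit('b');
source.addListener({ next: (v) => late.push(v) }); // receives 'b' right away
source.emit('c');

console.log(early, late); // [ 'a', 'b', 'c' ] [ 'b', 'c' ]
```

Without the memorised event, the late listener would have seen only c.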
Removes a Listener from the Stream, assuming the Listener was added to it.
Replaces an error with another stream.
When (and if) an error happens on the input stream, instead of forwarding that error to the output stream, replaceError will call the replace function which returns the stream that the output stream will replicate. And, in case that new stream also emits an error, replace will be called again to get another stream to start replicating.
Marble diagram:
--1---2-----3--4-----X
replaceError( () => --10--| )
--1---2-----3--4--------10--|
A function of type (err) => Stream that takes the error that occurred on the input stream or on the previous replacement stream and returns a new stream. The output stream will behave like the stream that this function returns.
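The repeated replacement can be sketched with plain data. This model does not call the library: it assumes a finished stream is represented as { events, error }, with error left undefined when the stream completes normally. The name replaceErrorModel is hypothetical.

```javascript
// Plain-data model of replaceError; it does not call the library.
function replaceErrorModel(stream, replace) {
  const events = [];
  let current = stream;
  while (true) {
    // Forward the events of the stream currently being replicated.
    events.push(...current.events);
    // A normal completion ends the output as well.
    if (current.error === undefined) return { events, error: undefined };
    // Otherwise ask `replace` for the next stream to replicate.
    current = replace(current.error);
  }
}

const failing = { events: [1, 2, 3, 4], error: new Error('X') };

let calls = 0;
const recovered = replaceErrorModel(failing, (err) => {
  calls += 1;
  // The first replacement fails too, so `replace` is called a second time.
  return calls === 1
    ? { events: [10], error: new Error('Y') }
    : { events: [20], error: undefined };
});

console.log(recovered.events, calls); // [ 1, 2, 3, 4, 10, 20 ] 2
```

If replace kept returning failing streams, the output would never complete, so the replacement stream is usually chosen to be one that cannot fail in the same way.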
Adds a "debug" listener to the stream. There can only be one debug listener, which is why this is 'setDebugListener'. To remove the debug listener, just call setDebugListener(null).
A debug listener is like any other listener. The only difference is that a debug listener is "stealthy": its presence/absence does not trigger the start/stop of the stream (or the producer inside the stream). This is useful so you can inspect what is going on without changing the behavior of the program. If you have an idle stream and you add a normal listener to it, the stream will start executing. But if you set a debug listener on an idle stream, it won't start executing (not until the first normal listener is added).
As the name indicates, we don't recommend using this method to build app logic. In fact, in most cases the debug operator works just fine. Only use this one if you know what you're doing.
Forces the Stream to emit the "completed" event to its listeners.
As the name indicates, if you use this, you are most likely doing something The Wrong Way. Please try to understand the reactive way before using this method. Use it only when you know what you are doing.
Forces the Stream to emit the given error to its listeners.
As the name indicates, if you use this, you are most likely doing something The Wrong Way. Please try to understand the reactive way before using this method. Use it only when you know what you are doing.
The error you want to broadcast to all the listeners of this Stream.
Forces the Stream to emit the given value to its listeners.
As the name indicates, if you use this, you are most likely doing something The Wrong Way. Please try to understand the reactive way before using this method. Use it only when you know what you are doing.
The "next" value you want to broadcast to all listeners of this Stream.
Prepends the given initial value to the sequence of events emitted by the input stream. The returned stream is a MemoryStream, which means it is already remember()'d.
Marble diagram:
---1---2-----3---
startWith(0)
0--1---2-----3---
The value or event to prepend.
Adds a Listener to the Stream, returning a Subscription that can be used to remove that listener.
Lets the first amount many events from the input stream pass to the output stream, then makes the output stream complete.
Marble diagram:
--a---b--c----d---e--
take(3)
--a---b--c|
How many events to allow from the input stream before completing the output stream.
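Counting operators such as take, and its counterpart drop, can be modelled as array slices. This sketch does not call the library: takeModel and dropModel are hypothetical names, and the array stands in for the sequence of events on the input stream. Completion is not modelled.

```javascript
// Plain-array models of take and drop; they do not call the library.
// take lets the first `amount` events through.
const takeModel = (events, amount) => events.slice(0, amount);
// drop ignores the first `amount` events and forwards the rest.
const dropModel = (events, amount) => events.slice(amount);

const events = ['a', 'b', 'c', 'd', 'e'];
const taken = takeModel(events, 3);
const dropped = dropModel(events, 3);

console.log(taken, dropped); // [ 'a', 'b', 'c' ] [ 'd', 'e' ]
```

With the same amount, the two results together contain every input event exactly once.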
Creates a new MemoryStream given a Producer.
An optional Producer that dictates how to start, generate events, and stop the Stream.
Creates a Stream that immediately emits the "complete" notification when started, and that's it.
Marble diagram:
empty
-|
Creates a stream from an Array, Promise, or an Observable.
The input to make a stream from.
Converts an array to a stream. The returned stream will emit synchronously all the items in the array, and then complete.
Marble diagram:
fromArray([1,2,3])
123|
The array to be converted to a stream.
Converts an Observable into a Stream.
Converts a promise to a stream. The returned stream will emit the resolved value of the promise, and then complete. However, if the promise is rejected, the stream will emit the corresponding error.
Marble diagram:
fromPromise( ----42 )
-----------------42|
The promise to be converted to a stream.
Creates a Stream that does nothing when started. It never emits any event.
Marble diagram:
never
-----------------------
Creates a Stream that immediately emits the arguments that you give to of, then completes.
Marble diagram:
of(1,2,3)
123|
Creates a stream that periodically emits incremental numbers, every period milliseconds.
Marble diagram:
periodic(1000)
---0---1---2---3---4---...
The interval in milliseconds to use as a rate of emission.
Creates a Stream that immediately emits an "error" notification with the value you passed as the error argument when the stream starts, and that's it.
Marble diagram:
throw(X)
-X
The error event to emit on the created stream.
Combines multiple input streams together to return a stream whose events are arrays that collect the latest events from each input stream.
combine internally remembers the most recent event from each of the input streams. When any of the input streams emits an event, that event together with all the other saved events are combined into an array. That array will be emitted on the output stream. It's essentially a way of joining together the events from multiple streams.
Marble diagram:
--1----2-----3--------4---
----a-----b-----c--d------
combine
----1a-2a-2b-3b-3c-3d-4d--
A stream to combine together with other streams.
A stream to combine together with other streams. Multiple streams, not just two, may be given as arguments.
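The "latest event from each input" rule can be sketched with plain data. This model does not call the library: it assumes each stream is an array of { t, v } records, and combineModel is a hypothetical name. It also assumes that nothing is emitted until every input has produced at least one event, since before that point there is no complete array to emit.

```javascript
// Plain-data model of combine; it does not call the library.
function combineModel(...timelines) {
  // Tag each event with the index of the input it came from, then order by time.
  const tagged = timelines.flatMap((timeline, index) =>
    timeline.map((e) => ({ ...e, index })));
  tagged.sort((a, b) => a.t - b.t);

  const latest = new Array(timelines.length);
  const seen = new Set();
  const output = [];
  for (const e of tagged) {
    // Remember the most recent event of this input.
    latest[e.index] = e.v;
    seen.add(e.index);
    // Assumption: emit only once every input has a remembered event.
    if (seen.size === timelines.length) output.push({ t: e.t, v: [...latest] });
  }
  return output;
}

const numbers = [{ t: 2, v: 1 }, { t: 7, v: 2 }, { t: 13, v: 3 }, { t: 22, v: 4 }];
const letters = [{ t: 4, v: 'a' }, { t: 10, v: 'b' }, { t: 16, v: 'c' }, { t: 19, v: 'd' }];

const combined = combineModel(numbers, letters).map((e) => e.v.join(''));
console.log(combined.join(' ')); // 1a 2a 2b 3b 3c 3d 4d
```

The first number produces no output on its own because no letter has been seen yet.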