1. Introduction
This section is non-normative.
2. Core infrastructure
2.1. The Subscriber
interface
[Exposed=*]interface {
Subscriber undefined next (any );
value undefined error (any );
error undefined complete ();undefined addTeardown (VoidFunction ); // True after the Subscriber is created, up until either // complete()/error() are invoked, or the subscriber unsubscribes. Inside // complete()/error(), this attribute is true.
teardown readonly attribute boolean active ;readonly attribute AbortSignal signal ; };
Each Subscriber
has a next algorithm, which is a next steps-or-null.
Each Subscriber
has a error algorithm, which is an error steps-or-null.
Each Subscriber
has a complete algorithm, which is a complete steps-or-null.
Each Subscriber
has a teardown callbacks, which is a list of VoidFunction
s, initially empty.
Each Subscriber
has a complete or error controller, which is an AbortController
.
Each Subscriber
has a signal, which is an AbortSignal
.
Note: This is a dependent signal, derived from both complete or error controller's signal, and SubscribeOptions
's signal
(if non-null).
Each Subscriber
has a active boolean, initially true.
Note: This is a bookkeeping variable to ensure that a Subscriber
never calls any of the
callbacks it owns after it has been closed.
The active
getter steps are to return this's active boolean.
The signal
getter steps are to return this's signal.
next(value)
method steps are:
-
If this's relevant global object is a
Window
object, and its associated Document is not fully active, then return. -
If this's next algorithm is not null, then run this's next algorithm given value.
Assert: No exception was thrown.
Note: No exception can be thrown here because in the case where next algorithm is just a wrapper around a script-provided callback, the process observer steps take care to wrap these callbacks in logic that, when invoking them, catches any exceptions, and reports them to the global.
When the next algorithm is a spec algorithm, those steps take care to not throw any exceptions outside of itself, to appease this assert.
error(error)
method steps are:
-
If this's relevant global object is a
Window
object, and its associated Document is not fully active, then return. -
Let error algorithm be this's error algorithm.
-
If error algorithm is not null, then run error algorithm given error.
Assert: No exception was thrown.
Note: See the documentation in
next()
for details on why this is true. -
Otherwise (i.e., when error algorithm is null), report the exception error.
complete()
method steps are:
-
If this's relevant global object is a
Window
object, and its associated Document is not fully active, then return. -
Let complete algorithm be this's complete algorithm.
-
If complete algorithm is not null, then run complete algorithm.
Assert: No exception was thrown.
Note: See the documentation in
next()
for details on why this is true.
addTeardown(teardown)
method steps are:
-
If this's relevant global object is a
Window
object, and its associated Document is not fully active, then return. -
If this's active is true, then append teardown to this's teardown callbacks list.
-
Otherwise, invoke teardown.
If an exception E was thrown, then report the exception E.
Subscriber
subscriber, run these steps:
-
Set subscriber’s active boolean to false.
-
Set subscriber’s next algorithm, error algorithm, and complete algorithm all to null.
This algorithm intentionally does not have script-running side-effects; it just updates the
internal state of a Subscriber
. It’s important that this algorithm sets active to false and clears all of the callback algorithms before running any
script, because running script may reentrantly invoke one of the
methods that closed the subscription in the first place. And closing the subscription must ensure that even if a method gets reentrantly invoked, none of the SubscriptionObserver
callbacks are ever invoked again. Consider this example:
let innerSubscriber= null ; const producedValues= []; const controller= new AbortController(); const observable= new Observable( subscriber=> { innerSubscriber= subscriber; subscriber. complete(); }); observable. subscribe({ next: v=> producedValues. push( v), complete: () => innerSubscriber. next( 'from complete' ), }, { signal: controller. signal} ); // This invokes the complete() callback, and even though it invokes next() from // within, the given next() callback will never run, because the subscription // has already been "closed" before the complete() callback actually executes. controller. abort(); console. assert( producedValues. length=== 0 );
2.2. The Observable
interface
// SubscribeCallback is where the Observable "creator’s" code lives. It’s // called when subscribe() is called, to set up a new subscription.callback =
SubscribeCallback undefined (Subscriber );
subscriber callback =
SubscriptionObserverCallback undefined (any );
value dictionary {
SubscriptionObserver SubscriptionObserverCallback ;
next SubscriptionObserverCallback ;
error VoidFunction ; };
complete typedef (SubscriptionObserverCallback or SubscriptionObserver );
ObserverUnion dictionary {
SubscribeOptions AbortSignal ; };
signal callback =
Predicate boolean (any ,
value unsigned long long );
index callback =
Reducer any (any ,
accumulator any );
currentValue callback =
Mapper any (any ,
element unsigned long long ); // Differs from Mapper only in return type, since this callback is exclusively // used to visit each element in a sequence, not transform it.
index callback =
Visitor undefined (any ,
element unsigned long long ); [Exposed=*]
index interface {
Observable constructor (SubscribeCallback );
callback undefined subscribe (optional ObserverUnion = {},
observer optional SubscribeOptions = {}); // Constructs a native Observable from value if it’s any of the following: // - Observable // - AsyncIterable // - Iterable // - Promise
options static Observable (
from any ); // Observable-returning operators. See "Operators" section in the spec. // // takeUntil() can consume promises, iterables, async iterables, and other // observables.
value Observable takeUntil (any );
notifier Observable map (Mapper );
mapper Observable filter (Predicate );
predicate Observable take (unsigned long long );
amount Observable drop (unsigned long long );
amount Observable flatMap (Mapper );
mapper Observable finally (VoidFunction ); // Promise-returning operators.
callback Promise <sequence <any >>toArray (optional SubscribeOptions = {});
options Promise <undefined >forEach (Visitor ,
callback optional SubscribeOptions = {});
options Promise <boolean >every (Predicate ,
predicate optional SubscribeOptions = {}); // Maybe? Promise<any> first(optional SubscribeOptions options = {});
options Promise <any >find (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <boolean >some (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <any >reduce (Reducer ,
reducer optional any ,
initialValue optional SubscribeOptions = {}); };
options
Each Observable
has a subscribe callback, which is a SubscribeCallback
or a set of steps that take in a Subscriber
.
Note: The "union" of these types is to support both Observable
s created by JavaScript (that are
always constructed with a SubscribeCallback
), and natively-constructed Observable
objects
(whose subscribe callback could be an arbitrary set of native steps, not a JavaScript
callback). The return value of on()
is an example of the latter.
new
Observable(callback)
constructor steps are:
-
Set this's subscribe callback to callback.
Note: This callback will get invoked later when
subscribe()
is called.
2.2.1. Supporting concepts
any
error, and runs
these steps:
-
Report the exception error.
Note: We pull this default out separately so that every place in this specification that natively subscribes to an Observable
(i.e.,
subscribes from spec prose, not going through the subscribe()
method) doesn’t have
to redundantly define these steps.
An internal observer is a struct with the following items:
- next steps
-
An algorithm that takes a single parameter of type
any
. Initially, these steps do nothing. - error steps
-
An algorithm that takes a single parameter of type
any
. Initially, the default error algorithm. - complete steps
-
An algorithm with no parameters. Initially, these steps do nothing.
The internal observer struct is used to mirror the next
, error
, and complete
callback functions. For
any Observable
that is subscribed by JavaScript via the subscribe()
method,
these algorithm "steps" will just be a wrapper around invoking the corresponding next
, error
, and complete
callback functions provided by script.
But when internal spec prose (not user script) subscribes to an Observable
, these "steps" are arbitrary spec algorithms that
are not provided via an ObserverUnion
packed with Web IDL callback functions. See the § 2.3.3 Promise-returning operators that make use of this, for example.
Observable
given an ObserverUnion
-or-internal observer observer, and a SubscribeOptions
options, run
these steps:
Note: We split this algorithm out from the Web IDL subscribe()
method, so that
spec prose can subscribe to an Observable
without going through the Web IDL bindings. See w3c/IntersectionObserver#464 for
similar context, where "internal" prose must not go through Web IDL
bindings on objects whose properties could be mutated by JavaScript. See § 2.3.3 Promise-returning operators for usage of this.
-
If this's relevant global object is a
Window
object, and its associated Document is not fully active, then return. -
Let internal observer be a new internal observer.
-
Process observer as follows:
-
- If observer is a
SubscriptionObserverCallback
-
Set internal observer’s next steps to these steps that take
an
any
value:-
Invoke observer with value.
If an exception E was thrown, then report the exception E.
-
- If observer is a
SubscriptionObserver
-
-
If observer’s
next
is not null, set internal observer’s next steps to these steps that take anany
value:-
Invoke observer’s
next
with value.If an exception E was thrown, then report the exception E.
-
-
If observer’s
error
is not null, set internal observer’s error steps to these steps that take anany
error:-
Invoke observer’s
error
with error.If an exception E was thrown, then report the exception E.
-
-
If observer’s
complete
is not null, set internal observer’s complete steps to these steps:-
If an exception E was thrown, then report the exception E.
-
-
- If observer is an internal observer
- Set internal observer to observer.
- If observer is a
-
-
Assert: internal observer’s error steps is either the default error algorithm, or an algorithm that invokes the provided
error
callback function. -
Let subscriber be a new
Subscriber
, initialized as:- next algorithm
-
internal observer’s next steps
- error algorithm
-
internal observer’s error steps
- complete algorithm
-
internal observer’s complete steps
- signal
-
The result of creating a dependent abort signal from the list «subscriber’s complete or error controller's signal, options’s
signal
if it is non-null», usingAbortSignal
, and the current realm.
-
If subscriber’s signal is aborted, then close subscriber.
Note: This can happen when
SubscribeOptions
'ssignal
is already aborted. -
Otherwise, add the following abort algorithm to subscriber’s signal:
-
Close subscriber.
-
For each teardown of subscriber’s teardown callbacks sorted in reverse insertion order:
-
If subscriber’s relevant global object is a
Window
object, and its associated Document is not fully active, then abort these steps.Note: This step runs repeatedly because each teardown could result in the above
Document
becoming inactive. -
Invoke teardown.
If an exception E was thrown, call subscriber’s
error()
method with E.
-
-
-
If this's subscribe callback is a
SubscribeCallback
, invoke it with subscriber.If an exception E was thrown, call subscriber’s
error()
method with E. -
Otherwise, run the steps given by this's subscribe callback, given subscriber.
Tests
2.3. Operators
For now, see https://github.com/wicg/observable#operators.
2.3.1. from()
Spec the exact semantics of from()
conversion.
2.3.2. Observable
-returning operators
takeUntil(notifier)
method steps are:
-
Let sourceObservable be this.
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:Note that this method involves Subscribing to twoObservable
s: (1) notifier, and (2) sourceObservable. We "unsubscribe" from both of them in the following situations:-
notifier starts emitting values (either "next" or "error"). In this case, we unsubscribe from notifier since we got all we need from it, and no longer need it to keep producing values. We also unsubscribe from sourceObservable, because it no longer needs to produce values that get plumbed through this method’s returned observable, because we’re manually ending the subscription to observable, since notifier finally produced a value.
-
sourceObservable either
error()
s orcomplete()
s itself. In this case, we unsubscribe from notifier since we no longer need to listen for values it emits in order to determine when observable can stop mirroring values from sourceObservable (since sourceObservable ran to completion by itself). Unsubscribing from sourceObservable isn’t necessary, since its subscription has been exhausted by itself.
-
Let notifierObserver be a new internal observer, initialized as follows:
- next steps
-
Run subscriber’s
complete()
method.Note: This will "unsubscribe" from sourceObservable, if it has been subscribed to by this point. This is because sourceObservable is subscribed to with the "outer" subscriber’s signal as an input signal, and that signal will get aborted when the "outer" subscriber’s
complete()
is called above (and below). - error steps
-
Run subscriber’s
complete()
method.
Note: We do not specify complete steps, because if the notifier
Observable
completes itself, we do not need to complete the subscriber associated with the observable returned from this method. Rather, the observable will continue to mirror sourceObservable uninterrupted. -
Let options be a new
SubscribeOptions
whosesignal
is subscriber’s signal. -
Subscribe to notifier given notifierObserver and options.
-
If subscriber’s active is false, then return.
Note: This means that sourceObservable’s subscribe callback will not even get invoked once, if notifier synchronously emits a value. If notifier only "completes" synchronously though (without emitting a "next" or "error" value), then subscriber’s active will still be true, and we proceed to subscribe to sourceObservable, which observable will mirror uninterrupted.
-
Let sourceObserver be a new internal observer, initialized as follows:
- next steps
-
Run subscriber’s
next()
method, given the passed in value. - error steps
-
Run subscriber’s
error()
method, given the passed in error. - complete steps
-
Run subscriber’s
complete()
method.
Note: sourceObserver is mostly a pass-through, mirroring everything that sourceObservable emits, with the exception of having the ability to unsubscribe from the notifier
Observable
in the case where sourceObservable is exhausted before notifier emits anything. -
Subscribe to sourceObservable given sourceObserver and options.
-
-
Return observable.
map(mapper)
method steps are:
-
Let sourceObservable be this.
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let idx be an
unsigned long long
, initially 0. -
Let sourceObserver be a new internal observer, initialized as follows:
- next steps
-
-
Invoke mapper with the passed in value, and idx, and let mappedValue be the returned value.
If an exception E was thrown, then run subscriber’s
error()
method, given E, and abort these steps. -
Increment idx.
-
Run subscriber’s
next()
method, given mappedValue.
-
- error steps
-
Run subscriber’s
error()
method, given the passed in error. - complete steps
-
Run subscriber’s
complete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber’s signal. -
Subscribe to sourceObservable given sourceObserver and options.
-
-
Return observable.
filter(predicate)
method steps are:
-
Let sourceObservable be this.
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let idx be an
unsigned long long
, initially 0. -
Let sourceObserver be a new internal observer, initialized as follows:
- next steps
-
-
Invoke predicate with the passed in value and idx, and let matches be the returned value.
If an exception E was thrown, then run subscriber’s
error()
method, given E, and abort these steps. -
Set idx to idx + 1.
-
If matches is true, then run subscriber’s
next()
method, given value.
-
- error steps
-
Run subscriber’s
error()
method, given the passed in error. - complete steps
-
Run subscriber’s
complete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber’s signal. -
Subscribe to sourceObservable given sourceObserver and options.
-
-
Return observable.
take(amount)
method steps are:
-
Let sourceObservable be this.
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
If amount is 0, then run subscriber’s
complete()
method and abort these steps. -
Let sourceObserver be a new internal observer, initialized as follows:
- next steps
-
-
Run subscriber’s
next()
method with the passed in value. -
Decrement amount.
-
If amount is 0, then run subscriber’s
complete()
method.
-
- error steps
-
Run subscriber’s
error()
method, given the passed in error. - complete steps
-
Run subscriber’s
complete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber’s signal. -
Subscribe to sourceObservable given sourceObserver and options.
-
-
Return observable.
drop(amount)
method steps are:
-
Let sourceObservable be this.
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let sourceObserver be a new internal observer, initialized as follows:
- next steps
- error steps
-
Run subscriber’s
error()
method, given the passed in error. - complete steps
-
Run subscriber’s
complete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber’s signal. -
Subscribe to sourceObservable given sourceObserver and options.
-
-
Return observable.
flatMap(mapper)
method steps are:
-
Let sourceObservable be this.
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let outerSubscriptionHasCompleted to a boolean, initially false.
-
Let queue be a new list of
any
values, initially empty.Note: This queue is used to store any
Observable
s emitted by sourceObservable, while observable is currently subscribed to anObservable
emitted earlier by sourceObservable that has not yet been exhausted. -
Let activeInnerSubscription be a boolean, initially false.
-
Let sourceObserver be a new internal observer, initialized as follows:
- next steps
-
-
If activeInnerSubscription is true, then:
-
Append value to queue.
Note: This value will eventually be processed once the
Observable
that is currently subscribed-to (as indicated by activeInnerSubscription) is exhausted.
-
-
Otherwise:
-
Set activeInnerSubscription to true.
-
Run the flatmap process next value steps with value, subscriber, mapper, and references to all of the following: queue, activeInnerSubscription, and outerSubscriptionHasCompleted.
Note: This flatmap process next value steps will subscribe to the
Observable
derived from value (if one such can be derived) and keep processing values from it until its subscription becomes inactive (either by error or completion). If this "inner"Observable
completes, then the processing steps will recursively invoke themselves with the nextany
in queue.If no such value exists, then the processing steps will terminate, unsetting activeInnerSubscription, so that future values emitted from sourceObservable are processed correctly.
-
-
- error steps
-
Run subscriber’s
error()
method, given the passed in error. - complete steps
-
-
Set outerSubscriptionHasCompleted to true.
Note: If activeInnerSubscription is true, then the below step will not complete subscriber. In that case, the flatmap process next value steps will be responsible for completing subscriber when queue is empty, after the "inner" subscription becomes inactive.
-
If activeInnerSubscription is false and queue is empty, run subscriber’s
complete()
method.
-
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber’s signal. -
Subscribe to sourceObservable given sourceObserver and options.
-
-
Return observable.
any
value, a Subscriber
subscriber, a Mapper
mapper, and references to all of the following: a list of any
values queue, a boolean activeInnerSubscription, and a boolean outerSubscriptionHasCompleted:
-
Let mappedResult be the result of invoking mapper with value.
If an exception E was thrown, then run subscriber’s
error()
method, given E, and abort these steps. -
Let innerObservable be the result of calling
from()
with mappedResult.If an exception E was thrown, then run subscriber’s
error()
method, given E, and abort these steps.We shouldn’t invoke
from()
directly. Rather, we should call some internal algorithm that passes-back the exceptions for us to handle properly here, since we want to pipe them to subscriber. -
Let innerObserver be a new internal observer, initialized as follows:
- next steps
-
Run subscriber’s
next()
method, given the passed in value. - error steps
-
Run subscriber’s
error()
method, given the passed in error. - complete steps
-
-
If queue is not empty, then:
-
Let nextValue be the first item in queue; remove remove this item from queue.
-
Run flatmap process next value steps given nextValue, subscriber, mapper, and references to queue and activeInnerSubscription.
-
-
Otherwise:
-
Set activeInnerSubscription to false.
Note: Because activeInnerSubscription is a reference, this has the effect of ensuring that all subsequent values emitted from the "outer"
Observable
(called sourceObservable. -
If outerSubscriptionHasCompleted is true, run subscriber’s
complete()
method.Note: This means the "outer"
Observable
has already completed, but did not proceed to complete subscriber yet because there was at least one more pending "inner"Observable
(i.e., innerObservable) that had already been queued and had not yet completed. Until right now!
-
-
-
Let innerOptions be a new
SubscribeOptions
whosesignal
is subscriber’s signal. -
Subscribe to innerObservable given innerObserver and innerOptions.
finally(callback)
method steps are:
-
TODO: Spec this and use callback.
2.3.3. Promise
-returning operators
toArray(options)
method steps are:
-
Let p a new promise.
-
If options’s
signal
is not null:-
If options’s
signal
is aborted, then:-
Reject p with options’s
signal
's abort reason. -
Return p.
-
-
Add the following abort algorithm to options’s
signal
:-
Reject p with options’s
signal
's abort reason.
Note: All we have to do here is reject p. Note that the subscription to this
Observable
will also be canceled automatically, since the "inner" signal (created during subscription) is a dependent signal of options’ssignal
. -
-
-
Let values be a new list.
-
Let observer be a new internal observer, initialized as follows:
- next steps
-
Append the passed in value to values.
- error steps
-
Reject p with the passed in error.
- complete steps
-
Resolve p with values.
-
Return p.
forEach(callback, options)
method steps are:
-
Let p a new promise.
-
Let visitor callback controller be a new
AbortController
. -
Let internal options be a new
SubscribeOptions
whosesignal
is the result of creating a dependent abort signal from the list «visitor callback controller’s signal, options’ssignal
if non-null», usingAbortSignal
, and the current realm.Many trivial internal observers act as pass-throughs, and do not control the subscription to the
Observable
that they represent; that is, their error steps and complete steps are called when the subscription is terminated, and their next steps simply pass some version of the given value along the chain.For this operator, however, the below observer’s next steps are responsible for actually aborting the underlying subscription to this, in the event that callback throws an exception. In that case, the
SubscribeOptions
'ssignal
we pass through to "Subscribe to anObservable
", needs to be a dependent signal derived from options’ssignal
, and theAbortSignal
of anAbortController
that the next steps below has access to, and can signal abort when needed. -
If internal options’s
signal
is aborted, then:-
Reject p with internal options’s
signal
's abort reason. -
Return p.
-
-
Add the following abort algorithm to internal options’s
signal
:-
Reject p with internal options’s
signal
's abort reason.Note: The fact that rejection of p is tied to internal options’s
signal
, and not options’ssignal
means, that any microtasks queued during the firing of options’ssignal
'sabort
event will run before p’s rejection handler runs.
-
-
Let idx be an
unsigned long long
, initially 0. -
Let observer be a new internal observer, initialized as follows:
- next steps
-
-
Invoke callback with the passed in value, and idx.
If an exception E was thrown, then reject p with E, and signal abort visitor callback controller with E.
-
Increment idx.
-
- error steps
-
Reject p with the passed in error.
- complete steps
-
Return p.
every(predicate, options)
method steps are:
-
TODO: Spec this and use predicate and options.
find(predicate, options)
method steps are:
-
TODO: Spec this and use predicate and options.
some(predicate, options)
method steps are:
-
TODO: Spec this and use predicate and options.
reduce(reducer, initialValue, options)
method steps are:
-
TODO: Spec this and use reducer, initialValue, and options.
3. EventTarget
integration
dictionary {
ObservableEventListenerOptions boolean =
capture false ;boolean ; };
passive partial interface EventTarget {Observable on (DOMString ,
type optional ObservableEventListenerOptions = {}); };
options
on(type, options)
method steps are:
-
If this's relevant global object is a
Window
object, and its associated Document is not fully active, then return. -
Let event target be this.
-
Let observable be a new
Observable
, initialized as follows:- subscribe callback
-
An algorithm that takes a
Subscriber
subscriber and runs these steps:-
If event target is null, abort these steps.
Note: This is meant to capture the fact that event target can be garbage collected by the time this algorithm runs upon subscription.
-
Add an event listener with event target and an event listener defined as follows:
- type
-
type
- callback
-
The result of creating a new Web IDL
EventListener
instance representing a reference to a function of one argument of typeEvent
event. This function executes the observable event listener invoke algorithm given subscriber and event. - capture
-
options’s
capture
- passive
-
options’s
passive
- once
-
false
- signal
-
null
Note: The
AbortSignal
for event listeners added byon()
is managed by theObservable
itself. Seesubscribe()
andSubscribeOptions
.
-
-
Return observable.
Subscriber
subscriber and
an Event
event, and runs these steps:
-
Run subscriber’s
next()
method with event.
Tests
4. Security & Privacy Considerations
This material is being upstreamed from our explainer into this specification, and in the meantime you can consult the following resources:
5. Acknowledgements
A special thanks to Ben Lesh for much of the design
input for the Observable
API, and his many years of work maintaining
userland Observable code that made this contribution to the web platform
possible.