RxJS in depth
Google released a major overhaul for its Angular framework in late 2016. Among other new features, it shipped with RxJS, a library for asynchronous data streams. RxJS’ observables turned out to be the most significant improvement of Angular 2 over its predecessor AngularJS. Up until then, developers didn’t have a simple means of representing values that changed over time; promises provided limited help for that, as they were able to handle only a single value over time.
The aim of this post is to give you a better understanding of RxJS. To do so, I’ll first give a few examples that explain why functional reactive programming (FRP) is useful. Then, I’ll present some of the library’s most common operators. After that, I’ll delve into more advanced topics, and lastly, I’ll show you how to implement an observable from scratch.
The slides for the series of tech talks I gave at SBB about RxJS can be downloaded here: Part 1 - The basics, Part 2 - Write your own observable, Part 3 - Beyond the basics.
Why observables matter
Let’s take a look at a simple webapp, to understand why observables can come in handy for asynchronous tasks.
Example task
The goal is to extend the app with an information page for the user that is currently logged in. In other words, we would like to do the following:
- Call a /checkCredentials endpoint that verifies whether username and password match
- Call a /getUser endpoint that returns the user details
- Display the retrieved user details
Solution with callbacks
function checkCredentials(
username: string,
password: string,
callback: (credentialsOk: boolean) => void
): void {
// Simulate REST call
setTimeout(() => {
callback(username == "test" && password == "1234");
}, 2000);
}
function getUser(
credentialsOk: boolean,
callback: (user?: string) => void
): void {
if (!credentialsOk) {
callback(undefined);
} else {
// Simulate REST call
setTimeout(() => {
callback("USER DETAILS");
}, 2000);
}
}
function buildPage(): void {
checkCredentials("test", "1234", credentialsOk => {
getUser(credentialsOk, console.log);
});
}
Besides the insecure authorisation mechanism, we unfortunately have a few additional issues:
- checkCredentials and getUser break the single responsibility principle, since they have to invoke the next callback themselves
- We would have to modify both buildPage and checkCredentials in case of a change to the signature of getUser
- We can’t easily deal with errors
- We can’t easily write tests
- The code would quickly become unreadable in case additional endpoints have to be called
Solution with promises
In order to address those issues, we could use promises. Promises were introduced in ES6, and represent an asynchronous computation that results in a (single) value. A promise is always in one of these three states:
- Pending: the initial state, before completion or rejection
- Fulfilled: the operation completed successfully, and the value may be retrieved
- Rejected: the operation failed (and there is no value to be retrieved)
As shown on the diagram below, two promises can be chained together with then(), and potential errors caught with catch(). The static methods all(), allSettled() and race() can be used for the case where two or more promises have to be combined.
We can therefore rewrite our app as follows:
function checkCredentials(
username: string,
password: string
): Promise<boolean> {
return new Promise(resolve => {
setTimeout(() => {
resolve(username == "test" && password == "1234");
}, 2000);
});
}
function getUser(credentialsOk: boolean): Promise<string | undefined> {
return new Promise(resolve => {
if (!credentialsOk) {
resolve(undefined);
} else {
setTimeout(() => {
resolve("USER DETAILS");
}, 2000);
}
});
}
function buildPage(): void {
checkCredentials("test", "1234")
.then(getUser)
.then(console.log)
.catch(console.log);
}
Thanks to promises, the code looks much more readable already:
- checkCredentials and getUser do not break the single responsibility principle anymore
- A signature change of getUser does not affect checkCredentials (directly) anymore
- We can deal with errors more easily thanks to the catch method
- checkCredentials and getUser return their result in a promise, making tests much easier to write
- And most importantly, we have a flat hierarchy for the calls to checkCredentials and getUser
Additional requirements for the example task
Suppose now that we have two additional requirements:
- The user details should be reloaded on a button click
- And in order to reduce the server load, at most one request per second should be let through
Promises do not seem to be overly useful for this task. The second point in particular seems difficult to achieve with promises.
Solution with observables
In order to meet the additional requirements, we’ll have to resort to a more powerful abstraction: observables. Observables represent an asynchronous computation just like promises, but may emit multiple values over time. Essentially, they can deliver the following three notifications:
- next: contains the value that is emitted. The observable can emit multiple next notifications during its lifetime.
- error: contains an error or exception. No other notification can be emitted after it.
- complete: indicates a successful completion of the observable’s execution. No other notification can be emitted after it.
Using RxJS observables, our example app can easily meet the new requirements:
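import { fromEvent, Observable, of } from "rxjs";
import { debounceTime, delay, switchMap } from "rxjs/operators";
function checkCredentials(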
username: string,
password: string
): Observable<boolean> {
return of(username == "test" && password == "1234").pipe(delay(2000));
}
function getUser(credentialsOk: boolean): Observable<string> {
return of("USER DETAILS").pipe(delay(2000));
}
function buildPage(): void {
fromEvent(document.querySelector('button'), 'click').pipe(
debounceTime(1000),
switchMap(() => checkCredentials("test", "1234")),
switchMap(getUser)
).subscribe(console.log);
}
As we can see, both of the new requirements could be addressed easily: the first one with the fromEvent factory function, which turns button clicks into an observable, and the second one with the debounceTime operator, which only lets a click through once no further click has followed within one second.
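To make the debouncing behaviour more tangible, here is a small simulation in plain TypeScript. It is not RxJS code: simulateDebounce is a hypothetical helper that takes click timestamps and computes when a debounced stream would emit, under the simplifying assumption that a click gets through only once the quiet period has passed without another click.

```typescript
// Plain-TypeScript simulation of debounce semantics (not RxJS itself).
// Input: click timestamps in ms, sorted ascending. Output: the times at
// which a debounced stream would emit, assuming a click is only let
// through once `quietMs` have passed without another click.
function simulateDebounce(clickTimes: number[], quietMs: number): number[] {
  const emissions: number[] = [];
  clickTimes.forEach((time, index) => {
    const next = clickTimes[index + 1];
    // A click survives only if it is the last one, or if the next click
    // arrives after the quiet period has elapsed.
    if (next === undefined || next - time > quietMs) {
      emissions.push(time + quietMs);
    }
  });
  return emissions;
}

// Three rapid clicks, a pause, then one more click.
const emittedAt = simulateDebounce([0, 200, 400, 3000], 1000);
console.log(emittedAt); // [1400, 4000]
```

Two consecutive emissions are always more than quietMs apart, which is why a one-second debounce lets at most one request per second through.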
Common RxJS operators
In the previous section, we used three RxJS operators, namely debounceTime, delay and switchMap. RxJS defines many other operators and factory methods, such as the following ones:
- of: creates an observable that emits the values passed as parameter:
of(42).subscribe(console.log);
- map: transforms the emitted values with a mapping function. For example, we can retrieve the click’s coordinates from the event:
fromEvent(document.querySelector('button'), 'click').pipe(
map((event) => [event.pageX, event.pageY]),
).subscribe(console.log);
- zip: merges two observables into a single one, by pairing up the values the observables emit. The operator can e.g. be used to calculate the duration of mouse clicks:
zip(
fromEvent(document, 'mousedown').pipe(map(() => new Date())),
fromEvent(document, 'mouseup').pipe(map(() => new Date()))
).pipe(
map(([start, end]) => end.getTime() - start.getTime())
).subscribe(console.log);
- switchMap: this operator maps each value emitted by the observable to another observable. The resulting observables are then combined into a single observable. This is very similar to flatMap operations on streams (it can be shown that observables are monads).[1] Do note that the operator will interrupt (i.e. unsubscribe from) the current inner observable as soon as a new value is available in the outer observable. The example below shows how to make a simple timer with a reset button:
fromEvent(
document.getElementById('reset'),
'click'
).pipe(
switchMap(() => interval(1000))
).subscribe(console.log);
- concatMap: switchMap unsubscribes from the current inner observable whenever the outer observable emits. This is not always desirable, for example when combining multiple network requests together:
from(postIds).pipe(
concatMap(postId => this.postService.getPost(postId))
).subscribe(console.log);
- mergeMap: if you were to run the last example, you would notice that the requests are executed sequentially. That was to be expected: concatMap waits for the inner observable to complete before subscribing to the next one. To have requests executed in parallel, use mergeMap instead. Beware that the answers might come back out of order:
from(postIds).pipe(
mergeMap(postId => this.postService.getPost(postId))
).subscribe(console.log);
- debounceTime: operators like debounceTime are called lossy backpressure operators. Their purpose is to limit the producer’s emission rate, in case the consumer cannot keep up with the new notifications. For example, this could happen when two infinite observables with different emission rates have to be zipped together. On the other hand, there are also lossless operators like bufferCount that can be used whenever dropping notifications is not desirable. In the following example, we use debounceTime to log at most two events per second to the console:
fromEvent(document.querySelector('button'), 'click').pipe(
debounceTime(500)
).subscribe(console.log);
Error handling strategies
RxJS has a particularly practical error handling mechanism: error operators. Those operators do not differ much from other operators, apart from dealing with errors instead of regular values. Let’s consider the following observable:
const observable$ = interval(1000).pipe(
tap((i) => {
if (i > 3) {
throw "error";
}
})
);
observable$.subscribe(
(value) => { console.log("next: " + value); },
(error) => { console.log("error: " + error); },
() => { console.log("complete"); }
);
This observable emits the values 0, 1, 2 and 3 at intervals of 1000ms, and then errors out. Since we haven’t used any error operator, the error will just bubble up to the subscription’s error callback. That’s quite a primitive error handling mechanism, but fortunately, error operators are able to deal with more complex cases.
Catch & replace
Sometimes, an error can be replaced with a sensible default value. The catch and replace strategy will likely prove useful in those cases. In RxJS, this strategy can be implemented with the catchError operator, switchMap’s equivalent for errors:
observable$.pipe(
catchError((error) => {
// provide replacement values
return of(-1);
}),
map((value) => {
// replacement values will be processed like regular values here
return value;
})
).subscribe(
(value) => { console.log("next: " + value); },
(error) => { console.log("error: " + error); },
() => { console.log("complete"); }
);
Catch & rethrow
Other times, we might want to handle the error locally, before propagating it further. This can be achieved with the catch and rethrow strategy, which uses the catchError operator and the throwError factory method:
observable$.pipe(
switchMap((id) => this.postService.getPost(id)),
catchError((error) => {
// local error handling
console.log("The post could not be found");
// create an observable that errors out on subscription
return throwError(error);
})
).subscribe(
(value) => { console.log("next: " + value); },
(error) => { console.log("error: " + error); },
() => { console.log("complete"); }
);
Catch & retry
The two error handling strategies we have seen so far fail to address the case where servers are unreachable. In such a situation, waiting a bit and retrying later might be a better course of action. That’s the gist of the catch and retry strategy:
observable$.pipe(
retryWhen((errors$) => {
return errors$.pipe(
delayWhen((error) => {
return timer(3000);
}),
tap(() => {
console.log("retrying...")
})
);
})
).subscribe(
(value) => { console.log("next: " + value); },
(error) => { console.log("error: " + error); },
() => { console.log("complete"); }
);
// 1000ms - next: 0
// 2000ms - next: 1
// 3000ms - next: 2
// 4000ms - next: 3
// 8000ms - retrying...
// 9000ms - next: 0
// ...
retryWhen makes it possible to rerun a source observable in case an error occurred. Its errors$ parameter is an observable that contains the errors from all the retries that have been made. retryWhen expects its callback to return an observable that emits an event every time the source observable is supposed to be rerun. In the example above, we simply use the errors$ observable, with events delayed by 3000ms. That way, there will be an interval of 3 seconds between a failed attempt and a retry.
Hot and cold observables
You might have noticed that some observables behaved differently from others. The reason is that there are actually two types of observables: hot ones and cold ones.[2]
Cold observables create the producer (an XMLHttpRequest in the example below) themselves, and start running on subscription. Therefore, every new subscriber will receive the same notification sequence:
const cold$ = Observable.create((observer) => {
let xhr = new XMLHttpRequest();
xhr.addEventListener("load", v => observer.next(v));
xhr.open("GET", "http://www.example.org/example.json");
xhr.send();
});
cold$.subscribe((v) => {
console.log("cold subscription #1: " + v);
});
setTimeout(
() => cold$.subscribe((v) => {
console.log("cold subscription #2: " + v);
}),
1000
);
// 0ms - cold subscription #1: <json>
// 1000ms - cold subscription #2: <json>
Hot observables close over an already existing producer (the button’s event handlers in the example below). Since the producer is already running, a new observer will only get the events that have been emitted after it subscribed to the observable:
const hot$ = Observable.create((observer) => {
document
.getElementById("button")
.addEventListener("click", e => observer.next(e));
});
hot$.subscribe((v) => {
console.log("hot subscription #1: " + v);
});
setTimeout(
() => hot$.subscribe((v) => {
console.log("hot subscription #2: " + v);
}),
5000
);
// 1000ms - <click>
// 1000ms - hot subscription #1: <event>
// 2000ms - <click>
// 2000ms - hot subscription #1: <event>
// 6000ms - <click>
// 6000ms - hot subscription #1: <event>
// 6000ms - hot subscription #2: <event>
RxJS unfortunately doesn’t distinguish between hot and cold observables at the type level. Developers therefore have to refer to the documentation to figure out whether an observable is hot or cold. The most common cases are summarised in the following table:
| Cold observables | Hot observables |
|---|---|
| of() | fromEvent() |
| interval() | ActivatedRoute.queryParams |
| HttpClient.get() | |
| … | … |
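The distinction can also be reproduced without RxJS. The following sketch uses two hypothetical helpers, coldCounter and hotCounter, which are deliberately simplified stand-ins rather than real observables: the cold one creates its producer inside subscribe, while the hot one shares a producer that lives outside of it.

```typescript
type Listener<T> = (value: T) => void;

// Cold: the producer (here a plain counter) is created inside `subscribe`,
// so every subscriber triggers its own run and sees the full sequence.
function coldCounter(): { subscribe: (listener: Listener<number>) => void } {
  return {
    subscribe(listener) {
      let count = 0; // fresh producer state per subscriber
      for (let i = 0; i < 3; i++) {
        listener(count++);
      }
    },
  };
}

// Hot: the producer exists outside of `subscribe`; subscribers merely
// attach to it and miss whatever was emitted before they arrived.
function hotCounter() {
  const listeners: Listener<number>[] = [];
  let count = 0;
  return {
    subscribe(listener: Listener<number>): void {
      listeners.push(listener);
    },
    // Stands in for an external event such as a button click.
    tick(): void {
      const value = count++;
      listeners.forEach((listener) => listener(value));
    },
  };
}

const coldLog: string[] = [];
const cold = coldCounter();
cold.subscribe((v) => coldLog.push("#1: " + v));
cold.subscribe((v) => coldLog.push("#2: " + v));
// coldLog: #1: 0, #1: 1, #1: 2, #2: 0, #2: 1, #2: 2

const hotLog: string[] = [];
const hot = hotCounter();
hot.subscribe((v) => hotLog.push("#1: " + v));
hot.tick();
hot.subscribe((v) => hotLog.push("#2: " + v));
hot.tick();
// hotLog: #1: 0, #1: 1, #2: 1
```

The second hot subscriber never sees the value 0, because it was emitted before that subscriber attached.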
Subjects
You might now wonder whether a cold observable can be turned into a hot one. The answer is yes, and to achieve that, we’ll have to use subjects, another abstraction from RxJS.
Subjects act like a proxy between observables and observers. They do not trigger a new execution of the observable, but allow values to be multicast to several observers instead. They have a subscribe method like observables, as well as additional next, error and complete methods. Therefore, they can act as observables, but also be used to emit notifications.
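As an illustration of this dual role, here is a minimal sketch of a subject in plain TypeScript. MiniSubject and SketchObserver are hypothetical names, and the class leaves out most of what the RxJS Subject does; it only shows the core idea of keeping a list of observers and forwarding each notification to all of them.

```typescript
interface SketchObserver<T> {
  next: (value: T) => void;
  error?: (error: unknown) => void;
  complete?: () => void;
}

// Hypothetical, heavily simplified stand-in for a subject: it keeps a list
// of observers and forwards every notification to each of them.
class MiniSubject<T> {
  private observers: SketchObserver<T>[] = [];
  private closed = false;

  subscribe(observer: SketchObserver<T>): () => void {
    this.observers.push(observer);
    // Return a function that removes this observer again.
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  next(value: T): void {
    if (this.closed) return;
    this.observers.slice().forEach((o) => o.next(value));
  }

  error(error: unknown): void {
    if (this.closed) return;
    this.closed = true;
    this.observers.forEach((o) => { if (o.error) o.error(error); });
  }

  complete(): void {
    if (this.closed) return;
    this.closed = true;
    this.observers.forEach((o) => { if (o.complete) o.complete(); });
  }
}

const received: string[] = [];
const miniSubject = new MiniSubject<number>();
miniSubject.subscribe({ next: (v) => received.push("A" + v) });
miniSubject.next(1);
miniSubject.subscribe({ next: (v) => received.push("B" + v) });
miniSubject.next(2);
miniSubject.complete();
miniSubject.next(3); // ignored: the subject has already completed
// received: A1, A2, B2
```

Observer B only receives the value emitted after it subscribed, which is exactly the hot behaviour described earlier.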
An otherwise cold observable can be turned into a hot one with the multicast operator:
const hot$ = cold$.pipe(multicast(new Subject()));
hot$.connect();
The multicast operator returns a so-called ConnectableObservable. The subject will be linked (i.e. subscribe itself) to the source only once the connect method is called. From that moment on, notifications will be forwarded to all subscribed observers.
Although this mechanism allows precise control over when the execution starts, it is somewhat cumbersome. RxJS provides a higher-level share operator for that reason:
const hot$ = cold$.pipe(share());
share is roughly equivalent to a multicast followed by a refCount, which automates observable connection / disconnection. The example below shows that it effectively turns a cold observable into a hot one:
const cold$ = interval(1000);
const hot$ = cold$.pipe(share());
hot$.subscribe((v) => console.log("#1: " + v));
setTimeout(
() => hot$.subscribe((v) => console.log("#2: " + v)),
3500
);
// Output
// #1: 0
// #1: 1
// #1: 2
// #1: 3
// #2: 3
// #1: 4
// #2: 4
// ...
RxJS provides four different subject implementations, to accommodate various use cases:
- Subject: vanilla subjects neither have an initial value nor a cache. New observers will therefore only receive notifications that have been emitted after they subscribed:
const subject = new Subject();
subject.subscribe((n) => {
console.log("subject #1: " + n);
});
subject.next(Math.random());
subject.next(Math.random());
subject.next(Math.random());
setTimeout(() => {
subject.subscribe((n) => {
console.log("subject #2: " + n);
});
}, 2000);
// Output
// subject #1: 0.799064507560346
// subject #1: 0.5626115223854045
// subject #1: 0.5820694908227122
- BehaviorSubject: this subject requires an initial value, and emits its current value to new subscribers. The example below shows what happens when an observer subscribes immediately, and another one 2000ms later:
const behaviorSubject = new BehaviorSubject(42);
behaviorSubject.subscribe((n) => {
console.log("behaviorSubject #1: " + n);
});
behaviorSubject.next(Math.random());
behaviorSubject.next(Math.random());
behaviorSubject.next(Math.random());
setTimeout(() => {
behaviorSubject.subscribe((n) => {
console.log("behaviorSubject #2: " + n);
});
}, 2000);
// Output
// behaviorSubject #1: 42
// behaviorSubject #1: 0.28484506095836926
// behaviorSubject #1: 0.14382412500454356
// behaviorSubject #1: 0.821816890661595
// behaviorSubject #2: 0.821816890661595
- ReplaySubject: buffers n notifications, and replays them to new subscribers. Taking the same example as above, with a ReplaySubject instead of a BehaviorSubject:
const replaySubject = new ReplaySubject(2);
replaySubject.subscribe((n) => {
console.log("replaySubject #1: " + n);
});
replaySubject.next(Math.random());
replaySubject.next(Math.random());
replaySubject.next(Math.random());
setTimeout(() => {
replaySubject.subscribe((n) => {
console.log("replaySubject #2: " + n);
});
}, 2000);
// Output
// replaySubject #1: 0.5656977118236854
// replaySubject #1: 0.2378906195692394
// replaySubject #1: 0.6571879405725616
// replaySubject #2: 0.2378906195692394
// replaySubject #2: 0.6571879405725616
- AsyncSubject: only emits its last value upon completion. Such subjects can be useful e.g. for caching purposes:
const asyncSubject = new AsyncSubject();
asyncSubject.subscribe((n) => {
console.log("asyncSubject #1: " + n);
});
asyncSubject.next(Math.random());
asyncSubject.next(Math.random());
asyncSubject.next(Math.random());
setTimeout(() => {
asyncSubject.subscribe((n) => {
console.log("asyncSubject #2: " + n);
});
}, 2000);
asyncSubject.complete();
// Output
// asyncSubject #1: 0.8994070709645197
// asyncSubject #2: 0.8994070709645197
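The caching behaviour that sets these variants apart is easy to sketch by hand. The class below is a hypothetical MiniReplaySubject in plain TypeScript, not RxJS's implementation (which also handles completion, errors and time windows). It only keeps the last bufferSize values and hands them to late subscribers before any live value.

```typescript
// Simplified sketch of the replay idea (not RxJS's implementation): keep
// the last `bufferSize` values and hand them to late subscribers first.
class MiniReplaySubject<T> {
  private buffer: T[] = [];
  private listeners: Array<(value: T) => void> = [];

  constructor(private bufferSize: number) {}

  subscribe(listener: (value: T) => void): void {
    this.buffer.forEach((value) => listener(value)); // replay first
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest cached value
    }
    this.listeners.forEach((listener) => listener(value));
  }
}

const replayLog: string[] = [];
const replay = new MiniReplaySubject<string>(2);
replay.subscribe((v) => replayLog.push("#1: " + v));
replay.next("a");
replay.next("b");
replay.next("c");
replay.subscribe((v) => replayLog.push("#2: " + v));
// replayLog: #1: a, #1: b, #1: c, #2: b, #2: c
```

With a buffer size of 0 the sketch behaves like a vanilla subject, and with a buffer size of 1 it comes close to a BehaviorSubject, minus the mandatory initial value.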
Schedulers
We’ll conclude our tour of RxJS features by looking into schedulers, a fairly advanced functionality that is rarely used in conventional webapps. Nevertheless, it can’t hurt to have a basic understanding of how it works. The main purpose of schedulers is to control the execution context of observables, i.e. whether the notifications are delivered synchronously, on the microtask queue or on the macrotask queue. RxJS provides the following schedulers:
- null scheduler: this scheduler emits notifications synchronously with a simple imperative loop. For example, of(1, 2, 3) will immediately emit 1, 2 and 3.
- queueScheduler: this scheduler emits its notifications in the same event frame. The example below shows how it differs from the null scheduler:
console.log("start");
const a$ = of(1, 2);
const b$ = of(3, 4);
const c$ = of(5, 6);
combineLatest(a$, b$, c$).subscribe(console.log);
console.log("end");
// Output:
// start
// [2, 4, 5]
// [2, 4, 6]
// end
console.log("start");
const a$ = of(1, 2, queueScheduler);
const b$ = of(3, 4, queueScheduler);
const c$ = of(5, 6, queueScheduler);
combineLatest(a$, b$, c$, queueScheduler).subscribe(console.log);
console.log("end");
// Output:
// start
// [1, 3, 5]
// [2, 3, 5]
// [2, 4, 5]
// [2, 4, 6]
// end
- asapScheduler: this scheduler delivers its notifications as microtasks
- asyncScheduler: this scheduler is similar to the asapScheduler, but uses the macrotask queue instead. The following example shows the difference between both:
console.log("start");
const null$ = of("null", queueScheduler);
const asap$ = of("asap", asapScheduler);
const async$ = of("async", asyncScheduler);
merge(null$, asap$, async$)
.pipe(filter((value) => { return !!value; }))
.subscribe(console.log);
console.log("end");
// Output:
// start
// null
// end
// asap
// async
For completeness’ sake, the animationFrameScheduler and the virtualTimeScheduler also have to be mentioned. The former is primarily used for browser animations and the latter for marble testing.
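The three execution contexts mentioned in this section can be observed without RxJS as well. The sketch below relies only on the built-in queueMicrotask and setTimeout functions. It is meant as an analogy for synchronous, asap and async delivery, not as a description of how the schedulers are implemented.

```typescript
const order: string[] = [];

order.push("start");
// Macrotask: runs in a later turn of the event loop.
setTimeout(() => order.push("macrotask"), 0);
// Microtask: runs right after the current synchronous code finishes.
queueMicrotask(() => order.push("microtask"));
// Synchronous: runs immediately, like the null scheduler.
order.push("sync");
order.push("end");

// Print the recorded order once all of the tasks above have run.
setTimeout(() => console.log(order.join(" -> ")), 10);
// start -> sync -> end -> microtask -> macrotask
```

Although the macrotask is registered first, the microtask still runs before it, which mirrors the asap and async lines in the output of the merge example.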
Observable anti-patterns
When starting out with RxJS, it usually takes a bit of time to get comfortable with observables. The execution flow is different, and finding out the right operator for the task at hand is not always straightforward. For these reasons, it is all too easy to write code that is hard to test, maintain and understand. To save you from some trouble, I compiled a list with the most common anti-patterns I came across during code reviews:
Nested observables
Let’s assume we want to use a URL parameter to call an external API. In Angular, URL parameters are usually retrieved by subscribing to the activatedRoute.params observable. Because of that, it is really tempting to implement the API call like this:
this.activatedRoute.params.subscribe(params => {
const id = params['id'];
if (id !== null && id !== undefined) {
this.userService.getUser(id).subscribe(user => this.user = user);
}
});
Nested subscriptions are not much better than nested callbacks: the code is hard to test, and not really readable either. It would be better to keep a flat call hierarchy and use switchMap instead:
this
.activatedRoute
.params
.pipe(
map(params => params['id']),
filter(id => id !== null && id !== undefined),
switchMap(id => this.userService.getUser(id))
).subscribe(user => this.user = user);
Observables that error out instead of completing
In the previous example, we used the map, filter and switchMap operators. Could we replace filter by an if clause?
this
.activatedRoute
.params
.pipe(
map(params => params['id']),
switchMap(id => {
if(id !== null && id !== undefined) {
return this.userService.getUser(id);
}
})
).subscribe(user => this.user = user);
Not really: switchMap expects its callback to return an observable. When id is null or undefined, the callback above returns nothing, and the observable errors out instead of carrying on. To avoid this, return the EMPTY observable in an else clause:
this
.activatedRoute
.params
.pipe(
map(params => params['id']),
switchMap(id => {
if(id !== null && id !== undefined) {
return this.userService.getUser(id);
} else {
return EMPTY;
}
})
).subscribe(user => this.user = user);
Memory leaks
Subscriptions can introduce memory leaks that are hard to track down. Those leaks come from the callbacks of subscribe in most cases: the issue is that these callbacks hold their references until unsubscribe() is called. This can prevent the garbage collection of large objects like UI components, and make the app quite a bit more memory hungry than necessary. The example below shows an Angular component with such a leak:
@Component(...)
class MyComponent implements OnInit {
ngOnInit() {
this
.someService
.someObservable$
.subscribe(value => this.someMethod(value));
}
someMethod(value) {...}
}
The ngOnInit hook creates a subscription to someObservable$. At the end of the component’s lifecycle, Angular will take care of calling ngOnDestroy. At this point, we expect that the garbage collector will reclaim MyComponent from memory. That’s not the case though: the subscription still listens to someObservable$, and still holds a reference to MyComponent through this. In order to fix the memory leak, we would have to unsubscribe from the observable in the ngOnDestroy hook:
@Component(...)
class MyComponent implements OnDestroy, OnInit {
someSubscription: Subscription;
ngOnInit() {
this.someSubscription = this
.someService
.someObservable$
.subscribe(value => this.someMethod(value));
}
ngOnDestroy() {
this.someSubscription.unsubscribe();
}
someMethod(value) {...}
}
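The retention mechanism can be demonstrated outside of Angular. In the following sketch, LongLivedSource and SketchComponent are hypothetical classes standing in for a long-lived service observable and a component. The source exposes the number of callbacks it still holds, which makes the effect of unsubscribing visible.

```typescript
// Hypothetical long-lived event source, standing in for a service-level
// observable that outlives the components subscribing to it.
class LongLivedSource {
  private callbacks: Array<(value: string) => void> = [];

  subscribe(callback: (value: string) => void): { unsubscribe: () => void } {
    this.callbacks.push(callback);
    return {
      unsubscribe: () => {
        this.callbacks = this.callbacks.filter((c) => c !== callback);
      },
    };
  }

  emit(value: string): void {
    this.callbacks.forEach((callback) => callback(value));
  }

  retainedCallbacks(): number {
    return this.callbacks.length;
  }
}

class SketchComponent {
  seen: string[] = [];
  private subscription: { unsubscribe: () => void };

  constructor(source: LongLivedSource) {
    // The arrow function captures `this`, so the source now references
    // the component for as long as the callback stays registered.
    this.subscription = source.subscribe((value) => this.seen.push(value));
  }

  destroy(): void {
    this.subscription.unsubscribe();
  }
}

const longLivedSource = new LongLivedSource();
const sketchComponent = new SketchComponent(longLivedSource);
longLivedSource.emit("first");
console.log(longLivedSource.retainedCallbacks()); // 1: the component is still reachable
sketchComponent.destroy();
longLivedSource.emit("second"); // no longer delivered
console.log(longLivedSource.retainedCallbacks()); // 0: nothing keeps the component alive
```

Without the call to destroy, the source would keep the callback, and through it the component, for its whole lifetime.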
That being said, it is not always necessary to unsubscribe from observables in Angular. As a rule of thumb, a call to unsubscribe is not necessary for:
- ActivatedRoute subscriptions
- HttpClient subscriptions
- Observables used with the async pipe
- Finite observables
However, an explicit call is mandatory in these cases:
- Form subscriptions (valueChanges / statusChanges / …)
- Renderer2 subscriptions
- Observables that never complete (or error out)
Implement an observable from scratch
In this section, we’ll write our own observable implementation. This task is not as daunting as it looks: since JavaScript is based on an event loop, we can skirt around most typical concurrency issues. A reference implementation with the code snippets from this section is available here.
Observer
We’ll start by defining observers. In a nutshell, an observer is an object with the three methods next, error and complete. These methods will be called when the observable that has been subscribed to emits the corresponding event. Besides that, it contains an _isUnsubscribed flag that prevents the callbacks from being called once the observer has been unsubscribed.
class Observer<T> {
private _isUnsubscribed: boolean;
public _unsubscribe: () => void;
constructor(
private _next?: (value: T) => void,
private _error?: <E extends Error> (error: E) => void,
private _complete?: () => void
) {
this._isUnsubscribed = false;
this._unsubscribe = () => { };
}
next(value: T): void {…}
error<E extends Error>(error: E): void {…}
complete(): void {…}
unsubscribe(): void {…}
}
next can now be implemented in a fairly straightforward way. As long as the subscription is still active, it simply invokes the _next callback if defined:
next(value: T): void {
if (!this._isUnsubscribed && this._next) {
this._next(value);
}
}
error is similar to next, with the exception that the subscription has to be closed if it is still active:
error<E extends Error>(error: E): void {
if (!this._isUnsubscribed) {
if (this._error) {
this._error(error);
}
this.unsubscribe();
}
}
The complete notification is handled the same way:
complete(): void {
if (!this._isUnsubscribed) {
if (this._complete) {
this._complete();
}
this.unsubscribe();
}
}
Lastly, unsubscribe toggles the _isUnsubscribed flag, and executes the cleanup logic by calling _unsubscribe:
unsubscribe(): void {
if (!this._isUnsubscribed) {
this._isUnsubscribed = true;
this._unsubscribe();
}
}
Subscription
Subscriptions (returned by Observable’s subscribe method) are straightforward to implement as well. They simply wrap an _unsubscribe function that disposes of the resources held by the execution of an observable:
class Subscription {
constructor(private _unsubscribe?: () => void) {}
unsubscribe(): void {
if (this._unsubscribe) {
this._unsubscribe();
}
}
}
Observable
Last but not least, we’ll implement the Observable class. Its constructor is private; only the factory methods and operators are supposed to invoke it. We’ll define two factory methods (from and interval), and one operator (map). To keep things simple, they’ll be implemented on the Observable class directly.[3]
class Observable<T> {
private constructor(private _subscribe: (observer: Observer<T>) => Subscription) {}
static from<T>(...values: T[]): Observable<T> {...}
static interval(ms: number): Observable<number> {...}
map<U>(f: (value: T) => U): Observable<U> {...}
subscribe(
next?: (value: T) => void,
error?: <E extends Error> (error: E) => void,
complete?: () => void
): Subscription {...}
}
Simply put, observables wrap around a function that is invoked every time an observer subscribes to the observable. To understand that better, let’s implement a from factory. Every time an observer subscribes to the observable, we want to synchronously emit the values passed to the factory, and then complete. This is an example of a cold observable: each observer will receive the same notifications.
static from<T>(...values: T[]): Observable<T> {
return new Observable((observer: Observer<T>) => {
values.forEach((value) => {observer.next(value)});
observer.complete();
return new Subscription();
});
}
Let’s now take a look at a factory that actually needs unsubscription logic. In the interval factory, we want to emit an increasing number every n milliseconds. For that, we’ll use the built-in setInterval function. However, we’ll have to stop the emission of new values upon unsubscription from the observable. To address this requirement, we’ll simply return a subscription that invokes clearInterval once disposed:
static interval(ms: number): Observable<number> {
return new Observable((observer: Observer<number>) => {
let i = 0;
const handle = setInterval(() => {observer.next(i += 1)}, ms);
return new Subscription(() => {clearInterval(handle)});
});
}
subscribe stitches everything together. First, it creates a new observer with the provided callbacks. Then, it invokes the internal _subscribe function to retrieve the subscription. After that, it sets the observer’s unsubscription logic. And finally, it returns a subscription that unsubscribes the observer once disposed:
subscribe(
next?: (value: T) => void,
error?: <E extends Error> (error: E) => void,
complete?: () => void
): Subscription {
const self = this;
const observer = new Observer(next, error, complete);
const subscription = self._subscribe(observer);
observer._unsubscribe = () => {subscription.unsubscribe()};
return new Subscription(() => {observer.unsubscribe()});
}
To round things off, let’s implement a map operator. The logic is quite simple, as we just have to subscribe to the existing observable, apply the mapping function to each value, and forward every notification:
map<U>(f: (value: T) => U): Observable<U> {
const self = this;
return new Observable((observer: Observer<U>) => {
const subscription = self.subscribe(
(value: T) => {observer.next(f(value))},
<E extends Error>(error: E) => {observer.error(error)},
() => {observer.complete()}
);
return subscription;
});
}
And that’s it! We have been able to implement observers, subscriptions and observables in a mere hundred lines. To test our implementation, we can write a little counter that counts down from -1 to roughly -20, at intervals of 100ms:
const observable$ = Observable
.interval(100)
.map(value => { return -value; });
const subscription = observable$.subscribe(
(value: number) => {console.log("next: " + value)},
(error: Error) => {console.log("error")},
() => {console.log("complete")}
);
setTimeout(
() => { subscription.unsubscribe() },
2000
);
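For readers who want to run the pieces together without timers, here is a condensed, self-contained variant of the classes from this section. It is a sketch rather than the reference implementation: the classes carry a Tiny prefix (my naming, to avoid confusion with RxJS's own types), the error type is simplified to Error, and only the from factory and the map operator are included.

```typescript
class TinyObserver<T> {
  private isUnsubscribed = false;
  public onUnsubscribe: () => void = () => {};

  constructor(
    private onNext?: (value: T) => void,
    private onError?: (error: Error) => void,
    private onComplete?: () => void
  ) {}

  next(value: T): void {
    if (!this.isUnsubscribed && this.onNext) this.onNext(value);
  }

  error(error: Error): void {
    if (this.isUnsubscribed) return;
    if (this.onError) this.onError(error);
    this.unsubscribe();
  }

  complete(): void {
    if (this.isUnsubscribed) return;
    if (this.onComplete) this.onComplete();
    this.unsubscribe();
  }

  unsubscribe(): void {
    if (this.isUnsubscribed) return;
    this.isUnsubscribed = true;
    this.onUnsubscribe();
  }
}

class TinySubscription {
  constructor(private teardown?: () => void) {}
  unsubscribe(): void {
    if (this.teardown) this.teardown();
  }
}

class TinyObservable<T> {
  private constructor(
    private subscribeFn: (observer: TinyObserver<T>) => TinySubscription
  ) {}

  // Cold factory: every subscriber synchronously receives all values.
  static from<T>(...values: T[]): TinyObservable<T> {
    return new TinyObservable<T>((observer) => {
      values.forEach((value) => observer.next(value));
      observer.complete();
      return new TinySubscription();
    });
  }

  // Subscribes to the source and forwards each notification, mapping values.
  map<U>(f: (value: T) => U): TinyObservable<U> {
    return new TinyObservable<U>((observer) =>
      this.subscribe(
        (value) => observer.next(f(value)),
        (error) => observer.error(error),
        () => observer.complete()
      )
    );
  }

  subscribe(
    next?: (value: T) => void,
    error?: (error: Error) => void,
    complete?: () => void
  ): TinySubscription {
    const observer = new TinyObserver<T>(next, error, complete);
    const subscription = this.subscribeFn(observer);
    observer.onUnsubscribe = () => subscription.unsubscribe();
    return new TinySubscription(() => observer.unsubscribe());
  }
}

const tinyOutput: string[] = [];
TinyObservable.from(1, 2, 3)
  .map((value) => value * 10)
  .subscribe(
    (value) => tinyOutput.push("next: " + value),
    () => tinyOutput.push("error"),
    () => tinyOutput.push("complete")
  );
// tinyOutput: next: 10, next: 20, next: 30, complete
```

Since from emits synchronously, all notifications have already been delivered by the time subscribe returns.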
Conclusion
The idea of modelling events that occur at multiple points of time predates RxJS of course. The library is an implementation[4] of a more abstract paradigm from the late 1990s called functional reactive programming.[5] Although people usually associate reactive programming with frontend development, the trend is picking up in the backend world too. Sharp tongues would argue it became mainstream the moment Spring released its Webflux module. Either way, it is a useful programming paradigm that every developer should be familiar with.
We have covered quite a lot of ground in this blog post: among other things, we examined a few operators, learnt about different strategies to cope with errors, and looked at various kinds of subjects. Lastly, we implemented our own observable; I hope this demystified RxJS a bit. In case anything is unclear or missing, feel free to reach out on Twitter.
Happy hacking :)
References
- A simple Observable implementation – Federico Knüssel
- Callbacks vs promises vs rxjs vs async awaits – Maximilian Schwarzmüller
- Exploring Async/Await Functions in JavaScript – alligator.io
- Implementing JavaScript Promise in 70 lines of code! – Keyvan M. Sadeghi
- learnrxjs.io
- reactivex.io
- RxJs Error Handling: Complete Practical Guide - Angular University
- rxmarbles.com
- Should I care about RxJS schedulers? - Wojciech Trawiński
- The Best Way To Unsubscribe RxJS Observables In The Angular Applications! – Tomas Traja
- The Evolution of Asynchronous JavaScript – Gergely Nemeth
- Understanding rxjs BehaviorSubject, ReplaySubject and AsyncSubject – Luuk Gruijs
- When to Unsubscribe in Angular – Netanel Basal
Notes
1. Cf. Is RxJS.Observable a monad? - StackOverflow for a formal discussion
2. Ignoring corner cases such as warm observables, cf. Hot vs Cold Observables - Ben Lesh
3. Actually, this was the way it was done prior to RxJS 5.5: Pipeable Operators - ReactiveX/RxJs
4. Strictly speaking, RxJS is not exactly equivalent to FRP, as it operates on discrete instead of continuous values. But in practice, a lot of people call it that nevertheless.
5. Functional Reactive Animation - Conal Elliott and Paul Hudak