Observables in Angular 2

Piotr Wojnarowski 07.10.2016

In this blog post we’ll try to describe our experience with RxJS Observables in Angular 2 at Sparkbit. It will not be a tutorial, but you’ll find links to some really good tutorial articles on this topic.

What actually is an Observable?

If you are into frontend development, then you have definitely heard about Observables. They are an implementation of the Reactive Programming paradigm, in which everything is an asynchronous data stream. They allow you to build streams from data and events like clicks, HTTP requests, timeouts and many more.
We at Sparkbit use Observables as one of the core parts of our applications. More importantly, the Angular 2 developers also use them as a base of their framework. Because of that, you have to know at least the basics of Observables to use Angular efficiently.
Overall, RxJS is a very powerful tool that you can use in your apps, and it is worth getting experienced with. We’ll try to introduce you to Reactive Programming and Observables.

Short technical introduction

Let’s get a little more technical. Here is a very short code sample using Observables:

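private processing = false;
private endpoint = "http://example.org/setting/";

public fetch(settingId: string) {
    this.processing = true;
    // this.http is assumed to be Angular's injected Http service
    this.http.get(this.endpoint + settingId)
    .subscribe((res: Response) => {
        // Http.get() emits a Response; text() returns its body
        console.log("Current setting value is", res.text());
    }, err => {
        console.error("Error occurred while fetching setting", err);
        // the complete callback does not run after an error, so reset the flag here too
        this.processing = false;
    }, () => {
        this.processing = false;
    });
}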

This method sets a “processing” flag and then sends a GET request to a server, asking for the value of a certain setting. First, the http.get() method returns an Observable. Then we subscribe to it. Notice that we pass three callbacks to subscribe():

In the first one we receive the requested value and print it – it is called for every value the Observable emits, which for an HTTP request means once, when the request succeeds. It is the one you will nearly always pass; the other two are optional.

Then there is the second one, the error callback – it is called if the request fails, and here it simply logs the error.

Finally, there is the “on complete” callback, called when the Observable finishes successfully and will not send any more data. It is not called after an error: an Observable ends with either an error or a completion, never both, so a flag that has to be cleared in both cases must also be reset in the error callback (or with the finally() operator). For an HTTP request, completion comes right after the single response, as it is a one-time thing. But we can imagine another scenario, such as listening on a websocket. Then every message from the socket would call the first callback. An error would call the second callback and end the stream, while closing the socket cleanly would call the third.
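
To make the order of the three callbacks concrete, here is a small sketch. It deliberately does not use RxJS: TinyObservable is a made-up, dependency-free stand-in that only mimics the rule that a stream delivers any number of values and then terminates with either an error or a completion. The "dark" value and the 404 error are invented for the illustration.

```typescript
// Dependency-free stand-in for an Observable; this is NOT the RxJS implementation.
interface TinyObserver<T> {
  next: (value: T) => void;
  error: (err: Error) => void;
  complete: () => void;
}

class TinyObservable<T> {
  private producer: (observer: TinyObserver<T>) => void;

  constructor(producer: (observer: TinyObserver<T>) => void) {
    this.producer = producer;
  }

  subscribe(
    onNext: (value: T) => void,
    onError: (err: Error) => void = () => {},
    onComplete: () => void = () => {}
  ): void {
    let terminated = false; // after error or complete, nothing else is delivered
    this.producer({
      next: (value) => { if (!terminated) { onNext(value); } },
      error: (err) => { if (!terminated) { terminated = true; onError(err); } },
      complete: () => { if (!terminated) { terminated = true; onComplete(); } },
    });
  }
}

// Records which callbacks fire, and in which order.
function traceCallbacks(source: TinyObservable<string>): string[] {
  const log: string[] = [];
  source.subscribe(
    (value) => { log.push("next:" + value); },
    (err) => { log.push("error:" + err.message); },
    () => { log.push("complete"); }
  );
  return log;
}

// Hypothetical successful request: one value, then completion.
const successLog = traceCallbacks(
  new TinyObservable<string>((o) => { o.next("dark"); o.complete(); })
);
// Hypothetical failed request: the error ends the stream, so complete is ignored.
const failureLog = traceCallbacks(
  new TinyObservable<string>((o) => { o.error(new Error("404")); o.complete(); })
);
// successLog is ["next:dark", "complete"], failureLog is ["error:404"]
```

The failed case never reaches "complete", which is why cleanup that must always run belongs in both terminal callbacks or in an operator such as finally().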

Observables in Angular 2

Angular 2 uses RxJS 5 as its Observable library. RxJS is part of the ReactiveX project, which creates Reactive Programming libraries for many programming languages. I recommend reading the documentation for RxJS as well as for the whole ReactiveX project – the latter is not JavaScript-specific, but it tends to cover more ground. There is also an interesting, Angular-focused tutorial from an Angular 2 documentation coauthor. As we said before, Observables are used in many basic functions of Angular 2, like the Http service shown in the example above.

Observables at Sparkbit

When we started using Angular 2 in our apps, we had rather mixed feelings about Observables. Because RxJS was then in alpha (currently, as of September 2016, it’s in beta), the documentation was lacking. Most online materials didn’t describe advanced use cases, and a lot of the available operators were not documented well. But as time went by, Observables grew on us, and now we can show you some interesting examples of things we’ve done.

Example 1: Refresh stream

Below you can see a function that, every time a refresh is requested, will try to GET user roles and parse them. Let’s get into the code:

processing: boolean;
userData: UserRole[];
refreshStream: Observable<boolean>;
http: Http;
jsonParser: JsonParser;
rolesEndpoint: string;

public getUserRoles(queryParams: URLSearchParams) {
    Observable.of(null).concat(this.refreshStream)
    .flatMap(() => {
        this.processing = true;
        return this.http.get(this.rolesEndpoint, {search: queryParams})
             .finally(() => this.processing = false);
    })
    .catch((err: Response) => {
        console.error("Error while requesting data", err);
        return Observable.of(err);
    })
    .map((res: Response) => {
        if (res.ok) {
            return this.jsonParser.parseArray(UserRole, res.json());
        } else {
            return [];
        }
    })
    .catch(err => {
        console.error("Error while parsing the response", err);
        return Observable.of([]);
    })
    .subscribe(data => {
        this.userData = data;
    });
}

Variables:

  • refreshStream is an Observable that will emit a message every time some event changes user roles
  • processing is just a flag used to display a simple message that the app is processing a request
  • userData – user roles to be displayed by the app
  • http – default Angular 2 HTTP service
  • jsonParser – Sparkbit’s own JSON parser – it will get a separate blog post some day
  • rolesEndpoint – URL of the web service

Operating on Observables:

First, in line 9, with the of() method, we create a simple Observable with one value and concat it with our refreshStream. concat() emits everything from the first Observable and, once that one completes, subscribes to the next and passes its messages through unchanged. Because of(null) emits its single value and completes immediately, the result is an Observable that emits one message at the start and one message on every refresh.

Then, in lines 10 to 14, we use my favorite operator – flatMap(). It gets messages from the previous Observable and maps each of them to another Observable, whose messages are then passed downstream. Note that it does not map a message to another value – that would be a simple map(). So, in this case, every time we need to refresh, we call HTTP GET. Before the call we set the processing flag, and after it we clear the flag. finally() is called when the Observable terminates, whether it completed successfully or failed.
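
The difference between map() and flatMap() can be sketched without RxJS. In the snippet below a stream is modelled as a plain function that pushes values into a callback; PushStream, streamOf, mapStream, flatMapStream and fakeGet are all made-up names for this illustration, and the model ignores errors, completion and asynchrony.

```typescript
// A stream is modelled as a function that pushes values into a callback.
// This is a teaching stand-in, not the RxJS Observable type.
type PushStream<T> = (emit: (value: T) => void) => void;

function streamOf<T>(...values: T[]): PushStream<T> {
  return (emit) => { values.forEach((v) => emit(v)); };
}

// map(): every value becomes another value.
function mapStream<A, B>(source: PushStream<A>, project: (a: A) => B): PushStream<B> {
  return (emit) => { source((a) => emit(project(a))); };
}

// flatMap(): every value becomes a stream, whose values are forwarded downstream.
function flatMapStream<A, B>(
  source: PushStream<A>,
  project: (a: A) => PushStream<B>
): PushStream<B> {
  return (emit) => { source((a) => { project(a)(emit); }); };
}

// Subscribes to a stream and gathers everything it pushes.
function collect<T>(source: PushStream<T>): T[] {
  const values: T[] = [];
  source((v) => { values.push(v); });
  return values;
}

// Hypothetical stand-in for http.get(): a stream carrying a single "response".
function fakeGet(trigger: string): PushStream<string> {
  return streamOf("roles after " + trigger);
}

const triggers = streamOf("initial load", "refresh");
const mapped = collect(mapStream(triggers, fakeGet));        // two streams nobody subscribed to
const flattened = collect(flatMapStream(triggers, fakeGet)); // the two responses themselves
```

With map() the subscriber receives streams it would still have to subscribe to; with flatMap() it receives the responses directly, which is what the refresh example relies on.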

Then, in lines 15 to 18, we catch an error. If we get an error here, we know it was an HTTP error. We log it and pass the error response on as a regular message. Keep in mind that catch() replaces the failed stream with the Observable it returns, so once it fires, later refreshes are no longer processed.

In lines 19 to 25 we use map(), just like on arrays. If the response was successful, we parse its JSON content. If not, we map it to an empty array.

In lines 26 to 29 we again try to catch an error. This time it could be a parsing error. We log it and pass on an empty array.

Finally, in lines 30 to 32, we save the received data. Instead of two catch() operators we could add an onError callback here that would check what kind of error happened and act accordingly.
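
The single-callback alternative mentioned above might look roughly like the sketch below. The PipelineError shape, the FailureReport type and handlePipelineError() are hypothetical and exist only for this illustration; real errors coming out of the chain would have to be recognised in whatever form they actually arrive. Note also that an error callback means the subscription has already ended, so unlike catch() it cannot push a fallback value into the stream and has to assign it itself.

```typescript
// Hypothetical error shapes, introduced only for this illustration.
type PipelineError =
  | { kind: "http"; status: number }
  | { kind: "parse"; reason: string };

// What the callback decides to log and which roles it falls back to.
interface FailureReport {
  logMessage: string;
  fallbackRoles: string[];
}

// One handler that distinguishes the two failure kinds, instead of two catch() operators.
function handlePipelineError(err: PipelineError): FailureReport {
  if (err.kind === "http") {
    return {
      logMessage: "Error while requesting data (status " + err.status + ")",
      fallbackRoles: [],
    };
  }
  return {
    logMessage: "Error while parsing the response: " + err.reason,
    fallbackRoles: [],
  };
}

// Usage idea (assumption): subscribe(data => ..., err => {
//   const report = handlePipelineError(err);
//   console.error(report.logMessage);
//   this.userData = report.fallbackRoles;
// });
const httpReport = handlePipelineError({ kind: "http", status: 503 });
const parseReport = handlePipelineError({ kind: "parse", reason: "unexpected token" });
```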

To sum up, we have a clean, functional piece of code with separated concerns doing a rather robust thing.

Example 2: Document Viewer

Now we’ll take a look at our document viewer, which displays XML documents and allows the user to annotate them. The viewer needs to be repainted every time the document changes, the annotations change or the window is resized.

annotationsStream: Observable<Annotation[]>;
documentStream: Observable<string>;
sectionId: string;

public ngOnInit() {
    Observable.combineLatest(
        this.documentStream,
        this.annotationsStream.map(annotations =>
            annotations.filter((annotation: Annotation) =>
            annotation.sectionId === this.sectionId)),
        Observable.fromEvent(window, "resize").startWith(null).debounceTime(200))
    .subscribe(data => {
        const htmlDoc = <string>(data[0]);
        const annotations = <Annotation[]>(data[1]);
        this.repaintViewer(htmlDoc, annotations);
    });
}

Variables:

  • annotationsStream – Observable of the list of annotations, downloaded from the server.
  • documentStream – Observable emitting the content of the XML document to be displayed.
  • sectionId – our documents can have multiple sections – the viewer displays one at a time.

Code itself:

We start with combineLatest() in line 6 – it lets us combine the results of multiple Observables. Unlike concat(), it emits a message only after all the combined Observables have emitted something, and every message is built out of the last message emitted by each Observable. So, every time the user resizes the window, the subscribe callback will receive the last state of the document and the last state of the annotations, without calling any backend services. By default combineLatest() emits an array holding the latest value from each Observable, so its elements can be of different types.
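
The emission rule can be replayed on a made-up timeline. The sketch below does not use RxJS: combineLatestSketch() is a hypothetical helper that walks through a list of "source N emitted value V" entries and returns what a combineLatest-style operator would hand to the subscriber. The document, annotation and resize values are invented.

```typescript
// One entry on a shared timeline: which source fired, and with what value.
interface TimelineEntry {
  source: number;
  value: unknown;
}

// Replays a timeline and returns what a combineLatest-style operator would emit.
function combineLatestSketch(sourceCount: number, timeline: TimelineEntry[]): unknown[][] {
  const latest: unknown[] = [];
  const hasEmitted: boolean[] = [];
  for (let i = 0; i < sourceCount; i++) {
    latest.push(undefined);
    hasEmitted.push(false);
  }

  const combined: unknown[][] = [];
  timeline.forEach((entry) => {
    latest[entry.source] = entry.value;
    hasEmitted[entry.source] = true;
    // Nothing is emitted until every source has produced at least one value.
    if (hasEmitted.every((flag) => flag)) {
      combined.push(latest.slice());
    }
  });
  return combined;
}

// Sources: 0 = document, 1 = annotations, 2 = resize (all values are made up).
const viewerUpdates = combineLatestSketch(3, [
  { source: 0, value: "doc v1" },
  { source: 2, value: null },                 // the startWith(null) seed
  { source: 1, value: ["note A"] },           // first complete combination
  { source: 2, value: "resize" },             // reuses the cached document and annotations
  { source: 1, value: ["note A", "note B"] }, // reuses the cached document and resize value
]);
// viewerUpdates holds three combinations; the first two entries alone produce nothing.
```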

In lines 8 to 10 we transform the Observable of all annotations into an Observable of only the annotations from the current section – we use map(), which gets the complete list and filters it.

There seems to be a lot going on in line 11. First, we use fromEvent() to create an Observable that will react to window resize events. As combineLatest() waits for every Observable to emit a message, we wouldn’t repaint the viewer before the first resizing of the window, even if the annotations list changed. To avoid that we use startWith() to set the initial value. Additionally, we have to remember that every time the user grabs a window and resizes it, the browser emits a lot of “resize” events. We don’t want to repaint on every one of them, so we use debounceTime() – after every message it waits for a given time and emits the message only if nothing else arrives in the meantime. In this case it should only emit a message when the user has finished resizing or paused resizing for a moment.
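
To make the debouncing behaviour concrete, here is a small sketch that uses neither RxJS nor real timers. debounceSketch() is a made-up helper: it takes the timestamps (in milliseconds, sorted ascending) at which events arrive and computes when a debounceTime-like operator would let something through, assuming the stream stays open. The timestamps are invented.

```typescript
// Returns the moments at which a debounce with the given wait would emit.
// Assumes eventTimes is sorted ascending and the stream never completes.
function debounceSketch(eventTimes: number[], waitMs: number): number[] {
  const emittedAt: number[] = [];
  for (let i = 0; i < eventTimes.length; i++) {
    const current = eventTimes[i];
    const isLast = i === eventTimes.length - 1;
    // An event survives only if the following event arrives at least waitMs later
    // (or if there is no following event at all).
    if (isLast || eventTimes[i + 1] - current >= waitMs) {
      emittedAt.push(current + waitMs);
    }
  }
  return emittedAt;
}

// Two bursts of "resize" events: four quick ones, a pause, then two more.
const resizeTimes = [0, 50, 100, 150, 1000, 1040];
const repaintTimes = debounceSketch(resizeTimes, 200);
// repaintTimes is [350, 1240]: one repaint per burst instead of six.
```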

In lines 13 and 14 you can observe how to get data from combineLatest. There is also a third element of the data array, a DOM Event from window resizing, but we are only interested in its existence, not its value.

Unsubscribing

One great feature of Observables is that you can unsubscribe from them, basically cancelling the subscription instead of waiting for its completion. The subscribe() method returns a Subscription object, which has an unsubscribe() method doing just that. If there is more than one subscriber to the Observable, only the one that called unsubscribe() is removed. Its callbacks will not be called any more, but the other subscribers will keep receiving messages.
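
The "only the caller is removed" behaviour can be shown with a tiny multicast source. TinySubject below is a made-up, dependency-free stand-in, not the RxJS Subject; it only keeps a list of listeners and hands back an object with an unsubscribe() method.

```typescript
// Tiny multicast source, standing in for an Observable with several subscribers.
// It is an illustration only, not the RxJS Subject.
class TinySubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): { unsubscribe: () => void } {
    this.listeners.push(listener);
    return {
      // Removes just this listener; everyone else stays subscribed.
      unsubscribe: () => {
        this.listeners = this.listeners.filter((l) => l !== listener);
      },
    };
  }

  next(value: T): void {
    this.listeners.forEach((l) => l(value));
  }
}

const messageSource = new TinySubject<number>();
const receivedByA: number[] = [];
const receivedByB: number[] = [];

const subscriptionA = messageSource.subscribe((v) => { receivedByA.push(v); });
messageSource.subscribe((v) => { receivedByB.push(v); });

messageSource.next(1);
subscriptionA.unsubscribe(); // only A stops listening
messageSource.next(2);
// receivedByA is [1], receivedByB is [1, 2]
```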

The most obvious case for cancelling Observables in our apps is on the destruction of the Angular Component that subscribed to it. Unsubscribing from all Observables in Component’s ngOnDestroy() function is highly recommended. Otherwise it may result in some unexpected behavior, like calling an HTTP request multiple times or a memory leak.
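
One possible way to organise this is sketched below. It assumes nothing about Angular beyond the ngOnDestroy() hook name: DocumentViewerSketch and its track() method are hypothetical, SubscriptionLike is a structural stand-in for the Subscription returned by subscribe(), and the fake subscription only counts cancellations so the sketch can run on its own.

```typescript
// Structural stand-in for the Subscription object returned by subscribe().
interface SubscriptionLike {
  unsubscribe(): void;
}

// Hypothetical component; in Angular it would carry @Component and implement OnDestroy.
class DocumentViewerSketch {
  private subscriptions: SubscriptionLike[] = [];

  // Call this with the result of every subscribe() made by the component.
  track(subscription: SubscriptionLike): void {
    this.subscriptions.push(subscription);
  }

  // Angular calls ngOnDestroy() when the component is removed.
  ngOnDestroy(): void {
    this.subscriptions.forEach((s) => s.unsubscribe());
    this.subscriptions = [];
  }
}

// Fake subscription that only counts how often it was cancelled.
let cancelCount = 0;
const fakeSubscription: SubscriptionLike = {
  unsubscribe: () => { cancelCount++; },
};

const viewerSketch = new DocumentViewerSketch();
viewerSketch.track(fakeSubscription);
viewerSketch.track(fakeSubscription);
viewerSketch.ngOnDestroy();
viewerSketch.ngOnDestroy(); // a second call finds nothing left to cancel
// cancelCount is 2
```

In a real component, track() would be called with the value returned by each subscribe(), for example the combineLatest() subscription from the document viewer.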

Summary

Basically everything can be treated as a data stream – user actions, DOM events, HTTP requests, timeouts and more. RxJS is a great tool to work with them, allowing you to create clean code, taking a lot of imperative paradigm pains away from the developer. It gives you an additional level of abstraction where you can focus on business events in the application.

Reactive Programming requires a very specific mindset, so it may take time to fully grasp it. There are a lot more operators that we didn’t mention in this article. They are the most useful tools in the RxJS toolbox. This documentation is a good place to continue reading about them.
