RxJS in Angular – Basics

Introduction

RxJS, or Reactive Extensions for JavaScript, is an implementation of Observables for JavaScript.

RxJS is a library for composing asynchronous and event-based programs using observable sequences. Everyday analogies for such a sequence are a production line, an airport baggage conveyor belt, or a stream.

With Reactive programming, we create our entire app by defining the various streams and the operations that get performed on those streams.

In this post, we’ll learn the basics of RxJS. Upcoming posts will focus more on real-world usage.

Setup

I will be using an existing Angular application. If you are following along, this is the same code base used in my other Angular articles.

The source code is available from this git repo.

Diagram

The following diagram shows the basic data-flow from source to observer via observable:

Observable

A collection of events or values emitted over time.

  • User actions such as mouse moves, selections, etc.
  • App events (e.g. routing, form)
  • Response from an HTTP Request.
  • Internal structures.

Think of an observable as a conveyor belt that collects items and emits them to an observer. It connects the observer to a source of data or events.

Observer

  • Observes notifications from an observable.
  • Provides methods to process notifications:
    • next(): to handle next emitted items.
    • error(): to handle an error notification.
    • complete(): to do any final processing or cleanup.
  • So, an observer is a collection of callbacks that knows how to listen to values delivered by the observable.
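To make the three callbacks concrete, here is a minimal sketch of an observer written as a plain object. The `apple$` observable and the `log` array are made up for illustration, and `of` (covered later in this post) is used only because it gives us a source that emits and completes right away.

```typescript
import { of } from 'rxjs';

// Collects everything the observer sees, so the order is easy to inspect.
const log: string[] = [];

// An observer is just an object with next, error and complete callbacks.
const observer = {
  next: (apple: string) => log.push(`next: ${apple}`),
  error: (err: unknown) => log.push(`error: ${err}`),
  complete: () => log.push('complete'),
};

// apple$ is a hypothetical observable: it emits two values, then completes.
const apple$ = of('Apple 1', 'Apple 2');
apple$.subscribe(observer);

console.log(log);
// [ 'next: Apple 1', 'next: Apple 2', 'complete' ]
```

Since no error occurs here, the error callback is never invoked.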

Subscriber

One class that implements the observer interface is the subscriber.

A subscriber is basically an observer with the additional ability to unsubscribe from an observable.
A subscriber is rarely used directly in our code; we mainly work with observers.

Subscribing and Unsubscribing

Subscribing

  • In RxJS, we call the subscribe method on the observable to start receiving notifications. We pass in the observer, so the observable knows where to send the notifications.
  • We must subscribe to receive notifications.
  • const sub = apple$.subscribe(observer) //subscribing

Unsubscribing

  • In RxJS, to stop receiving notifications, we call unsubscribe() on the subscription returned by the subscribe() method.
  • Every time we subscribe to start, we should unsubscribe to stop. This avoids potential memory leaks in the app.
  • A subscription can end in several ways:
    • The complete() method is called on the subscriber.
    • An operator that completes automatically is used.
    • An error is thrown.
    • unsubscribe() is called on the subscription (this doesn’t call the complete method).
    • sub.unsubscribe()
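The following sketch illustrates the last point: unsubscribing stops delivery without triggering complete(). A `Subject` is not covered in this post; it is used here only as a source we can push values into by hand, and the names `apple$`, `received` and `completed` are made up for the example.

```typescript
import { Subject } from 'rxjs';

// A Subject lets us emit values manually (an assumption for this demo only).
const apple$ = new Subject<string>();

const received: string[] = [];
let completed = false;

const sub = apple$.subscribe({
  next: (apple) => received.push(apple),
  complete: () => { completed = true; },
});

apple$.next('Apple 1');  // delivered to the observer
sub.unsubscribe();       // stop receiving notifications
apple$.next('Apple 2');  // no longer delivered

console.log(received);   // [ 'Apple 1' ]
console.log(completed);  // false, unsubscribe() did not call complete()
console.log(sub.closed);  // true
```

In an Angular component, a call like `sub.unsubscribe()` is typically placed in `ngOnDestroy`.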

Creation Functions (of, from)

  • In Angular, we often work with observables returned from Angular features, e.g. Forms (valueChanges), Routing (paramMap), HTTP (get), etc.
  • We often don’t create our own observables, but there are times when we want to create one ourselves. For that, we can use the of and from functions.
  • Observables created with of and from, or piped through take, complete automatically, so we don’t need to unsubscribe from them.

of function

We can pass it any comma-separated list of individual values. It emits each value in order and then completes.
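A minimal sketch of `of`, using the same values as the operator list later in this post. The `emitted` array and `done` flag are illustrative names only.

```typescript
import { of } from 'rxjs';

const emitted: number[] = [];
let done = false;

// of emits each argument as a separate item, then completes on its own.
of(2, 4, 6).subscribe({
  next: (item) => emitted.push(item),
  complete: () => { done = true; },
});

console.log(emitted); // [ 2, 4, 6 ]
console.log(done);    // true
```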

from function

It is similar to the of function, but instead of passing individual values, we pass a single object that already encapsulates a group of values, such as an array.
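As a small sketch, passing an array to `from` emits its elements one by one (the `emitted` array is an illustrative name):

```typescript
import { from } from 'rxjs';

const emitted: number[] = [];

// from unpacks the array and emits each element as its own item.
from([20, 15, 10, 5]).subscribe((item) => emitted.push(item));

console.log(emitted); // [ 20, 15, 10, 5 ]
```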

With the JavaScript spread operator, the of function can also emit the elements of an array individually.
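The difference is easiest to see side by side. This is only a sketch; the `apples` array and the result arrays are hypothetical names.

```typescript
import { of } from 'rxjs';

const apples = ['Apple 1', 'Apple 2', 'Apple 3']; // hypothetical data

// Without spread: the whole array is emitted as ONE item.
const asOneItem: string[][] = [];
of(apples).subscribe((item) => asOneItem.push(item));

// With spread: each element is emitted as its own item, like from(apples).
const asSeparateItems: string[] = [];
of(...apples).subscribe((item) => asSeparateItems.push(item));

console.log(asOneItem.length);       // 1
console.log(asSeparateItems.length); // 3
```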

Creating observables from existing data

Let’s see another example of creation functions.

We start with some existing data, create an observable from each piece of data, and then combine them into a single observable.

To achieve this, we use concat, which subscribes to its sources one after another and emits the values of each source only after the previous one has completed.
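The original data for this example is not reproduced here, so the sketch below uses made-up arrays (`fruits`, `vegetables`) to show the idea of combining existing data with `concat`.

```typescript
import { concat, from } from 'rxjs';

// Hypothetical existing data
const fruits = ['Apple', 'Banana'];
const vegetables = ['Carrot', 'Potato'];

// One observable per data source
const fruit$ = from(fruits);
const vegetable$ = from(vegetables);

// concat emits everything from fruit$, then everything from vegetable$.
const combined: string[] = [];
concat(fruit$, vegetable$).subscribe((item) => combined.push(item));

console.log(combined);
// [ 'Apple', 'Banana', 'Carrot', 'Potato' ]
```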

Creating an Observable (FromEvent)

The fromEvent function creates an observable from any DOM event. In the following example, we create an observable from click events.

First, let’s create the DOM elements in the template.

Next, we get references to these elements using ViewChild. You can check this article for information about ViewChild.

We then pass an element and an event name to the fromEvent function to create an observable from that DOM event.
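The component code itself is not reproduced here, so this is a framework-free sketch of the same idea. A plain `EventTarget` stands in for the DOM element; in an Angular component the target would instead be the native element behind a ViewChild reference (for example something like `this.button.nativeElement`, where `button` is a hypothetical name).

```typescript
import { fromEvent } from 'rxjs';

// Stand-in for a DOM element (assumption: in Angular this would be a
// native element obtained through ViewChild).
const button = new EventTarget();

const clicks: string[] = [];

// Every 'click' dispatched on the target becomes an emitted item.
const click$ = fromEvent(button, 'click');
const clickSub = click$.subscribe((event) => clicks.push((event as Event).type));

button.dispatchEvent(new Event('click'));
button.dispatchEvent(new Event('click'));

// Unsubscribing removes the underlying event listener.
clickSub.unsubscribe();
button.dispatchEvent(new Event('click')); // ignored

console.log(clicks); // [ 'click', 'click' ]
```

An event-based observable like this never completes on its own, so it is one of the cases where unsubscribing matters.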

output

Creating an Observable (interval)

interval is another creation function. It creates an observable that emits sequential numbers, starting from 0, at a defined time interval.
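Here is a small sketch of `interval`. On its own it never completes, so `take(3)` is added to stop after three values. The one-second period and the variable names are arbitrary choices for the example.

```typescript
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const ticks: number[] = [];

// Emits 0, 1, 2 at one-second spacing, then completes because of take(3).
const tickSub = interval(1000)
  .pipe(take(3))
  .subscribe({
    next: (n) => {
      ticks.push(n);
      console.log(n);
    },
    complete: () => console.log('done'),
  });

// interval is asynchronous: nothing has been emitted at this point yet.
console.log(ticks.length); // 0
```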

RxJS Operators

Operators are similar to LINQ in .NET or to the JavaScript array map function.

  • With observables, we pipe emitted items through a sequence of operators to transform, filter or process them.
  • And since items arrive over time, we can apply a delay, a timeout, or other time-based operations.
  • An operator is a function used to transform or manipulate emitted items.
  • The pipe() method is used to apply operators.
  • There are over 100 RxJS operators (see https://rxjs.dev).

A few of the operators and creation functions are as follows:

  • map: a transformation operator that transforms each emitted item, e.g. map(item=>item*2)
  • tap: taps into emissions without affecting the items, e.g. tap(item=>console.log(item))
  • take: a filtering operator that emits a specific number of items, e.g. take(2)
  • of: a creation function, e.g. of(2,4,6)
  • from: a creation function, e.g. from([20,15,10,5])

map example
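A minimal sketch of `map`, with `tap` added to peek at the items before they are transformed. The `doubled` array is an illustrative name.

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const doubled: number[] = [];

of(2, 4, 6)
  .pipe(
    // tap only observes the item; it passes it along unchanged.
    tap((item) => console.log(`before map: ${item}`)),
    // map replaces each item with the returned value.
    map((item) => item * 2)
  )
  .subscribe((item) => doubled.push(item));

console.log(doubled); // [ 4, 8, 12 ]
```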

merge example

  • Combines multiple observables by merging their emissions.
  • It is a creation function.
  • merge(a$, b$, c$)
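The sketch below shows how merged emissions interleave in arrival order. Subjects are not covered in this post; they are used here only so we can control by hand when each source emits, and the emitted strings are made up.

```typescript
import { merge, Subject } from 'rxjs';

// Three hand-driven sources (Subjects are an assumption for this demo).
const a$ = new Subject<string>();
const b$ = new Subject<string>();
const c$ = new Subject<string>();

const merged: string[] = [];
merge(a$, b$, c$).subscribe((item) => merged.push(item));

// Items come out in the order they are emitted, whichever source they are from.
b$.next('b1');
a$.next('a1');
c$.next('c1');
a$.next('a2');

console.log(merged); // [ 'b1', 'a1', 'c1', 'a2' ]
```

This is the difference from concat, which would wait for the first source to complete before listening to the next one.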

take example
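A short sketch of `take`, reusing the values from the operator list above. The `firstTwo` array and `finished` flag are illustrative names.

```typescript
import { from } from 'rxjs';
import { take } from 'rxjs/operators';

const firstTwo: number[] = [];
let finished = false;

// take(2) lets the first two items through, then completes the stream.
from([20, 15, 10, 5])
  .pipe(take(2))
  .subscribe({
    next: (item) => firstTwo.push(item),
    complete: () => { finished = true; },
  });

console.log(firstTwo); // [ 20, 15 ]
console.log(finished); // true
```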

As mentioned earlier, there are over 100 RxJS operators. You can check those at https://rxjs.dev for more information.

Summary

In this post, we covered the basics of RxJS and its building blocks. We also saw a few examples of creating observables from existing data, and of various RxJS operators to transform, filter and process observables.

You can check the source code available from this git repo.

The deployed sample application is available on this URL.

Let me know if you have some comments or questions. Till next time, Happy coding.
