RxJS – Combining Data Streams

Introduction

In previous few posts, we’ve covered some theory and few demos example of RxJS in Angular.

The previous post was focused about RxJs Subjects and we saw few examples of its usage such as we built an eventBus to broadcast notifications and the likes.

Today, we’ll learn about the various needs and patterns of combining data and action streams to make our angular application more reactive.

Using subject to create ActionStream

Subjects are frequently used to create action-streams in angular applications. Using subjects, we can call next() method to emit items and other code can subscribe() for notifications.

An action is simply about informing when something happened. When we define an observable for an action, the observable emits an item any time the action occurs. e.g., FilterBy dropdown with different selections or user types into a search-box to search for products etc.

Following are basic steps to setup an action-stream.

We can use subject to create action stream :

actionSubject = new Subject<string>();

call next() method to emit items:

this.actionSubject.next('tools');

other code can subscribe to receive notifications:

this.actionSubject.subscribe(item=> console.log('received', item));

Combining Streams

One way we work with multiple datasets reactively is by combining the emitted items for multiple observers into one set of emitted items.

Why Combine Data Streams?

  • To Map id to a string (e.g. CategoryId lookup for user-friendly display)
  • To work with multiple data sources e.g. combining product data with supplier data.
  • To simplify template code
  • To React to actions e.g. re-filter a list of products each time user input change.

Types of Combination Operators/Functions

  • Combine to a single observable result
    • merge, concat
  • Flatten higher-order observables
    • mergeAll
  • Emit a combined value
    • combineLatest
    • forkJoin
    • withLatestFrom

combineLatest

Creates an observable whose values are defined:

using the latest values from each input observable

combineLatest([a$, b$, c$])

Each time, any of the observable emit, the latest value from each observable is combined into an array and emitted to output observable.

Note, that combineLatest doesn’t emit anything until all of the observables, have emitted atleast once. It completes when all input observables complete.

It is a static creation function, not a pipe-able operator.

Usage

  • To work with multiple datasets.
  • To reevaluate state when an action occurs

[combineLatest]([data$, action$])

Combine latest emits after both observable have emitted and then again each time the action streams emits. The emit result is [ [data], action] and it re-fires the downstream pipeline. So, if pipeline filter the data, it will perform the filter again. This is a key to reaction to actions.

see more details on the official website on this link.

By combining a data-stream such as list of users, or list of products, with an action-stream that watches user actions (e.g. filter selection from drop-down or input box entry), we get a stream with both the data and the action. We can then map the result to reevaluate the state each time the action-stream emits and filter the list.

we’ll see an example to put all this together.

Filtering users List

I’ll be using the users component from one of our previous post

Just to remind that following is the component and corresponding UI

Now lets add a search input where user can enter search-criteria as shown below

This code shall look familiar from previous posts).

Combining Data Stream and Action Stream

[combineLatest]([data$, action$])

We already have data-stream i.e. users$ property on the component. Lets create an action-stream as shown below

we can now wire-up the emission (when user types in search input box) as shown below

this will cause the pipe-line to re-execute for combineLatest operator, which will see next.

and next, lets combine data-stream and action-stream using combineLatest

By combining, a data-stream, with an action-stream, our code automatically reacts to user actions and UI updates.

output

on the UI, the page initially looks empty

however, if we enter some text, we will see filtered results as shown below

So, why it is like this that initially there is no data. You may already realized that, its coz initially when page loads, user have not entered any text in filter input, so no action-stream fired and thus the combineLatest’s downstream pipeline didn’t executed; hence the empty page.

So, we need some magic way to fire the action-stream for the first-time, so that the combineLatest downstream pipeline can be executed; meet BehaviorSubject.

BehaviorSubject

A special type of subject that buffers its last emitted value and emits that value to late subscribers ( users-list component as our case). Behavior subject requires an initial value.

Following picture, we change from a Subject to BehaviorSubject:

Now with this change, listFilterSubject is initialized with an empty value, and because Combine latest emits after both observable have emitted and then again each time the action streams emits, the downstream pipeline will be executed and page will show data as shown below

and when user enters search criteria, it next() method will be called on actionStream subject, which will cause the pipeline to re-execute with filter-criteria and UI will be updated automatically:

Following picture shows the typical steps

Summary

In this post, we covered the few basic requirements and needs for combining data-streams and saw an example of combining a data-stream with an action-stream.

We learned about few useful RxJS combination operators and used combineLatest to implement a search/filter box. In upcoming posts, we will cover few more examples and real-world use-cases.

You can check the source code available form this git repo.

The deployed sample application is available on this URL.

Let me know if you have some comments or questions. Till next time, Happy coding.

1 thought on “RxJS – Combining Data Streams”

Comments are closed.