In this article, you will learn how to implement custom observables with the Observable.create method and why Observables are lazy. Finally, you will re-create RxJS’s fromEvent and of methods to understand the library a bit better.
RxJS is a library for reactive functional programming in JavaScript. It combines the Observer pattern with the Iterator pattern and adds Array-style operators such as map, filter, and reduce on top. Most importantly, it lets you apply functional programming to asynchronous events.
Think of RxJS as Lodash for events. — RxJS Docs
Following are some important terms that you should know before you advance to the coding part.
Observable: It’s basically a collection of events.
Observer: A collection of callbacks that listen to the values emitted by an Observable.
Subscription: It represents the execution of an Observable. You can unsubscribe from the Observable via its Subscription.
At the API level, an Observable is an object with a subscribe method. This subscribe method takes the observer as an argument.
{
  subscribe(observer);
}
This observer object has three methods: next, error, complete.
To emit a value, you can call the observer.next method with the value that you want to emit.
In case there is an error, you can emit that error via observer.error.
Finally, when everything is finished, you can call the observer.complete method to complete the observable.
Following is an example of an Observer.
{
  next: x => console.log(x),
  error: e => console.log(e),
  complete: () => console.log('complete')
}
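To see all three callbacks in action, here is a minimal sketch built on the plain `{ subscribe }` shape described above. The names `parseJsonObservable` and `recordingObserver` are hypothetical helpers invented for this illustration; they are not part of RxJS.

```javascript
// Hypothetical helper: wraps JSON.parse in the { subscribe } shape described above.
function parseJsonObservable(text) {
  return {
    subscribe: (observer) => {
      try {
        const value = JSON.parse(text);
        observer.next(value);  // emit the parsed value
        observer.complete();   // nothing more to emit
      } catch (err) {
        observer.error(err);   // report the failure to the observer
      }
    }
  };
}

// Hypothetical observer factory: records each notification so the order is easy to inspect.
function recordingObserver(log) {
  return {
    next: (x) => log.push(['next', x]),
    error: (e) => log.push(['error', e.name]),
    complete: () => log.push(['complete'])
  };
}

const okLog = [];
parseJsonObservable('{"a": 1}').subscribe(recordingObserver(okLog));

const badLog = [];
parseJsonObservable('not json').subscribe(recordingObserver(badLog));
```

With valid input the observer receives a next call followed by complete; with invalid input it receives only error.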
You can wrap this pattern around any Push API of the Browser.
Let’s consider the example of setInterval.
Take a look at the following snippet.
const timeId = setInterval(() => { console.log('setInterval'); }, 1000);
// Stopping the interval
clearInterval(timeId);
You called setInterval with an anonymous function. setInterval runs this function every second. It also returns a timeId that identifies the interval. To stop the execution at any point, call clearInterval with the timeId as shown above.
Now, let’s wrap it in an Observable as follows:
function setIntervalObservable(time) {
  return {
    subscribe: (observer) => {
      // Notify the observer on every tick, using the requested interval
      const timeId = setInterval(() => { observer.next(); }, time);
      // Teardown logic here
      return {
        unsubscribe: () => {
          clearInterval(timeId);
        }
      };
    }
  };
}
Let’s walk through this code. The setIntervalObservable function takes time as input and returns an Observable. This Observable has a subscribe method which takes an observer as input. When you subscribe to the Observable, it starts the interval. The subscribe method also returns an object with an unsubscribe method to stop the interval.
You can use this Observable like this:
const interval$ = setIntervalObservable(1000);
const subscription = interval$
  .subscribe({ next: () => console.log('interval') });
// Stopping the interval at some point
subscription.unsubscribe();
Take a look at the working demo at JSBin.
Take a closer look at the following code:
const interval$ = setIntervalObservable(1000);
Now you have created an Observable, but it is not going to do anything until you call its subscribe method, because the actual work is done inside the subscribe method in the Observable’s implementation. That’s why Observables are lazy.
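The laziness can be checked with a small sketch. It assumes the same hand-rolled `{ subscribe }` shape as above; `countingObservable` and the `started` counter are hypothetical names used only for this illustration.

```javascript
let started = 0; // counts how many times the subscribe body has run

// Hypothetical observable whose only "work" is bumping the counter.
function countingObservable() {
  return {
    subscribe: (observer) => {
      started += 1;
      observer.next(started);
      return { unsubscribe: () => {} };
    }
  };
}

const lazy$ = countingObservable();
const beforeSubscribe = started; // creating the Observable did no work

const seen = [];
lazy$.subscribe({ next: (n) => seen.push(n) });
lazy$.subscribe({ next: (n) => seen.push(n) });
const afterSubscribe = started;  // the body ran once per subscribe call
```

Creating `lazy$` leaves the counter untouched; each subscribe call runs the body once.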
Observable is an Object with subscribe method on it.
The real implementation in RxJS is a bit more complex. To use your custom Observable with RxJS’s other methods, you need to create it with RxJS’s Observable.create method. (In recent RxJS versions, Observable.create is deprecated in favor of new Observable(subscribe), which accepts the same subscribe function.)
The Observable.create method takes a subscribe function and returns an Observable.
Observable.create(subscribe): Observable<any>;
This subscribe function is the blueprint of the Observable: it defines what the Observable does when someone subscribes. It takes an observer object as an argument. Now, let’s create our setInterval Observable with the create method.
const Observable = Rx.Observable;
// or for es6 import
// import { Observable } from 'rxjs';
function setIntervalObservable(time) {
  return Observable.create((observer) => {
    // Emit on every tick, using the requested interval
    const timeId = setInterval(() => { observer.next(); }, time);
    // Teardown logic here
    return () => {
      clearInterval(timeId);
    };
  });
}
// subscribe returns a Subscription, not an Observable
const subscription = setIntervalObservable(1000)
  .subscribe(() => console.log('interval'));
// Uncomment the next line to unsubscribe from the Observable
// subscription.unsubscribe();
If you compare this code with the version without the create method, the major difference is that you pass the subscribe function to create. Also, the subscribe function returns a teardown function instead of an object with an unsubscribe method. RxJS handles the rest for you and exposes the teardown through the unsubscribe method of the returned Subscription.
Take a look at the working demo at JSBin.
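To make the teardown idea concrete, here is a rough sketch of how a create-style helper could turn a returned teardown function into an object with an unsubscribe method. The helper name `createObservable` is hypothetical, and this is not how RxJS is actually implemented; it only mirrors the behavior described in this article.

```javascript
// Hypothetical stand-in for Observable.create; NOT the RxJS implementation.
function createObservable(subscribeFn) {
  return {
    subscribe: (observer) => {
      // Run the blueprint and keep whatever teardown function it returns.
      const teardown = subscribeFn(observer);
      let closed = false;
      return {
        unsubscribe: () => {
          if (closed) return; // run the teardown at most once
          closed = true;
          if (typeof teardown === 'function') teardown();
        }
      };
    }
  };
}

let teardownCalls = 0;
const demo$ = createObservable((observer) => {
  observer.next('hello');
  return () => { teardownCalls += 1; };
});

const received = [];
const sub = demo$.subscribe({ next: (v) => received.push(v) });
sub.unsubscribe();
sub.unsubscribe(); // second call is a no-op in this sketch
```

Guarding with a `closed` flag is a design choice of this sketch, so that calling unsubscribe twice does not run the cleanup twice.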
In this section, you are going to implement very basic versions of RxJS’s fromEvent and of methods. The idea is to give more insight into how these methods work and how you can create custom observables for your own requirements. The actual implementations are very likely more complex.
The fromEvent method in the RxJS library has the following signature:
fromEvent(target: EventTargetLike, eventName: string): Observable<T>;
Creates an Observable that emits events of a specific type coming from the given event target. – RxJS Docs.
So basically, with this method, you can listen to events on a specific DOM node as an Observable. For example:
import { fromEvent } from 'rxjs';
fromEvent(document.querySelector('#myButton'), 'click');
The above code creates an Observable which listens for click events on the HTML element with the id myButton.
Let’s implement it ourselves.
import { Observable } from 'rxjs';
function fromEvent(dom, eventName) {
  return Observable.create((observer) => {
    const handler = (event) => {
      observer.next(event);
    };
    dom.addEventListener(eventName, handler);
    // removeEventListener needs the same event name and handler
    return () => {
      dom.removeEventListener(eventName, handler);
    };
  });
}
It’s fairly simple. In the subscribe function, you created a handler function which passes each event to the subscriber via observer.next. This handler is attached as an event listener for that specific DOM node and event. In the returned teardown function, you remove the event listener to clean up.
Take a look at the JSBin demo.
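The cleanup step can be verified without a browser or RxJS. The following sketch uses the plain `{ subscribe }` shape from earlier in the article and assumes a runtime that provides the standard `EventTarget` and `Event` globals (browsers and recent Node.js versions). The name `fromEventSketch` is hypothetical.

```javascript
// Plain-object version of the fromEvent idea, so it runs without RxJS.
function fromEventSketch(target, eventName) {
  return {
    subscribe: (observer) => {
      const handler = (event) => observer.next(event);
      target.addEventListener(eventName, handler);
      return {
        // Same event name and same handler reference are needed to detach.
        unsubscribe: () => target.removeEventListener(eventName, handler)
      };
    }
  };
}

const clickTarget = new EventTarget();
const types = [];
const clickSub = fromEventSketch(clickTarget, 'click')
  .subscribe({ next: (event) => types.push(event.type) });

clickTarget.dispatchEvent(new Event('click')); // delivered to the observer
clickSub.unsubscribe();
clickTarget.dispatchEvent(new Event('click')); // ignored: listener was removed
```

Only the first dispatched event reaches the observer, which shows that the teardown really detached the listener.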
The of method has the following signature:
of(...values: T[]): Observable<T>;
Creates an Observable that emits some values you specified as arguments, immediately one after the other, and then emits a complete notification - RxJS Docs
Basically, it emits all the values that you have specified in arguments one by one and then completes. For example:
import { of } from 'rxjs';
of(1,2,3);
The above code creates an Observable which emits 1, 2, 3 and then completes. Let’s implement it ourselves.
import { Observable } from 'rxjs';
function of(...values) {
  return Observable.create((observer) => {
    const handler = () => {
      values.forEach(val => {
        observer.next(val);
      });
      observer.complete();
    };
    handler();
  });
}
Inside the subscribe function, you created a handler function that iterates over all the arguments passed to of and calls observer.next with each one. It then calls observer.complete() and finishes. Finally, you call the handler function.
Take a look at the JSBin demo.
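The same logic can be tried without RxJS. This sketch assumes the hand-rolled `{ subscribe }` shape from the start of the article; `ofSketch` is a hypothetical name. It also shows that every subscription replays the values from the beginning, since the subscribe body runs once per subscriber.

```javascript
// Plain-object version of the of idea, so it runs without RxJS.
function ofSketch(...values) {
  return {
    subscribe: (observer) => {
      values.forEach((val) => observer.next(val)); // emit each argument in order
      observer.complete();                         // then signal completion
      return { unsubscribe: () => {} };            // nothing to clean up
    }
  };
}

const numbers$ = ofSketch(1, 2, 3);

const firstRun = [];
numbers$.subscribe({
  next: (v) => firstRun.push(v),
  complete: () => firstRun.push('complete')
});

const secondRun = [];
numbers$.subscribe({
  next: (v) => secondRun.push(v),
  complete: () => secondRun.push('complete')
});
```

Both arrays end up with the three values followed by the completion marker.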
In this post, you have learned how to create an Observable with the create method and why Observables are lazy. You also did a case study on RxJS’s fromEvent and of methods.