Reactive Programming

Reactive Programming

Reactive Programming

First and foremost, implementing reactive programming means working mainly with events streams, but in order to understand what an event stream is, we should start with a simple diagram for better understanding how a stream works.

 

It’s very important to understand how it works because often in the documentation you can understand how the APIs work by just looking at the diagram without the need to read the full explanation. Also, marble diagrams are useful during the testing phase because when we need to simulate an event stream, we will use them against our event stream implementation. But let’s see what they are so we can understand better the concept of event stream.

 

In a marble diagram, we can spot a horizontal line representing the time (our stream), and the value inside the colorful circles are events that are happening at a certain point in time in our application (events). The vertical line, instead, represents the end of our stream, after that point the stream is completed.

Reactive Programming

So if we want to describe this diagram we can easily do it in this way:

  • After the application started, a stream emits the value 4.
  • After a certain amount of time, the stream emits the value 1 and right after 3.
  • After all these events, the stream was completed.

 

The stream could also end after an error, and in that case instead of seeing a vertical line, the marble diagram will show an X symbol notifying when the error happened inside the stream. The last bit to mention is when we apply transformation to the values inside a stream via operators. An operator is a method that allows us to manipulate the data in a stream or the stream itself.

 

In this case the marble diagrams can show how the values are transformed like in this example with the map operator where we are adding 1 to any number emitted by the stream on the top and creating a second stream with the values manipulated. Until now, we understood that an event stream is a sequence of values (events) passed in a certain period (stream).

 

We can then assume that everything inside an application could be a stream, and in a certain way it’s exactly what the reactive paradigm is leveraging: we can transform listeners with callbacks in event streams, we can wrap an http request in a stream and handle the response by manipulating the data before updating a view, and we can use streams as communication channels for passing data between two or more objects, and many other use cases. In the reactive panorama we need to introduce a particular type of stream: the observable.

 

A great definition of observable is available on the Rx.JS 5 documentation:

 

Observer Pattern

In order to consume data emitted by an observable, we need to create an observer (consumer); this subscribes to the observable and reacts every time a value is emitted by the observable (producer).

 

If we want to summarize in technical words what an observable is, we could say that an observable is an object that wrap some data consumed by an observer (an object with a specific contract) and once instantiated provides a cancellation function.

 

The observer is an object that allows us to retrieve the value emitted by an observable and has a specific contract exposing three methods: next, error, and complete functions. When you deal with observables and observers bear in mind two well-known design patterns for fully understand their mechanisms: the observer and the iterator patterns.

 

Observer Pattern

The Observer Pattern is a behavioral pattern where an object called Subject maintains a list of other objects (observers) that want to be notified when a change happens inside the program. The observers are usually subscribing to a change and then every time they receive a notification, they verify internally if they need to handle the notification or not. Usually, in typed languages, the Observer Pattern is composed by a subject and one or multiple observers.

 

The subject is handling the subscription, unsubscription, and the notification to an observer; then each observer implements a specific interface that contains a publish method (update) for reacting to a notification originated by another object or a user interaction and shared through the Subject.

 

Iterator Pattern

Iterator Pattern

The Iterator Pattern is a behavioral pattern used for traversing a data container like an array or an object, for instance. As JavaScript developers, we should be familiar with this pattern, considering it was added in ECMAScript 2015.

 

This pattern allows an object to be traversed calling a method (next) for retrieving a subsequent value from the current one if it exists. The iterator pattern usually exposes the next method and the hasNext method that returns a Boolean used for checking if the object traversed contains more values or not.

 

Putting the Code into Practice

Let’s create an example by walking through the code for a better understanding of how an observable/observer relation works in practice:

const observable = Observable.range(1, 10);
const observer = observable.subscribe(onData, onError, onComplete); function onData(value){
console.log(value);
}
function onError(err){
console.error(err);
}
function onComplete(){
console.log("stream complete!");
}

As you can see from this first example in the first line we have created an observable that contains a range of values from 1 to 10; that means we have these values ready to be emitted to an observer when the observer will subscribe to it.

 

On the second line we subscribe to the observable; the subscribe method in many libraries has three parameters that correspond to callbacks called once the observable is emitting a value, an error, or a complete signal.

 

In the rest of the code we are just reacting to the information emitted by the observable printing to the console an output that could be either an event from the stream or an error or the complete signal. When an observable receives events can also allow us to manipulate the data for providing a different output from them like we have seen previously in the marble diagram where we were mapping each single value emitted, increasing by 1 each of them.

 

Different Reactive libraries are providing different sets of utilities, but the most complete one when the blog was written is without any doubt Rx.js. This library provides a set of operators for manipulating not only the events emitted by the observable but also with other observables; it is not unusual to transform observable of observables in flat observables: think about them as an array of arrays. These operators will allow us to flat the object nesting accessing directly to the values in the observables without the iteration complexity.

 

We are going to see the possibilities offered by operators in the next section when we explore the different Reactive libraries so we can understand in practice how to use it and what we can do with them. Obviously, we will review only the most used ones because there are literally hundreds and our focus is on understanding the mechanism behind them more than exploring the entire library.

 

Stream Implementations

Stream Implementations

Now that we understood what streams are, it’s time to get a better understanding of what is available in the front-end reactive ecosystem. The first library we are going to take into consideration is Rx.JS 5.

 

Rx.JS

Rx.JS is the most famous and used reactive library at the moment; it’s used almost everywhere, from Angular 2 where the library is integrated inside the framework to many other smaller or larger frameworks that are embracing this library and leveraging its power.

 

It’s one of the most complete reactive libraries with many operators and a great documentation, Rx.JS is part of Reactive Extensions (ReactiveX). Learning it will mean being able to switch from one language to another using the same paradigm and often the same APIs.

 

Rx.JS can be used on front-end applications as well on back-end ones with Node.js; obviously its asynchronous nature will help out on both sides of an application’s development. There are two main versions available. Version 4 is the first JavaScript porting of the Reactive Extension, and then recently another implementation started to rise up that is version 5. The two libraries have several differences in our examples and so we will use version 5.

 

In this section, we won’t be able to play with all the operators available on Rx.JS because this library is broad enough for having its own blog (in fact, there are many blogs available that I strongly suggest you give a go). Our main aim is to grasp a few key concepts, considering we are going to use it in many other examples in this and the next blogs.

Therefore we are going to see Rx.JS in action in three different scenarios:

  • When we need to reduce our data for retrieving a final value.
  • When we retrieve data from an HTTP endpoint.
  • When we need to communicate with multiple objects via observables.

Considering how large Rx.JS is, we introduce another important concept such as the difference between hot and cold observables, to help to understand better how the other libraries work.

 

Let’s start with a simple example on reducing an array of data and retrieve the sum of values filtering the duplicates present in the initial array.

import Rx from "rxjs"
const data = [1,2,10,1,3,9,6,13,11,10,10,3,19,18,17,15,4,8,4];

const onData = (value) => console.log(`current sum is ${value}`); const onError = _ => console.log("stream error");
const onComplete = _ => console.log("stream completed");
const obs = Rx.Observable.from(data)
.filter(value => value % 2 === 0)
.distinct()
.reduce((acc, value) => acc + value);
obs.subscribe(onData, onError, onComplete);

As you can see in the example above, we are converting an array of numbers to an observable (Rx.Observable.from(data)), then we start to transform the values inside the array step by step, applying multiple transformations. In fact, first we are filtering the values creating a new array containing only the even numbers, then we remove all the duplicates inside the array with the distinct operator provided by Rx.JS; and finally we sum the values inside the array with the reduce operator.

 

Every time we are applying a transformation via an operator, we are creating a new observable that is returned at the end of the operation. This means we can concatenate multiple operators by applying several transformations to the same data source. In order to output the final result of our observable, we are subscribing to the observable with the subscribe method, and this method accepts three functions.

 

The first one is triggered every time the observable emits data, the second one if the observable emits an error, and the last one is triggered once the observable receives a complete signal from the producer (in our case, the end of the array).

 

Remember that these callbacks are not all mandatory; we can potentially skip to declare the error and complete callbacks if we don’t need to react to these events. Imagine for a moment how you would implement the same logic in imperative programming… done it?

 

Ok, now you probably understood how functional and reactive paradigms can help to express complex operations in few lines of code, having a clear idea of what’s happening, without storing temporary values anywhere, and without generating any side effects. Everything is contained inside the observables and cannot be modified from external operations.

 

The next example, instead, aims to retrieve the response from an API on the Web and then propagate the result to a hypothetical view. In this case we won’t use any specific architecture but we are going to work well with the single responsibility principle and good encapsulation.

import Rx from "rxjs";
const URL = "https://jsonplaceholder.typicode.com/users"; const simplifyUserData = (user) => {
return {
name: user.name, email: user.email, website: user.website
}
}
const intervalObs = Rx.Observable.interval(1000)
.take(2)
.mergeMap(_ => fetch(URL))
.mergeMap(data => data.json())
.mergeAll()
.map(simplifyUserData)
intervalObs.subscribe(user => { console.log(`user name is ${user.name}`); console.log(`user email is ${user.email}`);
console.log(`user website is ${user.website}`); console.log('-------');
},
error => console.error("error", error), complete => console.log("completed"))
After creating the constant with the URL to consume, we are creating a method (simplifyUserData) for filtering the data we want to use in our application, by just returning a subset of the information instead of the entire record retrieved from that URL.


The endpoint used is a public endpoint usually used for mocking data, but in our case we are going to receive an array of objects that looks like this:
{
id: 1,
name: "Leanne Graham", username: "Bret",
email: "Sincere@april.biz", address: {
street: "Kulas Light", suite: "Apt. 556", city: "Gwenborough", zipcode: "92998-3874"
[....]
}

We want to consume this endpoint every second but only twice during the application life cycle.

JSON

In order to do that we create an observable with an interval of a second (Rx.Observable.interval), specifying how many times we want to perform this operation (take operator) and then we want to fetch the data from a URL (first mergeMap) and then return the data fetched as JSON, splitting the object retrieved (mergeAll operator, we could have use other operators like concatAll, for instance, obtaining the same result) in order to emit a value per time instead of the entire array in one go. Finally we simplify the data with the method we created at the beginning of the script (map operator).

 

After creating the interval observable, we perform at each tick a fetch operation. Fetch is not an observable but, as you can see from the example, we are not transforming it via an operator for translating it to an observable, but we are using the operator mergeMap and inside there we are performing the fetch operation.

 

This is because fetch is a Promise A+ and these promises are recognized by Rx.JS, so without the need of using the fromPromise operators like the standard promises, we can use them straightaway inside our Observable created by the mergeMap operator.

 

MergeMap is an operator that is doing two things. The first one is merging two streams in a unique one and then iterating with the values emitted inside the streams: in this case it’s flatting the promise in order to return just a stream with the service response.

 

The second mergeMap operator is used with another promise and we are specifying the return value emitted in that stream should be the JSON representation of the data fetched from the endpoint. This second promise is due to the fetch API contract as you can see in the MDN specifications.

 

The last operator is the mergeAll one, which usually should be used for merging all the observables inside an observable of observables, but in this case, it is flatting the last promise containing the array with the data retrieved and emitting each single value of the array in an iterative way instead of emitting the entire array as a unique value, allowing us to use the final operator (map) for simplifying the data emitted.

 

It’s easy to understand how versatile and powerful Rx.JS could be in a situation like this one. Obviously at this stage we know that there is some work to do to get familiar with all the different operators offered by Rx.JS, but don’t be too sad because it’s a step we all have done before mastering the reactive paradigm.

Before reviewing another chunk of code, we need to explain another key concept of the observables: what it means when an observable is hot or cold.

 

Hot and Cold Observables

Hot and Cold Observables

 

We can have two different types of observables: hot and cold. A cold observable is lazy and unicast by nature; it starts to emit values only when someone subscribes to it. Instead, the hot observables could emit events also before someone is listening without the need to wait for a trigger for starting its actions; also they are multicast by design.

 

Another important characteristic that we should understand for recognizing hot and cold observables is to understand how the producer is handled in both scenarios. In cold observables, the producer lives inside the observable itself; therefore every time we subscribe to a cold observable, the producer is created again and again.

 

Instead, in the hot observable the producer is unique and shared across multiple observers; therefore we will receive fresh values every time we are subscribing to it without receiving all of them since the first value emitted. Obviously, there are ways in hot observables to create a buffer of values to emit every time an observer is subscribing to it, but we are not going to evaluate each single scenario right now.

 

Let’s see a hot and a cold observable in action for having a better understanding of how to use these two types of objects using Rx.JS.

 

Cold Observables

Cold Observables

The best way to understand a cold observable is seeing it in action:

import Rx from "rxjs";
const source = Rx.Observable.interval(2000).startWith(123) source.subscribe(value => console.log("first observer", value)) setTimeout(_ =>{
source.subscribe(value => console.log("second observer", value))
}, 5000);
setTimeout(_ =>{
source.subscribe(value => console.log("third observer", value))
}, 8000)

In this example we are creating an observable that is emitting a sequential value every 2 seconds: it will start from 0 until infinite because we didn’t specify how many values we want to emit before the event streams is completed. We use the startWith operator when we want to show an initial value in our user interface or to start a sequence of events without waiting for the values passed asynchronously.

 

In a cold observable we have the producer, in this case the observable emitting sequential numbers, which is instantiated three times – basically any time a consumer is subscribing to it. In the image above you can clearly see the sequence of numbers is starting every time a consumer is subscribing to the producer.

 

We can conclude that the cold observable re-instantiates the producer any time a consumer is subscribing and it is unicast so the values produced are listened by a consumer per time; also we can have multiple consumers subscribing to the same producer. By default all the observables we create in Rx.JS are cold but we have different ways for transforming them into hot observables.

 

Hot Observables

Hot Observables

Let’s see what a hot observable looks like:

import Rx from "rxjs";
const source = Rx.Observable.interval(2000)
.startWith(123)
.publish()
.refCount();
source.subscribe(value => console.log("first observer", value)) setTimeout(_ =>{
source.subscribe(value => console.log("second observer", value))
}, 5000);
setTimeout(_ =>{
source.subscribe(value => console.log("third observer", value))
}, 8000)

 

The example is very similar to the cold observable one, but in this case we are using other two operators: publish and refCount. Publish operator is useful to transform our cold observables to hot observables because it returns a ConnectableObservable instead and observable object.

 

A ConnectableObservable starts emitting values only when a connect method is called or, like in our example, when we use refCount operator. RefCount operator is used when we don’t need to control the start and stop of the ConnectableObservable but instead we automate this process; when a consumer subscribes to a ConnectableObservable, it is the logic provided by refCount that will trigger the connect method for emitting the values to the subscribers.

 

Also the refCount logic will unsubscribe once there are any subscribers ready for receiving new values. When we have a hot observable the producer becomes multicast and the values emitted are shared across all the subscribers but at the same time we need to remember that by default, it’s not waiting for any consumer to subscribe. Instead it is emitting values immediately after the connect method is called.

 

There are operators that will allow you to control when a hot observable starts to emit values like multicast and connect instead of refCount, which is automating these steps. Just keep this in mind when you work with Rx.JS because there are many opportunities available with this great library so keep an eye on the documentation and the implementation will become very smooth.

 

Understanding what we are subscribing to and what are the characteristics and benefits of a hot or a cold observable could save a lot of debugging time and many headaches once our code hits the production stage.

 

I think it is clear that Rx.JS is not just that – it’s way more than that but with these simple examples we are trying to memorize a few useful key concepts that will facilitate the integration of this library in our existing or new applications. Now it’s time to see another reactive library flavor with XStream. It’s important to understand that the concepts showed in the Rx.JS sections are very similar in other libraries; therefore owning them will allow you to pick the right library for the a project.

 

XStream

XStream

XStream is one of the most recent reactive libraries created by André Staltz for providing a simple way to approach reactive programming tailored specifically for Cycle.js.

 

XStream leverages a few concepts that we have already seen in action with Rx.JS like the observables but by simplifying the core concepts behind streams. All the streams are hot observables and there isn’t a way to create cold observables with this library.

 

I personally think the author took into consideration the reactive programming learning curve when he worked on this library, and he tried smoothing out the entry level in order to be more approachable by newbies and experts as well.

 

On top, XStream is very fast, second only to Most.js (another reactive library), and very light too, around 30kb; and with less than 30 operators available, it represents a super basic library for dealing with observables, perfect for any prototype or project that requires the use of observables but without all the “commodities” offered by other libraries with plenty of operators.

 

XStream is using instead of observables the streams concepts, which are event emitters with multiple listeners. A listener is an object with a specific contract, and it has three public methods: next, error, and complete; as the name suggests, a listener object is listening to events emitted by a stream.

 

Comparing streams and listeners to Rx.JS observables and observers, we can say that a stream is acting like a hot observable and the listener like an observer mimicking the same implementation.

 

Last but not least, in XStream we can use producers for generating events broadcasted via a stream to multiple objects. The producer controls the life cycle of the stream emitting the values at its convenience. A producer has a specific signature, and it exposes two main methods: start and stop. We will see later an example that will introduce the producer concept.

 

Now it’s time to see XStream in action, porting the examples we have approached previously during the Rx.JS section. The first example is related to reducing an array of values that extracts just the even numbers, removing the duplicates and calculating their sum:

import xs from 'xstream';
const data = [1,2,10,1,3,9,6,13,11,10,10,3,19,18,17,15,4,8,4];
const filterEven = (value) => value % 2 === 0; const removeDuplicates = (inputStream) => {
const buffer = [];
const outputStream = xs.fromArray(buffer); inputStream.addListener({
next(value){
if(buffer.length === 0 || buffer.indexOf(value) < 0) buffer.push(value);
}
})
return outputStream;
}
const sumValues = (acc, value) => acc + value;
const listener = { next(value){
console.log(`current sum is ${value}`);
},
error(){
console.error("stream error");
},
complete(){
console.log("stream completed");
}
}
const stream = xs.fromArray(data)
.filter(filterEven)
.compose(removeDuplicates)
.fold(sumValues, 0);
stream.addListener(listener);

The data object contains an array of unordered integers with duplicates values and even and odd numbers. Our aim for this exercise is to filter the array retrieving only the even numbers, removing the duplicates after the first transformation, and finally calculating the final sum. After instantiating the array source, we are defining the transformation we are going to apply; the first one is filtering the even numbers from the initial array with the method filterEven.

 

In this method we are checking that each value we are going to receive is divisible by 2 or not. If so it means the value is an even number so we want to keep it, otherwise we will skip it (remember that a stream will emit 1 value per time).

 

The second method is removeDuplicates, but in XStream there isn’t an operator for doing it automatically like for Rx.JS. As we said at the beginning, XStream is meant for learning how to handle streams more than having a complete library with many operators.

 

Therefore we are going to use the compose operator that returns a stream and expects a new stream as input. OutputStream will be our new stream used by the next operator emitting the array generated inside the removeDuplicates method. Inside removeDuplicates we create an array for storing the unique values and we push them only if are not present inside the array.

 

The last transformation for this exercise is calculating the sum of the values filtered in the previous steps. We are going to use the fold operator that requests a function with two arguments: an accumulator and the current value to evaluate for calculating the final result, very similar to the reduce method used when you wanted to calculate the values inside an array.

 

Finally, we can create the stream using xs.fromArray passing the initial array, and this will produce an initial stream that will emit the array values. After that we apply all the different transformations via XStream operators like filter, compose, and fold. Bear in mind that the second parameter of the fold operator is the accumulator initial value, and in our case we want to start from the value zero and sum all the others.

 

The last step is to listen to the stream creating a listener. As we said at the beginning of this section, a listener is just an object with a specific signature (next, error, complete), and in our case we output to the console the transformation made inside our stream.

 

Opening the dev tools of your favorite browser.

As we can see, this example made with XStream resembles the one we did previously with Rx.JS. The key takeaway here is the fact that understanding how the observables work are helping us to switch from a reactive library to another one without investing time on learning new concepts but just applying a few basic concepts with different syntax and operators.

 

Our second example is based on retrieving some data from a REST endpoint every few seconds. In this case we can leverage the power of the producer objects available in XStream for fetching the remote data and emitting the result to a stream. Considering we want to retrieve the data every few seconds for a certain amount of times, we can use an interval stream instead of a set interval like we would do in plain JavaScript.

This is our code example

import xs from "xstream"; import fetch from "fetch";
const URL = "https://jsonplaceholder.typicode.com/users";
const producer = { emitter(){
const that = this; return {
next(){
emitUserData(listener);
},
complete(){
that.stop()
}
}
},
start(listener){
xs.periodic(5000).take(2).addListener(this.emitter())
},
stop(){
listener.complete();
}
}
const emitUserData = (listener) => { fetch.fetchUrl(URL, (error, meta, body) => {
if(error) return;
const data = JSON.parse(body.toString()); data.forEach(user => {
listener.next(user)
}, this);
})
}
const simplifyUserData = (user) => { return {
name: user.name, email: user.email, website: user.website
}
}
const listener = { next(user){
console.log(`user name is ${user.name}`); console.log(`user email is ${user.email}`); console.log(`user website is ${user.website}`); console.log('------------------------------');
},
error(){
console.error("stream error");
},
complete(){
console.log("stream completed");
}
}

const userStream = xs.create(producer).map(simplifyUserData); userStream.addListener(listener);

As you can see in the example above, we start defining our producer object; remember that a producer is just an object that requires two methods: start and stop. The start method will be called once the first consumer subscribes to the stream.

 

Keep in mind that a producer can have just one listener per time; therefore, in order to broadcast the results to multiple listeners, we have to create a stream that uses the producer to emit the values.

 

Our producer contains also another method called emitter that returns a listenerobject that we are going to use inside the interval stream created in the start method. The start method uses the xs.periodic operator that accepts as an argument an interval of time when an event is emitted by the stream; so in our case, every 5 seconds a new event will be emitted.

 

We also used the operator take that is used for retrieving a certain amount of values from that stream, ignoring all the others emitted.

  • The last thing to do is to subscribe to that stream with a listener and every tick (next method) fetches the data from the endpoint.
  • The endpoint is the same one as the previous example in Rx.JS, so we need to expect the same JSON object fetched from the endpoint.
  • The main goal of this example is outputting a simplified version of this data that could be used in a hypothetical view of our application.

 

When we create simplifyUserData method for extracting only the information we need from the value emitted in the stream; this function is returning a filtered object containing only a few fields instead of the entire collection.

 

After that, we create our listener object with the typical signature next, error, and complete methods where we are handling the values emitted by the stream. Finally, we create the glue between the stream and the listener object by creating a stream with xs.create passing as the argument our producer.

 

Then we iterate through all the values emitted, filtering the user data and in the last line of our script we associate the listener to the stream that will trigger the producer to start emitting the values. In this case there are some differences compared to Rx.JS example but again the key concepts are still there.

The last example for the XStream library is focused on how we broadcast values to multiple listener objects; in this case XStream is helping us because all the streams are hot, therefore multicast by nature.

 

We don’t need to perform any action or understand what kind of stream we are dealing with. That’s also why I recommend always starting with a simple library like XStream that contains everything we need for getting our hands dirty with streams and then moving to a more complete toolbox library like

Rx.JS or Most.js.
import xs from "xstream";
const periodicStream = xs.periodic(1000)
.map(_ => Math.floor(Math.random()*1000) + 100); periodicStream.addListener({
next(value){
console.log("first listener", value);
}
})
setTimeout(_ =>{ periodicStream.addListener({
next(value){
console.log("second listener", value);
}
})
}, 3000);
setTimeout(_ =>{ periodicStream.addListener({
next(value){
console.log("third listener", value);
}
})
}, 6000);

Reactive Programming concept

Another important Reactive Programming concept is backpressure and how we can use it for improving our reactive applications.

When we work with multiple streams, they could emit a large amount of events in a short period of time. Therefore we need a way for alleviating the amount of data consumed by the observers if we don’t really need all of them or if the process to elaborate them is too computationally intense, and the consumer is not able to keep up.

 

Usually we have two possibilities to handle back pressure in our application: first, we can queue the value, creating a buffer and elaborate all the values received, so in this case we don’t miss the values emitted. This strategy is called loss-less strategy.

 

Another strategy could be skipping some events and reacting only after a certain amount of time, filtering what we receive because maybe this information is not critical for what the consumer needs to do; in this case we call this strategy lossy strategy. Imagine, for example, that we are merging two observables with a zip operator. The first observable is providing some capital case letters every second, and the second observable is emitting lowercase letters every 200 milliseconds.

 

The zip operator in this case will create a new observable with the values of the two streams coupled together, but because it needs to couple the letters from different streams that are emitting values with different speed, inside the zip operator we have a buffer for storing the values of the second observable until the first one is going to emit the following value.

 

As you can see from the marble diagram above, the second stream is producing 5 times more values than the first one in the same amount of time, so the new observable will need to maintain the data in a buffer to match the values before emitting them to the consumer.

 

Unfortunately, these kinds of scenarios in Reactive Programming are not rare and in this occasion, back pressure operators come to the rescue. These operators allow us to alleviate the pressure from the observer by simply stopping a reaction from values emitted by an observable, pausing the reception of values until we define when to resume receiving the values.

 

Let’s write a concrete example with Rx.JS for understanding better the concept described. What we are going to create with Rx.JS and React is a simple box with a simulation of a stock that receives data in real time and it needs to display the stock value inside the component.

 

This small example is composed by two files: the main application and the React component. The main application will generate the observable that will produce random values in a specific range ready to be displayed inside the component.

import React from "react";
import ReactDOM from "react-dom"; import Rx from "rxjs";
import Stock from "./Stock.jsx";
export default class App{ constructor(){
const cont = document.getElementById("app"); const observable = this.generateProducer(); const AAPL = "AAPL - Apple Inc.";
ReactDOM.render(<Stock producer$={observable} title={AAPL} />, cont);
}


generateProducer(){
const stockValuesProducer = Rx.Observable.interval(50)
.map(value => {
return (Math.random() * 50 + 100).toFixed(2);
})
return stockValuesProducer;
}
}

let app = new App();

What is happening in the main application file is that we are generating a producer (generateProducer method) that should simulate a constant interaction with a source of data, and every 50 milliseconds it is emitting a value between 100 and 150 with 2 decimals.

 

This is a typical example where the back pressure operators could help out; we really don’t need to update the UI every 50 milliseconds because more than a useful experience, we are going to create a constant refresh that won’t be well received by our users, and it will be very intensive, in particular, on low-end machines. So what can we do to alleviate the pressure on the observer that is going to receive these values?

 

If in generateProducer method, instead of returning the observable as it is, we could add a back pressure operator like this one:

generateProducer(){
const stockValuesProducer = Rx.Observable.interval(50)
.map(value => {
return (Math.random() * 50 + 100).toFixed(2);
})
return stockValuesProducer.sampleTime(500);
}
In this case, the sampleTime operator will emit a value only every 500 milliseconds,
ignoring the other values received in the meantime.
Just to fix this concept even better, Figure 3-14 shows how this works inside a marble diagram.
In the component code, we are going to subscribe to the observable received from the producer property and display the value in our UI:
import React from "react";
export default class Stock extends React.Component{ constructor(){
super();
this.state = {stockValue: 0};
}
componentDidMount(){
this.props.producer$.subscribe(this.setStockValue.bind(this));
}
setStockValue(value){ this.setState({stockValue: value});
}
render(){
return (
<div className="stock">
<h2>{this.props.title}</h2>
<p>${this.state.stockValue}</p>
</div>
)
}
}

As we can notice inside the componentDidMount method, provided by the React component life cycle, we are subscribing to the producer (the observable created before in the main application) and then we set the stock value in the React state object for displaying it in our paragraph element.

 

In Rx.JS we can use multiple back pressure operators like debounce or throttle, but there are many others for handling the back pressure properly. It’s important to remember to not create a huge buffer of data when the producer is emitting a large amount of data. So as a rule of thumb, remember that when we don’t need all the data emitted by an observable, we should really filter them for providing a better user experience to our users and improve the performance of our Reactive applications.

 

Cycle.js and MVI

The Cycle.js focus is on the interaction between computer and user, taking into consideration research on human-computer interaction and studies focused on the interfaces between human beings and computers.

 human-computer interaction

As we can see, a person can interact with a computer via its input methods (mouse, keyboard, microphone, touchpad…); for doing this, the person will use a hand or the voice as output. When the computer receives the input from the user, it will elaborate a new output related to the input received, providing it on the screen.

 

The user then will be able to recognize and elaborate on the computer’s output via the user’s input sense (eyes or ears, for instance), understanding how his interactions affect the computer’s output, creating de facto, a circular interaction between computer and user: from here, the name of Cycle.js.

 

We could summarize this interaction saying that between the human and the computer there is a dialogue where the two main actors are interacting with each other, reacting to the inputs and providing a new output.

 

As we understand, this framework uses a different approach from the others we used to work with where the GUI is at the center; instead Cycle.js privileges more the interactions aspect over the graphical representation. In order to do that, Cycle.js introduces a message passing architecture where we send a message to an object and this one knows which part of the code to run based on the input received.

 

This highly decoupled architecture stands on the opposite edge of the spectrum compared to more traditional architectures where dependency injection is heavily used and we favor objects interactions over reactivity. Cycle is not the only framework leveraging a message passing architecture; other examples could be retrieved in Akka with the actor-model architecture or in CSP (Communicating Sequential Processes) with channels.

 

Structuring a Simple Cycle.js Application

Simple Cycle.js Application

 

The first key concept we need to stick in our mind when we work with Cycle.js is the fact that this framework is clearly separating the application logic from the side effects. For example, let’s assume we want to load some data from an endpoint after the user clicks on a button in our interface.

 

Cycle.js separates completely the DOM rendering and the remote data fetching from the logic of preparing the HTTP request and the data manipulation to be presented in the DOM; in this way we can focus on what really matters inside our application delegating de facto the real action of manipulating the DOM or fetching remote data with a HTTPRequest object.

 

Before examining a simple Cycle application, we need to explore how this framework is composed. In Cycle.js there are three key concepts to remember:

  • Pure functions
  • Streams
  • Drivers

 

A pure function is a functional concept where a function can be defined as pure if it doesn’t create side effects inside the program, so when we pass a specific parameter it always returns the same output. We could summarize the pure function concept as a function where its output is determined only by the input values, and an example could be:

function add(a, b){

return a + b;

}

 

In this case when I call the function add I receive a result based on the function’s arguments (a and b), and there is no possibility that external states or variables could affect the final result considering that our pure function is totally independent from the application where it is running.

 

Another key concept is the stream, but considering we extensively talked about them in the previous blogs we can move to the next concept: the drivers. For better understanding the drivers, we need to analyze how Cycle.js is composed; otherwise we will struggle to catch why the drivers are used in this framework.

 

Cycle.js is a modular architecture composed of multiple libraries. The core library is really small and it exposes just one method used for creating the glue between the application logic and the side effects like a DOM state rendering or remote data connection.

 

By default, Cycle.js uses XStream as a main stream library but it allows us to use other libraries like Rx.JS, Most.js, or even a custom one created by us. We have already discussed how Cycle.js separates the application logic from the side effects: this is the key part for understanding the drivers.

 

All the side effects in a Cycle application are handled by drivers, for instance, one of the most used ones is the DOM driver that performs the DOM manipulation received by the application logic that instead prepares a virtual DOM representation instead of interacting directly with the DOM. The communication between the drivers and the application is always made via observables; a driver can be a read and write or a read-only driver.

 

The rule of thumb here is that a driver has always as input an observable but may or may not return an output. If we want to draw the anatomy of a Cycle application we could use this example as a skeleton:

import xs from 'xstream'; import {run} from '@cycle/run';
import {makeDOMDriver, p} from '@cycle/dom'
const main = sources => { const sinks = {
DOM: xs.periodic(1000).map(v => p(`seconds: ${v}`))
}
return sinks;
}
const drivers = {
DOM: makeDOMDriver('#app')
};
run(main, drivers);

 

Let’s analyze what we have in this basic example.

JavaScript library

 

After importing XStream, Cycle run function, and the DOMDriver, all the Cycle.js applications have a run function; a pure function, composed by a sources object; and a sink object as output that contains the logic for the side effects to be applied after our application finishes the elaboration.

 

For instance, in this example we have a stream that every second is incrementing a variable and returning a virtual DOM, in this case a virtual paragraph object. Taking a look at the DOM driver, we can see that as a parameter of the DOMDriver method we need to pass the HTML element to use for appending our future DOM elements created dynamically by our Cycle.js application.

 

The last, but essential, thing to do is calling the method run provided by the framework for creating the glue between the main function and the drivers. What the run method is doing is simple to explain; this method is creating a circular dependency between the main function and the drivers retrieving the output of each function and returning as a source of the other as we explained at the beginning of this blog.

 

Obviously the input and output are always object-containing observables and it’s where Cycle is really shining with its architecture. An important piece of information that we didn’t mention before is related to the virtual DOM library used in Cycle.js. Cycle uses Snabbdom out of the box, a JavaScript library leveraging similar concepts expressed in React.js like the Virtual DOM and good diffing algorithm, on top Snabbdom offers a modular and minimal implementation compared to React (only 200 lines of code).

 

Now let’s try to see a more complete example where we are going to create a simple weather forecast application that allows the user to search for a specific city and retrieve the weather forecasts for the next five days.

 

This example will follow us for the entire blog and we will refine it with two different approaches in order to explore properly the different possibilities offered by Cycle.js. Let’s start to list what we need to do in order to create the weather application with Cycle.js:

  • We need to use two drivers: one for the DOM manipulation and another one for fetching the data from a remote REST service.
  • We need to create an input field with a button that will allow the user to search for a specific city.
  • We need to request the data to render to a weather forecast service (in this case a third-party service).
  • We need to create our UI with a title on the top, the current day forecast highlighted, and a list of the following days.

 

The first thing to create is the typical Cycle skeleton application with a run method, the drivers. and the main function:

 const main = sources => {
// here there will be our application logic
}
const drivers = {
DOM: makeDOMDriver('#app'), HTTP: makeHTTPDriver()
};
run(main, drivers);

 

As planned, we have a DOM driver (makeDOMDriver) that will manipulate the DOM inside the div with id app and the HTTP driver that instead will perform the request to the weather forecast’s endpoint.

 

That means in our main function we are going to return an object with two observables: one for the endpoint request providing which city the user is interested on, and one with the virtual DOM of our page. Then the drivers will take care to perform the actions for us.

 

Let’s go ahead creating our application view, for instance, if we want to create the input field with the button shown in the application picture presented before, we need to create a function called getForm that will return to us the virtual DOM version of our elements:

const getForm = () => div(".form", [ input("#location-input"), button("#location-btn", "get forecasts")

])

work with reactive programming


Now we can observe for changes happening in both interactive elements in order to capture the text inserted by the user in the input field and when the user clicks the button for retrieving the forecast. In order to do that we are going to add these few lines in our main function:

const input$ = sources.DOM.select("#location-input").events("focusout")

.map(evt => evt.target.value); const btn$ = sources.DOM.select("#location-btn").events("mousedown");

 

Remember that everything can be a stream when we work with reactive programming; therefore once the driver will render our interactive elements in the real DOM, it will provide us access to the real DOM available in the DOM object and we are able to observe the user interactions, thanks to the APIs provided by the DOM driver.

 

Every time the user will click the button we will need to retrieve what he typed and prepare the request URL and the query string for allowing the HTTP driver to perform the real request.

 

Because we need to react when the user clicks the button but also to understand what the user wrote in the input field, we are going to combine the two streams in a unique one, and we prepare the URL with the new parameters any time the producer is producing new values, so in the main function we will add:

const merged$ = xs.combine(input$, btn$);
const request$ = merged$.map(([city, mouseEvt]) => getRequest(city))
.startWith(getRequest(INIT_CITY))
And we then create getRequest function that returns the composed URL:
const getRequest = city => { return {
url: `http://api.apixu.com/v1/forecast.json?key=04ca1fa2705645e4830 214415172307&q=${city}&days=7`,
category: CATEGORY
}
}

 

The request$ stream will be the one that we are going to pass to the HTTP driver, and this one will perform the real HTTP request for us, as you can see Cycle is separating the application logic from the side effect, defining what the application should do from how to perform the real effect.

 

In the combined stream, we can spot that there is a startWith method that returns a default city, in our case London, just for providing some information to the user the first time that accesses our weather application and he didn’t interact with our input field yet.

 

It’s time to handle the response once the HTTP driver receives it; inside our main function again we are going to retrieve the HTTP object exposed by the driver, and we are going to prepare the data for a set of functions for generating the virtual DOM based on the data retrieved by the HTTP driver.

const response$ = sources.HTTP.select(CATEGORY)
.flatten()
const vdom$ = response$.map(parseResponse)
.map(simplifyData)
.map(generateVDOM)
.startWith(h1("Loading..."))
And outside our main function we then create the functions needed for generating the UI:
const parseResponse = response => JSON.parse(response.text); const simplifyData = data => {
return {
city: data.location.name, current: data.current,
forecast: data.forecast.forecastday
}
}
const generateVDOM = data => div(".main-container", [ h1(`Your forecasts for ${data.city}`), getForm(), generateCurrentForecast(data.current), generateNext5Days(data.forecast)
])

 

As you can see in the main function, once we receive the response we need to select which one we handle, in this case the CATEGORY one described at the beginning of our application. Then we need to flatten the result because the HTTP driver returns always a stream of streams, so if we want to manipulate the data in this complex structure we need to create a flat stream (flatten method).

 

For creating the virtual DOM that will be passed to the DOM driver, we need now to do the following:

  • 1. Parse the response and return a JSON object (parseResponse method).
  • 2. Extract only the data our UI needs in order to render the final result (simplifyData method).
  • 3. Generate the virtual DOM passing the elaborated data (generateVDOM method).

 

These three operations are generating a final stream with the virtual DOM that will be rendered via the DOM driver. The last bit of our main function is what it returns, so a sink object containing a stream for the DOM driver and one for the HTTP driver that represent the output of our Cycle application.

 

This is the final implementation of our first Cycle example:

import xs from 'xstream'; import {run} from '@cycle/run';
import {makeDOMDriver, div, h1, h2, h3, img, p, input, button} from '@cycle/dom';
import {makeHTTPDriver} from '@cycle/http';
import debounce from 'xstream/extra/debounce' import moment from 'moment';
const CATEGORY = "forecast"; const INIT_CITY = "London";
const getForm = () => div(".form", [ input("#location-input"), button("#location-btn", "get forecasts")
])
const generateNext5Days = forecasts => { const list = forecasts.map(forecast => {
return div(".forecast-box", [ h3(moment(forecast.date).format("dddd Do MMM")),
p(`min ${forecast.day.mintemp_c}°C - max ${forecast.day. maxtemp_c}°C`),
img(".forecast-img", { props: {
class='lazy' data-src: `http:${forecast.day.condition.icon}`
}
}),
p(".status", forecast.day.condition.text)
])
});
return div(".forecasts-container", list)
}
const generateCurrentForecast = forecast => div(".current-forecast- container", [
div(".today-forecast", [
img(".forecast-img", { props: {
class='lazy' data-src: `http:${forecast.condition.icon}`
}
}),
p(".status", forecast.condition.text)
]),
h3(moment(forecast.last_updated).format("dddd Do MMMM YYYY")), h2(`${forecast.temp_c}°C`),
p(`humidity: ${forecast.humidity}%`)
])
const parseResponse = response => JSON.parse(response.text); const simplifyData = data => {
return {
city: data.location.name, current: data.current,
forecast: data.forecast.forecastday
}
}
const generateVDOM = data => div(".main-container", [ h1(`Your forecasts for ${data.city}`), getForm(), generateCurrentForecast(data.current), generateNext5Days(data.forecast)
])
const getRequest = city => { return {
url: `http://api.apixu.com/v1/forecast.json?key=04ca1fa2705645e4830 214415172307&q=${city}&days=7`,
category: CATEGORY
}
}
const main = sources => {
const input$ = sources.DOM.select("#location-input").events("focusout")
.map(evt => evt.target.value);
const btn$ = sources.DOM.select("#location-btn").events("mousedown"); const merged$ = xs.combine(input$, btn$);
const request$ = merged$.map(([city, mouseEvt]) => getRequest(city))
.startWith(getRequest(INIT_CITY))
const response$ = sources.HTTP.select(CATEGORY)
.flatten()
const vdom$ = response$.map(parseResponse)
.map(simplifyData)
.map(generateVDOM)
.startWith(h1("Loading..."))
return {
DOM: vdom$, HTTP: request$
}
}
const drivers = {
DOM: makeDOMDriver('#app'), HTTP: makeHTTPDriver()
};
run(main, drivers);

 

It’s important to highlight a couple of things in this example. First of all, in our main function we are handling the input and the output of our application; we are not operating any real side effects that, instead, are delegated to the drivers.

 

The drivers and the main applications are communicating via streams; remember that Cycle.js is a message passing architecture, and this approach facilitates the data flow of our applications maintaining a high separation between application logic and side effects and a strong encapsulation.

 

For the first time in this blog, we are looking to a reactive implementation where the communication between different parts of our architecture are made by streams; interestingly there isn’t any knowledge in our application on how a driver is going to handle the side effects and we are not calling any specific method exposed by a driver. There is just a circular dependency between our main function and the driver that communicates only via streams.

 

It’s important to iterate again these concepts because they will become very useful from now on considering we are going to discover MVI (model view intent), a reactive architecture heavily based on them.

 

Model View Intent Architecture

Model View Intent Architecture

If you are familiar with ELM language and its architecture, MVI won’t surprise you at all, but we need to admit that this is definitely a great improvement from the architecture we used in the past and in other famous frameworks like Redux or Angular. But first, let’s see what the Model View Intent is and how it differs from the other frameworks.

 

The first characteristic of this architecture is that it follows the unidirectional flow like the Flux pattern introduced by Faceblog right after React.js, unidirectional flow is becoming a constant in many front-end reactive architectures.

 

What it means is that the data flow is always going in a unique direction and it never changes; this helps the debugging of your application and the possibility of adding new team members without a long induction period for explaining how the architecture of your applications work or how the system works.

 

As you can see from the schema above, every time the user interacts with an element in the view, this one dispatches an action that is caught by a global dispatcher. the main aim of the dispatcher is triggering the callbacks the stores have registered in order to listen for the actions they are interested in.

 

Once the store receives the data from the action performed, the changes needed for the view emit a change event to the view that will retrieve the data from the store and then they will render the changes updating the components’ stated.

 

Another characteristic we mentioned previously is the fact that the communication between Models Views and Intents happens via streams only; therefore there isn’t any direct control between different modules but just a stream as input and one as output, like we have seen in the communication between Cycle.js application logic and drivers.

 

MVI is composed of three main modules:

  • The model where we elaborate the user interactions and we keep the application state.
  • The view where we wire our UI with the state provided by the model.
  • The intent where we subscribe to user interactions or inputs and we provide them to the model for changing the state to a new one.

 

The renderer part represents the DOM driver in this case.

Let’s see now how we can change our simple Cycle.js example using model view intent architecture. The code we are going to explore is very similar to the previous example, so we will highlight only the key parts without investing too much time on how we have parsed the data retrieved from the HTTP driver or how we compose the virtual DOM elements rendered later on by the DOM driver.

 

The first thing to do is to identify in our previous example the part in our main function that should be allocated to different parts of an MVI architecture. Originally our main function was implemented in the following way:

const main = sources => {
const input$ = sources.DOM.select("#location-input").events("focusout")
.map(evt => evt.target.value);
const btn$ = sources.DOM.select("#location-btn").events("mousedown"); const merged$ = xs.combine(input$, btn$);
const request$ = merged$.map(([city, mouseEvt]) => getRequest(city))
.startWith(getRequest(INIT_CITY))
const response$ = sources.HTTP.select(CATEGORY)
.flatten()
const vdom$ = response$.map(parseResponse)
.map(simplifyData)
.map(generateVDOM)
.startWith(h1("Loading..."))
return {
DOM: vdom$, HTTP: request$
}
}

We can immediately identify the intent part in the first few lines of our implementation. As we said with the intent we are capturing the user intentions, therefore all the DOM interactions. In fact, the intent receives as input the real DOM after being rendered and as output the user intentions as streams:

const intent = DOM => {
const input$ = DOM.select("#location-input").events("focusout")
.map(evt => evt.target.value); const btn$ = DOM.select("#location-btn").events("mousedown");
return xs.combine(input$, btn$)
.map(([city, mouseEvt])=> getRequest(city))
.startWith(getRequest(INIT_CITY))
}

 

The stream with the request will be passed to the model and to the HTTP driver for executing the request to the remote endpoint. Then we need to handle the response received from the HTTP driver, in this case the model will take care of it by preparing the data for the view:

const model = (actions$, HTTP) => { return HTTP.select(CATEGORY)
.flatten()
.map(parseResponse)
.map(simplifyData)
}

As we can see the model receives the actions stream and the HTTP object, and in this case we don’t need to perform anything with the data inserted by the user because the response from the endpoint is providing all the data we need but potentially we could combine the data received and the user actions in order to prepare a new state for the view.

 

The last part is merging the data prepared from the model with the view and generating the virtual DOM elements that will be passed to the DOM driver:

const view = state$ => {
return state$.map(generateVDOM)
.startWith(h1("Loading..."))
}
So our main function now will look like:
const main = sources => {
const actions$ = intent(sources.DOM);
const state$ = model(actions$, sources.HTTP) const vdom$ = view(state$);
return {
DOM: vdom$, HTTP: actions$
}
}

 

Then we can take a look at the full example with MVI applied:

import xs from 'xstream'; import {run} from '@cycle/run';
import {makeDOMDriver, div, h1, h2, h3, img, p, input, button} from '@cycle/dom';
import {makeHTTPDriver} from '@cycle/http'; import moment from 'moment';
const CATEGORY = "forecast"; const INIT_CITY = "London";
const getForm = () => div(".form", [ input("#location-input"), button("#location-btn", "get forecasts")
])
const generateNext5Days = forecasts => { const list = forecasts.map(forecast => {
return div(".forecast-box", [ h3(moment(forecast.date).format("dddd Do MMM")),
p(`min ${forecast.day.mintemp_c}°C - max ${forecast.day. maxtemp_c}°C`),
img(".forecast-img", { props: {
class='lazy' data-src: `http:${forecast.day.condition.icon}`
}
}),
p(".status", forecast.day.condition.text)
])
});
return div(".forecasts-container", list)
}
const generateCurrentForecast = forecast => div(".current-forecast- container", [
div(".today-forecast", [
img(".forecast-img", {
props: {
class='lazy' data-src: `http:${forecast.condition.icon}`
}
}),
p(".status", forecast.condition.text)
]),
h3(moment(forecast.last_updated).format("dddd Do MMMM YYYY")), h2(`${forecast.temp_c}°C`),
p(`humidity: ${forecast.humidity}%`)
])
const generateVDOM = data => div(".main-container", [ h1(`Your forecasts in ${data.city}`), getForm(), generateCurrentForecast(data.current), generateNext5Days(data.forecast)
])
const parseResponse = response => JSON.parse(response.text); const simplifyData = data => {
return {
city: data.location.name, current: data.current,
forecast: data.forecast.forecastday
}
}
const getRequest = city => { return {
url: `http://api.apixu.com/v1/forecast.json?key=04ca1fa2705645e4830 214415172307&q=${city}&days=7`,
category: CATEGORY
}
}
const model = (actions$, HTTP) => { return HTTP.select(CATEGORY)
.flatten()
.map(parseResponse)
.map(simplifyData)
}
const intent = DOM => {
const input$ = DOM.select("#location-input").events("focusout")
.map(evt => evt.target.value); const btn$ = DOM.select("#location-btn").events("mousedown"); return xs.combine(input$, btn$)
.map(([city, mouseEvt])=> getRequest(city))
.startWith(getRequest(INIT_CITY))
}
const view = state$ => {
return state$.map(generateVDOM)
.startWith(h1("Loading..."))
}
const main = sources => {
const actions$ = intent(sources.DOM);
const state$ = model(actions$, sources.HTTP) const vdom$ = view(state$);
return {
DOM: vdom$, HTTP: actions$
}
}
const drivers = {
DOM: makeDOMDriver('#app'), HTTP: makeHTTPDriver()
};
run(main, drivers);

 

MVI is not as complicated as it looks like; we just need to get used to it. I’d like to highlight a few key concepts that we need to bear in mind when we integrate this architecture in a Cycle.js application:

  • First of all, we transform our Cycle.js project to a structured project where each part can be reused and tested in isolation.

 

  • This architecture allows us to even go further applying the MVI architecture to each single component: a MVI architecture applied to the form, one to the current day, and one for the list of the weekdays due to its nature.
  • Communicating with streams allow the entire architecture to be more flexible and enhance the separation of concerns.

 

Before concluding the blog with an overview of what Cycle.js brings to the reactive programming world, we need to enhance this example once again, introducing the official Cycle.js state management called Onionify.

 

Cycle.js and State Management

Cycle.js and State Management

After seeing MVI in action, our journey continues with Onionify, a library created for managing the application state in Cycle.js.

As we know, handling the state is the key part of any web application. Cycle.js provides a unique approach to that, slightly different from what we are used to seeing with Angular or Redux. Onionify is a tiny library (2kb only) with only one purpose: managing the application state in Cycle applications.

 

This library doesn’t provide a driver, as we have seen in other occasions, but instead Onionify is wrapping the entire Cycle.js application with a unique state that is injected across multiple components. The application state is a stream managed internally by Onionify, and each component can manipulate its own state and the parent components one via reducers.

 

The components need to be “isolated” with the homonymous library called isolate. Isolate is a utility provided by Cycle.js that allows us to literally isolate a component, sharing only the sources provided by the main application; or a parent component, and returning a sink object that could be shared with the main application and/or other components.

 

Let’s stop here for a moment and try to gather what we have learned until now about Cycle.js:

  • We know that we can create an application with an MVI architecture.
  • MVI could be applied not only to an entire Cycle application but could be applied to components too.
  • This leads to the same architecture applied at all the levels of our architecture where sinks and sources are the only way to communicate between objects.

 

Considering all these facts, we can say that with Cycle.js, applying MVI, we can create a Fractal architecture that will allow us to use always the same “piece” (model view intent) for generating a bigger composition made by identical pieces applied several times.

 

Fractal architecture Fractal architecture is not a new concept in the software development. this architecture with identical subsystems structures allows a high separation of concerns in order to shape a large project where modifying, deleting, or creating new parts won’t affect the entire application considering the isolation in which the subsystems live.

 

Onionify applied in conjunction with MVI architecture helps by creating a solid and reusable architecture with strong separation of concerns and good encapsulation. Therefore we should be able to reuse part of our Cycle.js application in others just respecting the contract our components need, so using the correct sources (drivers and streams) and interacting with the sinks returned by them.

 

In order to see Onionify in action, we are going to modify our weather application once again, splitting our MVI application in multiple components and using Onionify for changing the state.

 

Let’s analyze what we have here compared to the previous application:

  • We have three components: CityForm, TodayForecast, and FutureForecast.
  • We have a state that is passed by our main application to the Onionify wrapper composed by the state observable.
  • We still have the communication with drivers in the same way we worked before; therefore we still have the HTTP and DOM drivers.

 

CityForm is the component with the main logic, and it is responsible for retrieving what the user is typing inside the input field and also to prepare the request that will perform then by the HTTP driver.

import onionify from 'cycle-onionify'; import xs from 'xstream';
import {div, input, button, h1} from '@cycle/dom';

const INIT_CITY = "London";
const CITY_SEARCH = "citySearchAction"; const CATEGORY = "forecast";
const getRequest = city => ({ type: CITY_SEARCH,
city: city,
url: `http://api.apixu.com/v1/forecast.json?key=04ca1fa2705645e4830 214415172307&q=${city}&days=7`,
category: CATEGORY
})

const getForm = location => div(".form", [ h1(`Your forecasts in ${location.city}`),
input("#location-input", {props: {value: `${location.city}`}}), button("#location-btn", "get forecasts")
])

const parseResponse = response => JSON.parse(response.text); const simplifyData = data => function changeState(prevState) {
return {
city: data.location.name, current: data.current,
forecasts: data.forecast.forecastday
}
}

const model = (actions$, HTTP) => {
const reducer$ = HTTP.select(CATEGORY)
.flatten()
.map(parseResponse)
.map(simplifyData)

return reducer$
}

const intent = DOM => {
const input$ = DOM.select("#location-input").events("focusout")
.map(evt => evt.target.value);
const btn$ = DOM.select("#location-btn").events("mousedown");

return xs.combine(input$, btn$)
.map(([city, mouseEvt]) => getRequest(city))
.startWith(getRequest(INIT_CITY))

}
const view = state$ => state$.map(state => getForm(state)) export const CityForm = sources => {
const state$ = sources.onion.state$; const actions$ = intent(sources.DOM);
const reducer$ = model(actions$, sources.HTTP); const vdom$ = view(state$);
return {
DOM: vdom$, onion: reducer$, HTTP: actions$
}
}

Onionify wrapper

 

 

As we can immediately recognize, we have a new parameter from the sources that is provided by Onionify wrapper; this library provides an onion object that contains the state stream, based on that we can interact with the parent state stream, reacting to that, or manipulating the internal component state as well.

 

Overall the component is very similar to what we had in the previous application, and the only change is related to the application state that represents the response coming from the weather API, so the CityForm is using the state stream just for retrieving the location chosen by the user.

 

The last thing to mention is to understand what this component is returning as sink and how we can immediately spot the onion property containing the HTTP response as the state of the application.

 

As we can recognize, this component is self-contained, so if we would like to reuse it in another application we would be able to do it without the need for changing anything: that’s the power of working with a fractal architecture where the reusability, separation of concerns, and encapsulation are first citizens in the architecture.

 

Before investigating how we need to modify App.js for handling Onionify library, let’s do a quick tour of the other two passive components: TodayForecast and FutureForecast. These two components are passive because they just need to render some content provided by the state; they don’t have user interactions and they are not going to manipulate any parent state or perform new HTTP requests.

 

This is the TodayForecast component:

import {div, h2, h3, img, p} from '@cycle/dom'; import moment from 'moment';
const generateCurrentForecast = forecast => div(".current-forecast- container", [
div(".today-forecast", [
img(".forecast-img", { props: {
class='lazy' data-src: `http:${forecast.condition.icon}`
}
}),
p(".status", forecast.condition.text)
]),
h3(moment(forecast.last_updated).format("dddd Do MMMM YYYY")), h2(`${forecast.temp_c}°C`),
p(`humidity: ${forecast.humidity}%`)
])
const view = state$ => state$.map(state => generateCurrentForecast(state. current))
export const TodayForecast = sources => { const state$ = sources.onion.state$; const vdom$ = view(state$)
return {
DOM: vdom$
}
}

In this chunk of code we can spot in the TodayForecast function the state stream for rendering the view that corresponds to the weather data representation for the specific moment when the user is requesting the forecasts. Considering this is a passive component, its only duty is providing the virtual dom to the DOM driver for rendering the view.

 

Obviously, in case of any user interaction that could change the application state, this will be reflected in the sink and it would have been able to share the new state via the onionify property of our sink – the same in case it would need to trigger a new HTTP request to the weather forecast endpoint.

 

Let’s take a look to the FutureForecast component then:

import {div, h3, img, p} from '@cycle/dom'; import moment from 'moment';
const generateNext5Days = forecasts => {
const list = forecasts.map(forecast => div(".forecast-box", [ h3(moment(forecast.date).format("dddd Do MMM")),
p(`min ${forecast.day.mintemp_c}°C - max ${forecast.day. maxtemp_c}°C`),
img(".forecast-img", { props: {
class='lazy' data-src: `http:${forecast.day.condition.icon}`
}
}),
p(".status", forecast.day.condition.text)
])
);
return div(".forecasts-container", list)
}
const view = state$ => state$.map(state => generateNext5Days(state. forecasts))
export const FutureForecast = sources => { const state$ = sources.onion.state$; const vdom$ = view(state$)
return {
DOM: vdom$
}
}
Also, this component is very similar to the previous one, and it doesn’t need to share
any state update. It’s just consuming the state stream in order to render the new virtual dom to provide to the DOM driver.
Finally, it’s the turn of App.js where we can find the glue for our “onionified application”:
import xs from 'xstream'; import {run} from '@cycle/run';
import {makeDOMDriver, div, h1} from '@cycle/dom'; import {makeHTTPDriver} from '@cycle/http';
import isolate from '@cycle/isolate'; import onionify from 'cycle-onionify'; import {CityForm} from './CityForm';
import {TodayForecast} from './TodayForecast'; import {FutureForecast} from './FutureForecast';
const generateVDOM = ([formVNode, todayVNode, futureVNode]) => div(".main- container", [
formVNode, todayVNode, futureVNode
])
const view = (locationDOM$, todayForecastDOM$, futureForecastDOM$) => { return xs.combine(locationDOM$, todayForecastDOM$, futureForecastDOM$)
.map(combinedStreams => generateVDOM(combinedStreams))
.startWith(h1("Loading..."));
}
const main = sources => { const cityLens = {
get: state => state,
set: (state, childState) => childState
}
const locationSink = isolate(CityForm, {onion: cityLens})(sources); const todayForecastSink = isolate(TodayForecast, {onion: cityLens}) (sources);
const futureForecastSink = isolate(FutureForecast, {onion: cityLens}) (sources);
const locationReducer$ = locationSink.onion; const httpRequest$ = locationSink.HTTP;
const vdom$ = view(locationSink.DOM,
todayForecastSink.DOM, futureForecastSink.DOM);
return {
DOM: vdom$,
HTTP: httpRequest$, onion: locationReducer$
}
}
const drivers = {
DOM: makeDOMDriver('#app'), HTTP: makeHTTPDriver()
};
const mainOnionified = onionify(main);
run(mainOnionified, drivers);

JavaScript file

Here we can find quite a few interesting new concepts, so let’s start to describe what we are doing from the end of our JavaScript file. As we mentioned, Onionify is not a driver but a wrapper around our application; therefore we used it for wrapping our main function and we passed the decorated, or onionified, version of our main function to the run method.

 

This allows us to pass the state through different components via the onionify property in our sources and sinks. Let’s now take a look on what the main function looks like. We start with a lens called cityLens. Lenses are used when a component needs to access the same object of its parent like in our case but also when we need to manipulate the state before it lands into a specific component.

 

Technically speaking a lens is an object with a getter and a setter, nothing really complicated, but they are very useful in functional programming specifically when we use them for composing objects. Lenses are also well known in the Haskell language, in the JavaScript world they are used in conjunction with

 

Immutable.js, and definitely they are present in Ramda (lensProp method for instance), a JavaScript functional library. After the lens definition, which will allow us to share the data present in the component with the others, we have the instantiation of our three custom components with the isolate utility offered by Cycle.js.

 

We are passing the lens to each component and the sources, and this allows us to get interaction with the parent component inside each component. In this case it’s the parent component in the application itself but it’s clear by the approach, working in this way, we can wrap our component defining a standard input/output contract and reuse it easily in different parts of the same application or even better in other applications.

 

After creating the instances of our components it’s time to pass the manipulated state and the HTTP request to the associated properties in our sink object. In this case we need to highlight a couple of things. In this project the state is handled by the endpoint response, and there isn’t any manipulation from other components that are just used for the rendering phase.

 

Obviously, if we check other examples present in the Onionify repository (staltz/cycle-onionify), we can see that instead of passing just a stream as an application state like we are doing, we can combine multiple streams from different components in order to store a more complex application state.

 

The last bit is retrieving all the streams containing the virtual dom prepared by the different components and combine them all together for producing the look and feel of our Cycle application.

 

For doing that, we create a view method that collects all the streams and combines them in a unique one that will generate the final stream with the virtual dom that will be rendered by the DOM driver. I’d like to get your attention on the way we are doing that because as you can see, it’s a little bit verbose, in particular when the application grows and we need to merge multiple components.

 

In these cases we should prepare some utils methods for handling these situations (wrapping in a single or multiple functions would make the deal). Cycle.js doesn’t come with anything useful out of the box, but there are some libraries that are aiming for that. At the moment, however, there is nothing official for achieving a more readable and flexible approach.