Merging Observable: RxJava2

Gaurav Rajput
2 min readAug 20, 2020

--

Sometimes we need to merge two or more observable emitting the same type of items T. In RxJava, we have some operator to do so.

Let’s see how we can do that:

Observable.merge()

merge() operator can merge 2 to 4 observable sources of the same type T.

val observableSource1 : Observable<String> =  Observable.just("1", "2")val observableSource2  : Observable<String> =  Observable.just("3", "4")val observableSource3  : Observable<String> =  Observable.just("5", "6")val observableSource4  : Observable<String> =  Observable.just("7", "8")val observableSource5 : Observable<String> =  Observable.just("9", "10")

let’s merge the first two with merge() operator:

Observable.merge(observableSource1, observableSource2)
.subscribe {item->
Log.d(TAG, "received string : $item")
}

Output:

received string : 1

received string : 2

received string : 3

received string : 4

Note: Here emission of items seems sequential, but it’s not true if they are cold observable and on the same thread

Now if you want to make sure sequential emission of items, we should use Observable.concat()

Merge using concat() operator:

Observable.concat(observableSource1, observableSource2)
.subscribe {item->
Log.d(TAG, "received string : $item")
}

The output will be sequential.

What if the user wants to merge more then four observable of the same type?

For that, we have mergeArray(), operator.

Observable.mergeArray(observableSource1,
observableSource2,
observableSource3,
observableSource4,
observableSource5)
.subscribe {item->
Log.d(TAG, "received string : $item")
}

Cant we use merge() operator for more than 4 observable in any way. For that we can pass Iterable<Observable<T>>

Let’s see by example:

Create a list of above observables :

val sources: List<Observable<String>> = listOf(observableSource1, observableSource2, observableSource3, observableSource4, 
observableSource5)
// pass list to merge
Observable.merge(sources).subscribe {
Log.e(TAG, "received string :- $it")
}

The output is the same as above.

To summarize, Observable.merge() will combine multiple Observable<T> sources emitting the same type T and consolidate into a single Observable<T>.

--

--