Observable vs Observer: RxJava

Gaurav Rajput
4 min read · Aug 30, 2020

For a minute, forget about RxJava. If I asked you what "observable" means in general, what would you say? Let’s search on Google.

According to Google:

Observable: able to be noticed or perceived.

Observer: a person who watches or notices something.

So can we say that an observable is something that can be observed? Or, put differently, that an observable is a source we can watch for something?

What an observable emits can be any kind of data. For example, when you watch a movie on your laptop, the laptop is the observable emitting the movie, and you are the observer receiving that data.

Similarly, in RxJava, an Observable is something that emits data or events, and an Observer is something that receives that data or those events.

Note: I will be using Kotlin code examples in this post.

Let’s create a simple observable:

val observable: Observable<T> = Observable.just(item) // item is any value of type T

Here T can be any type, such as a String, Char, Int, or even a List. All this observable does is emit the given item. There are other ways to emit items, which we will see later in this post.
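Since T can even be a list, it helps to see what just() does with one. The sketch below assumes RxJava 2 is on the classpath (package io.reactivex; RxJava 3 moved the class to io.reactivex.rxjava3.core) and uses println instead of Android's Log.d so it runs on a plain JVM. The emitted() helper is a hypothetical name introduced only for this illustration.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: subscribes, waits for completion, and returns every emitted item.
fun <T : Any> emitted(source: Observable<T>): List<T> = source.toList().blockingGet()

fun main() {
    val numbers = listOf(1, 2, 3)

    // just() treats the whole list as one single item.
    println(emitted(Observable.just(numbers)))         // [[1, 2, 3]]
    // fromIterable() emits each element as its own item.
    println(emitted(Observable.fromIterable(numbers))) // [1, 2, 3]
}
```

So just(list) gives the observer one item (the list itself), while fromIterable(list) gives it three.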

How does an Observable work?

An Observable works by calling onNext(), onComplete(), and onError() on its observer.

At the highest level, an Observable works by passing three types of events:

  1. onNext(T): passes items of type T, one at a time, all the way down to the observer.
  2. onComplete(): signals that all data has been emitted; no item will be emitted after this call.
  3. onError(Throwable): signals that an error occurred; like onComplete(), it ends the stream.
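To make the three events concrete, here is a minimal sketch that records the order in which they arrive. It uses Observable.create() and subscribe(), both explained later in this post, and assumes RxJava 2 (package io.reactivex). The recordEvents() helper is a hypothetical name used only for illustration.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: records every event the observer receives, in order.
fun recordEvents(source: Observable<String>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { item -> events += "onNext($item)" },             // onNext
        { error -> events += "onError(${error.message})" }, // onError
        { events += "onComplete" }                          // onComplete
    )
    return events
}

fun main() {
    val completes = Observable.create<String> { emitter ->
        emitter.onNext("a")
        emitter.onNext("b")
        emitter.onComplete()
    }
    val fails = Observable.create<String> { emitter ->
        emitter.onNext("a")
        emitter.onError(IllegalStateException("boom"))
        emitter.onNext("ignored") // dropped: the stream has already terminated
    }
    println(recordEvents(completes)) // [onNext(a), onNext(b), onComplete]
    println(recordEvents(fails))     // [onNext(a), onError(boom)]
}
```

A stream ends with either onComplete() or onError(), never both, and nothing is delivered after that terminal event.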

Let’s see how we can emit a string:

val observable: Observable<String> = Observable.just("Hello")

Now let’s see how to receive this string using subscribe (which we will discuss in a few moments):

observable.subscribe { s ->
    // received string s here
    Log.d(TAG, "received string:- $s")
}

Output

received string:- Hello

Let’s now create a source Observable using Observable.create() and see how we can use onNext() to emit data (say, some strings):

val observable: Observable<String> = Observable.create<String> {
    // "it" is the emitter that delivers events to the observer
    it.onNext("hello")
    it.onNext("Kotlin")
    it.onComplete()
}

Use subscribe to receive these strings:

observable.subscribe { s ->
    Log.d(TAG, "received string:- $s")
}

Output

received string:- hello

received string:- Kotlin

You may be wondering: so far we have only seen how an observable emits data. Where is the observer, and how does it work? And what is this subscribe call doing?

Let’s see what subscribe is doing here:

Well, subscribe is the method that attaches an observer to an observable. To do that, subscribe accepts an Observer object as a parameter.
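One consequence of this is worth seeing in code: an observable built with Observable.create() does nothing until an observer is attached. The sketch below assumes RxJava 2 (package io.reactivex) and no schedulers, so everything runs synchronously on the calling thread. subscriptionOrder() is a hypothetical name used only for this illustration.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: returns the order in which things happen around subscribe().
fun subscriptionOrder(): List<String> {
    val log = mutableListOf<String>()

    val observable = Observable.create<String> { emitter ->
        log += "emitting" // runs only once an observer subscribes
        emitter.onNext("Hello")
        emitter.onComplete()
    }
    log += "created" // nothing has been emitted yet

    observable.subscribe { s -> log += "received $s" }
    return log
}

fun main() {
    println(subscriptionOrder()) // [created, emitting, received Hello]
}
```

The "emitting" entry appears after "created", which shows that it is the subscribe call, not the creation of the observable, that starts the emission.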

How do we create an observer then?

The Observer interface

public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

Let’s go through them one by one:

  1. onNext(T value): Here we receive each value of type T emitted by the observable.
  2. onError(Throwable e): Used for error handling.
  3. onComplete(): Called when the observable is done emitting items.
  4. onSubscribe(Disposable d): Here we receive a Disposable, which can be used to dispose of the stream, in other words, to unsubscribe from the observable.
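As a rough illustration of what the Disposable from onSubscribe() is for, the sketch below stops a stream early. It assumes RxJava 2 (packages io.reactivex and io.reactivex.disposables; RxJava 3 uses io.reactivex.rxjava3.core and io.reactivex.rxjava3.disposables). LimitedObserver is a hypothetical class written only for this example.

```kotlin
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// Hypothetical observer: collects items and disposes the stream after `limit` items.
class LimitedObserver(private val limit: Int) : Observer<Int> {
    val received = mutableListOf<Int>()
    var completed = false
    private lateinit var disposable: Disposable

    override fun onSubscribe(d: Disposable) {
        disposable = d // keep the handle so we can unsubscribe later
    }

    override fun onNext(t: Int) {
        received += t
        if (received.size == limit) disposable.dispose() // stop receiving items
    }

    override fun onError(e: Throwable) {
        println("error: ${e.message}")
    }

    override fun onComplete() {
        completed = true
    }
}

fun main() {
    val observer = LimitedObserver(limit = 3)
    Observable.range(1, 10).subscribe(observer)
    println(observer.received)  // [1, 2, 3]
    println(observer.completed) // false
}
```

Because the observer disposed of the stream itself, it receives neither the remaining items nor onComplete().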

Now let’s get back to the previous example:

Observable:

val observable: Observable<String> = Observable.create<String> {
    it.onNext("hello")
    it.onNext("Kotlin")
    it.onComplete()
}

Create Observer:

val observer: Observer<String> = object : Observer<String> {
    override fun onComplete() {
        // called when the observable is done emitting
    }
    override fun onSubscribe(d: Disposable) {
        // called once, when the observer is attached
    }
    override fun onNext(t: String) {
        // called for every emitted item
        Log.d(TAG, "received string:- $t")
    }
    override fun onError(e: Throwable) {
        // called if the observable signals an error
    }
}

Pass this observer to subscribe:

observable.subscribe(observer)

Output

received string:- hello

received string:- Kotlin

Instead of creating the observer as above, we can shorten it by passing lambdas:

observable.subscribe(
    { t -> // onNext of the observer
        Log.d(TAG, "received string:- $t")
    },
    { // onError of the observer
    },
    { // onComplete of the observer
    },
    { // onSubscribe of the observer
    })

Here the output will be the same.

Can we shorten it further? Of course we can. All of these callbacks are optional, so we can pass only the lambda that receives items, as below:

observable.subscribe({ s ->
    Log.d(TAG, "received string:- $s")
})

Or

observable.subscribe { s ->
    Log.d(TAG, "received string:- $s")
}

Here we are implementing only onNext.
