Subjects: Rxjava

Subjects act as an observer as well as observable. It means you can call onNext(), onComplete(), and onError() on a Subject, and it will, in turn, pass those events downstream toward its Observers.

Let’s discuss what the types of subjects are and how and where to use them.

PublishSubject:

Let’s see how we can create a publish subject. Let’s do for the string.

val basicSubject: PublishSubject<String> = PublishSubject.create()

now this basicSubject is an observable, we can subscribe to it and receive the emitted strings.

val observer = basicSubject.subscribe({
Log.d(TAG, "received string:= $it")
}, {
it
.printStackTrace()
})

Now let’s call onNext on observable:

basicPublishSubject.onNext("hello")
basicPublishSubject.onNext("Kotlin")
basicPublishSubject.onComplete()

Output:

received string:= hello
received string:= Kotlin

Now let’s see what happens if we subscribe to this observable after calling onNext

//subject declaration 
val basicSubject: PublishSubject<String> = PublishSubject.create()
basicSubject.onNext("hello")//attach subscriber to it
val observer = basicPublishSubject.subscribe({
Log.d(TAG, "received string:= $it")
}, {
it
.printStackTrace()
})
basicPublishSubject.onNext("Kotlin")

So what do you think what will be the output here, as it’s hot observable, it will receive emission that are emitted after subscription.

Output:

received string:= Kotlin

Now let’s see how subjects work as an observer. We can pass the subjects to subscribe() like in the below example:

val observableSource1 : Observable<String> =  Observable.just("1", "2", "3", "4")val basicSubject: PublishSubject<String> = PublishSubject.create()basicSubject.subscribe({
Log.d(TAG, "received string:= $it")
},{
it
.printStackTrace()
})
//pass this subject as a observer to subscribe methodobservableSource1.subscribe(basicSubject)

Output:

received string:= 1

received string:= 2

received string:= 3

received string:= 4

BehaviorSubject:

//behaviour Subject
val behaviorSubject: BehaviorSubject<String> = BehaviorSubject.create()
behaviorSubject.subscribe{ Log.d(TAG, "observer 1 received string:= $it") }behaviorSubject.onNext("hello")
behaviorSubject.onNext("there")
behaviorSubject.onNext("Kotlin")
behaviorSubject.subscribe { Log.d(TAG, "observer 2 received string:= $it") }behaviorSubject.subscribe { Log.d(TAG, "observer 3 received string:= $it") }

Output:

observer 1 received string:= hello
observer 1 received string:= there
observer 1 received string:= Kotlin
observer 2 received string:= Kotlin
observer 3 received string:= Kotlin

Here Observer two and Observer three are subscribing after all onNext called. That’s why Observer two and observer three received the last emitted item. If it was the publish subject, Observer two and Observer three will not receive anything, as they are called after all emissions.

ReplaySubject:

Output:

observer 1 received string:= hello
observer 1 received string:= there
observer 1 received string:= Kotlin
observer 2 received string:= hello
observer 2 received string:= there
observer 2 received string:= Kotlin

You need to be careful with this subject in case of infinite observable sources, as it will cache all the data and will take memory.

AsyncSubject:

Be careful in case of the infinite observable source as it will emit the last item that is received before onComplete called.

//Async Subject
val asyncSubject: AsyncSubject<String> = AsyncSubject.create()
asyncSubject.onNext("hello")
asyncSubject.onNext("there")
asyncSubject.onNext("Kotlin")
asyncSubject.onComplete()
asyncSubject.subscribe {
Log.d(TAG, "observer 1 received string:= $it")
}
asyncSubject.subscribe {
Log.d(TAG, "observer 2 received string:= $it")
}
asyncSubject.subscribe {
Log.d(TAG, "observer 3 received string:= $it")
}

Output:

observer 1 received string:= Kotlin
observer 2 received string:= Kotlin
observer 3 received string:= Kotlin

UnicastSubject:

val unicastSubject: UnicastSubject<String> = UnicastSubject.create()
val observable = Observable.interval(300, TimeUnit.MILLISECONDS)
.map {
"${(it+1)*300} milliseconds"
}.subscribe(unicastSubject)

Thread.sleep(2000);

unicastSubject.subscribe { Log.d(TAG, "observer 1 received string:= $it") }
Thread.sleep(2000);

Output:

observer 1 received string:= 300 milliseconds
observer 1 received string:= 600 milliseconds
observer 1 received string:= 900 milliseconds
observer 1 received string:= 1200 milliseconds
observer 1 received string:= 1500 milliseconds
observer 1 received string:= 1800 milliseconds
observer 1 received string:= 2100 milliseconds
observer 1 received string:= 2400 milliseconds
observer 1 received string:= 2700 milliseconds
observer 1 received string:= 3000 milliseconds
observer 1 received string:= 3300 milliseconds
observer 1 received string:= 3600 milliseconds
observer 1 received string:= 3900 milliseconds

What happened here?

Here, after 2 seconds, the first six items will be released immediately to the Observer when it subscribes. After that, it will receive one by one.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store