Subjects: RxJava

PublishSubject:

PublishSubject is the simplest form of subject; the other subjects change this behavior according to their implementation. It acts as a hot observable: an observer receives only the items emitted after it subscribes. If you are not familiar with hot and cold observables, you can read the following post.

// Subscribe first, then emit: the observer receives both items
val basicSubject: PublishSubject<String> = PublishSubject.create()
val observer = basicSubject.subscribe({
    Log.d(TAG, "received string:= $it")
}, {
    it.printStackTrace()
})
basicSubject.onNext("hello")
basicSubject.onNext("Kotlin")
basicSubject.onComplete()

// Subject declaration
val basicSubject: PublishSubject<String> = PublishSubject.create()
// "hello" is emitted before any observer is attached, so it is lost
basicSubject.onNext("hello")
// Attach a subscriber to it
val observer = basicSubject.subscribe({
    Log.d(TAG, "received string:= $it")
}, {
    it.printStackTrace()
})
// Only "Kotlin" reaches the observer
basicSubject.onNext("Kotlin")

// A subject is also an Observer, so it can subscribe to another source
val observableSource1: Observable<String> = Observable.just("1", "2", "3", "4")
val basicSubject: PublishSubject<String> = PublishSubject.create()
basicSubject.subscribe({
    Log.d(TAG, "received string:= $it")
}, {
    it.printStackTrace()
})
// Pass this subject as an observer to the subscribe method
observableSource1.subscribe(basicSubject)

BehaviorSubject:

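It is similar to PublishSubject, but it replays the most recently emitted item to each newly attached Observer. In the example below, observer 1 receives all three items, while observers 2 and 3 receive only "Kotlin".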

//behaviour Subject
val behaviorSubject: BehaviorSubject<String> = BehaviorSubject.create()
behaviorSubject.subscribe { Log.d(TAG, "observer 1 received string:= $it") }
behaviorSubject.onNext("hello")
behaviorSubject.onNext("there")
behaviorSubject.onNext("Kotlin")
behaviorSubject.subscribe { Log.d(TAG, "observer 2 received string:= $it") }
behaviorSubject.subscribe { Log.d(TAG, "observer 3 received string:= $it") }
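
A BehaviorSubject can also start with a seed value, so that even the first observer receives something immediately. The sketch below is only an illustration: it assumes the RxJava 3 package names (RxJava 2 uses io.reactivex.subjects instead), uses the createDefault factory, which the snippet above does not use, and collects items into lists instead of calling Log.d so that it runs outside Android. The function name behaviorDemo is hypothetical.

```kotlin
import io.reactivex.rxjava3.subjects.BehaviorSubject

// Hypothetical helper: returns what an early and a late observer received.
fun behaviorDemo(): Pair<List<String>, List<String>> {
    // Assumption: seed the subject with a default value via createDefault.
    val subject: BehaviorSubject<String> = BehaviorSubject.createDefault("initial")
    val early = mutableListOf<String>()
    val late = mutableListOf<String>()

    // The early observer gets the seed value as soon as it subscribes.
    subject.subscribe { early.add(it) }
    subject.onNext("hello")
    subject.onNext("Kotlin")

    // The late observer gets only the latest item ("Kotlin"), then new ones.
    subject.subscribe { late.add(it) }
    subject.onNext("!")

    return early to late
}

fun main() {
    val (early, late) = behaviorDemo()
    println("early observer: $early") // [initial, hello, Kotlin, !]
    println("late observer: $late")   // [Kotlin, !]
}
```

With BehaviorSubject.create() instead of createDefault, the early observer would receive nothing until the first onNext call.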

ReplaySubject:

As the name suggests, ReplaySubject caches every emission it receives and replays all of them to each newly attached Observer.
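
A minimal sketch of this replay behavior, assuming the RxJava 3 package names (RxJava 2 uses io.reactivex.subjects instead). The function name replayDemo is hypothetical, and the items are collected into a list rather than logged with Log.d so that the example runs outside Android.

```kotlin
import io.reactivex.rxjava3.subjects.ReplaySubject

// Hypothetical helper: emits three items, then attaches a late observer
// and returns everything that observer received.
fun replayDemo(): List<String> {
    val replaySubject: ReplaySubject<String> = ReplaySubject.create()
    replaySubject.onNext("hello")
    replaySubject.onNext("there")
    replaySubject.onNext("Kotlin")

    val received = mutableListOf<String>()
    // The cached emissions are replayed to the observer when it subscribes.
    replaySubject.subscribe { received.add(it) }
    return received
}

fun main() {
    println(replayDemo()) // [hello, there, Kotlin]
}
```

Caching every emission can become expensive for long-lived subjects. ReplaySubject.createWithSize(n) keeps only the last n items, which may be a better fit in that case.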

AsyncSubject:

AsyncSubject emits only the last item it received, and only once onComplete is called. Observers that subscribe after completion still receive that last item.

//Async Subject
val asyncSubject: AsyncSubject<String> = AsyncSubject.create()
asyncSubject.onNext("hello")
asyncSubject.onNext("there")
asyncSubject.onNext("Kotlin")
asyncSubject.onComplete()
// Each observer below receives only "Kotlin", the last item before onComplete
asyncSubject.subscribe {
    Log.d(TAG, "observer 1 received string:= $it")
}
asyncSubject.subscribe {
    Log.d(TAG, "observer 2 received string:= $it")
}
asyncSubject.subscribe {
    Log.d(TAG, "observer 3 received string:= $it")
}
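
The snippet above subscribes only after completion. To see that nothing is delivered before onComplete, an observer can be attached first and its received items inspected before and after completion. This is a sketch under assumptions: RxJava 3 package names, a hypothetical function name asyncDemo, and a list in place of Log.d.

```kotlin
import io.reactivex.rxjava3.subjects.AsyncSubject

// Hypothetical helper: returns what the observer had received
// just before onComplete, and what it had received afterwards.
fun asyncDemo(): Pair<List<String>, List<String>> {
    val subject: AsyncSubject<String> = AsyncSubject.create()
    val received = mutableListOf<String>()
    subject.subscribe { received.add(it) }

    subject.onNext("hello")
    subject.onNext("Kotlin")
    // Snapshot taken before completion: nothing has been delivered yet.
    val beforeComplete = received.toList()

    // Completion triggers delivery of the last item only.
    subject.onComplete()
    return beforeComplete to received
}

fun main() {
    val (before, after) = asyncDemo()
    println("before onComplete: $before") // []
    println("after onComplete: $after")   // [Kotlin]
}
```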

UnicastSubject:

This subject buffers all the emissions it receives until an Observer subscribes to it. It then immediately releases all the buffered items to that Observer and clears its cache. A UnicastSubject can be subscribed to by only one Observer; if you attach a second Observer, that Observer receives an IllegalStateException through its error callback.

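val unicastSubject: UnicastSubject<String> = UnicastSubject.create()
// Feed the subject from an interval source; items are buffered until an observer arrives
Observable.interval(300, TimeUnit.MILLISECONDS)
    .map { "${(it + 1) * 300} milliseconds" }
    .subscribe(unicastSubject)

Thread.sleep(2000)

// The buffered items are delivered at once, followed by new items as they arrive
unicastSubject.subscribe { Log.d(TAG, "observer 1 received string:= $it") }
Thread.sleep(2000)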
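
The single-observer rule can be checked with a short sketch. It assumes the RxJava 3 package names, uses a hypothetical function name unicastDemo, and pushes items directly with onNext instead of using an interval source, so that no sleeping is needed.

```kotlin
import io.reactivex.rxjava3.subjects.UnicastSubject

// Hypothetical helper: returns the items the first observer received
// and the error (if any) delivered to a second observer.
fun unicastDemo(): Pair<List<String>, Throwable?> {
    val subject: UnicastSubject<String> = UnicastSubject.create()
    // No observer yet, so these items are buffered.
    subject.onNext("a")
    subject.onNext("b")

    val first = mutableListOf<String>()
    // The first observer drains the buffer on subscribe.
    subject.subscribe { first.add(it) }

    var secondError: Throwable? = null
    // A second observer is rejected through its error callback.
    subject.subscribe({ /* no items expected */ }, { error -> secondError = error })

    return first to secondError
}

fun main() {
    val (items, error) = unicastDemo()
    println("first observer: $items") // [a, b]
    println("second observer error: ${error?.javaClass?.simpleName}") // IllegalStateException
}
```

Passing an error handler to the second subscribe call matters here: without one, the exception would be routed to RxJava's global error handler instead.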

Gaurav Rajput

Working remotely as Android Developer