RxJava Guide 🐉

Betül Necanlı
13 min readFeb 22, 2023

--

RxJava is a Java-based implementation of ReactiveX, a library for composing asynchronous and event-based programs by using observable sequences. It is a popular choice for building robust, scalable, and responsive applications in Java.

With Kotlin, you can use RxJava to write concise, expressive, and readable code for asynchronous programming. Here’s how you can get started with RxJava in Kotlin:

1.Dependencies: To use RxJava in your Kotlin project, you need to include the following dependencies in your build.gradle file:

implementation ‘io.reactivex.rxjava2:rxjava:2.x.y’
implementation ‘io.reactivex.rxjava2:rxkotlin:2.x.y’

2.Observables: Observables are the source of data in RxJava. They emit a sequence of items, which can be processed by one or more Observers.

👉🏻 Here’s an example of how you can create an Observable in Kotlin:

3.Observers: Observers are objects that subscribe to Observables and receive the data emitted by them. You can create an Observer in Kotlin as follows:

4.Subscriptions: Subscriptions represents a connection between an Observable and an Observer. When you subscribe an Observer to an Observable, a Subscription is returned. The Subscriptions allows you to manage the lifecycle of the subscription and to unsubscribe or cancel the subscription when you no longer need to receive items from the Observable.

5.Operators: Operators are functions that you can apply to Observables to modify the data they emit. Some common operators include map, filter, flatMap, etc.

👉🏻 Here's an example of using the map operator in Kotlin:

Map the observable to multiply each item by 2 and subscribe the same observer to the mapped observable.

6.Schedulers: RxJava provides a number of Schedulers that you can use to specify how and where you want your observable sequences to be executed. Schedulers can be used to specify thread pools, whether to use a new thread for each observable item, or whether to run on the main thread. Examples of RxJava schedulers include:

  • Schedulers.io(): for I/O-bound work such as network requests or disk operations
  • Schedulers.computation(): for CPU-bound work that can be parallelized, such as image processing or mathematical calculations
  • Schedulers.single(): for running observable sequences on a single thread
  • AndroidSchedulers.mainThread(): for running observable sequences on the Android main thread

7.Subjects: Subjects are a type of observable that can act as both an observer and an observable. This means that you can subscribe to a subject and also emit items to it. There are four types of Subjects in RxJava:

  • PublishSubject: emits all items to its subscribers at the time of subscription and all subsequent items
  • BehaviorSubject: emits the most recent item to new subscribers and all subsequent items
  • ReplaySubject: emits all items to all subscribers, regardless of when they subscribe
  • AsyncSubject: only emits the last item when the observable sequence completes

👉🏻 Here are some examples of using Schedulers and Subjects in Kotlin RxJava:

In this first example, we create an observable sequence using Observable.fromCallable that performs some work on a new thread using Schedulers.newThread(). We then observe the results on the Android main thread using observeOn(AndroidSchedulers.mainThread()).

In the second example, we create a PublishSubject and subscribe to it using subject.subscribe. We then emit three items using subject..onNext and complete the sequence using subject.onComplete. The onNext calls trigger the subject's subscribers to handle the emitted items.

Observables

In RxJava, an Observable represents a stream of data that can be emitted over time. An Observable can emit zero or more items, and it can also indicate that it has completed or that an error has occurred. An Observer subscribes to an Observable and receives the items emitted by it.

Here’s how you can create and use observables in Kotlin:

1.Creating observables: There are several ways to create observables in RxJava, such as using just, fromArray, fromCallable, fromFuture, etc.

👉🏻 Here’s an example of creating an observable using just:

val observable = Observable.just(1, 2, 3, 4, 5)

Here’s an example of creating an observable using fromArray:

val numbers = intArrayOf(1, 2, 3, 4, 5)
val observable = Observable.fromArray(*numbers)

2.Subscribing to observables: To receive the items emitted by an observable, you need to create an observer and subscribe it to the observable. An observer is an object that implements the Observer interface and provides implementations for onNext, onComplete, and onError methods.

👉🏻 Here’s an example of subscribing an observer to an observable:

3.Disposing of subscriptions: When you subscribe an observer to an observable, a Disposable is returned. You can use this Disposable to dispose of the subscription if you no longer want to receive the items emitted by the observable.

👉🏻 Here’s an example of disposing of a subscription:

4.Transformation operators: You can use transformation operators to modify the items emitted by an observable. For example, you can use the map operator to transform each item emitted by an observable into a different item.

👉🏻 Here’s an example of using the map operator:

Observers

An observer in RxJava is an object that subscribes to an Observable to receive notifications about its emitted items.

👉🏻 Here is an example of creating an Observer in Kotlin:

In this example, we create an anonymous class that implements the Observer interface and overrides its three methods: onNext, onComplete, and onError.

The onNext method is called for each item emitted by the observable. The onComplete method is called when the observable has completed emitting all of its items. The onError method is called if an error occurs in the observable.

Here’s an example of using the observer with an observable:

val observable = Observable.just(1, 2, 3, 4, 5)
observable.subscribe(observer)

In this example, we create an Observable using the just method and then subscribe the onComplete to it. The just method creates an observable that emits a fixed set of items.

In conclusion, an observer in RxJava is an object that receives notifications about items emitted by an Observable. It allows you to react to these emissions and perform actions based on the items emitted.

filter operators, combine operators, error handling

1.Filter Operators: Filter operators allow you to filter items emitted by an observable based on some condition. Some of the common filter operators in RxJava are filter, take, skip, takeLast, skipLast, etc.

👉🏻 Here’s an example of using the filter operator:

In this example, an Observable is created that emits the numbers from 1 to 10. The filter() operator is used to filter out only the even numbers from the stream, by checking if the number is divisible by 2. The resulting Observable emits only the even numbers, which are then printed out to the console using the subscribe() method.

👉🏻 Here’s an example of using the take operator:

In this example, an Observable is created that emits the numbers from 1 to 10. The take() operator is used to take only the first 5 numbers from the stream. The resulting Observable emits only the first 5 numbers, which are then printed out to the console using the subscribe() method.

👉🏻 Here’s an example of using the skip operator:

In this example, an Observable is created that emits the numbers from 1 to 10. The skip() operator is used to skip the first 5 numbers in the stream. The resulting Observable emits only the numbers from 6 to 10, which are then printed out to the console using the subscribe() method.

👉🏻 Here’s an example of using the takeLast operator:

In this example, an Observable is created that emits the numbers from 1 to 10. The takeLast() operator is used to take the last 5 numbers from the stream. The resulting Observable emits only the last 5 numbers, which are then printed out to the console using the subscribe() method.

👉🏻 Here’s an example of using the skipLast operator:

In this example, an Observable is created that emits the numbers from 1 to 10. The skipLast() operator is used to skip the last 5 numbers in the stream. The resulting Observable emits only the numbers from 1 to 5, which are then printed out to the console using the subscribe() method.

👉🏻 Here’s an example of using the debounce operator:

In this example, the source Observable emits items, which are then passed through the debounce operator. The debounce operator is configured to only emit an item if 500 milliseconds have passed since the last item emission. The resulting stream of items is then subscribed to an observer, which handles the emitted items as desired.

Debounce is commonly used for scenarios like filtering out multiple button clicks or keystrokes in a text box, where only the last event is relevant.

2.Combine Operators: Combine operators allow you to combine items emitted by two or more observables into a single observable. Some of the common combine operators in RxJava are zip, merge, concat, etc.

👉🏻 Here’s an example of using the zip operator:

In this example, two Observables are created - one that emits the strings "A", "B", and "C", and one that emits the integers 1, 2, and 3. The zip() operator is used to combine the two streams into a single stream that emits the concatenated string and integer values.

The zip() operator takes two Observable as arguments, as well as a BiFunction that defines how to combine the elements of the two streams. In this case, the BiFunction concatenates the string and integer values, resulting in a stream that emits the strings "A1", "B2", and "C3".

The resulting Observable is then subscribed to, and the concatenated strings are printed out to the console using the subscribe() method.

👇🏻 Output:

Result: A1
Result: B2
Result: C3

👉🏻 Here’s an example of using the merge operator:

In this example, two Observables are created - one that emits the strings "A", "B", and "C", and one that emits the strings "D", "E", and "F". The merge() operator is used to combine the two streams into a single stream that emits all of the values in both streams.

The resulting Observable is then subscribed to, and the values from both streams are printed out to the console using the subscribe() method.

👇🏻 Output:

Value: A
Value: B
Value: C
Value: D
Value: E
Value: F

👉🏻 Here’s an example of using the concat operator:

In this example, two Observables are created - one that emits the strings "A", "B", and "C", and one that emits the strings "D", "E", and "F". The concat() operator is used to concatenate the two streams into a single stream that emits all of the values in the first stream, followed by all of the values in the second stream.

The resulting Observable is then subscribed to, and the values from both streams are printed out to the console using the subscribe() method.

👇🏻 Output:

Value: A
Value: B
Value: C
Value: D
Value: E
Value: F

3.Error Handling: When an error occurs in an observable, it can be propagated to the observer using the onError method. You can handle errors in several ways, such as by using try-catch blocks, by using the onErrorReturn operator, or by using the onErrorResumeNext operator.

Here’s an example of using the try-catch block:

In this example, the error that occurred in the observable is propagated to the observer via the onError method.

4.Other Operators: RxJava has many other operators that allow you to perform operations such as reducing, grouping, and more.

👉🏻 Here’s an example of using the reduce() operator:

In this example, an Observable is created that emits the integers 1, 2, 3, 4, and 5. The reduce() operator is used to apply an accumulator function to the values emitted by the Observable in sequence, starting with an initial value of 0. The accumulator function simply adds each value to the accumulated value. The resulting Observable emits a single value that represents the final result of the reduction operation, which is then printed out to the console using the subscribe() method.

👇🏻 Output:

Reduced value: 15

👉🏻 Here’s an example of using the groupBy() operator:

In this example, an Observable is created that emits instances of the Person data class, each of which contains a name and an age. The groupBy() operator is used to group the Person instances emitted by the Observable by their age. The resulting Observable emits GroupedObservable instances, each of which contains the Person instances with the same age. The flatMap() operator is then used to convert each GroupedObservable instance into an Observable that emits a list of the Person instances it contains. Finally, the resulting Observable emits the lists of Person instances for each age group, which are then printed out to the console using the subscribe() method.

👇🏻 Output:

People with age 23: [Person(name=Alice, age=23), Person(name=Charlie, age=23)]
People with age 28: [Person(name=Bob, age=28), Person(name=Emma, age=28)]
People with age 25: [Person(name=Dave, age=25)]

Where to use? 📜

Here are some real-life examples of how RxJava can be used with Kotlin:

1.User Interfaces: RxJava can be used to handle user interactions in mobile and desktop applications. For example, when a user types a search query into a search box, an Observable can be used to emit the query text in real-time, which can be used to update the search results.

In this example, textChanges() is an extension function provided by the RxTextView library, which returns an Observable that emits the text of the EditText as it changes. The debounce() operator is used to wait for 500 milliseconds before emitting the latest text to avoid emitting too many unnecessary queries. The observeOn() operator is used to switch the emissions to the io scheduler for performing the search on a background thread, and then switch back to the main thread using the observeOn() operator for displaying the search results in the UI.

2.Networking: RxJava can be used to handle network requests and responses in Android and web applications. For example, when a user requests data from a web service, an Observable can be used to emit the response in real-time, which can be used to update the UI or perform further computations.

In this example, getPost() is a method provided by the apiService that returns an Observable that emits a list of Post objects. The subscribeOn() operator is used to schedule the network request on the io scheduler for performing the request on a background thread, and then switch back to the main thread using the observeOn() operator for displaying the posts in the UI.

3.Database Operations: RxJava can be used to handle database operations in Android and desktop applications. For example, when a user adds a new item to a shopping list, an Observable can be used to emit the new item in real-time, which can be used to update the UI and store the item in the database.

In this example, addItem() is a method provided by the shoppingListRepository that returns an Observable that emits the newly added item. The subscribeOn() operator is used to schedule the database operation on the io scheduler for performing the insertion on a background thread, and then switch back to the main thread using the observeOn() operator for displaying a toast message to the user.

4.Event-Driven Programming: RxJava can be used to handle events in an event-driven system, such as a message queue. For example, when a new message is received from a queue, an Observable can be used to emit the message in real-time, which can be used to perform further computations.

In this example, receiveMessages() is a method provided by the queueService that returns an Observable that emits messages received from the queue. The subscribeOn() operator is used to schedule the message retrieval on the io scheduler for performing the retrieval on a background thread, and then switch to the computation scheduler for performing further computations on the message. The observeOn() operator is then used to switch back to the io scheduler for storing the result.

5.Streaming Data Processing: RxJava can be used to handle real-time streaming data, such as sensor data from a device. For example, when sensor data is received, an Observable can be used to emit the data in real-time, which can be used to perform further computations or store the data in a database.

In this example, getDataStream() is a method provided by the sensorService that returns an Observable that emits sensor data as it is received. The subscribeOn() operator is used to schedule the data retrieval on the io scheduler for performing the retrieval on a background thread, and then switch to the computation scheduler for performing further computations on the data. The filter() operator is then used to filter out invalid data, and the map() operator is used to compute the result. The observeOn() operator is then used to switch back to the io scheduler for storing the result.

6.Asynchronous Task Execution: RxJava can be used to execute asynchronous tasks and handle their results in a reactive manner. For example, when a task is executed asynchronously, an Observable can be used to emit the result in real-time, which can be used to update the UI or perform further computations.

In this example, asyncTask.execute() is a method that executes a task asynchronously and returns a Future object. The fromFuture() operator is used to convert the Future object to an Observable that emits the result of the task. The subscribeOn() operator is used to schedule the execution of the task on the io scheduler for performing the task on a background thread, and then switch back to the main thread using the observeOn() operator for displaying the result in the UI.

👉🏻RxJava is a versatile and powerful library that can be used in many different applications.

Tips & Tricks 🍬

1.Use Kotlin extension functions to make RxJava code more concise and readable. For example, you can define an extension function that converts an Observable to a Flowable like this:

With this function defined, you can now easily convert an Observable to a Flowable using the toFlowable() extension function.

2.Use the let function to chain RxJava operators together. The let function allows you to pass the result of one RxJava operator to the next operator in the chain, like this:

In this example, the let function is used to pass the result of the filter() operator to the map() operator, which is then subscribed to using the subscribe() method.

3.Use the doOnNext(), doOnError(), and doOnComplete() operators to add side effects to your RxJava code. These operators allow you to perform an action when an item is emitted by the Observable, when an error occurs, or when the Observable completes, respectively.

In this example, the doOnNext(), doOnError(), and doOnComplete() operators are used to print out messages when an item is emitted, when an error occurs, and when the Observable completes, respectively.

4.Use the observeOn() and subscribeOn() operators to control the threading behavior of your RxJava code. The observeOn() operator allows you to specify which scheduler to use for downstream operators, while the subscribeOn() operator allows you to specify which scheduler to use for upstream operators.

In this example, the subscribeOn() operator is used to specify that the Observable should emit items on an I/O thread, while the observeOn() operator is used to specify that downstream operators should run on the main thread.

5.Use the Single, Completable, and Maybe classes to represent single-value and void responses. These classes provide a more concise and type-safe way to represent single-value and void responses in RxJava.

In this example, the getUser() function returns a Single that emits a single User object, the saveUser() function returns a Completable that indicates whether the save was successful or not, and the deleteUser() function returns a Maybe that emits the deleted User object, if one exists.

--

--

Betül Necanlı
Betül Necanlı

Written by Betül Necanlı

Kotlin, Android Programming, Data Structures&Algorithms, Math. https://www.youtube.com/@betulnecanli

No responses yet