Getting Started with Responsive Programming via RxJava in Android

  • 2021-07-24 11:49:21
  • OfStack

Error handling

So far, we haven't introduced much to the onComplete () and onError () functions. These two functions are used to inform the subscriber that the object being observed will stop sending data and why it stopped (successfully completed or something went wrong).

The following code shows how to use these two functions:


Observable.just("Hello, world!")
  .map(s -> potentialException(s))
  .map(s -> anotherPotentialException(s))
  .subscribe(new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { System.out.println("Completed!"); }

    @Override
    public void onError(Throwable e) { System.out.println("Ouch!"); }
  });

potentialException () and anotherPotentialException () in the code may throw exceptions. Every Observerable object calls the onCompleted () or onError () methods at the end, so Demo prints "Completed!" Or "Ouch!".

This model has the following advantages:

1. onError () 1 will be called whenever an exception occurs

This greatly simplifies error handling. You only need to handle errors in one place.

2. Operators do not need to handle exceptions

Leaving exception handling to the subscriber, the onError () method is executed directly when one of the 1 operators in the Observerable call chain throws an exception.

3. You can know when the subscriber has received all the data.

Knowing when the task ends can help simplify the flow of code. (Although it is possible that the Observable object will never end)

I think this error handling method is simpler than traditional error handling. In traditional error handling, errors are usually handled in each callback. This not only leads to duplicate code, but also means that every callback must know how to handle errors, and your callback code will be tightly coupled to the caller at 1.

With RxJava, Observable objects don't need to know how to handle errors at all! The operator also does not need to handle the error state-1. When an error occurs, the current and subsequent operators are skipped. All error handling is left to the subscriber.

Scheduler

Suppose you write Android app to request data from the network (feel this is necessary, is there a stand-alone?) . Network requests take a long time, so you plan to load the data in another thread. Then the problem is coming!

Writing a multithreaded Android application is difficult because you must make sure that the code runs on the correct thread, otherwise app may crash. The most common way is to update UI on a non-main thread.

With RxJava, you can use subscribeOn () to specify the threads run by the observer code and observerOn () to specify the threads run by the subscriber:


myObservableServices.retrieveImage(url)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

Isn't it simple? Any code that executes before my Subscriber runs in an I/O thread. Finally, the code to operate view runs in the main thread.

Best of all, I can add subscribeOn () and observerOn () to any Observable object. These two are also operators! . I don't need to care about the Observable object and what operators it has on it. Only using these two operators can realize scheduling in different threads.

If I use AsyncTask or something like that, I will have to carefully design my code to figure out the parts that need to be executed concurrently. With RxJava, I can leave the code unchanged and just call these two operators when concurrency is required.

Subscription (Subscriptions)

When Observable. subscribe () is called, an Subscription object is returned. This object represents the connection between the observed and the subscriber.


ubscription subscription = Observable.just("Hello, World!")
  .subscribe(s -> System.out.println(s));

You can use this Subscription object later to manipulate the connection between the observer and the subscriber.


subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
// Outputs "Unsubscribed=true"

Another benefit of RxJava is that when it handles unsubscribing, it stops the entire call chain. If you use a very complex string of operators, the call to unsubscribe will terminate where it is currently executing. There is no need to do any extra work!

RxAndroid

RxAndroid is an extension of RxJava for the Android platform. It includes a few tools that can simplify the development of Android.

First, AndroidSchedulers provides a scheduler for Android's threading system. Do you need to run some code in an UI thread? It's very simple, just use AndroidSchedulers. mainThread ():


retrofitService.getImage(url)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

If you have created your own Handler, you can use HandlerThreadScheduler1 to link a scheduler to your handler.

Next, we will introduce AndroidObservable, which provides many functions to match the life cycle of Android. The bindActivity () and bindFragment () methods use AndroidSchedulers. mainThread () by default to execute observer code, which notifies the observer to stop emitting new messages at the end of Activity or Fragment.


AndroidObservable.bindActivity(this, retrofitService.getImage(url))
  .subscribeOn(Schedulers.io())
  .subscribe(bitmap -> myImageView.setImageBitmap(bitmap);

I myself like the AndroidObservable. fromBroadcast () method, which allows you to create an Observable object similar to BroadcastReceiver. The following example shows how to be notified when the network changes:


IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
  .subscribe(intent -> handleConnectivityChange(intent));

Finally, we will introduce ViewObservable, which can be used to add some bindings to View. If you want to receive an event every time you click view, you can use ViewObservable. clicks (), or if you want to listen for content changes in TextView, you can use ViewObservable. text ().


ViewObservable.clicks(mCardNameEditText, false)
  .subscribe(view -> handleClick(view));

Retrofit

The famous Retrofit library has built-in support for RxJava (official download page http://square.github.io/retrofit/# download). Typically, the caller can get asynchronous results by using an Callback object:


@GET("/user/{id}/photo")
void getUserPhoto(@Path("id") int id, Callback<Photo> cb);

With RxJava, you can directly return 1 Observable object.


@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);

Now you can use Observable objects at will. You can not only get data, but also transform it.
Retrofit's support for Observable makes it easy to combine multiple REST requests. For example, if we have one request to get photos and one request to get metadata, we can send these two requests concurrently and wait for both results to return before processing:


myObservableServices.retrieveImage(url)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

0

I showed a similar example in article 2 (using flatMap ()). Here I just want to show how easy it is to combine multiple REST requests using RxJava + Retrofit.

Legacy code, extremely slow running code

Retrofit can return Observable objects, but what if other libraries you use do not support this? Or an internal code, do you want to convert them into Observable? Is there any simple way?

Most of the time Observable. just () and Observable. from () help you create Observable objects from legacy code:


myObservableServices.retrieveImage(url)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

1

In the above example, if oldMethod () is fast enough, there is no problem, but what if it is slow? Calling oldMethod () will block his thread.
To solve this problem, we can use defer () to wrap slow code as I used in 1:


myObservableServices.retrieveImage(url)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

2

Now, the call to newMethod () will not block unless you subscribe to the returned observable object.

Life cycle

I left the hardest part at the end. How to deal with the life cycle of Activity? There are two main problems:
1. Continue the previous Subscription after configuration changes (such as screen rotation).

For example, you make an REST request using Retrofit, and then you want to present the result in listview. What if the user rotates the screen during the network request? Of course you want to continue your request, but what happened?

2. Memory leak caused by Observable holding Context

The problem is that when you create subscription, you somehow hold a reference to context, especially when you interact with view, which is too easy to happen! If the Observable does not end in time, the memory footprint will become larger and larger.
Unfortunately, there is no silver bullet to solve these two problems, but here are some guidelines for you to refer to.

The solution to the first problem is to use the built-in caching mechanism of RxJava, so that you can execute unsubscribe/resubscribe on the same Observable object without repeatedly running the code to get Observable. cache () (or replay ()) will continue to execute network requests (even if you call unsubscribe, it will not stop). This means that you can create a new Observable object from the return value of cache () when Activity is recreated.


Observable<Photo> request = service.getUserPhoto(id).cache();
Subscription sub = request.subscribe(photo -> handleUserPhoto(photo));

// ...When the Activity is being recreated...
sub.unsubscribe();

// ...Once the Activity is recreated...
request.subscribe(photo -> handleUserPhoto(photo));

Note that both sub are requests using the same cache. Of course, it is up to you to store the results of the request. Solution 1, which is related to all other lifecycles, must be stored somewhere outside the lifecycle. (ES 245EN ES 246EN or singleton, etc.).

The solution to the second problem is to unsubscribe at some point in the life cycle. A common pattern is to use CompositeSubscription to hold all Subscriptions and then cancel all subscriptions in onDestroy () or onDestroyView ().


myObservableServices.retrieveImage(url)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

4

You can create an CompositeSubscription object in the base class of Activity/Fragment and use it in the subclass.

Attention! 1 Once you call CompositeSubscription. unsubscribe (), the CompositeSubscription object is not available. If you still want to use CompositeSubscription, you must create a new object.


Related articles: