Getting Started with RxJava and Its Application in Android Development

  • 2021-07-24 11:49:29
  • OfStack

RxJava's GitHub homepage is below. Setup is straightforward, so it is not covered here.
https://github.com/ReactiveX/RxJava

Foundation
The two core building blocks of RxJava are Observables (the observed, i.e. the event source) and Subscribers (the observers). Observables emit a series of events, and Subscribers handle them. An event can be anything you are interested in (touch events, data returned from a web API call, ...).

An Observable can emit zero or more events and then terminates, either successfully or with an error. For every event emitted, the onNext method of its Subscriber is called, and the sequence ends with a single call to Subscriber.onCompleted() or Subscriber.onError().

RxJava looks a lot like the classic observer design pattern, but with one obvious difference: if an Observable has no Subscriber, it will not emit any events.
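
The "nothing happens until someone subscribes" behaviour and the onNext / onCompleted / onError contract can be illustrated without RxJava at all. The following is a toy model in plain Java, not RxJava's implementation: ColdSource, Listener, and ColdSourceDemo are hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a lazy event source; NOT RxJava's implementation. */
final class ColdSource<T> {
    /** Receives events; mirrors the onNext / onCompleted / onError contract. */
    interface Listener<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    private final Consumer<Listener<T>> producer;
    int timesProduced = 0; // how often the producer has actually run

    ColdSource(Consumer<Listener<T>> producer) {
        this.producer = producer; // only stored here, nothing is emitted yet
    }

    /** The producer runs only here, once per subscriber. */
    void subscribe(Listener<T> listener) {
        timesProduced++;
        try {
            producer.accept(listener);
        } catch (RuntimeException e) {
            listener.onError(e); // a failing producer is reported through onError
        }
    }
}

final class ColdSourceDemo {
    /** A source that emits one string and then completes. */
    static ColdSource<String> hello() {
        return new ColdSource<>(listener -> {
            listener.onNext("Hello, world!");
            listener.onCompleted();
        });
    }

    /** Subscribes once and returns the events the listener saw, in order. */
    static List<String> run(ColdSource<String> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(new ColdSource.Listener<String>() {
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable e) { log.add("error"); }
        });
        return log;
    }
}
```

Creating the source with ColdSourceDemo.hello() leaves timesProduced at 0. Only the call to subscribe() inside run() makes the producer emit its event and the terminal notification.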

Hello World
Creating an Observable object is very simple: just call Observable.create directly.


Observable<String> myObservable = Observable.create( 
  new Observable.OnSubscribe<String>() { 
    @Override 
    public void call(Subscriber<? super String> sub) { 
      sub.onNext("Hello, world!"); 
      sub.onCompleted(); 
    } 
  } 
); 


The Observable defined here emits a single "Hello, world!" string and then completes. Next, we create a Subscriber to handle the string emitted by the Observable.


Subscriber<String> mySubscriber = new Subscriber<String>() { 
  @Override 
  public void onNext(String s) { System.out.println(s); } 
 
  @Override 
  public void onCompleted() { } 
 
  @Override 
  public void onError(Throwable e) { } 
}; 

Here, the Subscriber just prints the string emitted by the Observable. The subscribe function connects the myObservable object we defined to the mySubscriber object, which completes the Subscriber's subscription to the Observable.


myObservable.subscribe(mySubscriber); 

Once mySubscriber subscribes to myObservable, myObservable calls the onNext and onCompleted methods of mySubscriber, and mySubscriber prints "Hello, world!".

Simpler code
Does it seem too wordy to write this much code just to print "Hello, world!"? The verbose form was used here mainly to show the principle behind RxJava. RxJava actually provides many convenience functions that reduce the amount of code.

First, let's look at how to simplify the creation of the Observable. RxJava has many built-in functions for this. For example, Observable.just creates an Observable that emits a single event and then completes. The code above for creating the Observable can be reduced to one line:

Observable<String> myObservable = Observable.just("Hello, world!");

Next, let's look at how to simplify the Subscriber. In the example above we don't really care about onCompleted and onError. We only need to do some processing in onNext, and for that we can use the Action1 class.


Action1<String> onNextAction = new Action1<String>() { 
  @Override 
  public void call(String s) { 
    System.out.println(s); 
  } 
}; 

The subscribe method has an overload that accepts three arguments, one each for onNext, onError, and onCompleted: an Action1<T>, an Action1<Throwable>, and an Action0.


myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction); 

We don't care about onError and onCompleted here, so we only need the first parameter:


 
myObservable.subscribe(onNextAction); 
// Outputs "Hello, world!" 

Put together, the code above can be written like this:


Observable.just("Hello, world!") 
  .subscribe(new Action1<String>() { 
    @Override 
    public void call(String s) { 
       System.out.println(s); 
    } 
  }); 

Java 8 lambdas make the code even simpler:


Observable.just("Hello, world!") 
  .subscribe(s -> System.out.println(s)); 


In Android development, the retrolambda Gradle plugin is highly recommended so that you can use lambdas in your code.

Transformation
Let's do something more interesting!
Suppose I want to append my signature to "Hello, world!". One option is to modify the Observable itself:


Observable.just("Hello, world! -Dan") 
  .subscribe(s -> System.out.println(s)); 

If you can change the Observable, this is fine, but what if you can't? For example, what if the Observable is provided by a third-party library? Or what if my Observable has multiple Subscribers, but I only want to change the output for one of them?
What about modifying the event in the Subscriber instead? For example:


Observable.just("Hello, world!") 
  .subscribe(s -> System.out.println(s + " -Dan")); 

This approach is still unsatisfactory, because I want my Subscribers to be as lightweight as possible, since a Subscriber may well run on the main thread. In addition, according to the idea of reactive functional programming, a Subscriber should only react to the events emitted by the Observable, not modify them. Wouldn't it be cool if "Hello, world!" could be transformed in some intermediate step instead?
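
The idea of an intermediate step can be sketched in plain Java before reaching for any RxJava operator. The sketch below is a toy model, not RxJava's API: TransformStep and mapping() are hypothetical names, and mapping() simply places a transformation between the source event and an unchanged subscriber. In RxJava itself, operators such as map fill this role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy model of an intermediate transformation step; NOT RxJava's API. */
final class TransformStep {
    /** Wraps a subscriber so that each event passes through `step` first. */
    static <T, R> Consumer<T> mapping(Function<T, R> step, Consumer<R> subscriber) {
        return event -> subscriber.accept(step.apply(event));
    }

    /** Sends one unmodified source event through the step and returns what the subscriber received. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Consumer<String> subscriber = received::add;          // stays trivial: it only reacts
        Consumer<String> signed = mapping(s -> s + " -Dan", subscriber);
        signed.accept("Hello, world!");                       // the source event is untouched
        return received;
    }
}
```

Neither the source string nor the subscriber changes. The signature is added by the step in between, which is the separation the paragraph above asks for.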

Example

We will use the OpenWeatherMap API as a demonstration. OpenWeatherMap (http://api.openweathermap.org/) is a free weather data API that is very easy to configure and use. A call only needs location information (a city name or geographic coordinates) as parameters.

Calling an API usually takes the following steps, each of which involves a pile of boilerplate code:

1. Create the required model classes (adding annotations if necessary).
2. Write network-layer code that manages requests and responses, with error handling.
3. Run the request on a background thread (usually as an asynchronous task) and present the response through a callback on the UI thread.

Create a model class

Step 1 can be (semi-)automated with a JSON-to-POJO generation tool such as jsonschema2pojo, which generates the model classes for the OpenWeatherMap API.
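
The generated classes are plain data holders. A minimal hand-written sketch of what such a model might look like is shown here. The class name WeatherData matches the type used in the ApiManager listing later in this article, but the fields chosen (name, main.temp, main.humidity, weather[].description) are an assumed subset of the response, and summary() is a hypothetical helper added only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a small part of the /weather response (assumed field subset). */
final class WeatherData {
    /** Nested "main" object: core measurements. */
    static final class Main {
        double temp;      // temperature; the unit depends on the "units" request parameter
        double humidity;  // relative humidity in percent
    }

    /** One entry of the "weather" array: a textual condition. */
    static final class Weather {
        String main;         // short label for the condition
        String description;  // longer human-readable text
    }

    String name;                                // city name
    Main main = new Main();
    List<Weather> weather = new ArrayList<>();

    /** Hypothetical helper: one-line summary for display. */
    String summary() {
        String condition = weather.isEmpty() ? "n/a" : weather.get(0).description;
        return name + ": " + main.temp + " (" + condition + ")";
    }
}
```

A JSON library would normally populate these fields during deserialization. The fields are left package-private and mutable here only to keep the sketch short.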

Implementing the network call with Retrofit

The network call in step 2 usually requires a lot of boilerplate code, but Square's Retrofit (http://square.github.io/retrofit/) greatly reduces it. All you need to do is declare an interface whose annotations describe the entire request, and then create the client with RestAdapter.Builder. Retrofit also takes care of serializing and deserializing JSON.


public interface ApiManagerService {
  @GET("/weather")
  WeatherData getWeather(@Query("q") String place, @Query("units") String units);
}

In the interface declaration, the annotation on the method consists of an HTTP method (GET here, although Retrofit also supports POST, PUT, DELETE, and HEAD as needed) and a relative path (the base URL is supplied through RestAdapter.Builder). The @Query annotation assembles the request parameters. There are two of them here: place (the location) and units (the unit of measurement).

A concrete call (which must run off the UI thread in real code) is short: build the client with RestAdapter.Builder, create the service from the interface, and invoke its method with a place and a unit system. Only a little code is needed to implement the whole call, which is the power of Retrofit.

Reactive programming with RxJava

Now we reach step 3: the RxJava part! Our example uses it to make the request asynchronously.
First, we need to replace the interface created earlier with this class:


public class ApiManager {

  // Retrofit builds the HTTP client from this annotated interface.
  private interface ApiManagerService {
    @GET("/weather")
    WeatherData getWeather(@Query("q") String place, @Query("units") String units);
  }

  // The base URL; the relative path from @GET is appended to it.
  private static final RestAdapter restAdapter = new RestAdapter.Builder()
    .setServer("http://api.openweathermap.org/data/2.5")
    .build();
  private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);

  // Wraps one blocking request in an Observable that runs on the I/O thread pool.
  public static Observable<WeatherData> getWeatherData(final String city) {
    return Observable.create(new Observable.OnSubscribeFunc<WeatherData>() {
      @Override
      public Subscription onSubscribe(Observer<? super WeatherData> observer) {
        try {
          // Emit the single result, then signal completion.
          observer.onNext(apiManager.getWeather(city, "metric"));
          observer.onCompleted();
        } catch (Exception e) {
          // Any failure of the request is reported to the observer.
          observer.onError(e);
        }

        return Subscriptions.empty();
      }
    }).subscribeOn(Schedulers.threadPoolForIO());
  }

}

Let's first look at getWeatherData(). It calls Observable.create() with an implementation of Observable.OnSubscribeFunc (named Observable.OnSubscribe in RxJava 1.x, as used in the Hello World section) and returns the resulting Observable. The Observable starts working as soon as it is subscribed to. Each result it produces is passed to the observer's onNext() method. Because we only want to run network requests concurrently, each Observable makes exactly one request and then calls onCompleted(). If the request throws, the exception is passed to onError() instead. The subscribeOn() call is important because it determines which thread the work runs on. Here it is Schedulers.threadPoolForIO() (Schedulers.io() in RxJava 1.x), a scheduler intended for I/O-bound and network work.

The last step is to make the API call. The requests run concurrently: each one invokes the same URL asynchronously with a different city as its parameter. The call chain works as follows.

The Observable.from() method converts an array of city names into an Observable that emits each name in turn. The mapMany() operator (flatMap() in later RxJava versions) then turns every emitted string into an Observable of its own by calling ApiManager.getWeatherData(). Because each of those Observables subscribes on the I/O scheduler, the requests run on different threads.

The work still runs on the I/O thread pool. On Android, results that are shown in the UI must be delivered to the UI thread, because only the thread that created a view hierarchy may touch its views. All you need to do is pass AndroidSchedulers.mainThread() to the observeOn() method. The call to subscribe() triggers the Observable, and its callbacks are where we process the emitted results.

This example demonstrates the power of RxJava. Without Rx, we would have to create N threads for the requests and then hand the results over to the UI thread asynchronously. With Rx, very little code does the job, thanks to its operators for creating, merging, filtering, and transforming Observables.
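
For comparison, the manual approach mentioned above can be sketched with the JDK's executors alone. This is not RxJava code and not the article's original listing: ManualFanOut is a hypothetical name, fetchWeather() is a stand-in for the blocking network call, and a single-thread executor stands in for the Android UI thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Manual fan-out without Rx; a sketch under the assumptions stated above. */
final class ManualFanOut {
    /** Hypothetical stand-in for a blocking network request. */
    static String fetchWeather(String city) {
        return "weather for " + city;
    }

    /** Runs one request per city on a worker pool and collects results on a single "UI" thread. */
    static List<String> fetchAll(List<String> cities) {
        ExecutorService io = Executors.newFixedThreadPool(4);      // background workers
        ExecutorService ui = Executors.newSingleThreadExecutor();  // stand-in for the UI thread
        List<String> shown = new ArrayList<>();                    // modified only on the "UI" thread
        CountDownLatch done = new CountDownLatch(cities.size());
        try {
            for (String city : cities) {
                io.execute(() -> {
                    String result = fetchWeather(city);            // runs off the "UI" thread
                    ui.execute(() -> {                             // hand the result back
                        shown.add(result);
                        done.countDown();
                    });
                });
            }
            done.await();                                          // wait until every result was delivered
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return shown;                                              // arrival order is not deterministic
    }
}
```

Thread pools, the hand-off to the UI thread, and completion tracking all have to be written by hand here, and error handling is still missing. That bookkeeping is what the subscribeOn() and observeOn() calls replace.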

RxJava can serve as a powerful concurrency tool when developing Android apps. It takes some time to become familiar with, but as the saying goes, sharpening the axe does not delay the woodcutting: mastering it will pay off. The reactive extensions library is a good idea, and we have been using it for several weeks to develop Android programs (in the near future, asynchronous task handling in our products will be based entirely on it). The more you learn about it, the more you will like it.

