Implementation summary of multiple scenarios in RxJava

  • 2020-05-10 18:08:19
  • OfStack

1. Delay the execution of the action

You can use the timer+map method. The code is as follows:


Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{
   return doSomething();
  }).subscribe(System.out::println);
 }

2. Delay sending the results of the execution

This scenario requires that the action to produce the data be executed immediately, but the result is sent late. This is different from the scenario above.

This scenario can be used Observable.zip To implement.

The zip operator combines the data emitted by multiple Observable in sequence, each of which can be combined only once and in an orderly manner. The final amount of combined data is determined by Observable, which has the least transmitted data.

For the data of the same location of each observable, we need to wait for each other. That is to say, after the data of the first observable location is generated, we need to wait for the data of the second observable location to be generated, and only after the data of the same location of each Observable are generated, can we combine them according to the specified rules.

There are many declarations for zip, but they are basically 1, that is, several observable are passed in, and then a rule is specified to process the data at the corresponding location of each observable and generate a new data. The following is the simplest one:


 public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);

Implementation of push sending with zip results are as follows:


 Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)
         ,Observable.just(doSomething()), (x,y)->y)
   .subscribe(System.out::println));

3. Use defer to perform an action in a specified thread

The following code, although we specify how the thread will run, however doSomething() This function is still executed in the thread that the current code calls.


 Observable.just(doSomething())
     .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
     .subscribe(v->Utils.printlnWithThread(v.toString()););

We usually do this in the following ways:


 Observable.create(s->{s.onNext(doSomething());})
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{
     Utils.printlnWithThread(v.toString());
  });

But we can do the same with defer.

About defer

The defer operator, like the operators create, just, from, etc., is to create a class operator, but all the data associated with this operator is only valid after the subscription.

Statement:


 public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);

Observable in Func0 of defer is created when subscribing to (subscribe).

Function:

Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.

So observable was created at subscription time.

The above question is implemented by defer:


 Observable.defer(()->Observable.just(doSomething()))
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
  });

4. Use compose without breaking the chain

We often see the following code:


 Observable.just(doSomething())
    .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());

In the code above, subscribeOn(xxx).observeOn(xxx) It might be 1 in a lot of places, but if we wanted to do it in one place, we could write it like this:


 private static <T> Observable<T> applySchedulers(Observable<T> observable) {
  return observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation());
 }

But then every time we need to call the above method, it's going to look something like this, and the outermost function is 1, which is equal to breaking the link structure:


 applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() {
   @Override public Data call(Data data) {
   return manipulate(data);
   }
  })
 ).subscribe(new Action1<Data>() {
  @Override public void call(Data data) {
  doSomething(data);
  }
 });

You can use the compose operator without breaking the link structure.

compose's statement is as follows:


 public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
0

Its input parameter is 1 Transformer interface, and its output is 1 Observable interface. Transformer is actually 1 interface Func1<Observable<T> , Observable<R>> In other words, it can convert one type of Observable into another type of Observable.

Simply put,compose can be converted from the original observable to another Observable by the specified conversion mode (input parameter transformer).

With compose, the thread mode is specified as follows:


 public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
1

The function applySchedulers can be simplified one step further by using the lambda expression as follows:


 public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
2

5. Use different execution results by priority

The above heading probably doesn't make the scenario I'm trying to get across very well. In fact, the scenario I'm trying to get across is similar to the normal scenario of getting network data: if the cache has it, get it from the cache; if not, get it from the network.

The requirement here is that if the cache has, it will not do anything to get data from the network.

This can be implemented using concat+first.

concat merges several Observable into one Observable and returns the final one Observable. The data is as if it were sent from one Observable. The parameters can be multiple Observable or Iterator containing Observalbe.

The data in the new observable is arranged in the order of observable in the original concat, that is, the data in the new result is sorted in the original order.

Here is the implementation of the above requirements:


 Observable.concat(getDataFromCache(),getDataFromNetwork()).first()
    .subscribe(v->System.out.println("result:"+v));
 // Get data from the cache 
 private static Observable<String> getDataFromCache(){
  return Observable.create(s -> {
   //dosomething to get data
   int value = new Random().nextInt();
   value = value%2;
   if (value!=0){
    s.onNext("data from cache:"+value); // To generate data 
   }
   //s.onError(new Throwable("none"));
   s.onCompleted();
  }
    );
 }
 // Get data from the network 
 private static Observable<String> getDataFromNetwork(){
  return Observable.create(s -> {
   for (int i = 0; i < 10; i++) {
    Utils.println("obs2 generate "+i);
    s.onNext("data from network:" + i); // To generate data 
   }
   s.onCompleted();
  }
    );
 }

The above implementation, if getDataFromCache has data, getDataFromNetwork's code here will not execute, which is exactly what we want.

The above implementation has a few caveats:

      1. It is possible to get data from both places. In this case, first will throw an exception NoSuchElementException.

      2, above getDataFromCache() If we call onCompleted instead of onCompleted, we call onError instead of concat, then the above concat will get no result, because concat will stop merging after receiving any one error, so if we want to use onError, we need to use concatDelayError instead concat.concatDelayError error will be ignored and error will be deferred to the end.

conclusion

The above is the whole content of this article, I hope the content of this article to your study or work can bring 1 definite help, if you have questions you can leave a message to communicate.


Related articles: