RxJava / RxAndroid

最近幾年非常紅的RxJava,大概用了快一年了。來稍微寫的心得分享吧。

關於 RxJava 的原理,網路上有一些大神(例如這個),我相信我應該是沒辦法寫更好了,所以這篇就是著重在 如何使用

What is RxJava

直接引用官方說法:a library for composing asynchronous and event-based programs using observable sequences for the Java VM

我個人認為重點是兩個字:asynchronous & sequences!RxJava強大的地方是,可以用非常簡單的方式,在不同thread之間,依序執行一連串的 program。處此之外,它提供了許多 operator,可以讓我們在一連串的 program之中,進行方便的整理或操作。

使用原理

首先你一定要先知道的,因為他是一種 event-base programming,所以一定會有兩個元素:Observable(可被觀察的對象/被觀察者) & Observer(觀察者)。一般時候 Observable要被指定 Observer之後,才會去執行。

一旦 Observable & observer 都設定好,Observable就會去觸發event(官網是使用emit這個字),Observer就會去接收並處理event。

恩...所以到底是什麼意思呢XD

Observable

先從 Observable 開始。來個最基本的範例來解釋

Observable.just("test string")
          .subscribe(observer)

上面這個例子很簡單,Observable擊發(emit)一個string object,然後由 observer 接收並處理

下一個範例就比較特別且常用

String[] names = {"David","Cathy","Lulu"};
Observable.from(names)
          .subscribe(observer)

這個例子,他不是擊發Array,而是依序擊發Array Item,所以observer會依序地呼叫三次,且三次所帶的 event(或是參數)會分別是 David,Cathy & Lulu

Observable.from() & Observable.just()是最容易且最常見create Observable 物件的方式,當然你也可以客製化自己的 Observable 物件,方法是使用 Observable.create(),如此可以自己定義擊發規則event型態
例如以下範例:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("No. 1");
        subscriber.onNext("No. 2");
        subscriber.onNext("No. 3");
        subscriber.onCompleted();
    }
});

該 Observable 會依序擊發 No. 1, No. 2, No. 3的字串給 Observer,然後結束

Observer

Observer 基本上有三個部分:onNest, onCompleted, & onError,如下:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
  • OnNext:是你要處理每次擊發的地方。這個範例僅只是做輸出
  • onCompleted:如果所有擊發完成,你可以在這裡執行想做的事
  • onError:如果在整個sequence中任何地方發生Exception,他會一律呼叫onError。這也凸顯一個好處:我們只需要在一個地方統一處理所有可能 Exception
  • 提醒onCompleted & onError,不會同時被呼叫,且僅僅只會被呼叫一次。onNext則是會依照需求,可能被呼叫1次或多次,甚至不會呼叫。

所以一個完整的使用會長的像這樣:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "name: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

String[] names = {"David","Cathy","Lulu"};
Observable.from(names)
          .subscribe(observer)

執行結果:

name: David
name: Cathy
name: Lulu
Complete!

但是有時候我們為了節省時間,不會完整定義Observer,而只是 implement 我們想要的部分。所以就會有以下的寫法:

String[] names = {"David","Cathy","Lulu"};
Observable.from(names)
          .subscribe(new Action1<String>() {
              @Override
              public void call(String s) {
                  Log.d(tag, "name: " + s);
              }
          });

這個範例只 implement 了 onNext method,而不是完整 Observer物件。基本上這樣寫是沒有問題的,只是如果有Exception,就沒有辦法處理了。那你可以這樣寫:

String[] names = {"David","Cathy","Lulu"};
Observable.from(names)
          .subscribe(new Action1<String>() {
              @Override
              public void call(String s) {
                  Log.d(tag, "name: " + s);
              }
          }, new Action1<Throwable>(){
              @Override
              public void onError(Throwable e) {
                Log.d(tag, "Error!");
              }
          });

Operator

RxJava 令人驚豔的一個特點就是豐富的 Operator,其實目前我也還沒有完全參透(因為太多了)有興趣的話可以看這裡。這裡示範我自己常用的

*map

可以幫你把型態轉換成另一個再繼續往下執行

String[] names = {"David","Cathy","Lulu"};
Observable.from(names)
          .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return s.length();
                    }
                })
          .subscribe(new Action1<Integer>() {
              @Override
              public void call(Integer length) {
                  Log.d(tag, "The length of string : " + length);
              }
          })

這個範例可以輸出字串的長度

輸出結果如下

The length of string : 5
The length of string : 5
The length of string : 4

*filter

如果符合規則才會往下執行

String[] names = {"David","Badu","Cathy","Lulu"};
Observable.from(names)
          .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s.length() == 5;
                    }
                })
          .subscribe(new Action1<String>() {
              @Override
              public void call(String s) {
                  Log.d(tag, "name: " + s);
              }
          })

這個範例是希望只輸出長度為5的字串

輸出結果如下

name: David
name: Cathy

*強!

RxJava 強大的地方要來了:Operator是可以串連的。例如我現在想要跟前兩位姓名長度為5的人打招呼(這啥鬼要求),你可以這樣寫:

String[] names = {"Tasvaluan","David","Badu","Cathy","Lulu","Bruce"};
Observable.from(names)
          .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s.length() == 5;
                    }
                })
          .take(2) // 取兩位
          .map(new Func1<String, String>){
              @Override
              public String call(String s){
                  return "Hello, " + s;
              }
          }
          .subscribe(new Action1<String>() {
              @Override
              public void call(String s) {
                  Log.d(tag, "name: " + s);
              }
          })

這個範例使用到三個 operator:filter, take & map

則你的輸出

Hello, David
Hello, Cathy

如何,有沒有開始感受到 RxJava 的強大呢!所以你可以利用各種 operator,幫你依序處理你的資料流。

Threading

知道基本寫法之後,就是介紹如何切換 thread! 其實很間單,只需要加一行就可以指定 Thread:

String[] names = {"David","Cathy","Lulu"};
Observable.from(names)
          .subscribeOn(Schedulers.io()) // new line
          .subscribe(observer)

如此就會把依序即發 array item的程序,設定在 io thread裡。
但是為了把文字輸出在 Android Device上,你需要指定回 Android UI Thread。那你就再加一行:

String[] names = {"David","Cathy","Lulu"};
Observable.from(names)
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread()) // new line
          .subscribe(observer)

如此就會先在io thread擊發事件,然後在UI Thread中處理字串。
沿用上面的範例,你可以這樣寫:

String[] names = {"Tasvaluan","David","Badu","Cathy","Lulu","Bruce"};
Observable.from(names)
          .subscribeOn(Schedulers.io()) // in io thread
          .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s.length() == 5;
                    }
                })
          .take(2) // 取兩位
          .map(new Func1<String, String>){
              @Override
              public String call(String s){
                  return "Hello, " + s;
              }
          }
          .observeOn(AndroidSchedulers.mainThread()) // in UI thread
          .subscribe(new Action1<String>() {
              @Override
              public void call(String s) {
                  Log.d(tag, "name: " + s);
              }
          })

如此你對於資料的處理就會全部發生在io thread,一旦資料處理完,就會送回 UI thread處理。這是實務上最常遇到的寫法。

如果你需要更多次的切換 thread ,可以參考這個线程控制部分。

more operator:flatMap

在介紹一個也非常常見但是有點難理解的 operator: flatMap。基本上跟map有點像,是用來做型態轉換,但是比較不同的是,會轉換成另一個 Observable 物件,所以這個物件會具有自己的擊發規則,以及自己的 operator。比較常見的用法是處理被擊發的Array

例如,你想要輸出每個人名的字元,不用flatMap之前你可以會這樣寫:

String[] names = {"David","Cathy","Lulu"};
Observable.from(names)
          .subscribe(new Action1<String>() {
              @Override
              public void call(String s) {
                  char[] charArray = s.toCharArray();
                  for(int i=0;i<charArray.length;i++){
                      Log.d(tag, "the char of name : " + charArray[i]);
                  }
              }
          });

這樣的寫法,會覺得有點可惜,因為我們已經用from去依序擊發 names,怎麼還是用 for loop去處理顯示呢?

You Need flatMap !

String[] names = {"David","Cathy","Lulu"};
Observable.from(names)
          .flatMap(new Func1<String, Observable<Char>() {
                    @Override
                    public Observable<Char> call(String s) {
                        char[] charArray = s.toCharArray();
                        return Observable.from(charArray);
                    }
                })
          .subscribe(new Action1<Char>() {
              @Override
              public void call(char c) {
                Log.d(tag, "the char of name : " + c);
              }
          });

results matching ""

    No results matching ""