読者です 読者をやめる 読者になる 読者になる

visible true

技術的なメモを書く

RxJavaをコレクション操作ライブラリとして捉えれば学習コストと導入リスクを低減できるのではないか

Android RxJava

とりあえずRxJavaをプロダクトで使いたい、しかしチームに気軽に持ち込むには学習コストが高すぎたり習熟度が低い状態ではメンテナンスでリスクになったりするので導入できない。しかし導入したい。導入するにはどうしたらいいんだろうということで、RxJavaをコレクション操作ライブラリとして採用すれば低コスト低リスクに導入できるのではないかなと考えました。

本エントリではRxJavaをコレクション操作ライブラリとして使う観点でRxJavaのメソッドをいくつか紹介していきます。

導入する

Android前提でかくのでRxAndroidを使います。

dependencies {
  compile 'io.reactivex:rxandroid:0.24.0'
}

また、lambdaを使った記述をします。

登場するモデル

primitiveな型ばかり使っててもあんまり恩恵を想像できないので便宜的に幾つかのクラスを使います。中身に特に意味はないです。またRxJavaの雰囲気を掴む事を目的としているので色々な部分を省略して書いていきます。

User

ID、名前、有料会員かどうか、最終来訪日時を持つ。

public class User {
  long id;
  String name;
  boolean isPremium;
  long lastVisitedAt;
}

TrackingMeta

ユーザID、アプリケーションバージョンを持つメタ情報。

public class TrackingMeta {
  long userId;
  int appVersionCode;
}

Article

記事ID、タイトル、本文、リンク、タグ、有料会員のみかどうかを持つ。

public class Article {
  long id;
  String title;
  String description;
  String url;
  List<String> tags;
  boolean forPremium;
}

Observable.from()

とりあえずこれがないと始まらないです。Observable.from()は引数に渡した配列やListをObservable<T>に変換するメソッドです。Observable<T>は時間軸に流れるTのstreamを表しますが、コレクション操作ではIteratorをイメージすると良いと思います。for文やIteratorが欲しくなったらObservable.from()する感じでいきましょう。

//配列
Integer[] integers = new Integer[]{1, 2, 3, 4};
Observable<Integer> integerObservable = Observable.from(integers);

//リスト
List<User> users = createUserList();
Observable<User> userObservable = Observable.from(users);

subscribe()

subscribe()はObservable<T>が発行する値を購読するメソッドです。単純にfor-each相当のものであると考えてもとりあえず構わないと思います*1。subscribe()には引数の異なる6種類のバリエーションがありますが、コレクション操作ではsubscribe(Action1<T>)を使います。

integerObservable.subscribe(
  i -> Log.d(TAG, "from() i = " + i)
);

userObservable.subscribe(
  user -> {
    //do something
    Log.d(TAG, "from() user = " + user.getId());
});

map()

一番よく使うメソッドの一つです。Observableの要素1つ1つに関数を適用できます。

map()に渡すFunc1<T,R>はT型を引数に受け取ってR型を返す関数を表します。これによりObservableの型を変換できます*2

List<User> users = createUserList();
Observable<User> userObservable = Observable.from(users);
//Observable<User>をObservable<Long>に変換する
Observable<Long> userIdObservable = userObservable.map(User::getId);

//Observable<User>を元にObservable<TrackingMeta>をつくる
Observable<TrackingMeta> trackingMetaObservable = userObservable
  .map(user -> {
    return new TrackingMeta(user.getId(), 1001);
  }
);

filter()

その名の通り条件で要素をフィルタリングできます。引数はFunc1<T, Boolean>で、受け取ったTを後続に渡してよければtrue,捨てる場合はfalseを返します。

List<Article> articles = createArticleList();
//プレミアムじゃないArticleだけを抽出する
Observable<Article> freeArticles = Observable.from(articles)
  .filter(article -> !article.forPremium());

//プレミアムなArticleだけを抽出する
Observable<Article> premiumArticles = Observable.from(articles)
  .filter(article -> article.forPremium());

take(), limit(), range()

リスト操作っぽい関数群です。take()はObservable<T>から指定した長さのObservable<T>を取り出せます。limit()は指定した数を上限値としたObservable<T>を取り出せます。range()は初期値と長さを指定してObservable<Integer>を生成でき、それぞれの要素は初期値から順にインクリメントされた値になります。

take()

//15件のリストであるとする
List<Article> articles = createArticleList();

//10件のObservable<Article>を取り出す
Observable<Article> articlesTake = Observable.from(articles)
  .take(10);

//大きすぎる値をセットした場合Listの長さになる。ここでは15件になる。
Observable<Article> articlesTake2 = Observable.from(articles)
  .take(20);

limit()

List<Article> articles = createArticleList();
Observable<Article> articlesLimit = Observable.from(articles)
  .limit(10);

limit()の実装は以下のとおり。take()に別名を付けただけだった!

public final Observable<T> limit(int num) {
  return take(num);
}

range()

単純な使い方は以下の様な感じです。ただこのままだとあんまり使い道はなさそうです。for(int i = 0; i < 10; i++)みたいな事を書く事があれば置き換えは出来るよねといった所。

Observable.range(0, 10)
  .subscribe(i -> {
    //0,1,2,3,4,5,6,7,8,9
  });

コレクション操作ではrange()はあんまり要らないかもしれないです。例えばArrayAdapter<T>みたいに要素をindex指定でしか取れないものをObservable<T>にするといった利用方法が考えられます。

reduce()

reduce()Observable<T>の一連のsequenceを一つの値に集約します。第一引数に初期値Rをセットし、第二引数にFunc2<R, ? super T, R>を渡します。Func2<R, ? super T, R>は「RとTを受け取ってRを返す関数」で、第一引数には初期値Rか前回Func2が返したRが入ってきます。以下の実装では、ArticleのリストからforPremium=trueが何件あるかを数え上げています。

List<Article> articles = createArticleList();
Observable.from(articles)
                .filter(Article::forPremium)
                .reduce(0, (count, article) -> {
                    return count + 1;
                });

[休憩] 複雑な処理を簡潔に書く

ここまでに見たメソッドを使って少し複雑な事をしてみましょう。例えば「ArticleのリストからforPremium=trueを最大n件取り出し、そのIDをカンマ区切りで連結してStringにする」という要件を考えてみます。色んな実装の仕方があると思いますがRxJavaなしでザクッと適当にやると以下のような感じになります。

private String toPremiumIdListString(List<Article> articles, int n) {
  int count = 0;
  StringBuilder result = new StringBuilder();
  for (Article article : articles) {
    if (article.forPremium()) {
      count++;
      if (result.length() > 0) {
        result.append(",");
      }
      result.append(article.getId().toString());
      if (count >= n) {
        break;
      }
    }
  }
  return result.toString();
}

RxJavaを用いた場合は以下の通りです。要件の一つ一つをメソッド毎に分けて記述でき、理解がしやすいのではないでしょうか。

private Observable<String> toPremiumIdListString(List<Article> articles, int n) {
  return Observable.from(articles)
    .filter(Article::forPremium)
    .limit(n)
    .map(article -> article.getId().toString())
    .reduce(null,
      (ids, id) -> ids == null ? id : ids + "," + id
    );
}

RxJavaをコレクション操作に用いる事で以下のメリットが考えられます。

  • 要件を関数に分解して書くことで仕様が明快になる
  • 書き方が統一され可読性が上がる

lambdaが無いと冗長になりますが書く時はIDEがサポートしてくれるし、読む時も統一的なRxJavaの記述となるので負担はそこまでかからないでしょう。読みづらい場合は一つの関数に色々詰め込み過ぎているだとか抽象化できるものを展開して書いているとかいった兆候なので改善を検討しやすいです。

map()filter()limit()などの関数はOperatorと呼ばれていて、内部的にはそれぞれに対応したOperatorインタフェースの実装を使ってlift()しています。なのでカスタムOperatorを作ってlift()で独自の演算をするといった事も可能になります。

merge(), concat()

merge()concat()は2つのObservable<T>を連結するメソッドです。コレクション操作においては動作に違いはないです。非同期処理などで時間軸の概念が入ってくると動作に違いが出てきます。merge()は2つのObervableを要素の発行時間順に合成し、concat()は時間に関わらずObservableを前後に連結します。

Observable.merge(Observable.range(0, 3), Observable.range(10, 3))
//0,1,2,10,11,12

Observable.concat(Observable.range(0, 3), Observable.range(10, 3));
//0,1,2,10,11,12

Observableの世界から値を取り出す

今までの例では常にObservable<T>を返す様な処理を書いてきました。値の取り出しはsubscribe()で行いますが、どうしてもObservableにくるまれた値を直接取り出したい時があります。そこでtoBlocking()を使います。

Observable<T>toBlocking()するとBlockingObservable<T>が得られます。このクラスはObservableの世界と外の世界を繋いでくれます。

以下はtoBlocking()を使ってhead関数を実装する例です。すこし大袈裟ですが、tail関数も実装しようとすると統一的で簡潔に書ける事がわかります。

public <T> T head(List<T> list) {
  if (list == null || list.isEmpty()) {
    return null;
  }
  return Observable.from(list).toBlocking().first();
}

次にtail関数を実装してみます。普通に実装しようとするとちょっと冗長になりますが、RxJavaを使うと簡潔に書けるようになります。以下の例では新たにskip()を使ってます。これはもう説明はいらないんじゃないでしょうか。

public <T> List<T> tail(List<T> list) {
  if (list == null || list.isEmpty()) {
    return Collections.emptyList();
  }
  return Observable.from(list).skip(1).toList().toBlocking().single();
}

まとめ

いかがだったでしょうか。コレクション操作に的を絞っているので簡単だったと思います。RxJavaの文法に慣れたら非同期処理を試したりFRPに手を出してみたりアプリケーションアーキテクチャにどのように適用するか検討したりしていくといいと思います。AsyncTaskの置き換えなどはすぐ出来るようになると思います。その頃には大分楽しくなってるでしょう。

学習に当たってはソースを読むほか以下のページを行ったり来たりすると良いと思います。他にもオススメのドキュメントがあれば教えてください。

*1:ただしsubscribe()の中は外側とは完全に空間が違うのでbreakとかtry-catchはできません注意。

*2:型変換せずTに副作用を与えるといった書き方も出来ますが、そうするとsourceも書き換わるので好ましくないと思います。