visible true

技術的なメモを書く

FlowのchannelFlowを使ってRxBindingを置き換える

FlowのflowViaChannelを使ってRxBindingを置き換える - visible trueKotlin Coroutines 1.2.xでFlowというコールドストリームをサポートするクラスや関数群が登場しました。ってことでflowViaChannel関数について書いたら、Kotlin Coroutines 1.3.0-M1でflowViaChannel関数がdeprecatedになりました。previewなのでそういうこともあるでしょう。

ということで、FlowのflowViaChannelを使ってRxBindingを置き換える 改め、FlowのchannelFlowを使ってRxBindingを置き換える 話をします。

1.3.0-M1でなにが変わったか

Release 1.3.0-M1 · Kotlin/kotlinx.coroutines · GitHubによるとFlowにおけるコンテキスト保存の不変性について見直したとあります。コンテキスト保存の不変性というのは、Flowのブロックをどのコンテキストで実行するかを利用者がコントロールできることを保証するものです。

Flowのコンテキスト保存の不変性

たとえば次のコードはすべて同じスレッドで実行されます。

suspend fun a() {
  println("start: ${Thread.currentThread().id}")
  flow {
    println("emit: ${Thread.currentThread().id}")
    repeat(3) { emit(it) }
  }
    .collect {
      println("collect: ${Thread.currentThread().id}")
    }
}

出力は次のようになります。

start: 11
emit: 11
collect: 11
collect: 11
collect: 11

flowOn関数を使うと、upstreamの実行スレッドを変更できます。

suspend fun b() {
  println("start: ${Thread.currentThread().id}")
  flow {
    println("emit: ${Thread.currentThread().id}")
    repeat(3) { emit(it) }
  }
    .flowOn(Dispatchers.IO) // ここより上流はDispatchers.IOで動作する
    .collect {
      println("collect: ${Thread.currentThread().id}")
    }
}

出力は次のようになります。

start: 11
emit: 14
collect: 11
collect: 11
collect: 11

さて、次の場合はどうでしょうか*1

suspend fun c() {
  println("start: ${Thread.currentThread().id}")
  flow {
    kotlinx.coroutines.withContext(Dispatchers.IO) {
      println("emit: ${Thread.currentThread().id}")
      repeat(3) { emit(it) }
    }
  }
    .collect {
      println("collect: ${Thread.currentThread().id}")
    }
}

flow関数のブロック内で新たなコンテキストを使おうとしています。これだと利用者が例えばDispatchers.Mainで動作させたいと思ってもコントロールできませんよね。このコードを実行すると例外が投げられます。

start: 11
emit: 14

Flow invariant is violated: flow was collected in BlockingEventLoop@71cde863, but emission happened in LimitingDispatcher@73ad1220[dispatcher = DefaultDispatcher]. Please refer to 'flow' documentation or use 'flowOn' instead

このようにflowのブロックの実行コンテキストは、呼び出し元によってコントロールできることを保証するというのが、コンテキスト保存の不変性です。

Flowのコンテキスト保存の不変性の見直し

コンテキスト保存の不変性の見直しについては次のissueが詳しいです。 Flow context preserving property, thread safety and context changing · Issue #1210 · Kotlin/kotlinx.coroutines · GitHub

既存のコンテキスト保存の不変性では対応しきれない(あるいはコストがかかる)ユースケースがでてきたようです。 そのために新たに、異なるコルーチンからFlowCollector*2のemit関数の呼び出しを禁止するようになりました。 またchannelFlow関数を導入し、コンテキスト保存の不変性を保証しつつ、簡単に異なる実行コンテキストを利用できるようになりました。

その流れでflowViaChannel関数が非推奨になりました。非推奨になった明確な理由の説明がパッと見当たらなかったのですが、flowViaChannel関数のブロックがCoroutineScope.(channel: SendChannel<T>) -> Unitであるのに対して、channelFlow関数のブロックがsuspend ProducerScope<T>.() -> Unitであることから、ブロックが完了すると自動的にChannelをクローズするようにすることで、予期しない不具合を起こりづらくしてるのかな〜と思いました*3

channelFlowを使ってRxBindingを置き換える

ではflowViaChannel関数を使うのをやめてchannelFlow関数を使ってRxBindingを置き換えましょう

まずはbeforeのflowViaChannel関数を使うパターンです。

fun TextView.textChangeAsFlow() =
  flowViaChannel<String?> { channel ->
    channel.offer(text.toString())
    val textWatcher = addTextChangedListener {
      channel.offer(it?.toString())
    }
    channel.invokeOnClose {
      removeTextChangedListener(textWatcher)
    }
  }

実はこれ1.3.0-M1では正しく動作しません。flowViaChannel関数の実装がchannelFlow関数を使う形に変更されたために、ブロックを抜けるとchannelがクローズされてしまうのです!怖いですね。

channelFlow関数を使った実装は次です。

fun TextView.textChangeAsFlow() =
  channelFlow<String?> {
    channel.offer(text.toString())
    val textWatcher = addTextChangedListener {
      channel.offer(it?.toString())
    }
    awaitClose {
      removeTextChangedListener(textWatcher)
    }
  }

channel.invokeOnCloseの代わりにawaitClose関数を使っています。これはProducerScopeに追加された拡張関数で、呼び出し元がcancelをするまで待ってくれます。便利ですね。

おわりに

Kotlin Coroutines 1.3.0-M1でFlowはPreviewからExperimentalになりました。まだ変更はいろいろありそうですが進捗してる感じがあってstableが楽しみですね。

*1:withContext関数をFQCNで書いているのは、FlowCollector.withContextの利用が禁止されていてコンパイルエラーになるためです。

*2:flow関数のブロックがsuspend FlowCollector.() -> Unitです

*3:普通に https://github.com/Kotlin/kotlinx.coroutines/pull/1214に書いてました..