FlowのchannelFlowを使ってRxBindingを置き換える
FlowのflowViaChannelを使ってRxBindingを置き換える - visible trueでKotlin Coroutines 1.2.xでFlowというコールドストリームをサポートするクラスや関数群が登場しました。
ってことでflowViaChannel関数について書いたら、Kotlin Coroutines 1.3.0-M1でflowViaChannel関数がdeprecatedになりました。previewなのでそういうこともあるでしょう。
ということで、FlowのflowViaChannelを使ってRxBindingを置き換える
改め、FlowのchannelFlowを使ってRxBindingを置き換える
話をします。
1.3.0-M1でなにが変わったか
Release 1.3.0-M1 · Kotlin/kotlinx.coroutines · GitHubによるとFlowにおけるコンテキスト保存の不変性について見直したとあります。コンテキスト保存の不変性というのは、Flowのブロックをどのコンテキストで実行するかを利用者がコントロールできることを保証するものです。
Flowのコンテキスト保存の不変性
たとえば次のコードはすべて同じスレッドで実行されます。
suspend fun a() { println("start: ${Thread.currentThread().id}") flow { println("emit: ${Thread.currentThread().id}") repeat(3) { emit(it) } } .collect { println("collect: ${Thread.currentThread().id}") } }
出力は次のようになります。
start: 11 emit: 11 collect: 11 collect: 11 collect: 11
flowOn関数を使うと、upstreamの実行スレッドを変更できます。
suspend fun b() { println("start: ${Thread.currentThread().id}") flow { println("emit: ${Thread.currentThread().id}") repeat(3) { emit(it) } } .flowOn(Dispatchers.IO) // ここより上流はDispatchers.IOで動作する .collect { println("collect: ${Thread.currentThread().id}") } }
出力は次のようになります。
start: 11 emit: 14 collect: 11 collect: 11 collect: 11
さて、次の場合はどうでしょうか*1。
suspend fun c() { println("start: ${Thread.currentThread().id}") flow { kotlinx.coroutines.withContext(Dispatchers.IO) { println("emit: ${Thread.currentThread().id}") repeat(3) { emit(it) } } } .collect { println("collect: ${Thread.currentThread().id}") } }
flow関数のブロック内で新たなコンテキストを使おうとしています。これだと利用者が例えばDispatchers.Main
で動作させたいと思ってもコントロールできませんよね。このコードを実行すると例外が投げられます。
start: 11 emit: 14 Flow invariant is violated: flow was collected in BlockingEventLoop@71cde863, but emission happened in LimitingDispatcher@73ad1220[dispatcher = DefaultDispatcher]. Please refer to 'flow' documentation or use 'flowOn' instead
このようにflowのブロックの実行コンテキストは、呼び出し元によってコントロールできることを保証するというのが、コンテキスト保存の不変性です。
Flowのコンテキスト保存の不変性の見直し
コンテキスト保存の不変性の見直しについては次のissueが詳しいです。 Flow context preserving property, thread safety and context changing · Issue #1210 · Kotlin/kotlinx.coroutines · GitHub
既存のコンテキスト保存の不変性では対応しきれない(あるいはコストがかかる)ユースケースがでてきたようです。 そのために新たに、異なるコルーチンからFlowCollector*2のemit関数の呼び出しを禁止するようになりました。 またchannelFlow関数を導入し、コンテキスト保存の不変性を保証しつつ、簡単に異なる実行コンテキストを利用できるようになりました。
その流れでflowViaChannel関数が非推奨になりました。非推奨になった明確な理由の説明がパッと見当たらなかったのですが、flowViaChannel関数のブロックがCoroutineScope.(channel: SendChannel<T>) -> Unit
であるのに対して、channelFlow関数のブロックがsuspend ProducerScope<T>.() -> Unit
であることから、ブロックが完了すると自動的にChannelをクローズするようにすることで、予期しない不具合を起こりづらくしてるのかな〜と思いました*3。
channelFlowを使ってRxBindingを置き換える
ではflowViaChannel関数を使うのをやめてchannelFlow関数を使ってRxBindingを置き換えましょう
まずはbeforeのflowViaChannel関数を使うパターンです。
fun TextView.textChangeAsFlow() = flowViaChannel<String?> { channel -> channel.offer(text.toString()) val textWatcher = addTextChangedListener { channel.offer(it?.toString()) } channel.invokeOnClose { removeTextChangedListener(textWatcher) } }
実はこれ1.3.0-M1
では正しく動作しません。flowViaChannel関数の実装がchannelFlow関数を使う形に変更されたために、ブロックを抜けるとchannelがクローズされてしまうのです!怖いですね。
channelFlow関数を使った実装は次です。
fun TextView.textChangeAsFlow() = channelFlow<String?> { channel.offer(text.toString()) val textWatcher = addTextChangedListener { channel.offer(it?.toString()) } awaitClose { removeTextChangedListener(textWatcher) } }
channel.invokeOnClose
の代わりにawaitClose関数を使っています。これはProducerScopeに追加された拡張関数で、呼び出し元がcancelをするまで待ってくれます。便利ですね。
おわりに
Kotlin Coroutines 1.3.0-M1でFlowはPreviewからExperimentalになりました。まだ変更はいろいろありそうですが進捗してる感じがあってstableが楽しみですね。