当前位置:网站首页>深潜Kotlin协程(二十一):Flow 生命周期函数
深潜Kotlin协程(二十一):Flow 生命周期函数
2022-07-05 16:08:00 【RikkaTheWorld】
系列电子书:传送门
Flow 可以想象成一个管道,请求的值在一个方向上流动,而相应产生的值在另一个方向上流动。当 flow 完成或出现异常时,这些信息也会被传递,并关闭途中的中间步骤。因此,当这些值开始流动时,我们可以监听值、异常或其它特征事件(如开始或完成)。为此,我们使用了 onEach
、 onStart
、onCompletion
、onEmpty
和 catch
等方法。下面让我逐一解释这些生命周期方法。
onEach
为了响应每个流动的值,我们使用 onEach
函数。
suspend fun main() {
flowOf(1, 2, 3, 4)
.onEach {
print(it) }
.collect() // 1234
}
onEach
的 lambda 表达式是挂起的,元素依次按顺序(顺序)被处理。因此,如果我们在 onEach
添加 delay
函数,我们将延迟每个值的流动。
suspend fun main() {
flowOf(1, 2)
.onEach {
delay(1000) }
.collect {
println(it) }
}
// (1 sec)
// 1
// (1 sec)
// 2
onStart
onStart
函数设置一个监听器,一旦 flow 启动,就会回调该监听器。需要注意的是, onStart
并不会等待第一个元素的响应,而是当我们请求第一个元素时,它就会被调用。
suspend fun main() {
flowOf(1, 2)
.onEach {
delay(1000) }
.onStart {
println("Before") }
.collect {
println(it) }
}
// Before
// (1 sec)
// 1
// (1 sec)
// 2
在 onStart
(以及在 onCompletion、onEmpty、catch) 中可以发射元素,这些元素将该处往下流动。
suspend fun main() {
flowOf(1, 2)
.onEach {
delay(1000) }
.onStart {
emit(0) }
.collect {
println(it) }
}
// 0
// (1 sec)
// 1
// (1 sec)
// 2
onCompletion
有几种方法可以完成一个 flow。最常见的是在 flow 构建器完成时(比如发送了最后一个元素),尽管有可能会出现在未捕获异常或者协程取消的情况下。在所有这些情况下,我们都可以使用 onCompletion
方法为 flow 的完成添加一个监听器。
suspend fun main() = coroutineScope {
flowOf(1, 2)
.onEach {
delay(1000) }
.onCompletion {
println("Completed") }
.collect {
println(it) }
}
// (1 sec)
// 1
// (1 sec)
// 2
// Completed
suspend fun main() = coroutineScope {
val job = launch {
flowOf(1, 2)
.onEach {
delay(1000) }
.onCompletion {
println("Completed") }
.collect {
println(it) }
}
delay(1100)
job.cancel()
}
// (1 sec)
// 1
// (0.1 sec)
// Completed
在 Android 中,我们经常使用 onStart
来展示进度条(等待网络响应的指示器),之后我们使用 onCompletion
来隐藏它。
fun updateNews() {
scope.launch {
newsFlow()
.onStart {
showProgressBar() }
.onCompletion {
hideProgressBar() }
.collect {
view.showNews(it) }
}
}
onEmpty
flow 可能在不发射任何值的情况下完成,有可能是出现意外状况。对于这种情况,有一个 onEmpty
函数,它在 flow 完成但没有发出任何元素时会回调。我们可以使用 onEmpty
来发射一些默认值。
suspend fun main() = coroutineScope {
flow<List<Int>> {
delay(1000) }
.onEmpty {
emit(emptyList()) }
.collect {
println(it) }
}
// (1 sec)
// []
catch
在 flow 构建器或处理值的任何时刻,都可能发生异常。这样的异常会向下流动,关闭途中每个处理步骤;然而,它是可以被捕获和管理的。为此,我们可以使用 catch
方法。这个监听器接收异常作为参数,并允许你执行恢复操作。
class MyError : Throwable("My error")
val flow = flow {
emit(1)
emit(2)
throw MyError()
}
suspend fun main(): Unit {
flow.onEach {
println("Got $it") }
.catch {
println("Caught $it") }
.collect {
println("Collected $it") }
}
// Got 1
// Collected 1
// Got 2
// Collected 2
// Caught MyError: My error
在上面的例子中,onEach
没有对异常做出响应。同样的情况也发生在其他功能上,如 map
、 filter
等。只有 onCompletion
的处理才会被调用。
catch
方法通过捕获来阻止异常的传播。虽然前面的步骤已经完成了,但是 catch
仍然可以发射出新的值,并保持 flow 的其余部分处于活跃状态。
val flow = flow {
emit("Message1")
throw MyError()
}
suspend fun main(): Unit {
flow.catch {
emit("Error") }
.collect {
println("Collected $it") }
}
// Collected Message1
// Collected Error
catch
只会对上游定义的函数中抛出的异常做出响应(可以想象,当下流还有异常时,仍需要捕获异常)。
在 Android 中,我们经常使用 catch
来展示 flow 中发生的异常:
fun updateNews() {
scope.launch {
newsFlow()
.catch {
view.handleError(it) }
.onStart {
showProgressBar() }
.onCompletion {
hideProgressBar() }
.collect {
view.showNews(it) }
}
}
我们也可以使用 catch
来发出默认数据以显示在屏幕上,比如空列表。
fun updateNews() {
scope.launch {
newsFlow()
.catch {
view.handleError(it)
emit(emptyList())
}
.onStart {
showProgressBar() }
.onCompletion {
hideProgressBar() }
.collect {
view.showNews(it) }
}
}
未捕获的异常
flow 中出现未捕获的异常则会立即取消该 flow,并且 collect
会重新抛出此异常。这种行为是挂起函数的典型行为, coroutineScope
也有同样的行为,典型的应对方法是使用 try-catch 块在 flow 的外部捕获异常:
val flow = flow {
emit("Message1")
throw MyError()
}
suspend fun main(): Unit {
try {
flow.collect {
println("Collected $it") }
} catch (e: MyError) {
println("Caught")
}
}
// Collected Message1
// Caught
请注意,使用 catch
并不能防止终端操作中出现异常(因为 catch
不能用最后一个操作之后)。因此,如果 collect
中有一个异常,它将不会捕获,而是将抛出一个错误。
val flow = flow {
emit("Message1")
emit("Message2")
}
suspend fun main(): Unit {
flow.onStart {
println("Before") }
.catch {
println("Caught $it") }
.collect {
throw MyError() }
}
// Before
// Exception in thread "..." MyError: My error
因此,通常的做法是将逻辑层从 collect
移动到 onEach
,并将其放在 catch
之前。当我们怀疑 collect
的逻辑可能会出现异常时,这样做会特别有用。如果我们将逻辑操作从 collect
中挪走,就可以确定 catch
能捕获所有的异常。
val flow = flow {
emit("Message1")
emit("Message2")
}
suspend fun main(): Unit {
flow.onStart {
println("Before") }
.onEach {
throw MyError() }
.catch {
println("Caught $it") }
.collect()
}
// Before
// Caught MyError: My error
flowOn
传给 flow 操作(如 onEach
、onStart
、onCompletion
等)的 lambda 表达式及其构建器(如 flow {..}
或 channelFlow{..}
)都是挂起的。挂起函数需要有一个上下文,并且应该与它们的父协程相关联(结构化并发)。因此,你可能想知道这些函数的上下文都来自哪里。答案是:从调用的 collect
的上下文中而来。
fun usersFlow(): Flow<String> = flow {
repeat(2) {
val ctx = currentCoroutineContext()
val name = ctx[CoroutineName]?.name
emit("User$it in $name")
}
}
suspend fun main() {
val users = usersFlow()
withContext(CoroutineName("Name1")) {
users.collect {
println(it) }
}
withContext(CoroutineName("Name2")) {
users.collect {
println(it) }
}
}
// User0 in Name1
// User1 in Name1
// User0 in Name2
// User1 in Name2
这段代码是如何工作的? 终端操作会调用来自上游的元素,从而提供协程上下文。然而,它也可以通过 flowOn
函数进行修改(修改协程上下文):
suspend fun present(place: String, message: String) {
val ctx = coroutineContext
val name = ctx[CoroutineName]?.name
println("[$name] $message on $place")
}
fun messagesFlow(): Flow<String> = flow {
present("flow builder", "Message")
emit("Message")
}
suspend fun main() {
val users = messagesFlow()
withContext(CoroutineName("Name1")) {
users
.flowOn(CoroutineName("Name3"))
.onEach {
present("onEach", it) }
.flowOn(CoroutineName("Name2"))
.collect {
present("collect", it) }
}
}
// [Name3] Message on flow builder
// [Name2] Message on onEach
// [Name1] Message on collect
请记住, flowOn
只适用于 flow 中位于上游的函数。
launchIn
collect
是一个挂起函数,它会挂起一个协程直到 flow 完成。我们通常会使用 launch
构建器对其进行包装,以便 flow 处理可以在另一个协程上启动。为了优化这种情况,有一个 launchIn
函数,它会在作为参数传递的作用域上调用 collect
。
fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job =
scope.launch {
collect() }
launchIn
通常用来在一个单独的协程中启动一个 flow。
suspend fun main(): Unit = coroutineScope {
flowOf("User1", "User2")
.onStart {
println("Users:") }
.onEach {
println(it) }
.launchIn(this)
}
// Users:
// User1
// User2
总结
在本章中,我们学习了不同的 flow 功能。现在我们知道如何在 flow 开始时、结束时或者在每个元素上执行某些操作;我们还知道如何捕获异常,以及如何在新的协程中启动 flow。这些都是被广泛使用的基本工具,特别是在 Android 开发中。例如,下面是一段 Android 中使用 flow 的代码:
fun updateNews() {
newsFlow()
.onStart {
showProgressBar() }
.onCompletion {
hideProgressBar() }
.onEach {
view.showNews(it) }
.catch {
view.handleError(it) }
.launchIn(viewModelScope)
}
边栏推荐
- 降本40%!Redis多租户集群的容器化实践
- Flet教程之 11 Row组件在水平数组中显示其子项的控件 基础入门(教程含源码)
- Spring Festival Limited "forget trouble in the year of the ox" gift bag waiting for you to pick it up~
- 英特尔第13代Raptor Lake处理器信息曝光:更多核心 更大缓存
- 有序链表集合求交集 方法 总结
- How to set the WiFi password of the router on the computer
- 解决CMakeList find_package找不到Qt5,找不到ECM
- Enterprise backup software Veritas NetBackup (NBU) 8.1.1 installation and deployment of server
- Is it safe for Guotai Junan to open an account online
- Jarvis OJ shell流量分析
猜你喜欢
Flet教程之 12 Stack 重叠组建图文混合 基础入门(教程含源码)
Migrate /home partition
Win11 prompt: what if the software cannot be downloaded safely? Win11 cannot download software safely
效果编辑器新版上线!3D渲染、加标注、设置动画,这次一个编辑器就够了
Seaborn绘制11个柱状图
为季前卡牌游戏 MotoGP Ignition Champions 做好准备!
Benji Bananas 会员通行证持有人第二季奖励活动更新一览
【 brosser le titre 】 chemise culturelle de l'usine d'oies
如何将mysql卸载干净
OneForAll安装使用
随机推荐
Pspnet | semantic segmentation and scene analysis
Jarvis OJ Webshell分析
DeSci:去中心化科学是Web3.0的新趋势?
PSPNet | 语义分割及场景分析
How to uninstall MySQL cleanly
APICloud云调试解决方案
Domestic API management artifact used by the company
【刷題篇】鹅廠文化衫問題
Android privacy sandbox developer preview 3: privacy, security and personalized experience
Enterprise backup software Veritas NetBackup (NBU) 8.1.1 installation and deployment of server
scratch五彩糖葫芦 电子学会图形化编程scratch等级考试三级真题和答案解析2022年6月
详解SQL中Groupings Sets 语句的功能和底层实现逻辑
Jarvis OJ shell流量分析
Practice independent and controllable 3.0 and truly create the open source business of the Chinese people
漫画:什么是服务熔断?
二叉树相关OJ题
漫画:什么是蓝绿部署?
漫画:什么是八皇后问题?
How to set the WiFi password of the router on the computer
Desci: is decentralized science the new trend of Web3.0?