当前位置:网站首页>【深入kotlin】 - Flow 进阶
【深入kotlin】 - Flow 进阶
2022-06-11 05:23:00 【颐和园】
Flow 上下文
Flow 的收集动作总是发生在调用协程的上下文当中,而非定义 Flow 的上下文。
fun log(msg: String) = println("[${
Thread.currentThread().name}], $msg")
fun myMethod(): Flow<Int> = flow {
log("started")
for(i in 1..3){
emit(i)
}
}
fun main() = runBlocking {
myMethod().collect {
log("collected: $it")}
}
运行输出结果如下(打开 debug 参数):
[main @coroutine#1], started
[main @coroutine#1], collected: 1
[main @coroutine#1], collected: 2
[main @coroutine#1], collected: 3
可以看出 flow 运行于 collect 调用时的协程,即 runBlocking 开启的协程。这无疑会阻塞住主线程。因此我们可以将 flow 运行在其它上下文:
private fun log(msg:String) = println("[${
Thread.currentThread().name}], $msg")
private fun myMethod(): Flow<Int> = flow {
withContext(Dispatchers.Default) {
for(i in 1..4){
Thread.sleep(100)
emit(i)
}
}
}
fun main() = runBlocking {
myMethod().collect {
log(it) }
}
程序报出如下异常:
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@405324edf, [email protected]],
but emission happened in [DispatchedCoroutine{Active}@40452e23e, DefaultDispatcher].
...
大意是收集线程发生在主线程,但 emit 线程发生在后台线程。withContext(Dispatchers.Default) 一句修改了 flow 的上下文,将 flow 的分发器修改到了 Dispatchers.Default。但 collect 的代码使用的分发器则是 runBlocking 的分发器。
flowOn 运算符
为了解决这个问题,kotlin 引入了 flowOn 方法:
private fun myMethod(): Flow<Int> = flow {
for(i in 1..4){
Thread.sleep(100)
log("emit: $i")
emit(i)
}.flowOn(Dispatchers.Default)
}
fun main() = runBlocking {
myMethod().collect {
println("collect: $it") }
}
运行程序,错误不再出现。flowOn 方法允许修改 中间操作 的 上下文,使得中间操作和终止操作运行于不同的上下文。flowOn 操作改变了 Flow 本身默认的顺序。现在收集操作和发射操作发生在不同的协程(线程):
[DefaultDispatcher-worker-1 @coroutine#2], emit: 1
[main @coroutine#1], collect: 1
[DefaultDispatcher-worker-1 @coroutine#2], emit: 2
[main @coroutine#1], collect: 2
[DefaultDispatcher-worker-1 @coroutine#2], emit: 3
[main @coroutine#1], collect: 3
[DefaultDispatcher-worker-1 @coroutine#2], emit: 4
[main @coroutine#1], collect: 4
缓冲
如果让 flow 的不同部分使用不同的协程执行,将有助于实现并行操作从而提升性能。这就会用到缓冲的概念。典型地,我们可以让 emit 操作和collect 操作并行。这样 emit 操作在collect 操作进行的同时不会被阻塞,而是继续 emit 下一个元素并将结果缓冲到缓存里,下一次收集将直接从缓存中读取。
privatet fun myMethod(): Flow<Int> = flow {
for(i in 1..4){
delay(100) // 模拟耗时操作
emit(i)
}
}
fun main() = runBlocking {
val time = measureTimeMillis {
myMethod().collect{
delay(200)
println(it)
}
}
println(time)
}
这里,发射一个元素需要0.1秒,收集一个元素耗时0.2秒,所以4个元素总共耗时 1.2 秒,所以输出结果如下:
1
2
3
4
1223
如果使用缓冲操作,则不需要耗时这么多时间了:
myMethod().buffer().collect{
delay(200)
println(it)
}
唯一的修改就是在 collect 之前增加了一个buffer() 操作,这样元素发射之后不会等待收集完成,而是直接将结果缓冲到缓存里。真正搜集到的实际上是缓存里的元素。这样无疑会加快整个发射和搜集的过程:
1
2
3
4
1003
第一次收集发生在 0.3秒(0.1秒发射+0.2秒收集),第二次收集发生在 0.5 秒,第三次收集发生在 0.7 秒,第四次收集发生在0.9 秒。加上一些建立、销毁缓冲的时间,大致总耗时1秒左右。
缓冲和 flowOn 有一定关系。本质上 flowOn 操作需要改变 CoroutineDispatcher 时也会使用同样的缓冲机制,比如上面 flowOn 的例子。
Flow 的组合
将两个flow的内容合并为一个flow。
fun main()=runBlocking<Unit> {
val nums = (1..5).asFlow()
val strings = flowOf("a","b","c","d","e")
// zip - 合并操作
nums.zip(strings){
a, b -> "$a$b"
}.collect{
println(it) }
}
zip 操作将第一个流和第二个流中的元素依序取出,然后按照第二个参数的lambda 进行处理,然后发射。输出结果如下所示:
1a
2b
3c
4d
5e
打平操作
将一个包含了 f low 的 flow (类似 Flow<Flow)转换成不包含 flow 的flow(Flow)操作(类似将二维数组转换成一维数组)。
private fun myMethod(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500)
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach{
// 1. 遍历元素
delay(100)
}.flatMapConcat{
// 2. 打平
myMethod(it)
}.collect {
// 3. 终止
println("$it: ${
System.currentTimeMillis()-startTime} ms")
}
}
myMethod 方法
输出结果如下:
1: 144 ms
1: 646 ms
2: 751 ms
2: 1256 ms
3: 1360 ms
3: 1865 ms
将1…3 转换为流,在每个元素(1…3)上 delay 0.1 秒。
将 3 个元素(1…3)转换为 3 个流,myMethod 方法每次都会产生两个相同元素的流(emit 两次)。这样 1…3 就会变成:
[1,1],[2,2],[3,3]
而且由于每个流之间有一个 delay 0.1秒,所以打印输出时,两个相同数字之间间隔0.5秒, 而两两一组之间间隔 0.1 秒。
此外 flatMapConcat 会将它从二维的流打平成一维的流,于是这个流在收集时会变成 6 个单独的元素:
[1,1,2,2,3,3]
Flow 的异常
可以对 flow的异常进行传统的 try…catch 处理:
private fun myMethod():Flow<Int> = flow {
for(i in 1..3) {
println("emit: $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
try {
myMethod().collect {
println(it)
check(it <= 1){
// check 函数的作用是检查第一个参数,当true 时执行第二个参数(lambda),否则抛出一个 IllegalStateException,同时 error message 的内容时lambda 表达式返回的 Any 类型。
"collect $it"
}
}
}catch(e: Throwable) {
println("Caught $e")
}
}
执行上述代码,输出:
emit: 1
1
emit: 2
2
Caught java.lang.IllegalStateException: collect 2
当第二个元素为 2 时,check 判断失败,抛出异常,被 try…catch 所捕获。
在上面的例子中,异常发生在收集阶段。但 try…catch 也可以捕获flow 的发射阶段和中间操作阶段。
private fun myMethod():Flow<String> = flow {
for(i in 1..3) {
println("emit: $i")
emit(i)
}.map{
value ->
check(value <= 1) {
"crash on $value" }
"string $value"
}
}
fun main()=runBlocking<Unit> {
try{
myMethod().collect{
println(it) }
}catch(e: Throwable){
println("caught $e")
}
}
运行程序,输出:
emit: 1
string 1
emit: 2
caught java.lang.IllegalStateException: crash on 2
当元素 大于 1 时,异常被捕获,并且 crash on 2 被保存在了异常的 message 当中。
完成
当 flow 执行完毕之后,可以额外追加一个完成动作。这个动作可以是命令式的,也可以时声明式的。
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main()=runBlocking<Unit> {
try {
myMethod().collect {
println(it) }
}finally {
println("finally")
}
}
finally 块中的代码最终都会得到执行,无论 flow 是正常执行完还是抛出异常,这就是命令式,利用了 try…finally 语句:
1
2
...
10
finally
声明式则比较灵活,它就是 onCompletion 中间操作,它在 collect 或 取消 之后执行:
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit> {
myMethod().onCompletion{
println("onCompletion") }
.collect {
println(it) }
}
输出结果同之前一模一样。值得注意的是,onCompletion 的 action 参数是一个带参数的 lambda 表达式,这个参数的类型是可空的 Throwable。可以利用这个参数获取抛出的异常。
private fun myMethod(): Flow<Int> = flow {
emit(1)
throw RuntimeException() // 抛出异常
}
fun main() = runBlocking {
myMethod().onCompletion {
e -> if(e!=null) println("Flow stopped with an Exception.")}
.catch {
e -> println("Caught an Exception") }
.collect {
println(it) }
}
其中,catch 操作也是一个中间操作,用于捕获 flow 中的异常。
打印结果:
1
Flow stopped with an Exception.
Caught an Exception
但是有一点需要注意,onCompletion 只会看到来自于 flow 上游的异常,但无法捕获下游异常。
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit> {
myMethod().onCompletion{
e -> println("Flow stopped with an Exception $e.")}
.collect {
value ->
check{
value <= 1} {
"collect $value" }
println(value)
}
}
输出结果如下:
1
Flow stopped with an Exception null.
Exception in thread "main" java.lang.IllegalStateException: Collect 2
...
这里,onCompletion 把 e 打印成 null,说明它并没有捕获到这个异常,因为 onCompletion 只是一个中间操作,而 collect 是终止操作,从这个角度上看 collect 是 onCompletion 的下游操作,因此 collect 中出现的异常无法被 onCompletion 捕获。
取消
flow 的取消实际上是通过协程的取消来实现的,本身没有所谓的取消操作。比如我们取消一个 cellect 操作,前提是 flow 本身在一个可取消的挂起函数(如 delay) 中被挂起了:
private fun myMethod(): Flow<Int> = flow {
for (i int 1..4) {
delay(100)
println("Emit: $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(280) {
// 设置超时时间 280 毫秒
myMethod().collect {
println(it) }
}
println("Done.")
}
控制台输出如下:
Emit: 1
1
Emit: 2
2
Done.
第3、4次循环因超时被取消。这里,withTimeoutOrNull 就是一个可取消的挂起函数。
边栏推荐
- Target detection - personal understanding of RCNN series
- Customize the layout of view Foundation
- Analyze while doing experiments -ndk article -jni uses registernatives for explicit method registration
- JVM tuning 6: GC log analysis and constant pool explanation
- 自定义View基础之Layout
- MySQL nested sorting: first sort and filter the latest data, and then customize the sorting of this list
- Poverty has nothing to do with suffering
- Share 𞓜 jointly pre training transformers on unpaired images and text
- Recommend a free intranet penetration open source software that can be used in the local wechat official account under test
- WinForm (I) introduction to WinForm and use of basic controls
猜你喜欢

Preliminary test of running vins-fusion with zed2 binocular camera

mysql字符串转数组,合并结果集,转成数组

Combien de courant le câblage des PCB peut - il supporter?

Share | guide language image pre training to achieve unified visual language understanding and generation

Top 100 video information of station B

在未来,机器人或 AI 还有多久才能具备人类的创造力?

Iris dataset - Introduction to machine learning

1. use alicloud object OSS (basic)

Topological sorting

Apply the intelligent OCR identification technology of Shenzhen Yanchang technology to break through the bottleneck of medical bill identification at one stroke. Efficient claim settlement is not a dr
随机推荐
Section III: structural characteristics of cement concrete pavement
27. Remove elements
Huawei equipment is configured to access the virtual private network through GRE tunnel
Project - Smart City
高斯白噪声(white Gaussian noise,WGN)
Some details about memory
code
wxParse解析iframe播放视频
The programmers of a large factory after 95 were dissatisfied with the department leaders, and were sentenced for deleting the database and running away
ROS compilation error: could not find a package configuration file provided by "XXX“
Use acme SH automatically apply for a free SSL certificate
Titanic rescued - re exploration of data mining (ideas + source code + results)
Sealem Finance打造Web3去中心化金融平台基础设施
Opencv learning path (2-2) -- Deep parsing namedwindow function
Analyze while doing experiments -ndk article -jni uses registernatives for explicit method registration
Simple linear regression of sklearn series
Sealem finance builds Web3 decentralized financial platform infrastructure
NVIDIA SMI has failed because it could't communicate with the NVIDIA driver
Recommend a free intranet penetration open source software that can be used in the local wechat official account under test
Stone game -- leetcode practice