当前位置:网站首页>[go deep into kotlin] - flow advanced
[go deep into kotlin] - flow advanced
2022-06-11 05:26:00 【the Summer Palace】
Flow Context
Flow The collection action of always occurs in the context of the calling coroutine , Not definition Flow The context of .
fun log(msg: String) = println("[${
Thread.currentThread().name}], $msg")
fun myMethod(): Flow<Int> = flow {
log("started")
for(i in 1..3){
emit(i)
}
}
fun main() = runBlocking {
myMethod().collect {
log("collected: $it")}
}
Run the output as follows ( open debug Parameters ):
[main @coroutine#1], started
[main @coroutine#1], collected: 1
[main @coroutine#1], collected: 2
[main @coroutine#1], collected: 3
It can be seen that flow To run on collect Calling process , namely runBlocking Open collaboration . This will undoubtedly block the main thread . So we can flow Run in other contexts :
private fun log(msg:String) = println("[${
Thread.currentThread().name}], $msg")
private fun myMethod(): Flow<Int> = flow {
withContext(Dispatchers.Default) {
for(i in 1..4){
Thread.sleep(100)
emit(i)
}
}
}
fun main() = runBlocking {
myMethod().collect {
log(it) }
}
The program reports the following exception :
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@405324edf, [email protected]],
but emission happened in [DispatchedCoroutine{Active}@40452e23e, DefaultDispatcher].
...
The collection thread occurs in the main thread , but emit The thread occurs in the background thread .withContext(Dispatchers.Default) A sentence has been modified flow The context of , take flow The distributor of has been modified to Dispatchers.Default. but collect The distributor used by our code is runBlocking The dispenser of .
flowOn Operator
To solve this problem ,kotlin Introduced flowOn Method :
private fun myMethod(): Flow<Int> = flow {
for(i in 1..4){
Thread.sleep(100)
log("emit: $i")
emit(i)
}.flowOn(Dispatchers.Default)
}
fun main() = runBlocking {
myMethod().collect {
println("collect: $it") }
}
Run the program , The error no longer occurs .flowOn Method allows you to modify Intermediate operation Of Context , Make the intermediate operation and the termination operation run in different contexts .flowOn The operation has changed Flow The default order . Now the collection operation and the launch operation take place in different co processes ( Threads ):
[DefaultDispatcher-worker-1 @coroutine#2], emit: 1
[main @coroutine#1], collect: 1
[DefaultDispatcher-worker-1 @coroutine#2], emit: 2
[main @coroutine#1], collect: 2
[DefaultDispatcher-worker-1 @coroutine#2], emit: 3
[main @coroutine#1], collect: 3
[DefaultDispatcher-worker-1 @coroutine#2], emit: 4
[main @coroutine#1], collect: 4
buffer
If you allow flow Different parts of the are executed using different coroutines , It will help to realize parallel operation and improve performance . This uses the concept of buffering . Typically , We can get emit Operation and collect Parallel operation . such emit Operation in collect Operation will not be blocked at the same time , But go on emit Next element and buffer the result into the cache , The next collection will read directly from the cache .
privatet fun myMethod(): Flow<Int> = flow {
for(i in 1..4){
delay(100) // Simulation time operation
emit(i)
}
}
fun main() = runBlocking {
val time = measureTimeMillis {
myMethod().collect{
delay(200)
println(it)
}
}
println(time)
}
here , Emitting an element requires 0.1 second , Collecting an element takes time 0.2 second , therefore 4 The total time of the elements is 1.2 second , So the output is as follows :
1
2
3
4
1223
If buffer operation is used , It doesn't take so much time :
myMethod().buffer().collect{
delay(200)
println(it)
}
The only change is in collect One was added before buffer() operation , In this way, the element will not wait for the collection to complete after it is emitted , Instead, the results are buffered directly into the cache . What is really collected is actually the elements in the cache . This will undoubtedly speed up the entire launch and collection process :
1
2
3
4
1003
The first collection took place in 0.3 second (0.1 Second emission +0.2 Second collection ), The second collection occurs at 0.5 second , The third collection took place in 0.7 second , The fourth collection took place in 0.9 second . Add some build 、 Time to destroy the buffer , The approximate total time is 1 About seconds .
Buffering and flowOn There is a certain relationship . Essentially flowOn The operation needs to be changed CoroutineDispatcher The same buffering mechanism will be used in the , For example, above flowOn Example .
Flow The combination of
Put two flow The contents of are merged into one flow.
fun main()=runBlocking<Unit> {
val nums = (1..5).asFlow()
val strings = flowOf("a","b","c","d","e")
// zip - Merge operation
nums.zip(strings){
a, b -> "$a$b"
}.collect{
println(it) }
}
zip The operation takes out the elements in the first stream and the second stream in sequence , Then according to the second parameter lambda To deal with , And then launch . The output is as follows :
1a
2b
3c
4d
5e
Flattening operation
Will contain f low Of flow ( similar Flow<Flow) Convert to not containing flow Of flow(Flow) operation ( It is similar to converting a two-dimensional array into a one-dimensional array ).
private fun myMethod(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500)
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach{
// 1. Traversing elements
delay(100)
}.flatMapConcat{
// 2. Strike a level
myMethod(it)
}.collect {
// 3. End
println("$it: ${
System.currentTimeMillis()-startTime} ms")
}
}
myMethod Method
The output is as follows :
1: 144 ms
1: 646 ms
2: 751 ms
2: 1256 ms
3: 1360 ms
3: 1865 ms
take 1…3 Convert to stream , In each element (1…3) On delay 0.1 second .
take 3 Elements (1…3) Convert to 3 A flow ,myMethod Method produces two streams of the same elements each time (emit two ). such 1…3 Will become :
[1,1],[2,2],[3,3]
And since there is one between each stream delay 0.1 second , So when printing out , The interval between two identical numbers 0.5 second , And the interval between two groups 0.1 second .
Besides flatMapConcat Will flatten it from a two-dimensional flow to a one-dimensional flow , So this stream will become 6 Individual elements :
[1,1,2,2,3,3]
Flow It's abnormal
It can be done to flow The exception of the traditional try…catch Handle :
private fun myMethod():Flow<Int> = flow {
for(i in 1..3) {
println("emit: $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
try {
myMethod().collect {
println(it)
check(it <= 1){
// check The function checks the first argument , When true Execute the second parameter (lambda), Otherwise throw a IllegalStateException, meanwhile error message When lambda The expression returns Any type .
"collect $it"
}
}
}catch(e: Throwable) {
println("Caught $e")
}
}
Execute the above code , Output :
emit: 1
1
emit: 2
2
Caught java.lang.IllegalStateException: collect 2
When the second element is 2 when ,check Fail to judge , Throw an exception , By try…catch Captured .
In the example above , The exception occurs during the collection phase . but try…catch You can also capture flow Launch phase and intermediate operation phase .
private fun myMethod():Flow<String> = flow {
for(i in 1..3) {
println("emit: $i")
emit(i)
}.map{
value ->
check(value <= 1) {
"crash on $value" }
"string $value"
}
}
fun main()=runBlocking<Unit> {
try{
myMethod().collect{
println(it) }
}catch(e: Throwable){
println("caught $e")
}
}
Run the program , Output :
emit: 1
string 1
emit: 2
caught java.lang.IllegalStateException: crash on 2
When element Greater than 1 when , Exception caught , also crash on 2 Saved in an abnormal message among .
complete
When flow After execution , You can add an additional completion action . This action can be imperative , It can also be in a declarative way .
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main()=runBlocking<Unit> {
try {
myMethod().collect {
println(it) }
}finally {
println("finally")
}
}
finally The code in the block will eventually be executed , No matter what flow Whether the execution is completed normally or an exception is thrown , This is imperative , Take advantage of try…finally sentence :
1
2
...
10
finally
Declarative is more flexible , It is onCompletion Intermediate operation , It's in collect or Cancel After performing :
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit> {
myMethod().onCompletion{
println("onCompletion") }
.collect {
println(it) }
}
The output is the same as before . It is worth noting that ,onCompletion Of action The parameter is a parameter lambda expression , The type of this parameter is nullable Throwable. You can use this parameter to get the thrown exception .
private fun myMethod(): Flow<Int> = flow {
emit(1)
throw RuntimeException() // Throw an exception
}
fun main() = runBlocking {
myMethod().onCompletion {
e -> if(e!=null) println("Flow stopped with an Exception.")}
.catch {
e -> println("Caught an Exception") }
.collect {
println(it) }
}
among ,catch Operation is also an intermediate operation , Used to capture flow The abnormal .
Print the results :
1
Flow stopped with an Exception.
Caught an Exception
But there is one caveat ,onCompletion Only from flow Upstream anomaly , But the downstream exception cannot be caught .
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit> {
myMethod().onCompletion{
e -> println("Flow stopped with an Exception $e.")}
.collect {
value ->
check{
value <= 1} {
"collect $value" }
println(value)
}
}
The output is as follows :
1
Flow stopped with an Exception null.
Exception in thread "main" java.lang.IllegalStateException: Collect 2
...
here ,onCompletion hold e Print as null, This indicates that it did not catch this exception , because onCompletion It's just an intermediate operation , and collect Is to terminate the operation , Look at it this way collect yes onCompletion Downstream operation of , therefore collect The exception in cannot be onCompletion Capture .
Cancel
flow The cancellation of is actually realized through the cancellation of the collaboration process , There is no such thing as a cancel operation . For example, we cancel one cellect operation , Premise is flow Itself in a cancelable pending function ( Such as delay) Is suspended :
private fun myMethod(): Flow<Int> = flow {
for (i int 1..4) {
delay(100)
println("Emit: $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(280) {
// Set timeout 280 millisecond
myMethod().collect {
println(it) }
}
println("Done.")
}
The console output is as follows :
Emit: 1
1
Emit: 2
2
Done.
The first 3、4 The second cycle was canceled due to timeout . here ,withTimeoutOrNull Is a cancelable suspend function .
边栏推荐
- 2021-04-19
- 1.使用阿里云对象OSS(初级)
- (15) Infrared communication
- Preliminary understanding of DFS and BFS
- Wxparse parsing iframe playing video
- NVIDIA SMI has failed because it could't communicate with the NVIDIA driver
- Carrier coordinate system inertial coordinate system world coordinate system
- Take stock of the AI black technologies in the Beijing Winter Olympic Games, and Shenzhen Yancheng Technology
- Wechat applet uploads the data obtained from database 1 to database 2
- Introduction to coordinate system in navigation system
猜你喜欢

Huawei equipment is configured with bgp/mpls IP virtual private network address space overlap

MySQL regularly deletes expired data.

1.使用阿里云对象OSS(初级)

Preliminary test of running vins-fusion with zed2 binocular camera

The central rural work conference has released important signals. Ten ways for AI technology to help agriculture can be expected in the future

Topological sorting

微信小程序,购买商品属性自动换行,固定div个数,超出部分自动换行

Huawei equipment is configured to access the virtual private network through GRE

Cascade EF gan: local focus progressive facial expression editing

C (I) C basic grammar all in one
随机推荐
Simple linear regression of sklearn series
Emnlp2021 𞓜 a small number of data relation extraction papers of deepblueai team were hired
JS promise, async, await simple notes
GAMES101作业7-Path Tracing实现过程&代码详细解读
QT Road (2) -- HelloWorld
wxParse解析iframe播放视频
Share | guide language image pre training to achieve unified visual language understanding and generation
Section V: Recycling Application of asphalt pavement materials
In the future, how long will robots or AI have human creativity?
[opencv learning problems] 1 Namedwindow() and imshow() show two windows in the picture
Convert result set of SQL to set
oh my zsh正确安装姿势
Section I: classification and classification of urban roads
2021-04-19
【入门级基础】Node基础知识总结
Tianchi - student test score forecast
2021 iccv paper sharing - occlusion boundary detection
The programmers of a large factory after 95 were dissatisfied with the department leaders, and were sentenced for deleting the database and running away
jvm调优五:jvm调优工具和调优实战
Dongmingzhu said that "Gree mobile phones are no worse than apple". Where is the confidence?