当前位置:网站首页>[go deep into kotlin] - get to know flow for the first time
[go deep into kotlin] - get to know flow for the first time
2022-06-11 05:26:00 【the Summer Palace】
asContextElement
ThreadLocal yes Java A solution to data access conflicts between threads , Often used in place of locks . It will copy the data shared between threads into multiple copies ( Per thread ThreadLocal Data is maintained on one sheet map In the table , Among them key Each thread object . This piece of map in ,key yes ThreadLocal own ,value Is the copied data ). such , Each thread operates on a piece of data , So as to solve the access conflict .
But in a collaborative environment , The problem becomes complicated . Because a coroutine is not bound to a thread , A coroutine can switch threads during execution ( For example, before Dispatchers.Unconfined Example ).
Kotlin Solve this problem through a series of expansion methods . When a coroutine from a thread A Switch to B And then back to A, Threads A Of ThreadLocal Properties are automatically restored . This is it. asContextElement(value:) Method , It is ThreadLocal Extension method of , Used to put a ThreadLocal Package as ThreadContextElement.ThreadContextElement Will ThreadLocal Values are copied into the coroutine , But not bound to a specific thread ,value Parameters will override ThreadLocal Medium value.
Let's look at this example .
val threadLocal = ThreadLocal<String>()
fun main() = runBlocking<Unit> {
threadLocal.set("Jim")
println("${
Thread.currentThread()}: ${
threadLocal.get()}") // Print :Thread[main @coroutine#1,5,main]: Jim
val job = launch(Dispatchers.Default+threadLocal.asContextElement(value="Ann")){
// ThreadLocal -> ThreadContextElement, Simultaneous coverage threadLocal Of value, therefore ,threadLocal The value of will come from Jim become Ann.
println("${
Thread.currentThread()}: ${
threadLocal.get()}") // Print :Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
yield() // Switch the coordination process to ( In the thread pool ) Other threads execute
println("${
Thread.currentThread()}: ${
threadLocal.get()}") // Print :Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
}
job.join() // Return to the main process ,threadLocal Value auto recovery
println("${
Thread.currentThread()}, ${
threadLocal.get()}") // Print :Thread[main @coroutine#1,5,main]: Jim
}
The entire output of the program is as follows ( open coroutines.debug switch ):
Thread[main @coroutine#1,5,main]: Jim
Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
Thread[main @coroutine#1,5,main]: Jim
ThreadContextElement No tracking thread local Value , It is just a copy of the original value , So any modification to it will not affect the original thread local Value .
If you use directly in the collaboration thread local( Not used asContextElement Method ), And modify the thread local Value , be thread local The value of may become uncertain , Such as :
val t1 = ThreadLocal.withInitial{ "initial" } runBlocking { // Be careful , It's not useful here asContextElement(value:) Method println(t1.get()) // Print :initial withContext(t1.asContextElement("modified")) { println(t1.get()) // Print :modified } // Return to the original context println(t1.get())// May print :initial, It is also possible to print :modified, Not sure }
Flow
Flow Represents an asynchronous flow , Be similar to Java stream. If a function needs to return multiple values , In addition to using collections , You can go back to Flow:
// Method 1 、 Use set
private fun myMethod(): List<String> = listOf("How are you doing", "Not bad","Thank you", "How about you")
fun main() {
myMethod().forEach {
println(it)
}
}
// Method 2 、 Use Sequence
private fun myMethod1(): Sequence<Int> = sequence {
for(i in 100..105){
Thread.sleep(1000) // Block main thread
yield(i) // Returns an element
}
}
fun main() {
myMethod1().forEach{
println(it) } // Print 100 ~ 105
}
// Method 3 、 Use the process
private suspend fun myMethod2():List<String>{
delay(1000)
return listOf("How are you doing", "Not bad","Thank you", "How about you")
}
fun main()=runBlocking<Unit> {
myMethod2().forEach{
println(it) } // Print a string every second
}
// Method four 、 Use Flow
fun myMethod3():Flow<Int> = flow{
// call flow Builder , Automatically becomes a pending function ( No explicit use is required suspend keyword ), So you can call other pending functions
for (i in 1..4){
delay(1000)
emit(i) // similar yield, Asynchronously returns a result
}
}
fun main() = runBlocking<Unit> {
launch{
for (i in 1..2) {
println("group $i ----")
delay(2000)
}
}
myMethod3().collect {
println(it) } // collect Used to receive emit A result returned , The two are paired
}
The first method is characterized by :
- myMethod Method is blocked
- All values in the set can only be returned once after all the values are solved , Regardless of priority
The second method is characterized by :
Thread.sleep Simulate the solution process of blocking mode
sequence Every time yield once , Just return to the caller once , therefore 5 Print statements are not printed at one time , Instead, a number is printed every second .
The third method is characterized by :
- delay It's asynchronous , Simulate the asynchronous solution process , It does not block the main thread
- myMethod2 It's a suspend function , So it can only be called in another pending function or coroutine , therefore main() The function uses runBlocking
- But it still returns all the results at once
The fourth method is characterized by :
The solution process is asynchronous , Will not block threads
The return process is asynchronous , Every time emit One at a time , Instead of returning all at once
So the print result will be printed first
group 1 ----, Then the numbers 1 and 2( interval 1 second ),2 Seconds latergroup 2 ----, And then there was 3 and 4( interval 1 second ):group 1 ---- 1 2 group 2 ---- 3 4
It can be seen that Flow Very similar to Sequence, But it is executed asynchronously , and Sequence It's synchronous .
Besides , If you will Flow Medium delay Switch to Thread.sleep, be Flow Of emit No use , An asynchronous return becomes a one-time return :
1
2
3
4
group 1 ----
group 3 ----
Obviously ,Thread.sleep Blocking the execution of the main thread .
Flow Builder
Flow Build through the builder , It has 4 Species builder :
- flowOf(…)
- asFlow(…)
- flow{…}
- channelFlow{ … }
What we have used before is flow{…} Builder . It is easy to use . Next, let's look at the other 3 Builders .
flowOf
Define a stream that emits a fixed number of values . It receives the value of a variable parameter , And loop this variable parameter emit.
fun main() = runBloking {
flowOf(10,20,29,30).collect{
println(it) }
}
asFlow
Both sets and sequences provide asFlow Extension method , You can transform yourself into a flow flow .
fun main() = runBlocking {
(1..10).asFlow().collect{
println(it) }
}
Intermediate operation and termination operation
Intermediate operations do not result in Flow The code in is executed , such as emit It is an intermediate operation , It does not cause the code to actually be executed . Only when the operation is terminated will the code be executed ,collect Is a termination operation .
private fun myMethod():Flow<Int> = flow{
println("I'm fine.")
for(i in 1..3) {
delay(1000)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Let it go!")
val flow = myMethod() // Calling a method does not cause the code to be executed
println("See you.")
}
The printout is “I’m fine." A sentence will not be printed :
Let it go!
See you.
Want to let flow True execution , Need to add :
flow.collect{
println(it) }
Be careful , If the terminate operation is called more than once , Will lead to flow Multiple execution .
Flow You can call the pending function in the intermediate operation of , This is related to Sequence Is different .
private suspend fun myExecution(input:Int): String{
delay(1000)
return "output:$input"
}
fun main() = runBlocking {
(1..10).asFlow()
.filter{
it -> it > 5 }
.map{
input -> myExecution(input) }// Call the suspend function myExecution
.collect{
println(it) }
}
The output is as follows :
output:6
output:7
...
output:10
except filter and map,Flow And support transform operation :
private suspend fun myExecution(input:Int): String{
delay(1000)
return "output: $input"
}
fun main() = runBlocking {
(1..10).asFlow().transform{
input ->
emit( "transform: $input")
emit(myExecution(input))
emit("------")
}.collect{
println(it) }
}
The output is as follows :
transform: 1
output: 1
------
transform: 2
output: 2
------
...
transform: 10
output: 10
------
transform Any logic can be executed in , There is no need to return any value , If required flow Send value , have access to emit ( It can be launched many times ). It is better than filter or map Freer and stronger .
The number of elements can be limited in the intermediate operation :
fun myNumbers():Flow<Int> = flow{
try {
emit(1)
emit(2)
println("----")
emit(3)
}catch(e: Exception) {
println(e)
}finally{
println("--finally")
}
}
fun main() = runBlocking<Uint> {
myNumbers().take(2).collect( println(it) )// take(2) Get only the first 2 Elements
}
The output is as follows :
1
2
kotlinx.coroutines.flow.interal.AbortFlowException: Flow was aborted, no more elements needed
finally
You can see when take After the specified element ,flow An exception will be thrown directly , This causes the flow to be canceled .
All termination operations are suspended functions . Only when the operation is terminated can the code of the stream be actually executed . except collect operation ,flow There are other termination operations , such as toList,toSet,reduce etc. .
fun main() = runBlocking {
val result = (1..4).asFlow().map( it*it )
.reduce( a,b -> a+b ) // Aggregation operation , Accumulate element values
println(result)
}
The output is 30, because 1+4+9+16 = 30.
Flow It's sequential .collect The operation runs in the process of terminating the operation , By default, no new collaboration will be opened . Every emit The elements of are handled by all intermediate operations , Finally, it is handled by the termination operation .
fun main() = runBlocking {
(1..10).asFlow().filter{
// Only even elements are allowed to pass through , Odd numbers are filtered
it % 2 == 0
}.map {
// Even elements enter map operation
println("map: $it")
it
}.collect {
println("collect: $it")
}
}
The output is as follows :
filter: 2
map: 2
collect: 2
filter: 4
map: 4
collect: 4
...
filter: 10
map: 10
collect: 10
边栏推荐
- 课程设计总结
- Share | defend against physically realizable image classification attacks
- Big meal count (time complexity) -- leetcode daily question
- JS promise, async, await simple notes
- Tencent X5 kernel initialization failed tbsreaderview not support by:***
- The central rural work conference has released important signals. Ten ways for AI technology to help agriculture can be expected in the future
- 22. Generate parentheses
- Leetcode 161 Editing distance of 1 (2022.06.10)
- Opencv learning path (2-4) -- Deep parsing cvtcolor function
- How to apply for free idea with official documents
猜你喜欢

BERT知识蒸馏

Tightly coupled laser vision inertial navigation slam system: paper notes_ S2D. 66_ ICRA_ 2021_ LVI-SAM
![[aaai 2021 timing action nomination generation] detailed interpretation of bsn++ long article](/img/28/d69a7583036a2076facffcf9098d7e.jpg)
[aaai 2021 timing action nomination generation] detailed interpretation of bsn++ long article

1.使用阿里云对象OSS(初级)

Recommend a free intranet penetration open source software that can be used in the local wechat official account under test

PCB走线到底能承载多大电流

微信小程序,购买商品属性自动换行,固定div个数,超出部分自动换行

Huawei equipment is configured to access the virtual private network through GRE

Tianchi - student test score forecast
![[NIPS2021]MLP-Mixer: An all-MLP Architecture for Vision](/img/89/66c30ea8d7969fef76785da1627ce5.jpg)
[NIPS2021]MLP-Mixer: An all-MLP Architecture for Vision
随机推荐
点击图标不灵敏咋整?
一大厂95后程序员对部门领导不满,删库跑路被判刑
Combination sum Ⅳ -- leetcode exercise
Retinanet+keras train their own data set to tread on the pit
wxParse解析iframe播放视频
Target detection - personal understanding of RCNN series
Support vector machine -svm+ source code
es-ik 安装报错
Simple linear regression of sklearn series
Lianrui electronics made an appointment with you with SIFA to see two network cards in the industry's leading industrial automation field first
董明珠称“格力手机做得不比苹果差”哪里来的底气?
Cascade EF gan: local focus progressive facial expression editing
C (I) C basic grammar all in one
Inventory | ICLR 2022 migration learning, visual transformer article summary
27、移除元素
MySQL regularly deletes expired data.
微信小程序,购买商品属性自动换行,固定div个数,超出部分自动换行
Project - Smart City
[NIPS2021]MLP-Mixer: An all-MLP Architecture for Vision
Some details about memory