当前位置:网站首页>[go deep into kotlin] - get to know flow for the first time
[go deep into kotlin] - get to know flow for the first time
2022-06-11 05:26:00 【the Summer Palace】
asContextElement
ThreadLocal yes Java A solution to data access conflicts between threads , Often used in place of locks . It will copy the data shared between threads into multiple copies ( Per thread ThreadLocal Data is maintained on one sheet map In the table , Among them key Each thread object . This piece of map in ,key yes ThreadLocal own ,value Is the copied data ). such , Each thread operates on a piece of data , So as to solve the access conflict .
But in a collaborative environment , The problem becomes complicated . Because a coroutine is not bound to a thread , A coroutine can switch threads during execution ( For example, before Dispatchers.Unconfined Example ).
Kotlin Solve this problem through a series of expansion methods . When a coroutine from a thread A Switch to B And then back to A, Threads A Of ThreadLocal Properties are automatically restored . This is it. asContextElement(value:) Method , It is ThreadLocal Extension method of , Used to put a ThreadLocal Package as ThreadContextElement.ThreadContextElement Will ThreadLocal Values are copied into the coroutine , But not bound to a specific thread ,value Parameters will override ThreadLocal Medium value.
Let's look at this example .
val threadLocal = ThreadLocal<String>()
fun main() = runBlocking<Unit> {
threadLocal.set("Jim")
println("${
Thread.currentThread()}: ${
threadLocal.get()}") // Print :Thread[main @coroutine#1,5,main]: Jim
val job = launch(Dispatchers.Default+threadLocal.asContextElement(value="Ann")){
// ThreadLocal -> ThreadContextElement, Simultaneous coverage threadLocal Of value, therefore ,threadLocal The value of will come from Jim become Ann.
println("${
Thread.currentThread()}: ${
threadLocal.get()}") // Print :Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
yield() // Switch the coordination process to ( In the thread pool ) Other threads execute
println("${
Thread.currentThread()}: ${
threadLocal.get()}") // Print :Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
}
job.join() // Return to the main process ,threadLocal Value auto recovery
println("${
Thread.currentThread()}, ${
threadLocal.get()}") // Print :Thread[main @coroutine#1,5,main]: Jim
}
The entire output of the program is as follows ( open coroutines.debug switch ):
Thread[main @coroutine#1,5,main]: Jim
Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
Thread[main @coroutine#1,5,main]: Jim
ThreadContextElement No tracking thread local Value , It is just a copy of the original value , So any modification to it will not affect the original thread local Value .
If you use directly in the collaboration thread local( Not used asContextElement Method ), And modify the thread local Value , be thread local The value of may become uncertain , Such as :
val t1 = ThreadLocal.withInitial{ "initial" } runBlocking { // Be careful , It's not useful here asContextElement(value:) Method println(t1.get()) // Print :initial withContext(t1.asContextElement("modified")) { println(t1.get()) // Print :modified } // Return to the original context println(t1.get())// May print :initial, It is also possible to print :modified, Not sure }
Flow
Flow Represents an asynchronous flow , Be similar to Java stream. If a function needs to return multiple values , In addition to using collections , You can go back to Flow:
// Method 1 、 Use set
private fun myMethod(): List<String> = listOf("How are you doing", "Not bad","Thank you", "How about you")
fun main() {
myMethod().forEach {
println(it)
}
}
// Method 2 、 Use Sequence
private fun myMethod1(): Sequence<Int> = sequence {
for(i in 100..105){
Thread.sleep(1000) // Block main thread
yield(i) // Returns an element
}
}
fun main() {
myMethod1().forEach{
println(it) } // Print 100 ~ 105
}
// Method 3 、 Use the process
private suspend fun myMethod2():List<String>{
delay(1000)
return listOf("How are you doing", "Not bad","Thank you", "How about you")
}
fun main()=runBlocking<Unit> {
myMethod2().forEach{
println(it) } // Print a string every second
}
// Method four 、 Use Flow
fun myMethod3():Flow<Int> = flow{
// call flow Builder , Automatically becomes a pending function ( No explicit use is required suspend keyword ), So you can call other pending functions
for (i in 1..4){
delay(1000)
emit(i) // similar yield, Asynchronously returns a result
}
}
fun main() = runBlocking<Unit> {
launch{
for (i in 1..2) {
println("group $i ----")
delay(2000)
}
}
myMethod3().collect {
println(it) } // collect Used to receive emit A result returned , The two are paired
}
The first method is characterized by :
- myMethod Method is blocked
- All values in the set can only be returned once after all the values are solved , Regardless of priority
The second method is characterized by :
Thread.sleep Simulate the solution process of blocking mode
sequence Every time yield once , Just return to the caller once , therefore 5 Print statements are not printed at one time , Instead, a number is printed every second .
The third method is characterized by :
- delay It's asynchronous , Simulate the asynchronous solution process , It does not block the main thread
- myMethod2 It's a suspend function , So it can only be called in another pending function or coroutine , therefore main() The function uses runBlocking
- But it still returns all the results at once
The fourth method is characterized by :
The solution process is asynchronous , Will not block threads
The return process is asynchronous , Every time emit One at a time , Instead of returning all at once
So the print result will be printed first
group 1 ----, Then the numbers 1 and 2( interval 1 second ),2 Seconds latergroup 2 ----, And then there was 3 and 4( interval 1 second ):group 1 ---- 1 2 group 2 ---- 3 4
It can be seen that Flow Very similar to Sequence, But it is executed asynchronously , and Sequence It's synchronous .
Besides , If you will Flow Medium delay Switch to Thread.sleep, be Flow Of emit No use , An asynchronous return becomes a one-time return :
1
2
3
4
group 1 ----
group 3 ----
Obviously ,Thread.sleep Blocking the execution of the main thread .
Flow Builder
Flow Build through the builder , It has 4 Species builder :
- flowOf(…)
- asFlow(…)
- flow{…}
- channelFlow{ … }
What we have used before is flow{…} Builder . It is easy to use . Next, let's look at the other 3 Builders .
flowOf
Define a stream that emits a fixed number of values . It receives the value of a variable parameter , And loop this variable parameter emit.
fun main() = runBloking {
flowOf(10,20,29,30).collect{
println(it) }
}
asFlow
Both sets and sequences provide asFlow Extension method , You can transform yourself into a flow flow .
fun main() = runBlocking {
(1..10).asFlow().collect{
println(it) }
}
Intermediate operation and termination operation
Intermediate operations do not result in Flow The code in is executed , such as emit It is an intermediate operation , It does not cause the code to actually be executed . Only when the operation is terminated will the code be executed ,collect Is a termination operation .
private fun myMethod():Flow<Int> = flow{
println("I'm fine.")
for(i in 1..3) {
delay(1000)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Let it go!")
val flow = myMethod() // Calling a method does not cause the code to be executed
println("See you.")
}
The printout is “I’m fine." A sentence will not be printed :
Let it go!
See you.
Want to let flow True execution , Need to add :
flow.collect{
println(it) }
Be careful , If the terminate operation is called more than once , Will lead to flow Multiple execution .
Flow You can call the pending function in the intermediate operation of , This is related to Sequence Is different .
private suspend fun myExecution(input:Int): String{
delay(1000)
return "output:$input"
}
fun main() = runBlocking {
(1..10).asFlow()
.filter{
it -> it > 5 }
.map{
input -> myExecution(input) }// Call the suspend function myExecution
.collect{
println(it) }
}
The output is as follows :
output:6
output:7
...
output:10
except filter and map,Flow And support transform operation :
private suspend fun myExecution(input:Int): String{
delay(1000)
return "output: $input"
}
fun main() = runBlocking {
(1..10).asFlow().transform{
input ->
emit( "transform: $input")
emit(myExecution(input))
emit("------")
}.collect{
println(it) }
}
The output is as follows :
transform: 1
output: 1
------
transform: 2
output: 2
------
...
transform: 10
output: 10
------
transform Any logic can be executed in , There is no need to return any value , If required flow Send value , have access to emit ( It can be launched many times ). It is better than filter or map Freer and stronger .
The number of elements can be limited in the intermediate operation :
fun myNumbers():Flow<Int> = flow{
try {
emit(1)
emit(2)
println("----")
emit(3)
}catch(e: Exception) {
println(e)
}finally{
println("--finally")
}
}
fun main() = runBlocking<Uint> {
myNumbers().take(2).collect( println(it) )// take(2) Get only the first 2 Elements
}
The output is as follows :
1
2
kotlinx.coroutines.flow.interal.AbortFlowException: Flow was aborted, no more elements needed
finally
You can see when take After the specified element ,flow An exception will be thrown directly , This causes the flow to be canceled .
All termination operations are suspended functions . Only when the operation is terminated can the code of the stream be actually executed . except collect operation ,flow There are other termination operations , such as toList,toSet,reduce etc. .
fun main() = runBlocking {
val result = (1..4).asFlow().map( it*it )
.reduce( a,b -> a+b ) // Aggregation operation , Accumulate element values
println(result)
}
The output is 30, because 1+4+9+16 = 30.
Flow It's sequential .collect The operation runs in the process of terminating the operation , By default, no new collaboration will be opened . Every emit The elements of are handled by all intermediate operations , Finally, it is handled by the termination operation .
fun main() = runBlocking {
(1..10).asFlow().filter{
// Only even elements are allowed to pass through , Odd numbers are filtered
it % 2 == 0
}.map {
// Even elements enter map operation
println("map: $it")
it
}.collect {
println("collect: $it")
}
}
The output is as follows :
filter: 2
map: 2
collect: 2
filter: 4
map: 4
collect: 4
...
filter: 10
map: 10
collect: 10
边栏推荐
- How much current can PCB wiring carry
- Stone game -- leetcode practice
- Huawei equipment configuration MCE
- Section V: Recycling Application of asphalt pavement materials
- 22. Generate parentheses
- [entry level basics] node basic knowledge summary
- PCB走线到底能承载多大电流
- Opencv learning path (2-5) -- Deep parsing imwrite function
- 1.使用阿里云对象OSS(初级)
- C (I) C basic grammar all in one
猜你喜欢

Sealem finance builds Web3 decentralized financial platform infrastructure

Analyzing while experimenting - memory leakage caused by non static inner classes

Overview of self attention acceleration methods: Issa, CCNET, cgnl, linformer

In the future, how long will robots or AI have human creativity?

Zed2 running vins-mono preliminary test

微信小程序,购买商品属性自动换行,固定div个数,超出部分自动换行

JVM tuning V: JVM tuning tools and tuning practice

Preliminary test of running vins-fusion with zed2 binocular camera
![[aaai 2021 timing action nomination generation] detailed interpretation of bsn++ long article](/img/28/d69a7583036a2076facffcf9098d7e.jpg)
[aaai 2021 timing action nomination generation] detailed interpretation of bsn++ long article

Huawei equipment is configured with bgp/mpls IP virtual private network address space overlap
随机推荐
一大厂95后程序员对部门领导不满,删库跑路被判刑
Zed2 running vins-mono preliminary test
Opencv learning path (2-1) -- Deep parsing imread function
推荐一款免费的内网穿透开源软件,可以在测试本地开发微信公众号使用
About custom comparison methods of classes and custom methods of sort functions
Paper recommendation: relicv2, can the new self supervised learning surpass supervised learning on RESNET?
SQLite installation and configuration tutorial +navicat operation
The solution "no hardware is configured for this address and cannot be modified" appears during botu simulation
点击图标不灵敏咋整?
WinForm (I) introduction to WinForm and use of basic controls
MySQL string to array, merge result set, and convert to array
oh my zsh正确安装姿势
Sealem finance builds Web3 decentralized financial platform infrastructure
自定义View之基础篇
Opencv learning path (2-5) -- Deep parsing imwrite function
NVIDIA SMI has failed because it could't communicate with the NVIDIA driver
JVM tuning 6: GC log analysis and constant pool explanation
Technology | image motion drive interpretation of first order motion model
Target detection - personal understanding of RCNN series
How to make lamps intelligent? How to choose single fire and zero fire intelligent switches!