当前位置:网站首页>[go deep into kotlin] - get to know flow for the first time

[go deep into kotlin] - get to know flow for the first time

2022-06-11 05:26:00 the Summer Palace

asContextElement

ThreadLocal yes Java A solution to data access conflicts between threads , Often used in place of locks . It will copy the data shared between threads into multiple copies ( Per thread ThreadLocal Data is maintained on one sheet map In the table , Among them key Each thread object . This piece of map in ,key yes ThreadLocal own ,value Is the copied data ). such , Each thread operates on a piece of data , So as to solve the access conflict .

But in a collaborative environment , The problem becomes complicated . Because a coroutine is not bound to a thread , A coroutine can switch threads during execution ( For example, before Dispatchers.Unconfined Example ).

Kotlin Solve this problem through a series of expansion methods . When a coroutine from a thread A Switch to B And then back to A, Threads A Of ThreadLocal Properties are automatically restored . This is it. asContextElement(value:) Method , It is ThreadLocal Extension method of , Used to put a ThreadLocal Package as ThreadContextElement.ThreadContextElement Will ThreadLocal Values are copied into the coroutine , But not bound to a specific thread ,value Parameters will override ThreadLocal Medium value.

Let's look at this example .

val threadLocal = ThreadLocal<String>()
fun main() = runBlocking<Unit> {
    
  threadLocal.set("Jim")
  println("${
      Thread.currentThread()}: ${
      threadLocal.get()}") //  Print :Thread[main @coroutine#1,5,main]: Jim
  
  val job = launch(Dispatchers.Default+threadLocal.asContextElement(value="Ann")){
    // ThreadLocal -> ThreadContextElement, Simultaneous coverage  threadLocal  Of  value, therefore ,threadLocal  The value of will come from  Jim  become  Ann.
    println("${
      Thread.currentThread()}: ${
      threadLocal.get()}") //  Print :Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
    yield() //  Switch the coordination process to ( In the thread pool ) Other threads execute 
		println("${
      Thread.currentThread()}: ${
      threadLocal.get()}") //  Print :Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
  }
  job.join() //  Return to the main process ,threadLocal  Value auto recovery 
  println("${
      Thread.currentThread()}, ${
      threadLocal.get()}") //  Print :Thread[main @coroutine#1,5,main]: Jim
}

The entire output of the program is as follows ( open coroutines.debug switch ):

Thread[main @coroutine#1,5,main]: Jim
Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
Thread[main @coroutine#1,5,main]: Jim

ThreadContextElement No tracking thread local Value , It is just a copy of the original value , So any modification to it will not affect the original thread local Value .

If you use directly in the collaboration thread local( Not used asContextElement Method ), And modify the thread local Value , be thread local The value of may become uncertain , Such as :

val t1 = ThreadLocal.withInitial{
      "initial" }
runBlocking {
      //  Be careful , It's not useful here  asContextElement(value:)  Method 
	println(t1.get()) //  Print :initial
	withContext(t1.asContextElement("modified")) {
      
		println(t1.get()) //  Print :modified
	}
	//  Return to the original context 
	println(t1.get())//  May print :initial, It is also possible to print :modified, Not sure 
}

Flow

Flow Represents an asynchronous flow , Be similar to Java stream. If a function needs to return multiple values , In addition to using collections , You can go back to Flow:

//  Method 1 、 Use set 
private fun myMethod(): List<String> = listOf("How are you doing", "Not bad","Thank you", "How about you")
fun main() {
    
  myMethod().forEach {
    
    println(it)
  }
}
//  Method 2 、 Use  Sequence
private fun myMethod1(): Sequence<Int> = sequence {
    
  for(i in 100..105){
    
    Thread.sleep(1000) //  Block main thread 
    yield(i) //  Returns an element 
  }
} 
fun main() {
    
  myMethod1().forEach{
     println(it) } //  Print  100 ~ 105
}
//  Method 3 、 Use the process 
private suspend fun myMethod2():List<String>{
    
  delay(1000)
  return listOf("How are you doing", "Not bad","Thank you", "How about you")
}
fun main()=runBlocking<Unit> {
    
  myMethod2().forEach{
     println(it) } //  Print a string every second 
}
//  Method four 、 Use  Flow
fun myMethod3():Flow<Int> = flow{
     //  call  flow  Builder , Automatically becomes a pending function ( No explicit use is required  suspend  keyword ), So you can call other pending functions 
  for (i in 1..4){
    
    delay(1000)
    emit(i) //  similar  yield, Asynchronously returns a result 
  }
}
fun main() = runBlocking<Unit> {
    
  launch{
    
    for (i in 1..2) {
    
      println("group $i ----")
      delay(2000)
    }
  }
  myMethod3().collect {
     println(it) } // collect  Used to receive  emit  A result returned , The two are paired 
}

The first method is characterized by :

  • myMethod Method is blocked
  • All values in the set can only be returned once after all the values are solved , Regardless of priority

The second method is characterized by :

  • Thread.sleep Simulate the solution process of blocking mode

  • sequence Every time yield once , Just return to the caller once , therefore 5 Print statements are not printed at one time , Instead, a number is printed every second .

The third method is characterized by :

  • delay It's asynchronous , Simulate the asynchronous solution process , It does not block the main thread
  • myMethod2 It's a suspend function , So it can only be called in another pending function or coroutine , therefore main() The function uses runBlocking
  • But it still returns all the results at once

The fourth method is characterized by :

  • The solution process is asynchronous , Will not block threads

  • The return process is asynchronous , Every time emit One at a time , Instead of returning all at once

    So the print result will be printed first group 1 ----, Then the numbers 1 and 2( interval 1 second ),2 Seconds later group 2 ----, And then there was 3 and 4( interval 1 second ):

    group 1 ----
    1
    2
    group 2 ----
    3
    4
    

It can be seen that Flow Very similar to Sequence, But it is executed asynchronously , and Sequence It's synchronous .

Besides , If you will Flow Medium delay Switch to Thread.sleep, be Flow Of emit No use , An asynchronous return becomes a one-time return :

1
2
3
4
group 1 ----
group 3 ----

Obviously ,Thread.sleep Blocking the execution of the main thread .

Flow Builder

Flow Build through the builder , It has 4 Species builder :

  • flowOf(…)
  • asFlow(…)
  • flow{…}
  • channelFlow{ … }

What we have used before is flow{…} Builder . It is easy to use . Next, let's look at the other 3 Builders .

  • flowOf

    Define a stream that emits a fixed number of values . It receives the value of a variable parameter , And loop this variable parameter emit.

fun main() = runBloking {
    
	flowOf(10,20,29,30).collect{
     println(it) }
}
  • asFlow

    Both sets and sequences provide asFlow Extension method , You can transform yourself into a flow flow .

fun main() = runBlocking {
    
	(1..10).asFlow().collect{
     println(it) }
}
Intermediate operation and termination operation

Intermediate operations do not result in Flow The code in is executed , such as emit It is an intermediate operation , It does not cause the code to actually be executed . Only when the operation is terminated will the code be executed ,collect Is a termination operation .

private fun myMethod():Flow<Int> = flow{
    
	println("I'm fine.")
	for(i in 1..3) {
    
		delay(1000)
		emit(i)
	}
}
fun main() = runBlocking<Unit> {
    
  println("Let it go!")
  val flow = myMethod() //  Calling a method does not cause the code to be executed 
  println("See you.")
}

The printout is “I’m fine." A sentence will not be printed :

Let it go!
See you.

Want to let flow True execution , Need to add :

flow.collect{
     println(it) }

Be careful , If the terminate operation is called more than once , Will lead to flow Multiple execution .

Flow You can call the pending function in the intermediate operation of , This is related to Sequence Is different .

private suspend fun myExecution(input:Int): String{
    
	delay(1000)
	return "output:$input"
}
fun main() = runBlocking {
    
  (1..10).asFlow()
  .filter{
     it -> it > 5 }
  .map{
     input -> myExecution(input) }//  Call the suspend function  myExecution
  .collect{
     println(it) }
}

The output is as follows :

output:6
output:7
...
output:10

except filter and map,Flow And support transform operation :

private suspend fun myExecution(input:Int): String{
    
	delay(1000)
	return "output: $input"
}
fun main() = runBlocking {
    
	(1..10).asFlow().transform{
     input -> 
                             emit( "transform: $input")
                             emit(myExecution(input))
                             emit("------")
                            }.collect{
     println(it) }
}

The output is as follows :

transform: 1
output: 1
------
transform: 2
output: 2
------
...
transform: 10
output: 10
------

transform Any logic can be executed in , There is no need to return any value , If required flow Send value , have access to emit ( It can be launched many times ). It is better than filter or map Freer and stronger .

The number of elements can be limited in the intermediate operation :

fun myNumbers():Flow<Int> = flow{
    
  try {
    
		emit(1)
 	 emit(2)
  	println("----")
 	 emit(3)
  }catch(e: Exception) {
    
		println(e)
  }finally{
    
    println("--finally")
  }
	
}
fun main() = runBlocking<Uint> {
    
	myNumbers().take(2).collect( println(it) )// take(2)  Get only the first  2  Elements 
}

The output is as follows :

1
2
kotlinx.coroutines.flow.interal.AbortFlowException: Flow was aborted, no more elements needed
finally

You can see when take After the specified element ,flow An exception will be thrown directly , This causes the flow to be canceled .

All termination operations are suspended functions . Only when the operation is terminated can the code of the stream be actually executed . except collect operation ,flow There are other termination operations , such as toList,toSet,reduce etc. .

fun main() = runBlocking {
    
	val result = (1..4).asFlow().map( it*it )
	.reduce( a,b -> a+b ) //  Aggregation operation , Accumulate element values 
	println(result)
}

The output is 30, because 1+4+9+16 = 30.

Flow It's sequential .collect The operation runs in the process of terminating the operation , By default, no new collaboration will be opened . Every emit The elements of are handled by all intermediate operations , Finally, it is handled by the termination operation .

fun main() = runBlocking {
    
	(1..10).asFlow().filter{
     //  Only even elements are allowed to pass through , Odd numbers are filtered 
    it % 2 == 0
  }.map {
    //  Even elements enter  map  operation 
    println("map: $it")
    it
  }.collect {
    
    println("collect: $it")
  }
}

The output is as follows :

filter: 2
map: 2
collect: 2
filter: 4
map: 4
collect: 4
...
filter: 10
map: 10
collect: 10
原网站

版权声明
本文为[the Summer Palace]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/162/202206110522399889.html