[Go deep into Kotlin] - Flow advanced

2022-06-11 05:26:00 the Summer Palace

Flow context

A Flow is always collected in the context of the calling coroutine, not in the context where the Flow was defined.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun log(msg: String) = println("[${Thread.currentThread().name}], $msg")

fun myMethod(): Flow<Int> = flow {
    log("started")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking {
    myMethod().collect { log("collected: $it") }
}

Running it produces the following output (with coroutine debugging enabled, for example via the JVM option -Dkotlinx.coroutines.debug):

[main @coroutine#1], started
[main @coroutine#1], collected: 1
[main @coroutine#1], collected: 2
[main @coroutine#1], collected: 3

As the output shows, the flow runs in the coroutine that calls collect, namely the coroutine started by runBlocking. This inevitably blocks the main thread, so one might try to run the flow body in another context:

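import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

private fun log(msg: String) = println("[${Thread.currentThread().name}], $msg")

private fun myMethod(): Flow<Int> = flow {
    // Wrong: switching context inside the flow builder violates context preservation
    withContext(Dispatchers.Default) {
        for (i in 1..4) {
            Thread.sleep(100) // simulate blocking work
            emit(i)
        }
    }
}

fun main() = runBlocking {
    myMethod().collect { log("$it") }
}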

The program throws the following exception:

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@405324edf, [email protected]],
but emission happened in [DispatchedCoroutine{Active}@40452e23e, DefaultDispatcher].
...

Collection happens on the main thread, but emit is called on a background thread. The withContext(Dispatchers.Default) call changes the context of the flow body, switching its dispatcher to Dispatchers.Default, whereas the collect code uses the dispatcher of runBlocking.

The flowOn operator

To solve this problem, Kotlin provides the flowOn operator:

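import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

private fun log(msg: String) = println("[${Thread.currentThread().name}], $msg")

private fun myMethod(): Flow<Int> = flow {
    for (i in 1..4) {
        Thread.sleep(100) // simulate blocking work
        log("emit: $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default) // applies to the whole flow { } block, not to the for loop

fun main() = runBlocking {
    myMethod().collect { log("collect: $it") }
}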

Run the program and the error no longer occurs. flowOn changes the context of the upstream operations (the emitting code and any intermediate operators placed before it), so they can run in a different context from the terminal operation. flowOn also changes the default sequential nature of the Flow: collection and emission now happen in different coroutines (and threads):

[DefaultDispatcher-worker-1 @coroutine#2], emit: 1
[main @coroutine#1], collect: 1
[DefaultDispatcher-worker-1 @coroutine#2], emit: 2
[main @coroutine#1], collect: 2
[DefaultDispatcher-worker-1 @coroutine#2], emit: 3
[main @coroutine#1], collect: 3
[DefaultDispatcher-worker-1 @coroutine#2], emit: 4
[main @coroutine#1], collect: 4

Buffering

Running different parts of a flow in different coroutines lets them execute concurrently, which can shorten the total run time, especially when emission or collection is slow. This is what buffering is for. Typically, we let emit and collect run concurrently: the emitter is not suspended while the collector is processing an element, but goes on to emit the next element into a buffer, and the next collection reads directly from that buffer.

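import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

private fun myMethod(): Flow<Int> = flow {
    for (i in 1..4) {
        delay(100) // simulate a slow emitter
        emit(i)
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        myMethod().collect {
            delay(200) // simulate slow processing
            println(it)
        }
    }
    println(time)
}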

Here, emitting an element takes 0.1 seconds and collecting one takes 0.2 seconds, so each element costs 0.3 seconds and the four elements take about 1.2 seconds in total. The output is as follows:

1
2
3
4
1223

With the buffer operator, it takes less time:

myMethod().buffer().collect {
    delay(200)
    println(it)
}

The only change is the buffer() call added before collect. With it, the emitter does not wait for the collector to finish after emitting an element; the element goes straight into a buffer, and the collector reads from that buffer. This shortens the whole emit-and-collect run:

1
2
3
4
1003

The first collection finishes at 0.3 seconds (0.1 seconds of emission plus 0.2 seconds of collection), the second at 0.5 seconds, the third at 0.7 seconds, and the fourth at 0.9 seconds. Adding some overhead for setting up and tearing down the buffer, the total comes to about 1 second.

Buffering and flowOn are related: flowOn uses the same buffering mechanism whenever it has to change the CoroutineDispatcher, as in the flowOn example above.
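
The relationship can be checked with a small sketch that reuses the emitter from the buffer example but replaces buffer() with flowOn(Dispatchers.Default). The names slowNumbers and collectSlowly are hypothetical and not part of the original article. If flowOn buffers as described, the elapsed time should be closer to the buffered run than to 1.2 seconds, though exact timings depend on the machine.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical helper: the same slow emitter, moved to another dispatcher with flowOn.
fun slowNumbers(): Flow<Int> = flow {
    for (i in 1..4) {
        delay(100) // simulate a slow emitter
        emit(i)
    }
}.flowOn(Dispatchers.Default)

// Hypothetical helper: collects slowly and returns the collected values and the elapsed time.
fun collectSlowly(): Pair<List<Int>, Long> = runBlocking {
    val seen = mutableListOf<Int>()
    val elapsed = measureTimeMillis {
        slowNumbers().collect {
            delay(200) // simulate slow processing
            seen += it
        }
    }
    seen to elapsed
}

fun main() {
    val (values, elapsed) = collectSlowly()
    println(values)
    println("$elapsed ms")
}
```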

Combining flows

The contents of two flows can be combined into a single flow.

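import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val nums = (1..5).asFlow()
    val strings = flowOf("a", "b", "c", "d", "e")
    // zip pairs up corresponding elements of the two flows
    nums.zip(strings) { a, b -> "$a$b" }
        .collect { println(it) }
}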

zip takes the elements of the first and second flows in order, pairs them up, applies the lambda passed as its second argument to each pair, and emits the result. The output is as follows:

1a
2b
3c
4d
5e
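
As a supplementary sketch (not from the original article), the pairing behaviour is easiest to see when the two flows have different lengths: zip completes as soon as either flow completes, so the surplus elements of the longer flow are dropped. The helper name zipUneven is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: zips a 5-element flow with a 3-element flow.
fun zipUneven(): List<String> = runBlocking {
    val nums = (1..5).asFlow()
    val strings = flowOf("a", "b", "c")
    // Only three pairs can be formed; 4 and 5 have no partner.
    nums.zip(strings) { n, s -> "$n$s" }.toList()
}

fun main() {
    println(zipUneven()) // [1a, 2b, 3c]
}
```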

Flattening

Flattening converts a flow of flows (something like Flow<Flow<T>>) into a flow that no longer contains flows (Flow<T>), much like converting a two-dimensional array into a one-dimensional one.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

private fun myMethod(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow()
        .onEach { delay(100) }           // 1. visit each element, 100 ms apart
        .flatMapConcat { myMethod(it) }  // 2. map each element to a flow and flatten
        .collect {                       // 3. terminal operation
            println("$it: ${System.currentTimeMillis() - startTime} ms")
        }
}

The output is as follows :

1: First: 144 ms
1: Second: 646 ms
2: First: 751 ms
2: Second: 1256 ms
3: First: 1360 ms
3: Second: 1865 ms

  1. Convert 1..3 to a flow, delaying 0.1 seconds on each element (1..3).

  2. Map each of the 3 elements (1..3) to a flow: each call to myMethod produces a flow that emits two elements for the same number. So 1..3 becomes:

    [1,1],[2,2],[3,3]

    Because the second emission of each inner flow comes after delay(500), and each outer element is delayed by 0.1 seconds, the two lines for the same number are printed about 0.5 seconds apart and consecutive groups about 0.1 seconds apart.

  3. flatMapConcat then flattens this two-dimensional flow into a one-dimensional one, processing the inner flows one after another, so the result is a flow of 6 elements:

    [1,1,2,2,3,3]
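
The flattened order can be made visible by collecting the result into a list. This is only a sketch: inner mirrors myMethod without the delay, and both inner and flattened are hypothetical names. The @OptIn annotation is an assumption to cover library versions in which flatMapConcat is still marked as preview or experimental.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical inner flow: two emissions per input number, no delays.
fun inner(i: Int): Flow<String> = flow {
    emit("$i: First")
    emit("$i: Second")
}

// Hypothetical helper: flattens the three inner flows sequentially into one list.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun flattened(): List<String> = runBlocking {
    (1..3).asFlow().flatMapConcat { inner(it) }.toList()
}

fun main() {
    flattened().forEach(::println)
}
```

Each inner flow is fully collected before the next one starts, which is why the two lines for one number always stay together.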

Flow exceptions

Exceptions in a flow can be handled with a traditional try…catch:

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

private fun myMethod(): Flow<Int> = flow {
    for (i in 1..3) {
        println("emit: $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    try {
        myMethod().collect {
            println(it)
            // check throws IllegalStateException when the condition is false;
            // the lambda lazily supplies the exception message (any value, converted to a string)
            check(it <= 1) { "collect $it" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

Running the code above prints:

emit: 1
1
emit: 2
2
Caught java.lang.IllegalStateException: collect 2

When the second element, 2, arrives, the check fails and throws an exception, which is caught by try…catch.

In the example above, the exception occurs during collection. But try…catch also catches exceptions thrown during emission and in intermediate operators.

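import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

private fun myMethod(): Flow<String> = flow {
    for (i in 1..3) {
        println("emit: $i")
        emit(i)
    }
}.map { value ->
    // map is applied to the whole flow { } block and converts Int to String
    check(value <= 1) { "crash on $value" }
    "string $value"
}

fun main() = runBlocking<Unit> {
    try {
        myMethod().collect { println(it) }
    } catch (e: Throwable) {
        println("caught $e")
    }
}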

Running the program prints:

emit: 1
string 1
emit: 2
caught java.lang.IllegalStateException: crash on 2

When an element is greater than 1, the exception is thrown and caught, and "crash on 2" is carried in the exception message.

Completion

When a flow finishes, you can run an additional completion action. This can be done imperatively or declaratively.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

private fun myMethod(): Flow<Int> = (1..10).asFlow()

fun main() = runBlocking<Unit> {
    try {
        myMethod().collect { println(it) }
    } finally {
        println("finally")
    }
}

The code in the finally block always runs, whether the flow completes normally or throws an exception. This is the imperative way, using a try…finally statement:

1
2
...
10
finally

The declarative way is more flexible. It uses the onCompletion intermediate operator, whose action runs once the flow has been fully collected or has been cancelled:

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

private fun myMethod(): Flow<Int> = (1..10).asFlow()

fun main() = runBlocking<Unit> {
    myMethod()
        .onCompletion { println("onCompletion") }
        .collect { println(it) }
}

The output is the same as before, except that the last line reads onCompletion instead of finally. Note that the action passed to onCompletion is a lambda with one parameter of type Throwable? (nullable). You can use this parameter to find out whether the flow ended with an exception.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

private fun myMethod(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException() // fail after the first element
}

fun main() = runBlocking {
    myMethod()
        .onCompletion { e -> if (e != null) println("Flow stopped with an Exception.") }
        .catch { e -> println("Caught an Exception") }
        .collect { println(it) }
}

Here, catch is also an intermediate operator, used to catch exceptions from the upstream flow.

The output:

1
Flow stopped with an Exception.
Caught an Exception

One caveat: in the library version used for this article, onCompletion only observes exceptions from upstream of it; an exception thrown downstream is not passed to it.

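import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

private fun myMethod(): Flow<Int> = (1..10).asFlow()

fun main() = runBlocking<Unit> {
    myMethod()
        .onCompletion { e -> println("Flow stopped with an Exception $e.") }
        .collect { value ->
            // Fails in the collector, i.e. downstream of onCompletion
            check(value <= 1) { "collect $value" }
            println(value)
        }
}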

The output is as follows :

1
Flow stopped with an Exception null.
Exception in thread "main" java.lang.IllegalStateException: collect 2
...

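Here, onCompletion prints e as null, meaning it did not observe the exception. onCompletion is an intermediate operator and collect is the terminal operator, so collect is downstream of onCompletion, and in this version an exception thrown inside collect is not reported to it. Note that more recent kotlinx.coroutines releases behave differently: there, onCompletion reports both upstream and downstream failures, so the same program prints the IllegalStateException instead of null, and e is null only when the flow completes successfully.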
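
One way to make the collector's failure visible to the declarative operators is to move the collector logic into onEach, placed before onCompletion and catch, and then call collect() without arguments. The sketch below assumes this arrangement; the helper name runDeclaratively is hypothetical, and the lines are gathered into a list only so that the result is easy to inspect.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the lines the pipeline would otherwise print.
fun runDeclaratively(): List<String> = runBlocking {
    val lines = mutableListOf<String>()
    (1..10).asFlow()
        .onEach { value ->
            // Former collector body; it now sits upstream of onCompletion and catch
            check(value <= 1) { "collect $value" }
            lines += "$value"
        }
        .onCompletion { e -> lines += "completed with $e" }
        .catch { e -> lines += "caught $e" }
        .collect() // terminal operator with no body of its own
    lines
}

fun main() {
    runDeclaratively().forEach(::println)
}
```

Because the check now fails upstream of both operators, onCompletion receives the IllegalStateException and catch handles it, so the program ends normally instead of crashing.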

Cancellation

Flow cancellation is really just cancellation of the collecting coroutine; Flow has no dedicated cancel operation of its own. For example, a collect call can be cancelled, provided the flow is suspended in a cancellable suspending function (such as delay):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

private fun myMethod(): Flow<Int> = flow {
    for (i in 1..4) {
        delay(100) // cancellable suspension point
        println("Emit: $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(280) { // time out after 280 milliseconds
        myMethod().collect { println(it) }
    }
    println("Done.")
}

The console output is as follows :

Emit: 1
1
Emit: 2
2
Done.

The 3rd and 4th iterations never ran because the timeout cancelled the flow. Here, withTimeoutOrNull is a suspending function that cancels its block when the timeout expires and returns null.

Copyright notice
This article was written by [the Summer Palace]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/162/202206110522399808.html