当前位置:网站首页>Kotlin data flow - flow

Kotlin data flow - flow

2022-06-13 06:26:00 m0_ forty-seven million nine hundred and fourteen thousand one

One .Flow The characteristics and use of

1. Use Flow Return multiple values

1. be known as flow Of Flow Type builder function .
2.flow{…} The code in the building block can be suspended .
3. function simpleFlow No longer marked suspend Modifier .
4. Use emit Function emission value .
5. Use collect Function to collect values .

 // adopt flow Return multiple values      flow Functions can be suspended 
    fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            delay(1000)// Hang up 
            emit(i)// Emission produces an element 
        }
    }

    // Here we write an asynchronism with a coroutine 
    fun testMultipleValue3() = runBlocking {
        simpleFlow().collect { value ->
            print(value)
        }
    }

Result demonstration :

2.Flow Cold flow

 flow Is a kind of similar to sequence Cold flow ,flow The code in the builder does not run until the stream is sent to the phone .

 fun simpleFlow02() = flow<Int> {
        println("Flow started")
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }

    // Cold flow     Only a call collect When Flow To collect elements 
    fun testFlowIsCold() = runBlocking {
        val simpleFlow02 = simpleFlow02()
        println("Calling collect...")
        simpleFlow02.collect { value ->
            println(value)
        }
        println("Calling collect again...")
        simpleFlow02.collect { value ->
            println(value)
        }
    }

Print the results :

3.Flow The continuity of

1. Each individual collection of streams is performed sequentially , Unless you use a special operator .
2. Each transition operator from upstream to downstream processes each emitted value , Then give it to the end operator .

 //Flow The continuity of ( call chaining )
    fun testFlowContinuation() = runBlocking {
        //Flow Builder for 
        (1..5).asFlow().filter {
            it % 2 == 0
        }.map {
            "string  $it"  // Transformation type 
        }.collect {
            println("Collect $it")
        }
    }

Result demonstration :

4.Flow Builder for

1.fIowOf The builder defines a stream that emits a fixed set of values .
2. Use .asFlow() spread function , Various sets and sequences can be converted into streams .

    //Flow Builder for  flowOf  asFlow
    fun testFlowBuilder() = runBlocking {
        flowOf("one", "two")
    //onEach  Back to a stream , The flow invokes the given operation before each value of the upstream flow is issued downstream .
            .onEach {
                delay(1000)
            }.collect {
                println(it)
            }
        listOf(1,2,3).asFlow().collect { value -> println(value) }
    }

5.flow The context of

1. The collection of flows always takes place in the context of calling a coroutine , This property of the stream is called context saving .
2.flow{...} The code in the builder must follow the context to save properties , And emission from other contexts is not allowed (emit).
3.fIowOn The operator , This function is used to change the context of stream emission .

 // change flow The context of 
    fun simpleFlow05() = flow<Int> {

        println("Flow started ${Thread.currentThread().name}")
        for (i in 1..5) {
            emit(i)
        }
    }.flowOn(Dispatchers.Default)


    fun testFlowChangeContext() = runBlocking {
        simpleFlow05().collect { value ->
            println(
                "Collected $value ${
                    Thread.currentThread().name
                }"
            )
        }
        println("")
    }

Print the results :

Flow started DefaultDispatcher-worker-1
Collected 1 main
Collected 2 main
Collected 3 main
Collected 4 main
Collected 5 main

6.Flow The coordination process uses

Use launchIn Replace collect We can start the collection of flows in a separate process

 //flow event 
    fun events() = (1..3).asFlow().onEach {
        delay(100)
    }

    //flow The flow is used in conjunction with the process 
    fun testFlowLaunch() = runBlocking {
        var job = events().onEach { event ->
            println("Event:  $event ${Thread.currentThread().name}")
        }.launchIn(
            CoroutineScope(Dispatchers.IO)
        )
        delay(1000)
        job.cancelAndJoin()
    }

Print the results :

Event:  1 DefaultDispatcher-worker-1
Event:  2 DefaultDispatcher-worker-1
Event:  3 DefaultDispatcher-worker-1

7.Flow Cancel

The flow adopts the same cooperative cancellation as the collaboration process . As usual , The collection of streams can be when the stream is in a cancelable suspend function ( for example delay) Cancel when suspended in .

1. Time out cancellation

 fun simpleFlow06() = flow<Int> {
        for (i in 1..5) {
            emit(i)
            delay(1000)
            println("Emitting $i")
        }
    }
    // Time out cancellation 
    fun testCancelFlow() = runBlocking {
        withTimeoutOrNull(2500) {
            simpleFlow06().collect { value ->
                println(value)
            }
            println("Done")
        }
    }

Result demonstration :

  This stream is in 2.5 Seconds was canceled

2. Judge whether the condition is cancelled

 fun simpleFlow06() = flow<Int> {
        for (i in 1..5) {
            emit(i)
            delay(1000)
            println("Emitting $i")
        }
    }

 fun testCancelFlowCheck() = runBlocking {
        // This is the case because the elements all pass through emit Function can be used cancel Cancel 
        /*  simpleFlow06().collect {
            value ->
            print(value)
            if (value==3) cancel()
        }
    }*/
       // Another situation 
        //cancellable Definitely want to cancel , Each element collected will determine whether the cancellation condition is met 
        (1..5).asFlow().cancellable().collect { value ->
            println(value)
            if (value == 3) cancel()
        }

    }

Effect demonstration :

8. Handle Flow Back pressure

1.buffer(), Code that emits elements in a concurrent run stream .
2.conflate(), Merge launch items , Do not process each value .
3.collectLatest(), Cancel and retransmit the last value .
4. When you have to change CoroutineDispatcher when , To flowOn The operator uses the same buffering mechanism , however buffer The function explicitly requests a buffer and Do not change the execution context .

fun simpleFlow07() = flow<Int> {
        for (i in 1..5) {
            delay(100)
            emit(i)
            println("Emitting $i")
        }
    }
        fun testBackPressure()= runBlocking {
            // Because the time of occurrence here is faster than that of collection , But only after collection can it be considered as the end , So the time required for launch and collection adds up 
            val measureTimeMillis = measureTimeMillis {
                simpleFlow07()
                    // Switch threads due to this method and simpleFlow07 Functions are not in the same thread , Therefore, the launch can be carried out in parallel, and because of its own buffer , Here, the main thread will collect the emitted elements in order 
                    //.flowOn(Dispatchers.Default)
                    //.buffer(50)// The cache can run the code in the stream concurrently 
                    //.conflate()// Emit element merge , Note that this will lack elements 
                    //.collectLatest {}// This will collect the last one 
                    .collect { value ->
                    delay(300)
                    println(value)
                }
            }
            println(measureTimeMillis)
        }

Two .Flow The operator

1. Conversion operators

 suspend fun performRequest(request: Int):String{
        delay(1000)
        return "response $request"
    }
    // Modify the type operator 
    fun testTransformFlowOperator()= runBlocking {
        // Every time you change the type 
       /* (1..3).asFlow()
            .map { value ->
                performRequest(value)
            }.collect {
                println(it)
            }*/
        
        (1..3).asFlow()
        //transform() Each element is modified multiple times 
            .transform { value ->
              emit("Making request $value")
              emit(performRequest(value))
            }.collect {
                println(it)
            }
    }

Print the results :

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

2. The length limit operator

 fun numbers()= flow<Int> {
            try {
                emit(1)
                emit(2)
                println("This line will not execute")
                emit(3)
            }finally {
                println("Finally in numbers")
            }
        }
            
        fun testLimitLengthOperator()= runBlocking {
                    // The length limit operator  take():  Limit output length 
            numbers().take(2).collect {value ->  println(value) }
        }

Print the results :

1
2
Finally in numbers

3. End operator


    fun testTerminalOperator()= runBlocking {
        //map  Returns the result of a function operation 
        //reduce  Accumulate values from the first element , And apply the operation... To the current accumulator value and to each element 
        val sum=(1..5).asFlow().map { it*it }.reduce{
            // Here is the 1 To 5 Sum of squares 
            a,b->a+b
        }
        // When there is only one element single
        val single = flowOf(1).single()
        // Get first element 
        val first = flowOf(1..6).first()
        // First set the elements of the operation , And finally with fold Parameter operation of 
        val sum1=(1..5).asFlow().fold(3){
            a,b->a+b
        }
        
        println(sum)
        println(sum1)
    }

Print the results :

55
1
1..6
18

4. Combination operator

   //zip() Merge operators 
    fun testZip()= runBlocking {
        val asFlow = (1..3).asFlow().onEach { delay(300) }
        val flowOf = flowOf("one", "Two", "Three").onEach { delay(400) }
        val currentTimeMillis = System.currentTimeMillis()
       // The time required for merging is printed out here , Prove that the whole process is asynchronous (400m)
        asFlow.zip(flowOf){a,b->"$a   $b"}.collect { value -> println("$value at ${System.currentTimeMillis()-currentTimeMillis}") }
    }
    fun requestFlow(i:Int)= flow<String> {
        emit("$i: First")
        delay(500)
        emit("$i: Second")
    }

Results the print :

1   one at 435
2   Two at 844
3   Three at 1255

5. Advection flow

A stream represents a sequence of values received asynchronously , So it's easy to encounter such a situation : Each value triggers a request for another sequence of values , However , Because of the asynchronous nature of flow , Therefore, different flattening modes are required , So , There are a series of flow flattening operators :

1.flatMapConcat Connection mode .

2.flatMapMerge Merge mode

3.flatMapLatest Latest flattening mode

// When one flow It's wrapped in another flow such as flow<flow<String>>, I need to use advection to turn into a flow
    fun estFlatMapConcat()= runBlocking {
        val  startTime=System.currentTimeMillis()
        (1..3).asFlow().onEach { delay(100) }
             
            //flatMapLatest{}
            //flatMapMerge The previous effect is similar to flatMapConcat The same order is different 
           // .flatMapMerge {  }
            .flatMapConcat{
            // This call map Words requestFlow The return value is flow<flow<String>>, So call flatMapConcat I will untie one layer by myself   Print the results        
            requestFlow(it)
        }.collect {
            value ->  println("$value at ${System.currentTimeMillis()-startTime}")
        }
    }

flatMapLatest Print the results :

1: First at 149
2: First at 266
3: First at 370
3: Second at 882

flatMapMerge Print the results :

//1: First at 143
//2: First at 240
//3: First at 347
//1: Second at 655
//2: Second at 748
//3: Second at 860

flatMapConcat Print the results :

1: First at 129
1: Second at 641
2: First at 749
2: Second at 1257
3: First at 1367
3: Second at 1874

3、 ... and .Flow abnormal

1.Flow Handling exceptions

1. An exception occurred while receiving

 fun simpleFlow08()= flow<Int> {
        for (i in 1..4){
            println("Emitting $i")
            emit(i)
        }
    }

    fun testFlowException()= runBlocking {
        try {
            simpleFlow08().collect {value ->
                println(value)
                check(value <=1 ){
                    "Collected $value"
                }
            }
        }catch (e:Throwable){
            println("Caught $e")
        }
    }

Print the results :

Emitting 1
1
Emitting 2
2

2. An exception occurred during the launch

fun testFlowException2()= runBlocking<Unit> {
        flow {
            emit(1)
            throw ArithmeticException("Div 0")
            // There is an abnormality in the upstream , Under the guarantee of not breaking flow Where principles are involved, it is recommended to use catch function 
        }.catch { e:Throwable-> println("Caught $e")
        emit(10) // You can write like this 
        }
            .flowOn(Dispatchers.IO)
            .collect {
                println(it)
            }
    }

Print the results :

Caught java.lang.ArithmeticException: Div 0
1
10

2.Flow Handling of exceptions

1. Normal completion

 fun testCompleteInFinally()= runBlocking {
        try {
            simpleFlow().collect { println(it) }
        }finally {
            println("Done")
        }
    }

     fun simpleFlow() =flow<Int> {
        emit(3)
        delay(100)
         throw  ActivityNotFoundException()
    }

2. Abnormal death

 fun simpleFlow() = flow<Int> {
        emit(3)
        delay(100)
        throw  ActivityNotFoundException()
    }

    fun testCompleteInCompletion() = runBlocking {
         /*  simpleFlow().onCompletion { exception -> println("Flow completed $exception") }
            .catch { e -> println(e) }
            .collect {
                println(it)
                /*  check(
                      it<=1){"Collected $it"}*/
            }*/
        // When he dies abnormally, he will get abnormal information    It will also be executed after normal completion 
        simpleFlow().onCompletion { println("Done") }.collect { println(it) }
    }

Print information :

原网站

版权声明
本文为[m0_ forty-seven million nine hundred and fourteen thousand one ]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/02/202202270555276862.html