当前位置:网站首页>Deep dive kotlin synergy (XXII): flow treatment

Deep dive kotlin synergy (XXII): flow treatment

2022-07-08 00:22:00 RikkaTheWorld

Series eBook : Portal


We will Flow Described as a pipe , Values flow above , When flowing , These values can be changed in different ways : Delete 、 Multiply 、 Convert or merge . These in Flow All operations created to the terminal operation are called Flow To deal with . In this chapter , We will learn about the functions used for this purpose .

The functions provided here may remind you of the functions used to process collections , It's not a coincidence , Because they all represent the same concept . The difference is flow The elements on can be delivered on time .

map

The first important function we need to learn is map. It transforms each element of the flow according to the transformation function . Suppose you have an integer data stream , Yours map The operation is to square these numbers , Then the data on the final stream will be the square of these numbers .

suspend fun main() {
    
    flowOf(1, 2, 3) // [1, 2, 3]
        .map {
     it * it } // [1, 4, 9]
        .collect {
     print(it) } // 149
}

··· picture ···

after , I will use the image shown above to visualize flow How the handler changes elements over time . Horizontal lines indicate time , The element on this line is that at a certain point in time flow Elements emitted in . The upper line represents the data flow before the operation , The following line represents the data flow after the operation . This diagram can also be used to represent the use of multiple operations , Like in the picture below map and filter.

··· picture ···

majority Flow Processing function , It's easy to realize through the tools we learned in the previous chapter . To achieve map, We can use flow Builder to create a new flow. then , We can use the former flow Collect elements in , And emit transformed elements . The following implementation is from kotlin.coroutines In the library map A simplified version of :

fun <T, R> Flow<T>.map(
    transform: suspend (value: T) -> R
): Flow<R> = flow {
     //  Here we create a  flow
    collect {
     value -> //  Here we collect data from the receiver 
        emit(transform(value))
    }
}

map Is a very popular function . Its usage includes unpacking or converting values to different types .

//  Here we use  map  To get user actions from input events 
fun actionsFlow(): Flow<UserAction> =
    observeInputEvents()
        .map {
     toAction(it.code) }

//  Here we put  User  Turn it into  UserJson
fun getAllUser(): Flow<UserJson> =
    userRepository.getAllUsers()
        .map {
     it.toUserJson() }

filter

The next important function is filter, It returns a containing only the original flow Which matches the data of the given predicate flow.

suspend fun main() {
    
    (1..10).asFlow() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        .filter {
     it <= 5 } // [1, 2, 3, 4, 5]
        .filter {
     isEven(it) } // [2, 4]
        .collect {
     print(it) } // 24
}

fun isEven(num: Int): Boolean = num % 2 == 0

··· picture ···

Use flow The builder can also implement this function very easily . We just need to introduce one if Statements and predicates ( Instead of conversion ).

fun <T> Flow<T>.filter(
    predicate: suspend (T) -> Boolean
): Flow<T> = flow {
     //  Here we create a stream 
    collect {
     value -> //  Here we receive elements from the receiver 
        if (predicate(value)) {
    
            emit(value)
        }
    }
}

filter Usually used to exclude elements we don't need .

//  Here we use  filter  To filter out elements we don't want 
fun actionsFlow(): Flow<UserAction> =
    observeInputEvents()
        .filter {
     isValidAction(it.code) }
        .map {
     toAction(it.code) }

take and drop

We use take To pass a certain number of elements .

suspend fun main() {
    
    ('A'..'Z').asFlow()
        .take(5) // [A, B, C, D, E]
        .collect {
     print(it) } // ABCDE
}

··· picture ···

We use drop To ignore a specific number of elements .

suspend fun main() {
    
    ('A'..'Z').asFlow()
        .drop(20) // [U, V, W, X, Y, Z]
        .collect {
     print(it) } // UVWXYZ
}

How do they work at the lower level ?

We have seen quite a lot flow Processing and lifecycle functions . Their implementation is very simple , There is nothing magical in it . Most of these functions can be used flow Builder , And use collect To achieve . Here is a simple example of stream processing , Including simplified map and flowOf Realization :

suspend fun main() {
    
    flowOf('a', 'b')
        .map {
     it.uppercase() }
        .collect {
     print(it) } // AB
}

fun <T, R> Flow<T>.map(
    transform: suspend (value: T) -> R
): Flow<R> = flow {
    
    collect {
     value ->
        emit(transform(value))
    }
}

fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    
    for (element in elements) {
    
        emit(element)
    }
}

If inline flowOf and map function , You will get the following code ( I am here lambdas Added labels and notes with numbers ).

suspend fun main() {
    
    flow [email protected]{
     // 1
        flow [email protected]{
     // 2
            for (element in arrayOf('a', 'b')) {
     // 3
                this@flowOf.emit(element) // 4
            }
        }.collect {
     value -> // 5
            this@map.emit(value.uppercase()) // 6
        }
    }.collect {
     // 7
        print(it) // 8
    }
}

Let's analyze step by step .

  1. We're commenting 1 Start a flow, And in the comments 7 Its... Was called at collect function
  2. When we start the collection , We will call @map( notes 1) Of lambda expression , It's commenting 2 Started another flow Builder , And in the comments 5 To collect .
  3. So at this time , We start @flowOn( notes 2 It's about ) Of lambda expression , It iterates to include ‘a’ and ‘b’ Array of .
  4. The first value is ‘a’ In the comments 4 Launched , Launched to comment 5 It's about , The lambda The expression converts the value to ‘A’ , And from @map Of collect Function , And call to 7 Situated lambda expression , The value is printed .
  5. notes 7 It's over , Will be commenting on 6 Recovery at , Because notes 6 Nothing else , So it's over , So we are commenting 4 Place recovered @flowOf.
  6. We continue to iterate and comment 4 Launch at ‘b’ , So we call annotations 5 Situated lambda function , Will value Convert into ‘B’, And from @map Of collect Function . The value will be in the comment 7 Be collected , And then in the comments 8 Is printed .
  7. Then comment 7 end , Revert to comments 6 It's about , It's done . So we are commenting 4 Recovery at @flowOf . It's done, too . Because there is no more content , We arrived at @map Ending , such , We're commenting 7 recovery , And arrived main End of function .

In most flow Processing and lifecycle functions will be the same , So understanding this can make us better understand flow How it works .

merge, zip and combine

Next, learn to combine the two flows into one flow . There are several ways to do this . The simplest way is to combine the elements in two streams into one . No matter what flow Where does the element of come from , There is no need to make any changes , So , We can use merge function .

suspend fun main() {
    
    val ints: Flow<Int> = flowOf(1, 2, 3)
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    
    val together: Flow<Number> = merge(ints, doubles)
    print(together.toList())
    // [1, 0.1, 0.2, 0.3, 2, 3]
    //  perhaps  [1, 0.1, 0.2, 0.3, 2, 3]
    //  perhaps  [0.1, 1, 2, 3, 0.2, 0.3]
    //  Or other combinations 
}

··· picture ···

The important point is , When we use merge when , Elements in one stream do not need to wait for elements in another stream . for example , In the following example , From the first flow The element of is delayed , But this does not affect the second flow The elements of .

suspend fun main() {
    
    val ints: Flow<Int> = flowOf(1, 2, 3)
        .onEach {
     delay(1000) }
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)

    val together: Flow<Number> = merge(ints, doubles)
    together.collect {
     println(it) }
}
// 0.1
// 0.2
// 0.3
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3

When multiple event sources have the same operation , We can use merge.

fun listenForMessages() {
    
    merge(userSentMessages, messagesNotifications)
        .onEach {
     displayMessage(it) }
        .launchIn(scope)
}

The next function is zip, It's for two flow Pairing . We need to define a function to determine how elements are paired ( After pairing, the result will be sent to the new flow in ). Each element can only be a one-to-one pair , So it needs to wait for the one that matches it . Elements that are not paired will be lost , therefore , When one flow When the compression of is completed , new flow Will also complete ( Other flow The same is true ).

suspend fun main() {
    
    val flow1 = flowOf("A", "B", "C")
        .onEach {
     delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach {
     delay(1000) }
    flow1.zip(flow2) {
     f1, f2 -> "${
      f1}_${
      f2}" }
        .collect {
     println(it) }
}
// (1 sec)
// A_1
// (1 sec)
// B_2
// (1 sec)
// C_3

··· picture ···

zip Function reminds me of the traditional Polish dance —— Borneo dance . One of the characteristics of this dance is , A row of pairs separated from the middle , Then when they meet again, they will regroup .

···· picture ···

Combine two flow The last important function of is combine. It's like zip equally , It also forms pairs from elements , fast flow The generated element must wait for the slower one flow Generated elements to produce the first pair . However , The similarities with Polish dance stop here . Use combine when , Each new element replaces its previous element . If the first pair has been formed , A new element will come from another flow The previous element of generates a new pair together .

··· picture ···

Be careful , zip Need to pair , So when the first flow closed ,zip It's closed. . and combine There are no such restrictions , Until two flow Before closing , It will always transmit data .

suspend fun main() {
    
    val flow1 = flowOf("A", "B", "C")
        .onEach {
     delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach {
     delay(1000) }
    flow1.combine(flow2) {
     f1, f2 -> "${
      f1}_${
      f2}" }
        .collect {
     println(it) }
}
// (1 sec)
// B_1
// (0.2 sec)
// C_1
// (0.8 sec)
// C_2
// (1 sec)
// C_3
// (1 sec)
// C_4

When we need to observe the changes of two sources , You usually use combine. We can also give each combination flow Add initial value ( To get the initial pair ).

userUpdateFlow.onStart {
     emit(currentUser) }

A typical scenario is when a view has two data sources . for example , When a notification depends on both the current state of the user and some notifications , We can observe them at the same time and update the view with their changes .

userStateFlow
    .combine(notificationsFlow) {
     userState, notifications ->
        updateNotificationBadge(userState, notifications)
    }
    .collect()

fold and scan

If you have used set handlers , You may know fold( Fold ). For each element ( Start from initial value ) Apply the operation of merging two values into one value , Merge all values in the set into one .

for example , If the initial value is 0, The operation is addition , Then the result is the sum of all the numbers : We first take the initial value 0, then , Let's put the first element 1 Add ; In the result 1 On , We add the second number 2; For the result 3, We add the third number 3; For the result 6, Let's add the last number 4, The result of this operation is 10, That is to say fold The result returned .

fun main() {
    
    val list = listOf(1, 2, 3, 4)
    val res = list.fold(0) {
     acc, i -> acc + i }
    println(res) // 10
    val res2 = list.fold(1) {
     acc, i -> acc * i }
    println(res2) // 24
}

··· picture ···

fold Is a terminal operation , It can also be used for flow, But it will hang until flow complete ( It's like collect) equally .

suspend fun main() {
    
    val list = flowOf(1, 2, 3, 4)
        .onEach {
     delay(1000) }

    val res = list.fold(0) {
     acc, i -> acc + i }
    println(res)
}
// (4 sec)
// 10

There is an alternative fold The function is scan. It is an intermediate operation that can produce all intermediate values :

fun main() {
    
    val list = listOf(1, 2, 3, 4)
    val res = list.scan(0) {
     acc, i -> acc + i }
    println(res) // [0, 1, 3, 6, 10]
}

··· picture ···

scan about flow It's very useful , Because it can receive the value generated in the previous step , Generate a new value immediately .

suspend fun main() {
    
    flowOf(1, 2, 3, 4)
        .onEach {
     delay(1000) }
        .scan(0) {
     acc, v -> acc + v }
        .collect {
     println(it) }
}
// 0
// (1 sec)
// 1
// (1 sec)
// 3
// (1 sec)
// 6
// (1 sec)
// 10

··· picture ···

We can use flow Builders and collect Make it easy scan. We first emit the initial value , Then the cumulative result of the next value is issued for each new element .

fun <T, R> Flow<T>.scan(
    initial: R,
    operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = flow {
    
    var accumulator: R = initial
    emit(accumulator)
    collect {
     value ->
        accumulator = operation(accumulator, value)
        emit(accumulator)
    }
}

scan A typical usage scenario is when we need to update or change flow , Or we need an object , When this object is the result of these changes .

val userStateFlow: Flow<User> = userChangesFlow
    .scan(user) {
     acc, change -> user.withChange(change) }

val messagesListFlow: Flow<List<Message>> = messagesFlow
    .scan(messages) {
     acc, message -> acc + message }

flatMapConcat, flatMapMerge and flatMapLatest

Another well-known set handler is flatMap, It is similar to map, But this conversion function finally returns a flattened set . for example , You have a list of departments , Each department has an employee list , Then you can use flatMap To list all employees in all departments .

val allEmployees: List<Employee> = departments
    .flatMap {
     department -> department.employees }
    
val listOfListsOfEmployee: List<List<Employee>> = departments
    .map {
     department -> department.employees }

flatMap How is it applied to flow What about it? ? We can expect to return one flow, And then the flow It should be flat . The problem is that flowing elements can spread over time , that , Whether an element on one stream should wait for an element on another stream , Or should we deal with them at the same time ? Because there is no clear answer , therefore Flow No, flatMap function , But there is flatMapConcatflatMapMerge and flatMapLatest.

flatMapConcat Function in turn processes the generated flow. therefore , the second flow Can be in the first flow Start when finished . In the following example , We use characters “A” “B” “C” To create a flow, They produce new flow Will include these characters and numbers , There is 1s Delay of .

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach {
     delay(1000) }
    .map {
     "${
      it}_${
      elem} " }

suspend fun main() {
    
    flowOf("A", "B", "C")
        .flatMapConcat {
     flowFrom(it) }
        .collect {
     println(it) }
}
// (1 sec)
// 1_A
// (1 sec)
// 2_A
// (1 sec)
// 3_A
// (1 sec)
// 1_B
// (1 sec)
// 2_B
// (1 sec)
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

··· picture ···

The second function flatMapMerge It's the most intuitive for me , It also handles flow Generated value .

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach {
     delay(1000) }
    .map {
     "${
      it}_${
      elem} " }

suspend fun main() {
    
    flowOf("A", "B", "C")
        .flatMapMerge {
     flowFrom(it) }
        .collect {
     println(it) }
}
// (1 sec)
// 1_A
// 1_B
// 1_C
// (1 sec)
// 2_A
// 2_B
// 2_C
// (1 sec)
// 3_A
// 3_B
// 3_C

··· picture ···
have access to concurrently Parameters to set processing flow The number of concurrency of . The default value for this parameter is 16, But you can JVM Attribute DEFAULT_CONCURRENCY_PROPERTY_NAME To change it . Note this default limit , Because if you are in a that contains many elements flow Upper use flatMapMerge when , At the same time, it will only be processed at the same time 16 Elements .

suspend fun main() {
    
    flowOf("A", "B", "C")
        .flatMapMerge(concurrency = 2) {
     flowFrom(it) }
        .collect {
     println(it) }
}
// (1 sec)
// 1_A
// 1_B
// (1 sec)
// 2_A
// 2_B
// (1 sec)
// 3_A
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

flatMapMerge The typical usage scenario of is when we need to flow When each element in requests data . for example , We have a list of categories , You need to request a quote for each category . You already know that you can pass async Function to achieve this . While using flatMapMerge Flow has two advantages :

  • We can control concurrency parameters , To decide how many categories we want to take at the same time ( To avoid sending hundreds of requests at the same time )
  • We can go back to Flow And send them when the next element arrives ( therefore , At the function end , These values can be processed immediately )
suspend fun getOffers(
    categories: List<Category>
): List<Offer> = coroutineScope {
    
    categories
        .map {
     async {
     api.requestOffers(it) } }
        .flatMap {
     it.await() }
}

//  Better solution 
suspend fun getOffers(
    categories: List<Category>
): Flow<Offer> = categories
    .asFlow()
    .flatMapMerge(concurrency = 20) {
    
        suspend {
     api.requestOffers(it) }.asFlow()
        //  perhaps  flow { emit(api.requestOffers(it)) }
    }

The last function is flatMapLatest. Once new flow appear , It will forget the previous flow. For each new value , Previous stream processing will be forgotten . therefore , If “A”、“B”、“C” There is no delay between , Then you will only see “1_C”,“2_C”、“3_C”.

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach {
     delay(1000) }
    .map {
     "${
      it}_${
      elem} " }
    
suspend fun main() {
    
    flowOf("A", "B", "C")
        .flatMapLatest {
     flowFrom(it) }
        .collect {
     println(it) }
}
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

··· picture ···

When initial flow When the element in has a delay , Things become interesting . In the following example ,(1.2s after )“A” Started its flow, That is to say flowFrom . The flow stay 1s An element is generated in “1_A”, however 1200ms And then there was “B”, Previous flow Closed and forgotten . When “C” New flow when ,“B” Successfully produced “1_B”. This function will eventually generate elements “1_C”、“2_C”、“3_C”, There is 1s Delay of .

suspend fun main() {
    
    flowOf("A", "B", "C")
        .onEach {
     delay(1200) }
        .flatMapLatest {
     flowFrom(it) }
        .collect {
     println(it) }
}
// (2.2 sec)
// 1_A
// (1.2 sec)
// 1_B
// (1.2 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

··· picture ···

Terminal operation

Last , We have an end flow Handling operations . These are called terminal operations . up to now , We only used collect. But there are other methods similar to those provided by sets and sequences :count( Calculation flow Number of elements in )、first and firstOrNull( obtain flow The first element emitted from )、fold and reduce( Accumulate elements into an object ). Terminal operation will be suspended , And in flow Return value when finished .

suspend fun main() {
    
    val flow = flowOf(1, 2, 3, 4) // [1, 2, 3, 4]
        .map {
     it * it } // [1, 4, 9, 16]

    println(flow.first()) // 1
    println(flow.count()) // 4
    
    println(flow.reduce {
     acc, value -> acc * value }) // 576
    println(flow.fold(0) {
     acc, value -> acc + value }) // 30
}

at present , flow Although there are not many terminal operations , But if you need some different operations , You can always achieve it yourself . For example, the following implementation of integer flow sum

suspend fun Flow<Int>.sum(): Int {
    
    var sum = 0
    collect {
     value ->
        sum += value
    }
    return sum
}

Similarly , Only for collect Method can realize almost any terminal operation .

summary

There are many tools to support flow Handle . It's a good thing to know something about them , Because they are on the back end and Android It is very useful in development . in addition , If you need some different functions , Can pass collect Methods and flow Builders can easily implement them .

原网站

版权声明
本文为[RikkaTheWorld]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/189/202207072217104168.html