当前位置:网站首页>Deep dive kotlin synergy (XXII): flow treatment
Deep dive kotlin synergy (XXII): flow treatment
2022-07-08 00:22:00 【RikkaTheWorld】
Series eBook : Portal
We will Flow Described as a pipe , Values flow above , When flowing , These values can be changed in different ways : Delete 、 Multiply 、 Convert or merge . These in Flow All operations created to the terminal operation are called Flow To deal with . In this chapter , We will learn about the functions used for this purpose .
The functions provided here may remind you of the functions used to process collections , It's not a coincidence , Because they all represent the same concept . The difference is flow The elements on can be delivered on time .
map
The first important function we need to learn is map. It transforms each element of the flow according to the transformation function . Suppose you have an integer data stream , Yours map The operation is to square these numbers , Then the data on the final stream will be the square of these numbers .
suspend fun main() {
flowOf(1, 2, 3) // [1, 2, 3]
.map {
it * it } // [1, 4, 9]
.collect {
print(it) } // 149
}

after , I will use the image shown above to visualize flow How the handler changes elements over time . Horizontal lines indicate time , The element on this line is that at a certain point in time flow Elements emitted in . The upper line represents the data flow before the operation , The following line represents the data flow after the operation . This diagram can also be used to represent the use of multiple operations , Like in the picture below map and filter.

majority Flow Processing function , It's easy to realize through the tools we learned in the previous chapter . To achieve map, We can use flow Builder to create a new flow. then , We can use the former flow Collect elements in , And emit transformed elements . The following implementation is from kotlin.coroutines In the library map A simplified version of :
fun <T, R> Flow<T>.map(
transform: suspend (value: T) -> R
): Flow<R> = flow {
// Here we create a flow
collect {
value -> // Here we collect data from the receiver
emit(transform(value))
}
}
map Is a very popular function . Its usage includes unpacking or converting values to different types .
// Here we use map To get user actions from input events
fun actionsFlow(): Flow<UserAction> =
observeInputEvents()
.map {
toAction(it.code) }
// Here we put User Turn it into UserJson
fun getAllUser(): Flow<UserJson> =
userRepository.getAllUsers()
.map {
it.toUserJson() }
filter
The next important function is filter, It returns a containing only the original flow Which matches the data of the given predicate flow.
suspend fun main() {
(1..10).asFlow() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
.filter {
it <= 5 } // [1, 2, 3, 4, 5]
.filter {
isEven(it) } // [2, 4]
.collect {
print(it) } // 24
}
fun isEven(num: Int): Boolean = num % 2 == 0

Use flow The builder can also implement this function very easily . We just need to introduce one if Statements and predicates ( Instead of conversion ).
fun <T> Flow<T>.filter(
predicate: suspend (T) -> Boolean
): Flow<T> = flow {
// Here we create a stream
collect {
value -> // Here we receive elements from the receiver
if (predicate(value)) {
emit(value)
}
}
}
filter Usually used to exclude elements we don't need .
// Here we use filter To filter out elements we don't want
fun actionsFlow(): Flow<UserAction> =
observeInputEvents()
.filter {
isValidAction(it.code) }
.map {
toAction(it.code) }
take and drop
We use take To pass a certain number of elements .
suspend fun main() {
('A'..'Z').asFlow()
.take(5) // [A, B, C, D, E]
.collect {
print(it) } // ABCDE
}

We use drop To ignore a specific number of elements .
suspend fun main() {
('A'..'Z').asFlow()
.drop(20) // [U, V, W, X, Y, Z]
.collect {
print(it) } // UVWXYZ
}
How do they work at the lower level ?
We have seen quite a lot flow Processing and lifecycle functions . Their implementation is very simple , There is nothing magical in it . Most of these functions can be used flow Builder , And use collect To achieve . Here is a simple example of stream processing , Including simplified map and flowOf Realization :
suspend fun main() {
flowOf('a', 'b')
.map {
it.uppercase() }
.collect {
print(it) } // AB
}
fun <T, R> Flow<T>.map(
transform: suspend (value: T) -> R
): Flow<R> = flow {
collect {
value ->
emit(transform(value))
}
}
fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
If inline flowOf and map function , You will get the following code ( I am here lambdas Added labels and notes with numbers ).
suspend fun main() {
flow [email protected]{
// 1
flow [email protected]{
// 2
for (element in arrayOf('a', 'b')) {
// 3
this@flowOf.emit(element) // 4
}
}.collect {
value -> // 5
this@map.emit(value.uppercase()) // 6
}
}.collect {
// 7
print(it) // 8
}
}
Let's analyze step by step .
- We're commenting 1 Start a flow, And in the comments 7 Its... Was called at
collectfunction - When we start the collection , We will call
@map( notes 1) Of lambda expression , It's commenting 2 Started another flow Builder , And in the comments 5 To collect . - So at this time , We start
@flowOn( notes 2 It's about ) Of lambda expression , It iterates to include ‘a’ and ‘b’ Array of . - The first value is ‘a’ In the comments 4 Launched , Launched to comment 5 It's about , The lambda The expression converts the value to ‘A’ , And from @map Of
collectFunction , And call to 7 Situated lambda expression , The value is printed . - notes 7 It's over , Will be commenting on 6 Recovery at , Because notes 6 Nothing else , So it's over , So we are commenting 4 Place recovered
@flowOf. - We continue to iterate and comment 4 Launch at ‘b’ , So we call annotations 5 Situated lambda function , Will value Convert into ‘B’, And from
@mapOfcollectFunction . The value will be in the comment 7 Be collected , And then in the comments 8 Is printed . - Then comment 7 end , Revert to comments 6 It's about , It's done . So we are commenting 4 Recovery at
@flowOf. It's done, too . Because there is no more content , We arrived at@mapEnding , such , We're commenting 7 recovery , And arrived main End of function .
In most flow Processing and lifecycle functions will be the same , So understanding this can make us better understand flow How it works .
merge, zip and combine
Next, learn to combine the two flows into one flow . There are several ways to do this . The simplest way is to combine the elements in two streams into one . No matter what flow Where does the element of come from , There is no need to make any changes , So , We can use merge function .
suspend fun main() {
val ints: Flow<Int> = flowOf(1, 2, 3)
val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
val together: Flow<Number> = merge(ints, doubles)
print(together.toList())
// [1, 0.1, 0.2, 0.3, 2, 3]
// perhaps [1, 0.1, 0.2, 0.3, 2, 3]
// perhaps [0.1, 1, 2, 3, 0.2, 0.3]
// Or other combinations
}

The important point is , When we use merge when , Elements in one stream do not need to wait for elements in another stream . for example , In the following example , From the first flow The element of is delayed , But this does not affect the second flow The elements of .
suspend fun main() {
val ints: Flow<Int> = flowOf(1, 2, 3)
.onEach {
delay(1000) }
val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
val together: Flow<Number> = merge(ints, doubles)
together.collect {
println(it) }
}
// 0.1
// 0.2
// 0.3
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3
When multiple event sources have the same operation , We can use merge.
fun listenForMessages() {
merge(userSentMessages, messagesNotifications)
.onEach {
displayMessage(it) }
.launchIn(scope)
}
The next function is zip, It's for two flow Pairing . We need to define a function to determine how elements are paired ( After pairing, the result will be sent to the new flow in ). Each element can only be a one-to-one pair , So it needs to wait for the one that matches it . Elements that are not paired will be lost , therefore , When one flow When the compression of is completed , new flow Will also complete ( Other flow The same is true ).
suspend fun main() {
val flow1 = flowOf("A", "B", "C")
.onEach {
delay(400) }
val flow2 = flowOf(1, 2, 3, 4)
.onEach {
delay(1000) }
flow1.zip(flow2) {
f1, f2 -> "${
f1}_${
f2}" }
.collect {
println(it) }
}
// (1 sec)
// A_1
// (1 sec)
// B_2
// (1 sec)
// C_3

zip Function reminds me of the traditional Polish dance —— Borneo dance . One of the characteristics of this dance is , A row of pairs separated from the middle , Then when they meet again, they will regroup .

Combine two flow The last important function of is combine. It's like zip equally , It also forms pairs from elements , fast flow The generated element must wait for the slower one flow Generated elements to produce the first pair . However , The similarities with Polish dance stop here . Use combine when , Each new element replaces its previous element . If the first pair has been formed , A new element will come from another flow The previous element of generates a new pair together .

Be careful , zip Need to pair , So when the first flow closed ,zip It's closed. . and combine There are no such restrictions , Until two flow Before closing , It will always transmit data .
suspend fun main() {
val flow1 = flowOf("A", "B", "C")
.onEach {
delay(400) }
val flow2 = flowOf(1, 2, 3, 4)
.onEach {
delay(1000) }
flow1.combine(flow2) {
f1, f2 -> "${
f1}_${
f2}" }
.collect {
println(it) }
}
// (1 sec)
// B_1
// (0.2 sec)
// C_1
// (0.8 sec)
// C_2
// (1 sec)
// C_3
// (1 sec)
// C_4
When we need to observe the changes of two sources , You usually use combine. We can also give each combination flow Add initial value ( To get the initial pair ).
userUpdateFlow.onStart {
emit(currentUser) }
A typical scenario is when a view has two data sources . for example , When a notification depends on both the current state of the user and some notifications , We can observe them at the same time and update the view with their changes .
userStateFlow
.combine(notificationsFlow) {
userState, notifications ->
updateNotificationBadge(userState, notifications)
}
.collect()
fold and scan
If you have used set handlers , You may know fold( Fold ). For each element ( Start from initial value ) Apply the operation of merging two values into one value , Merge all values in the set into one .
for example , If the initial value is 0, The operation is addition , Then the result is the sum of all the numbers : We first take the initial value 0, then , Let's put the first element 1 Add ; In the result 1 On , We add the second number 2; For the result 3, We add the third number 3; For the result 6, Let's add the last number 4, The result of this operation is 10, That is to say fold The result returned .
fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.fold(0) {
acc, i -> acc + i }
println(res) // 10
val res2 = list.fold(1) {
acc, i -> acc * i }
println(res2) // 24
}

fold Is a terminal operation , It can also be used for flow, But it will hang until flow complete ( It's like collect) equally .
suspend fun main() {
val list = flowOf(1, 2, 3, 4)
.onEach {
delay(1000) }
val res = list.fold(0) {
acc, i -> acc + i }
println(res)
}
// (4 sec)
// 10
There is an alternative fold The function is scan. It is an intermediate operation that can produce all intermediate values :
fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.scan(0) {
acc, i -> acc + i }
println(res) // [0, 1, 3, 6, 10]
}

scan about flow It's very useful , Because it can receive the value generated in the previous step , Generate a new value immediately .
suspend fun main() {
flowOf(1, 2, 3, 4)
.onEach {
delay(1000) }
.scan(0) {
acc, v -> acc + v }
.collect {
println(it) }
}
// 0
// (1 sec)
// 1
// (1 sec)
// 3
// (1 sec)
// 6
// (1 sec)
// 10

We can use flow Builders and collect Make it easy scan. We first emit the initial value , Then the cumulative result of the next value is issued for each new element .
fun <T, R> Flow<T>.scan(
initial: R,
operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = flow {
var accumulator: R = initial
emit(accumulator)
collect {
value ->
accumulator = operation(accumulator, value)
emit(accumulator)
}
}
scan A typical usage scenario is when we need to update or change flow , Or we need an object , When this object is the result of these changes .
val userStateFlow: Flow<User> = userChangesFlow
.scan(user) {
acc, change -> user.withChange(change) }
val messagesListFlow: Flow<List<Message>> = messagesFlow
.scan(messages) {
acc, message -> acc + message }
flatMapConcat, flatMapMerge and flatMapLatest
Another well-known set handler is flatMap, It is similar to map, But this conversion function finally returns a flattened set . for example , You have a list of departments , Each department has an employee list , Then you can use flatMap To list all employees in all departments .
val allEmployees: List<Employee> = departments
.flatMap {
department -> department.employees }
val listOfListsOfEmployee: List<List<Employee>> = departments
.map {
department -> department.employees }
flatMap How is it applied to flow What about it? ? We can expect to return one flow, And then the flow It should be flat . The problem is that flowing elements can spread over time , that , Whether an element on one stream should wait for an element on another stream , Or should we deal with them at the same time ? Because there is no clear answer , therefore Flow No, flatMap function , But there is flatMapConcat、 flatMapMerge and flatMapLatest.
flatMapConcat Function in turn processes the generated flow. therefore , the second flow Can be in the first flow Start when finished . In the following example , We use characters “A” “B” “C” To create a flow, They produce new flow Will include these characters and numbers , There is 1s Delay of .
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach {
delay(1000) }
.map {
"${
it}_${
elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapConcat {
flowFrom(it) }
.collect {
println(it) }
}
// (1 sec)
// 1_A
// (1 sec)
// 2_A
// (1 sec)
// 3_A
// (1 sec)
// 1_B
// (1 sec)
// 2_B
// (1 sec)
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

The second function flatMapMerge It's the most intuitive for me , It also handles flow Generated value .
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach {
delay(1000) }
.map {
"${
it}_${
elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapMerge {
flowFrom(it) }
.collect {
println(it) }
}
// (1 sec)
// 1_A
// 1_B
// 1_C
// (1 sec)
// 2_A
// 2_B
// 2_C
// (1 sec)
// 3_A
// 3_B
// 3_C

have access to concurrently Parameters to set processing flow The number of concurrency of . The default value for this parameter is 16, But you can JVM Attribute DEFAULT_CONCURRENCY_PROPERTY_NAME To change it . Note this default limit , Because if you are in a that contains many elements flow Upper use flatMapMerge when , At the same time, it will only be processed at the same time 16 Elements .
suspend fun main() {
flowOf("A", "B", "C")
.flatMapMerge(concurrency = 2) {
flowFrom(it) }
.collect {
println(it) }
}
// (1 sec)
// 1_A
// 1_B
// (1 sec)
// 2_A
// 2_B
// (1 sec)
// 3_A
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
flatMapMerge The typical usage scenario of is when we need to flow When each element in requests data . for example , We have a list of categories , You need to request a quote for each category . You already know that you can pass async Function to achieve this . While using flatMapMerge Flow has two advantages :
- We can control concurrency parameters , To decide how many categories we want to take at the same time ( To avoid sending hundreds of requests at the same time )
- We can go back to
FlowAnd send them when the next element arrives ( therefore , At the function end , These values can be processed immediately )
suspend fun getOffers(
categories: List<Category>
): List<Offer> = coroutineScope {
categories
.map {
async {
api.requestOffers(it) } }
.flatMap {
it.await() }
}
// Better solution
suspend fun getOffers(
categories: List<Category>
): Flow<Offer> = categories
.asFlow()
.flatMapMerge(concurrency = 20) {
suspend {
api.requestOffers(it) }.asFlow()
// perhaps flow { emit(api.requestOffers(it)) }
}
The last function is flatMapLatest. Once new flow appear , It will forget the previous flow. For each new value , Previous stream processing will be forgotten . therefore , If “A”、“B”、“C” There is no delay between , Then you will only see “1_C”,“2_C”、“3_C”.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach {
delay(1000) }
.map {
"${
it}_${
elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapLatest {
flowFrom(it) }
.collect {
println(it) }
}
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

When initial flow When the element in has a delay , Things become interesting . In the following example ,(1.2s after )“A” Started its flow, That is to say flowFrom . The flow stay 1s An element is generated in “1_A”, however 1200ms And then there was “B”, Previous flow Closed and forgotten . When “C” New flow when ,“B” Successfully produced “1_B”. This function will eventually generate elements “1_C”、“2_C”、“3_C”, There is 1s Delay of .
suspend fun main() {
flowOf("A", "B", "C")
.onEach {
delay(1200) }
.flatMapLatest {
flowFrom(it) }
.collect {
println(it) }
}
// (2.2 sec)
// 1_A
// (1.2 sec)
// 1_B
// (1.2 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

Terminal operation
Last , We have an end flow Handling operations . These are called terminal operations . up to now , We only used collect. But there are other methods similar to those provided by sets and sequences :count( Calculation flow Number of elements in )、first and firstOrNull( obtain flow The first element emitted from )、fold and reduce( Accumulate elements into an object ). Terminal operation will be suspended , And in flow Return value when finished .
suspend fun main() {
val flow = flowOf(1, 2, 3, 4) // [1, 2, 3, 4]
.map {
it * it } // [1, 4, 9, 16]
println(flow.first()) // 1
println(flow.count()) // 4
println(flow.reduce {
acc, value -> acc * value }) // 576
println(flow.fold(0) {
acc, value -> acc + value }) // 30
}
at present , flow Although there are not many terminal operations , But if you need some different operations , You can always achieve it yourself . For example, the following implementation of integer flow sum:
suspend fun Flow<Int>.sum(): Int {
var sum = 0
collect {
value ->
sum += value
}
return sum
}
Similarly , Only for collect Method can realize almost any terminal operation .
summary
There are many tools to support flow Handle . It's a good thing to know something about them , Because they are on the back end and Android It is very useful in development . in addition , If you need some different functions , Can pass collect Methods and flow Builders can easily implement them .
边栏推荐
- fabulous! How does idea open multiple projects in a single window?
- Smart regulation enters the market, where will meituan and other Internet service platforms go
- If an exception is thrown in the constructor, the best way is to prevent memory leakage?
- 深潜Kotlin协程(二十二):Flow的处理
- [programming problem] [scratch Level 2] March 2019 draw a square spiral
- 2022-07-07:原本数组中都是大于0、小于等于k的数字,是一个单调不减的数组, 其中可能有相等的数字,总体趋势是递增的。 但是其中有些位置的数被替换成了0,我们需要求出所有的把0替换的方案数量:
- Trust orbtk development issues 2022
- 【编程题】【Scratch二级】2019.09 绘制雪花图案
- Is 35 really a career crisis? No, my skills are accumulating, and the more I eat, the better
- SQL knowledge summary 004: Postgres terminal command summary
猜你喜欢

腾讯安全发布《BOT管理白皮书》|解读BOT攻击,探索防护之道

大数据开源项目,一站式全自动化全生命周期运维管家ChengYing(承影)走向何方?

Zhou Hongqi, 52 ans, est - il encore jeune?

Go learning notes (1) environment installation and hello world

35岁真就成了职业危机?不,我的技术在积累,我还越吃越香了
![Cause analysis and solution of too laggy page of [test interview questions]](/img/8d/3ca92ce5f9cdc85d52dbcd826e477d.jpg)
Cause analysis and solution of too laggy page of [test interview questions]
![[basis of recommendation system] sampling and construction of positive and negative samples](/img/4b/753a61b583cf38826b597fd31e5d20.png)
[basis of recommendation system] sampling and construction of positive and negative samples
![[the most detailed in history] statistical description of overdue days in credit](/img/f7/5c3cbfec5b010171376ac122c704b2.png)
[the most detailed in history] statistical description of overdue days in credit

QT adds resource files, adds icons for qaction, establishes signal slot functions, and implements

Development of a horse tourism website (optimization of servlet)
随机推荐
SQL knowledge summary 004: Postgres terminal command summary
QT creator add custom new file / Project Template Wizard
paddle一个由三个卷积层组成的网络完成cifar10数据集的图像分类任务
QT establish signal slots between different classes and transfer parameters
Solution to the problem of unserialize3 in the advanced web area of the attack and defense world
RPA cloud computer, let RPA out of the box with unlimited computing power?
STM32F1與STM32CubeIDE編程實例-旋轉編碼器驅動
【GO记录】从零开始GO语言——用GO语言做一个示波器(一)GO语言基础
Open display PDF file in web page
[programming problem] [scratch Level 2] March 2019 draw a square spiral
Two small problems in creating user registration interface
韦东山第二期课程内容概要
【转载】解决conda安装pytorch过慢的问题
Robomaster visual tutorial (0) Introduction
Stm32f1 and stm32cubeide programming example - rotary encoder drive
Notice on organizing the second round of the Southwest Division (Sichuan) of the 2021-2022 National Youth electronic information intelligent innovation competition
Coindesk comments on the decentralization process of the wave field: let people see the future of the Internet
玩转Sonar
35岁真就成了职业危机?不,我的技术在积累,我还越吃越香了
接口测试要测试什么?