当前位置:网站首页>Deep dive kotlin synergy (XXII): flow treatment
Deep dive kotlin synergy (XXII): flow treatment
2022-07-08 00:22:00 【RikkaTheWorld】
Series eBook : Portal
We will Flow Described as a pipe , Values flow above , When flowing , These values can be changed in different ways : Delete 、 Multiply 、 Convert or merge . These in Flow All operations created to the terminal operation are called Flow To deal with . In this chapter , We will learn about the functions used for this purpose .
The functions provided here may remind you of the functions used to process collections , It's not a coincidence , Because they all represent the same concept . The difference is flow The elements on can be delivered on time .
map
The first important function we need to learn is map
. It transforms each element of the flow according to the transformation function . Suppose you have an integer data stream , Yours map
The operation is to square these numbers , Then the data on the final stream will be the square of these numbers .
suspend fun main() {
flowOf(1, 2, 3) // [1, 2, 3]
.map {
it * it } // [1, 4, 9]
.collect {
print(it) } // 149
}
after , I will use the image shown above to visualize flow How the handler changes elements over time . Horizontal lines indicate time , The element on this line is that at a certain point in time flow Elements emitted in . The upper line represents the data flow before the operation , The following line represents the data flow after the operation . This diagram can also be used to represent the use of multiple operations , Like in the picture below map
and filter
.
majority Flow Processing function , It's easy to realize through the tools we learned in the previous chapter . To achieve map
, We can use flow Builder to create a new flow. then , We can use the former flow Collect elements in , And emit transformed elements . The following implementation is from kotlin.coroutines In the library map
A simplified version of :
fun <T, R> Flow<T>.map(
transform: suspend (value: T) -> R
): Flow<R> = flow {
// Here we create a flow
collect {
value -> // Here we collect data from the receiver
emit(transform(value))
}
}
map
Is a very popular function . Its usage includes unpacking or converting values to different types .
// Here we use map To get user actions from input events
fun actionsFlow(): Flow<UserAction> =
observeInputEvents()
.map {
toAction(it.code) }
// Here we put User Turn it into UserJson
fun getAllUser(): Flow<UserJson> =
userRepository.getAllUsers()
.map {
it.toUserJson() }
filter
The next important function is filter
, It returns a containing only the original flow Which matches the data of the given predicate flow.
suspend fun main() {
(1..10).asFlow() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
.filter {
it <= 5 } // [1, 2, 3, 4, 5]
.filter {
isEven(it) } // [2, 4]
.collect {
print(it) } // 24
}
fun isEven(num: Int): Boolean = num % 2 == 0
Use flow The builder can also implement this function very easily . We just need to introduce one if Statements and predicates ( Instead of conversion ).
fun <T> Flow<T>.filter(
predicate: suspend (T) -> Boolean
): Flow<T> = flow {
// Here we create a stream
collect {
value -> // Here we receive elements from the receiver
if (predicate(value)) {
emit(value)
}
}
}
filter
Usually used to exclude elements we don't need .
// Here we use filter To filter out elements we don't want
fun actionsFlow(): Flow<UserAction> =
observeInputEvents()
.filter {
isValidAction(it.code) }
.map {
toAction(it.code) }
take and drop
We use take
To pass a certain number of elements .
suspend fun main() {
('A'..'Z').asFlow()
.take(5) // [A, B, C, D, E]
.collect {
print(it) } // ABCDE
}
We use drop To ignore a specific number of elements .
suspend fun main() {
('A'..'Z').asFlow()
.drop(20) // [U, V, W, X, Y, Z]
.collect {
print(it) } // UVWXYZ
}
How do they work at the lower level ?
We have seen quite a lot flow Processing and lifecycle functions . Their implementation is very simple , There is nothing magical in it . Most of these functions can be used flow Builder , And use collect
To achieve . Here is a simple example of stream processing , Including simplified map
and flowOf
Realization :
suspend fun main() {
flowOf('a', 'b')
.map {
it.uppercase() }
.collect {
print(it) } // AB
}
fun <T, R> Flow<T>.map(
transform: suspend (value: T) -> R
): Flow<R> = flow {
collect {
value ->
emit(transform(value))
}
}
fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
If inline flowOf
and map
function , You will get the following code ( I am here lambdas Added labels and notes with numbers ).
suspend fun main() {
flow [email protected]{
// 1
flow [email protected]{
// 2
for (element in arrayOf('a', 'b')) {
// 3
this@flowOf.emit(element) // 4
}
}.collect {
value -> // 5
this@map.emit(value.uppercase()) // 6
}
}.collect {
// 7
print(it) // 8
}
}
Let's analyze step by step .
- We're commenting 1 Start a flow, And in the comments 7 Its... Was called at
collect
function - When we start the collection , We will call
@map
( notes 1) Of lambda expression , It's commenting 2 Started another flow Builder , And in the comments 5 To collect . - So at this time , We start
@flowOn
( notes 2 It's about ) Of lambda expression , It iterates to include ‘a’ and ‘b’ Array of . - The first value is ‘a’ In the comments 4 Launched , Launched to comment 5 It's about , The lambda The expression converts the value to ‘A’ , And from @map Of
collect
Function , And call to 7 Situated lambda expression , The value is printed . - notes 7 It's over , Will be commenting on 6 Recovery at , Because notes 6 Nothing else , So it's over , So we are commenting 4 Place recovered
@flowOf
. - We continue to iterate and comment 4 Launch at ‘b’ , So we call annotations 5 Situated lambda function , Will value Convert into ‘B’, And from
@map
Ofcollect
Function . The value will be in the comment 7 Be collected , And then in the comments 8 Is printed . - Then comment 7 end , Revert to comments 6 It's about , It's done . So we are commenting 4 Recovery at
@flowOf
. It's done, too . Because there is no more content , We arrived at@map
Ending , such , We're commenting 7 recovery , And arrived main End of function .
In most flow Processing and lifecycle functions will be the same , So understanding this can make us better understand flow How it works .
merge, zip and combine
Next, learn to combine the two flows into one flow . There are several ways to do this . The simplest way is to combine the elements in two streams into one . No matter what flow Where does the element of come from , There is no need to make any changes , So , We can use merge
function .
suspend fun main() {
val ints: Flow<Int> = flowOf(1, 2, 3)
val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
val together: Flow<Number> = merge(ints, doubles)
print(together.toList())
// [1, 0.1, 0.2, 0.3, 2, 3]
// perhaps [1, 0.1, 0.2, 0.3, 2, 3]
// perhaps [0.1, 1, 2, 3, 0.2, 0.3]
// Or other combinations
}
The important point is , When we use merge
when , Elements in one stream do not need to wait for elements in another stream . for example , In the following example , From the first flow The element of is delayed , But this does not affect the second flow The elements of .
suspend fun main() {
val ints: Flow<Int> = flowOf(1, 2, 3)
.onEach {
delay(1000) }
val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
val together: Flow<Number> = merge(ints, doubles)
together.collect {
println(it) }
}
// 0.1
// 0.2
// 0.3
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3
When multiple event sources have the same operation , We can use merge
.
fun listenForMessages() {
merge(userSentMessages, messagesNotifications)
.onEach {
displayMessage(it) }
.launchIn(scope)
}
The next function is zip
, It's for two flow Pairing . We need to define a function to determine how elements are paired ( After pairing, the result will be sent to the new flow in ). Each element can only be a one-to-one pair , So it needs to wait for the one that matches it . Elements that are not paired will be lost , therefore , When one flow When the compression of is completed , new flow Will also complete ( Other flow The same is true ).
suspend fun main() {
val flow1 = flowOf("A", "B", "C")
.onEach {
delay(400) }
val flow2 = flowOf(1, 2, 3, 4)
.onEach {
delay(1000) }
flow1.zip(flow2) {
f1, f2 -> "${
f1}_${
f2}" }
.collect {
println(it) }
}
// (1 sec)
// A_1
// (1 sec)
// B_2
// (1 sec)
// C_3
zip
Function reminds me of the traditional Polish dance —— Borneo dance . One of the characteristics of this dance is , A row of pairs separated from the middle , Then when they meet again, they will regroup .
Combine two flow The last important function of is combine
. It's like zip
equally , It also forms pairs from elements , fast flow The generated element must wait for the slower one flow Generated elements to produce the first pair . However , The similarities with Polish dance stop here . Use combine
when , Each new element replaces its previous element . If the first pair has been formed , A new element will come from another flow The previous element of generates a new pair together .
Be careful , zip
Need to pair , So when the first flow closed ,zip
It's closed. . and combine
There are no such restrictions , Until two flow Before closing , It will always transmit data .
suspend fun main() {
val flow1 = flowOf("A", "B", "C")
.onEach {
delay(400) }
val flow2 = flowOf(1, 2, 3, 4)
.onEach {
delay(1000) }
flow1.combine(flow2) {
f1, f2 -> "${
f1}_${
f2}" }
.collect {
println(it) }
}
// (1 sec)
// B_1
// (0.2 sec)
// C_1
// (0.8 sec)
// C_2
// (1 sec)
// C_3
// (1 sec)
// C_4
When we need to observe the changes of two sources , You usually use combine
. We can also give each combination flow Add initial value ( To get the initial pair ).
userUpdateFlow.onStart {
emit(currentUser) }
A typical scenario is when a view has two data sources . for example , When a notification depends on both the current state of the user and some notifications , We can observe them at the same time and update the view with their changes .
userStateFlow
.combine(notificationsFlow) {
userState, notifications ->
updateNotificationBadge(userState, notifications)
}
.collect()
fold and scan
If you have used set handlers , You may know fold
( Fold ). For each element ( Start from initial value ) Apply the operation of merging two values into one value , Merge all values in the set into one .
for example , If the initial value is 0, The operation is addition , Then the result is the sum of all the numbers : We first take the initial value 0, then , Let's put the first element 1 Add ; In the result 1 On , We add the second number 2; For the result 3, We add the third number 3; For the result 6, Let's add the last number 4, The result of this operation is 10, That is to say fold
The result returned .
fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.fold(0) {
acc, i -> acc + i }
println(res) // 10
val res2 = list.fold(1) {
acc, i -> acc * i }
println(res2) // 24
}
fold
Is a terminal operation , It can also be used for flow, But it will hang until flow complete ( It's like collect
) equally .
suspend fun main() {
val list = flowOf(1, 2, 3, 4)
.onEach {
delay(1000) }
val res = list.fold(0) {
acc, i -> acc + i }
println(res)
}
// (4 sec)
// 10
There is an alternative fold
The function is scan
. It is an intermediate operation that can produce all intermediate values :
fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.scan(0) {
acc, i -> acc + i }
println(res) // [0, 1, 3, 6, 10]
}
scan
about flow It's very useful , Because it can receive the value generated in the previous step , Generate a new value immediately .
suspend fun main() {
flowOf(1, 2, 3, 4)
.onEach {
delay(1000) }
.scan(0) {
acc, v -> acc + v }
.collect {
println(it) }
}
// 0
// (1 sec)
// 1
// (1 sec)
// 3
// (1 sec)
// 6
// (1 sec)
// 10
We can use flow Builders and collect
Make it easy scan
. We first emit the initial value , Then the cumulative result of the next value is issued for each new element .
fun <T, R> Flow<T>.scan(
initial: R,
operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = flow {
var accumulator: R = initial
emit(accumulator)
collect {
value ->
accumulator = operation(accumulator, value)
emit(accumulator)
}
}
scan
A typical usage scenario is when we need to update or change flow , Or we need an object , When this object is the result of these changes .
val userStateFlow: Flow<User> = userChangesFlow
.scan(user) {
acc, change -> user.withChange(change) }
val messagesListFlow: Flow<List<Message>> = messagesFlow
.scan(messages) {
acc, message -> acc + message }
flatMapConcat, flatMapMerge and flatMapLatest
Another well-known set handler is flatMap
, It is similar to map
, But this conversion function finally returns a flattened set . for example , You have a list of departments , Each department has an employee list , Then you can use flatMap
To list all employees in all departments .
val allEmployees: List<Employee> = departments
.flatMap {
department -> department.employees }
val listOfListsOfEmployee: List<List<Employee>> = departments
.map {
department -> department.employees }
flatMap
How is it applied to flow What about it? ? We can expect to return one flow, And then the flow It should be flat . The problem is that flowing elements can spread over time , that , Whether an element on one stream should wait for an element on another stream , Or should we deal with them at the same time ? Because there is no clear answer , therefore Flow
No, flatMap
function , But there is flatMapConcat
、 flatMapMerge
and flatMapLatest
.
flatMapConcat
Function in turn processes the generated flow. therefore , the second flow Can be in the first flow Start when finished . In the following example , We use characters “A” “B” “C” To create a flow, They produce new flow Will include these characters and numbers , There is 1s Delay of .
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach {
delay(1000) }
.map {
"${
it}_${
elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapConcat {
flowFrom(it) }
.collect {
println(it) }
}
// (1 sec)
// 1_A
// (1 sec)
// 2_A
// (1 sec)
// 3_A
// (1 sec)
// 1_B
// (1 sec)
// 2_B
// (1 sec)
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
The second function flatMapMerge
It's the most intuitive for me , It also handles flow Generated value .
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach {
delay(1000) }
.map {
"${
it}_${
elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapMerge {
flowFrom(it) }
.collect {
println(it) }
}
// (1 sec)
// 1_A
// 1_B
// 1_C
// (1 sec)
// 2_A
// 2_B
// 2_C
// (1 sec)
// 3_A
// 3_B
// 3_C
have access to concurrently
Parameters to set processing flow The number of concurrency of . The default value for this parameter is 16, But you can JVM Attribute DEFAULT_CONCURRENCY_PROPERTY_NAME
To change it . Note this default limit , Because if you are in a that contains many elements flow Upper use flatMapMerge
when , At the same time, it will only be processed at the same time 16 Elements .
suspend fun main() {
flowOf("A", "B", "C")
.flatMapMerge(concurrency = 2) {
flowFrom(it) }
.collect {
println(it) }
}
// (1 sec)
// 1_A
// 1_B
// (1 sec)
// 2_A
// 2_B
// (1 sec)
// 3_A
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
flatMapMerge
The typical usage scenario of is when we need to flow When each element in requests data . for example , We have a list of categories , You need to request a quote for each category . You already know that you can pass async
Function to achieve this . While using flatMapMerge
Flow has two advantages :
- We can control concurrency parameters , To decide how many categories we want to take at the same time ( To avoid sending hundreds of requests at the same time )
- We can go back to
Flow
And send them when the next element arrives ( therefore , At the function end , These values can be processed immediately )
suspend fun getOffers(
categories: List<Category>
): List<Offer> = coroutineScope {
categories
.map {
async {
api.requestOffers(it) } }
.flatMap {
it.await() }
}
// Better solution
suspend fun getOffers(
categories: List<Category>
): Flow<Offer> = categories
.asFlow()
.flatMapMerge(concurrency = 20) {
suspend {
api.requestOffers(it) }.asFlow()
// perhaps flow { emit(api.requestOffers(it)) }
}
The last function is flatMapLatest
. Once new flow appear , It will forget the previous flow. For each new value , Previous stream processing will be forgotten . therefore , If “A”、“B”、“C” There is no delay between , Then you will only see “1_C”,“2_C”、“3_C”.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach {
delay(1000) }
.map {
"${
it}_${
elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapLatest {
flowFrom(it) }
.collect {
println(it) }
}
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
When initial flow When the element in has a delay , Things become interesting . In the following example ,(1.2s after )“A” Started its flow, That is to say flowFrom
. The flow stay 1s An element is generated in “1_A”, however 1200ms And then there was “B”, Previous flow Closed and forgotten . When “C” New flow when ,“B” Successfully produced “1_B”. This function will eventually generate elements “1_C”、“2_C”、“3_C”, There is 1s Delay of .
suspend fun main() {
flowOf("A", "B", "C")
.onEach {
delay(1200) }
.flatMapLatest {
flowFrom(it) }
.collect {
println(it) }
}
// (2.2 sec)
// 1_A
// (1.2 sec)
// 1_B
// (1.2 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
Terminal operation
Last , We have an end flow Handling operations . These are called terminal operations . up to now , We only used collect
. But there are other methods similar to those provided by sets and sequences :count
( Calculation flow Number of elements in )、first
and firstOrNull
( obtain flow The first element emitted from )、fold
and reduce
( Accumulate elements into an object ). Terminal operation will be suspended , And in flow Return value when finished .
suspend fun main() {
val flow = flowOf(1, 2, 3, 4) // [1, 2, 3, 4]
.map {
it * it } // [1, 4, 9, 16]
println(flow.first()) // 1
println(flow.count()) // 4
println(flow.reduce {
acc, value -> acc * value }) // 576
println(flow.fold(0) {
acc, value -> acc + value }) // 30
}
at present , flow Although there are not many terminal operations , But if you need some different operations , You can always achieve it yourself . For example, the following implementation of integer flow sum
:
suspend fun Flow<Int>.sum(): Int {
var sum = 0
collect {
value ->
sum += value
}
return sum
}
Similarly , Only for collect
Method can realize almost any terminal operation .
summary
There are many tools to support flow Handle . It's a good thing to know something about them , Because they are on the back end and Android It is very useful in development . in addition , If you need some different functions , Can pass collect
Methods and flow
Builders can easily implement them .
边栏推荐
- [研发人员必备]paddle 如何制作自己的数据集,并显示。
- 攻防世界Web进阶区unserialize3题解
- Using Google test in QT
- 某马旅游网站开发(登录注册退出功能的实现)
- 深潜Kotlin协程(二十三 完结篇):SharedFlow 和 StateFlow
- Teach you to make a custom form label by hand
- Use filters to count URL request time
- 1293_FreeRTOS中xTaskResumeAll()接口的实现分析
- redis你到底懂不懂之list
- C language 005: common examples
猜你喜欢
QT establish signal slots between different classes and transfer parameters
备库一直有延迟,查看mrp为wait_for_log,重启mrp后为apply_log但过一会又wait_for_log
How to insert highlighted code blocks in WPS and word
[programming problem] [scratch Level 2] March 2019 draw a square spiral
[question de programmation] [scratch niveau 2] oiseaux volants en décembre 2019
5G NR 系统消息
How does the markdown editor of CSDN input mathematical formulas--- Latex syntax summary
接口测试要测试什么?
ROS从入门到精通(九) 可视化仿真初体验之TurtleBot3
[the most detailed in history] statistical description of overdue days in credit
随机推荐
LeetCode刷题
面试题详解:用Redis实现分布式锁的血泪史
Sqlite数据库存储目录结构邻接表的实现2-目录树的构建
Fully automated processing of monthly card shortage data and output of card shortage personnel information
浪潮云溪分布式数据库 Tracing(二)—— 源码解析
应用实践 | 数仓体系效率全面提升!同程数科基于 Apache Doris 的数据仓库建设
【编程题】【Scratch二级】2019.12 绘制十个正方形
【测试面试题】页面很卡的原因分析及解决方案
Open display PDF file in web page
How to learn a new technology (programming language)
QT and OpenGL: loading 3D models using the open asset import library (assimp) - Part 2
Smart regulation enters the market, where will meituan and other Internet service platforms go
Notice on organizing the second round of the Southwest Division (Sichuan) of the 2021-2022 National Youth electronic information intelligent innovation competition
ABAP ALV LVC模板
韦东山第二期课程内容概要
C# 泛型及性能比较
Set up personal network disk with nextcloud
Detailed explanation of interview questions: the history of blood and tears in implementing distributed locks with redis
腾讯安全发布《BOT管理白皮书》|解读BOT攻击,探索防护之道
52歲的周鴻禕,還年輕嗎?