Kotlin Coroutines Deep Dive (XXII): Flow processing
2022-07-08 00:22:00 【RikkaTheWorld】
Series eBook : Portal
We have described Flow as a pipe through which values flow. As they flow, these values can be changed in different ways: dropped, multiplied, transformed, or combined. All the operations between the creation of a flow and its terminal operation are called flow processing. In this chapter, we will learn about the functions used for this purpose.
The functions presented here may remind you of the functions used to process collections. This is no coincidence, as they represent the same concepts. The difference is that the elements of a flow can be spread over time.
map
The first important function we need to learn is map. It transforms each element of the flow according to a transformation function. For example, if you have a flow of integers and your map operation squares them, the resulting flow will contain the squares of these numbers.
suspend fun main() {
    flowOf(1, 2, 3) // [1, 2, 3]
        .map { it * it } // [1, 4, 9]
        .collect { print(it) } // 149
}
From here on, I will use diagrams like the one shown above to visualize how flow processing functions change elements over time. The horizontal line represents time, and the elements on it are the values emitted by the flow at particular moments. The upper line represents the flow before the operation, and the lower line represents the flow after it. Such a diagram can also show several operations applied in sequence, like map and filter in the diagram below.
Most flow processing functions are easy to implement with the tools we learned in the previous chapters. To implement map, we can use the flow builder to create a new flow. Inside it, we collect the elements of the original flow and emit each of them transformed. The implementation below is a simplified version of map from the kotlinx.coroutines library:
fun <T, R> Flow<T>.map(
    transform: suspend (value: T) -> R
): Flow<R> = flow { // here we create a new flow
    collect { value -> // here we collect from the receiver
        emit(transform(value))
    }
}
map is a very popular function. Its use cases include unpacking values or converting them into a different type.
// Here we use map to get user actions from input events
fun actionsFlow(): Flow<UserAction> =
    observeInputEvents()
        .map { toAction(it.code) }

// Here we use map to convert User to UserJson
fun getAllUser(): Flow<UserJson> =
    userRepository.getAllUsers()
        .map { it.toUserJson() }
filter
The next important function is filter, which returns a flow containing only those elements of the original flow that match the given predicate.
suspend fun main() {
    (1..10).asFlow() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        .filter { it <= 5 } // [1, 2, 3, 4, 5]
        .filter { isEven(it) } // [2, 4]
        .collect { print(it) } // 24
}

fun isEven(num: Int): Boolean = num % 2 == 0
This function can also be implemented quite easily with the flow builder. We just need an if statement with the predicate (instead of the transformation).
fun <T> Flow<T>.filter(
    predicate: suspend (T) -> Boolean
): Flow<T> = flow { // here we create a new flow
    collect { value -> // here we collect from the receiver
        if (predicate(value)) {
            emit(value)
        }
    }
}
filter is typically used to exclude elements we are not interested in.
// Here we use filter to drop the events we do not want
fun actionsFlow(): Flow<UserAction> =
    observeInputEvents()
        .filter { isValidAction(it.code) }
        .map { toAction(it.code) }
take and drop
We use take to pass only a certain number of elements.
suspend fun main() {
    ('A'..'Z').asFlow()
        .take(5) // [A, B, C, D, E]
        .collect { print(it) } // ABCDE
}
We use drop to ignore a certain number of elements.
suspend fun main() {
    ('A'..'Z').asFlow()
        .drop(20) // [U, V, W, X, Y, Z]
        .collect { print(it) } // UVWXYZ
}
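Like map and filter, drop can be approximated with the flow builder and collect. The following is only a minimal sketch and not the library's actual implementation; the name dropSketch is made up here so that it does not clash with the real drop. (take is harder to write this way, because it has to stop collecting early, so it is left out.)

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: skips the first `count` elements, then passes the rest through.
fun <T> Flow<T>.dropSketch(count: Int): Flow<T> {
    require(count >= 0) { "count must be non-negative, but was $count" }
    return flow {
        var skipped = 0
        collect { value ->
            if (skipped >= count) emit(value) else skipped++
        }
    }
}

fun main() = runBlocking {
    val result = ('A'..'Z').asFlow().dropSketch(20).toList()
    println(result) // [U, V, W, X, Y, Z]
}
```

The counter lives inside the flow builder, so every new collection starts counting from zero.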
How does it work under the hood?
We have seen quite a few flow processing and lifecycle functions. Their implementation is quite simple; there is no magic in it. Most of these functions can be implemented with the flow builder and collect. Here is a simple example of flow processing, together with simplified implementations of map and flowOf:
suspend fun main() {
    flowOf('a', 'b')
        .map { it.uppercase() }
        .collect { print(it) } // AB
}

fun <T, R> Flow<T>.map(
    transform: suspend (value: T) -> R
): Flow<R> = flow {
    collect { value ->
        emit(transform(value))
    }
}

fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}
If you inline the flowOf and map functions, you end up with the following code (I added labels to the lambdas and comments with numbers).
suspend fun main() {
    flow map@{ // 1
        flow flowOf@{ // 2
            for (element in arrayOf('a', 'b')) { // 3
                this@flowOf.emit(element) // 4
            }
        }.collect { value -> // 5
            this@map.emit(value.uppercase()) // 6
        }
    }.collect { // 7
        print(it) // 8
    }
}
Let's analyze this step by step.
- We start a flow at comment 1 and collect it at comment 7.
- When the collection starts, we invoke the lambda labeled @map (it starts at 1). It calls another flow builder at 2 and collects it at 5.
- At this point, we start the lambda labeled @flowOf (it starts at 2). It iterates over an array containing 'a' and 'b'.
- The first value, 'a', is emitted at 4, which calls the lambda at 5. This lambda (at 6) transforms the value to 'A' and emits it to the @map flow, which calls the lambda at 7. The value is printed at 8.
- The lambda at 7 finishes, so we resume at 6. Nothing else remains there, so the lambda at 5 finishes, and we resume @flowOf at 4.
- We continue the iteration and emit 'b' at 4. So we call the lambda at 5, transform the value to 'B' (at 6), and emit it to the @map flow. The value is collected at 7 and printed at 8.
- The lambda at 7 finishes, so we resume at 6, which is done. So we resume @flowOf at 4, which is done as well. Since there is nothing more, we reach the end of @map. With that, collect at 7 resumes, and we reach the end of main.
The same happens in most flow processing and lifecycle functions, so understanding this gives us a good idea of how flow works.
merge, zip and combine
Next, let's talk about combining two flows into one. There are a few ways to do this. The simplest is to merge the elements of the two flows into one. No modifications are made, no matter which flow an element comes from. For this, we use the merge function.
suspend fun main() {
    val ints: Flow<Int> = flowOf(1, 2, 3)
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    val together: Flow<Number> = merge(ints, doubles)
    print(together.toList())
    // [1, 0.1, 0.2, 0.3, 2, 3]
    // or [0.1, 1, 2, 3, 0.2, 0.3]
    // or any other combination
}
What is important is that when we use merge, the elements of one flow do not wait for the other flow. For instance, in the example below, the elements of the first flow are delayed, but this does not hold up the elements of the second flow.
suspend fun main() {
    val ints: Flow<Int> = flowOf(1, 2, 3)
        .onEach { delay(1000) }
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    val together: Flow<Number> = merge(ints, doubles)
    together.collect { println(it) }
}
// 0.1
// 0.2
// 0.3
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3
We use merge when multiple sources of events should lead to the same action.
fun listenForMessages() {
    merge(userSentMessages, messagesNotifications)
        .onEach { displayMessage(it) }
        .launchIn(scope)
}
The next function is zip, which makes pairs from the two flows. We also need to specify a function that decides how elements are paired (its result is emitted into the new flow). Each element can be part of only one pair, so it has to wait for its partner. Elements left without a pair are lost, so when one of the flows completes, the resulting flow completes as well (and so does the other flow).
suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}" }
        .collect { println(it) }
}
// (1 sec)
// A_1
// (1 sec)
// B_2
// (1 sec)
// C_3
The zip function reminds me of the polonaise, a traditional Polish dance. One trait of this dance is that a line of pairs is split down the middle, and the pairs form again when the dancers meet.
The last important function for combining two flows is combine. Just like zip, it forms pairs from elements, so the faster flow has to wait for the slower one to produce the first pair. However, the similarities to the polonaise end here. When we use combine, every new element replaces its predecessor. Once the first pair has been formed, each new element produces a new pair together with the latest element from the other flow.
Notice that zip needs pairs, so it completes as soon as the first flow completes. combine has no such limitation, so it keeps emitting until both flows have completed.
suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
        .collect { println(it) }
}
// (1 sec)
// B_1
// (0.2 sec)
// C_1
// (0.8 sec)
// C_2
// (1 sec)
// C_3
// (1 sec)
// C_4
combine is typically used when we need to observe two sources of changes. We can also add an initial value to each combined flow (to have the initial pair right away).
userUpdateFlow.onStart { emit(currentUser) }
A typical use case is a view that depends on two data sources. For example, when a notification badge depends on both the current state of the user and some notifications, we can observe both of them and update the view whenever either changes.
userStateFlow
    .combine(notificationsFlow) { userState, notifications ->
        updateNotificationBadge(userState, notifications)
    }
    .collect()
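To see what the initial values change, here is a small runnable sketch. The two sources, letterUpdates and numberUpdates, and the helper combinedWithInitialValues are hypothetical names invented for this illustration; only combine and onStart come from the library.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical sources of changes; each emits a single update after a delay.
val letterUpdates: Flow<String> = flow {
    delay(100)
    emit("B")
}
val numberUpdates: Flow<Int> = flow {
    delay(250)
    emit(2)
}

// onStart gives each source an initial value, so combine can form
// its first pair without waiting for the first real update.
fun combinedWithInitialValues(): Flow<String> =
    letterUpdates.onStart { emit("A") }
        .combine(numberUpdates.onStart { emit(1) }) { letter, number ->
            "${letter}_$number"
        }

fun main() = runBlocking {
    println(combinedWithInitialValues().toList()) // [A_1, B_1, B_2]
}
```

Without the two onStart calls, the first pair would only appear after the slower source had emitted, and the result would be just B_2.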
fold and scan
If you have used collection processing functions, you may recognize fold. It combines all the values of a collection into one by applying, for each element and starting from an initial value, an operation that merges two values into one.
For example, if the initial value is 0 and the operation is addition, the result is the sum of all the numbers: we first take the initial value 0; then we add the first element, 1; to the result 1, we add the second number, 2; to the result 3, we add the third number, 3; to the result 6, we add the last number, 4. The result of this operation, 10, is what fold returns.
fun main() {
    val list = listOf(1, 2, 3, 4)
    val res = list.fold(0) { acc, i -> acc + i }
    println(res) // 10
    val res2 = list.fold(1) { acc, i -> acc * i }
    println(res2) // 24
}
fold is a terminal operation. It can also be used on a flow, but it suspends until the flow completes (just like collect).
suspend fun main() {
    val list = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    val res = list.fold(0) { acc, i -> acc + i }
    println(res)
}
// (4 sec)
// 10
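Why fold has to suspend until the flow completes becomes clear once we write it with collect. This is a minimal sketch rather than the library's implementation, and the name foldSketch is made up to keep it apart from the real fold.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the whole flow and only then returns the accumulator.
suspend fun <T, R> Flow<T>.foldSketch(
    initial: R,
    operation: suspend (acc: R, value: T) -> R
): R {
    var accumulator = initial
    collect { value ->
        accumulator = operation(accumulator, value)
    }
    return accumulator // reached only after the flow has completed
}

fun main() = runBlocking {
    val sum = flowOf(1, 2, 3, 4).foldSketch(0) { acc, i -> acc + i }
    println(sum) // 10
}
```

Since the return statement comes after collect, there is no result to hand back before the last element has been processed.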
An alternative to fold is scan. It is an intermediate operation that produces all the intermediate accumulator values:
fun main() {
    val list = listOf(1, 2, 3, 4)
    val res = list.scan(0) { acc, i -> acc + i }
    println(res) // [0, 1, 3, 6, 10]
}
scan is very useful for flows, because it produces a new value immediately after receiving one from the previous step.
suspend fun main() {
    flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
        .scan(0) { acc, v -> acc + v }
        .collect { println(it) }
}
// 0
// (1 sec)
// 1
// (1 sec)
// 3
// (1 sec)
// 6
// (1 sec)
// 10
We can implement scan easily with the flow builder and collect. We first emit the initial value, and then, for each new element, we emit the result of accumulating it.
fun <T, R> Flow<T>.scan(
    initial: R,
    operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = flow {
    var accumulator: R = initial
    emit(accumulator)
    collect { value ->
        accumulator = operation(accumulator, value)
        emit(accumulator)
    }
}
A typical use case for scan is when we have a flow of updates or changes, and we need an object that is the result of applying these changes.
val userStateFlow: Flow<User> = userChangesFlow
    .scan(user) { acc, change -> acc.withChange(change) }

val messagesListFlow: Flow<List<Message>> = messagesFlow
    .scan(messages) { acc, message -> acc + message }
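The messages case can be made runnable with a little scaffolding. In this sketch, the Message class and the messageSnapshots function are hypothetical stand-ins, and the incoming messages are just two hard-coded values.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical message type, used only for this illustration.
data class Message(val text: String)

// Returns every intermediate list that scan produces, as message texts.
suspend fun messageSnapshots(): List<List<String>> =
    flowOf(Message("hi"), Message("there"))
        .scan(emptyList<Message>()) { acc, message -> acc + message }
        .map { messages -> messages.map { it.text } }
        .toList()

fun main() = runBlocking {
    println(messageSnapshots()) // [[], [hi], [hi, there]]
}
```

Each emitted list is a new immutable snapshot, so a collector can safely keep any of them, for example to display the state of the conversation at that moment.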
flatMapConcat, flatMapMerge and flatMapLatest
Another well-known collection processing function is flatMap. It is similar to map, but the transformation function has to return a collection, which is then flattened. For example, if you have a list of departments, each with a list of employees, you can use flatMap to make a list of all the employees in all the departments.
val allEmployees: List<Employee> = departments
    .flatMap { department -> department.employees }

val listOfListsOfEmployee: List<List<Employee>> = departments
    .map { department -> department.employees }
How should flatMap work on a flow? We would expect the transformation function to return a flow, which is then flattened. The problem is that flow elements can be spread over time. So, should the flow produced from the second element wait for the one produced from the first, or should they be processed concurrently? Since there is no clear answer, Flow has no flatMap function; instead, there are flatMapConcat, flatMapMerge, and flatMapLatest.
The flatMapConcat function processes the produced flows one after another. So, the second flow can start only when the first one is done. In the following example, we make a flow from the characters "A", "B", and "C". The flow produced from each of them combines that character with the numbers 1, 2, and 3, with a 1-second delay between them.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${elem} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapConcat { flowFrom(it) }
        .collect { println(it) }
}
// (1 sec)
// 1_A
// (1 sec)
// 2_A
// (1 sec)
// 3_A
// (1 sec)
// 1_B
// (1 sec)
// 2_B
// (1 sec)
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
The second function, flatMapMerge, is the most intuitive to me. It processes the produced flows concurrently.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${elem} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapMerge { flowFrom(it) }
        .collect { println(it) }
}
// (1 sec)
// 1_A
// 1_B
// 1_C
// (1 sec)
// 2_A
// 2_B
// 2_C
// (1 sec)
// 3_A
// 3_B
// 3_C
The number of flows that can be processed concurrently is set with the concurrency parameter. Its default value is 16, but it can be changed on the JVM with the DEFAULT_CONCURRENCY_PROPERTY_NAME property. Be aware of this default limit: if you use flatMapMerge on a flow with many elements, only 16 of them will be processed at the same time.
suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapMerge(concurrency = 2) { flowFrom(it) }
        .collect { println(it) }
}
// (1 sec)
// 1_A
// 1_B
// (1 sec)
// 2_A
// 2_B
// (1 sec)
// 3_A
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
A typical use of flatMapMerge is when we need to request data for each element of a flow. For instance, we have a list of categories, and we need to request offers for each of them. You already know that you can do this with the async function. Using a flow with flatMapMerge has two advantages:
- we can control the concurrency parameter and decide how many categories we want to fetch at the same time (to avoid sending hundreds of requests at once);
- we can return a Flow and emit the next elements as they arrive (so, on the calling side, they can be handled immediately).
suspend fun getOffers(
    categories: List<Category>
): List<Offer> = coroutineScope {
    categories
        .map { async { api.requestOffers(it) } }
        .flatMap { it.await() }
}

// A better solution
suspend fun getOffers(
    categories: List<Category>
): Flow<Offer> = categories
    .asFlow()
    .flatMapMerge(concurrency = 20) { category ->
        // requestOffers returns a List<Offer> (as in the version above),
        // so each list is emitted element by element
        flow { emitAll(api.requestOffers(category).asFlow()) }
    }
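To try this out without a real backend, here is a self-contained sketch. Category, Offer, FakeApi, and getOffersSketch are hypothetical stand-ins written for this illustration, and the 100 ms delay only simulates a network call. The opt-in annotation is there because flatMapMerge is marked as a preview or experimental API, depending on the library version.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical domain types and a fake API.
data class Category(val name: String)
data class Offer(val title: String)

object FakeApi {
    suspend fun requestOffers(category: Category): List<Offer> {
        delay(100) // simulated network latency
        return listOf(Offer("${category.name}-1"), Offer("${category.name}-2"))
    }
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun getOffersSketch(categories: List<Category>): Flow<Offer> = categories
    .asFlow()
    .flatMapMerge(concurrency = 2) { category ->
        // At most two requests run at the same time;
        // each returned list is flattened into single offers.
        flow { emitAll(FakeApi.requestOffers(category).asFlow()) }
    }

fun main() = runBlocking {
    val categories = listOf(Category("a"), Category("b"), Category("c"))
    val offers = getOffersSketch(categories).toList()
    println(offers.size) // 6
}
```

The order of offers coming from different categories is not guaranteed, which is why the example prints only their number.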
The last function is flatMapLatest. It forgets about the previous flow once a new one appears: with every new value, the processing of the previous flow is cancelled. So, if there is no delay between "A", "B", and "C", you will only see "1_C", "2_C", and "3_C".
fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${elem} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
It gets more interesting when the elements of the initial flow are delayed. In the example below, "A" (after 1.2 s) starts its flow, created with flowFrom. This flow produces "1_A" after 1 s, but "B" appears 1.2 s after "A" (that is, 200 ms after "1_A"), so the previous flow is cancelled and forgotten. The "B" flow manages to produce "1_B" before "C" appears and starts its own flow. That last flow finally produces "1_C", "2_C", and "3_C", with a 1-second delay between them.
suspend fun main() {
    flowOf("A", "B", "C")
        .onEach { delay(1200) }
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}
// (2.2 sec)
// 1_A
// (1.2 sec)
// 1_B
// (1.2 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
Terminal operations
Finally, we have the operations that end flow processing. These are called terminal operations. Until now, we have only used collect, but there are others, similar to those offered by collections and sequences: count (counts the number of elements in the flow), first and firstOrNull (get the first element emitted by the flow), and fold and reduce (accumulate elements into one object). Terminal operations are suspending, and they return their value once the flow has completed (or once they end the collection themselves, as first does).
suspend fun main() {
    val flow = flowOf(1, 2, 3, 4) // [1, 2, 3, 4]
        .map { it * it } // [1, 4, 9, 16]
    println(flow.first()) // 1
    println(flow.count()) // 4
    println(flow.reduce { acc, value -> acc * value }) // 576
    println(flow.fold(0) { acc, value -> acc + value }) // 30
}
Currently, there are not many terminal operations for flow, but if you need something different, you can always implement it yourself. This is, for instance, how you can implement sum for a flow of Int:
suspend fun Flow<Int>.sum(): Int {
    var sum = 0
    collect { value ->
        sum += value
    }
    return sum
}
Similarly, you can implement almost any terminal operation with just the collect method.
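As one more illustration of the same idea, here is a sketch of a terminal operation that finds the largest element. The name maxOrNullSketch is hypothetical and is not presented as a function of the library.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical terminal operation: returns the largest element, or null for an empty flow.
suspend fun <T : Comparable<T>> Flow<T>.maxOrNullSketch(): T? {
    var max: T? = null
    collect { value ->
        val current = max
        if (current == null || value > current) max = value
    }
    return max
}

fun main() = runBlocking {
    println(flowOf(3, 7, 5).maxOrNullSketch()) // 7
    println(emptyFlow<Int>().maxOrNullSketch()) // null
}
```

The structure is the same as in sum: some local state, a collect call that updates it, and a return once collection is done.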
Summary
There are many tools that support flow processing. It is good to have some idea of them, because they are useful in both backend and Android development. Also, if you need some different functions, they can be implemented quite easily with the collect method and the flow builder.