Kotlin Coroutines Deep Dive (19): Flow overview
2022-07-03 03:59:00 【RikkaTheWorld】
A Flow represents a stream of values that are computed asynchronously. The Flow interface itself only allows the flowing elements to be collected, which means handling each element as it reaches the end of the flow (collect on a Flow is similar to forEach on a collection).
interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}
As you can see, collect is the only member function in Flow. All other functions are defined as extension functions. This is similar to Iterable and Sequence, both of which have iterator as their only member function.
interface Iterable<out T> {
    operator fun iterator(): Iterator<T>
}
interface Sequence<out T> {
    operator fun iterator(): Iterator<T>
}
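To illustrate how an operator can be built as an extension function on top of collect alone, here is a minimal sketch. The name mapSketch is hypothetical and chosen only for this illustration; it is not how the library implements map internally. It assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator, for illustration only: it builds a new flow that
// collects the upstream flow and emits every transformed value downstream.
fun <T, R> Flow<T>.mapSketch(transform: suspend (T) -> R): Flow<R> = flow {
    // `collect` is called on the receiver Flow<T>; `emit` on the new flow's collector.
    collect { value -> emit(transform(value)) }
}

fun main() = runBlocking {
    val result = flowOf(1, 2, 3).mapSketch { it * 10 }.toList()
    println(result) // [10, 20, 30]
}
```

Nothing happens until a terminal operation (here toList) calls collect; the extension function only describes what should happen at that point.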
Flow vs. other ways of representing values
The concept of a flow should be familiar to anyone who uses RxJava or Reactor, but others might need a better explanation.
Imagine that you need a function that returns more than one value. When all these values are provided at the same time, we use a collection such as List or Set.
fun allUsers(): List<User> =
    api.getAllUsers().map { it.toUser() }
The essence here is that List and Set represent a fully calculated collection. Calculating these values takes time, so we need to wait for all of them before we can get any.
fun getList(): List<String> = List(3) {
    Thread.sleep(1000)
    "User$it"
}
fun main() {
    val list = getList()
    println("Function started")
    list.forEach { println(it) }
}
// (3 sec)
// Function started
// User0
// User1
// User2
If the elements are produced one after another, we would rather receive each of them as soon as it appears. One way to do this is to use a Sequence.
fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        Thread.sleep(1000)
        yield("User$it")
    }
}
fun main() {
    val list = getSequence()
    println("Function started")
    list.forEach { println(it) }
}
// Function started
// (1 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2
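The on-demand nature of a sequence can be made visible with a small counter. The sketch below uses only the Kotlin standard library; the computed counter and the numbers function are hypothetical names introduced for this illustration.

```kotlin
// Counts how many elements the sequence builder has actually produced.
var computed = 0

fun numbers(): Sequence<Int> = sequence {
    repeat(3) {
        computed++ // runs only when a consumer asks for the next element
        yield(it)
    }
}

fun main() {
    val first = numbers().first()
    println("first=$first, computed=$computed") // first=0, computed=1

    computed = 0
    val all = numbers().toList()
    println("all=$all, computed=$computed") // all=[0, 1, 2], computed=3
}
```

Asking only for the first element runs the builder up to its first yield and no further, whereas a List would have required all three elements to be calculated up front.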
A sequence is a suitable way to represent a stream of values calculated on demand when the calculation might be CPU-intensive (like computing complex results) or blocking (like reading files). However, you need to know that the terminal operations of a sequence (like forEach) are not suspending, so any suspension inside a sequence builder would mean blocking the thread that waits for the value. This is why, within the scope of the sequence builder, you cannot use any suspending function other than those called on the SequenceScope receiver (yield and yieldAll).
fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        delay(1000) // Compilation error
        yield("User$it")
    }
}
This mechanism was introduced to prevent sequences from being misused. Even if the example above could compile, it would not be correct, because the terminal operation (like forEach) would block the thread instead of suspending the coroutine, which could lead to unexpected thread blocking.
For example, someone might want to use a sequence to fetch, page by page, a list of all users from an HTTP endpoint until an empty page is received.
// Don't do this; we should use Flow instead of Sequence
fun allUsersSequence(
    api: UserApi
): Sequence<User> = sequence {
    var page = 0
    do {
        val users = api.takePage(page++) // suspending, so compilation error
        yieldAll(users)
    } while (!users.isNullOrEmpty())
}
I hope you have already learned that thread blocking can be dangerous and lead to unexpected situations. To make this clearer, take a look at the example below. We use a Sequence, so its forEach is a blocking operation. This is why a coroutine started on the same thread has to wait: the execution of one coroutine blocks the execution of the other.
fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        Thread.sleep(1000)
        // The result would be the same even if delay(1000) could be used here
        yield("User$it")
    }
}
suspend fun main() {
    withContext(newSingleThreadContext("main")) {
        launch {
            repeat(3) {
                delay(100)
                println("Processing on coroutine")
            }
        }
        val list = getSequence()
        list.forEach { println(it) }
    }
}
// (1 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
In cases like this, we should use Flow instead of Sequence. It fully supports coroutines: its builder and operations can suspend, and it supports structured concurrency and proper exception handling. We will explain all of this in the next chapters, but for now let's see how it helps in this case.
fun getFlow(): Flow<String> = flow {
    repeat(3) {
        delay(1000)
        emit("User$it")
    }
}
suspend fun main() {
    withContext(newSingleThreadContext("main")) {
        launch {
            repeat(3) {
                delay(100)
                println("Processing on coroutine")
            }
        }
        val list = getFlow()
        list.collect { println(it) }
    }
}
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (1 - 3 * 0.1 = 0.7 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2
Flow should be used for streams of data that need to use coroutines. For example, it can be used to produce a stream of users fetched page by page from an API. If we call allUsersFlow(api).first(), we fetch only the first page; if we call allUsersFlow(api).toList(), we fetch all the pages; if we call allUsersFlow(api).firstOrNull { it.id == id }, we keep fetching pages until we find the user we are looking for.
fun allUsersFlow(
    api: UserApi
): Flow<User> = flow {
    var page = 0
    do {
        val users = api.takePage(page++) // suspending
        // Assumes takePage returns a (possibly null) List<User>;
        // emitAll expects a Flow, so the list is converted first
        emitAll(users.orEmpty().asFlow())
    } while (!users.isNullOrEmpty())
}
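How many pages each terminal operation actually requests can be checked with a small runnable sketch. FakeUserApi, its page layout (three pages of two users, then empty pages) and the requestedPages counter are hypothetical stand-ins for the real UserApi, which the article does not define. The lookup uses firstOrNull with a predicate.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

data class User(val id: Int)

// Hypothetical in-memory API: pages 0..2 hold two users each, later pages are empty.
class FakeUserApi {
    var requestedPages = 0
        private set

    suspend fun takePage(page: Int): List<User> {
        delay(10) // simulated network latency
        requestedPages++
        return if (page < 3) List(2) { User(page * 2 + it) } else emptyList()
    }
}

fun allUsersFlow(api: FakeUserApi): Flow<User> = flow {
    var page = 0
    do {
        val users = api.takePage(page++) // suspending call
        users.forEach { emit(it) }
    } while (users.isNotEmpty())
}

fun main() = runBlocking {
    val api1 = FakeUserApi()
    println(allUsersFlow(api1).first()) // User(id=0)
    println(api1.requestedPages) // 1

    val api2 = FakeUserApi()
    println(allUsersFlow(api2).toList().size) // 6
    println(api2.requestedPages) // 4 (three full pages plus the empty one)

    val api3 = FakeUserApi()
    println(allUsersFlow(api3).firstOrNull { it.id == 3 }) // User(id=3)
    println(api3.requestedPages) // 2
}
```

Because the flow is cold and processed on demand, first and firstOrNull stop the builder as soon as they have their answer, so no further pages are requested.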
Characteristics of Flow
Flow's terminal operations (like collect) suspend a coroutine instead of blocking a thread. They also support other coroutine functionalities, such as exception handling. Flow processing can be cancelled, and structured concurrency is supported out of the box. The flow builder is not suspending and does not require any scope.
The example below shows how the CoroutineName context is passed from collect to the lambda expression in the flow builder. It also shows that cancelling launch leads to the cancellation of the flow processing as well.
// Notice that this function is not suspending and does not need a CoroutineScope
fun usersFlow(): Flow<String> = flow {
    repeat(3) {
        delay(1000)
        val ctx = currentCoroutineContext()
        val name = ctx[CoroutineName]?.name
        emit("User$it in $name")
    }
}
suspend fun main() {
    val users = usersFlow()
    withContext(CoroutineName("Name")) {
        val job = launch {
            // collect is suspending
            users.collect { println(it) }
        }
        launch {
            delay(2100)
            println("I got enough")
            job.cancel()
        }
    }
}
// (1 sec)
// User0 in Name
// (1 sec)
// User1 in Name
// (0.1 sec)
// I got enough
Flow nomenclature
- A flow needs to start somewhere. It usually starts with a flow builder, a conversion from a different object, or some helper function. The most important options are explained in the next chapter.
- The last operation on a flow is called the terminal operation. It is very important because it is usually the only one that is suspending or that requires a coroutine scope. The typical terminal operation is collect. However, there are other terminal operations, which I will explain in the following chapters.
- Between the start operation and the terminal operation, we may have intermediate operations, each of which modifies the flow in some way. We will learn about different intermediate operations in the Flow lifecycle and Flow processing chapters.
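The three kinds of operations named above can be seen together in one short pipeline. This is only a sketch; the function name evenSquares is made up for the illustration, and it assumes kotlinx.coroutines is available.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Not suspending: this only describes the processing, nothing runs yet.
fun evenSquares(): Flow<Int> =
    flowOf(1, 2, 3, 4)            // start: a flow builder
        .filter { it % 2 == 0 }   // intermediate operation
        .map { it * it }          // intermediate operation

fun main() = runBlocking {
    // Terminal operation: suspending, this is where the flow actually runs.
    val result = evenSquares().toList()
    println(result) // [4, 16]
}
```

Only the terminal operation needs to be called from a coroutine; the builder and the intermediate operations can be composed in regular, non-suspending functions.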

Real-life use cases
Practice shows that we need a flow more often than a channel. If you request a stream of data, you usually want to request it on demand. If you need to observe something, such as changes in your database or events from UI widgets, you probably want every observer to receive these events. You also want to stop listening when nobody is observing. This is why, in all these cases, using a flow is preferred over using a channel (although in some cases we use a mix of the two).
The most typical usages of flow include:
- Receiving messages communicated over a server connection, such as WebSockets, notifications, etc.
- Observing user actions, such as text changes or clicks
- Receiving updates from sensors or other information about a device, such as its location or orientation
- Observing changes in a database
Here is how we can observe changes in an SQL database using the Room library:
@Dao
interface MyDao {
    @Query("SELECT * FROM somedata_table")
    fun getData(): Flow<List<SomeData>>
}
Let's look at some examples of how we might use a flow to handle a stream of responses from an API. First, imagine that you are implementing a chat in which messages arrive both over a server channel and through notifications. It is convenient to treat both sources as flows, merge them into one, and then use this flow to update the view. Another case might be using a flow to provide better and better responses. For example, when we search for the best flight on SkyScanner, some offers arrive quickly, but more and better offers arrive over time, so you see better and better results. This is also a good case for a flow.
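The chat case can be sketched with the merge function from kotlinx.coroutines. The two source functions and their messages are hypothetical placeholders; a real application would back them with its own server connection and notification mechanism.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical source: messages pushed over a server channel.
fun serverMessages(): Flow<String> = flow {
    repeat(2) {
        delay(20)
        emit("server message $it")
    }
}

// Hypothetical source: messages delivered as notifications.
fun notificationMessages(): Flow<String> = flow {
    delay(10)
    emit("notification 0")
}

// Both sources treated as a single stream that a view could observe.
fun chatMessages(): Flow<String> =
    merge(serverMessages(), notificationMessages())

fun main() = runBlocking {
    // Elements arrive in whatever order the sources produce them.
    chatMessages().collect { println("Show in view: $it") }
}
```

The merged flow completes once both sources complete, and cancelling its collection cancels both of them.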

Apart from these situations, a flow is also a useful tool for different kinds of concurrent processing. For example, imagine that you have a list of sellers, and for each of them you need to fetch their offers. We have already learned that we can do this using async inside collection processing:
suspend fun getOffers(
    sellers: List<Seller>
): List<Offer> = coroutineScope {
    sellers
        .map { seller ->
            async { api.requestOffers(seller.id) }
        }
        .flatMap { it.await() }
}
The approach above is correct in many cases, but it has one downside: if the list of sellers is big, sending so many requests at once is good neither for us nor for the server. Sure, this can be limited on the server side with rate limiting, but we also want to control it on the client side, and for that we can use Flow. In this case, to limit the number of concurrent calls to 20, we can use flatMapMerge with the concurrency parameter set to 20:
suspend fun getOffers(
    sellers: List<Seller>
): List<Offer> = sellers
    .asFlow()
    .flatMapMerge(concurrency = 20) { seller ->
        suspend { api.requestOffers(seller.id) }.asFlow()
    }
    .toList()
    // Each inner flow emits one list of offers (assuming requestOffers
    // returns List<Offer>, as in the async version), so flatten the result
    .flatten()
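The effect of the concurrency parameter can be observed with a small experiment. Everything below is a sketch under stated assumptions: requestOffer is a hypothetical stand-in for the real API call, the counters exist only to measure how many calls overlap, and the limit is set to 3 instead of 20 to keep the run short. Depending on the kotlinx.coroutines version, flatMapMerge is marked as a preview or experimental API, hence the opt-in annotation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

val running = AtomicInteger(0)     // calls in progress right now
val maxRunning = AtomicInteger(0)  // highest number of overlapping calls seen

// Hypothetical request: records how many calls overlap, then "waits for the server".
suspend fun requestOffer(sellerId: Int): String {
    val now = running.incrementAndGet()
    maxRunning.updateAndGet { maxOf(it, now) }
    delay(50)
    running.decrementAndGet()
    return "offer-$sellerId"
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun getOffers(sellerIds: List<Int>, concurrency: Int): List<String> =
    sellerIds.asFlow()
        .flatMapMerge(concurrency = concurrency) { id ->
            // The request runs inside the inner flow, so it is subject to the limit.
            flow { emit(requestOffer(id)) }
        }
        .toList()

fun main() = runBlocking {
    val offers = getOffers((1..10).toList(), concurrency = 3)
    println("Received ${offers.size} offers") // Received 10 offers
    println("Peak concurrent requests: ${maxRunning.get()}") // never more than 3
}
```

Note that the suspending call is placed inside the inner flow rather than directly in the flatMapMerge lambda: the lambda itself is invoked for one upstream element at a time, and only the collection of the inner flows happens concurrently.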
Operating on a Flow instead of a collection gives us much more control over concurrency, contexts, exceptions, and more. We will explore these features in the next chapters. This is where (in my experience) Flow is most useful. I hope you will see this clearly once we have covered all its different functionalities.
Finally, because they prefer a reactive style of programming, some teams like to use flows instead of suspending functions. This style became popular on Android, where RxJava was mainstream, but Flow is now usually considered a better choice.
As you can see, there are quite a few use cases for flows. In some projects they are used widely, in others only occasionally. Either way, I hope you can see that Flow is useful and worth learning.
Summary
In this chapter, we introduced the concept of Flow. It represents a stream of asynchronously computed values that supports coroutines (unlike sequences). There are quite a few use cases where Flow is useful.