当前位置:网站首页>Deep dive kotlin synergy (20): build flow
Deep dive kotlin synergy (20): build flow
2022-07-03 03:59:00 【RikkaTheWorld】
Series eBook : Portal
Every flow Need to start from somewhere . There are many ways to do this , It depends on what we need , In this chapter , We will focus on the most important usage .
Flow Original value
establish flow The easiest way to do this is to use flowOf
function , In this function , We just need to define the flow What values should it have ( Similar to creating a list listOf
function ).
suspend fun main() {
flowOf(1, 2, 3, 4, 5)
.collect {
print(it) } // 12345
}
Sometimes , We may also need something that is not worth it flow. So , We can use emptyFlow
function ( Similar to creating an empty list emptyList
function ).
suspend fun main() {
emptyFlow<Int>()
.collect {
print(it) } // Nothing there?
}
converter
We can also use asFlow
The Iterable
、Iterator
or Sequence
Turn it into flow :
suspend fun main() {
listOf(1, 2, 3, 4, 5)
// or setOf(1, 2, 3, 4, 5)
// or sequenceOf(1, 2, 3, 4, 5)
.asFlow()
.collect {
print(it) } // 12345
}
These functions produce a stream of elements that are immediately available , And then we can use it flow Processing functions to process these element streams .
Convert a function to a stream
Flow It is often used to represent a flow that delays producing a single value ( It's like RxJava Medium Single). therefore , Convert the pending function to Flow It makes sense , The result of the function will be the only value in the stream . So ,asFlow
Functions also extend function types ( The parcel suspend () -> T
and () -> T
), Here it is used to put a pending lambda Expression to Flow.
suspend fun main() {
val function = suspend {
// This is pending lambda expression
delay(1000)
"UserName"
}
function.asFlow()
.collect {
println(it) }
}
// (1 sec)
// UserName
If you want to convert a regular function , We need to quote it first , Can be in Kotlin Use in ::.
suspend fun getUserName(): String {
delay(1000)
return "UserName"
}
suspend fun main() {
::getUserName
.asFlow()
.collect {
println(it) }
}
// (1 sec)
// UserName
Flow And responsive flow
If you use response flow in your application ( for example Reactor、RxJava2.x、RxJava3.X), Then you don't need to make too many changes in your code . All objects , for example Flux
,Flowable
and Obserable
It's all done Publisher
Interface , It can go through kotlinx-coroutines-reactive In the library asFlow
Function to Flow.
suspend fun main() = coroutineScope {
Flux.range(1, 5).asFlow()
.collect {
print(it) } // 12345
Flowable.range(1, 5).asFlow()
.collect {
print(it) } // 12345
Observable.range(1, 5).asFlow()
.collect {
print(it) } // 12345
}
If you want to convert in reverse , You need a specific library , Use kotlinx-coroutines-reactor, You can take Flow Turn into Flux
. Use kotlinx-coruotines-rx3( or kotlinx-coroutines-rx2), You can take Flow Convert to Flowable
or Observable
.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf(1, 2, 3, 4, 5)
flow.asFlux()
.doOnNext {
print(it) } // 12345
.subscribe()
flow.asFlowable()
.subscribe {
print(it) } // 12345
flow.asObservable()
.subscribe {
print(it) } // 12345
}
Flow Builder
Mainstream creation flow The way is to use flow Builder . We have used it in the previous chapter . Its behavior is similar to that of creating a sequence sequence
Builder 、 Or create a channel Of produce
Builder . We call flow
Function to start a builder , And in its lambda Use in expressions emit
Function emits the next value . We can also use emitAll
To launch from Channel or Flow All values ( emitAll(Flow)
yes flow.collect { emit(it) }
Abbreviation ).
fun makeFlow(): Flow<Int> = flow {
repeat(3) {
num ->
delay(1000)
emit(num)
}
}
suspend fun main() {
makeFlow()
.collect {
println(it) }
}
// (1 sec)
// 0
// (1 sec)
// 1
// (1 sec)
// 2
This builder has been used in previous chapters , It will be used many times in the next chapter , So we will see many uses of it . Now? , Let me review an example in the chapter sequence builder ,flow The builder is used to pull from the network API User flow from page by page request in :
fun allUsersFlow(
api: UserApi
): Flow<User> = flow {
var page = 0
do {
val users = api.takePage(page++) // Hang up
emitAll(users)
} while (!users.isNullOrEmpty())
}
understand flow Builder
flow Builders are the most basic way to create streams . All other options are based on this .
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
We just need to understand how the builder works , We knew flow How it works .flow The inside of the builder is actually very simple : It creates only one implementation Flow
Object of the interface , This object only calls collect Method internal block function :
fun <T> flow(
block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = object : Flow<T>() {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
interface Flow<out T> {
suspend fun collect(collector: FlowCollector<T>)
}
fun interface FlowCollector<in T> {
suspend fun emit(value: T)
}
public suspend inline fun <T> Flow<T>.collect(
crossinline action: suspend (value: T) -> Unit
): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
Knowing that , Let's analyze how the following code works :
fun main() = runBlocking {
flow {
// 1
emit("A")
emit("B")
emit("C")
}.collect {
value -> // 2
println(value)
}
}
// A
// B
// C
When we call flow Builder time , We just created an object . however , When calling the collect
function , It means calling collector On the interface block
function . In this case block
Functions are comments 1 Situated lambda expression , Its receiver is collector, That is to say, notes 2 As defined in . When we use lambda Expression defines a function interface ( Such as FlowCollector) when , This lambda The body of the expression will be used as the only function of the function interface ( In this case emit
) The main body of . therefore , emmit
The body of the function is println(value)
. therefore , When we call collect
Function time , We begin to execute the annotation 1 As defined in lambda expression . This is it. flow How it works , Everything else is based on this .
channelFlow
Flow It's a cold data flow , So it generates values on demand . If you think about the above allUsersFlow
, Then only when the receiver requests , The next user page will be pulled . This is necessary in some cases , for example , Suppose we are looking for a specific user , If it's on the first page , We don't need to ask for more pages . In order to prove this in practice , In the following example , We use Flow The builder generates the next element , Please note that , The next page is all lazy requests .
data class User(val name: String)
interface UserApi {
suspend fun takePage(pageNumber: Int): List<User>
}
class FakeUserApi : UserApi {
private val users = List(20) {
User("User$it") }
private val pageSize: Int = 3
override suspend fun takePage(
pageNumber: Int
): List<User> {
delay(1000) // Hang up
return users
.drop(pageSize * pageNumber)
.take(pageSize)
}
}
fun allUsersFlow(api: UserApi): Flow<User> = flow {
var page = 0
do {
println("Fetching page $page")
val users = api.takePage(page++) // Hang up
emitAll(users.asFlow())
} while (!users.isNullOrEmpty())
}
suspend fun main() {
val api = FakeUserApi()
val users = allUsersFlow(api)
val user = users
.first {
println("Checking $it")
delay(1000) // Hang up
it.name == "User3"
}
println(user)
}
// Fetching page 0
// (1 sec)
// Checking User(name=User0)
// (1 sec)
// Checking User(name=User1)
// (1 sec)
// Checking User(name=User2)
// (1 sec)
// Fetching page 1
// (1 sec)
// Checking User(name=User3)
// (1 sec)
// User(name=User3)
On the other hand , We may need to pull down page data while processing elements . In this case , Doing so may lead to more network requests , But it may also produce results faster . To achieve this goal , We need independent production and consumption . This independence is thermal data flow ( Such as channel) Typical characteristics of . therefore , We need to mix channel and flow. Yes , This is supportive : We just need to call channelFlow
function , It is associated with Flow similar , Because it does Flow Interface . This builder is a regular function , It operates with a terminal ( Such as collect) Start . It's also similar to Channel, Because once started , It will generate values in separate coroutines , Without waiting for the recipient . therefore , Pull the next page and check the user information at the same time .
fun allUsersFlow(api: UserApi): Flow<User> = channelFlow {
var page = 0
do {
println("Fetching page $page")
val users = api.takePage(page++) // Hang up
users?.forEach {
send(it) }
} while (!users.isNullOrEmpty())
}
suspend fun main() {
val api = FakeUserApi()
val users = allUsersFlow(api)
val user = users
.first {
println("Checking $it")
delay(1000)
it.name == "User3"
}
println(user)
}
// Fetching page 0
// (1 sec)
// Checking User(name=User0)
// Fetching page 1
// (1 sec)
// Checking User(name=User1)
// Fetching page 2
// (1 sec)
// Checking User(name=User2)
// Fetching page 3
// (1 sec)
// Checking User(name=User3)
// Fetching page 4
// (1 sec)
// User(name=User3)
stay channelFlow Internal , We are ProducerScope<T>
Operate on the scope ,ProduceScope
And produce
The builder uses the same type . It has achieved CoroutineScope
, So we can use it to start a new process . To generate elements , We're going to use send
instead of emit
. We can also use SendChannel
Function access or direct control channel.
interface ProducerScope<in E>:
CoroutineScope, SendChannel<E> {
val channel: SendChannel<E>
}
channelFlow
A typical use of is when we need to calculate independently . To support that , channelFlow
Created a collaboration scope , So we can use things like launch
Such a function starts the coroutine . The following code does not apply to flow, Because it cannot create the scope required by the collaboration builder .
fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> =
channelFlow {
launch {
collect {
send(it) }
}
other.collect {
send(it) }
}
fun <T> contextualFlow(): Flow<T> = channelFlow {
launch(Dispatchers.IO) {
send(computeIoValue())
}
launch(Dispatchers.Default) {
send(computeCpuValue())
}
}
Like all other collaborations , channelFlow
Will wait for , It will not end until all its subroutines are in the terminal state .
callbackFlow
Suppose you need to monitor the flow of events , Such as user clicks or other types of operations . The listening process should be independent of the process of handling these events , therefore channelFlow
A good candidate . however , There's a better way : callbackFlow
.
For a long time ,channelFlow
and callbackFlow
There is no difference between . stay 1.3.4 In the version , Introduced some minor changes , To reduce the possibility of errors when using callbacks . However , The biggest difference is how people understand these functions : callFlow
It exists to encapsulate callbacks .
stay callbackFlow
Inside , We're still here ProducerScope<T>
Operate on the scope . Here are some functions that may be useful for wrapping callbacks :
awaitClose {...}
—— One hangs until channel Closed functions . once channel Closing will call its parameters .awaitClose
aboutcallbackFlow
It's very important . Take a look at the following example , withoutawaitClose
, The collaboration will end immediately after the callback is registered . This is very natural for synergy : Its main body is over , And it has no sub processes to wait for , So it's over , We useawaitClose
( Even if its body is empty ) To prevent this from happening , And we listen to elements , until channel Until it is closed in other waystrySendBlocking(value)
—— Be similar tosend
, But it is blocked, not suspended , So it can be used for non suspended functionsclose()
—— Close this channelcancel(throwable)
—— close channel And to flow Send an exception
Here is a use callbackFlow
Typical example :
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback {
override fun onNextValue(value: T) {
try {
trySendBlocking(value)
} catch (e: Exception) {
// stay channel Exception handling in
}
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
}
api.register(callback)
awaitClose {
api.unregister(callback) }
}
summary
In this chapter , We understand flow Different creation methods . There are many for starting flow Function of , From simple flowOf
or emptyFlow
、 Convert to flow, To flow Builder . The simplest flow The builder is just a flow
function . You can use emit
Function generates the next value . also channelFlow
and callbackFlow
Builder , They are created with Channel Some characteristics of flow. These functions have their own usage . In order to make full use of Flow The potential of , It is useful to know them .
边栏推荐
- 递归使用和多维数组对象变一维数组对象
- Ffmpeg recording screen and screenshot
- [mathematical logic] propositional logic (propositional and connective review | propositional formula | connective priority | truth table satisfiable contradiction tautology)
- For instruction, uploading pictures and display effect optimization of simple wechat applet development
- [learning notes] seckill - seckill project - (11) project summary
- [Blue Bridge Road -- bug free code] interpretation of some codes of matrix keyboard
- vim 的实用操作
- Applet get user avatar and nickname
- 基于Pytorch和RDKit的QSAR模型建立脚本
- "Final review" 16/32-bit microprocessor (8086) basic register
猜你喜欢
leetcode:297. 二叉树的序列化与反序列化
pytorch怎么下载?pytorch在哪里下载?
Web会话管理安全问题
How to move towards IPv6: IPv6 Transition Technology - Shangwen network quigo
The 10th China Cloud Computing Conference · China Station: looking forward to the trend of science and technology in the next decade
Some preliminary preparations for QQ applet development: make an appointment for a development account, download and install developer tools, and create QQ applet
Is it better to speculate in the short term or the medium and long term? Comparative analysis of differences
Docker install and start MySQL service
MPLS setup experiment
『期末复习』16/32位微处理器(8086)基本寄存器
随机推荐
Is pytorch difficult to learn? How to learn pytorch well?
China Mobile Internet of things oneos and onenet were selected in the list of 2021 Internet of things demonstration projects
CEPH Shangwen network xUP Nange that releases the power of data
Without sxid, suid & sgid will be in danger- Shangwen network xUP Nange
Nat. Comm. | 使用Tensor-cell2cell对细胞通讯进行环境感知去卷积
Hutool动态添加定时任务
IPv6 transition technology-6to4 manual tunnel configuration experiment -- Kuige of Shangwen network
[brush questions] find the number pair distance with the smallest K
C language hashtable/hashset library summary
Error c2694 "void logger:: log (nvinfer1:: ilogger:: severity, const char *)": rewrite the restrictive exception specification of virtual functions than base class virtual member functions
105. SAP UI5 Master-Detail 布局模式的联动效果实现明细介绍
Numpy warning visibledeprecationwarning: creating an ndarray from ragged needed sequences
[Blue Bridge Road -- bug free code] interpretation of some codes of matrix keyboard
没有sXid,suid&sgid将进入险境!-尚文网络xUP楠哥
Wechat applet + Alibaba IOT platform + Hezhou air724ug build a serverless IOT system (III) -- wechat applet is directly connected to Alibaba IOT platform aliiot
@The difference between Autowired, @qualifier, @resource
Download and install captura and configure ffmpeg in captura
【刷题篇】 找出第 K 小的数对距离
2022年已过半,得抓紧
Filter