当前位置:网站首页>Analysis on the principle of flow
Analysis on the principle of flow
2022-07-29 06:01:00 【Code and thinking】
Flow It is an asynchronous data flow based on coprocessing , So learning flow Before the principle, you need to master the knowledge related to synergy .
Don't talk much , Direct swing , No , Just start .
First, the simplest creation flow We all know flow{ emit(vale) }, Use here lambda Direct will block Pass in flow Inside the , We all know flow It's cold flow , Only when we collect collect flow When , Inside block Will execute , So no one objects , But how did this happen , In fact, it skillfully applies the hang function of the coroutine .
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
You can see us flow{} Back to a SafeFlow() object , Code block block Inherited is a suspended interface FlowCollector, Does it come to mind here launch A parameter in the source code block: suspend CoroutineScope.() -> Unit, Yes, it's the same !
public interface FlowCollector<in T> {
/**
* Collects the value emitted by the upstream.
* This method is not thread-safe and should not be invoked concurrently.
*/
public suspend fun emit(value: T)
}
This FlowCollector Very important , The official interpretation is :FlowCollector Used as a The intermediate or terminal collector of the stream , Express The entity that accepts the value emitted by the stream . This interface should not be implemented directly , Instead, it should be used as a receiver in the flow generator when implementing custom operators . The implementation of this interface is not thread safe . There is only one interface emit Method , We are all familiar with this method , This is the method we use to stuff data into the stream , You can see emit Is a suspend function .( Here is an analogy liveData The implementation of the co process )
Let's take a look at SafeFlow There is a rewritten collectSafely Method , See the specific implementation SafeFlow inherited AbstractFlow class
@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
/**
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
*
* A valid implementation of this method has the following constraints:
* 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
* The emission should happen in the context of the [collect] call.
* Please refer to the top-level [Flow] documentation for more details.
* 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not
* thread-safe by default.
* To automatically serialize emissions [channelFlow] builder can be used instead of [flow]
*
* @throws IllegalStateException if any of the invariants are violated.
*/
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
AbstractFlow You can see collect Implementation method , You can see AbstractFlow What is inherited is Flow Base class ,Flow There are no implementation methods in the base class , only one collect Abstract method of , So let's focus on AbstractFlow Inside collect How to realize asynchronous execution emit Of .
public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
No matter use collect() still collect{} All the calls are Flow Of collect Abstract method ,collect The real realization is still AbstractFlow Inside , It's just going to be collect Of block Encapsulation is FlowCollector( rewrite emit Method to execute block)
Packaged FlowCollector and CoroutineContext Continue to be encapsulated as SafeCollector type , And pass the instance into collectSafely Method is used in SafeFlow To realize block Implementation .
@Suppress("UNCHECKED_CAST")
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
internal actual class SafeCollector<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
......
/**
* This is a clever implementation of state machine reuse . First , It checks whether it is not used at the same time ( We expressly prohibit ), Then cache only one completion instance ,
* Avoid extra allocation on each launch , Make it effectively garbage free on its thermal path .
*/
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn [email protected]{ uCont ->
try {
emit(uCont, value)
} catch (e: Throwable) {
// Save raised emit abnormal ( Even check the context ) Fact
lastEmissionContext = DownstreamExceptionElement(e)
throw e
}
}
}
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
// Check flow Whether in the same context
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
}
completion = uCont
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
// Check context
private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
) {
if (previousContext is DownstreamExceptionElement) {
exceptionTransparencyViolated(previousContext, value)
}
checkContext(currentContext)
lastEmissionContext = currentContext
}
From the above analysis, we can see , Not invoked collect I didn't call collectSafely Methods don't collect what we stuff emit Data in , Only a call collect Method will be called to , Then call... In the form of an extension function flow Of block,block Inside, we are emit The data written inside , So as a receiver SafeCollector Called emit Method .
FlowCollector Of block stay SafeCollector China and Israel value In the form of , The last one to return emitFun Variable , The type of this variable is FlowCollector Of emit Of block type .
To see this emitFun You will find how there is one more parameter , because emit The default parameter of the suspended function is Continuation, This piece of kotlin Did it for us , But for java In general, it must pass Continuation go in . Pass in the same... In this way Continuation, To ensure the Continuation The unity of .
Having said so much, let's summarize :
Using the properties of extension function , Call to flow Of block And then called SafeCollector Of emit, And here it is emit Will be called to the incoming FlowCollector Of emit, And it came in emit The function is overridden and called block, So it will call collect Of block. Because it only calls collect Then call safeFlow Of collect function , Then it will call collectSafely Function to execute flow Code for . So don't call collect Words ,flow The code building blocks of will not be executed , Return at most one safeFlow The object of .
author :Lu_Hsiang
link :https://juejin.cn/post/7120112404393885733
边栏推荐
- Simple optimization of interesting apps for deep learning (suitable for novices)
- Machine learning makes character recognition easier: kotlin+mvvm+ Huawei ml Kit
- 并发编程学习笔记 之 工具类CountDownLatch、CyclicBarrier详解
- 剑指核心-TaoCloud全闪SDS助力构建高性能云服务
- [pycharm] pycharm remote connection server
- Use of file upload (2) -- upload to Alibaba cloud OSS file server
- Training log III of "Shandong University mobile Internet development technology teaching website construction" project
- Android studio login registration - source code (connect to MySQL database)
- 以‘智’提‘质|金融影像平台解决方案
- 【目标检测】6、SSD
猜你喜欢

【目标检测】Generalized Focal Loss V1

How to PR an open source composer project

Refresh, swagger UI theme changes

Flink connector Oracle CDC 实时同步数据到MySQL(Oracle19c)

全闪分布式,如何深度性能POC?

【目标检测】KL-Loss:Bounding Box Regression with Uncertainty for Accurate Object Detection

浅谈分布式全闪存储自动化测试平台设计

File文件上传的使用(2)--上传到阿里云Oss文件服务器

Ffmpeg creation GIF expression pack tutorial is coming! Say thank you, brother black fly?

北京宝德&TaoCloud共建信创之路
随机推荐
Study and research the way of programming
ssm整合
XDFS&空天院HPC集群典型案例
Windos下安装pyspider报错:Please specify --curl-dir=/path/to/built/libcurl解决办法
Thinkphp6 output QR code image format to solve the conflict with debug
数组的基础使用--遍历循环数组求出数组最大值,最小值以及最大值下标,最小值下标
Tear the ORM framework by hand (generic + annotation + reflection)
Flink, the mainstream real-time stream processing computing framework, is the first experience.
mysql在查询字符串类型的时候带单引号和不带的区别和原因
Markdown syntax
与张小姐的春夏秋冬(2)
Reporting Service 2016 自定义身份验证
Detailed explanation of tool classes countdownlatch and cyclicbarrier of concurrent programming learning notes
[clustmaps] visitor statistics
"Shandong University mobile Internet development technology teaching website construction" project training log I
[go] use of defer
Valuable blog and personal experience collection (continuous update)
Machine learning makes character recognition easier: kotlin+mvvm+ Huawei ml Kit
xtrabackup 的使用
Interesting talk about performance optimization thread pool: is the more threads open, the better?