当前位置:网站首页>Kotlin协程:创建、启动、挂起、恢复
Kotlin协程:创建、启动、挂起、恢复
2022-08-02 00:12:00 【LeeDuo.】
下面以launch方法为例进行分析。
一.协程的创建
launch方法的代码如下:
// CoroutineScope的扩展方法
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
// 根据当前上下文,计算得到新的上下文
val newContext = newCoroutineContext(context)
// 根据启动模式,创建不同的续体
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
// 启动协程
coroutine.start(start, coroutine, block)
return coroutine
}
newCoroutineContext用于计算新的上下文,代码如下:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
// coroutineContext为CoroutineScope中保存的全局变量
// 对上下文进行相加
val combined = coroutineContext + context
// 用于debug
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
// 如果上下文中没有调度器,则添加一个默认的调度器
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
1.start方法
在不指定协程启动模式的情况下,协程将按照DEFAULT模式启动,在上述代码中,会调用StandaloneCoroutine对象的start方法。StandaloneCoroutine的代码如下:
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
StandaloneCoroutine类中仅重写了handleJobException方法,用于处理父协程不处理的异常。因此这里调用的start方法实际是父类AbstractCoroutine的方法,AbstractCoroutine类的start方法代码如下:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
// 该方法用于完成父协程与子协程的绑定关联,同时确保父协程启动
initParentJob()
// 该方法的写法等同于start.invoke(block, receiver, this)
// 因此调用的CoroutineStart类的方法
start(block, receiver, this)
}
AbstractCoroutine类的start方法内,调用了CoroutineStart类的invoke方法。
2. CoroutineStart类
CoroutineStart是一个枚举类,用于根据不同的启动模式去启动协程,代码如下:
public enum class CoroutineStart {
// 四种启动模式
DEFAULT,
LAZY,
// 具有实验性,慎用
@ExperimentalCoroutinesApi
ATOMIC,
// 具有实验性,慎用
@ExperimentalCoroutinesApi
UNDISPATCHED;
// 根据不同的启动策略,启动协程,执行block
@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // 该模式不主动启动,等待用户调用start方法
}
// 根据不同的启动策略,启动协程,执行block
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit
}
// 当前的启动模式是否为懒启动
@InternalCoroutinesApi
public val isLazy: Boolean get() = this === LAZY
}
CoroutineStart类中有两个invoke方法,其中一个参数中有receiver,另一个没有receiver。在Kotlin协程中,很多方法都重载了带有receiver的方法和不带有receiver的方法。
receiver用于为block执行提供一个环境。Kotlin中提供的启动协程的方法都是通过带receiver参数的start方法实现。通过receiver环境,可以更方便的实现一些操作,比如在launch启动的协程中再次调用launch启动新的协程。在没有receiver的环境下执行block,则更像是在suspend方法中执行,如果需要启动其他的协程,需要自己提供环境。
3. startCoroutineCancellable方法
startCoroutineCancellable是一个扩展方法,用来创建一个可以取消的协程,代码如下:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// createCoroutineUnintercepted:创建协程
// intercepted:拦截调度
// resumeCancellableWith:恢复执行
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
// 如果创建的过程发生异常,则通知续体恢复后续代码的执行
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
try {
block()
} catch (e: Throwable) {
completion.resumeWith(Result.failure(e))
}
}
4.createCoroutineUnintercepted方法
createCoroutineUnintercepted方法用于创建一个新的、可挂起的、不受干扰的协程。
public expect fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>
在Kotlin中有很多被expect关键字标记的接口方法,需要找到对应平台下被actual标记的实现方法。
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
// 用于debug
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
createCoroutineUnintercepted方法创建的协程需要手动调用resumeWith方法才可以启动,但重复的调用resumeWith方法可能会导致状态机发生异常。同时,参数中传入的completion可能会在任意的上下文中被调用。
正常情况下,我们编写的lambda表达式——block,在编译器编译时,会自动生成一个类,并继承SuspendLambda类,实现Continuation等接口。因为SuspendLambda继承自ContinuationImpl,ContinuationImpl继承自BaseContinuationImpl,所以才有了上述代码中的判断逻辑。
如果当前的block对象的类型为BaseContinuationImpl,则调用create方法,这里的create方法是编译器生成的类里的重写方法,它的内部就是通过我们传入的参数,创建并返回根据blcok生成的类的一个实例对象。
如果当前的block对象的类型不为BaseContinuationImpl,则需要通过createCoroutineFromSuspendFunction方法创建协程。这里假设lambda表达式的类型不是BaseContinuationImpl。
5.createCoroutineFromSuspendFunction方法
该方法用于在createCoroutineUnintercepted方法中使用,当一个被suspend修饰的lambda表达式没有继承BaseContinuationImpl类时,则通过此方法创建协程。
有两种情况会调用该方法创建协程:第一种情况是lambda表达式中调用了其他的挂起方法;第二种情况是挂起方法是通过Java实现的。
createCoroutineFromSuspendFunction方法的代码如下:
private inline fun <T> createCoroutineFromSuspendFunction(
completion: Continuation<T>,
crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
val context = completion.context
// 如果上下文为空
return if (context === EmptyCoroutineContext)
// 创建一个受限协程
object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow()
block(this)
}
1 -> {
label = 2
result.getOrThrow()
}
else -> error("This coroutine had already completed")
}
}
else // 不为空,则创建一个正常的协程
object : ContinuationImpl(completion as Continuation<Any?>, context) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow()
block(this)
}
1 -> {
label = 2
result.getOrThrow()
}
else -> error("This coroutine had already completed")
}
}
}
受限协程是指协程在运行过程中的,只能调用协程作用域中提供的挂起方法发生挂起,其他挂起方法不能调用,因为在挂起方法会对续体进行拦截,可能导致后续代码的执行变得无法预测。
典型的例子就是sequence方法,它创建的协程就是受限协程,只能通过调用yield方法或者yieldAll方法才能发生挂起。由于受限协程中不能进行协程调度,因此其上下文是空的。
这里launch方法的上下文有一个默认调度器,因此会创建一个ContinuationImpl对象。
到这里,协程完成了创建。
二.协程的启动
再次回到startCoroutineCancellable方法,当调用createCoroutineUnintercepted创建好协程后,会调用intercepted方法,代码如下:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
intercepted方法是Continuation接口的扩展方法,内部调用了ContinuationImpl类的intercepted方法。
1.ContinuationImpl类
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
// 如果没有缓存,则从上下文中获取拦截器,调用interceptContinuation进行拦截,
// 将拦截的续体保存到全局变量
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also {
intercepted = it }
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation // just in case
}
}
这里的ContinuationInterceptor指的就是在newCoroutineContext方法中传入的Dispatchers.Default调度器。CoroutineDispatcher类的interceptContinuation方法的代码如下:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
// 将续体包裹成DispatchedContinuation,并传入当前调度器
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
...
}
2.resumeCancellableWith方法
再次回到startCoroutineCancellable方法,当调用intercepted方法进行拦截后,会调用resumeCancellableWith方法,代码如下:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}
由于当前的Continuation对象的类型为DispatchedContinuation,因此调用DispatchedContinuation类的resumeCancellableWith方法,代码如下:
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
...
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
// 是否进行调度
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
// 进行调度
dispatcher.dispatch(context, this)
} else {
// Dispatcher.Unconfined调度器会走这里
executeUnconfined(state, MODE_CANCELLABLE) {
// 协程未被取消
if (!resumeCancelled()) {
// 恢复执行
resumeUndispatchedWith(result)
}
}
}
}
// 恢复执行前判断协程是否已经取消执行
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancelled(): Boolean {
// 获取当前的协程任务
val job = context[Job]
// 如果不为空且不活跃
if (job != null && !job.isActive) {
// 抛出异常
resumeWithException(job.getCancellationException())
return true
}
return false
}
@Suppress("NOTHING_TO_INLINE")
inline fun resumeUndispatchedWith(result: Result<T>) {
// 该方法在指定的上下文中执行,在执行后同步协程上下文变化
withCoroutineContext(context, countOrElement) {
// 调用续体的resumeWith方法
continuation.resumeWith(result)
}
}
...
}
// Dispatchers.Unconfined模式下的调度
private inline fun DispatchedContinuation<*>.executeUnconfined(
contState: Any?, mode: Int, doYield: Boolean = false,
block: () -> Unit
): Boolean {
// 从ThreadLocal中获取EventLoop
val eventLoop = ThreadLocalEventLoop.eventLoop
// doYield表示是否正在让出执行
// 如果正在让出执行,并且执行队列还是空的,说明不需要执行,返回false
if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
// 如果EventLoop当前还在被Unconfined调度器使用
return if (eventLoop.isUnconfinedLoopActive) {
_state = contState
resumeMode = mode
// 向队列中添加当前的任务
eventLoop.dispatchUnconfined(this)
// 返回 true
true
} else {
// 重新运行EventLoop
runUnconfinedEventLoop(eventLoop, block = block)
// 返回false
false
}
}
runUnconfinedEventLoop方法是一个扩展方法,用于启动EventLoop,代码如下:
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
eventLoop: EventLoop,
block: () -> Unit
) {
// 引用计数+1
eventLoop.incrementUseCount(unconfined = true)
try {
// 先执行当前的任务
block()
// 循环分发任务
while (true) {
// 全部执行完毕,则退出分发
if (!eventLoop.processUnconfinedEvent()) break
}
} catch (e: Throwable) {
handleFatalException(e, null)
} finally {
// 引用计数+1
eventLoop.decrementUseCount(unconfined = true)
}
}
Dispatchers.Default调度器与Dispatchers.Unconfined调度器的调度逻辑基本都相同,最终都是调用Contination对象的resumeWith方法,同时传入Result对象作为参数。
这里的Contination是createCoroutineUnintercepted方法创建的继承ContinuationImpl的匿名内部类对象。Result是resumeCancellableWith方法传入的Result.success(Unit)对象,因为首次启动,所以传入类型为Unit。
调用匿名内部类的resumeWith方法,实际调用的是父类BaseContinuationImpl的resumeWith方法。
3.BaseContinuationImpl类
BaseContinuationImpl类的resumeWith方法的代码如下:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
// 循环
while (true) {
// 用于debug
probeCoroutineResumed(current)
// current环境下
with(current) {
// completion用于续体执行完的回调,为空,则抛出异常
// 这里的completion就是一开始创建的StandaloneCoroutine对象
val completion = completion!!
// 获取执行后的结果
val outcome: Result<Any?> =
try {
// 核心执行
val outcome = invokeSuspend(param)
// 如果返回值为COROUTINE_SUSPENDED,说明协程挂起,退出循环
if (outcome === COROUTINE_SUSPENDED) return
// 返回结果成功
Result.success(outcome)
} catch (exception: Throwable) {
// 返回结果失败
Result.failure(exception)
}
// 释放拦截的续体,状态机终止
releaseIntercepted()
// 这里没有直接调用resume,而是通过循环代替递归
// 这也是resumeWith方法声明为final的原因
if (completion is BaseContinuationImpl) {
// 这种情况一般为多个suspend方法按顺序执行
// 等待下一次循环
current = completion
param = outcome
} else {
// 返回结果
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
public open fun create(completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}
...
}
4.invokeSuspend方法
在上述代码中,resumeWith方法内部调用了invokeSuspend方法,这里的invokeSuspend方法实际就是createCoroutineFromSuspendFunction方法中创建的匿名内部类的invokeSuspend方法。匿名内部类的代码如下:
object : ContinuationImpl(completion as Continuation<Any?>, context) {
// 初始状态
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
// 先去获取一次结果,如果有异常,则直接抛出,避免执行
// 比如在调度器中,如果发现协程已经取消,
// 则调用resumeWithException方法,在这里直接被抛出
result.getOrThrow()
// 把当前续体传入,执行协程
// 可能发生挂起
block(this)
}
1 -> {
// 如果协程发生了挂起,那么恢复挂起后会走到这里
label = 2
// 获取最终的执行结果
result.getOrThrow()
}
else -> error("This coroutine had already completed")
}
}
三.协程的挂起与恢复
通过上述代码的分析,协程的挂起实际就是在协程返回结果时返回一个COROUTINE_SUSPENDED对象,在收到COROUTINE_SUSPENDED结果后直接返回,等待被再次调用resumeWith恢复。
COROUTINE_SUSPENDED对象定义在枚举类CoroutineSingletons中,代码如下:
internal enum class CoroutineSingletons {
COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
该枚举类代表了协程的三个状态,协程在创建后状态为UNDECIDED,如果执行过程中发生挂起,则状态变为COROUTINE_SUSPENDED,最后挂起恢复后状态变为RESUMED。
而协程的恢复实际就是在挂起方法执行完成后,通过调用协程执行时传入的续体的resumeWith方法,恢复后续代码的执行。
边栏推荐
猜你喜欢
Knowing the inorder traversal of the array and the preorder traversal of the array, return the postorder history array
Are test points the same as test cases?
路由策略
面试高频考题解法——栈的压入弹出序列、有效的括号、逆波兰表达式求值
Short video SEO search operation customer acquisition system function introduction
利用“栈”快速计算——逆波兰表达式
Arduino Basic Syntax
中缀转后缀、前缀表达式快速解决办法
Disk and file system management
测试点等同于测试用例吗
随机推荐
What is it like to trade for a living?
How does JSP use the page command to make the JSP file support Chinese encoding?
Play NFT summer: this collection of tools is worth collecting
Short video SEO search operation customer acquisition system function introduction
Mean Consistency Tracking of Time-Varying Reference Inputs for Multi-Agent Systems with Communication Delays
What does the errorPage attribute of the JSP page directive do?
Day11 shell脚本基础知识
2022/08/01 Study Notes (day21) Generics and Enums
CRS management and maintenance
Transient Stability Distributed Control of Power System with External Energy Storage
Redis-消息发布订阅
GIF making - very simple one-click animation tool
Pytorch seq2seq 模型架构实现英译法任务
不要用jOOQ串联字符串
An overview of the most useful DeFi tools
bgp 聚合 反射器 联邦实验
当奈飞的NFT忘记了Web2的业务安全
The Statement update Statement execution
07-SDRAM :FIFO控制模块
els 方块变形判断。