Kotlin coroutine dispatching and thread switching: time to solve the mystery
2022-06-30 11:13:00 【Android daily lecture】
1. How do you specify the thread a coroutine runs on?
Common ways to switch threads on Android
Conventional approaches
The usual ways to switch to the main thread are Activity.runOnUiThread(xx), View.post(xx), Handler.sendMessage(xx), and so on. There are also frameworks and tools such as AsyncTask, RxJava, and thread pools. When they deliver results to the main thread, they all ultimately rely on Looper + Handler.
Let's look at a demo: fetch student information on a worker thread, then switch to the main thread to display the result:
private inner class MyHandler : Handler(Looper.getMainLooper()) {
    override fun handleMessage(msg: Message) {
        // Show the toast on the main thread
        Toast.makeText(context, msg.obj.toString(), Toast.LENGTH_SHORT).show()
    }
}
// Fetch student information
fun showStuInfo() {
    thread {
        // Simulate a network request
        Thread.sleep(3000)
        val handler = MyHandler()
        val msg = Message.obtain()
        msg.obj = "I am a little fish man"
        // Send to the main thread for execution
        handler.sendMessage(msg)
    }
}
As we know, Android UI updates are event driven. The main thread keeps trying to take pending events from the event queue: it waits until one is available, then executes it. This is the core job of Looper, which continuously polls the event queue, while events are put into the queue through a Handler.
The worker thread puts an event into the queue through the Handler, and the main thread, which is looping over the queue, picks it up. That is how execution switches from a worker thread to the main thread.
Of course, this works because the main thread has a message queue. If you want to hand an event to a worker thread for execution, you can set up a message queue in that worker thread as well.
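To make the queue idea concrete without Android, here is a minimal plain-JVM sketch of a thread that owns a task queue. MiniLooper, post(), quitSafely(), and runOnMiniLooper() are hypothetical names chosen for illustration; they are not Android APIs, and the real Looper and MessageQueue are considerably more involved.

```kotlin
import java.util.Collections
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical stand-in for Looper + MessageQueue; not an Android API.
class MiniLooper(name: String) {
    private val queue = LinkedBlockingQueue<Runnable>()

    // Sentinel task that tells the loop to stop.
    private val quitMarker = object : Runnable {
        override fun run() {}
    }

    // The worker blocks in take() until a task arrives, then runs it.
    private val worker = thread(name = name) {
        while (true) {
            val task = queue.take()
            if (task === quitMarker) break
            task.run()
        }
    }

    // Plays the role of Handler.post(): may be called from any thread.
    fun post(task: Runnable) = queue.put(task)

    // Lets already-posted tasks finish, then stops the worker.
    fun quitSafely() {
        queue.put(quitMarker)
        worker.join()
    }
}

// Posts two tasks from the calling thread and reports where each one ran.
fun runOnMiniLooper(): List<String> {
    val looper = MiniLooper("mini-looper")
    val seen = Collections.synchronizedList(mutableListOf<String>())
    repeat(2) { looper.post(Runnable { seen += Thread.currentThread().name }) }
    looper.quitSafely()
    return seen.toList()
}

fun main() {
    println(runOnMiniLooper()) // [mini-looper, mini-looper]
}
```

Whichever thread calls post(), the task body runs on the thread that drains the queue. That is the whole trick behind "switching threads" with a Handler.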
Switching to the main thread with a coroutine
The same feature, implemented with a coroutine:
fun showStuInfoV2() {
    GlobalScope.launch(Dispatchers.Main) {
        val stuInfo = withContext(Dispatchers.IO) {
            // Simulate a network request
            Thread.sleep(3000)
            "I am a little fish man"
        }
        Toast.makeText(context, stuInfo, Toast.LENGTH_SHORT).show()
    }
}
Clearly, this is much simpler.
Compared with the conventional approach, the coroutine does not need to create a thread explicitly, nor does it need to explicitly send a message through a Handler and then receive and display it in that Handler.
We have good reason to guess that the coroutine also switches to the main thread through Handler + Looper.
Switching threads with coroutines
Of course, a coroutine can not only switch from a worker thread to the main thread; it can also switch from the main thread to a worker thread, and even between worker threads.
fun switchThread() {
    println("I'm on some thread, about to switch to the main thread")
    GlobalScope.launch(Dispatchers.Main) {
        println("I'm on the main thread, about to switch to a worker thread")
        withContext(Dispatchers.IO) {
            println("I'm on a worker thread, about to switch to another worker thread")
            withContext(Dispatchers.Default) {
                println("I'm on a worker thread, about to switch to the main thread")
                withContext(Dispatchers.Main) {
                    println("I'm on the main thread")
                }
            }
        }
    }
}
Whether it is launch() or withContext(), as long as we specify a dispatcher, the coroutine runs on the threads that dispatcher stands for.
2. How the coroutine dispatcher works
Specifying the thread a coroutine runs on
Next, starting from the source code of launch(), we explore step by step how a coroutine switches threads.
The simplest way to write launch():
fun launch1() {
    GlobalScope.launch {
        println("launch default")
    }
}
launch() takes three parameters. The first two have default values; the third is the coroutine body, that is, the content inside the curly braces after GlobalScope.launch.
#Builders.common.kt
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // Build the new context
    val newContext = newCoroutineContext(context)
    // Build the completion
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // Start the coroutine
    coroutine.start(start, coroutine, block)
    return coroutine
}
Next, look at the implementation of newCoroutineContext:
#CoroutineContext.kt
actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    // In the demo environment, coroutineContext = EmptyCoroutineContext
    val combined = coroutineContext + context
    // DEBUG = false
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    // If no dispatcher is specified, Dispatchers.Default is used
    // If a dispatcher is specified, the specified one is used
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}
This involves some overloaded operators on CoroutineContext. We won't dig into CoroutineContext this time; it is enough to understand what the code means.
All you need to know is:
The CoroutineContext holds the coroutine's dispatcher.
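The two operators that matter here, `+` and `[]`, can be tried with the standard library alone. The sketch below is only an illustration: NamedInterceptor, withFallback(), and interceptorLabel() are hypothetical names, and withFallback() merely imitates the "add a default only if none is present" rule of newCoroutineContext().

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical pass-through interceptor; `label` exists only to tell instances apart.
class NamedInterceptor(val label: String) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        continuation
}

// Same idea as newCoroutineContext(): add a fallback only when no interceptor is present.
fun withFallback(context: CoroutineContext, fallback: NamedInterceptor): CoroutineContext =
    if (context[ContinuationInterceptor] == null) context + fallback else context

// Reads back the label of whichever interceptor ended up in the context.
fun interceptorLabel(context: CoroutineContext): String? =
    (context[ContinuationInterceptor] as? NamedInterceptor)?.label

fun main() {
    val fallback = NamedInterceptor("default")
    println(interceptorLabel(withFallback(EmptyCoroutineContext, fallback)))     // default
    println(interceptorLabel(withFallback(NamedInterceptor("main"), fallback)))  // main
    // '+' replaces an element that has the same key: the right-hand side wins.
    println(interceptorLabel(NamedInterceptor("main") + NamedInterceptor("io"))) // io
}
```

Because every dispatcher is stored under the same key, ContinuationInterceptor, a context holds at most one dispatcher at a time.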
What dispatchers do coroutines have?
Dispatchers.Main
The UI thread; on Android this is the main thread
Dispatchers.IO
I/O threads, mainly for I/O operations
Dispatchers.Default
Mainly for CPU-intensive work, such as computational tasks
Dispatchers.Unconfined
Does not confine the coroutine to any particular thread
Running a coroutine on the main thread
Instead of relying on the default parameters, specify the coroutine's dispatcher:
fun launch1() {
    GlobalScope.launch(Dispatchers.Main) {
        println("I execute on the main thread")
    }
}
Taking this as the example, let's continue analyzing the source code.
As mentioned above, the coroutine is started with coroutine.start(start, coroutine, block):
#AbstractCoroutine.kt
fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    // start is a CoroutineStart value, so this call
    // ends up in CoroutineStart.invoke()
    start(block, receiver, this)
}
#CoroutineStart.kt
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        // this is the CoroutineStart value; the demo takes the DEFAULT branch
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }
CoroutineStart.DEFAULT, CoroutineStart.ATOMIC, and the others indicate how the coroutine is started. DEFAULT means the coroutine is scheduled for execution immediately, and it is also the default start mode.
Next, a series of start functions is called through block. We analyzed this part in detail in an earlier article, so here is only a brief recap:
block represents the coroutine body. It is actually compiled into an anonymous inner class that inherits from SuspendLambda, and SuspendLambda indirectly implements the Continuation interface.
Let's continue with the call made on block:
#Cancellable.kt
// Extension function on block
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    // runSafely is a higher-order function; it invokes the content of the "{}" block
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }
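The same create, intercept, resume chain can be reproduced by hand with the public intrinsics of the Kotlin standard library. This is only a sketch: RecordingCompletion and startByHand() are hypothetical names, plain resume() stands in for resumeCancellableWith() (which is internal to kotlinx.coroutines), and with an empty context intercepted() simply returns the continuation itself.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.resume

// Hypothetical completion that just remembers the final value;
// it plays the role StandaloneCoroutine plays in launch().
class RecordingCompletion : Continuation<String> {
    var value: String? = null
    override val context: CoroutineContext = EmptyCoroutineContext
    override fun resumeWith(result: Result<String>) {
        value = result.getOrThrow()
    }
}

fun startByHand(): String? {
    val block: suspend () -> String = { "body finished" }
    val completion = RecordingCompletion()
    block.createCoroutineUnintercepted(completion) // 1. create the coroutine body object
        .intercepted()                             // 2. let the context's interceptor wrap it
        .resume(Unit)                              // 3. start it; the body runs right here
    return completion.value
}

fun main() {
    println(startByHand()) // body finished
}
```

Since the context here contains no interceptor, step 3 runs the body synchronously on the calling thread. The rest of this section is about what changes when step 2 finds Dispatchers.Main.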
Execution moves on to createCoroutineUnintercepted(). An earlier article, "Young man, do you know what Kotlin coroutines originally looked like?", analyzed it in detail: this function is where the coroutine body is actually created.
Straight to the code:
#IntrinsicsJvm.kt
actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    // Wrap the completion
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        // Create the coroutine body class
        // receiver and completion are both the StandaloneCoroutine object
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}
This function creates the coroutine body class; let's call it MyAnnoy.
class MyAnnoy extends SuspendLambda implements Function2 {
    @Nullable
    @Override
    protected Object invokeSuspend(@NotNull Object o) {
        // ... coroutine body logic
        return null;
    }
    @NotNull
    @Override
    public Continuation<Unit> create(@NotNull Continuation<?> completion) {
        // ... create a MyAnnoy instance
        return null;
    }
    @Override
    public Object invoke(Object o, Object o2) {
        return null;
    }
}
After the new MyAnnoy is created, intercepted(xx) is called on it. This function is critical:
#IntrinsicsJvm.kt
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    // If this is a ContinuationImpl, cast it to ContinuationImpl
    // and call its intercepted() function
    (this as? ContinuationImpl)?.intercepted() ?: this
Why cast MyAnnoy to ContinuationImpl?
Because what is being called is the intercepted() function inside ContinuationImpl:
#ContinuationImpl.kt
public fun intercepted(): Continuation<Any?> =
    intercepted
        // 1. If intercepted is null, look the interceptor up in the context
        // 2. If the context has none, fall back to this; finally assign the result to intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }
First, look at the type of the intercepted variable:
#ContinuationImpl.kt
private var intercepted: Continuation<Any?>? = null
It is still a Continuation, and initially intercepted = null.
context[ContinuationInterceptor] takes the Element whose key is ContinuationInterceptor out of the CoroutineContext.
If it can be taken out, it must have been put in at some point. When?
The answer is:
newCoroutineContext(context) builds a new CoroutineContext, and the dispatcher is stored in it.
And because we specified Dispatchers.Main, context[ContinuationInterceptor] returns Dispatchers.Main.
The definition of Dispatchers.Main:
#Dispatchers.kt
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
#MainCoroutineDispatcher.kt
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
}
MainCoroutineDispatcher inherits from CoroutineDispatcher, which contains this function:
#CoroutineDispatcher.kt
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)
So what gets called on Dispatchers.Main is this interceptContinuation(xx) function.
Its input parameter is of type Continuation, which here is the MyAnnoy object. The function body is simple:
- It constructs a DispatchedContinuation object, passing in Dispatchers.Main and the MyAnnoy object.
- Dispatchers.Main and MyAnnoy are assigned to the member variables dispatcher and continuation, respectively.
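The wrapping step can be imitated on a plain JVM with a hand-written interceptor. The following is a sketch under stated assumptions, not the kotlinx.coroutines implementation: ExecutorInterceptor, PostingContinuation, and threadOfBody() are hypothetical names, a single-thread executor named "fake-main" stands in for the Android main thread, and a lambda is posted where the real DispatchedContinuation posts itself as the Runnable.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical counterpart of DispatchedContinuation: it holds the "dispatcher"
// (an Executor) and the wrapped continuation, and resumes the latter on the executor.
class PostingContinuation<T>(
    private val executor: Executor,
    private val continuation: Continuation<T>
) : Continuation<T> {
    override val context: CoroutineContext get() = continuation.context
    override fun resumeWith(result: Result<T>) {
        executor.execute { continuation.resumeWith(result) }
    }
}

// Hypothetical counterpart of a dispatcher: wraps every continuation it intercepts.
class ExecutorInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        PostingContinuation(executor, continuation)
}

// Starts a coroutine from the calling thread and returns the thread its body ran on.
fun threadOfBody(): String {
    val fakeMain = Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main") }
    val done = CountDownLatch(1)
    var seen = ""
    val body: suspend () -> Unit = { seen = Thread.currentThread().name }
    body.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = ExecutorInterceptor(fakeMain)
        override fun resumeWith(result: Result<Unit>) = done.countDown()
    })
    done.await()
    fakeMain.shutdown()
    return seen
}

fun main() {
    println(threadOfBody()) // fake-main
}
```

startCoroutine() performs the same create, intercept, resume sequence as the library code above, so the only thing that moves the body onto "fake-main" is the interceptor found in the context.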
DispatchedContinuation inherits from DispatchedTask, which inherits from SchedulerTask, which is essentially Task, and Task implements the Runnable interface:
#Tasks.kt
internal abstract class Task(
    @JvmField var submissionTime: Long,
    @JvmField var taskContext: TaskContext
) : Runnable {
    //...
}
So from here on, we focus on how it implements the run() function of the Runnable interface.
Looking back, once the DispatchedContinuation has been constructed, resumeCancellableWith() is called on it. The listing below shows resumeWith() from the same class, which makes the same dispatch decision:
#DispatchedContinuation.kt
override fun resumeWith(result: Result<T>) {
    val context = continuation.context
    val state = result.toState()
    // Dispatching is needed
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_ATOMIC
        // Ask the dispatcher to dispatch this task
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(state, MODE_ATOMIC) {
            withCoroutineContext(this.context, countOrElement) {
                continuation.resumeWith(result)
            }
        }
    }
}
In the demo, dispatcher here is Dispatchers.Main.
To sum up what the launch() function does:
[figure: summary of the launch() call flow]
How Dispatchers.Main is implemented
Next, let's see how Dispatchers.Main dispatches tasks. First, its implementation:
#MainDispatchers.kt
internal object MainDispatcherLoader {
    // true by default
    private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
    // Build the main-thread dispatcher
    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                // Load the dispatcher factories ①
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                ...
            }
            // Create the dispatcher through the factory class ②
            factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            ...
        }
    }
}
First, look at ①:
#FastServiceLoader.kt
internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
    val clz = MainDispatcherFactory::class.java
    //...
    return try {
        // Construct the factory class by reflection: AndroidDispatcherFactory
        val result = ArrayList<MainDispatcherFactory>(2)
        FastServiceLoader.createInstanceOf(clz,
            "kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply { result.add(this) }
        FastServiceLoader.createInstanceOf(clz,
            "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
        result
    } catch (e: Throwable) {
        //...
    }
}
On Android, the factory class returned by this function is AndroidDispatcherFactory.
Now look at ②. With the factory class in hand, it is time to use it to create the concrete dispatcher:
#HandlerDispatcher.kt
internal class AndroidDispatcherFactory : MainDispatcherFactory {
    // Overrides createDispatcher and returns a HandlerContext
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
    //...
}
// Definition of HandlerContext
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
}
What finally gets created is a HandlerContext.
HandlerContext inherits from the class HandlerDispatcher:
#HandlerDispatcher.kt
sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay {
    // Overrides the dispatch function
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // Post to the main thread for execution; handler is the main thread's Handler
        handler.post(block)
    }
}
Clearly, DispatchedContinuation dispatches through dispatcher.dispatch(). The dispatcher is Dispatchers.Main, and its concrete implementation is HandlerContext.
So dispatch() ends up in the dispatch() override shown above, where block is posted to the main thread for execution.
What is block?
block is actually the DispatchedContinuation object. As the analysis above shows, it indirectly implements the Runnable interface.
Look at its implementation:
#DispatchedTask.kt
override fun run() {
    val taskContext = this.taskContext
    var fatalException: Throwable? = null
    try {
        // delegate is the DispatchedContinuation itself
        val delegate = delegate as DispatchedContinuation<T>
        // delegate.continuation is our coroutine body, MyAnnoy
        val continuation = delegate.continuation
        withContinuationContext(continuation, delegate.countOrElement) {
            val context = continuation.context
            //...
            val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
            if (job != null && !job.isActive) {
                //...
            } else {
                if (exception != null) {
                    continuation.resumeWithException(exception)
                } else {
                    // Execute the coroutine body
                    continuation.resume(getSuccessfulResult(state))
                }
            }
        }
    } catch (e: Throwable) {
        //...
    } finally {
        //...
    }
}
The continuation variable is our coroutine body, MyAnnoy.
We have already met MyAnnoy.resume(xx); as a refresher, it ends up in resumeWith():
#ContinuationImpl.kt
override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        with(current) {
            // completion is the StandaloneCoroutine created at the very beginning
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    // Execute the code in the coroutine body
                    val outcome = invokeSuspend(param)
                    if (outcome === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) return
                    kotlin.Result.success(outcome)
                } catch (exception: Throwable) {
                    kotlin.Result.failure(exception)
                }
            //...
        }
    }
}
invokeSuspend(param) runs the code in the coroutine body, that is, the content inside the curly braces of launch. The body is therefore executed by the main thread.
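To see why resumeWith() checks for COROUTINE_SUSPENDED, it helps to write such a body by hand. The sketch below is a simplified, hypothetical stand-in for a compiler-generated class with a single suspension point; TwoStepBody and runTwoStepBody() are illustrative names, and the real generated code differs in detail.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume

// Hypothetical hand-written body: step 0 runs up to the suspension point,
// step 1 runs from the suspension point to the end.
class TwoStepBody(private val completion: Continuation<String>) : Continuation<Any?> {
    private var label = 0
    val log = mutableListOf<String>()
    override val context: CoroutineContext get() = completion.context

    // Mirrors invokeSuspend(): run until the next suspension point or until the end.
    private fun invokeSuspend(value: Any?): Any? = when (label) {
        0 -> { log += "before suspension"; label = 1; COROUTINE_SUSPENDED }
        1 -> { log += "resumed with $value"; label = 2; "finished" }
        else -> error("body already completed")
    }

    // Mirrors the core of BaseContinuationImpl.resumeWith().
    override fun resumeWith(result: Result<Any?>) {
        val outcome = try {
            invokeSuspend(result.getOrThrow())
        } catch (e: Throwable) {
            completion.resumeWith(Result.failure(e))
            return
        }
        if (outcome === COROUTINE_SUSPENDED) return // suspended: wait for the next resume
        completion.resume(outcome as String)        // finished: hand the result to completion
    }
}

fun runTwoStepBody(): List<String> {
    var finalResult = ""
    val body = TwoStepBody(object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) { finalResult = result.getOrThrow() }
    })
    body.resume(Unit)   // first call: runs step 0, then suspends
    body.resume("data") // second call: runs step 1, then completes
    return body.log + finalResult
}

fun main() {
    println(runTwoStepBody()) // [before suspension, resumed with data, finished]
}
```

Each call to resumeWith() runs one slice of the body on whichever thread made the call, which is why the thread that invokes resume decides where the body executes.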
Looking back, launch(Dispatchers.Main) performs the following steps:
- The dispatcher, HandlerContext, is stored in the CoroutineContext (the coroutine context).
- A DispatchedContinuation is constructed; it holds dispatcher = HandlerContext and continuation = MyAnnoy.
- The DispatchedContinuation asks its dispatcher (HandlerContext) to dispatch.
- HandlerContext posts the Runnable (the DispatchedContinuation) to the main thread.
After these steps, the job of launch(Dispatchers.Main) is done. When the Runnable actually runs is no longer its concern.
When the Runnable is executed on the main thread, it takes continuation (MyAnnoy) out of the DispatchedContinuation and calls continuation.resume(), which in turn calls MyAnnoy.invokeSuspend() and finally executes the content of the launch{} coroutine body.
And so the coroutine happily runs on the main thread.
As usual, here is the code combined with a function call graph:
[figure: function call graph]
3. Which thread a coroutine resumes on
Using the main thread as the example, we now understand how a coroutine is made to run on a specified thread.
Now imagine another scenario:
Inside a coroutine, execution switches to a worker thread. After the work on the worker thread finishes, does execution return to the main thread?
Let's modify the demo above:
fun launch2() {
    GlobalScope.launch(Dispatchers.Main) {
        println("I execute on the main thread")
        withContext(Dispatchers.IO) {
            println("I execute on a worker thread") // ②
        }
        println("Which thread do I execute on?") // ③
    }
}
First, take a guess: what is the answer to ③? The main thread or a worker thread?
The withContext(xx) function was analyzed in depth in an earlier article ("To be honest, suspending Kotlin coroutines is not that mysterious (principles)"). It is a suspend function whose main role is:
Switching the thread on which the coroutine executes.
MyAnnoy1 corresponds to coroutine body 1, the parent coroutine body (the launch block).
MyAnnoy2 corresponds to coroutine body 2, the child coroutine body (the withContext block).
After ② finishes, execution switches back to the parent coroutine. Let's look at how that switch happens.
The execution of every coroutine body goes through the following function:
#ContinuationImpl.kt (class BaseContinuationImpl)
override fun resumeWith(result: Result<Any?>) {
    //...
    while (true) {
        //..
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    // Execute the coroutine body
                    val outcome = invokeSuspend(param)
                    if (outcome === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) return
                    kotlin.Result.success(outcome)
                } catch (exception: Throwable) {
                    kotlin.Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                //...
            } else {
                // If the coroutine body above did not suspend, resume completion
                completion.resumeWith(outcome)
                return
            }
        }
    }
}
Take the coroutine body of withContext(xx) as the example here. What is its completion?
As mentioned above, when launch() starts a coroutine, the completion of its coroutine body is StandaloneCoroutine; in other words, MyAnnoy1.completion = StandaloneCoroutine.
From the source code of withContext(xx), its completion is DispatchedCoroutine. DispatchedCoroutine inherits from ScopeCoroutine, and ScopeCoroutine has a member variable uCont: Continuation.
When the DispatchedCoroutine is constructed, the coroutine body passed in is assigned to uCont. That is, DispatchedCoroutine.uCont = MyAnnoy1 and MyAnnoy2.completion = DispatchedCoroutine.
At this point, the child coroutine body and the parent coroutine body are linked through DispatchedCoroutine.
So completion.resumeWith(outcome) is DispatchedCoroutine.resumeWith(outcome). Let's look directly at the implementation of the latter:
#AbstractCoroutine.kt
public final override fun resumeWith(result: Result<T>) {
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}
#Builders.common.kt
// In class DispatchedCoroutine
override fun afterResume(state: Any?) {
    // uCont is the parent coroutine body
    uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
It is all clear now. uCont.intercepted() looks up its interceptor. Because uCont is MyAnnoy1, its interceptor is HandlerContext, so execution is once again posted back to the main thread.
Therefore, the answer to ③ in the demo above is:
It executes on the main thread.
To summarize, there are just two steps:
- The parent coroutine runs on the main thread. When it reaches a suspend function, execution switches to a worker thread (the child coroutine).
- After the child coroutine finishes, it finds the parent coroutine body and lets it continue to be dispatched according to its original rules.
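The same "resume through the parent's own interceptor" rule can be observed on a plain JVM. The sketch below is an analogy rather than the withContext() implementation: MainLikeInterceptor, loadOn(), and threadsSeen() are hypothetical names, two named single-thread executors stand in for the main thread and a worker thread, and the standard suspendCoroutine() plays the role of the suspension point.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical main-thread-like interceptor: every resumption is posted to the executor.
class MainLikeInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Suspends the caller, does the "work" on the worker, and resumes from the worker thread.
suspend fun loadOn(worker: Executor): String = suspendCoroutine { cont ->
    worker.execute { cont.resume(Thread.currentThread().name) }
}

fun threadsSeen(): List<String> {
    val fakeMain = Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main") }
    val worker = Executors.newSingleThreadExecutor { r -> Thread(r, "worker") }
    val done = CountDownLatch(1)
    val seen = Collections.synchronizedList(mutableListOf<String>())
    val body: suspend () -> Unit = {
        seen += "before: " + Thread.currentThread().name
        seen += "inside: " + loadOn(worker)
        seen += "after: " + Thread.currentThread().name
    }
    body.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = MainLikeInterceptor(fakeMain)
        override fun resumeWith(result: Result<Unit>) = done.countDown()
    })
    done.await()
    fakeMain.shutdown()
    worker.shutdown()
    return seen.toList()
}

fun main() {
    println(threadsSeen()) // [before: fake-main, inside: worker, after: fake-main]
}
```

Although resume() is called on the worker thread, the continuation handed out by suspendCoroutine() has already been intercepted, so the rest of the body is posted back to "fake-main".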
As usual, code plus a diagram tells the whole story:
[figure: call flow when the parent coroutine resumes]
At this point, the principle of switching to the main thread has been fully analyzed.
Curious readers may ask: your example switches from a worker thread to the main thread; what about switching from one worker thread to another?
Switching to the main thread relies on Handler, while switching between worker threads relies on a thread pool. There is a lot to cover there, so it will be analyzed separately.
Since we have touched on this point, here is one more question:
fun launch3() {
    GlobalScope.launch(Dispatchers.IO) {
        withContext(Dispatchers.Default) {
            println("Which thread do I run on?")
            delay(2000)
            println("After delay, which thread do I run on?")
        }
        println("Which thread do I run on?")
    }
}
Do you know the answers?
The next article will focus on how the thread pool used by coroutines is scheduled; it will give you the answers to the questions above.
This article is based on Kotlin 1.5.3. For the complete demo in this article, please click on