Rust async: smol source code analysis - executor
2022-06-27 07:53:00 [51CTO]
Author: Laizhichao
smol is a lean and efficient asynchronous runtime that contains implementations of an Executor, a Reactor, and a Timer. This article analyzes the Executor part. With the groundwork laid by async_task (analyzed in detail in previous articles), the executor implementation is very clear and concise; the whole thing can be read through in a few hours. smol implements three kinds of executor:
- thread-local: runs !Send tasks, created via Task::local;
- work-stealing: a multi-threaded executor with work stealing that runs tasks created via Task::spawn;
- blocking executor: a thread pool that runs blocking tasks, created via Task::blocking.
smol::run drives the given future to completion and at the same time acts as a worker thread: it runs tasks spawned onto the thread-local and work-stealing executors and drives the IO events and timers in the reactor. smol also provides smol::block_on for driving a single future. The rest of this article analyzes each executor's implementation in detail.
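As a quick orientation before diving into the internals, here is a minimal usage sketch of the three spawn entry points and smol::run described above. It assumes the smol 0.1 API; the concrete futures are made up for illustration.

```rust
use std::rc::Rc;

fn main() {
    // Task::blocking goes to the blocking executor, which runs on its own thread pool.
    let blocking = smol::Task::blocking(async { std::fs::read_to_string("/etc/hostname") });

    // smol::run drives the given future and also acts as a worker thread.
    smol::run(async {
        // Task::spawn goes to the work-stealing executor; the future must be Send.
        let stolen = smol::Task::spawn(async { 1 + 1 });

        // Task::local goes to the thread-local executor; the future may be !Send.
        let rc = Rc::new(40);
        let local = smol::Task::local(async move { *rc + 2 });

        println!("{:?} {} {}", blocking.await, stolen.await, local.await);
    });
}
```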
smol Overall structure

Thread Local Executor
This executor's distinguishing feature is that a spawned task is bound to the thread that called spawn: from creation through execution to destruction, the task never leaves that thread, so it can run !Send futures.
Structure definition
To reduce cross-thread synchronization overhead, ThreadLocalExecutor uses two queues, one concurrent and one not: when another thread wakes a task, the task is pushed into the concurrent queue; when the local thread spawns a new task or wakes a task, the task goes into the non-concurrent queue. The structure is defined as follows:
```rust
pub(crate) struct ThreadLocalExecutor {
    // Non-concurrent main task queue.
    queue: RefCell<VecDeque<Runnable>>,
    // When another thread wakes a task, the task is pushed here; supports concurrent calls.
    injector: Arc<SegQueue<Runnable>>,
    // Used to notify the executor thread, so it can be woken immediately even if it is
    // blocked on epoll.
    event: IoEvent,
}
```
ThreadLocalExecutor::new
Initializes the fields.
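The constructor is not quoted in the article. A minimal sketch of what it amounts to, assuming the fields shown above; treat the exact expressions as assumptions rather than verbatim source:

```rust
pub fn new() -> ThreadLocalExecutor {
    ThreadLocalExecutor {
        // Empty main queue.
        queue: RefCell::new(VecDeque::new()),
        // Empty concurrent queue, shared via Arc so wakers can hold a weak reference to it.
        injector: Arc::new(SegQueue::new()),
        // Event used to wake the executor thread.
        event: IoEvent::new().expect("cannot create an `IoEvent`"),
    }
}
```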
ThreadLocalExecutor::enter
Nesting executors (that is, creating an executor inside another executor) easily leads to problems such as lost notifications and deadlocks. The usual way to detect this is a thread-local variable: set it before entering the executor, and nesting can then be detected. enter takes a closure; before invoking the closure it stores the executor in the thread-local, then runs the closure, and restores the thread-local when the call ends.
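A sketch of that pattern, mirroring the Worker::enter code quoted later in this article; the exact body in smol may differ slightly:

```rust
pub fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
    // EXECUTOR is a scoped thread-local holding the currently running ThreadLocalExecutor.
    if EXECUTOR.is_set() {
        // Already inside an executor on this thread: nesting is not allowed.
        panic!("cannot run an executor inside another executor");
    }
    // Make this executor visible while f runs, then restore the previous state.
    EXECUTOR.set(self, f)
}
```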
ThreadLocalExecutor::spawn
spawn creates and schedules a task. The key point is to remember the id of the thread that called spawn: when the task is woken, the waking thread's id is compared with the spawning thread's id. If they are equal, the task is pushed into the main queue; otherwise it is pushed into the concurrent queue.
```rust
pub fn spawn<T: 'static>(future: impl Future<Output = T> + 'static) -> Task<T> {
    if !EXECUTOR.is_set() {
        panic!("cannot spawn a thread-local task if not inside an executor");
    }

    EXECUTOR.with(|ex| {
        // A weak reference is used here because the injector queue holds tasks, and a task's
        // waker (which includes the `schedule` closure below) holds a reference to the
        // injector; downgrading avoids the reference cycle.
        let injector = Arc::downgrade(&ex.injector);
        let event = ex.event.clone();
        let id = thread_id();

        let schedule = move |runnable| {
            if thread_id() == id {
                // Woken on the spawning thread: push straight into the main queue.
                EXECUTOR.with(|ex| ex.queue.borrow_mut().push_back(runnable));
            } else if let Some(injector) = injector.upgrade() {
                // Woken on another thread: push into the concurrent queue.
                injector.push(runnable);
            }
            // Notify the executor thread.
            event.notify();
        };

        // Create the task, schedule it, and return the handle.
        let (runnable, handle) = async_task::spawn_local(future, schedule, ());
        runnable.schedule();
        Task(Some(handle))
    })
}
```
ThreadLocalExecutor::execute
execute pulls tasks out of the queues in batches and runs them, but it is not written as an infinite loop: it returns so that the caller can interleave this executor with the other executors, the reactor, the timer, and so on. To treat the two queues fairly and avoid starvation, execution is split into 4 rounds of 50 tasks each.
```rust
pub fn execute(&self) -> bool {
    for _ in 0..4 {
        for _ in 0..50 {
            match self.search() {
                None => {
                    return false;
                }
                Some(r) => {
                    // `throttle` prevents one task from running forever and starving the
                    // others; the mechanism is covered in a later article.
                    throttle::setup(|| r.run());
                }
            }
        }
        // Move tasks from the concurrent queue into the main queue.
        self.fetch();
    }
    // Tell the caller there may be more tasks to run.
    true
}

// Find the next runnable task.
fn search(&self) -> Option<Runnable> {
    // Check the main queue first.
    if let Some(r) = self.queue.borrow_mut().pop_front() {
        return Some(r);
    }
    // Move tasks from the concurrent queue into the main queue.
    self.fetch();
    // Then check the main queue again.
    self.queue.borrow_mut().pop_front()
}

fn fetch(&self) {
    let mut queue = self.queue.borrow_mut();
    while let Ok(r) = self.injector.pop() {
        queue.push_back(r);
    }
}
```
Blocking Executor
The main feature of this executor is that it can run blocking tasks. It is a global singleton and runs independently, without being driven by smol::run. The implementation adaptively spawns threads behind the scenes: when idle, no threads are created and no resources are consumed; once tasks arrive, threads are started to match the workload (up to a limit of 500). When a running thread finds no more tasks, it waits a short while to see whether a task is dispatched, and exits if none arrives.
Structure definition
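The struct itself is not quoted in the article. Based on how main_loop and grow_pool below access state.idle_count, state.thread_count, state.queue, and self.cvar, a hypothetical reconstruction looks roughly like this (field names taken from that code, everything else is an assumption):

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

struct BlockingExecutor {
    // Shared mutable state, protected by a mutex.
    state: Mutex<State>,
    // Condition variable used to wake sleeping worker threads.
    cvar: Condvar,
}

struct State {
    // Number of idle worker threads.
    idle_count: usize,
    // Total number of worker threads (bounded at 500).
    thread_count: usize,
    // Pending blocking tasks (Runnable is the async_task handle used throughout this article).
    queue: VecDeque<Runnable>,
}
```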
BlockingExecutor::get
Returns the global singleton.
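A plausible sketch of get, following the same Lazy-based singleton pattern as WorkStealingExecutor::get shown later in this article; the exact initial values are assumptions:

```rust
fn get() -> &'static BlockingExecutor {
    // Lazily initialized global singleton.
    static EXECUTOR: Lazy<BlockingExecutor> = Lazy::new(|| BlockingExecutor {
        state: Mutex::new(State {
            idle_count: 0,
            thread_count: 0,
            queue: VecDeque::new(),
        }),
        cvar: Condvar::new(),
    });
    &EXECUTOR
}
```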
BlockingExecutor::main_loop
This function is run by each worker thread: it keeps pulling tasks from the queue and running them; when there are no tasks it waits 500 ms and then exits.
```rust
fn main_loop(&'static self) {
    let mut state = self.state.lock().unwrap();
    loop {
        // A newly created thread counts as idle; it is about to work, so decrement the
        // idle count.
        state.idle_count -= 1;

        // Run tasks in the queue.
        while let Some(runnable) = state.queue.pop_front() {
            // Check whether new threads should be spawned to run tasks. Note that `state`
            // is moved into `grow_pool`, so the lock is released after the call and other
            // threads can pick tasks from the queue.
            self.grow_pool(state);

            // If `async_task::Task::run` panics, the handle side will see the panic and
            // re-throw it, so nothing is silently swallowed here.
            let _ = panic::catch_unwind(|| runnable.run());

            // Re-acquire the lock and continue the loop.
            state = self.state.lock().unwrap();
        }

        // Idle again.
        state.idle_count += 1;

        // Sleep for up to 500 ms; a new task will wake us up earlier.
        let timeout = Duration::from_millis(500);
        let (s, res) = self.cvar.wait_timeout(state, timeout).unwrap();
        state = s;

        // Timed out and still no tasks: exit the thread.
        if res.timed_out() && state.queue.is_empty() {
            state.idle_count -= 1;
            state.thread_count -= 1;
            break;
        }
    }
}
```
BlockingExecutor::grow_pool
If the number of pending tasks exceeds 5 times the number of idle threads, and the total number of threads is below 500, spawn more worker threads.
```rust
fn grow_pool(&'static self, mut state: MutexGuard<'static, State>) {
    while state.queue.len() > state.idle_count * 5 && state.thread_count < 500 {
        state.idle_count += 1;
        state.thread_count += 1;

        // There are a lot of tasks: wake all idle threads to start working.
        self.cvar.notify_all();

        thread::spawn(move || {
            context::enter(|| self.main_loop())
        });
    }
}
```
BlockingExecutor::spawn
Standard procedure: create the task and schedule it.
```rust
pub fn spawn<T: Send + 'static>(
    &'static self,
    future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
    let (runnable, handle) = async_task::spawn(future, move |r| self.schedule(r), ());
    runnable.schedule();
    Task(Some(handle))
}

// Push the task into the queue and wake a sleeping thread to run it.
fn schedule(&'static self, runnable: Runnable) {
    let mut state = self.state.lock().unwrap();
    state.queue.push_back(runnable);
    self.cvar.notify_one();
    self.grow_pool(state);
}
```
Work Stealing Executor
This executor is characterized by one or more worker threads, each with its own work queue; when a thread's queue is empty it can steal tasks from other threads. A task may therefore be handled by several threads across its lifetime of creation, execution, and destruction, so the corresponding futures must be Send. Compared with a single globally shared queue, work stealing gives each thread its own queue, avoiding a lot of inter-thread synchronization overhead while still balancing the load across threads. In this implementation the executor does not spawn a fixed set of worker threads based on the CPU count; instead, each worker thread must actively call smol::run to join as a worker.
Data structures
WorkStealingExecutor is a global structure used by worker threads to join the executor and to steal other workers' tasks, and by non-worker threads to spawn tasks. Worker has a slot that caches a single task: some tasks become ready again right after being polled, so they are first put into the slot and fetched from there on the next iteration. This reduces task-switching overhead, improves cache utilization, and avoids the synchronization cost of pushing into the queue. tokio and async_std do something similar.
```rust
pub(crate) struct WorkStealingExecutor {
    // Queue that non-worker threads push tasks into.
    injector: deque::Injector<Runnable>,
    // Registered stealer handles, used to steal tasks from other workers.
    stealers: ShardedLock<Slab<deque::Stealer<Runnable>>>,
    // Used to notify worker threads, so they can be woken immediately even if blocked on epoll.
    event: IoEvent,
}

pub(crate) struct Worker<'a> {
    // The ID assigned when this thread registered as a worker.
    key: usize,
    // Slot that caches a single task.
    slot: Cell<Option<Runnable>>,
    // The local work queue; other workers can steal tasks from it.
    queue: deque::Worker<Runnable>,
    executor: &'a WorkStealingExecutor,
}
```
Executor global variable and Worker thread-local variable
The global executor is used by worker threads to join and by non-worker threads to spawn new tasks. The worker thread-local variable works the same way as in ThreadLocalExecutor.
```rust
pub fn get() -> &'static WorkStealingExecutor {
    static EXECUTOR: Lazy<WorkStealingExecutor> = Lazy::new(|| WorkStealingExecutor {
        injector: deque::Injector::new(),
        stealers: ShardedLock::new(Slab::new()),
        event: IoEvent::new().expect("cannot create an `IoEvent`"),
    });
    &EXECUTOR
}

scoped_thread_local! {
    static WORKER: for<'a> &'a Worker<'a>
}

impl Worker<'_> {
    // Enter the worker's context.
    pub fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
        // If already set, we are nesting executors.
        if WORKER.is_set() {
            panic!("cannot run an executor inside another executor");
        }
        WORKER.set(self, f)
    }
}
```
WorkStealingExecutor::spawn
spawn creates and schedules a task. When the task is woken, if the waker runs on a worker thread the task is pushed straight into that worker's queue; otherwise it is pushed through the global executor.
```rust
pub fn spawn<T: Send + 'static>(
    &'static self,
    future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
    let schedule = move |runnable| {
        if WORKER.is_set() {
            // We are on a worker thread: push the task straight into its queue.
            WORKER.with(|w| w.push(runnable));
        } else {
            // Not a worker thread: push the task through the global executor
            // and notify a worker thread.
            self.injector.push(runnable);
            self.event.notify();
        }
    };

    // Create the task, schedule it, and return the handle.
    let (runnable, handle) = async_task::spawn(future, schedule, ());
    runnable.schedule();
    Task(Some(handle))
}
```
WorkStealingExecutor::worker
Registers a worker thread by writing its stealer handle into the global executor, so that other threads can steal its tasks.
```rust
pub fn worker(&self) -> Worker<'_> {
    let mut stealers = self.stealers.write().unwrap();
    let vacant = stealers.vacant_entry();

    // Create a worker and put its stealer handle into the executor.
    let worker = Worker {
        key: vacant.key(),
        slot: Cell::new(None),
        queue: deque::Worker::new_fifo(),
        executor: self,
    };
    vacant.insert(worker.queue.stealer());

    worker
}
```
WorkStealingExecutor::execute
Similar to the thread-local version: pull tasks out of the queue in batches and run them, split into 4 rounds of 50 tasks each.
```rust
pub fn execute(&self) -> bool {
    for _ in 0..4 {
        for _ in 0..50 {
            match self.search() {
                None => {
                    return false;
                }
                Some(r) => {
                    // Notify other threads that there may be tasks to steal.
                    self.executor.event.notify();

                    if throttle::setup(|| r.run()) {
                        // The slot optimization breaks FIFO fairness. If the task became
                        // ready again while running, flush it from the slot into the queue
                        // by hand, so that `yield_now` works as expected and the task does
                        // not loop forever.
                        self.flush_slot();
                    }
                }
            }
        }
        // Also for fairness, occasionally flush the slot into the queue.
        self.flush_slot();

        // Steal some tasks from the injector queue into the local queue to avoid starvation.
        if let Some(r) = self.steal_global() {
            self.push(r);
        }
    }
    // Tell the caller there may be more tasks to run.
    true
}
```
smol::run
The spawn methods of the three executors are exposed as Task::local, Task::blocking, and Task::spawn. Spawned tasks are driven by running smol::run (except on the blocking executor, which has its own thread pool and needs no driving). It runs, in turn, the given future, ThreadLocalExecutor::execute, WorkStealingExecutor::execute, and the reactor (analyzed in detail in a later article).
```rust
pub fn run<T>(future: impl Future<Output = T>) -> T {
    // Create the executors and grab the reactor.
    let local = ThreadLocalExecutor::new();
    let ws_executor = WorkStealingExecutor::get();
    let worker = ws_executor.worker();
    let reactor = Reactor::get();

    // Create a Waker from the local executor's IoEvent.
    let ev = local.event().clone();
    let waker = async_task::waker_fn(move || ev.notify());
    let cx = &mut Context::from_waker(&waker);
    futures::pin_mut!(future);

    // Set up the context before execution, e.g. a tokio runtime.
    let enter = context::enter;
    let enter = |f| local.enter(|| enter(f));
    let enter = |f| worker.enter(|| enter(f));

    enter(|| {
        let io_events = [local.event(), ws_executor.event()];
        let mut yields = 0;

        // Run tasks in turn, then poll the reactor, then block, and so on:
        loop {
            if let Poll::Ready(val) = throttle::setup(|| future.as_mut().poll(cx)) {
                return val;
            }

            let more_local = local.execute();
            let more_worker = worker.execute();

            // Poll the reactor.
            if let Some(reactor_lock) = reactor.try_lock() {
                yields = 0;
                react(reactor_lock, &io_events, more_local || more_worker);
                continue;
            }

            if more_local || more_worker {
                yields = 0;
                continue;
            }

            // No tasks at the moment: yield the time slice and try again.
            yields += 1;
            if yields <= 2 {
                thread::yield_now();
                continue;
            }

            // Still no tasks: block the thread on the reactor, waiting for a notification.
            yields = 0;

            let lock = reactor.lock();
            let notified = local.event().notified();
            futures::pin_mut!(lock);
            futures::pin_mut!(notified);

            if let Either::Left((reactor_lock, _)) = block_on(future::select(lock, notified)) {
                react(reactor_lock, &io_events, false);
            }
        }
    })
}
```
Usage notes
async_std's runtime uses deferred instantiation and starts on demand, so users get an out-of-the-box experience with no special setup. smol instead adopts a strategy similar to tokio: the runtime must be started manually, otherwise spawning panics at runtime. Given how much trouble this strategy has caused new tokio users, smol is likely to hit similar problems, and several issues have already been filed. Also, smol::run does not start background threads to bring up the whole work-stealing runtime; the calling thread instead joins the work-stealing runtime as a worker. So you need code along the following lines to start a multi-threaded runtime:
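A minimal sketch of that startup pattern, assuming the smol 0.1 API: a few extra threads each call smol::run on a never-completing future so that they join the work-stealing executor as workers.

```rust
use std::thread;
use futures::future;

fn main() {
    // Start a few extra worker threads; each one joins the work-stealing executor
    // by calling smol::run on a future that never completes.
    for _ in 0..4 {
        thread::spawn(|| smol::run(future::pending::<()>()));
    }

    // The main thread also acts as a worker while driving the top-level future.
    smol::run(async {
        let task = smol::Task::spawn(async { 1 + 1 });
        assert_eq!(task.await, 2);
    });
}
```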
Summary
The whole smol codebase is very concise, only about a thousand lines. This article only analyzed its executor. The Reactor part is also very nicely done, reusing a number of existing crates such as linux-timerfd, linux-inotify, uds, and so on; I will write it up when I have time.