[Rust translation] Implementing a Rust async executor from scratch
2022-06-28 06:39:00 【51CTO】
This post is a translation of a blog post by Stjepang, contributed by bai.
- Interface
- Pass the output to JoinHandle
- Analysis of the task
- Executor threads
- Task execution
- A little magic
- The improved JoinHandle
- Handling panics
- The efficiency of the executor
- Correctness
- Executors for everyone
- Reprint note
Now that we've built the block_on() function, it's time to turn it into a real executor. We want our executor to run not just one future at a time, but many futures concurrently!
The inspiration for this post comes from juliex, a minimal executor whose author is also one of the pioneers of async/await in Rust. Today we're going to write a more modern and cleaner version of juliex from scratch.
The goal for our executor is to use only simple and completely safe code, yet deliver performance that rivals the best existing executors.
The crates we'll depend on are crossbeam, async-task, once_cell, futures, and num_cpus.
Interface
The executor has just one function, spawn(), which takes a future and runs it. Its signature is fn spawn<F, R>(future: F) -> JoinHandle<R>, where F is a future with output R.
The returned JoinHandle is a type that implements Future and yields the task's output once the task has completed.
Note the similarity between this spawn() function and std::thread::spawn(): they are almost equivalent, except that one spawns an asynchronous task and the other spawns a thread.
A simple example spawns a task and then awaits its output.
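For comparison, here is the thread-based counterpart written with the standard library alone. It is only an analogy for the shape of the interface: with the executor, spawn() would take a future instead of a closure, and the handle would be awaited instead of joined. The helper name spawn_and_join is made up for this sketch.

```rust
use std::thread;

// Hypothetical helper: spawn a thread, then wait for its output.
fn spawn_and_join() -> i32 {
    // thread::spawn() returns a JoinHandle<i32> right away.
    let handle = thread::spawn(|| 1 + 2);
    // join() blocks until the thread finishes and yields its output.
    handle.join().unwrap()
}
```

Calling spawn_and_join() returns 3, the output of the spawned closure.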
Pass the output to JoinHandle
Since JoinHandle is a type that implements Future, let's for now simply define it as an alias for a boxed future pinned on the heap, Pin<Box<dyn Future<Output = R> + Send>>.
This works for now, but don't worry: later we'll rewrite JoinHandle as a new struct and implement Future for it by hand.
The output of the spawned future has to be sent to the JoinHandle somehow. One way is to create a oneshot channel and send the output through it when the future completes. The JoinHandle is then a future that awaits a message from the channel:
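use futures::channel::oneshot;
use std::future::Future;

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (s, r) = oneshot::channel();
    // Wrap the original future so that its output is sent through the channel.
    let future = async move {
        let _ = s.send(future.await);
    };

    // Placeholder: the wrapped future still has to become a task and be queued.
    todo!();

    // The JoinHandle simply awaits the message from the channel.
    Box::pin(async { r.await.unwrap() })
}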
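To make the mechanism concrete without external crates, the following sketch hand-rolls a tiny oneshot slot from the standard library. It is a stand-in for futures::channel::oneshot, not its actual implementation. The names Slot, Receiver, oneshot, send, ThreadWaker and compute_via_oneshot are assumptions of this sketch, and block_on here is a simplified take on the function from the previous post.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Shared slot: holds the value once sent, plus the waker of the waiting future.
struct Slot<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

// Receiving half: a future that completes once a value has been sent.
struct Receiver<T>(Arc<Mutex<Slot<T>>>);

impl<T> Future for Receiver<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut slot = self.0.lock().unwrap();
        match slot.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Nothing sent yet: remember whom to wake when the value arrives.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Creates the shared slot and the receiving future.
fn oneshot<T>() -> (Arc<Mutex<Slot<T>>>, Receiver<T>) {
    let slot = Arc::new(Mutex::new(Slot { value: None, waker: None }));
    (slot.clone(), Receiver(slot))
}

// Sending side: store the value, then wake the receiver if it is waiting.
fn send<T>(slot: &Arc<Mutex<Slot<T>>>, value: T) {
    let mut guard = slot.lock().unwrap();
    guard.value = Some(value);
    if let Some(waker) = guard.waker.take() {
        waker.wake();
    }
}

// Waker that unparks the thread blocked in block_on().
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Simplified block_on: poll the future, parking the thread while it is pending.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical helper: produce a value on another thread, await it here.
fn compute_via_oneshot() -> i32 {
    let (sender, receiver) = oneshot();
    thread::spawn(move || send(&sender, 1 + 2));
    block_on(receiver)
}
```

The receiver plays the role of the JoinHandle: it stays pending until the producing side stores its output and wakes it.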
The next step is to allocate a wrapper for the future on the heap and push it into some kind of global task queue, so that the executor can process it. We call such an allocated future a task.
Analysis of the task
A task consists of a future and its state. We need to track the state to know whether the task is scheduled to run, whether it is currently running, whether it has completed, and so on.
Here is the definition of our Task type:
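Going by the description in this section and by how the struct is constructed in the spawn() listing further down, the definition can be sketched as follows. The field names state and future match that later listing; the helper new_task is an assumption added only so the sketch can be exercised.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

struct Task {
    // Scheduling state; can be updated from any thread.
    state: AtomicUsize,
    // The type-erased future, pinned on the heap and guarded by a mutex so
    // that whichever thread runs the task can get mutable access to it.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

// Hypothetical helper: wrap a unit-returning future into a shared Task.
fn new_task(future: impl Future<Output = ()> + Send + 'static) -> Arc<Task> {
    Arc::new(Task {
        state: AtomicUsize::new(0),
        future: Mutex::new(Box::pin(future)),
    })
}

// Hypothetical helper: reads the current state word of a task.
fn state_of(task: &Task) -> usize {
    task.state.load(Ordering::SeqCst)
}
```

A freshly created task starts with state 0 and an unlocked future.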
We haven't decided yet what exactly the state is, but it will be an AtomicUsize that can be updated from any thread. We'll come back to it later.
The output type of the future is (). That's because the spawn() function wraps the original future into one that sends the output into the oneshot channel and then simply returns ().
The future is pinned on the heap, because only pinned futures can be polled. But why is it also wrapped in a Mutex?
Every Waker associated with the task holds a reference to the Task, so that it can wake the task by pushing it into the global task queue. Therein lies the problem: Task instances are shared between threads, but polling the future requires mutable access to it. The solution: we wrap the future in a mutex to obtain mutable access.
If all this sounds confusing, don't worry: it will make a lot more sense once we've finished the whole executor!
Let's complete the spawn function by allocating a Task that holds the future and its state:
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (s, r) = oneshot::channel();
    let future = async move {
        let _ = s.send(future.await);
    };

    // Allocate the task: the initial state plus the boxed, pinned future.
    let task = Arc::new(Task {
        state: AtomicUsize::new(0),
        future: Mutex::new(Box::pin(future)),
    });
    // Push the task into the global queue of runnable tasks.
    QUEUE.send(task).unwrap();

    Box::pin(async { r.await.unwrap() })
}
Once the task is allocated, we push it into QUEUE, a global queue of runnable tasks. The spawn() function is now complete, so let's define QUEUE next.
Executor threads
Since we're building an executor, there has to be a background thread pool that takes runnable tasks from the queue and runs them, i.e. polls their futures.
Let's define the global task queue and spawn the executor threads the first time it is initialized:
use crossbeam::channel;
use once_cell::sync::Lazy;
use std::sync::Arc;
use std::thread;

static QUEUE: Lazy<channel::Sender<Arc<Task>>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Arc<Task>>();

    // Start one executor thread per CPU core.
    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        // Each thread receives tasks from the channel and runs them.
        thread::spawn(move || receiver.iter().for_each(|task| task.run()));
    }

    sender
});
Pretty simple: an executor thread is literally one line of code! The task queue is an unbounded channel, and the executor threads receive tasks from this channel and run each one.
The number of executor threads equals the number of cores on the system, as reported by the num_cpus crate.
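The same pool shape can be tried out with the standard library alone. This is a stand-in under stated assumptions, not the post's code: std::sync::mpsc receivers cannot be cloned, so the workers share one receiver behind a Mutex; std::thread::available_parallelism() replaces num_cpus::get(); and plain closures stand in for tasks. The names Job and run_jobs are made up.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Stand-in for a runnable task.
type Job = Box<dyn FnOnce() + Send>;

// Hypothetical helper: run `n` counting jobs on a pool, return how many ran.
fn run_jobs(n: usize) -> usize {
    let (sender, receiver) = mpsc::channel::<Job>();
    let receiver = Arc::new(Mutex::new(receiver));
    let workers = thread::available_parallelism().map_or(1, |p| p.get());

    // One worker thread per available core.
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // The lock is held only while taking a job, not while running it.
                let job = receiver.lock().unwrap().recv();
                match job {
                    Ok(job) => job(),
                    Err(_) => break, // channel closed: no more jobs
                }
            })
        })
        .collect();

    let counter = Arc::new(AtomicUsize::new(0));
    for _ in 0..n {
        let counter = Arc::clone(&counter);
        sender
            .send(Box::new(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            }))
            .unwrap();
    }

    // Closing the channel lets the workers drain the queue and exit.
    drop(sender);
    for handle in handles {
        handle.join().unwrap();
    }
    counter.load(Ordering::SeqCst)
}
```

Unlike the executor in the post, this pool shuts down once the sender is dropped, which is what makes it possible to join the workers and count the completed jobs.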
Now that we have the task queue and the thread pool, the last missing piece is the run() method.
Task execution
Running a task simply means polling its future. We already know how to poll a future from the previous blog post, where we implemented block_on().
The run() method creates a waker, builds a Context from it, and polls the task's future.
Note that we need to lock the future to get mutable access and poll it. By design, no other thread holds the lock at the same time, so try_lock() must always succeed.
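A minimal sketch of such a run() method, using only the standard library. The waker is a placeholder that ignores wake-ups (NoopWaker is a made-up name), since the real wake logic is only worked out in the rest of this section. Unlike the method described in the post, this run() returns whether the future completed, purely so that the result can be inspected; poll_once is likewise a helper invented for the demonstration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

struct Task {
    // Not used yet: the wake logic that reads it comes later.
    #[allow(dead_code)]
    state: AtomicUsize,
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

// Placeholder waker that ignores wake-ups.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

impl Task {
    // Polls the future once; returns true if it completed.
    fn run(self: Arc<Task>) -> bool {
        let waker = Waker::from(Arc::new(NoopWaker));
        let cx = &mut Context::from_waker(&waker);
        // No other thread is supposed to hold the lock at this point,
        // so try_lock() is expected to succeed.
        let mut future = self.future.try_lock().unwrap();
        future.as_mut().poll(cx).is_ready()
    }
}

// Hypothetical helper: wrap a future into a Task and run it once.
fn poll_once(future: impl Future<Output = ()> + Send + 'static) -> bool {
    let task = Arc::new(Task {
        state: AtomicUsize::new(0),
        future: Mutex::new(Box::pin(future)),
    });
    task.run()
}
```

An async block without await points completes on the first poll, so poll_once(async {}) returns true, while a future that never completes, such as std::future::pending(), yields false.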
But how do we create the waker? We're going to use async_task::waker_fn(), but what should the wake function do?
We can't simply push an Arc<Task> into QUEUE. Here are the potential race conditions we have to consider:
- What if a task is woken after it has already completed? A Waker can outlive the future it is associated with, and we don't want completed tasks in the queue.
- What if a task is woken twice in a row before it gets to run? We don't want the same task to appear in the queue twice.
- What if a task is woken while it is running? If we pushed it into the queue at that point, another executor thread might try to run it, and the task would then run on two threads at the same time.
If we think about it carefully, we arrive at two simple rules that solve all of these problems elegantly:
- The wake function schedules the task if it has not been woken already and is not currently running.
- If the task was woken while it was running, the current executor thread (the one that is running the future) reschedules it.
Let's sketch these rules out:
impl Task {
    fn run(self: Arc<Task>) {
        let waker = async_task::waker_fn(|| {
            todo!("schedule if the task is not woken already and is not running");
        });

        let cx = &mut Context::from_waker(&waker);
        self.future.try_lock().unwrap().as_mut().poll(cx);

        todo!("schedule if the task was woken while running");
    }
}
Remember the state field of type AtomicUsize that we defined in Task? Now is the time to store some useful data in it. There are two pieces of information about a task that help us implement waking:
- Whether the task has already been woken
- Whether the task is currently running
Both are true/false values, so we can represent them with two bits in the state field, WOKEN and RUNNING.
The wake function sets the WOKEN bit. If both bits were previously 0 (i.e. the task was neither woken nor running), we schedule the task by pushing its reference into the queue.
Just before polling the future, we clear the WOKEN bit and set the RUNNING bit. After polling, following the second rule above, we clear the RUNNING bit and reschedule the task if it was woken while it was running.
Interestingly, if the task has completed (i.e. its future is no longer pending), we leave it in the RUNNING state forever. That way a completed task can never enter the queue again after being woken.
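The transitions described above can be sketched as three small functions over the state word. The bit values 0b01 and 0b10, the function names, and the SeqCst ordering are choices made for this sketch rather than something the text specifies. The boolean results tell the caller whether the task should be pushed into the queue.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const WOKEN: usize = 0b01;
const RUNNING: usize = 0b10;

// Called by the waker. Returns true if the caller should schedule the task,
// which is only the case when it was neither woken nor running before.
fn wake(state: &AtomicUsize) -> bool {
    state.fetch_or(WOKEN, Ordering::SeqCst) == 0
}

// Called right before polling: clears WOKEN and sets RUNNING.
fn begin_run(state: &AtomicUsize) {
    state.store(RUNNING, Ordering::SeqCst);
}

// Called right after polling. Returns true if the task was woken while it
// was running and therefore has to be rescheduled by the current thread.
fn end_run(state: &AtomicUsize, pending: bool) -> bool {
    if !pending {
        // Completed: stay in RUNNING forever so that no waker requeues it.
        return false;
    }
    state.fetch_and(!RUNNING, Ordering::SeqCst) == (WOKEN | RUNNING)
}
```

Walking through one scenario: the first wake() on a fresh state returns true and a second one returns false; a wake() that arrives between begin_run() and end_run() returns false, but end_run() then returns true, so exactly one reference to the task ends up in the queue each time.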
We now have a real executor; see v1.rs for the complete implementation.
A little magic
If you found handling the Task struct and its state transitions challenging, I feel the same. But there's good news: you don't have to do it yourself, because the async-task crate already does it!
We just need to replace Arc<Task> with async_task::Task<()>, and replace the oneshot channel with async_task::JoinHandle<R, ()>.
This is how spawning gets simpler:
type Task = async_task::Task<()>;

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
    task.schedule();
    Box::pin(async { handle.await.unwrap() })
}
async_task::spawn() takes three arguments:
- The future to run.
- A schedule function that pushes the task into the queue. It may be invoked by a waker, or by run() after polling the future.
- A tag holding arbitrary information, which is stored inside the task. We won't deal with tags in this post, so we simply store (), i.e. ignore it.
The constructor then returns two values:
- async_task::Task<()>, where () is the tag that was passed in.
- async_task::JoinHandle<R, ()>, where () is again the tag. This JoinHandle is a future that outputs an Option<R> when it completes. An output of None means the task has panicked or was cancelled.
If you're wondering about the schedule() method, it simply invokes the schedule function on the task to push it into the queue. We could also have pushed the task into QUEUE ourselves; the end result is the same.
Putting it all together, we end up with this very simple executor:
static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Task>();
    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || receiver.iter().for_each(|task| task.run()));
    }
    sender
});

type Task = async_task::Task<()>;
type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
    task.schedule();
    Box::pin(async { handle.await.unwrap() })
}
The complete code can be found in v2.rs.
Using async_task::spawn() here isn't just simpler. It is also more efficient and more robust than the Task we wrote ourselves. As one example of robustness, async_task::Task drops the future as soon as it completes, instead of waiting until all references to the task are gone.
Besides that, async-task offers some useful features such as tags and cancellation, but we won't discuss those today. It's also worth mentioning that async-task is a #[no_std] crate, so it can be used even without the standard library.
The improved JoinHandle
If you look closely at our latest executor, there is one more instance of inefficiency: the redundant Box::pin() allocation for the JoinHandle.
It would be nicer if we could define JoinHandle<R> as a plain type alias for async_task::JoinHandle<R, ()>, but we can't, because async_task::JoinHandle<R, ()> outputs Option<R>, whereas our JoinHandle outputs R.
Instead, we wrap async_task::JoinHandle in a new struct that panics if the task panicked or was cancelled:
(Translator's note: this sentence is hard to follow on its own; reading the async_task source code helps.)
struct JoinHandle<R>(async_task::JoinHandle<R, ()>);

impl<R> Future for JoinHandle<R> {
    type Output = R;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
        }
    }
}
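The same wrapping pattern can be tried out without async-task. In this sketch a generic struct Unwrapped (a made-up name) wraps any Unpin future that outputs Option<R>, standing in for async_task::JoinHandle<R, ()>. NoopWaker and poll_once are likewise helpers invented for the demonstration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Wraps a future that outputs Option<R> and outputs R instead,
// panicking if the inner future produced None.
struct Unwrapped<F>(F);

impl<R, F: Future<Output = Option<R>> + Unpin> Future for Unwrapped<F> {
    type Output = R;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> {
        // The inner future is Unpin, so it can be re-pinned through &mut.
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
        }
    }
}

// Placeholder waker that ignores wake-ups.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: poll an Unpin future exactly once.
fn poll_once<F: Future + Unpin>(mut future: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(&mut future).poll(&mut cx)
}
```

Polling Unwrapped(std::future::ready(Some(5))) once gives Poll::Ready(5), so the Option layer disappears from the output type without any extra allocation.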
The complete executor implementation can be found in v3.rs.
Handling panics
So far we haven't really thought about what happens when a task panics, i.e. when a call to poll() panics. Right now the run() method simply propagates the panic into the executor thread. We should consider whether that is what we really want.
It would be wise to handle those panics somehow. For example, we could simply ignore panics and carry on. That way they only print a message on the screen but don't bring down the whole process; panicking threads behave in exactly the same way.
To ignore panics, we wrap run() in catch_unwind():
use std::panic::catch_unwind;

static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Task>();
    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || {
            receiver.iter().for_each(|task| {
                let _ = catch_unwind(|| task.run());
            })
        });
    }
    sender
});
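The effect is easy to check in isolation with the standard library. In this sketch plain boxed closures stand in for tasks, and the names Job, run_all and demo_jobs are made up. AssertUnwindSafe is used because boxed closures are not automatically UnwindSafe.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

// Stand-in for a runnable task.
type Job = Box<dyn FnOnce() + Send>;

// Runs every job in order, ignoring panics; returns how many jobs panicked.
fn run_all(jobs: Vec<Job>) -> usize {
    let mut panicked = 0;
    for job in jobs {
        // A panic inside job() is caught here instead of unwinding further,
        // so the loop carries on with the next job.
        if catch_unwind(AssertUnwindSafe(job)).is_err() {
            panicked += 1;
        }
    }
    panicked
}

// Hypothetical helper: three jobs, the middle one panics.
fn demo_jobs() -> Vec<Job> {
    vec![
        Box::new(|| println!("first job")),
        Box::new(|| {
            panic!("second job panicked");
        }),
        Box::new(|| println!("third job still runs")),
    ]
}
```

Running run_all(demo_jobs()) prints the panic message to stderr, still runs the third job, and returns 1.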
The complete executor code that ignores panics can be found in v4.rs.
There are many sensible strategies for handling panics. Here are some that are provided as examples in the async-task repository:
- Ignore panics: panics are simply ignored, and JoinHandle<R> panics as well when it is awaited.
- Propagate panics: panics are re-thrown into the task that awaits the JoinHandle<R>.
- Output panics: the JoinHandle<R> outputs std::thread::Result<R>.
It's easy to implement any panic strategy you want. It's entirely up to you to decide which is best!
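The strategies differ only in what is done with the result of catch_unwind(). The sketch below shows the "output" and "propagate" variants for a plain closure standing in for a task. The function names are invented, and this is not the code from the async-task examples.

```rust
use std::panic::{catch_unwind, resume_unwind, UnwindSafe};
use std::thread;

// "Output panics": hand the caught panic to the caller as a value.
fn run_and_output<R>(task: impl FnOnce() -> R + UnwindSafe) -> thread::Result<R> {
    catch_unwind(task)
}

// "Propagate panics": catch the panic where the task runs, then rethrow it
// where the result is consumed. In an executor those would be two different
// places (the executor thread and the awaiting task); here they coincide.
fn run_and_propagate<R>(task: impl FnOnce() -> R + UnwindSafe) -> R {
    match catch_unwind(task) {
        Ok(output) => output,
        Err(payload) => resume_unwind(payload),
    }
}
```

With the "output" variant, run_and_output(|| 1 + 2) is Ok(3), while a panicking closure produces an Err carrying the panic payload; the "ignore" strategy corresponds to discarding that Err.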
The efficiency of the executor
The current code is short, simple, and safe, but how fast is it?
A task allocated by async_task::spawn() is just a single allocation that stores the task state, the future, and the future's output once it completes. There are no other hidden costs, so spawning is virtually as fast as it can be!
Other executors, such as async-std and tokio, allocate tasks in exactly the same way. The foundation of our executor is essentially an optimal implementation, and we are now just one step away from competing with the popular executors: work stealing.
Right now all executor threads share the same task queue. If all threads hit the queue at the same time, performance suffers due to contention. The idea behind work stealing is to give each executor thread its own queue. An executor thread then only needs to steal tasks from other queues when its own queue is empty, which means contention happens only rarely rather than all the time.
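A deliberately naive sketch of the idea, using mutex-protected VecDeque queues from the standard library. Production executors use lock-free deques instead (for example the crossbeam-deque crate); the Queues type and its methods are made up for illustration.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// One queue per executor thread.
struct Queues<T> {
    local: Vec<Mutex<VecDeque<T>>>,
}

impl<T> Queues<T> {
    fn new(workers: usize) -> Self {
        Queues {
            local: (0..workers).map(|_| Mutex::new(VecDeque::new())).collect(),
        }
    }

    // Worker `me` schedules a task onto its own queue.
    fn push(&self, me: usize, task: T) {
        self.local[me].lock().unwrap().push_back(task);
    }

    // Worker `me` looks for its next task.
    fn find_task(&self, me: usize) -> Option<T> {
        // Fast path: take from our own queue, which others rarely touch.
        if let Some(task) = self.local[me].lock().unwrap().pop_front() {
            return Some(task);
        }
        // Our queue is empty: try to steal from the back of the other queues.
        for (other, queue) in self.local.iter().enumerate() {
            if other != me {
                if let Some(task) = queue.lock().unwrap().pop_back() {
                    return Some(task);
                }
            }
        }
        None
    }
}
```

With two workers and tasks "a" and "b" pushed by worker 0, an idle worker 1 steals "b" from the back, and worker 0 still finds "a" at the front of its own queue.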
I'll talk more about work stealing in another blog post.
Correctness
Everyone tells us concurrency is hard. The Go language ships a built-in race detector, tokio built its own concurrency checker, loom, to find concurrency bugs, and crossbeam in some cases even resorted to formal proofs. Sounds scary!
But we can sit back, relax, and not worry about any of it. Race detectors, sanitizers, and even miri (translator's note: Miri is an experimental interpreter for Rust's MIR; it can run Rust binaries and test suites and detect certain kinds of undefined behavior) or loom cannot catch bugs in our executor. The reason is that we only wrote safe code, and safe code is memory safe, i.e. it cannot contain data races. In that respect, Rust's type system has already proven our executor correct.
The burden of ensuring memory safety rests entirely on the crates we depend on, most notably async-task and crossbeam. Rest assured, both take correctness very seriously. async-task has an extensive test suite covering all edge cases, crossbeam's channel has many tests and even passes the Go and std::sync::mpsc test suites, the work-stealing deque is based on a formally proven implementation, and the epoch-based garbage collector also has a proof of correctness.
Executors for everyone
Ever since Alex and Aaron first designed zero-cost futures in 2016, the plan was for each spawned future to incur only a single memory allocation:
Each "task" needs one allocation, which usually works out to one allocation per connection.
However, the single allocation was a white lie: it took us years to actually get there. For example, spawning in tokio 0.1 allocates the future, then allocates the task state, and finally allocates a oneshot channel. That's three allocations per spawn!
Then, in August 2019, async-task was born. For the first time ever, we managed to squash the allocations for the future, the task state, and the channel into a single allocation. The reason it took so long is that manual allocation and management of state transitions inside tasks is very complicated. But now that it's done, you don't have to worry about any of it anymore.
Soon after, in October 2019, tokio adopted an implementation similar to that of async-task.
These days, anyone can build an efficient executor with single-allocation tasks. What used to be rocket science no longer is.
https://stjepang.github.io/2020/01/31/build-your-own-executor.html