当前位置:网站首页>Mise en œuvre de l'actionneur asynchrone d'exécution à partir de zéro
Mise en œuvre de l'actionneur asynchrone d'exécution à partir de zéro
2022-06-28 06:37:00 【51CTO】
Cet article est Stjepang Traduction du blog Big Boss,De bai Contributions de.
- Interface
- Transmettre la sortie àJoinHandle
- Analyse des tâches
- Fils d'actionneur
- Exécution des tâches
- Un peu de magie.
- AméliorationJoinHandle
- Gérer la panique(panic)
- Efficacité de l'actionneur
- Exactitude
- Actionneur pour tous
- Description de la reproduction
Maintenant que nous avons construitblock_onFonctions,Il est temps de passer à un véritable actionneur.Nous voulons que nos actionneurs résiduels ne fonctionnent pas seulement un à la foisfuture,Au lieu d'exécuter plusieursfuture!
Ce blog s'inspire de juliex,Un actionneur minimal,L'auteur aussi.RustDansasync/awaitUn des pionniers de la fonction.Aujourd'hui, nous allons écrire un article plus moderne à partir de zéro、Plus clairjuliexVersion.
L'objectif de notre actionneur est d'utiliser uniquement un code simple et entièrement sécurisé,Mais les performances peuvent rivaliser avec les meilleurs actionneurs disponibles.
Nous allons utiliser comme dépendancescrateY compris: crossbeam、 async-task、 once_cell、 futures Et num_cpus.
Interface
L'actionneur n'a qu'une seule fonction,Est d'exécuter unfuture:
Retour àJoinHandleC'est une réalisationFutureType de, Les résultats peuvent être obtenus une fois la tâche terminée .
Regarde ça.spawn()Fonctions et std::thread::spawn()Les similitudes entre—— Ils sont presque équivalents , Sauf pour une tâche asynchrone , Un autre fil de génération .
Voici un exemple simple, Générer une tâche et attendre sa sortie :
Transmettre la sortie àJoinHandle
Puisque JoinHandleEst une réalisation Future Type de, Alors, définissons - le brièvement comme un futureAlias de:
Cette méthode est actuellement réalisable ,Mais ne vous inquiétez pas, Plus tard, nous le réécrirons clairement comme une nouvelle structure , Et l'implémenter manuellement Future.
Produit future La sortie doit être envoyée d'une manière ou d'une autre à JoinHandle. Une façon est de créer un oneshot Accès,Et dansfuture Envoyer la sortie sur ce canal une fois terminé .Alors JoinHandle C'est celui qui attend le message du canal future:
use futures::channel::oneshot;
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (s, r) = oneshot::channel();
let future = async move {
let _ = s.send(future.await);
};
todo!()
Box::pin(async { r.await.unwrap() })
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
L'étape suivante consiste à distribuer sur le tas futureEmballage, Et le pousser dans une sorte de file d'attente de tâches globale , À traiter par l'exécuteur testamentaire . Nous appelons cette distribution future Pour une mission .
Analyse des tâches
Mission(task)Y compris:future Et son état . Nous devons suivre l'état , Pour savoir si la tâche est programmée pour fonctionner 、 Est actuellement en cours d'exécution 、 Terminé, etc. .
Voici notreTaskDéfinition du type:
On n'est pas encore sûrs de l'état , Mais ce sera quelque chose qui peut être mis à jour à partir de n'importe quel thread AtomicUsize. On en reparlera plus tard .
Future Le type de sortie pour est ()ーーC'est parce que spawn () La fonction sera originale future Emballé pour envoyer la sortie à oneshot Accès, Ensuite, retournez simplement à ().
future Fixé au tas .C'est parce qu'il n'y a quepinDefuture Pour être élu (poll). Mais pourquoi est - il toujours emballé dans MutexOui.?
Chaque Waker Ils en sauveront un. Task Références, De cette façon, il peut réveiller la tâche en la poussant dans la file d'attente globale des tâches .Voilà le problème.: Les instances de tâche sont partagées entre les Threads , Mais le vote future Nécessite un accès variable .Solutions: On vafuture Encapsulé dans un objet mutex , Pour obtenir un accès variable .
Si tout cela semble déroutant ,Ne t'inquiète pas., Une fois l'actionneur terminé , C'est beaucoup plus facile à comprendre !
Assignons une sauvegarde future Et son état TaskPour finirspawnFonctions:
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (s, r) = oneshot::channel();
let future = async move {
let _ = s.send(future.await);
};
let task = Arc::new(Task {
state: AtomicUsize::new(0),
future: Mutex::new(Box::pin(future)),
});
QUEUE.send(task).unwrap();
Box::pin(async { r.await.unwrap() })
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
Une fois la tâche assignée , Nous l'avons poussé dans QUEUE, .Il s'agit d'une file d'attente globale qui contient des tâches exécutables .Spawn () La fonction est maintenant terminée , Alors, définissons QUEUE...
Fils d'actionneur
Parce que nous construisons un actionneur , Il doit donc y avoir un pool de Threads de fond , Il prend les tâches exécutables de la file d'attente et les exécute , C'est - à - dire que ceux qui les ont sondés future.
Définissons la file d'attente globale des tâches , Et génère un pool de Threads d'exécution lorsqu'il est initialisé pour la première fois :
use crossbeam::channel;
use once_cell::sync::Lazy;
static QUEUE: Lazy<channel::Sender<Arc<Task>>> = Lazy::new(|| {
let (sender, receiver) = channel::unbounded::<Arc<Task>>();
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::spawn(move || receiver.iter().for_each(|task| task.run()));
}
sender
});
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
C'est très simple.—— Le thread de l'actionneur est en fait une ligne de code ! La file d'attente des tâches est un canal illimité , Le thread de l'actionneur reçoit les tâches de ce canal et exécute chaque tâche .
Le nombre de fils d'actionneur est égal au nombre de noyaux sur le système , Cette quantité de base est déterminée par nums_cpusOffre.
Maintenant nous avons la file d'attente des tâches et le pool de Threads , La dernière partie à réaliser est run()Méthodes.
Exécution des tâches
L'exécution d'une tâche signifie simplement le vote future. Nous avons réalisé block_on()De Un billet de blog précédent Je sais comment voter future.
Run()La méthode est la suivante::
Votre attention, s'il vous plaît., Il faut verrouiller future, Pour obtenir un accès variable et le vote .Selon la conception, Aucun autre fil ne tient la serrure en même temps ,Donc,try_lock() Doit toujours réussir .
Mais comment créer un réveil ? Nous l'utiliserons comme la dernière fois async_task::waker_fn(), Mais que doit faire la fonction wake up ?
On ne peut pas juste mettre un Arc<Task>Mets - le.QUEUEMoyenne, Voici les conflits de concurrence potentiels que nous devrions considérer :
- Si une tâche a été accomplie , Et si on se réveillait? ? Waker Le cycle de vie dépassera son lien Future, Et nous ne voulons pas non plus inclure dans la file d'attente les tâches déjà accomplies .
- Si une tâche est exécutée avant , Et si on se réveillait deux fois de suite ? Nous ne voulons pas que la même tâche apparaisse deux fois dans la file d'attente .
- Et si une mission se réveille pendant qu'elle est en cours ? Si vous le mettez dans la file d'attente à ce stade , Un autre thread d'exécution pourrait essayer de l'exécuter , Cela fait qu'une tâche fonctionne sur deux Threads en même temps .
Si nous y réfléchissons, , Nous trouverons deux règles simples , Résoudre tous ces problèmes avec élégance :
- Si vous n'avez pas été réveillé et que vous n'êtes pas en cours d'exécution , La fonction Wake - up programmera cette tâche
- Si une tâche est réveillée pendant qu'elle est en cours d'exécution , Par le thread actuel de l'actionneur ( Ceci est actuellement en cours d'exécution futureLe fil de) Reprogrammez - le .
Esquissons ces règles :
impl Task {
fn run(self: Arc<Task>) {
let waker = async_task::waker_fn(|| {
todo!("schedule if the task is not woken already and is not running");
});
let cx = &mut Context::from_waker(&waker);
self.future.try_lock().unwrap().as_mut().poll(cx);
todo!("schedule if the task was woken while running");
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
Tu te souviens quand on était là? Task Défini dans AtomicUsize Type de champ d'état ? Il est temps de stocker des données utiles . À propos de la Mission , Il y a deux messages qui peuvent nous aider à nous réveiller :
- La tâche a - t - elle été réveillée?
- La tâche fonctionne - t - elle?
Ces deux valeurs sont true / false Valeur,Nous pouvons state Les champs sont représentés par deux bits :
Paramètres de la fonction wakeup “WOKEN”Bits. Si les deux bits étaient 0( C'est - à - dire que la tâche n'est ni réveillée ni exécutée ) , Donc, nous avons programmé la tâche en poussant la référence dans la file d'attente :
Au scrutinfutureAvant,On a annuléWOKENConfiguration des bits,Et a mis en placeRUNNINGBits:
C'est drôle,Si la tâche est accomplie(C'est ça.futureCe n'est pluspending) , On va le garder pour toujours RUNNINGStatut.Voilà.future Il n'est pas possible de revenir dans la file d'attente après avoir été réveillé .
Nous avons maintenant un vrai actionneur ーーInv1.rs Voir la mise en œuvre complète dans .
Un peu de magie.
Si vous découvrez le traitement Task Les structures et leurs transitions d'état sont difficiles ,Je ressens la même chose.Mais il y a aussi de bonnes nouvelles., Aucun de ces travaux n'exige que vous le fassiez vous - même ,Utiliserasyc-taskC'est tout.!
On a juste besoin de l'utiliser.async_task::Task()RemplacerArc<Task>,Utilisation concomitanteasync-task::JoinHandle<()>RemplaceroneshotAccès.
C'est comme ça qu'on simplifie la construction :
type Task = async_task::Task<()>;
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
task.schedule();
Box::pin(async { handle.await.unwrap() })
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
async_task::spawn()Accepter trois paramètres:
- À exécuter future
- Une fonction de programmation qui met une tâche en file d'attente . Cette fonction peut être exécutée par un réveil ,Ou peut - être par
run()Au scrutinfutureMise en œuvre postérieure. - Qui contient des informations arbitraires tag,C'esttag L'information sera conservée dans taskMoyenne. Dans ce blog, nous n'envisageons pas simplement de sauvegarder
(), C'est - à - dire l'ignorer .
Puis le constructeur renvoie deux valeurs :
-
async_task::Task<()>,Parmi eux() C'est ce qui vient d'arriver tag. -
async_task::JoinHandle<R, ()>,Ici.()Ou justetag. C'estJoinHandleC'est unfuture, Quand il sera terminé, il retournera un Option<R>. Quand on revientNone Ça veut dire que la Mission a eu lieu panic Ou annulé .
Si vous voulez savoirschedule()Méthodes, Il suffit d'appeler schedule Fonction pour le pousser dans la file d'attente . Nous pouvons aussi pousser la tâche nous - mêmes dans QUEUE——Le résultat final est le même.
En résumé, Nous avons fini par avoir cet actionneur très simple :
static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
let (sender, receiver) = channel::unbounded::<Task>();
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::spawn(move || receiver.iter().for_each(|task| task.run()));
}
sender
});
type Task = async_task::Task<()>;
type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
task.schedule();
Box::pin(async { handle.await.unwrap() })
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
Le code complet est disponible à l'adresse suivante: v2.rs Trouvé dans.
Utilisez iciasync_task::spawn() Les avantages ne sont pas seulement simples . C'est plus que ce que nous avons écrit nous - mêmes. TaskPlus efficace, Et plus robuste . Un exemple de robustesse ,async_task::Task Supprimer le futur dès qu'il est terminé , Au lieu d'attendre que toutes les références à la tâche soient invalidées avant de les supprimer .
En plus de ça,,async-task Quelques caractéristiques utiles sont également disponibles ,Par exemple,tagsEtcancellation, Mais nous n'en parlons pas aujourd'hui .Il convient également de mentionner,async-task C'est un#[no_std]crate, Peut même être utilisé sans Bibliothèque standard .
AméliorationJoinHandle
Si vous regardez attentivement nos derniers actionneurs , Un autre exemple d'inefficacité ——JoinHandleRedondanceBox::pin()Répartition.
Ce serait mieux si nous pouvions utiliser les alias de type suivants , Mais on ne peut pas ,Parce que async_task::JoinHandle<R> ProduitsOption<R>,EtJoinHandle ProduitsR:
On ne peut queasync_task::JoinHandle Encapsulé dans une nouvelle structure , Si la tâche arrive panicOu annulé,Ça va aussipanic:
Ça n'a aucun sens. ,J'ai besoin de voirasync_task Le code source fonctionne
struct JoinHandle<R>(async_task::JoinHandle<R, ()>);
impl<R> Future for JoinHandle<R> {
type Output = R;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
L'implémentation complète de l'actionneur peut être réalisée à v3.rsTrouvé dans.
Gérer la panique(panic)
Jusqu'à présent, Nous n'avons pas vraiment réfléchi à ce qui se passe quand la Mission panique ,C'est - à - dire appeler poll() Il y a une panique .Maintenantrun () La méthode consiste simplement à répandre la panique dans l'actionneur . On devrait se demander si c'est ce qu'on veut vraiment .
Il est sage de gérer ces paniques d'une manière ou d'une autre .Par exemple, On peut simplement ignorer la panique ,Continuez à courir. De cette façon, ils impriment simplement l'information à l'écran , Mais ça n'écrase pas tout le processus. ーー Les fils paniqués fonctionnent exactement de la même façon .
Pour ignorer la panique ,On varun()Emballécatch_unwind() :
use std::panic::catch_unwind;
static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
let (sender, receiver) = channel::unbounded::<Task>();
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::spawn(move || {
receiver.iter().for_each(|task| {
let _ = catch_unwind(|| task.run());
})
});
}
sender
});
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
Inv4.rs Le code complet de l'actionneur pour ignorer la panique se trouve dans .
Il y a beaucoup de stratégies intelligentes pour faire face à la panique .Voici quelques - uns desasync-task Exemples fournis dans la Bibliothèque :
- Ignorer la panique -- La panique est ignorée ,Quand
JoinHandle<R>Inawait Il y a aussi une panique - Répandre la panique panic A été remis en attente
JoinHandle<R> Dans la tâche qui en a résulté . - Sortie de panique
JoinHandle<R>Produitsstd::thread::Result<R>.
Il est facile de mettre en œuvre n'importe quelle stratégie de gestion de la panique que vous voulez . C'est à vous de décider lequel est le meilleur !
Efficacité de l'actionneur
Le Code actuel est court 、Simple.、Sécurité, Mais à quelle vitesse ?
async_task::spawn () La tâche assignée n'est qu'une tâche assignée , État de la tâche de stockage 、futureEtfuture Sortie après achèvement . Il n'y a pas d'autres coûts cachés ーーspawn La vitesse de !
Autres actionneurs ,Par exemple: async-std Et tokio, Les tâches sont assignées exactement de la même manière . La base de notre actionneur est essentiellement une réalisation optimale , Nous sommes à un pas de la compétition avec les actionneurs populaires : Vol de mission.
Maintenant, Tous les fils d'actionneur partagent la même file d'attente de tâches . Si tous les Threads accèdent à la file d'attente en même temps , En raison de la controverse , Les performances seront affectées . L'idée derrière le vol de tâches est d'assigner une file d'attente différente pour chaque thread d'actionneur . De cette façon, le thread de l'actionneur n'a qu'à voler des tâches d'autres files d'attente lorsque sa propre file d'attente est vide , Cela signifie que les différends ne se produisent que rarement , Au lieu de continuer à se produire .
J'en parlerai plus dans un autre billet de blog .
Exactitude
Tout le monde nous l'a dit. , La concurrence est difficile .Go La langue offre un détecteur de compétition intégré ,tokio S'est créé Inspecteur simultané pour loom Pour trouver des erreurs simultanées ,Etcrossbeam Dans certains cas, la preuve formelle a même été utilisée . Ça a l'air horrible !
Mais on peut s'asseoir. ,Détends - toi.,Ne t'inquiète pas. Détecteur de compétition , Stérilisateur ,Et mêmemiri(Traducteur MiriC'est une expérience Rust MIRInterpréteur.Ça marche.RustBinaires,Testez - le., Certains comportements non définis peuvent être vérifiés )Ouloom, Il n'y a aucun moyen de capturer bug. La raison en est que nous n'avons écrit que le Code de sécurité , Et le Code de sécurité est sécurisé en mémoire , C'est - à - dire qu'il ne peut pas contenir de données concurrentes .Rust Le type de système a prouvé que nos actionneurs sont corrects .
Le fardeau de la sécurité de la mémoire dépend entièrement de cratePrise en charge,Plus précisément,aysnc-taskEtcrossbeam.Ne vous inquiétez pas., Les deux attachent une grande importance à l'exactitude .async-task Il y a une large couverture de toutes les situations de bord Kit d'essai,crossbeam L'accès à Beaucoup de tests ,Même parGoEtstd::sync::mpscKit d'essai, La file d'attente bidirectionnelle de vol de travail est basée sur un passage Preuve formelleRéalisation,Et basé surepoch Le collecteur d'ordures de Preuve d'exactitude.
Actionneur pour tous
DepuisAlexEtAaronIn2016Première annéedesigned zero-cost futuresDepuis, Leurs plans sont spawnDefuture Allocation de mémoire une seule fois :
Chaque“Mission” Une affectation est nécessaire , Le résultat est généralement qu'une allocation est nécessaire pour chaque connexion .
Et pourtant, Une seule affectation est un mensonge bien intentionné ーー Il nous a fallu des années pour les avoir .Par exemple,tokio 0.1Dans la versionspawn Un future, Puis assigner l'état de la tâche , Enfin, un oneshotAccès.C'est tout.spawn Trois points de distribution !
Et puis,In2019Année8Mois,async-taskNaissanceC'est.Pour la première fois,Nous avons réussi àfuture、 L'état des tâches et l'attribution des canaux sont compressés en une seule affectation . La raison pour laquelle il a fallu si longtemps , C'est à cause de l'affectation manuelle et de la gestion des transitions d'état à l'intérieur de la tâche C'est très compliqué.. Mais c'est fait. , Tu n'as plus à t'inquiéter de quoi que ce soit .
Peu de temps après,In2019Année10Mois,tokioOn a également adopté des méthodes similaires à celles utilisées pour async-task Comment réaliser.
Maintenant, N'importe qui peut assigner une tâche en une seule fois Construire Un actionneur efficace . La science des fusées n'existe plus .
https://stjepang.github.io/2020/01/31/build-your-own-executor.html
边栏推荐
猜你喜欢

整型提升和大小端字节序

Lombok @equalsandhashcode annotation how to make objects The equals () method compares only some attributes
![[online tutorial] official iptables tutorial -- learning notes 1](/img/b9/8f94caa46eb46dab581c713494f36d.png)
[online tutorial] official iptables tutorial -- learning notes 1

FPGA - 7 Series FPGA selectio -09- io of advanced logic resources_ FIFO

FPGA - 7 Series FPGA selectio -07- iserdese2 of advanced logic resources

【Paper Reading-3D Detection】Fully Convolutional One-Stage 3D Object Detection on LiDAR Range Images

Overview, implementation and use of CRC32

freeswitch使用mod_shout模块播放mp3

职场IT老鸟的几点小习惯

eyebeam高级设置
随机推荐
ImportError: cannot import name 'ensure_ dir_ Possible solutions for exists'
链表(三)——反转链表
Failed to start component [StandardEngine[Catalina]. StandardHost[localhost]]
Create a gson object that formats the time zone. JSON parsing time formatting zoneddatetime
Drop down list processing in Web Automation
Cmake tips
JDBC learning (I) -- implementing simple CRUD operations
Install and manage multiple versions of PHP under mac
Tryout title code
Linked list (II) - Design linked list
Speech enhancement - spectrum mapping
Rust FFI 编程 - libc crate
pytorch RNN 学习笔记
extern “C“概述
Select trigger event from easyUI drop-down box
Boost the rising point | yolov5 combined with alpha IOU
【Paper Reading-3D Detection】Fully Convolutional One-Stage 3D Object Detection on LiDAR Range Images
AttributeError: 'callable_iterator' object has no attribute 'next'
VM332 WAService.js:2 Error: _vm.changeTabs is not a function报错
Lombok @equalsandhashcode annotation how to make objects The equals () method compares only some attributes