当前位置:网站首页>Block queue - delayedworkqueue Source Analysis
Block queue - delayedworkqueue Source Analysis
2022-06-11 00:38:00 【Pour avoir de la valeur】

Préface
Threadpool Runtime,Les tâches sont constamment récupérées dans la file d'attente des tâches,Et ensuite exécuter la tâche.Si nous voulons retarder ou programmer l'exécution des tâches,Il est important que les files d'attente des tâches soient triées en fonction du temps de latence des tâches,Plus le délai est court, plus la file d'attente est longue,Obtenir d'abord l'exécution.
Les files d'attente sont des structures de données FIFO,Les données qui entrent dans la file d'attente en premier,Obtenir d'abord.Mais il y a une file d'attente spéciale appeléeFile d'attente prioritaire,Il priorise les données insérées,Veiller à ce que les données de priorité plus élevée soient obtenues en premier,Indépendamment de l'ordre d'insertion des données.
Une façon courante de rendre les files d'attente prioritaires efficaces est d'utiliser heap.À propos de l'implémentation heap《Mise en œuvre et caractéristiques du réacteur et du réacteur binaire》
ScheduledThreadPoolExecutorPool de Threads
ScheduledThreadPoolExecutorDeThreadPoolExecutor, Donc sa structure interne de données et ThreadPoolExecutorC'est presque pareil, En outre, la fonction d'exécution des tâches selon le calendrier a été ajoutée , Divisé en tâches retardées et en tâches périodiques .
ScheduledThreadPoolExecutor Le constructeur ne peut passer que 3ParamètrescorePoolSize、ThreadFactory、RejectedExecutionHandler,Par défautmaximumPoolSizePourInteger.MAX_VALUE.
Les files d'attente de travail sont des files d'attente de blocage de latence hautement personnalisées DelayedWorkQueue,En fait, le principe de mise en œuvre etDelayQueueC'est presque pareil, La structure des données de base est la file d'attente prioritaire du plus petit tas binaire , Augmente automatiquement la capacité lorsque la file d'attente est pleine ,Alors...offer L'opération ne sera jamais bloquée ,maximumPoolSize Ça ne marchera pas , Ainsi, il y aura toujours au plus corePoolSize Threads Worker Running .
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
DelayedWorkQueue Retarder le blocage de la file d'attente
DelayedWorkQueue Est également une file d'attente retardée conçue pour les tâches programmées ,Sa mise en œuvre etDelayQueueC'est pareil, Mais mettre en file d'attente les priorités et DelayQueue Le processus d'implémentation pour migrer vers son propre corps de méthode , Ainsi, les appels de méthode spécifiques aux tâches programmées peuvent être ajoutés de façon flexible dans le processus .
Comment ça marche
ScheduledThreadPoolExecutor La raison pour laquelle vous devez mettre en place votre propre file d'attente de travail bloquée ,Parce que ScheduleThreadPoolExecutor La file d'attente de travail requise est quelque peu spéciale .
DelayedWorkQueue Est une structure de données basée sur le tas ,Similaire àDelayQueueEtPriorityQueue. Lors de l'exécution d'une tâche programmée , Le temps d'exécution de chaque tâche est différent ,Alors...DelayedWorkQueue L'ordre croissant du temps d'exécution , Plus le temps d'exécution est proche de l'heure actuelle, plus la file d'attente est avancée (Attention!: L'ordre ici n'est pas absolu. , Le tri dans le tas ne garantit que la prochaine exécution du noeud enfant est plus longue que la prochaine exécution du noeud parent , Et les noeuds foliaires ne sont pas nécessairement séquentiels ).
La structure du tas est la suivante :
Visible,DelayedWorkQueue Est une file d'attente basée sur la structure minimale du tas . La structure du tas peut être représentée par un tableau , Peut être converti en un tableau comme suit: :
Dans cette structure,, Les caractéristiques suivantes peuvent être trouvées : Hypothèses“Le premier élément” L'index dans le tableau est 0 Et si, La relation de position entre le noeud parent et le noeud enfant est la suivante :
- Index As L'index de l'enfant de gauche est ;
- Index As L'index de l'enfant droit est ;
- Index As L'index du noeud parent de est ;
Pourquoi utiliserDelayedWorkQueueEt alors??
- La tâche la plus récente à exécuter doit être récupérée lors de l'exécution de la tâche programmée. , Par conséquent, chaque fois qu'une tâche sort de la file d'attente, elle doit être la plus rapide de la file d'attente actuelle , Il est donc naturel d'utiliser une file d'attente prioritaire .
- DelayedWorkQueueEst une file d'attente prioritaire, Il garantit que chaque tâche sortant de la file d'attente est la tâche la plus exécutée de la file d'attente actuelle , Parce qu'il s'agit d'une file d'attente basée sur la structure du tas , La plus grande complexité temporelle de la structure du tas lors des opérations d'insertion et de suppression est O(logN).
Analyse des sources
Définition
DelayedWorkQueue La relation d'héritage de classe pour est la suivante :
Les méthodes qu'il contient sont définies comme suit: :
Propriétés des membres
// Au début,Taille de la longueur du tableau.
private static final int INITIAL_CAPACITY = 16;
// Utilisez un tableau pour stocker les éléments de la file d'attente , Créé à partir de la capacité initiale RunnableScheduledFutureTableau des types
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// Utiliserlock Pour assurer la sécurité simultanée de plusieurs fils .
private final ReentrantLock lock = new ReentrantLock();
// Taille des éléments stockés dans la file d'attente
private int size = 0;
// Où se trouve la tâche d'en - tête de file d'attente leaderThread
private Thread leader = null;
// Quand le délai de tâche de l'en - tête de file d'attente arrive , Ou un nouveau thread pourrait avoir besoin d'être leader, Pour réveiller le fil d'attente
private final Condition available = lock.newCondition();
DelayedWorkQueue Est d'utiliser un tableau pour stocker des éléments dans une file d'attente , La structure des données de base est la file d'attente prioritaire du plus petit tas binaire , Augmente automatiquement la capacité lorsque la file d'attente est pleine .
Attention icileader,C'est...Leader-FollowerVariantes du modèle, Pour réduire les temps d'attente inutiles .Qu'est - ce que ça veut dire?
Pour un modèle de réseau multithreadé : Tous les Threads auront une des trois identités :leaderEtfollower,Et un état de travail:proccesser. Son principe de base est ,Il n'y aura jamais plus qu'unleader.Et toutfollowerJ'attends d'êtreleader.Lorsque le pool de Threads démarre, unLeaderResponsable de l'attente du réseauIOÉvénements,Quand un événement se produit,LeaderLe thread informe d'abord unFollower Thread l'a promu au nouveau Leader,Et je suis allé travailler seul,Pour gérer cet événement réseau,Ajouter après le traitementFollowerFiletage en attente,Attendez que la prochaine foisLeader.Cette méthode peut améliorerCPUSimilarité du cache, Et éliminer l'allocation dynamique de la mémoire et l'échange de données entre les fils .
Constructeur
DelayedWorkQueue - Oui. ScheduledThreadPoolExecutor Classe statique de , Par défaut, il n'y a qu'une seule méthode de construction sans paramètres .
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// ...
}
Méthode d'entrée
DelayedWorkQueue Offre put/add/offer(Avec le temps) Trois méthodes d'insertion d'éléments . Nous avons constaté que, comparativement à la file d'attente générale bloquée , Les trois méthodes d'ajout sont des appels offerMéthodes. C'est parce qu'il n'a pas la condition que la file d'attente soit pleine , C'est - à - dire qu'il peut constamment DelayedWorkQueueAjouter un élément, Lorsque le nombre d'éléments dépasse la longueur du tableau ,Le tableau est agrandi.
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
offerAjouter un élément
ScheduledThreadPoolExecutor Lorsque vous soumettez une tâche, vous appelez DelayedWorkQueue.add,Etadd、put Attendez que certaines des méthodes d'ajout d'éléments fournies à l'extérieur soient appelées offer.
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// Utiliserlock Assurer la sécurité des opérations simultanées
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// Si vous voulez dépasser la longueur du tableau , On va agrandir le tableau
if (i >= queue.length)
// Expansion du tableau
grow();
// Ajouter un élément à la file d'attente
size = i + 1;
// Si c'est le premier élément, Alors pas besoin de trier ,Il suffit d'attribuer une valeur directement
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// AppelezsiftUpMéthodes, Commande les éléments insérés .
siftUp(i, e);
}
// Indique que l'élément nouvellement inséré est un en - tête de file d'attente , En - tête de file d'attente remplacé ,
// Alors Réveillez le thread qui attend la tâche d'acquisition .
if (queue[0] == e) {
leader = null;
// Réveillez le thread qui attend d'obtenir la tâche
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
Le processus de base est le suivant::
- Il sert d'entrée aux producteurs ,Obtenez d'abord la serrure.
- Déterminer si la file d'attente est pleine (
size >= queue.length),Agrandir quand il est pleingrow(). - La file d'attente n'est pas pleine,size+1.
- Déterminer si l'élément ajouté est le premier , Oui, pas besoin de tas. .
- L'élément ajouté n'est pas le premier , Il faut empiler
siftUp. - Si l'élément supérieur est exactement l'élément ajouté à ce moment - là ,Alors Réveillez - voustakeConsommation de fil.
- Verrouillage Final.
offerL'organigramme de base est le suivant::

Expansion de la capacitégrow()
Je vois.,Quand la file d'attente est pleine,Ne bloque pas l'attente, Au lieu de cela, l'expansion continue .Nouvelles capacitésnewCapacity Dans l'ancienne capacité oldCapacity Sur la base de 50%(oldCapacity >> 1équivalent àoldCapacity /2).EnfinArrays.copyOf,D'abord selonnewCapacityCréer un nouveau tableau vide, Puis copiez les données de l'ancien tableau dans le nouveau tableau .
private void grow() {
int oldCapacity = queue.length;
// Augmenter la moitié du tableau original à chaque expansion .
// grow 50%
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
// UtiliserArrays.copyOf Pour copier un nouveau tableau
queue = Arrays.copyOf(queue, newCapacity);
}
Empilage vers le Haut siftUp
Les éléments nouvellement ajoutés sont d'abord ajoutés au fond du tas , Puis, étape par étape, comparez avec le noeud père ci - dessus , Si le noeud est plus petit que le noeud parent, changez de position avec le noeud parent. , La comparaison des boucles ne se termine pas tant qu'elle n'est pas supérieure au noeud parent .Par le cycle,Pour trouver des élémentskey Il doit être inséré à l'emplacement du noeud dans l'arbre binaire du tas , Et interagir avec la position du noeud parent .
Empilage vers le Haut siftUp Le processus détaillé de 《Mise en œuvre et caractéristiques du réacteur et du réacteur binaire》
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// Quandk==0Heure, C'est le noeud racine d'un tas d'arbres binaires ,Sortir de la boucle
while (k > 0) {
// Coordonnées de localisation du noeud parent , équivalent à(k - 1) / 2
int parent = (k - 1) >>> 1;
// Obtenir l'élément de localisation du noeud parent
RunnableScheduledFuture<?> e = queue[parent];
// Sikey L'élément est plus grand que l'élément de localisation du noeud parent ,Conditions remplies,Alors sortez de la boucle
// Parce qu'il est trié de petit en grand .
if (key.compareTo(e) >= 0)
break;
// Sinon, l'élément parent est stocké dans kEmplacement
queue[k] = e;
// C'est seulement si l'élément est ScheduledFutureTask L'Instance objet est utile , Pour annuler rapidement une tâche .
setIndex(e, k);
// Réattribuerk,Recherche d'élémentskey Le noeud qui devrait être inséré dans l'arbre binaire du tas
k = parent;
}
// Fin du cycle,kC'est l'élémentkey Emplacement du noeud à insérer
queue[k] = key;
setIndex(key, k);
}
Le Code est bien compris., Est la base du cycle key Le noeud et son parent pour juger ,Sikey Le temps d'exécution du noeud est inférieur à celui du noeud parent , Les deux noeuds sont échangés , Faire en sorte que les noeuds les plus avancés dans le temps d'exécution soient disposés devant la file d'attente .
Supposons que les noeuds nouvellement mis en file d'attente soient retardés (AppelezgetDelay()Méthode d'obtention)- Oui. 5 ,Le processus de mise en œuvre est le suivant::
- Ajoutez d'abord un nouveau noeud à la fin du tableau , L'index du nouveau noeud kPour7

- Calculer l'index du nouveau noeud parent :parent = (k - 1) >>> 1,parent = 3,Alorsqueue[3] La valeur de l'intervalle de temps pour est 8,Parce que 5 < 8 ,Sera mis en œuvrequeue[7] = queue[3]

- À ce moment - là,kSet to3,Continuez le cycle,RecalculerparentPour1,queue[1]L'intervalle de temps pour3,Parce que 5 > 3 ,Pour le moment, quittez le cycle,FinalkPour3

Visible, Chaque fois qu'un noeud est ajouté , C'est juste basé sur le noeud parent. , Sans affecter le noeud frère .
Méthode de sortie
DelayedWorkQueue Voici quelques façons de sortir de l'équipe
- take(), Attendre d'obtenir l'élément d'en - tête de file d'attente
- poll() , Obtenez l'élément d'en - tête de file d'attente maintenant
- poll(long timeout, TimeUnit unit) , Délai d'attente pour obtenir l'élément d'en - tête de file d'attente
takeÉléments de consommation
Worker Une fois le thread Worker démarré, les éléments de la file d'attente de travail sont recyclés ,Parce queScheduledThreadPoolExecutorDekeepAliveTime=0, Donc la tâche de consommation n'appelle que DelayedWorkQueue.take.takeLe processus de base est le suivant::
- Obtenez d'abord la serrure interruptible , Déterminer si l'élément supérieur est vide , Vide pour bloquer l'attente
available.await(). - L'élément supérieur n'est pas vide , Obtient son délai d'exécution
delay,delay <= 0Indique qu'il est temps d'exécuter ,Hors de la file d'attentefinishPoll. delay > 0Il n'est pas encore temps d'exécuter ,JugementleaderLe thread est - il vide?, Non vide indique qu'il y a d'autres take Les fils attendent aussi ,En courstake Bloquez indéfiniment l'attente .leaderLe thread est nul,En courstakeThread set toleader,Et bloquer l'attentedelayDurée.- En coursleaderThread waitdelay Le réveil automatique de longue durée est protégé par d'autres takeThread wake up, Et finalement
leaderSet tonull. - Recirculation d'un jugement
delay <= 0Hors de la file d'attente. - Jugement après avoir sauté du cycle leader Vide et l'élément supérieur n'est pas vide , Réveillez les autres takeThread, Est - ce que la dernière serrure .
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// Sans mission, Laissez le thread availableAttendez dans les conditions.
if (first == null)
available.await();
else {
// Obtenir le délai restant pour la tâche
long delay = first.getDelay(NANOSECONDS);
// Si le délai est écoulé , Retournez à cette tâche ,Pour la mise en œuvre.
if (delay <= 0)
return finishPoll(first);
// Oui.firstSet tonull, Quand le fil attend ,Ne pas détenirfirstRéférences
first = null; // don't retain ref while waiting
// Si c'est toujours le thread qui attend la tâche d'en - tête de file d'attente ,
// Indique que la tâche d'en - tête de file d'attente n'a pas encore été retardée ,Continue d'attendre..
if (leader != null)
available.await();
else {
// Enregistre le thread qui attend actuellement la tâche d'en - tête de file d'attente
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// Quand le délai de la Mission est arrivé , Capable de se réveiller automatiquement avec un délai d'attente .
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null) // Réveillez le thread qui attend la tâche
available.signal();
ock.unlock();
}
}
takeL'organigramme de base est le suivant::

takeBlocage du fil en attente
On peut voir que ce producteur take Le fil bloque l'attente dans deux cas :
- L'élément supérieur est vide .
- De l'élément supérieur delay > 0 .
take Quand la méthode a - t - elle été appelée ?
InThreadPoolExecutorMoyenne,getTaskMéthodes, Les fils de travail circulent à partir de workQueueTâche à accomplir. Mais les tâches programmées sont différentes , Parce que si une fois getTask La méthode sort la tâche et commence à l'exécuter , Et il n'est peut - être pas encore temps d'exécuter ,Donc, danstakeDans la méthode, Assurez - vous que la tâche ne peut être retirée que lorsque le temps d'exécution spécifié est écoulé .
leaderThread
Encore une foisleaderLe rôle de,Ici.leader Pour réduire les temps d'attente inutiles .leaderLa conception des fils,- Oui.Leader-FollowerVariante du modèle, Conçu pour attendre inutilement .Quand untakeLe fil devientleaderThread Time, Il suffit d'attendre le prochain délai ,Au lieu deleader Autres Threads take Les fils doivent attendre leader Le thread est sorti de la file d'attente pour réveiller les autres takeThread.
Par exemple,,Si ce n'est pas le casleader,Alors en cours d'exécutiontakeHeure,Tout doit être fait.available.awaitNanos(delay), Supposons que le thread actuel exécute ce fragment de code ,Pas encore.signal, Le deuxième thread a également exécuté ce code , Le deuxième fil sera bloqué . Il n'y a pas de raison d'exécuter ce fragment de code à ce stade. , Parce qu'il ne peut y avoir qu'un seul thread à partir de takeRetour au milieuqueue[0](Parce qu'il y alock), D'autres fils reviennent maintenant for Pris lors de l'exécution du cycle queue[0],Ce n'est plus comme avantqueue[0]C'est, Et puis continuer à bloquer .
Alors..., Afin de ne pas laisser plusieurs fils faire des temps d'attente inutiles fréquemment ,C'est ajoutéleader,SileaderPas vide, Le premier noeud de la file d'attente est déjà en attente de sortie. , Les autres fils sont bloqués , Réduction des blocages inutiles (Attention!,InfinallyAppelé danssignal() Pour réveiller un fil ,Au lieu designalAll()).
finishPollHors de la file d'attente
Élément supérieurdelay<=0, Le temps d'exécution est , Sortir de la file d'attente est un processus de tas vers le bas siftDown.
// Supprimer l'élément d'en - tête de file d'attente
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// Réduire d'un le nombre d'éléments dans la file d'attente
int s = --size;
// Obtenir l'élément de fin de file d'attente x
RunnableScheduledFuture<?> x = queue[s];
// L'élément final de la file d'attente originale est défini à null
queue[s] = null;
if (s != 0)
// Parce que l'élément d'en - tête de file d'attente a été supprimé , Alors réorganisez .
siftDown(0, x);
setIndex(f, -1);
return f;
}
La méthode d'enlèvement du tas est principalement divisée en trois étapes :
- Réduisez d'abord le nombre d'éléments dans la file d'attente d'un ;
- Définir l'élément de fin de file d'attente original comme élément d'en - tête de file d'attente , Ensuite, définissez l'élément final de la file d'attente à null;
- AppelezsetDown(O,x)Méthodes, Assurez - vous de trier les éléments par ordre de priorité .
Empiler vers le bas siftDown
Parce que l'élément supérieur du tas sort de la file d'attente , Ça détruit la structure du tas , Besoin d'organisation , Déplacer l'élément de queue vers le haut du tas , Puis empiler vers le bas :
- Depuis le Haut de la pile , Le noeud parent est comparé au plus petit noeud enfant des noeuds enfants gauche et droit (L'enfant de gauche n'est pas nécessairement inférieur à l'enfant de droite).
- Le noeud père est inférieur ou égal au noeud Enfant ,Pour mettre fin au cycle,Pas besoin de changer de position.
- Si le noeud père est plus grand que le noeud Enfant ,Puis changez de position.
- Continuer la boucle descendante pour déterminer la relation entre le noeud parent et le noeud Enfant , La boucle ne se termine pas tant que le noeud parent n'est pas inférieur ou égal au noeud Enfant .
Empiler vers le bas siftDown Le processus détaillé de 《Mise en œuvre et caractéristiques du réacteur et du réacteur binaire》
private void siftDown(int k, RunnableScheduledFuture<?> key) {
// Déplacement à droite non signé,équivalent àsize/2
int half = size >>> 1;
// Par le cycle, Assurez - vous que la valeur du noeud parent ne peut pas être supérieure à celle du noeud Enfant .
while (k < half) {
// Sous - noeud gauche, équivalent à (k * 2) + 1
int child = (k << 1) + 1;
// Élément de position du noeud enfant gauche
RunnableScheduledFuture<?> c = queue[child];
// Noeud enfant droit, équivalent à (k * 2) + 2
int right = child + 1;
// Si la valeur de l'élément enfant gauche est supérieure à la valeur de l'élément enfant droit , Alors le noeud enfant droit est le plus petit noeud Enfant .
// Je vais le faire.cAvecchild Valeur réattribuée
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
// Si la valeur de l'élément parent est inférieure à la valeur de l'élément enfant plus petite ,Alors sortez de la boucle
if (key.compareTo(c) <= 0)
break;
// Sinon, Les éléments du noeud parent sont échangés avec les noeuds enfants
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
siftDown La méthode est exécutée dans deux cas , L'un est qu'il n'y a pas de noeuds enfants , L'un est d'avoir des noeuds enfants (SelonhalfJugement).
Par exemple:Cas sans enfant:
Supposons que le tas initial soit comme suit :
- Hypothèses k = 3 ,Alors k = half ,Pas de noeud enfant,En courssiftDown La méthode est indexée directement à 3 Le noeud de est défini au dernier noeud du tableau :

Avec des enfants :
Hypothèses k = 0 ,Faites ce qui suit:
- Obtenir le noeud enfant gauche,child = 1 ,Obtenir le noeud enfant droit, right = 2 :

- Parce que right < size , Comparez la taille de l'intervalle de temps entre le noeud enfant gauche et le noeud enfant droit ,Ici. 3 < 7 ,Alors... c = queue[child] ;
- Comparaisonkey Est l'intervalle de temps inférieur à cL'intervalle de temps entre, Ce n'est pas satisfaisant ici ,Poursuivre la mise en œuvre,Indexer comme suit:k Le noeud de est défini à c,Et ensuite,kSet tochild;

- Parce que half = 3 ,k = 1 ,Continuer le cycle, L'index devient alors :

- Ensuite, après ce jugement, ,Oui.kLa valeur de3,Le résultat final est le suivant::

- Enfin,Si dansfinishPollLes mots appelés dans la méthode, Sera indexé comme 0 L'index du noeud pour est défini à -1, Indique que le noeud a été supprimé ,EtsizeEt moins.1,Le résultat final est le suivant:
Visible,siftdown La méthode n'est pas ordonnée après l'exécution ,Mais on peut trouver, Le temps d'exécution suivant du noeud enfant doit être supérieur au temps d'exécution suivant du noeud parent , Parce qu'à chaque fois, on prend le noeud avec le temps d'exécution le plus court du noeud enfant gauche et du noeud enfant droit , Donc on peut quand même s'assurer que takeEtpoll Les départs sont ordonnés .
poll()
Obtenez l'élément d'en - tête de file d'attente maintenant , Quand la tâche d'en - tête de file d'attente est null, Ou la tâche n'a pas été retardée , Indique que cette tâche ne peut pas être retournée ,Retour direct ànull.Sinon, appelezfinishPollMéthodes, Supprimer l'élément d'en - tête de file d'attente et retourner .
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
// La tâche d'en - tête de file d'attente est null, Ou la tâche n'a pas été retardée ,Tous en arrière.null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// Supprimer l'élément d'en - tête de file d'attente
return finishPoll(first);
} finally {
lock.unlock();
}
}
poll(long timeout, TimeUnit unit)
Délai d'attente pour obtenir l'élément d'en - tête de file d'attente ,Avectake Comparaison des méthodes , Compte tenu du délai fixé ,Si le délai est écoulé, Aucune tâche utile n'a été obtenue ,Alors reviens.null.Autrestake La logique est la même dans la méthode .
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// Sans mission.
if (first == null) {
// Délai écoulé,Revenez directement.null
if (nanos <= 0)
return null;
else
// Sinon, laissez le thread availableAttendez dans les conditionsnanosTemps
nanos = available.awaitNanos(nanos);
} else {
// Obtenir le délai restant pour la tâche
long delay = first.getDelay(NANOSECONDS);
// Si le délai est écoulé , Retournez à cette tâche ,Pour la mise en œuvre.
if (delay <= 0)
return finishPoll(first);
// Si le délai est écoulé ,Revenez directement.null
if (nanos <= 0)
return null;
// Oui.firstSet tonull, Quand le fil attend ,Ne pas détenirfirstRéférences
first = null; // don't retain ref while waiting
// Si le délai est inférieur au délai restant pour la tâche , Alors il est possible de ne pas obtenir la tâche .
// Ici, laissez le thread attendre le temps d'arrêt nanos
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// Quand le délai de la Mission est arrivé , Capable de se réveiller automatiquement avec un délai d'attente .
long timeLeft = available.awaitNanos(delay);
// Calculer le temps d'arrêt restant
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// Réveillez le thread qui attend la tâche
available.signal();
lock.unlock();
}
}
removeSupprimer l'élément spécifié
La suppression d'un élément spécifié est généralement utilisée pour annuler une tâche , La tâche bloque toujours la file d'attente , Vous devez supprimer . Lorsque l'élément supprimé n'est pas un élément de queue de tas , Besoin d'un traitement de tas .
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
//EntretienheapIndex
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
// Supprimer n'est pas un élément de queue de tas , Le traitement de gerbage est nécessaire
// Empiler d'abord vers le bas
siftDown(i, replacement);
if (queue[i] == replacement)
// S'il est empilé vers le bas ,i L'élément de localisation est toujours replacement, Note 4 ,
// Il faut empiler vers le haut
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
Supposons que la structure initiale du tas soit la suivante: :[Impossible de transférer l'image de la chaîne externe,Il peut y avoir un mécanisme antivol à la station source,Il est recommandé de sauvegarder l'image et de la télécharger directement(img-SqhXOn3E-1653913362599)()] À ce stade, supprimer 8Node of,À ce moment - là, k = 1,keyPour le dernier noeud:
En ce moment, à travers la paire ci - dessus siftDownAnalyse méthodologique,siftDown Les résultats de l'exécution de la méthode sont les suivants: :
Et vous verrez, La valeur du dernier noeud est plus petite que celle du noeud parent , C'est pour ça qu'il faut siftUp Méthode pour s'assurer que le temps d'exécution suivant du noeud enfant est plus grand que celui du noeud parent , Le résultat final est donc le suivant :
Résumé
Utiliser la file d'attente prioritaireDelayedWorkQueue, S'assurer que les tâches ajoutées à la file d'attente , Sera trié en fonction du délai de la tâche , Les tâches à faible délai sont d'abord acquises .
- DelayedWorkQueue La structure des données est basée sur l'implémentation heap ;
- DelayedWorkQueue Mise en œuvre du tas avec des tableaux ,Noeud racine hors ligne, Remplacer par le dernier noeud foliaire , Puis Poussez vers le bas jusqu'à ce que les conditions de mise en place du tas soient remplies. ; Enfin, les noeuds foliaires s'alignent , Puis Poussez vers le haut jusqu'à ce que les conditions de mise en place du tas soient remplies ;
- DelayedWorkQueue L'élément ajouté s'agrandit automatiquement lorsqu'il est plein 1/2, C'est - à - dire qu'il n'y aura jamais de blocage , Expansion maximale jusqu'à Integer.MAX_VALUE, Il y a donc au plus corePoolSize Threads Worker Running ;
- DelayedWorkQueue Éléments de consommationtake, L'élément supérieur est vide et delay >0 Heure,Bloquez l'attente;
- DelayedWorkQueue C'est une production qui ne bloque jamais , La consommation peut bloquer le Modèle producteur - consommateur ;
- DelayedWorkQueue Il y a unleader Variables pour le thread ,- Oui.Leader-FollowerVariante du modèle.Quand untakeLe fil devientleaderThread Time, Il suffit d'attendre le prochain délai ,Au lieu deleader Autres Threads take Les fils doivent attendre leader Le thread est sorti de la file d'attente pour réveiller les autres takeThread.
边栏推荐
- LeetCode 1996. Number of weak characters in the game*
- What is thread in concurrent programming
- [network planning] 2.2.3 user server interaction: cookies
- 字符串时间排序,对时间格式字符串进行排序
- Multipass Chinese document - Overview
- teterttet
- Bluetooth (5) -- about retransmission
- 图的最短路径问题 详细分解版
- 【无标题】测试下啊
- Qt客户端套接字QTcpSocket通过bind指定本地ip
猜你喜欢
![[JVM] memory model](/img/01/4a9ab79e340f19c5f6cf682577bf2a.jpg)
[JVM] memory model

Deploy netron services through kubernetes and specify model files at startup

【JVM】线程

阻塞队列 — DelayedWorkQueue源码分析

Download Google gcr IO image

Philips coo will be assigned to solve the dual crisis of "supply chain and product recall" in the face of crisis due to personnel change
![[MVC&Core]ASP. Introduction to net core MVC view value transfer](/img/c2/3e69cda2fed396505b5aa5888b9e5f.png)
[MVC&Core]ASP. Introduction to net core MVC view value transfer
![[network planning] 3.2 transport layer - UDP: connectionless service](/img/a8/74a1b44ce4d8b0b1a85043a091a91d.jpg)
[network planning] 3.2 transport layer - UDP: connectionless service

Unity自定义文件夹图标颜色 个性化Unity编译器

【JVM】类加载机制
随机推荐
VTK例子--三个相交的平面
Room first use
Installation of phpstudy
Pirate OJ 148 String inversion
快手处置超过5.4万个违规账号:如何打击平台上的违规账号
圖的最短路徑問題 詳細分解版
【JVM】内存模型
What is thread in concurrent programming
非重叠矩形中的随机点
【无标题】4555
Bluetooth development (2) -- initialization
Static method static learning
Bluetooth development (6) -- literacy of Bluetooth protocol architecture
Unity mesh patch generates parabola and polyline
[no title] 4555
系统应用安装时,签名校验失败问题
Décomposition détaillée du problème de chemin le plus court du graphique
Exemple VTK - - trois plans qui se croisent
The mystery of number idempotent and perfect square
teterttet