当前位置:网站首页>Block queue - delayedworkqueue Source Analysis

Block queue - delayedworkqueue Source Analysis

2022-06-11 00:38:00 Pour avoir de la valeur

Photos

Préface

Threadpool Runtime,Les tâches sont constamment récupérées dans la file d'attente des tâches,Et ensuite exécuter la tâche.Si nous voulons retarder ou programmer l'exécution des tâches,Il est important que les files d'attente des tâches soient triées en fonction du temps de latence des tâches,Plus le délai est court, plus la file d'attente est longue,Obtenir d'abord l'exécution.

Les files d'attente sont des structures de données FIFO,Les données qui entrent dans la file d'attente en premier,Obtenir d'abord.Mais il y a une file d'attente spéciale appeléeFile d'attente prioritaire,Il priorise les données insérées,Veiller à ce que les données de priorité plus élevée soient obtenues en premier,Indépendamment de l'ordre d'insertion des données.

Une façon courante de rendre les files d'attente prioritaires efficaces est d'utiliser heap.À propos de l'implémentation heap《Mise en œuvre et caractéristiques du réacteur et du réacteur binaire》

ScheduledThreadPoolExecutorPool de Threads

ScheduledThreadPoolExecutorDeThreadPoolExecutor, Donc sa structure interne de données et ThreadPoolExecutorC'est presque pareil, En outre, la fonction d'exécution des tâches selon le calendrier a été ajoutée , Divisé en tâches retardées et en tâches périodiques .

ScheduledThreadPoolExecutor Le constructeur ne peut passer que 3ParamètrescorePoolSize、ThreadFactory、RejectedExecutionHandler,Par défautmaximumPoolSizePourInteger.MAX_VALUE.

Les files d'attente de travail sont des files d'attente de blocage de latence hautement personnalisées DelayedWorkQueue,En fait, le principe de mise en œuvre etDelayQueueC'est presque pareil, La structure des données de base est la file d'attente prioritaire du plus petit tas binaire , Augmente automatiquement la capacité lorsque la file d'attente est pleine ,Alors...offer L'opération ne sera jamais bloquée ,maximumPoolSize Ça ne marchera pas , Ainsi, il y aura toujours au plus corePoolSize Threads Worker Running .

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

DelayedWorkQueue Retarder le blocage de la file d'attente

DelayedWorkQueue Est également une file d'attente retardée conçue pour les tâches programmées ,Sa mise en œuvre etDelayQueueC'est pareil, Mais mettre en file d'attente les priorités et DelayQueue Le processus d'implémentation pour migrer vers son propre corps de méthode , Ainsi, les appels de méthode spécifiques aux tâches programmées peuvent être ajoutés de façon flexible dans le processus .

Comment ça marche

ScheduledThreadPoolExecutor La raison pour laquelle vous devez mettre en place votre propre file d'attente de travail bloquée ,Parce que ScheduleThreadPoolExecutor La file d'attente de travail requise est quelque peu spéciale .

DelayedWorkQueue Est une structure de données basée sur le tas ,Similaire àDelayQueueEtPriorityQueue. Lors de l'exécution d'une tâche programmée , Le temps d'exécution de chaque tâche est différent ,Alors...DelayedWorkQueue L'ordre croissant du temps d'exécution , Plus le temps d'exécution est proche de l'heure actuelle, plus la file d'attente est avancée (Attention!: L'ordre ici n'est pas absolu. , Le tri dans le tas ne garantit que la prochaine exécution du noeud enfant est plus longue que la prochaine exécution du noeud parent , Et les noeuds foliaires ne sont pas nécessairement séquentiels ).

La structure du tas est la suivante :PhotosVisible,DelayedWorkQueue Est une file d'attente basée sur la structure minimale du tas . La structure du tas peut être représentée par un tableau , Peut être converti en un tableau comme suit: :PhotosDans cette structure,, Les caractéristiques suivantes peuvent être trouvées : Hypothèses“Le premier élément” L'index dans le tableau est 0 Et si, La relation de position entre le noeud parent et le noeud enfant est la suivante :

  • Index As L'index de l'enfant de gauche est ;
  • Index As L'index de l'enfant droit est ;
  • Index As L'index du noeud parent de est ;

Pourquoi utiliserDelayedWorkQueueEt alors??

  • La tâche la plus récente à exécuter doit être récupérée lors de l'exécution de la tâche programmée. , Par conséquent, chaque fois qu'une tâche sort de la file d'attente, elle doit être la plus rapide de la file d'attente actuelle , Il est donc naturel d'utiliser une file d'attente prioritaire .
  • DelayedWorkQueueEst une file d'attente prioritaire, Il garantit que chaque tâche sortant de la file d'attente est la tâche la plus exécutée de la file d'attente actuelle , Parce qu'il s'agit d'une file d'attente basée sur la structure du tas , La plus grande complexité temporelle de la structure du tas lors des opérations d'insertion et de suppression est O(logN).

Analyse des sources

Définition

DelayedWorkQueue La relation d'héritage de classe pour est la suivante :Photos Les méthodes qu'il contient sont définies comme suit: :Photos

Propriétés des membres

// Au début,Taille de la longueur du tableau.
private static final int INITIAL_CAPACITY = 16;        
//  Utilisez un tableau pour stocker les éléments de la file d'attente , Créé à partir de la capacité initiale RunnableScheduledFutureTableau des types
private RunnableScheduledFuture<?>[] queue =  new RunnableScheduledFuture<?>[INITIAL_CAPACITY];        
// Utiliserlock Pour assurer la sécurité simultanée de plusieurs fils .
private final ReentrantLock lock = new ReentrantLock();        
//  Taille des éléments stockés dans la file d'attente 
private int size = 0;        
// Où se trouve la tâche d'en - tête de file d'attente leaderThread
private Thread leader = null;        
//  Quand le délai de tâche de l'en - tête de file d'attente arrive , Ou un nouveau thread pourrait avoir besoin d'être leader, Pour réveiller le fil d'attente 
private final Condition available = lock.newCondition();

DelayedWorkQueue Est d'utiliser un tableau pour stocker des éléments dans une file d'attente , La structure des données de base est la file d'attente prioritaire du plus petit tas binaire , Augmente automatiquement la capacité lorsque la file d'attente est pleine .

Attention icileader,C'est...Leader-FollowerVariantes du modèle, Pour réduire les temps d'attente inutiles .Qu'est - ce que ça veut dire?

Pour un modèle de réseau multithreadé : Tous les Threads auront une des trois identités :leaderEtfollower,Et un état de travail:proccesser. Son principe de base est ,Il n'y aura jamais plus qu'unleader.Et toutfollowerJ'attends d'êtreleader.Lorsque le pool de Threads démarre, unLeaderResponsable de l'attente du réseauIOÉvénements,Quand un événement se produit,LeaderLe thread informe d'abord unFollower Thread l'a promu au nouveau Leader,Et je suis allé travailler seul,Pour gérer cet événement réseau,Ajouter après le traitementFollowerFiletage en attente,Attendez que la prochaine foisLeader.Cette méthode peut améliorerCPUSimilarité du cache, Et éliminer l'allocation dynamique de la mémoire et l'échange de données entre les fils .

Constructeur

DelayedWorkQueue - Oui. ScheduledThreadPoolExecutor Classe statique de , Par défaut, il n'y a qu'une seule méthode de construction sans paramètres .

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
 // ...
}

Méthode d'entrée

DelayedWorkQueue Offre put/add/offer(Avec le temps) Trois méthodes d'insertion d'éléments . Nous avons constaté que, comparativement à la file d'attente générale bloquée , Les trois méthodes d'ajout sont des appels offerMéthodes. C'est parce qu'il n'a pas la condition que la file d'attente soit pleine , C'est - à - dire qu'il peut constamment DelayedWorkQueueAjouter un élément, Lorsque le nombre d'éléments dépasse la longueur du tableau ,Le tableau est agrandi.

public void put(Runnable e) {
 offer(e);
}        
public boolean add(Runnable e) {            
    return offer(e);
}        
public boolean offer(Runnable e, long timeout, TimeUnit unit) {            
    return offer(e);
}

offerAjouter un élément

ScheduledThreadPoolExecutor Lorsque vous soumettez une tâche, vous appelez DelayedWorkQueue.add,Etaddput Attendez que certaines des méthodes d'ajout d'éléments fournies à l'extérieur soient appelées offer.

public boolean offer(Runnable x) {            
    if (x == null)                
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;            
    // Utiliserlock Assurer la sécurité des opérations simultanées 
    final ReentrantLock lock = this.lock;
    lock.lock();            
    try {                
        int i = size;                
        //  Si vous voulez dépasser la longueur du tableau , On va agrandir le tableau 
        if (i >= queue.length)                    
            // Expansion du tableau
            grow();                
        //  Ajouter un élément à la file d'attente 
        size = i + 1;                
        // Si c'est le premier élément, Alors pas besoin de trier ,Il suffit d'attribuer une valeur directement
        if (i == 0) {
         queue[0] = e;
            setIndex(e, 0);
        } else {                    
            // AppelezsiftUpMéthodes, Commande les éléments insérés .
            siftUp(i, e);
        }                
        //  Indique que l'élément nouvellement inséré est un en - tête de file d'attente , En - tête de file d'attente remplacé ,
        //  Alors Réveillez le thread qui attend la tâche d'acquisition .
        if (queue[0] == e) {
         leader = null;                    
            //  Réveillez le thread qui attend d'obtenir la tâche 
            available.signal();
        }
    } finally {
     lock.unlock();
    }            
    return true;
}

Le processus de base est le suivant::

  1. Il sert d'entrée aux producteurs ,Obtenez d'abord la serrure.
  2. Déterminer si la file d'attente est pleine (size >= queue.length),Agrandir quand il est pleingrow().
  3. La file d'attente n'est pas pleine,size+1.
  4. Déterminer si l'élément ajouté est le premier , Oui, pas besoin de tas. .
  5. L'élément ajouté n'est pas le premier , Il faut empiler siftUp.
  6. Si l'élément supérieur est exactement l'élément ajouté à ce moment - là ,Alors Réveillez - voustakeConsommation de fil.
  7. Verrouillage Final.

offerL'organigramme de base est le suivant:

Photos

Expansion de la capacitégrow()

Je vois.,Quand la file d'attente est pleine,Ne bloque pas l'attente, Au lieu de cela, l'expansion continue .Nouvelles capacitésnewCapacity Dans l'ancienne capacité oldCapacity Sur la base de 50%(oldCapacity >> 1équivalent àoldCapacity /2).EnfinArrays.copyOf,D'abord selonnewCapacityCréer un nouveau tableau vide, Puis copiez les données de l'ancien tableau dans le nouveau tableau .

private void grow() {            
    int oldCapacity = queue.length;            
    //  Augmenter la moitié du tableau original à chaque expansion .
    // grow 50%
    int newCapacity = oldCapacity + (oldCapacity >> 1); 
    if (newCapacity < 0) // overflow
     newCapacity = Integer.MAX_VALUE;            
    // UtiliserArrays.copyOf Pour copier un nouveau tableau 
    queue = Arrays.copyOf(queue, newCapacity);
}

Empilage vers le Haut siftUp

Les éléments nouvellement ajoutés sont d'abord ajoutés au fond du tas , Puis, étape par étape, comparez avec le noeud père ci - dessus , Si le noeud est plus petit que le noeud parent, changez de position avec le noeud parent. , La comparaison des boucles ne se termine pas tant qu'elle n'est pas supérieure au noeud parent .Par le cycle,Pour trouver des élémentskey Il doit être inséré à l'emplacement du noeud dans l'arbre binaire du tas , Et interagir avec la position du noeud parent .

Empilage vers le Haut siftUp Le processus détaillé de 《Mise en œuvre et caractéristiques du réacteur et du réacteur binaire》

private void siftUp(int k, RunnableScheduledFuture<?> key) {            
    // Quandk==0Heure, C'est le noeud racine d'un tas d'arbres binaires ,Sortir de la boucle
    while (k > 0) {                
        //  Coordonnées de localisation du noeud parent , équivalent à(k - 1) / 2
        int parent = (k - 1) >>> 1;                
        //  Obtenir l'élément de localisation du noeud parent 
        RunnableScheduledFuture<?> e = queue[parent];                
        // Sikey L'élément est plus grand que l'élément de localisation du noeud parent ,Conditions remplies,Alors sortez de la boucle
        //  Parce qu'il est trié de petit en grand .
        if (key.compareTo(e) >= 0)                    
            break;                
        //  Sinon, l'élément parent est stocké dans kEmplacement
        queue[k] = e;                
        //  C'est seulement si l'élément est ScheduledFutureTask L'Instance objet est utile , Pour annuler rapidement une tâche .
        setIndex(e, k);                
        // Réattribuerk,Recherche d'élémentskey Le noeud qui devrait être inséré dans l'arbre binaire du tas 
        k = parent;
    }            
    // Fin du cycle,kC'est l'élémentkey Emplacement du noeud à insérer 
    queue[k] = key;
    setIndex(key, k);
}

Le Code est bien compris., Est la base du cycle key Le noeud et son parent pour juger ,Sikey Le temps d'exécution du noeud est inférieur à celui du noeud parent , Les deux noeuds sont échangés , Faire en sorte que les noeuds les plus avancés dans le temps d'exécution soient disposés devant la file d'attente .

Supposons que les noeuds nouvellement mis en file d'attente soient retardés (AppelezgetDelay()Méthode d'obtention)- Oui. 5 ,Le processus de mise en œuvre est le suivant::

  1. Ajoutez d'abord un nouveau noeud à la fin du tableau , L'index du nouveau noeud kPour7

Photos

  1. Calculer l'index du nouveau noeud parent :parent = (k - 1) >>> 1,parent = 3,Alorsqueue[3] La valeur de l'intervalle de temps pour est 8,Parce que 5 < 8 ,Sera mis en œuvrequeue[7] = queue[3]

Photos

  1. À ce moment - là,kSet to3,Continuez le cycle,RecalculerparentPour1,queue[1]L'intervalle de temps pour3,Parce que 5 > 3 ,Pour le moment, quittez le cycle,FinalkPour3

Photos

Visible, Chaque fois qu'un noeud est ajouté , C'est juste basé sur le noeud parent. , Sans affecter le noeud frère .

Méthode de sortie

DelayedWorkQueue Voici quelques façons de sortir de l'équipe

  • take(), Attendre d'obtenir l'élément d'en - tête de file d'attente
  • poll() , Obtenez l'élément d'en - tête de file d'attente maintenant
  • poll(long timeout, TimeUnit unit) , Délai d'attente pour obtenir l'élément d'en - tête de file d'attente

takeÉléments de consommation

Worker Une fois le thread Worker démarré, les éléments de la file d'attente de travail sont recyclés ,Parce queScheduledThreadPoolExecutorDekeepAliveTime=0, Donc la tâche de consommation n'appelle que DelayedWorkQueue.take.takeLe processus de base est le suivant::

  • Obtenez d'abord la serrure interruptible , Déterminer si l'élément supérieur est vide , Vide pour bloquer l'attente available.await().
  • L'élément supérieur n'est pas vide , Obtient son délai d'exécution delay,delay <= 0 Indique qu'il est temps d'exécuter ,Hors de la file d'attentefinishPoll.
  • delay > 0 Il n'est pas encore temps d'exécuter ,JugementleaderLe thread est - il vide?, Non vide indique qu'il y a d'autres take Les fils attendent aussi ,En courstake Bloquez indéfiniment l'attente .
  • leaderLe thread est nul,En courstakeThread set toleader,Et bloquer l'attentedelayDurée.
  • En coursleaderThread waitdelay Le réveil automatique de longue durée est protégé par d'autres takeThread wake up, Et finalement leaderSet tonull.
  • Recirculation d'un jugement delay <= 0Hors de la file d'attente.
  • Jugement après avoir sauté du cycle leader Vide et l'élément supérieur n'est pas vide , Réveillez les autres takeThread, Est - ce que la dernière serrure .
public RunnableScheduledFuture<?> take() throws InterruptedException {            
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();            
    try {                
        for (;;) {
         RunnableScheduledFuture<?> first = queue[0];                    
            // Sans mission, Laissez le thread availableAttendez dans les conditions.
            if (first == null)
             available.await();                    
            else {                        
                //  Obtenir le délai restant pour la tâche 
                long delay = first.getDelay(NANOSECONDS);                        
                //  Si le délai est écoulé , Retournez à cette tâche ,Pour la mise en œuvre.
                if (delay <= 0)                            
                    return finishPoll(first);                        
                // Oui.firstSet tonull, Quand le fil attend ,Ne pas détenirfirstRéférences
                first = null; // don't retain ref while waiting

                //  Si c'est toujours le thread qui attend la tâche d'en - tête de file d'attente ,
                //  Indique que la tâche d'en - tête de file d'attente n'a pas encore été retardée ,Continue d'attendre..
                if (leader != null)
                 available.await();                        
                else {                            
                    //  Enregistre le thread qui attend actuellement la tâche d'en - tête de file d'attente 
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;                            
                    try {                                
                        //  Quand le délai de la Mission est arrivé , Capable de se réveiller automatiquement avec un délai d'attente .
                        available.awaitNanos(delay);
                    } finally {                                
                        if (leader == thisThread)
                         leader = null;
                    }
                }
            }
        }
    } finally {                
        if (leader == null && queue[0] != null)                    //  Réveillez le thread qui attend la tâche 
         available.signal();
        ock.unlock();
    }
}

takeL'organigramme de base est le suivant:

Photos

takeBlocage du fil en attente

On peut voir que ce producteur take Le fil bloque l'attente dans deux cas :

  • L'élément supérieur est vide .
  • De l'élément supérieur delay > 0 .

take Quand la méthode a - t - elle été appelée ?

InThreadPoolExecutorMoyenne,getTaskMéthodes, Les fils de travail circulent à partir de workQueueTâche à accomplir. Mais les tâches programmées sont différentes , Parce que si une fois getTask La méthode sort la tâche et commence à l'exécuter , Et il n'est peut - être pas encore temps d'exécuter ,Donc, danstakeDans la méthode, Assurez - vous que la tâche ne peut être retirée que lorsque le temps d'exécution spécifié est écoulé .

leaderThread

Encore une foisleaderLe rôle de,Ici.leader Pour réduire les temps d'attente inutiles .leaderLa conception des fils,- Oui.Leader-FollowerVariante du modèle, Conçu pour attendre inutilement .Quand untakeLe fil devientleaderThread Time, Il suffit d'attendre le prochain délai ,Au lieu deleader Autres Threads take Les fils doivent attendre leader Le thread est sorti de la file d'attente pour réveiller les autres takeThread.

Par exemple,,Si ce n'est pas le casleader,Alors en cours d'exécutiontakeHeure,Tout doit être fait.available.awaitNanos(delay), Supposons que le thread actuel exécute ce fragment de code ,Pas encore.signal, Le deuxième thread a également exécuté ce code , Le deuxième fil sera bloqué . Il n'y a pas de raison d'exécuter ce fragment de code à ce stade. , Parce qu'il ne peut y avoir qu'un seul thread à partir de takeRetour au milieuqueue[0](Parce qu'il y alock), D'autres fils reviennent maintenant for Pris lors de l'exécution du cycle queue[0],Ce n'est plus comme avantqueue[0]C'est, Et puis continuer à bloquer .

Alors..., Afin de ne pas laisser plusieurs fils faire des temps d'attente inutiles fréquemment ,C'est ajoutéleader,SileaderPas vide, Le premier noeud de la file d'attente est déjà en attente de sortie. , Les autres fils sont bloqués , Réduction des blocages inutiles (Attention!,InfinallyAppelé danssignal() Pour réveiller un fil ,Au lieu designalAll()).

finishPollHors de la file d'attente

Élément supérieurdelay<=0, Le temps d'exécution est , Sortir de la file d'attente est un processus de tas vers le bas siftDown.

// Supprimer l'élément d'en - tête de file d'attente
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {            
    //  Réduire d'un le nombre d'éléments dans la file d'attente 
    int s = --size;            
    //  Obtenir l'élément de fin de file d'attente x
    RunnableScheduledFuture<?> x = queue[s];            
    //  L'élément final de la file d'attente originale est défini à null
    queue[s] = null;            
    if (s != 0)                
        //  Parce que l'élément d'en - tête de file d'attente a été supprimé , Alors réorganisez .
        siftDown(0, x);
    setIndex(f, -1);            
    return f;
}

La méthode d'enlèvement du tas est principalement divisée en trois étapes :

  1. Réduisez d'abord le nombre d'éléments dans la file d'attente d'un ;
  2. Définir l'élément de fin de file d'attente original comme élément d'en - tête de file d'attente , Ensuite, définissez l'élément final de la file d'attente à null;
  3. AppelezsetDown(O,x)Méthodes, Assurez - vous de trier les éléments par ordre de priorité .

Empiler vers le bas siftDown

Parce que l'élément supérieur du tas sort de la file d'attente , Ça détruit la structure du tas , Besoin d'organisation , Déplacer l'élément de queue vers le haut du tas , Puis empiler vers le bas :

  • Depuis le Haut de la pile , Le noeud parent est comparé au plus petit noeud enfant des noeuds enfants gauche et droit (L'enfant de gauche n'est pas nécessairement inférieur à l'enfant de droite).
  • Le noeud père est inférieur ou égal au noeud Enfant ,Pour mettre fin au cycle,Pas besoin de changer de position.
  • Si le noeud père est plus grand que le noeud Enfant ,Puis changez de position.
  • Continuer la boucle descendante pour déterminer la relation entre le noeud parent et le noeud Enfant , La boucle ne se termine pas tant que le noeud parent n'est pas inférieur ou égal au noeud Enfant .

Empiler vers le bas siftDown Le processus détaillé de 《Mise en œuvre et caractéristiques du réacteur et du réacteur binaire》

private void siftDown(int k, RunnableScheduledFuture<?> key) {     
    // Déplacement à droite non signé,équivalent àsize/2
    int half = size >>> 1;            
    // Par le cycle, Assurez - vous que la valeur du noeud parent ne peut pas être supérieure à celle du noeud Enfant .
    while (k < half) {                
        // Sous - noeud gauche, équivalent à (k * 2) + 1
        int child = (k << 1) + 1;                
        //  Élément de position du noeud enfant gauche 
        RunnableScheduledFuture<?> c = queue[child];                
        // Noeud enfant droit, équivalent à (k * 2) + 2
        int right = child + 1;                
        //  Si la valeur de l'élément enfant gauche est supérieure à la valeur de l'élément enfant droit , Alors le noeud enfant droit est le plus petit noeud Enfant .
        // Je vais le faire.cAvecchild Valeur réattribuée 
        if (right < size && c.compareTo(queue[right]) > 0)
         c = queue[child = right];                
        //  Si la valeur de l'élément parent est inférieure à la valeur de l'élément enfant plus petite ,Alors sortez de la boucle
        if (key.compareTo(c) <= 0)                    
            break;                
        // Sinon, Les éléments du noeud parent sont échangés avec les noeuds enfants 
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }            
    queue[k] = key;
    setIndex(key, k);
}

siftDown La méthode est exécutée dans deux cas , L'un est qu'il n'y a pas de noeuds enfants , L'un est d'avoir des noeuds enfants (SelonhalfJugement).

Par exemple:Cas sans enfant:

Supposons que le tas initial soit comme suit :Photos

  1. Hypothèses k = 3 ,Alors k = half ,Pas de noeud enfant,En courssiftDown La méthode est indexée directement à 3 Le noeud de est défini au dernier noeud du tableau :

Photos

Avec des enfants :

Hypothèses k = 0 ,Faites ce qui suit:

  1. Obtenir le noeud enfant gauche,child = 1 ,Obtenir le noeud enfant droit, right = 2 :

Photos

  1. Parce que right < size , Comparez la taille de l'intervalle de temps entre le noeud enfant gauche et le noeud enfant droit ,Ici. 3 < 7 ,Alors... c = queue[child] ;
  2. Comparaisonkey Est l'intervalle de temps inférieur à cL'intervalle de temps entre, Ce n'est pas satisfaisant ici ,Poursuivre la mise en œuvre,Indexer comme suit:k Le noeud de est défini à c,Et ensuite,kSet tochild;

Photos

  1. Parce que half = 3 ,k = 1 ,Continuer le cycle, L'index devient alors :

Photos

  1. Ensuite, après ce jugement, ,Oui.kLa valeur de3,Le résultat final est le suivant::

Photos

  1. Enfin,Si dansfinishPollLes mots appelés dans la méthode, Sera indexé comme 0 L'index du noeud pour est défini à -1, Indique que le noeud a été supprimé ,EtsizeEt moins.1,Le résultat final est le suivant:

PhotosVisible,siftdown La méthode n'est pas ordonnée après l'exécution ,Mais on peut trouver, Le temps d'exécution suivant du noeud enfant doit être supérieur au temps d'exécution suivant du noeud parent , Parce qu'à chaque fois, on prend le noeud avec le temps d'exécution le plus court du noeud enfant gauche et du noeud enfant droit , Donc on peut quand même s'assurer que takeEtpoll Les départs sont ordonnés .

poll()

Obtenez l'élément d'en - tête de file d'attente maintenant , Quand la tâche d'en - tête de file d'attente est null, Ou la tâche n'a pas été retardée , Indique que cette tâche ne peut pas être retournée ,Retour direct ànull.Sinon, appelezfinishPollMéthodes, Supprimer l'élément d'en - tête de file d'attente et retourner .

public RunnableScheduledFuture<?> poll() {            
    final ReentrantLock lock = this.lock;
    lock.lock();            
    try {
     RunnableScheduledFuture<?> first = queue[0];                
        //  La tâche d'en - tête de file d'attente est null, Ou la tâche n'a pas été retardée ,Tous en arrière.null
        if (first == null || first.getDelay(NANOSECONDS) > 0)                    
            return null;                
        else
         // Supprimer l'élément d'en - tête de file d'attente
            return finishPoll(first);
    } finally {
     lock.unlock();
    }
}

poll(long timeout, TimeUnit unit)

Délai d'attente pour obtenir l'élément d'en - tête de file d'attente ,Avectake Comparaison des méthodes , Compte tenu du délai fixé ,Si le délai est écoulé, Aucune tâche utile n'a été obtenue ,Alors reviens.null.Autrestake La logique est la même dans la méthode .

public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)            
    throws InterruptedException {            
    long nanos = unit.toNanos(timeout);            
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();            
    try {                
        for (;;) {
         RunnableScheduledFuture<?> first = queue[0];                    
            // Sans mission.
            if (first == null) {                        
             // Délai écoulé,Revenez directement.null
                if (nanos <= 0)                            
                    return null;                        
                else
                 //  Sinon, laissez le thread availableAttendez dans les conditionsnanosTemps
                    nanos = available.awaitNanos(nanos);
            } else {                        
                //  Obtenir le délai restant pour la tâche 
                long delay = first.getDelay(NANOSECONDS);                        
                //  Si le délai est écoulé , Retournez à cette tâche ,Pour la mise en œuvre.
                if (delay <= 0)                            
                    return finishPoll(first);                        
                //  Si le délai est écoulé ,Revenez directement.null
                if (nanos <= 0)                            
                    return null;                        
                // Oui.firstSet tonull, Quand le fil attend ,Ne pas détenirfirstRéférences
                first = null; // don't retain ref while waiting
                //  Si le délai est inférieur au délai restant pour la tâche , Alors il est possible de ne pas obtenir la tâche .
                //  Ici, laissez le thread attendre le temps d'arrêt nanos
                if (nanos < delay || leader != null)
                 nanos = available.awaitNanos(nanos);                        
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;                            
                    try {                                
                        //  Quand le délai de la Mission est arrivé , Capable de se réveiller automatiquement avec un délai d'attente .
                        long timeLeft = available.awaitNanos(delay);                                
                        //  Calculer le temps d'arrêt restant 
                        nanos -= delay - timeLeft;
                    } finally {                                
                        if (leader == thisThread)
                         leader = null;
                    }
                }
            }
        }
    } finally {                
        if (leader == null && queue[0] != null)                    
            //  Réveillez le thread qui attend la tâche 
         available.signal();
        lock.unlock();
    }
}

removeSupprimer l'élément spécifié

La suppression d'un élément spécifié est généralement utilisée pour annuler une tâche , La tâche bloque toujours la file d'attente , Vous devez supprimer . Lorsque l'élément supprimé n'est pas un élément de queue de tas , Besoin d'un traitement de tas .

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = indexOf(x);
        if (i < 0)
            return false;
        //EntretienheapIndex
        setIndex(queue[i], -1);
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            // Supprimer n'est pas un élément de queue de tas , Le traitement de gerbage est nécessaire 
            // Empiler d'abord vers le bas 
            siftDown(i, replacement);
            if (queue[i] == replacement)
                // S'il est empilé vers le bas ,i L'élément de localisation est toujours replacement, Note 4 ,
                // Il faut empiler vers le haut 
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

Supposons que la structure initiale du tas soit la suivante: :[Impossible de transférer l'image de la chaîne externe,Il peut y avoir un mécanisme antivol à la station source,Il est recommandé de sauvegarder l'image et de la télécharger directement(img-SqhXOn3E-1653913362599)(data:image/gif;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVQImWNgYGBgAAAABQABh6FO1AAAAABJRU5ErkJggg==)] À ce stade, supprimer 8Node of,À ce moment - là, k = 1,keyPour le dernier noeud:Photos En ce moment, à travers la paire ci - dessus siftDownAnalyse méthodologique,siftDown Les résultats de l'exécution de la méthode sont les suivants: :PhotosEt vous verrez, La valeur du dernier noeud est plus petite que celle du noeud parent , C'est pour ça qu'il faut siftUp Méthode pour s'assurer que le temps d'exécution suivant du noeud enfant est plus grand que celui du noeud parent , Le résultat final est donc le suivant :Photos

Résumé

Utiliser la file d'attente prioritaireDelayedWorkQueue, S'assurer que les tâches ajoutées à la file d'attente , Sera trié en fonction du délai de la tâche , Les tâches à faible délai sont d'abord acquises .

  1. DelayedWorkQueue La structure des données est basée sur l'implémentation heap ;
  2. DelayedWorkQueue Mise en œuvre du tas avec des tableaux ,Noeud racine hors ligne, Remplacer par le dernier noeud foliaire , Puis Poussez vers le bas jusqu'à ce que les conditions de mise en place du tas soient remplies. ; Enfin, les noeuds foliaires s'alignent , Puis Poussez vers le haut jusqu'à ce que les conditions de mise en place du tas soient remplies ;
  3. DelayedWorkQueue L'élément ajouté s'agrandit automatiquement lorsqu'il est plein 1/2, C'est - à - dire qu'il n'y aura jamais de blocage , Expansion maximale jusqu'à Integer.MAX_VALUE, Il y a donc au plus corePoolSize Threads Worker Running ;
  4. DelayedWorkQueue Éléments de consommationtake, L'élément supérieur est vide et delay >0 Heure,Bloquez l'attente;
  5. DelayedWorkQueue C'est une production qui ne bloque jamais , La consommation peut bloquer le Modèle producteur - consommateur ;
  6. DelayedWorkQueue Il y a unleader Variables pour le thread ,- Oui.Leader-FollowerVariante du modèle.Quand untakeLe fil devientleaderThread Time, Il suffit d'attendre le prochain délai ,Au lieu deleader Autres Threads take Les fils doivent attendre leader Le thread est sorti de la file d'attente pour réveiller les autres takeThread.
原网站

版权声明
本文为[Pour avoir de la valeur]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/162/202206102326203953.html