当前位置:网站首页>Exécuteur - shutdown, shutdown Now, awaittermination details and actual Fighting

Exécuteur - shutdown, shutdown Now, awaittermination details and actual Fighting

2022-06-10 23:17:00 Bit 666.

Un..Introduction

Utiliser executor Souvent utilisé dans les pools de threads shutdown / shutdownNow + awaitTermination Méthode de fermeture du pool de threads,Voici la définition et l'utilisation courante de plusieurs méthodes.

2..API Interprétation

1.shutdown

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdown Les principaux travaux sont les suivants::

Initiates an orderly shutdown in which previously submitted
tasks are executed - Suite de la soumission précédente

but no new tasks will be accepted. - Ne plus recevoir de nouvelles tâches

This method does not wait for previously submitted tasks to
complete execution. - La méthode n'attend pas que les tâches précédemment engagées soient terminées ,Ça peut aller avec awaitTermination Méthode d'attente.

2.shutdownNow

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

shutdownNow Les principaux travaux sont les suivants::

Attempts to stop all actively executing tasks - Tentative d'arrêter la tâche en cours

halts the processing of waiting tasks - Arrêter d'attendre la tâche

and returns a list of the tasks that were awaiting execution - Tâche de retour à la liste d'attente

These tasks are drained (removed) from the task queue upon return from this method. - En revenant de cette méthode , Supprimer ces tâches de la file d'attente

Tips:

La tâche n'attend pas la fin de la tâche proactive ,Ça peut aller avec awaitTermination Méthode d'attente.

Dans la mesure du possible, cette méthode arrête les tâches proactives ,Adoption Thread.interrupt Réalisation, Les tâches qui ne répondent pas à une interruption peuvent ne pas s'arrêter

Cette méthode est compatible avec shutdown La différence entre interruptIdleWorkers Et interruptWorkers,Ce dernier appellera interrutp Méthode à exécuter worker Allez., Le premier annule simplement la tâche en attente .

3.awaitTermination

    
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

awaitTermination La méthode attend que le thread arrive  TERMINATED C'est - à - dire l'état qui a pris fin .Si le pool de Threads est fermé,Retour direct true;Si le pool de Threads n'est pas fermé,La méthode est basée sur Timeout + TimeUnit Délai d'attente pour la fin du thread , Et renvoie selon l'état du pool de Threads expiré true Ou false, Notez que cette méthode ne ferme pas le pool de threads , Responsable uniquement du retard et de l'état de détection .

4.runState

A.Statut

Thread runState Plusieurs états et transitions de

RUNNING:Accepter de nouvelles tâches et traiter les tâches en file d'attente

SHUTDOWN:Ne pas accepter de nouvelles tâches,Mais gérer les tâches en file d'attente

STOP:Ne pas accepter de nouvelles tâches,Ne pas traiter les tâches en file d'attente,Et interrompre les tâches en cours

TIDYING:Toutes les tâches ont pris fin,workerCount Zéro, Le thread passe à l'état TIDYING,Va courir terminate() Hook

TERMINATED:Résiliation()Terminé

B.Conversion

RUNNING -> SHUTDOWN : En appelant shutdown() Heure, Peut être implicite dans finalize() Moyenne

(RUNNING or SHUTDOWN) -> STOP : Appelez shutdownNow()

SHUTDOWN -> TIDYING: Lorsque la file d'attente et le pool sont vides

STOP -> TIDYING:Quand la piscine est vide

TIDYING -> TERMINATED:Quand terminate() Quand la méthode du crochet est terminée

Trois.Pratique

1.shutdown + awaitTermination(500ms)

processNumBuffer Pour Runnable La logique à l'intérieur, Trouver le minimum pour un lot donné de nombres , Maximum et retour à la chaîne de résultats enregistrer ,Total 500000 Nombre,Chaque 50000 Les chiffres génèrent un Runnable. Appelé après l'exécution de la logique shutdown + awaitTermination.getCurrentThread La méthode est responsable de l'impression des fils actuellement disponibles , Utilisé pour observer les appels shutdown Et awaitTermination Changements de thread dans le pool post - thread .

import java.util.concurrent.{CopyOnWriteArraySet, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}


object ExecutorPoolShutdown {

  // Runnable  Logique d'exécution interne , Trouver les limites supérieure et inférieure 
  def processNumBuffer(nums: Array[Int], taskId: Long): String = {
    val maxNum = nums.max
    val minNum = nums.min
    val log = s"TaskId: $taskId Length: ${nums.length} Min: $minNum Max: $maxNum"
    log
  }

  def main(args: Array[String]): Unit = {

    // Stocker tout Task Journal de bord
    val logAll = new StringBuffer()
    //  Stocker tous les  TaskId
    val taskSet = new CopyOnWriteArraySet[Long]()

    // Initialisation du pool de Threads
    val executor = new ThreadPoolExecutor(6, 10, 3000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())

    val numIterator = (0 until 500000).iterator


    // Chaque50000 Les données génèrent un  Task
    numIterator.grouped(50000).foreach(group => {
      executor.execute(new Runnable() {
        override def run(): Unit = {
          val taskId = Thread.currentThread().getId
          taskSet.add(taskId) // Ajouter taskID
          val res = processNumBuffer(group.toArray, taskId)
          logAll.append(res + "\n") //  Ajouter un journal statistique 
        }
      })
    })

    // Appelez shutdown
    executor.shutdown()

    // Obtenir le thread actuel
    println("After shutdown...")
    getCurrentThread()

    val st = System.currentTimeMillis()
    // Appelez awaitTerminatio
    val state = executor.awaitTermination(500, TimeUnit.MILLISECONDS)
    val cost = System.currentTimeMillis() - st
    println(s"Executor State: $state Cost: $cost")

    //  Obtenir à nouveau le thread courant 
    println("After awaitTermination...")
    getCurrentThread()

    println(logAll.toString)
    println(taskSet.toArray().mkString(","))

  }
}

Parce que la logique des tâches est simple ,Appelez shutdow Et awaitTermation Le nombre de fils arrière est 5 Et oui et pool Fils indépendants , Description du pool de threads Task In shutdow C'est fini depuis. ,C'est à partir de  awaitTermation L'état retourné est true Et les temps d'attente sont 0 Ça se voit aussi.Et enfin 500000 / 50000 = 10 Total général10Article (s) log,Encore une fois task Tout est fait, Enfin, il montre que cette tâche a été exécutée  6 - Oui. Task.

Tips:

shutdown Plus tard, si le pool de Threads est terminé ,Et  awaitTermation La méthode n'attend pas ,Retour direct true.

2.Délai de mission + shutdown + awaitTermination(500ms)

La vitesse d'exécution de la tâche ci - dessus est la vitesse maximale minimale ,Exécution rapide, C'est pas bon de voir shutdown Et awaitTerminatioN Le rôle de, Voici une simulation d'une tâche plus longue ,À l'origine Runnable Chine sleep(2000), Procédure d'extension 2sTemps de fonctionnement:

    numIterator.grouped(50000).foreach(group => {
      executor.execute(new Runnable() {
        override def run(): Unit = {
          val taskId = Thread.currentThread().getId
          taskSet.add(taskId)
          Thread.sleep(2000)
          val res = processNumBuffer(group.toArray, taskId)
          logAll.append(res + "\n")
        }
      })
    })

Encore une fois.:

Cette fois, c'est complètement différent. , Tout d'abord, peu importe l'appel shutdown Toujours awaitTermination, Vous pouvez voir que les fils actifs contiennent tous pool-1-thread-x, Ces deux - là. API La logique est toujours en cours d'exécution après l'appel n'est pas terminée ;Regarde encore awaitTermination L'état retourné est false, Indique que le pool de Threads n'est pas complètement fermé ,cost=507ms,Attendez. 500ms Le pool de fils arrière n'est pas encore complètement sorti , Mais le fil principal est terminé ,Alors... LogAll Il n'y a pas de journal dans , Seuls ceux qui ont été utilisés ont finalement été imprimés. TaskId.

Tips:

shutdown Le fil arrière continue de fonctionner ,Correspond à ce qui a été mentionné précédemment shutdown Les tâches en cours d'exécution se poursuivent après , Mais pas de nouvelles tâches. ,Et awaitTermination Le fil arrière est toujours actif ,Correspond à l'avant awaitTermination La méthode ne renvoie que l'état d'arrêt du pool de threads , Ne ferme pas le pool de Threads .

3.Délai de mission + shutdown + awaitTermination(2000ms)

Ajustement awaitTermination Temps d'attente pour,De500msÉlévation à2000ms

    val st = System.currentTimeMillis()
    val state = executor.awaitTermination(2000, TimeUnit.MILLISECONDS)
    val cost = System.currentTimeMillis() - st
    println(s"Executor State: $state Cost: $cost")

Encore une fois.: 

Par rapport à ce qui précède,executor L'état d'arrêt de false, Mais le temps d'attente a été reporté à environ 2000ms, Vous pouvez voir qu'à mesure que le temps d'attente augmente ,Une partie taskC'est fait, Mais ce n'est pas tout. . Modifier le délai pour 5000ms Encore une fois.:

Attendez. 4000ms Heure executor C'est fini,Alors... awaitTermination Retour à true, Pas de suivi non plus.  pool-1-thread-x Questions connexes task,Produit final log Et complet .

4.shutdownNow + awaitTermination(500ms)

shutdownNow Comparé à shutdown Il y aura une autre valeur de retour , Tâches de la liste d'attente .

    val tasks = executor.shutdownNow()
    tasks.asScala.foreach(task => {
      println(task)
    })

Cours.:

  Parce que la Mission est rapide , Donc, en mission rapide ,shutdown Et shutdownNow Même résultat,awaitTermination Retour à ture Et n'a pas attendu .

5.Délai de mission + shutdownNow + awaitTermination(500ms)

    numIterator.grouped(50000).foreach(group => {
      executor.execute(new Runnable() {
        override def run(): Unit = {
          val taskId = Thread.currentThread().getId
          taskSet.add(taskId)
          Thread.sleep(2000)
          val res = processNumBuffer(group.toArray, taskId)
          logAll.append(res + "\n")
        }
      })
    })

Augmentation du nombre de missions sleep 2000ms Après ça, lancez : 

  La tâche supérieure montre :

Caused by: java.lang.InterruptedException: sleep interrupted

C'est - à - dire la tâche sleep La période a été interrupt C'est,Donc l'exécution task Fin,Et shutdown Comparé à, Le thread en cours d'exécution n'est pas interrupt; Imprimez ci - dessous 4- Oui. Runnable, Parce que ces quatre - là Task Toujours dans la file d'attente ,shutdownNow Je les ai renvoyés directement ; Enfin, parce que executor Fermé,Donc l'état est true,Le temps d'attente est0.

6.Délai de mission + awaitTermination(xxxms) + shutdownNow

Par dessus shotdownNow + awaitTermination Comme vous pouvez le voir dans l'exemple de , Si la tâche ne peut pas être exécutée rapidement ,Alors appelez shotdownNow Le résultat est tout task Ce n'est pas fini. , La tâche n'a pas changé . Si vous voulez fixer une période pour une tâche , Combien peut - on faire? ,L'ordre peut être ajusté, Modifier comme suit:  awaitTermination Encore. shutdownNow:

    val st = System.currentTimeMillis()
    val state = executor.awaitTermination(3000, TimeUnit.MILLISECONDS)
    val cost = System.currentTimeMillis() - st
    println(s"Executor State: $state Cost: $cost")

    println("After awaitTermination...")
    getCurrentThread()

    val tasks = executor.shutdownNow()
    tasks.asScala.foreach(task => {
      println(task)
    })


    println("After shutdown...")
    getCurrentThread()

Exécuter à nouveau après ajustement de la séquence :

Temps d'attente fixé à 3000ms, L'équivalent de ce que vous demandez à votre mission est : 3000ms Combien d'entre eux peuvent courir? , Pas avant d'avoir fini ;Je vois. awaitTermination Retour à l'état après expiration false, Indique que toutes les tâches du thread ne sont pas terminées ;Regardez en bas shutdownNow Après, Il n'existe plus dans le thread pool-1-thread-x , Et imprimer certains résultats ,Total général6Article (s);En bas. interrupt Autres opérations en cours task Pile d'exception imprimée ,Procédure finale exit(0) Sortie normale.

Quatre.Résumé

Après analyse de code ci - dessus, Résumez quelques méthodes :

shutdown : Exécution des tâches en attente , N'ajoutez plus de nouvelles tâches

shutdownNow:interrupt Tâches en cours , N'ajoutez plus de nouvelles tâches , Retour à la tâche en attente

awaitTermination: N'affecte pas l'état du commutateur de pool de fils , Retour à l'état seulement , Le fil peut être bloqué pendant un certain temps

Peut être combiné avec 6 Exemples et durée et tolérance de vos tâches , Décider comment combiner les trois API, Si vous devez attendre executor Intérieur task Fermez - les quand vous aurez terminé executor Et il est difficile d'estimer l'intérieur task Temps de fonctionnement, Les opérations suivantes peuvent être effectuées :

      executor.shutdown()
      while (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {
         println("Task is Running...")
      }
      println("Task is Finish!")

Adoption while true Assurez - vous que dans le pool de Threads task Toutes les opérations de suivi sont terminées ,Mais attention Task Pas de circulation morte à l'intérieur. , Sinon, vous ne pouvez pas sauter While Cycle, Tout le programme est bloqué ici. .

原网站

版权声明
本文为[Bit 666.]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/161/202206102210006479.html