Scala Concurrent Programming (II): Akka

2022-07-05 23:17:00 Shawn Jeon

Summary

  • Akka is a library written in Scala: an event-driven toolkit for building highly concurrent applications
  • Akka features:
  1. Provides an asynchronous, non-blocking, high-performance, event-driven programming model
  2. Built-in fault tolerance: when an Actor fails, it can be resumed or restarted
  3. Lightweight event processing (millions of Actors per GB of heap memory). Whether event processing counts as lightweight or heavyweight depends mainly on whether it relies on the operating system and hardware: if it does, it is heavyweight; if it does not, it is lightweight
  4. Highly concurrent applications can be built on a single machine, and distributed applications can be built across a network

Akka Communication process

  1. The student creates an ActorSystem
  2. The ActorSystem is used to create an ActorRef (a reference to the teacher Actor), and the message is sent to that ActorRef
  3. The ActorRef passes the message to the Message Dispatcher
  4. The Message Dispatcher stores messages, in order, in the MailBox of the target Actor
  5. The Message Dispatcher schedules the MailBox on a thread
  6. The MailBox takes messages out in order and finally passes each one to TeacherActor's receive method
API introduction:

  • ActorSystem: responsible for creating and supervising Actors
  1. An ActorSystem is a heavyweight object that is normally created once per application; many Actors can be created through it
  2. Inside an Actor, context.system returns a reference to the ActorSystem that manages that Actor
  • Implementing an Actor class
  1. Define a class or singleton object that extends Actor (import akka.actor.Actor)
  2. Implement the receive method to handle incoming messages (there is no need for the loop and react methods)
  3. Optionally override preStart(); it runs after the Actor object has been constructed and is executed only once for that Actor instance
  • Loading an Actor
  1. To create an Akka Actor, an ActorSystem must be created first
  2. Call ActorSystem.actorOf(Props(Actor object), "Actor name") to load the Actor
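
The API points above can be sketched in a single file. This is a minimal sketch rather than the article's own code: GreeterActor, GreeterDemo, and the system name demoSystem are hypothetical, and it assumes classic Akka actors (akka-actor 2.4 or later, for terminate()) on the classpath. It defines the Actor as a class instead of a singleton object, which is the more common convention.

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical Actor used only for illustration.
class GreeterActor extends Actor {
  // Runs once for this Actor instance, before any message is processed.
  override def preStart(): Unit =
    println(s"started at ${self.path}, managed by system '${context.system.name}'")

  // Handles incoming messages; no loop/react is needed.
  override def receive: Receive = {
    case name: String => println(s"Hello, $name!")
  }
}

object GreeterDemo {
  def main(args: Array[String]): Unit = {
    // The ActorSystem must exist before any Actor can be created.
    val system = ActorSystem("demoSystem")
    // actorOf(Props(...), name) creates the Actor and returns its ActorRef.
    val greeter = system.actorOf(Props(new GreeterActor), "greeter")
    greeter ! "Akka"

    // Give the message time to be processed, then shut the system down.
    Thread.sleep(500)
    system.terminate()
  }
}
```

Without the call to terminate() the JVM keeps running, because the ActorSystem owns non-daemon threads.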

Actor Path

  • Every Actor has a Path, and this path can be referenced from outside the Actor

Type            Path
Local Actor     akka://<ActorSystem name>/user/<Actor name>
Remote Actor    akka.tcp://<ActorSystem name>@<IP address>:<port>/user/<Actor name>
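
To see how the two path forms break down, the sketch below parses one of each with Akka's ActorPath.fromString. The object name ActorPathDemo and the host 127.0.0.1 are assumptions chosen for illustration; only akka-actor is needed on the classpath, and no ActorSystem has to be running.

```scala
import akka.actor.ActorPath

object ActorPathDemo {
  def main(args: Array[String]): Unit = {
    // Local path: no host or port in the address.
    val local = ActorPath.fromString("akka://actorSystem/user/receiverActor")
    println(s"system=${local.address.system} host=${local.address.host} name=${local.name}")

    // Remote path: protocol akka.tcp plus host and port (the host is an assumed example).
    val remote = ActorPath.fromString("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor")
    println(s"protocol=${remote.address.protocol} host=${remote.address.host} port=${remote.address.port}")
    println(s"elements=${remote.elements.mkString("/")}")
  }
}
```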

Introductory example

  • Load two Actors (SenderActor and ReceiverActor) through the ActorSystem; SenderActor sends a message, ReceiverActor receives it and then replies

package com.akka.ex1

/**
 * Message sent to submit a task.
 *
 * @param msg the message content to send
 */
case class SubmitTaskMessage(msg: String)

/**
 * Acknowledgement returned after a task has been submitted successfully.
 *
 * @param msg the acknowledgement content
 */
case class SuccessSubmitTaskMessage(msg: String)


package com.akka.ex1

import akka.actor.Actor

object SenderActor extends Actor {
  override def receive: Receive = {
    // Receive the "start" message sent by Entrance
    case "start" => {
      println("SenderActor, received: start!")
      // Look up ReceiverActor by its path
      val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")
      // Send a message to ReceiverActor
      receiverActor ! SubmitTaskMessage("Hello ReceiverActor!, This is SenderActor.")
    }
    // Receive the acknowledgement returned by ReceiverActor
    case SuccessSubmitTaskMessage(msg) => println(s"SenderActor, received: ${msg}")
  }
}


package com.akka.ex1

import akka.actor.Actor

object ReceiverActor extends Actor {
  override def receive: Receive = {
    // Receive the message sent by SenderActor
    case SubmitTaskMessage(msg) => {
      println(s"ReceiverActor, received: ${msg}")
      // Reply to SenderActor
      sender() ! SuccessSubmitTaskMessage("Hi!, This is ReceiverActor.")
    }
  }
}


package com.akka.ex1

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    /**
     * Create the ActorSystem, which is responsible for creating and supervising Actors.
     *
     * @param name : scala.Predef.String  name of the ActorSystem
     * @param config : com.typesafe.config.Config  configuration to load
     */
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())

    /**
     * Load the custom Actor objects through the ActorSystem.
     *
     * @param props : akka.actor.Props  wraps the Actor singleton object to load
     * @param name : scala.Predef.String  name assigned to the Actor
     */
    val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")
    // Every Actor must be given a name; otherwise SenderActor cannot look up ReceiverActor through context.actorSelection,
    // and Akka logs that the message to Actor[akka://actorSystem/user/receiverActor] was not delivered
    actorSystem.actorOf(Props(ReceiverActor), "receiverActor")

    // Send the string "start" to SenderActor
    senderActor ! "start"
  }
}

Output:

SenderActor, received: start!
ReceiverActor, received: Hello ReceiverActor!, This is SenderActor.
SenderActor, received: Hi!, This is ReceiverActor.

Scheduled task example

  • Use the ActorSystem.scheduler.schedule() method to start a scheduled task
  • Usage 1:

	final def schedule(
		initialDelay: FiniteDuration, // delay before the first execution
		interval: FiniteDuration, // interval between subsequent executions (it does not delay the first execution)
		receiver: ActorRef, // target Actor that receives the message
		message: Any) // message to send
	(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable // implicit parameters; an ExecutionContext must be imported

  • Usage 2:

	final def schedule(
		initialDelay: FiniteDuration, // delay before the first execution
		interval: FiniteDuration // interval between subsequent executions (it does not delay the first execution)
	)(f: => Unit) // function to execute periodically (for example, one that sends a message)
	(implicit executor: ExecutionContext): Cancellable // implicit parameter; an ExecutionContext must be imported

package com.akka.ex2

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object MainActor {
  object ReceiverActor extends Actor {
    override def receive: Receive = {
      case x => println(x)
    }
  }

  def main(args: Array[String]): Unit = {
    // Create the ActorSystem, which is responsible for creating and supervising Actors
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    // Load the custom Actor object through the ActorSystem
    val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")

    // Import the implicit ExecutionContext and the duration conversions
    import actorSystem.dispatcher
    import scala.concurrent.duration._

    // Use the scheduler to send messages to ReceiverActor periodically
    // Option 1: pass the message as a parameter of type Any
    actorSystem.scheduler.schedule(3.seconds, 2.seconds, receiverActor, "Hello ReceiverActor!, 111.")

    // Option 2: send the message from a custom function
    actorSystem.scheduler.schedule(0.seconds, 5.seconds) {
      receiverActor ! "Hello ReceiverActor!, 222."
    }
  }
}
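
Both schedule overloads return a Cancellable, which the example above discards. The sketch below keeps the handle and stops the periodic task after a few ticks. PrinterActor, CancelDemo, and the timings are hypothetical. It keeps the schedule method to match the article; in Akka 2.6 that method is deprecated in favour of scheduleWithFixedDelay and scheduleAtFixedRate, so expect a deprecation warning there.

```scala
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import scala.concurrent.duration._

// Hypothetical Actor that prints whatever it receives.
class PrinterActor extends Actor {
  override def receive: Receive = {
    case x => println(x)
  }
}

object CancelDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demoSystem")
    val printer = system.actorOf(Props(new PrinterActor), "printer")
    import system.dispatcher // implicit ExecutionContext required by the scheduler

    // Keep the Cancellable so the periodic task can be stopped later.
    val task: Cancellable = system.scheduler.schedule(0.seconds, 1.second, printer, "tick")

    Thread.sleep(3500) // let a few ticks arrive
    task.cancel()      // no further "tick" messages are scheduled after this
    println(s"cancelled: ${task.isCancelled}")
    system.terminate()
  }
}
```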

Two-process communication example

  1. WorkerActor sends a "connect" message to MasterActor
  2. MasterActor replies to WorkerActor with a "success" message
  3. WorkerActor receives the reply and prints it

package com.akka.master

import akka.actor.Actor

object MasterActor extends Actor {
  override def receive: Receive = {
    case "setup" => println("MasterActor started!")
    // Receive the message sent by WorkerActor
    case "connect" => {
      println("MasterActor, received: connect!")
      // Send an acknowledgement back to the sender (WorkerActor)
      sender() ! "success"
    }
  }
}

package com.akka.master

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")
    // Send a message to MasterActor
    masterActor ! "setup"
  }
}


package com.akka.worker

import akka.actor.Actor

// WorkerActor The path of : akka.tcp://[email protected]:8081/user/workerActor
object WorkerActor extends Actor {
  override def receive: Receive = {
    case "setup" => {
      println("WorkerActor started!")
      // Look up the remote MasterActor
      val masterActor = context.system.actorSelection("akka.tcp://[email protected]:8080/user/masterActor")
      // Send the string "connect" to MasterActor
      masterActor ! "connect"
    }
    // Receive the message sent by MasterActor
    case "success" => println("WorkerActor, received: success!")
  }
}

package com.akka.worker

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")
    // Send a message to WorkerActor
    workerActor ! "setup"
  }
}
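
For the two processes to reach each other over akka.tcp, each ActorSystem needs remoting settings, which ConfigFactory.load() normally reads from application.conf. The listings do not show that file, so the sketch below builds equivalent settings in code. Everything in it is an assumption: classic remoting as in Akka 2.5 or earlier, the akka-remote module on the classpath, the host 127.0.0.1, and the hypothetical names RemoteConfigDemo and remoteConfig. The ports 8080 (master) and 8081 (worker) follow the paths used in the worker listing.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object RemoteConfigDemo {
  // Builds classic-remoting settings for one process; host and port are assumed values.
  def remoteConfig(host: String, port: Int): Config =
    ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = $port
         |""".stripMargin
    ).withFallback(ConfigFactory.load()) // keep defaults and any application.conf values

  def main(args: Array[String]): Unit = {
    // Master process on port 8080; the worker process would pass 8081 instead.
    val system = ActorSystem("actorSystem", remoteConfig("127.0.0.1", 8080))
    println(s"started ${system.name}")
  }
}
```

The same four keys can be written directly into each project's application.conf, in which case ConfigFactory.load() alone is enough.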

Simplified Spark communication framework example

  • Simulates the communication between Spark's Master and multiple Workers

  • Steps:
  1. Start MasterActor
    1.1 After the MasterActor object is constructed, it starts a scheduled task (a self-check that removes WorkerActors whose heartbeats have timed out)

  2. Start WorkerActor
    2.1 After the WorkerActor object is constructed, it wraps its own information in a registration message and sends it to MasterActor

  3. MasterActor receives the WorkerActor's registration message and saves it
    3.1 It sends an acknowledgement back to WorkerActor

  4. WorkerActor receives the acknowledgement of its registration request and prints "Connection is successful!"
    4.1 It starts a scheduled task that sends heartbeat messages to MasterActor

  5. MasterActor receives the heartbeat message from WorkerActor and updates the last heartbeat time in that WorkerActor's registration information to the current time
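
The timeout rule behind steps 1.1 and 5 can be sketched without Akka: a worker counts as timed out when the current time minus its last heartbeat exceeds the timeout. In the sketch below, WorkerInfo mirrors the case class from the common project, while HeartbeatCheck, partitionByTimeout, and the sample values are hypothetical.

```scala
// Mirrors the article's WorkerInfo case class.
final case class WorkerInfo(workerId: String, cpuCores: Int, memory: Int, lastHeartBeatTime: Long)

object HeartbeatCheck {
  /** Splits workers into (alive, timedOut) for a given time in milliseconds and a timeout in seconds. */
  def partitionByTimeout(
      workers: Map[String, WorkerInfo],
      nowMillis: Long,
      timeoutSeconds: Int): (Map[String, WorkerInfo], Map[String, WorkerInfo]) =
    workers.partition { case (_, info) =>
      nowMillis - info.lastHeartBeatTime <= timeoutSeconds * 1000L
    }

  def main(args: Array[String]): Unit = {
    val now = 100000L
    val workers = Map(
      "w1" -> WorkerInfo("w1", 4, 2048, now - 2000),  // last heartbeat 2 s ago
      "w2" -> WorkerInfo("w2", 2, 1024, now - 20000)  // last heartbeat 20 s ago
    )
    val (alive, timedOut) = partitionByTimeout(workers, now, timeoutSeconds = 15)
    println(s"alive: ${alive.keys.mkString(", ")}")        // prints "alive: w1"
    println(s"timed out: ${timedOut.keys.mkString(", ")}") // prints "timed out: w2"
  }
}
```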

Project name               Description
scala-spark-akka-common    Shared message entity classes
scala-spark-akka-master    Akka Master node
scala-spark-akka-worker    Akka Worker node

package com.akka.spark.common

/**
 * Holds the information of a registered WorkerActor.
 *
 * @param workerId : ID of the WorkerActor (a UUID)
 * @param cpuCores : number of CPU cores of the WorkerActor
 * @param memory : memory size of the WorkerActor
 * @param lastHeartBeatTime : time of the last heartbeat
 */
case class WorkerInfo(workerId: String, cpuCores: Int, memory: Int, lastHeartBeatTime: Long)


package com.akka.spark.common

/**
 * Registration message submitted by a WorkerActor.
 *
 * @param workerId : ID of the WorkerActor (a UUID)
 * @param cpuCores : number of CPU cores of the WorkerActor
 * @param memory : memory size of the WorkerActor
 */
case class WorkerRegisterMessage(workerId: String, cpuCores: Int, memory: Int)

/** Singleton object returned after a successful registration */
case object RegisterSuccessMessage

/**
 * Heartbeat message that a WorkerActor sends to MasterActor periodically.
 *
 * @param workerId : ID of the WorkerActor (a UUID)
 * @param cpuCores : number of CPU cores of the WorkerActor
 * @param memory : memory size of the WorkerActor
 */
case class WorkerHeartBeatMessage(workerId: String, cpuCores: Int, memory: Int)


package com.akka.spark.master

import com.typesafe.config.{Config, ConfigFactory}

// Utility object for reading configuration file values
object ConfigUtils {
  // 1. Load the configuration object
  private val config: Config = ConfigFactory.load()
  // 2. Interval, in seconds, at which WorkerActor heartbeats are checked
  val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval")
  // 3. WorkerActor heartbeat timeout, in seconds
  val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout")
}
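
ConfigUtils expects the two master.check.heartbeat keys to exist in the configuration that ConfigFactory.load() finds, typically application.conf. As a minimal sketch, the snippet below parses an inline stand-in for that file and reads both keys; the values 6 and 15 and the name MasterConfigDemo are assumptions, since the article does not show its configuration file.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object MasterConfigDemo {
  def main(args: Array[String]): Unit = {
    // Inline stand-in for application.conf; both values are assumed and given in seconds.
    val config: Config = ConfigFactory.parseString(
      """
        |master.check.heartbeat.interval = 6
        |master.check.heartbeat.timeout = 15
        |""".stripMargin)

    val interval = config.getInt("master.check.heartbeat.interval")
    val timeout = config.getInt("master.check.heartbeat.timeout")
    println(s"check every $interval s, time out after $timeout s")
  }
}
```

If either key is missing, getInt throws a ConfigException.Missing, so a typo in the file shows up as soon as ConfigUtils is first used.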


package com.akka.spark.master

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Master {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    actorSystem.actorOf(Props(MasterActor), "masterActor")
  }
}


package com.akka.spark.master

import java.util.Date
import akka.actor.Actor
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerInfo, WorkerRegisterMessage}

object MasterActor extends Actor {
  // 1. Mutable Map that stores the information of registered WorkerActors
  private val regWorkerMap = scala.collection.mutable.Map[String, WorkerInfo]()

  // MasterActor periodically checks WorkerActor heartbeats and removes Workers that have timed out
  override def preStart(): Unit = {
    // 1. Import the implicit ExecutionContext and the duration conversions
    import context.dispatcher
    import scala.concurrent.duration._

    // 2. Start the scheduled task (MasterActor's self-check that removes timed-out WorkerActors)
    context.system.scheduler.schedule(0.seconds, ConfigUtils.`master.check.heartbeat.interval`.seconds) {
      // 3. Select the WorkerActors that have timed out
      val timeOutWorkerMap = regWorkerMap.filter {
        keyVal => // keyVal format: workerId -> WorkerInfo(workerId, cpuCores, memory, lastHeartBeatTime)
          // 3.1 Last heartbeat time of the current WorkerActor
          val lastHeartBeatTime = keyVal._2.lastHeartBeatTime
          // 3.2 Timed out when (current time - last heartbeat time) > timeout in seconds * 1000
          (new Date().getTime - lastHeartBeatTime) > (ConfigUtils.`master.check.heartbeat.timeout` * 1000)
      }

      // 4. Remove the timed-out workerIds
      if (timeOutWorkerMap.nonEmpty) {
        regWorkerMap --= timeOutWorkerMap.keys // e.g. 5b9feb50-5c33-496b-a325-dd168360281b
      }

      // 5. Sort the remaining active WorkerActors by memory size in descending order
      val workerList = regWorkerMap.map(_._2).toList.sortBy(_.memory).reverse
      println(s"Active WorkerActors: ${workerList}")
    }
  }

  override def receive: Receive = {
    // Receive the registration message from WorkerActor
    case WorkerRegisterMessage(workerId, cpuCores, memory) => {
      // Print the received registration information
      println(s"MasterActor, received info: ${workerId}, ${cpuCores}, ${memory}")

      // Save the registration information in the Map and record the last heartbeat time
      regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)

      // Send an acknowledgement to the registered WorkerActor
      sender() ! RegisterSuccessMessage
    }

    // Receive the heartbeat message from WorkerActor
    case WorkerHeartBeatMessage(workerId, cpuCores, memory) => {
      println(s"MasterActor, received heartbeat: ${workerId}")
      // Update the last heartbeat time of the given WorkerActor
      regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)
    }
  }
}
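
One caveat with the MasterActor listing above: the block passed to schedule runs on a dispatcher thread rather than inside receive, so it can touch regWorkerMap while a message is being handled. A common alternative is to schedule a message to self and run the check inside receive. The sketch below illustrates that pattern under assumptions; SafeMasterActor, CheckTimeoutMessage, HeartBeat, and the 5 and 15 second values are hypothetical and are not part of the article's project.

```scala
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import scala.collection.mutable
import scala.concurrent.duration._

// Hypothetical messages for this sketch.
case object CheckTimeoutMessage
final case class HeartBeat(workerId: String)

class SafeMasterActor(checkInterval: FiniteDuration, timeout: FiniteDuration) extends Actor {
  // workerId -> last heartbeat time in milliseconds; only touched inside receive.
  private val lastSeen = mutable.Map[String, Long]()
  private var tick: Option[Cancellable] = None

  override def preStart(): Unit = {
    import context.dispatcher
    // Schedule a message to self instead of a closure over actor state.
    tick = Some(context.system.scheduler.schedule(checkInterval, checkInterval, self, CheckTimeoutMessage))
  }

  // Stop the timer when the Actor stops.
  override def postStop(): Unit = tick.foreach(_.cancel())

  override def receive: Receive = {
    case HeartBeat(workerId) =>
      lastSeen(workerId) = System.currentTimeMillis()
    case CheckTimeoutMessage =>
      val now = System.currentTimeMillis()
      // Collect the ids whose last heartbeat is older than the timeout, then remove them.
      val timedOut = lastSeen.collect { case (id, t) if now - t > timeout.toMillis => id }
      lastSeen --= timedOut
      println(s"Active workers: ${lastSeen.keys.mkString(", ")}")
  }
}

object SafeMasterDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demoSystem")
    val master = system.actorOf(Props(new SafeMasterActor(5.seconds, 15.seconds)), "masterActor")
    master ! HeartBeat("worker-1")
  }
}
```

Because every access to lastSeen happens while the Actor processes a message, no extra locking is needed. The demo keeps running and prints the active workers every 5 seconds until the process is stopped.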


package com.akka.spark.worker

import com.typesafe.config.{Config, ConfigFactory}

// Utility object for reading configuration file values
object ConfigUtils {
  // 1. Load the configuration object
  private val config: Config = ConfigFactory.load()
  // 2. Interval, in seconds, between WorkerActor heartbeats
  val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}


package com.akka.spark.worker

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Worker {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    actorSystem.actorOf(Props(WorkerActor), "workerActor")
  }
}


package com.akka.spark.worker

import java.util.UUID
import akka.actor.{Actor, ActorSelection}
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerRegisterMessage}
import scala.util.Random

object WorkerActor extends Actor {
  // Reference to MasterActor
  private var masterActor: ActorSelection = _
  // Registration information of this WorkerActor
  private var workerId: String = _
  private var cpuCores: Int = _ // number of CPU cores
  private var memory: Int = _ // memory size
  private val cpuCoreList = List(1, 2, 3, 4, 6, 8) // candidate values for the number of CPU cores
  private val memoryList = List(512, 1024, 2048, 4096) // candidate values for the memory size

  override def preStart(): Unit = {
    // Look up MasterActor
    masterActor = context.system.actorSelection("akka.tcp://[email protected]:8080/user/masterActor")

    // Generate a random ID
    workerId = UUID.randomUUID().toString

    // Randomly pick the number of CPU cores and the memory size of this WorkerActor
    val r = new Random()
    cpuCores = cpuCoreList(r.nextInt(cpuCoreList.length))
    memory = memoryList(r.nextInt(memoryList.length))

    // Build the registration message for this WorkerActor
    val registerMessage = WorkerRegisterMessage(workerId, cpuCores, memory)
    // Send it to MasterActor
    masterActor ! registerMessage
  }

  override def receive: Receive = {
    // Acknowledgement of successful registration
    case RegisterSuccessMessage => {
      println("Connection is successful!")

      // 1. Import the implicit ExecutionContext and the duration conversions
      import context.dispatcher
      import scala.concurrent.duration._

      // Periodically send a heartbeat message to MasterActor
      context.system.scheduler.schedule(0.seconds, ConfigUtils.`worker.heartbeat.interval`.seconds) {
        masterActor ! WorkerHeartBeatMessage(workerId, cpuCores, memory)
      }
    }
  }
}

If you found this helpful, a like is welcome. Thank you!

Original site

Copyright notice
This article was written by [Shawn Jeon]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/02/202202140334179025.html