Scala Basic Tutorial -- 20 -- Akka
2022-07-04 18:52:00 【Empty.】
Chapter goals
- Understand what the Akka concurrent programming framework is
- Work through an introductory Akka example
- Implement scheduled tasks with Akka
- Implement communication between two processes
- Build a simplified Spark-style communication framework
1. Introduction to the Akka concurrent programming framework
1.1 Akka overview
Akka is a toolkit for building highly concurrent, distributed, and scalable event-driven applications. Akka is a library written in Scala, and Akka-based applications can be developed in either Scala or Java.
1.2 Akka features
- Provides an asynchronous, non-blocking, high-performance event-driven programming model
- Has built-in fault tolerance: an Actor can be recovered or reset when an error occurs
- Handles events with very little overhead (millions of Actors per GB of heap memory)
- Can be used to build highly concurrent programs on a single machine as well as distributed programs across a network
1.3 Akka communication process
The basic flow of the Akka Actor concurrent programming model, using a student who sends a message to a teacher as the example, is as follows:
- The student creates an ActorSystem
- The ActorSystem is used to create an ActorRef (a reference to the teacher Actor), and the message is sent to that ActorRef
- The ActorRef passes the message to the Message Dispatcher
- The Message Dispatcher stores messages, in order, in the MailBox of the destination Actor
- The Message Dispatcher schedules the MailBox to run on a thread
- The MailBox takes messages out in order and finally passes them to the receive method of the TeacherActor
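The ordering behaviour in this flow can be illustrated with a toy model in plain Scala. This is only a sketch and not how Akka is implemented: ToyActor, ToyDispatcher, and demo are hypothetical names, a single queue stands in for the MailBox, and everything runs on the calling thread instead of a dispatcher thread.

```scala
import scala.collection.mutable

// Toy model of the mailbox flow; hypothetical names, not Akka's implementation.
object MailboxSketch {
  // Stand-in for an Actor: all it has is a receive partial function.
  final class ToyActor(val receive: PartialFunction[Any, Unit])

  // Stand-in for the Message Dispatcher together with the target's MailBox.
  final class ToyDispatcher(target: ToyActor) {
    private val mailbox = mutable.Queue.empty[Any]

    // Store the message at the back of the mailbox (arrival order is kept).
    def tell(message: Any): Unit = mailbox.enqueue(message)

    // Take messages out front-to-back and hand each one to receive.
    // Messages that receive does not handle are dropped.
    def drain(): Unit =
      while (mailbox.nonEmpty) {
        val message = mailbox.dequeue()
        if (target.receive.isDefinedAt(message)) target.receive(message)
      }
  }

  // Sends the given messages to a toy teacher and returns the strings it saw.
  def demo(messages: Seq[Any]): List[String] = {
    val seen = mutable.ListBuffer.empty[String]
    val teacher = new ToyActor({ case text: String => seen += text; () })
    val dispatcher = new ToyDispatcher(teacher)
    messages.foreach(dispatcher.tell)
    dispatcher.drain()
    seen.toList
  }
}
```

Calling MailboxSketch.demo(Seq("hello", 42, "world")) hands the two strings to the teacher in the order they were sent and skips the number, because the teacher's receive only matches String.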
2. Creating Actors
In Akka, programs are built around Actors. They resemble the Scala Actors covered earlier, but Akka Actors are written and created differently.
2.1 API introduction
ActorSystem: responsible for creating and supervising Actors
- In Akka, an ActorSystem is a heavyweight structure that allocates multiple threads.
- In an application, the ActorSystem is usually a singleton, and it can be used to create many Actors.
- Inside an Actor, context.system returns a reference to the ActorSystem that manages that Actor.
Implementing an Actor class
- Define a class or singleton object that extends Actor (note: import Actor from the akka.actor package)
- Implement the receive method and handle messages directly in it. There is no need for loop and react calls; Akka calls receive automatically to deliver messages.
- [Optional] Override the preStart() method. It runs after the Actor object has been constructed, once when the Actor starts.
Loading an Actor
- To create an Akka Actor, first create an ActorSystem. The ActorSystem needs a name and can optionally load configuration items (used later in this chapter).
- Call ActorSystem.actorOf(Props(Actor object), "Actor name") to load the Actor.
2.2 Actor Path
Every Actor has a Path that can be referenced from outside. The path format is as follows:
Actor type | Path | Example |
---|---|---|
Local Actor | akka://<ActorSystem name>/user/<Actor name> | akka://SimpleAkkaDemo/user/senderActor |
Remote Actor | akka.tcp://<ActorSystem name>@<IP address>:<port>/user/<Actor name> | akka.tcp://192.168.10.17:5678/user/service-b |
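As a small illustration, both path formats can be assembled with string interpolation. ActorPaths and its two methods are hypothetical helpers, not part of Akka; the remote form assumes the classic remoting layout akka.tcp://<ActorSystem name>@<host>:<port>/user/<Actor name>, and the host and port in the usage note are made-up values.

```scala
// Hypothetical helper for building Actor path strings; not an Akka API.
object ActorPaths {
  // Local path: akka://<ActorSystem name>/user/<Actor name>
  def local(systemName: String, actorName: String): String =
    s"akka://$systemName/user/$actorName"

  // Remote path: akka.tcp://<ActorSystem name>@<host>:<port>/user/<Actor name>
  def remote(systemName: String, host: String, port: Int, actorName: String): String =
    s"akka.tcp://$systemName@$host:$port/user/$actorName"
}
```

For example, ActorPaths.local("SimpleAkkaDemo", "senderActor") produces the local example from the table, and ActorPaths.remote("actorSystem", "127.0.0.1", 8888, "masterActor") produces a remote path for an assumed host and port.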
2.3 Introductory example
2.3.1 Requirements
Use Akka to create two Actors that can send messages to each other.
2.3.2 Implementation steps
- Create the Maven module
- Create and load the Actors
- Send and receive messages
2.3.3 Create the Maven module
Using Akka requires importing the Akka library. Here Maven manages the project; the steps are as follows:
Create the Maven module.
Select the project, right-click -> New -> Module -> Maven -> Next -> GroupId: com.itheima, ArtifactId: akka-demo -> Next -> set "module name" to "akka-demo" -> Finish
Open the pom.xml file and import the Akka Maven dependencies and plug-ins.
//1. Paste in the contents of the pom.xml file from the provided materials.
//2. The source directory is src/main/scala.
//3. The test code directory is src/test/scala.
//4. Neither folder exists by default, so create them manually.
//5. After creating them, change the type of each folder: select the folder, right-click -> Mark Directory as -> Sources Root (for source code) or Test Sources Root (for test code).
2.3.4 Create and load the Actors
The Maven project now exists and manages the build from here on. Next, create and load the Actors. Two Actors are needed:
- SenderActor: sends messages
- ReceiverActor: receives messages and replies
Specific steps
Create the package com.itheima.akka.demo under the src/main/scala folder.
Create two Actors in this package (note: both are singleton objects declared with object).
SenderActor: the Actor that sends messages.
ReceiverActor: the Actor that receives messages.
Create a singleton object Entrance in the same package and give it a main method; it is the entry point of the whole program.
Start the program. If no error is reported, the code is working.
Reference code
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object SenderActor extends Actor {
  /* Detail: in the Scala Actor concurrent programming model you implement the act method, and receiving messages continuously takes loop + react. In the Akka programming model you implement the receive method and write a partial function in it to handle messages. */
  // Override the receive method.
  override def receive: Receive = {
    case x => println(x)
  }
}
object ReceiverActor extends Actor {
  // Override the receive method.
  override def receive: Receive = {
    case x => println(x)
  }
}
object Entrance {
  def main(args: Array[String]): Unit = {
    //1. Implement the Actor trait, that is, create the two Actor objects (done above).
    //2. Create the ActorSystem.
    // The two arguments are the ActorSystem name and the configuration to load (nothing custom is set here).
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    //3. Load the Actors.
    // The two arguments of actorOf are: 1. the Actor object, 2. the name of that Actor.
    val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")
    val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
  }
}
2.3.5 Send and receive messages
Approach
- Wrap messages in case classes
- SubmitTaskMessage: the message that submits a task
- SuccessSubmitTaskMessage: the message confirming that the task was submitted
- Use ! to send an asynchronous message that has no return value.
Reference code
Code in the MessagePackage.scala file
/**
 * Case class for the message being sent.
 * @param msg the content to send.
 */
case class SubmitTaskMessage(msg: String)
/**
 * Case class for the acknowledgement message.
 * @param msg the acknowledgement content.
 */
case class SuccessSubmitTaskMessage(msg: String)
Code in the Entrance.scala file
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// Main entry point of the program.
object Entrance {
  def main(args: Array[String]): Unit = {
    //1. Create the ActorSystem that manages all user-defined Actors.
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    //2. Let the ActorSystem manage the custom Actors (SenderActor, ReceiverActor).
    val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")
    val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
    //3. Send the string "start" to SenderActor.
    senderActor ! "start"
  }
}
Code in the SenderActor.scala file
import akka.actor.Actor
object SenderActor extends Actor {
  override def receive: Receive = {
    //1. Receive the "start" message sent by Entrance.
    case "start" =>
      //2. Print what was received.
      println("SenderActor received: the start message sent by Entrance.")
      //3. Look up ReceiverActor by its path.
      // Format: akka://<ActorSystem name>/user/<Actor name>
      val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")
      //4. Send ReceiverActor a message wrapped in the SubmitTaskMessage case class.
      receiverActor ! SubmitTaskMessage("I am SenderActor, and I am sending you a message!...")
    //5. Receive the acknowledgement sent back by ReceiverActor.
    case SuccessSubmitTaskMessage(msg) => println(s"SenderActor received the acknowledgement: $msg")
  }
}
Code in the ReceiverActor.scala file
import akka.actor.Actor
object ReceiverActor extends Actor {
  override def receive: Receive = {
    //1. Receive the message sent by SenderActor.
    case SubmitTaskMessage(msg) =>
      //2. Print the received message.
      println(s"ReceiverActor received: $msg")
      //3. Reply to the sender with an acknowledgement.
      sender() ! SuccessSubmitTaskMessage("Task received successfully! I am ReceiverActor")
  }
}
Output
SenderActor received: the start message sent by Entrance.
ReceiverActor received: I am SenderActor, and I am sending you a message!...
SenderActor received the acknowledgement: Task received successfully! I am ReceiverActor
3. Akka scheduled tasks
Requirement: how can the Akka framework run a task periodically?
Answer: Akka provides a scheduler object for timed scheduling. Calling the ActorSystem.scheduler.schedule() method starts a scheduled task. (In Akka 2.6 and later, schedule() is deprecated in favor of scheduleWithFixedDelay() and scheduleAtFixedRate().)
3.1 Format of the schedule() method
Option 1: send a message.
def schedule(
  initialDelay: FiniteDuration,  // delay before the scheduled task first runs
  interval: FiniteDuration,      // how often the task runs
  receiver: ActorRef,            // the Actor to send the message to
  message: Any)                  // the message to send
  (implicit executor: ExecutionContext)  // implicit parameter: must be imported manually
Option 2: run a custom function.
def schedule(
  initialDelay: FiniteDuration,  // delay before the scheduled task first runs
  interval: FiniteDuration       // how often the task runs
)(f: => Unit)                    // the function to run periodically; put the logic here
  (implicit executor: ExecutionContext)  // implicit parameter: must be imported manually
Note: whichever option you use, you need to import the implicit parameter and the implicit conversions, as follows:
// Import the implicit parameter: the ExecutionContext that the scheduler runs tasks on.
import actorSystem.dispatcher
// Import the implicit conversions that provide time units such as 2.seconds.
import scala.concurrent.duration._
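The meaning of the two duration arguments (initial delay, then a fixed interval) can be tried out without Akka. The sketch below deliberately swaps in the JDK's ScheduledExecutorService instead of Akka's scheduler, purely to show the timing semantics; FixedRateSketch and runTicks are hypothetical names, and the wait limit is an arbitrary safety margin.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// JDK-based stand-in for "initial delay + interval" scheduling; not Akka's scheduler.
object FixedRateSketch {
  // Runs `task` after initialDelayMs, then every intervalMs, until it has run
  // at least `ticks` times (or the wait times out). Returns the number of runs
  // counted when the scheduler is stopped.
  def runTicks(ticks: Int, initialDelayMs: Long, intervalMs: Long)(task: () => Unit): Int = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val counter = new AtomicInteger(0)
    val done = new CountDownLatch(ticks)
    val job = new Runnable {
      override def run(): Unit = {
        task()
        counter.incrementAndGet()
        done.countDown()
      }
    }
    try {
      scheduler.scheduleAtFixedRate(job, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS)
      // Wait long enough for the requested ticks, plus a 5 second margin.
      done.await(initialDelayMs + intervalMs * ticks + 5000L, TimeUnit.MILLISECONDS)
      counter.get()
    } finally {
      scheduler.shutdownNow() // cancel the periodic task
    }
  }
}
```

FixedRateSketch.runTicks(3, 0L, 20L)(() => println("tick")) prints "tick" at least three times, starting immediately and then roughly every 20 ms. In the Akka version the ExecutionContext imported from actorSystem.dispatcher plays the role of the executor created here.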
3.2 Example
Requirements
- Define a ReceiverActor that receives messages and prints what it receives.
- Create an ActorSystem to manage all user-defined Actors.
- Register ReceiverActor with the ActorSystem.
- Import the implicit parameter and the implicit conversions.
- Use the scheduler to send ReceiverActor a message periodically (every 2 seconds).
- Option 1: send a message.
- Option 2: run a custom function.
Reference code
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// Example: the Akka scheduler.
object MainActor {
  //1. Define an Actor that receives messages and prints them.
  object ReceiverActor extends Actor {
    override def receive: Receive = {
      case x => println(x) // Print whatever is received.
    }
  }
  def main(args: Array[String]): Unit = {
    //2. Create an ActorSystem to manage all user-defined Actors.
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    //3. Register ReceiverActor with the ActorSystem.
    val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
    //4. Import the implicit parameter and the implicit conversions.
    // The implicit ExecutionContext that the scheduler runs tasks on.
    import actorSystem.dispatcher
    // Implicit conversions that provide time units such as 2.seconds.
    import scala.concurrent.duration._
    //5. Use the scheduler to send ReceiverActor a message every 2 seconds.
    // Option 1: pass four arguments (initial delay, interval, receiver, message).
    //actorSystem.scheduler.schedule(3.seconds, 2.seconds, receiverActor, "Hello, I'm the seed seller. Want to buy some seeds?...")
    // Option 2: pass the two durations and a function.
    //actorSystem.scheduler.schedule(0.seconds, 2.seconds)(receiverActor ! "New seeds you haven't seen before! Hehehe...")
    // The form normally used in practice:
    actorSystem.scheduler.schedule(0.seconds, 2.seconds) {
      receiverActor ! "New seeds you haven't seen before! Hehehe..."
    }
  }
}
4. Communication between two processes
4.1 Example overview
Use Akka to send and receive messages between two processes.
- After starting, WorkerActor connects to MasterActor and sends it a message.
- After receiving the message, MasterActor replies to WorkerActor.
4.2 Worker implementation
Steps
Create a Maven module and import the dependencies and configuration file.
- Create the Maven module.
  GroupId: com.itheima
  ArtifactId: akka-worker
- Copy the contents of the pom.xml file in the provided materials into the pom.xml of the akka-worker Maven project.
- Copy application.conf from the provided materials into the src/main/resources folder.
- Open application.conf and change the port number to 9999.
Create and start WorkerActor.
- Create the package com.itheima.akka under the src/main/scala folder.
- Create WorkerActor in the package (as a singleton object).
- Create the Entrance singleton object in the package and define the main method in it.
Send a "setup" message to WorkerActor; WorkerActor receives and prints it.
Start the program and test it.
Reference code
Code in the WorkerActor.scala file
import akka.actor.Actor
//1. Create WorkerActor, which receives and sends messages.
object WorkerActor extends Actor {
  override def receive: Receive = {
    //2. Receive messages.
    case x => println(x)
  }
}
Code in the Entrance.scala file
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// Program entry point.
// Path of this ActorSystem: akka.tcp://actorSystem@<host>:9999
object Entrance {
  def main(args: Array[String]): Unit = {
    //1. Create the ActorSystem.
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    //2. Load the custom WorkerActor through the ActorSystem.
    val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")
    //3. Send WorkerActor a message.
    workerActor ! "setup"
  }
}
// Test: right-click and run. If "setup" is printed, the program works.
4.3 Master implementation
Steps
Create a Maven module and import the dependencies and configuration file.
- Create the Maven module.
  GroupId: com.itheima
  ArtifactId: akka-master
- Copy the contents of the pom.xml file in the provided materials into the pom.xml of the akka-master Maven project.
- Copy application.conf from the provided materials into the src/main/resources folder.
- Open application.conf and change the port number to 8888.
Create and start MasterActor.
- Create the package com.itheima.akka under the src/main/scala folder.
- Create MasterActor in the package (as a singleton object).
- Create the Entrance singleton object in the package and define the main method in it.
WorkerActor sends a "connect" message to MasterActor.
MasterActor replies to WorkerActor with a "success" message.
WorkerActor receives the reply and prints it.
Start the Master and the Worker and test.
Reference code
Code in the MasterActor.scala file
import akka.actor.Actor
//MasterActor: receives the data sent by WorkerActor and replies with an acknowledgement.
// Address of the ActorSystem that manages MasterActor: akka.tcp://actorSystem@<host>:8888
object MasterActor extends Actor {
  override def receive: Receive = {
    //1. Receive the data sent by WorkerActor.
    case "connect" =>
      println("MasterActor received: connect!...")
      //2. Reply to WorkerActor.
      sender() ! "success"
  }
}
Code in the Entrance.scala file
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// Main entry point of the Master module.
object Entrance {
  def main(args: Array[String]): Unit = {
    //1. Create the ActorSystem that manages all user-defined Actors.
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    //2. Register MasterActor with the ActorSystem.
    val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")
    //3. Optionally send MasterActor some test data.
    //masterActor ! "Test data"
  }
}
Code in the WorkerActor.scala file (steps 2 and 3 are new)
import akka.actor.Actor
//WorkerActor: receives the message from Entrance, sends a message to MasterActor, then receives MasterActor's acknowledgement.
// Address of the ActorSystem that manages WorkerActor: akka.tcp://actorSystem@<host>:9999
object WorkerActor extends Actor {
  override def receive: Receive = {
    //1. Receive the "setup" message sent by Entrance.
    case "setup" =>
      println("WorkerActor received: the setup instruction sent by Entrance!")
      //2. Get a reference to MasterActor.
      // Replace <master-host> with the host name configured in the Master's application.conf.
      val masterActor = context.system.actorSelection("akka.tcp://actorSystem@<master-host>:8888/user/masterActor")
      //3. Send MasterActor a message.
      masterActor ! "connect"
    //4. Receive MasterActor's acknowledgement.
    case "success" => println("WorkerActor received: success!")
  }
}
5. Example: a simplified Spark communication framework
5.1 Example overview
Simulate the communication between Spark's Master and its Workers.
- One Master
  - Manages multiple Workers
- Several Workers (Workers can be added as needed)
  - Send registration information to the Master
  - Send heartbeat information to the Master periodically
5.2 Implementation approach
- Build the Master and the Worker
  - Build the Master ActorSystem and Actor
  - Build the Worker ActorSystem and Actor
- Worker registration phase
  - The Worker process registers with the Master (it sends its own ID, number of CPU cores, and memory size in MB to the Master)
- Worker periodic heartbeat phase
  - The Worker sends heartbeat messages to the Master periodically
- Master periodic heartbeat check phase
  - The Master checks Worker heartbeats periodically, removes Workers that have timed out, and sorts the remaining Workers by memory in descending order
- Multiple Worker test phase
  - Start several Workers, check that they register successfully, then stop one Worker and check that it is removed correctly
5.3 Project setup
Requirements
This project is built with Maven.
Steps
- Create the following projects. The GroupId is the same for all of them: com.itheima. The project names are as follows:
Project name | Description |
---|---|
spark-demo-common | Shared messages and entity classes |
spark-demo-master | Akka Master node |
spark-demo-worker | Akka Worker node |
Import the dependencies (the pom.xml in the provided materials).
Note: the master and worker projects must both add a dependency on common, as follows:
<!-- Import the spark-demo-common module -->
<dependency>
  <groupId>com.itheima</groupId>
  <artifactId>spark-demo-common</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
In all three projects, create a scala directory under both src/main and src/test.
Import the configuration file (application.conf in the provided materials)
- Change the Master port to 7000
- Change the Worker port to 8000
5.4 Build the Master and the Worker
Requirements
Build the Master and the Worker separately, then start them to test.
Steps
- Create and load the Master Actor
- Create and load the Worker Actor
- Test whether both start successfully
Reference code
Complete the code in the master module: create the package com.itheima.spark.master under src/main/scala. The code in the package is as follows:
Code in the MasterActor.scala file
import akka.actor.Actor
//Master: manages multiple Workers.
//MasterActor path: akka.tcp://actorSystem@<host>:7000
object MasterActor extends Actor {
  override def receive: Receive = {
    case x => println(x)
  }
}
Code in the Master.scala file
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// Program entry point: equivalent to the MainActor written earlier.
object Master {
  def main(args: Array[String]): Unit = {
    //1. Create the ActorSystem.
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    //2. Register MasterActor with the ActorSystem.
    val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")
    //3. Start the program. If no error is reported, the code is working.
  }
}
Complete the code in the worker module: create the package com.itheima.spark.worker under src/main/scala. The code in the package is as follows:
Code in the WorkerActor.scala file
import akka.actor.Actor
//WorkerActor address: akka.tcp://actorSystem@<host>:7100
object WorkerActor extends Actor {
  override def receive: Receive = {
    case x => println(x)
  }
}
Code in the Worker.scala file
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// Program entry point.
object Worker {
  def main(args: Array[String]): Unit = {
    //1. Create the ActorSystem.
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    //2. Register WorkerActor with the ActorSystem.
    val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")
    //3. Start the program. If no error is reported, the code is working.
    workerActor ! "hello"
  }
}
5.5 Worker registration phase
Requirements
When a Worker starts, it sends a registration message to the Master.
Approach
- The Worker sends a registration message to the Master (workerid, number of CPU cores, memory size)
  - Generate the number of CPU cores at random (1, 2, 3, 4, 6, 8)
  - Generate the memory size at random (512, 1024, 2048, 4096) (unit: MB)
- The Master saves the Worker's information and replies to the Worker with a registration success message
- Start the programs and test
Specific steps
Create the package com.itheima.spark.commons under the src/main/scala folder of the spark-demo-common project.
Copy MessagePackage.scala and Entities.scala from the provided materials into the commons package.
Define the following member variables in the WorkerActor singleton object:
- masterActorRef: the reference to MasterActor.
- workerid: the id of this WorkerActor.
- cpu: the number of CPU cores of this WorkerActor.
- mem: the memory size of this WorkerActor.
- cpu_list: the allowed values for the number of CPU cores.
- mem_list: the allowed values for the memory size.
In the preStart() method of WorkerActor, build the registration message and send it to MasterActor.
In MasterActor, receive the registration information submitted by WorkerActor and save it to a Map (key-value) collection …
MasterActor sends WorkerActor an acknowledgement (the registration success message).
In WorkerActor, receive the registration success message sent back by MasterActor.
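MessagePackage.scala and Entities.scala come from the provided materials and are not reproduced in this chapter. The sketch below is only a guess at their shape, inferred from how the reference code uses the names WorkerRegisterMessage, RegisterSuccessMessage, WorkerHeartBeatMessage, and WorkerInfo. The field names, the default value of lastHeartBeatTime, the CommonsSketch wrapper, and the describe helper are all assumptions made for illustration.

```scala
// Assumed shape of the shared messages and entity; the real files may differ.
object CommonsSketch {
  // Worker -> Master: registration request.
  case class WorkerRegisterMessage(workerId: String, cpu: Int, mem: Int)

  // Master -> Worker: registration acknowledgement (carries no data).
  case object RegisterSuccessMessage

  // Worker -> Master: periodic heartbeat.
  case class WorkerHeartBeatMessage(workerId: String, cpu: Int, mem: Int)

  // What the Master stores per Worker; lastHeartBeatTime is in epoch milliseconds.
  case class WorkerInfo(workerId: String, cpu: Int, mem: Int, lastHeartBeatTime: Long = 0L)

  // Pattern matching on the messages, the way an Actor's receive would.
  def describe(message: Any): String = message match {
    case WorkerRegisterMessage(id, cpu, mem) => s"register $id cpu=$cpu mem=$mem"
    case WorkerHeartBeatMessage(id, _, _)    => s"heartbeat $id"
    case RegisterSuccessMessage              => "registered"
    case other                               => s"unknown: $other"
  }
}
```

Using case classes for messages that carry data and a case object for the payload-free acknowledgement keeps the receive methods down to plain pattern matches.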
Reference code
Code in the WorkerActor.scala file
import java.util.UUID
import scala.util.Random
import akka.actor.{Actor, ActorSelection}
//WorkerActor address: akka.tcp://actorSystem@<host>:7100
object WorkerActor extends Actor {
  //1. Member variables: the reference to MasterActor and the registration parameters this WorkerActor submits.
  private var masterActorRef: ActorSelection = _ // reference to MasterActor
  private var workerid: String = _ // id of this WorkerActor
  private var cpu: Int = _ // number of CPU cores of this WorkerActor
  private var mem: Int = _ // memory size of this WorkerActor
  private val cpu_list = List(1, 2, 3, 4, 6, 8) // allowed numbers of CPU cores
  private val mem_list = List(512, 1024, 2048, 4096) // allowed memory sizes
  //2. Override preStart(); its body runs when the Actor starts.
  override def preStart(): Unit = {
    //3. Get a reference to the Master.
    // Replace <master-host> with the host name configured in the Master's application.conf.
    masterActorRef = context.actorSelection("akka.tcp://actorSystem@<master-host>:7000/user/masterActor")
    //4. Build the registration data.
    workerid = UUID.randomUUID().toString // set the id of this WorkerActor
    val r = new Random()
    cpu = cpu_list(r.nextInt(cpu_list.length))
    mem = mem_list(r.nextInt(mem_list.length))
    //5. Wrap the data in a WorkerRegisterMessage object.
    val registerMessage = WorkerRegisterMessage(workerid, cpu, mem)
    //6. Send the message to MasterActor.
    masterActorRef ! registerMessage
  }
  override def receive: Receive = {
    case x => println(x)
  }
}
Code in the MasterActor.scala file
import akka.actor.Actor
//Master: manages multiple Workers.
//MasterActor path: akka.tcp://actorSystem@<host>:7000
object MasterActor extends Actor {
  //1. A mutable Map that stores the information of successfully registered Workers.
  private val regWorkerMap = collection.mutable.Map[String, WorkerInfo]()
  override def receive: Receive = {
    case WorkerRegisterMessage(workId, cpu, mem) =>
      //2. Print the received registration information.
      println(s"MasterActor: received worker registration, $workId, $cpu, $mem")
      //3. Save the registered Worker's information as a WorkerInfo.
      regWorkerMap += workId -> WorkerInfo(workId, cpu, mem)
      //4. Reply with a registration success message.
      sender() ! RegisterSuccessMessage
  }
}
Modify the receive() method in the WorkerActor.scala file
override def receive: Receive = {
  case RegisterSuccessMessage => println("WorkerActor: registered successfully!")
}
5.6 Worker periodic heartbeat phase
Requirements
After the Worker receives the registration success message from the Master, it sends heartbeat messages to the Master periodically. When the Master receives a heartbeat message from a Worker, it updates that Worker's last heartbeat time.
Approach
- Write a utility class that reads the heartbeat interval
- Create the heartbeat message
- After the Worker receives the registration success message, it sends heartbeat messages periodically
- When the Master receives a heartbeat message, it updates the Worker's last heartbeat time
- Start the programs and test
Specific steps
Add one setting to the application.conf file in the worker project's src/main/resources folder.
worker.heartbeat.interval = 5 // interval at which the worker sends heartbeats (in seconds)
Create a singleton object ConfigUtils in the com.itheima.spark.worker package of the worker project to read the configuration file.
In the receive() method of WorkerActor, send heartbeat messages to MasterActor periodically.
When the Master receives a heartbeat message, it updates the Worker's last heartbeat time.
Reference code
Code in the ConfigUtils.scala file of the worker project
import com.typesafe.config.ConfigFactory
object ConfigUtils {
  //1. Get the configuration object.
  private val config = ConfigFactory.load()
  //2. Get the worker heartbeat interval (in seconds).
  val `worker.heartbeat.interval` = config.getInt("worker.heartbeat.interval")
}
Modify the receive() method in the WorkerActor.scala file
override def receive: Receive = {
  case RegisterSuccessMessage =>
    //1. Print the registration success message.
    println("WorkerActor: received the registration success message!")
    //2. Import the time-unit implicit conversions and the implicit ExecutionContext.
    import scala.concurrent.duration._
    import context.dispatcher
    //3. Send heartbeat messages to the Master periodically.
    context.system.scheduler.schedule(0.seconds, ConfigUtils.`worker.heartbeat.interval`.seconds) {
      //3.1 Send the heartbeat as a custom message.
      masterActorRef ! WorkerHeartBeatMessage(workerid, cpu, mem)
    }
}
Code in the MasterActor.scala file
import java.util.Date
import akka.actor.Actor
object MasterActor extends Actor {
  //1. A mutable Map that stores the information of successfully registered Workers.
  private val regWorkerMap = collection.mutable.Map[String, WorkerInfo]()
  override def receive: Receive = {
    // Receive registration messages.
    case WorkerRegisterMessage(workId, cpu, mem) =>
      //2. Print the received registration information.
      println(s"MasterActor: received worker registration, $workId, $cpu, $mem")
      //3. Save the registered Worker's information, including the current time as its last heartbeat.
      regWorkerMap += workId -> WorkerInfo(workId, cpu, mem, new Date().getTime)
      //4. Reply with a registration success message.
      sender() ! RegisterSuccessMessage
    // Receive heartbeat messages.
    case WorkerHeartBeatMessage(workId, cpu, mem) =>
      //1. Print the received heartbeat message.
      println(s"MasterActor: received a heartbeat from $workId")
      //2. Update the last heartbeat time of that Worker.
      regWorkerMap += workId -> WorkerInfo(workId, cpu, mem, new Date().getTime)
      //3. Print regWorkerMap to check that the logic works.
      println(regWorkerMap)
  }
}
5.7 Master periodic heartbeat check phase
Requirements
If a worker has not sent a heartbeat for longer than a given period, the Master needs to remove that worker from its current Worker collection. The heartbeat timeout check can be implemented with an Akka scheduled task.
Approach
- Write a utility class that reads the heartbeat check interval and the timeout
- Check heartbeats periodically and pick out the Workers whose last heartbeat is older than the timeout
- Remove the timed-out Workers
- Sort the remaining Workers by memory in descending order and print the available Workers
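The check itself is ordinary collection code and can be sketched without Akka. In the sketch below, HeartbeatCheckSketch, check, and demoIds are hypothetical names, WorkerInfo is an assumed shape for the Master's record of a Worker, and the current time is passed in as a parameter so that the result is reproducible.

```scala
// Pure-Scala sketch of the timeout filter and the memory sort; not the Actor code.
object HeartbeatCheckSketch {
  // Assumed shape of the Master's per-Worker record (times in epoch milliseconds).
  case class WorkerInfo(workerId: String, cpu: Int, mem: Int, lastHeartBeatTime: Long)

  // Drops Workers whose last heartbeat is older than timeoutSeconds.
  // Returns the surviving map and the survivors sorted by memory, largest first.
  def check(
      workers: Map[String, WorkerInfo],
      nowMillis: Long,
      timeoutSeconds: Int
  ): (Map[String, WorkerInfo], List[WorkerInfo]) = {
    val timeoutMillis = timeoutSeconds * 1000L
    val alive = workers.filter { case (_, info) =>
      nowMillis - info.lastHeartBeatTime <= timeoutMillis
    }
    val sorted = alive.values.toList.sortBy(_.mem).reverse
    (alive, sorted)
  }

  // Example: with a 15 s timeout at t = 21000 ms, Worker "a" (last seen at 1000 ms) is dropped.
  def demoIds: List[String] = {
    val workers = Map(
      "a" -> WorkerInfo("a", 2, 1024, 1000L),
      "b" -> WorkerInfo("b", 4, 4096, 20000L),
      "c" -> WorkerInfo("c", 1, 2048, 19000L)
    )
    check(workers, nowMillis = 21000L, timeoutSeconds = 15)._2.map(_.workerId)
  }
}
```

Here demoIds yields List("b", "c"): Worker "a" has been silent for 20 seconds and is removed, and the two survivors are ordered by memory from largest to smallest.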
Reference code
Code in the ConfigUtils.scala file of the Master project
import com.typesafe.config.{Config, ConfigFactory}
// Utility class for the Master.
object ConfigUtil {
  //1. Get the configuration object.
  private val config: Config = ConfigFactory.load()
  //2. Get the interval at which Worker heartbeats are checked (unit: seconds).
  val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval")
  //3. Get the Worker heartbeat timeout (seconds).
  val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout")
}
Code of the preStart() method in the MasterActor.scala file
//5. Check Worker heartbeats periodically.
override def preStart(): Unit = {
  //5.1 Import the time-unit implicit conversions and the implicit ExecutionContext for scheduled tasks.
  import scala.concurrent.duration._
  import context.dispatcher
  //5.2 Start the scheduled task.
  context.system.scheduler.schedule(0.seconds, ConfigUtil.`master.check.heartbeat.interval`.seconds) {
    //5.3 Pick out the Workers that have exceeded the timeout.
    val timeOutWorkerMap = regWorkerMap.filter { keyval =>
      //5.3.1 Get the last heartbeat time.
      val lastHeartBeatTime = keyval._2.lastHeartBeatTime
      //5.3.2 Timeout rule: current system time - last heartbeat time > timeout (configured value * 1000).
      new Date().getTime - lastHeartBeatTime > ConfigUtil.`master.check.heartbeat.timeout` * 1000
    }
    //5.4 Remove the timed-out Workers.
    if (timeOutWorkerMap.nonEmpty) {
      // A Map removes elements by key, so _._1 extracts the keys of the timed-out Workers.
      regWorkerMap --= timeOutWorkerMap.map(_._1)
    }
    //5.5 Collect all remaining WorkerInfo objects.
    val workerList = regWorkerMap.values.toList
    //5.6 Sort by memory in descending order.
    val sortedWorkerList = workerList.sortBy(_.mem).reverse
    //5.7 Print the result.
    println("Worker list in descending order of memory size: ")
    println(sortedWorkerList)
  }
}
5.8 Multiple Worker test phase
Requirements
Modify the configuration file and start several workers to test.
In plain terms: each time you start another Worker, first change the port number in the Worker project's application.conf file, then start the Worker again.
Steps
- Start a new Worker and check that it registers successfully
- Stop a Worker and check that it is removed from the current list