Scala Concurrent Programming (II): Akka
2022-07-05 23:17:00 【Shawn Jeon】
Scala Concurrent Programming: Akka
Overview
- Akka is a library written in Scala: an event-driven toolkit for building highly concurrent applications
- Akka features:
- An asynchronous, non-blocking, high-performance, event-driven programming model
- Built-in fault tolerance that lets an Actor recover or be reset when something goes wrong
- Lightweight event processing (millions of Actors per GB of heap memory)
* Note: whether event processing counts as lightweight or heavyweight depends mainly on whether it relies on the operating system and hardware. If it does, it is heavyweight; if it does not, it is lightweight
- Highly concurrent applications can be built on a single machine, and distributed applications can be built across a network
Akka communication process
- The student creates an ActorSystem
- Through the ActorSystem, the student obtains an ActorRef (a reference to the teacher Actor) and sends the message to that ActorRef
- The ActorRef passes the message to the Message Dispatcher
- The Message Dispatcher stores messages, in order, in the target Actor's MailBox
- The Message Dispatcher schedules the MailBox to run on a thread
- The MailBox takes messages out in order and finally passes them to the TeacherActor's receive method
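The ordering behaviour in the last steps can be pictured with a toy model. The sketch below is not Akka's implementation: ToyMailbox and MailboxDemo are hypothetical names, and the only assumption is that a mailbox acts like a FIFO queue whose messages are handed to one receive function at a time.

```scala
import scala.collection.mutable

// Toy model only: a FIFO queue standing in for an Actor's MailBox.
final class ToyMailbox {
  private val queue = mutable.Queue.empty[Any]

  // What the dispatcher does when a message arrives for the Actor.
  def enqueue(msg: Any): Unit = queue.enqueue(msg)

  // What happens when the MailBox runs on a thread:
  // messages are handed to the receive function one by one, in arrival order.
  def drain(receive: Any => Unit): Unit =
    while (queue.nonEmpty) receive(queue.dequeue())
}

object MailboxDemo {
  def run(): List[String] = {
    val handled = mutable.ListBuffer.empty[String]
    val mailbox = new ToyMailbox
    List("first", "second", "third").foreach(mailbox.enqueue)
    mailbox.drain(msg => handled += s"handled: $msg")
    handled.toList
  }
}
```

Calling MailboxDemo.run() returns the messages in the order they were enqueued, which is the property the communication process above relies on.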
API introduction:
- ActorSystem: responsible for creating and supervising Actors
- An ActorSystem is heavyweight, so an application normally creates a single instance and creates many Actors through it
- Inside an Actor, context.system returns a reference to the ActorSystem that manages that Actor
- Implementing an Actor class
- Define a class or singleton object that extends Actor (import akka.actor.Actor)
- Implement the receive method to handle incoming messages (no loop or react method is needed)
- Optionally implement preStart(); it runs after the Actor object is constructed and is executed only once for that Actor instance
- Loading an Actor
- To create an Akka Actor object, first create an ActorSystem
- Call ActorSystem.actorOf(Props(Actor object), "Actor name") to load the Actor
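The API points above can be sketched end to end in a few lines. This is a minimal illustration that assumes Akka classic (2.5 or 2.6) is on the classpath; EchoActor, EchoDemo, and the names demoSystem and echoActor are hypothetical and not part of the article's project. It uses a class rather than a singleton object, which is the more common way to define an Actor.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical Actor for illustration: replies to every String it receives.
class EchoActor extends Actor {
  // Runs once for this instance, before any message is processed.
  override def preStart(): Unit = println("EchoActor started")

  override def receive: Receive = {
    case msg: String => sender() ! s"echo: $msg"
  }
}

object EchoDemo {
  def run(): String = {
    // 1. Create the ActorSystem first.
    val system = ActorSystem("demoSystem")
    try {
      // 2. Load the Actor through the ActorSystem and give it a name.
      val echo = system.actorOf(Props(new EchoActor), "echoActor")
      // 3. Send a message and wait for the reply (ask pattern, demo use only).
      implicit val timeout: Timeout = Timeout(3.seconds)
      Await.result((echo ? "hello").mapTo[String], 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Blocking with Await is used here only so that the sketch can return a value; inside real Actors the reply would normally be handled as another message.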
Actor Path
- Every Actor has a Path, which can be used to reference it from outside
Type | Path |
---|---|
Local Actor | akka://actorSystem name/user/Actor name |
Remote Actor | akka.tcp://actorSystem name@IP address:port/user/Actor name |
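To make the two path shapes concrete, here is a small helper that builds them as plain strings. ActorPaths and its method names are hypothetical; the formats follow Akka classic conventions, where a local user Actor lives under akka://system/user/name and a remote one under akka.tcp://system@host:port/user/name.

```scala
// Hypothetical helper: builds Actor path strings, no Akka dependency needed.
object ActorPaths {
  // Path of an Actor created with actorOf on a local ActorSystem.
  def local(system: String, actor: String): String =
    s"akka://$system/user/$actor"

  // Path of an Actor reached through classic TCP remoting.
  def remote(system: String, host: String, port: Int, actor: String): String =
    s"akka.tcp://$system@$host:$port/user/$actor"
}
```

A string built this way can be passed to context.actorSelection to look the Actor up.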
Getting-started example
- Load two Actors (SenderActor and ReceiverActor) through the ActorSystem; SenderActor sends a message, ReceiverActor receives it and then replies
package com.akka.ex1

/**
 * Message sent when a task is submitted.
 *
 * @param msg the message to send
 */
case class SubmitTaskMessage(msg: String)

/**
 * Receipt returned after a task has been submitted successfully.
 *
 * @param msg the receipt text
 */
case class SuccessSubmitTaskMessage(msg: String)
package com.akka.ex1

import akka.actor.Actor

object SenderActor extends Actor {
  override def receive: Receive = {
    // Handle the "start" message sent by Entrance
    case "start" => {
      println("SenderActor, received: start!")
      // Look up ReceiverActor by its path
      val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")
      // Send a message to ReceiverActor
      receiverActor ! SubmitTaskMessage("Hello ReceiverActor!, This is SenderActor.")
    }
    // Handle the receipt returned by ReceiverActor
    case SuccessSubmitTaskMessage(msg) => println(s"SenderActor, received: ${msg}")
  }
}
package com.akka.ex1

import akka.actor.Actor

object ReceiverActor extends Actor {
  override def receive: Receive = {
    // Handle the message sent by SenderActor
    case SubmitTaskMessage(msg) => {
      println(s"ReceiverActor, received: ${msg}")
      // Reply to SenderActor
      sender() ! SuccessSubmitTaskMessage("Hi!, This is ReceiverActor.")
    }
  }
}
package com.akka.ex1

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    /**
     * Create the ActorSystem, which is responsible for creating and supervising Actors.
     *
     * @param name   scala.Predef.String, the name of the ActorSystem
     * @param config com.typesafe.config.Config, the configuration environment
     */
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    /**
     * Load the custom Actor objects through the ActorSystem.
     *
     * @param props akka.actor.Props describing the Actor (here, the singleton object)
     * @param name  scala.Predef.String, the name assigned to the Actor
     */
    val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")
    // Every Actor must be given a name; otherwise SenderActor cannot look up ReceiverActor
    // through context.actorSelection, and Akka reports that the message to
    // Actor[akka://actorSystem/user/receiverActor] was not delivered
    actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
    // Send the string "start" to SenderActor
    senderActor ! "start"
  }
}
SenderActor, received: start!
ReceiverActor, received: Hello ReceiverActor!, This is SenderActor.
SenderActor, received: Hi!, This is ReceiverActor.
Scheduled task example
- Start a scheduled task with the ActorSystem.scheduler.schedule() method (deprecated since Akka 2.6 in favor of scheduleWithFixedDelay and scheduleAtFixedRate)
- Usage 1:
final def schedule(
    initialDelay: FiniteDuration, // delay before the first execution
    interval: FiniteDuration,     // how often the task repeats after the first execution
    receiver: ActorRef,           // the Actor that receives the message
    message: Any)                 // the message to send
    (implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender) // implicit parameters; an ExecutionContext must be in scope
- Usage 2:
final def schedule(
    initialDelay: FiniteDuration, // delay before the first execution
    interval: FiniteDuration      // how often the task repeats after the first execution
)(f: => Unit)                     // the function to run periodically (for example, sending a message)
 (implicit executor: ExecutionContext) // implicit parameter; an ExecutionContext must be in scope
package com.akka.ex2

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object MainActor {
  object ReceiverActor extends Actor {
    override def receive: Receive = {
      case x => println(x)
    }
  }

  def main(args: Array[String]): Unit = {
    // Create the ActorSystem, which is responsible for creating and supervising Actors
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    // Load the custom Actor object through the ActorSystem
    val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
    // Import the implicit ExecutionContext and the duration conversions
    import actorSystem.dispatcher
    import scala.concurrent.duration._
    // Use the scheduler to send messages to ReceiverActor periodically
    // Usage 1: pass the message (of type Any) directly
    actorSystem.scheduler.schedule(3.seconds, 2.seconds, receiverActor, "Hello ReceiverActor!, 111.")
    // Usage 2: pass a function that sends the message
    actorSystem.scheduler.schedule(0.seconds, 5.seconds) {
      receiverActor ! "Hello ReceiverActor!, 222."
    }
  }
}
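The example above keeps printing until the process is killed. Both schedule overloads return a Cancellable, so a task can be stopped explicitly. The sketch below assumes Akka classic on the classpath; CancellableDemo, schedulerDemo, and the timing values are hypothetical, and the exact tick count depends on timing.

```scala
import akka.actor.{ActorSystem, Cancellable}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

object CancellableDemo {
  // Runs a periodic task briefly, cancels it, and reports (tick count, isCancelled).
  def run(): (Int, Boolean) = {
    val system = ActorSystem("schedulerDemo")
    import system.dispatcher // ExecutionContext required by the scheduler

    val ticks = new AtomicInteger(0)
    // Usage 2 from above: run a function every 100 ms, starting immediately.
    val task: Cancellable =
      system.scheduler.schedule(0.millis, 100.millis) {
        ticks.incrementAndGet()
      }

    Thread.sleep(350)   // let the task fire a few times
    task.cancel()       // no further executions are scheduled after this
    val result = (ticks.get(), task.isCancelled)

    // Shut the ActorSystem down so the JVM can exit.
    Await.result(system.terminate(), 5.seconds)
    result
  }
}
```

Keeping the Cancellable in a field and calling cancel() in postStop is a common way to tie a scheduled task to an Actor's lifetime.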
Example: communication between two processes
- WorkerActor sends a "connect" message to MasterActor
- MasterActor replies to WorkerActor with a "success" message
- WorkerActor receives the reply and prints it
package com.akka.master

import akka.actor.Actor

object MasterActor extends Actor {
  override def receive: Receive = {
    case "setup" => println("MasterActor started!")
    // Handle the message sent by WorkerActor
    case "connect" => {
      println("MasterActor, received: connect!")
      // Return a receipt to the sender (WorkerActor)
      sender() ! "success"
    }
  }
}
package com.akka.master

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")
    // Send a message to MasterActor
    masterActor ! "setup"
  }
}
package com.akka.worker

import akka.actor.Actor

// Path of WorkerActor: akka.tcp://actorSystem@127.0.0.1:8081/user/workerActor
// (the host 127.0.0.1 is assumed here, with both processes on one machine)
object WorkerActor extends Actor {
  override def receive: Receive = {
    case "setup" => {
      println("WorkerActor started!")
      // Look up the remote MasterActor; substitute the master's real host if it differs
      val masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor")
      // Send the string "connect" to MasterActor
      masterActor ! "connect"
    }
    // Handle the message sent by MasterActor
    case "success" => println("WorkerActor, received: success!")
  }
}
package com.akka.worker

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")
    // Send a message to WorkerActor
    workerActor ! "setup"
  }
}
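For the two processes to reach each other over akka.tcp, each ActorSystem needs remoting enabled and its own port, which ConfigFactory.load() would normally pick up from an application.conf that is not shown here. The sketch below builds an equivalent configuration in code. It assumes Akka 2.5.x classic remoting keys (Akka 2.6 moved them under akka.remote.classic and uses Artery by default); RemoteConfig, forPort, and the default host 127.0.0.1 are hypothetical.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper: remoting settings for one process, built in code.
object RemoteConfig {
  // Assumes Akka 2.5.x classic remoting keys; the host default is an assumption.
  def forPort(port: Int, host: String = "127.0.0.1"): Config =
    ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = $port
         |""".stripMargin)
}
```

Under these assumptions the master process could start with ActorSystem("actorSystem", RemoteConfig.forPort(8080).withFallback(ConfigFactory.load())) and the worker with port 8081, matching the ports used in the Actor paths.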
Example: a simplified Spark communication framework
- Simulates the communication between a Spark Master and multiple Workers
- Steps:
1. Start MasterActor
1.1 After the MasterActor object is constructed, start a scheduled task (a self-check that removes timed-out WorkerActors)
2. Start WorkerActor
2.1 After the WorkerActor object is constructed, wrap its own information into a registration message and send it to MasterActor
3. MasterActor receives the WorkerActor's registration information and saves it
3.1 Send a receipt to WorkerActor
4. After requesting registration, WorkerActor receives the receipt and prints Connection is successful!
4.1 Start a scheduled task that sends heartbeat messages to MasterActor
5. MasterActor receives the heartbeat message from WorkerActor and updates the last heartbeat time in that WorkerActor's registration information to the current time
Project name | Description |
---|---|
scala-spark-akka-common | Shared message entity classes |
scala-spark-akka-master | Akka Master node |
scala-spark-akka-worker | Akka Worker node |
package com.akka.spark.common

/**
 * Stores the information of a registered WorkerActor.
 *
 * @param workerId          ID of the WorkerActor (UUID)
 * @param cpuCores          number of CPU cores of the WorkerActor
 * @param memory            memory size of the WorkerActor
 * @param lastHeartBeatTime time of the last heartbeat
 */
case class WorkerInfo(workerId: String, cpuCores: Int, memory: Int, lastHeartBeatTime: Long)
package com.akka.spark.common

/**
 * Registration message submitted by a WorkerActor.
 *
 * @param workerId ID of the WorkerActor (UUID)
 * @param cpuCores number of CPU cores of the WorkerActor
 * @param memory   memory size of the WorkerActor
 */
case class WorkerRegisterMessage(workerId: String, cpuCores: Int, memory: Int)

/** Singleton object returned after a successful registration */
case object RegisterSuccessMessage

/**
 * Heartbeat message that a WorkerActor sends to MasterActor periodically.
 *
 * @param workerId ID of the WorkerActor (UUID)
 * @param cpuCores number of CPU cores of the WorkerActor
 * @param memory   memory size of the WorkerActor
 */
case class WorkerHeartBeatMessage(workerId: String, cpuCores: Int, memory: Int)
package com.akka.spark.master

import com.typesafe.config.{Config, ConfigFactory}

// Reads settings from the configuration file
object ConfigUtils {
  // 1. Load the configuration object
  private val config: Config = ConfigFactory.load()
  // 2. Interval (in seconds) at which WorkerActor heartbeats are checked
  val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval")
  // 3. WorkerActor heartbeat timeout (in seconds)
  val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout")
}
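ConfigUtils expects both keys to exist in the loaded configuration; getInt throws a ConfigException.Missing if a key is absent. The article does not show the configuration file, so the sketch below only illustrates the expected shape by parsing an inline snippet. MasterSettingsSketch and the values 6 and 15 are hypothetical examples.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical illustration of the keys the master's ConfigUtils reads.
object MasterSettingsSketch {
  // Example values only (seconds); the real application.conf is not shown.
  val sample: Config = ConfigFactory.parseString(
    """
      |master.check.heartbeat.interval = 6
      |master.check.heartbeat.timeout = 15
      |""".stripMargin)

  val intervalSeconds: Int = sample.getInt("master.check.heartbeat.interval")
  val timeoutSeconds: Int = sample.getInt("master.check.heartbeat.timeout")
}
```

Choosing a timeout larger than the worker's heartbeat interval leaves room for a delayed heartbeat before the worker is removed.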
package com.akka.spark.master

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Master {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    actorSystem.actorOf(Props(MasterActor), "masterActor")
  }
}
package com.akka.spark.master

import java.util.Date

import akka.actor.Actor
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerInfo, WorkerRegisterMessage}

object MasterActor extends Actor {
  // 1. Mutable Map that stores the information of registered WorkerActors
  private val regWorkerMap = scala.collection.mutable.Map[String, WorkerInfo]()

  // MasterActor periodically checks WorkerActor heartbeats and removes timed-out workers
  override def preStart(): Unit = {
    // 1. Import the implicit ExecutionContext and the duration conversions
    import context.dispatcher
    import scala.concurrent.duration._
    // 2. Start the scheduled task (MasterActor's self-check that removes timed-out WorkerActors)
    // Caution: this block runs on a dispatcher thread, outside receive, so it accesses
    // regWorkerMap concurrently with message handling; scheduling a message to self is the safer pattern
    context.system.scheduler.schedule(0.seconds, ConfigUtils.`master.check.heartbeat.interval`.seconds) {
      // 3. Select the timed-out WorkerActors
      val timeOutWorkerMap = regWorkerMap.filter {
        keyVal => // keyVal format: workerId -> WorkerInfo(workerId, cpuCores, memory, lastHeartBeatTime)
          // 3.1 Last heartbeat time of the current WorkerActor
          val lastHeartBeatTime = keyVal._2.lastHeartBeatTime
          // 3.2 Timed out when (current time - last heartbeat time) > timeout in seconds * 1000
          (new Date().getTime - lastHeartBeatTime) > (ConfigUtils.`master.check.heartbeat.timeout` * 1000)
      }
      // 4. Remove the timed-out workerIds
      if (timeOutWorkerMap.nonEmpty) {
        regWorkerMap --= timeOutWorkerMap.keys // e.g. 5b9feb50-5c33-496b-a325-dd168360281b
      }
      // 5. Remaining active WorkerActors, sorted by memory size in descending order
      val workerList = regWorkerMap.values.toList.sortBy(_.memory).reverse
      println(s"Active WorkerActors: ${workerList}")
    }
  }

  override def receive: Receive = {
    // Handle a WorkerActor's registration message
    case WorkerRegisterMessage(workerId, cpuCores, memory) => {
      // Print the received registration information
      println(s"MasterActor, received info: ${workerId}, ${cpuCores}, ${memory}")
      // Save the registration information in the map and record the last heartbeat time
      regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)
      // Send a receipt to the registered WorkerActor
      sender() ! RegisterSuccessMessage
    }
    // Handle a heartbeat message from a WorkerActor
    case WorkerHeartBeatMessage(workerId, cpuCores, memory) => {
      println(s"MasterActor, received heartbeat: ${workerId}")
      // Update the last heartbeat time of the given WorkerActor
      regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)
    }
  }
}
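The timeout check inside the scheduled task is easier to reason about, and to test, when it is written as a pure function with the current time passed in. The sketch below is one possible way to do that, not the article's code: HeartbeatCheck and its methods are hypothetical, and WorkerInfo is repeated here only so that the block is self-contained.

```scala
// Copy of the article's WorkerInfo, repeated so this sketch compiles on its own.
final case class WorkerInfo(workerId: String, cpuCores: Int, memory: Int, lastHeartBeatTime: Long)

// Hypothetical helper: the master's timeout rule without any Akka dependency.
object HeartbeatCheck {
  // A worker is timed out when (now - last heartbeat) exceeds the timeout in milliseconds.
  def isTimedOut(info: WorkerInfo, nowMillis: Long, timeoutSeconds: Int): Boolean =
    nowMillis - info.lastHeartBeatTime > timeoutSeconds * 1000L

  // Active workers, sorted by memory size in descending order.
  def activeWorkers(workers: Map[String, WorkerInfo],
                    nowMillis: Long,
                    timeoutSeconds: Int): List[WorkerInfo] =
    workers.values
      .filterNot(w => isTimedOut(w, nowMillis, timeoutSeconds))
      .toList
      .sortBy(w => -w.memory)
}
```

For example, with a 5-second timeout and a current time of 12000 ms, a worker whose last heartbeat was at 1000 ms is dropped, while workers seen at 9000 ms and 10000 ms stay in the list.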
package com.akka.spark.worker

import com.typesafe.config.{Config, ConfigFactory}

// Reads settings from the configuration file
object ConfigUtils {
  // 1. Load the configuration object
  private val config: Config = ConfigFactory.load()
  // 2. Interval (in seconds) between WorkerActor heartbeats
  val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}
package com.akka.spark.worker

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Worker {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    actorSystem.actorOf(Props(WorkerActor), "workerActor")
  }
}
package com.akka.spark.worker

import java.util.UUID

import akka.actor.{Actor, ActorSelection}
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerRegisterMessage}

import scala.util.Random

object WorkerActor extends Actor {
  // Reference to MasterActor
  private var masterActor: ActorSelection = _
  // Registration information of this WorkerActor
  private var workerId: String = _
  private var cpuCores: Int = _ // number of CPU cores
  private var memory: Int = _ // memory size
  private val cpuCoreList = List(1, 2, 3, 4, 6, 8) // candidate values for the number of CPU cores
  private val memoryList = List(512, 1024, 2048, 4096) // candidate values for the memory size

  override def preStart(): Unit = {
    // Look up MasterActor; the host 127.0.0.1 is assumed here, substitute the master's real host if it differs
    masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor")
    // Generate a random ID
    workerId = UUID.randomUUID().toString
    // Randomly pick the number of CPU cores and the memory size of this WorkerActor
    val r = new Random()
    cpuCores = cpuCoreList(r.nextInt(cpuCoreList.length))
    memory = memoryList(r.nextInt(memoryList.length))
    // Wrap the WorkerActor's information in a registration message
    val registerMessage = WorkerRegisterMessage(workerId, cpuCores, memory)
    // Send it to MasterActor
    masterActor ! registerMessage
  }

  override def receive: Receive = {
    // Receipt for a successful registration
    case RegisterSuccessMessage => {
      println("Connection is successful!")
      // Import the implicit ExecutionContext and the duration conversions
      import context.dispatcher
      import scala.concurrent.duration._
      // Send heartbeat messages to MasterActor periodically
      context.system.scheduler.schedule(0.seconds, ConfigUtils.`worker.heartbeat.interval`.seconds) {
        masterActor ! WorkerHeartBeatMessage(workerId, cpuCores, memory)
      }
    }
  }
}
If you found this helpful, a like is welcome. Thank you!