当前位置:网站首页>Scala concurrent programming (II) akka
Scala concurrent programming (II) akka
2022-07-05 23:17:00 【Shawn Jeon】
Scala Concurrent programming Akka
summary
- Akka It's using Scala Developed library , It's based on event driven , Toolkit for building highly concurrent projects
- Akka characteristic :
- Provide asynchronous based non blocking , High performance event driven programming model
- Built in fault tolerance mechanism , allow Actor When something goes wrong , Restore or reset
- Lightweight event handling ( Every time GB Heap memory millions Actor. *
Lightweight event processing and heavyweight division , It mainly depends on whether it depends on the operating system and hardware , Dependence is a heavyweight , Independence is lightweight
- High concurrency applications can be built on a single machine , It can also build distributed applications in the network
Akka Communication process
- Students create a ActorSystem
- adopt ActorSystem To create a ActorRef( Teacher's quotation ), And send the message to ActorRef
- ActorRef Send message to Message Dispatcher( Message distributor )
- Message Dispatcher Save messages to the destination in order Actor Of MailBox in
- Message Dispatcher take MailBox Put it in a thread
- MailBox Take out messages in order , Finally pass it to TeacherActor Receiving method
API Introduce :
- ActorSystem: Responsible for creating and supervising Actor
- ActorSystem Is a singleton object , Many can be created through it Actor
- Use
context.system
You can get the management information Actor Of ActorSystem References to
- Realization Actor class
- Define class or singleton object inheritance Actor(
import akka.actor.Actor
)- Realization receive How to receive the message ( No need to add loop& react Method )
- Can achieve preStart() Method ( Optional ), The method in Actor Execute after the object is built , stay Actor Only once in the life cycle
- load Actor
- To create a Akka Of Actor object , Must be created first ActorSystem
- call ActorSystem.actorOf(Props(Actor object ), “Actor name ”) To load the Actor
Actor Path
- every last Actor There is one. Path, This path can be externally referenced
type | route |
---|---|
Local Actor | akka://actorSystem name /user/Actor name |
long-range Actor | akka.tcp://[email protected] Address :port/user/Actor name |
Introductory cases
- adopt ActorSystem Load two Actor(SenderActor& ReceiverActor), And from SenderActor Send a message , stay ReceiverActor receive , Then reply to the message
package com.akka.ex1
/**
* Message format of task submission
*
* @param msg Send a message
*/
case class SubmitTaskMessage(msg: String)
/**
* The format of the receipt information after the task is submitted successfully
*
* @param msg Receipt information
*/
case class SuccessSubmitTaskMessage(msg: String)
package com.akka.ex1
import akka.actor.Actor
object SenderActor extends Actor {
override def receive: Receive = {
// receive Entrance A message sent : start
case "start" => {
println("SenderActor, received: start!")
// obtain ReceiverActor The path of
val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")
// to ReceiverActor Send a message
receiverActor ! SubmitTaskMessage("Hello ReceiverActor!, This is SenderActor.")
}
// receive ReceiverActor Return receipt information
case SuccessSubmitTaskMessage(msg) => println(s"SenderActor, received: ${msg}")
}
}
package com.akka.ex1
import akka.actor.Actor
object ReceiverActor extends Actor {
override def receive: Receive = {
// receive SenderActor A message sent
case SubmitTaskMessage(msg) => {
println(s"ReceiverActor, received: ${msg}")
// to SenderActor Reply message
sender ! SuccessSubmitTaskMessage("Hi!, This is ReceiverActor.")
}
}
}
package com.akka.ex1
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Entrance {
def main(args: Array[String]): Unit = {
/**
* establish ActorSystem, Used to be responsible for creating and supervising Actor
*
* @param name : scala.Predef.String to ActorSystem Set the name
* @param config : com.typesafe.config.Config Configuration environment
*/
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
/**
* adopt ActorSystem To load custom Actor object
*
* @param props : akka.actor.Props Specify the Actor Companion
* @param name : scala.Predef.String To assign to Actor Object setting name
*/
val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")
// Must be given to each Actor Set the name , Otherwise, we can't get from SenderActor Through internal context.actorSelection The way , get ReceiverActor The object of
// And will prompt Actor[akka://actorSystem/user/receiverActor] was not delivered
actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
// to SenderActor send out "start" character string
senderActor ! "start"
}
}
SenderActor, received: start!
ReceiverActor, received: Hello ReceiverActor!, This is SenderActor.
SenderActor, received: Hi!, This is ReceiverActor.
Timed mission cases
- adopt
ActorSystem.scheduler.schedule() Method
, Start timing task - Usage mode 1:
final def schedule(
initialDelay : FiniteDuration, // Start for the first time , Press this to set the time , Execution after delay
interval : FiniteDuration, // How often do I execute ( Start for the first time , Execute immediately , No delay
receiver : ActorRef, // Set the destination to receive messages Actor
message : Any) // Message to send
(implicit executor : ExecutionContext, sender : ActorRef = {}) // Implicit parameter , Need to import
- Usage mode 2:
final def schedule(
initialDelay : FiniteDuration, // Start for the first time , Press this to set the time , Execution after delay
interval : FiniteDuration // How often do I execute ( Start for the first time , Execute immediately , No delay
)(f : => Unit) // Functions to be executed regularly ( news
(implicit executor : ExecutionContext) // Implicit parameter , Need to import
package com.akka.ex2
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object MainActor {
object ReceiverActor extends Actor {
override def receive: Receive = {
case x => println(x)
}
}
def main(args: Array[String]): Unit = {
// establish ActorSystem, Used to be responsible for creating and supervising Actor
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
// adopt ActorSystem To load custom Actor object
val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
// Import implicit parameters & transformation
import actorSystem.dispatcher
import scala.concurrent.duration._
// By timer , Give to regularly ReceiverActor Send a message
// The way 1: Use the provided Any Message of data type parameter
actorSystem.scheduler.schedule(3 seconds, 2 seconds, receiverActor, "Hello ReceiverActor!, 111.")
// The way 2: Messages using custom functions
actorSystem.scheduler.schedule(0 seconds, 5 seconds) {
receiverActor ! "Hello ReceiverActor!, 222."
}
}
}
Communication case between two processes
- WorkerActor send out "connect" A message to MasterActor
- MasterActor reply "success" A message to WorkerActor
- WorkerActor Receive and print the received message
package com.akka.master
import akka.actor.Actor
object MasterActor extends Actor {
override def receive: Receive = {
case "setup" => println("MasterActor started!")
// receive WorkerActor A message sent
case "connect" => {
println("MasterActor, received: connect!")
// To the sender (WorkerActor) Return receipt information
sender ! "success"
}
}
}
package com.akka.master
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Entrance {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")
// to MasterActor Send a message
masterActor ! "setup"
}
}
package com.akka.worker
import akka.actor.Actor
// WorkerActor The path of : akka.tcp://[email protected]:8081/user/workerActor
object WorkerActor extends Actor {
override def receive: Receive = {
case "setup" => {
println("WorkerActor started!")
// Remote access MasterActor
val masterActor = context.system.actorSelection("akka.tcp://[email protected]:8080/user/masterActor")
// to MasterActor Send string connect
masterActor ! "connect"
}
// receive MasterActor A message sent
case "success" => println("MasterActor, received: success!")
}
}
package com.akka.worker
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Entrance {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")
// to WorkerActor Send a message
workerActor ! "setup"
}
}
Simple version of Spark Communication framework implementation case
- simulation Spark Of Master With many Worker Communication for
- Operational steps :
start-up MasterActor
1.1 MasterActor After the object is built ,Turn on timed tasks ( For self-test , For removing timeouts WorkerActor
start-up WorkerActor
2.1 WorkerActor After the object is built , Encapsulate your own information intoRegistration information
after , issue MasterActorMasterActor receive WorkerActor Registration information , And save
3.1 to WorkerActor Receipt informationWorkerActor After requesting registration , Received information , And print Connection is successful!
4.1 Turn on timed tasks , to MasterActor Send a heartbeat messageMasterActor receive WorkerActor Heartbeat message from , Will be WorkerActor The last heartbeat time in the registration information of is updated to the current time
project name | explain |
---|---|
scala-spark-akka-common | Store public message entity classes |
scala-spark-akka-master | Akka Master node |
scala-spark-akka-worker | Akka Worker node |
package com.akka.spark.common
/**
* Used to save registered WorkerActor Of information
*
* @param workerId : WorkerActor Of Id(UUID
* @param cpuCores : WorkerActor Of CPU Check the number
* @param memory : WorkerActor The memory size of
* @param lastHeartBeatTime : Last heartbeat time
*/
case class WorkerInfo(workerId: String, cpuCores: Int, memory: Int, lastHeartBeatTime: Long)
package com.akka.spark.common
/**
* WorkerActor Class that submits registration information
*
* @param workerId : WorkerActor Of Id(UUID
* @param cpuCores : WorkerActor Of CPU Check the number
* @param memory : WorkerActor The memory size of
*/
case class WorkerRegisterMessage(workerId: String, cpuCores: Int, memory: Int)
/** The singleton object returned after successful registration */
case object RegisterSuccessMessage
/**
* WorkerActor Regularly trigger the heartbeat to MasterActor Information class of
*
* @param workerId : WorkerActor Of Id(UUID
* @param cpuCores : WorkerActor Of CPU Check the number
* @param memory : WorkerActor The memory size of
*/
case class WorkerHeartBeatMessage(workerId: String, cpuCores: Int, memory: Int)
package com.akka.spark.master
import com.typesafe.config.{Config, ConfigFactory}
// Class used to read configuration file information
object ConfigUtils {
// 1. Get profile object
private val config: Config = ConfigFactory.load()
// 2. Get checked WorkerActor The interval between heartbeats
val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval")
// 3. obtain WorkerActor Heartbeat timeout
val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout")
}
package com.akka.spark.master
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Master {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
actorSystem.actorOf(Props(MasterActor), "masterActor")
}
}
package com.akka.spark.master
import java.util.Date
import akka.actor.Actor
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerInfo, WorkerRegisterMessage}
object MasterActor extends Actor {
// 1. Define a variable Map aggregate , Used to save registered WorkerActor Information
private val regWorkerMap = scala.collection.mutable.Map[String, WorkerInfo]()
// MasterActor Regular inspection WorkerActor heartbeat , Will time out Worker remove
override def preStart(): Unit = {
// 1. Import time implicit parameters & transformation
import context.dispatcher
import scala.concurrent.duration._
// 2. Start timing task (MasterActor Self test removal timeout WorkerActor)
context.system.scheduler.schedule(0 seconds, ConfigUtils.`master.check.heartbeat.interval` seconds) {
// 3. Filter timeout WorkerActor( Returns the filtered workerId aggregate
val timeOutWorkerMap = regWorkerMap.filter {
keyVal => // keyVal Data format : workerId -> WorkerInfo(workerId, cpuCores, memory, lastHeartBeatTime)
// 3.1 Get current WorkerActor The last heartbeat time of the object
val lastHeartBeatTime = keyVal._2.lastHeartBeatTime
// 3.2 If the timeout , be true, otherwise false ( current time - Last heartbeat time ) > Maximum timeout * 1000
if ((new Date().getTime - lastHeartBeatTime) > (ConfigUtils.`master.check.heartbeat.timeout` * 1000)) true else false
}
// 4. The to be removed has timed out workerId aggregate
if (!timeOutWorkerMap.isEmpty) {
regWorkerMap --= timeOutWorkerMap.map(_._1) // ArrayBuffer(5b9feb50-5c33-496b-a325-dd168360281b)
}
// 5. Effective WorkerActor, Sort in descending order by memory size
val workerList = regWorkerMap.map(_._2).toList.sortBy(_.memory).reverse
println(s"Active WorkerActors: ${workerList}")
}
}
override def receive: Receive = {
// receive WorkerActor Registration information
case WorkerRegisterMessage(workerId, cpuCores, memory) => {
// Print the received registration information
println(s"MasterActor, received info: ${workerId}, ${cpuCores}, ${memory}")
// Save the registration information to the hash table & And record the last heartbeat time
regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)
// For registered WorkerActor Receipt information
sender ! RegisterSuccessMessage
}
// receive WorkerActor Heartbeat message from
case WorkerHeartBeatMessage(workerId, cpuCores, memory) => {
println(s"MasterActor, received heartbeat: ${workerId}")
// Update the specified WorkerActor The last heartbeat time of the object
regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)
}
}
}
package com.akka.spark.worker
import com.typesafe.config.{Config, ConfigFactory}
// Class used to read configuration file information
object ConfigUtils {
// 1. Get profile object
private val config: Config = ConfigFactory.load()
// 2. obtain WorkerActor The interval between heartbeats
val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}
package com.akka.spark.worker
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Worker {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
actorSystem.actorOf(Props(WorkerActor), "workerActor")
}
}
package com.akka.spark.worker
import java.util.UUID
import akka.actor.{Actor, ActorSelection}
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerRegisterMessage}
import scala.util.Random
object WorkerActor extends Actor {
// Express MasterActor References to
private var masterActor: ActorSelection = _
// WorkerActor Registration information
private var workerId: String = _
private var cpuCores: Int = _ // CPU Check the number
private var memory: Int = _ // Memory size
private val cpuCoreList = List(1, 2, 3, 4, 6, 8) // CPU Random value range of core number
private val memoryList = List(512, 1024, 2048, 4096) // Random range of memory size
override def preStart(): Unit = {
// obtain MasterActor References to
masterActor = context.system.actorSelection("akka.tcp://[email protected]:8080/user/masterActor")
// Random setting number
workerId = UUID.randomUUID().toString
// Random selection WorkerActor Of CPU Number of cores and memory size
val r = new Random()
cpuCores = cpuCoreList(r.nextInt(cpuCoreList.length))
memory = memoryList(r.nextInt(memoryList.length))
// encapsulation WorkerActor Registration information
val registerMessage = WorkerRegisterMessage(workerId, cpuCores, memory)
// issue MasterActor
masterActor ! registerMessage
}
override def receive: Receive = {
// Receipt information of successful registration
case RegisterSuccessMessage => {
println("Connection is successful!")
// 1. Import time implicit parameters & transformation
import context.dispatcher
import scala.concurrent.duration._
// Give to regularly MasterActor Send a heartbeat message
context.system.scheduler.schedule(0 seconds, ConfigUtils.`worker.heartbeat.interval` seconds) {
masterActor ! WorkerHeartBeatMessage(workerId, cpuCores, memory)
}
}
}
}
If you find it helpful , Welcome to like it ~ thank you !!
边栏推荐
- Design and implementation of secsha system
- Use the rewrite rule to rewrite all accesses to the a domain name to the B domain name
- MySQL (2) -- simple query, conditional query
- 3:第一章:认识JVM规范2:JVM规范,简介;
- Summary of binary tree recursive routines
- Object detection based on impulse neural network
- There are 14 God note taking methods. Just choose one move to improve your learning and work efficiency by 100 times!
- 如何快速理解复杂业务,系统思考问题?
- 6-axis and 9-axis IMU attitude estimation
- [speech processing] speech signal denoising and denoising based on MATLAB low-pass filter [including Matlab source code 1709]
猜你喜欢
fibonacci search
CJ mccullem autograph: to dear Portland
Hcip day 12 (BGP black hole, anti ring, configuration)
Go语言实现原理——Map实现原理
[speech processing] speech signal denoising and denoising based on MATLAB low-pass filter [including Matlab source code 1709]
Mathematical formula screenshot recognition artifact mathpix unlimited use tutorial
Registration of Electrical Engineering (elementary) examination in 2022 and the latest analysis of Electrical Engineering (elementary)
Development specification: interface unified return value format [resend]
第十七周作业
视频标准二三事
随机推荐
asp. Net pop-up layer instance
3 find the greatest common divisor and the least common multiple
第十七周作业
(4)UART应用设计及仿真验证2 —— TX模块设计(无状态机)
Solution to the packaging problem of asyncsocket long connecting rod
Getting started stm32--gpio (running lantern) (nanny level)
使用rewrite规则实现将所有到a域名的访问rewrite到b域名
(4)UART应用设计及仿真验证2 —— RX模块设计(无状态机)
How to quickly understand complex businesses and systematically think about problems?
2:第一章:认识JVM规范1:JVM简介;
Marginal probability and conditional probability
Commonly used probability distributions: Bernoulli distribution, binomial distribution, polynomial distribution, Gaussian distribution, exponential distribution, Laplace distribution and Dirac delta d
Southeast Asia e-commerce guide, how do sellers layout the Southeast Asia market?
CorelDRAW plug-in -- GMS plug-in development -- new project -- macro recording -- VBA editing -- debugging skills -- CDR plug-in (2)
yate.conf
Use of shell:for loop
YML configuration, binding and injection, verification, unit of bean
C Primer Plus Chapter 9 question 9 POW function
Openresty ngx Lua regular expression
【Note17】PECI(Platform Environment Control Interface)