Scala Basic Tutorial 19: Actor
2022-07-04 18:53:00 【Empty.】
Chapter goals
- Understand what the Actor model is
- Learn how Actors send and receive messages
- Work through the WordCount case study
1. Introduction to Actor
Scala's Actor concurrent programming model can be used to build concurrent programs that are more efficient than Java thread-based ones. The main reason we learn Scala Actor here is to prepare for learning Akka later.
1.1 Problems with Java concurrent programming
In Java concurrent programming, every object has a logical monitor that can be used to control multi-threaded access to it. We mark code that requires locked access with the synchronized keyword, and the locking mechanism ensures that only one thread accesses the shared data at a time. This approach, however, suffers from resource contention and deadlocks, and the larger the program, the harder these problems become.
[Figure: thread deadlock]
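To make the "shared data plus lock" style concrete, here is a minimal sketch in Scala using only JVM threads and synchronized. The names SharedCounterSketch, Counter, and countWith are hypothetical and exist only for this illustration.

```scala
object SharedCounterSketch {
  // Shared mutable state, guarded by the object's monitor.
  final class Counter {
    private var value = 0
    def increment(): Unit = this.synchronized { value += 1 }
    def get: Int = this.synchronized { value }
  }

  // Starts `threads` threads that each increment one shared counter `perThread` times.
  def countWith(threads: Int, perThread: Int): Int = {
    val counter = new Counter
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = for (_ <- 1 to perThread) counter.increment()
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join()) // wait for every worker before reading the total
    counter.get
  }

  def main(args: Array[String]): Unit =
    println(countWith(4, 1000)) // prints 4000
}
```

Every thread has to pass through the same lock to touch the counter. That keeps the total correct, but it is exactly the contention point that the Actor model tries to avoid.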
1.2 The Actor concurrent programming model
The Actor concurrent programming model is a model that Scala offers to programmers, and it is entirely different from Java's concurrency model. It is an event-based concurrency mechanism: Actors share no data and rely on message passing, which effectively avoids resource contention, deadlocks, and similar problems.
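The share-nothing idea can be sketched without any actor library. The example below is not the scala.actors API; it is an assumption-laden illustration that uses a java.util.concurrent.LinkedBlockingQueue as a "mailbox" and a single worker thread as the "actor". The names MailboxSketch, Msg, Text, Stop, and deliver are hypothetical.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object MailboxSketch {
  // Hypothetical message types for this sketch.
  sealed trait Msg
  final case class Text(body: String) extends Msg
  case object Stop extends Msg

  // Sends the given texts to one worker thread through a queue and
  // returns what the worker processed, in arrival order.
  def deliver(texts: List[String]): List[String] = {
    val mailbox = new LinkedBlockingQueue[Msg]()
    val processed = ListBuffer.empty[String] // touched only by the worker thread

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        var running = true
        while (running) {
          mailbox.take() match { // blocks until a message arrives
            case Text(body) => processed += body.toUpperCase
            case Stop       => running = false
          }
        }
      }
    })
    worker.start()
    texts.foreach(t => mailbox.put(Text(t))) // the sender only enqueues messages
    mailbox.put(Stop)
    worker.join() // after join, the worker's writes are visible to the caller
    processed.toList
  }

  def main(args: Array[String]): Unit =
    println(deliver(List("hello", "actor")))
}
```

The sender never touches the worker's state directly; it only puts messages in the queue, and the worker handles them one at a time. That is the property the comparison table below summarizes as "share nothing".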
1.3 Java concurrent programming vs. Actor concurrent programming
Java built-in threading model | Scala Actor model |
---|---|
"Shared data and locks" model (share data and lock) | Share nothing |
Every object has a monitor that controls thread access to shared data | No shared data; Actors communicate through messages |
Locked code is marked with synchronized | |
Deadlocks can occur | |
Each thread executes sequentially | Each Actor executes sequentially internally |
Note:
As of Scala 2.11.x, Akka is the concurrency framework to use, and the original scala.actors library is deprecated.
The Actor programming model is very similar to Akka, so we learn Actor here as preparation for Akka.
2. Creating an Actor
We create an Actor by defining a class or a singleton object that extends the Actor trait.
2.1 Steps
- Define a class or object that extends the Actor trait
- Override the act method
- Call the Actor's start method to run it
Note: Actors run in parallel and do not interfere with one another.
2.2 Case 1: implementing with class
Requirements
- Create two Actors: one prints 1 to 10, the other prints 11 to 20
- Implement them with classes that extend Actor (suitable when the program needs several Actors of the same kind)
Reference code
import scala.actors.Actor
// Case study: getting started with Actor concurrency, creating Actors with class
object ClassDemo01 {
  // Requirement: create two Actors, one prints 1 to 10, the other prints 11 to 20
  //1. Create Actor1, which prints the numbers 1 to 10.
  class Actor1 extends Actor {
    override def act(): Unit = for (i <- 1 to 10) println("actor1: " + i)
  }
  //2. Create Actor2, which prints the numbers 11 to 20.
  class Actor2 extends Actor {
    override def act(): Unit = for (i <- 11 to 20) println("actor2: " + i)
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors.
    new Actor1().start()
    new Actor2().start()
  }
}
2.3 Case 2: implementing with object
Requirements
- Create two Actors: one prints 1 to 10, the other prints 11 to 20
- Implement them with objects that extend Actor (suitable when the program needs only one Actor of each kind)
Reference code
import scala.actors.Actor
// Case study: getting started with Actor concurrency, creating Actors with object
object ClassDemo02 {
  // Requirement: create two Actors, one prints 1 to 10, the other prints 11 to 20
  //1. Create Actor1, which prints the numbers 1 to 10.
  object Actor1 extends Actor {
    override def act(): Unit = for (i <- 1 to 10) println("actor1: " + i)
  }
  //2. Create Actor2, which prints the numbers 11 to 20.
  object Actor2 extends Actor {
    override def act(): Unit = for (i <- 11 to 20) println("actor2: " + i)
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors.
    Actor1.start()
    Actor2.start()
  }
}
2.4 How an Actor program runs
- Call start() to start the Actor
- The act() method is executed automatically
- Messages are sent to the Actor
- When act() finishes, exit() is called to end execution
3. Sending and receiving messages
When we introduced Actor, we said it is a concurrency model based on events (messages). So how does an Actor send and receive messages?
3.1 Usage
3.1.1 Sending messages
There are three ways to send a message:
Operator | Description |
---|---|
! | Sends an asynchronous message, no return value |
!? | Sends a synchronous message and waits for the return value |
!! | Sends an asynchronous message; the return value is a Future[Any] |
For example, to send an asynchronous string message to actor1, use the following code:
actor1 ! "Hello!"
3.1.2 Receiving messages
An Actor receives messages with the receive method, which takes a partial function as its argument:
{
case variableName1: MessageType1 => handler 1
case variableName2: MessageType2 => handler 2
...
}
Note: each call to receive accepts only one message; once it has been handled, the rest of the act method continues.
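The block passed to receive is an ordinary Scala partial function, so its behavior can be tried out without any Actor. A small sketch follows; PartialFunctionSketch and handler are hypothetical names used only for illustration.

```scala
object PartialFunctionSketch {
  // A handler with the same shape as the block passed to receive:
  // each case matches one message type.
  val handler: PartialFunction[Any, String] = {
    case s: String => "text: " + s
    case n: Int    => "number: " + n
  }

  def main(args: Array[String]): Unit = {
    println(handler("hello"))          // matched by the String case
    println(handler(42))               // matched by the Int case
    println(handler.isDefinedAt(3.14)) // false: there is no case for Double
  }
}
```

A message that matches no case is simply not handled by this function, which is why the case patterns should cover every message type an Actor expects to receive.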
3.2 Case 1: sending and receiving one sentence
Requirements
- Create two Actors (ActorSender and ActorReceiver)
- ActorSender sends an asynchronous string message to ActorReceiver
- ActorReceiver prints the message after receiving it
Reference code
import scala.actors.Actor
// Case study: sending a message asynchronously with no return value.
object ClassDemo03 {
  //1. Create the sending Actor, ActorSender, which sends sentences to ActorReceiver
  object ActorSender extends Actor {
    override def act(): Unit = {
      // Send one sentence to ActorReceiver
      ActorReceiver ! "Hello, I am ActorSender!"
      // Send a second sentence
      ActorReceiver ! "What's your name?"
    }
  }
  //2. Create the receiving Actor, ActorReceiver
  object ActorReceiver extends Actor {
    override def act(): Unit = {
      // Receive a message. receive runs only once here, so only the first message is printed.
      receive {
        case x: String => println(x)
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors
    ActorSender.start()
    ActorReceiver.start()
  }
}
3.3 Case 2: continuously sending and receiving messages
How can we make ActorSender keep sending messages and ActorReceiver keep receiving them?
Answer: wrap the call to receive in a while(true) loop so that it is called again after every message.
Requirements
- Create two Actors (ActorSender and ActorReceiver)
- ActorSender keeps sending an asynchronous string message to ActorReceiver
- ActorReceiver keeps receiving messages and prints them
Reference code
import java.util.concurrent.TimeUnit
import scala.actors.Actor
// Case study: Actors continuously sending and receiving messages.
object ClassDemo04 {
  //1. Create the sending Actor, ActorSender, which sends sentences to ActorReceiver
  object ActorSender extends Actor {
    override def act(): Unit = {
      while (true) {
        // Send one sentence to ActorReceiver
        ActorReceiver ! "Hello, I am ActorSender!"
        // Sleep for 3 seconds.
        TimeUnit.SECONDS.sleep(3)
      }
    }
  }
  //2. Create the receiving Actor, ActorReceiver
  object ActorReceiver extends Actor {
    override def act(): Unit = {
      // Keep receiving messages.
      while (true) {
        receive {
          case x: String => println(x)
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors
    ActorSender.start()
    ActorReceiver.start()
  }
}
3.4 Case 3: optimizing continuous message reception
The code above uses a while loop to keep receiving messages, which can cause the following problems:
- If the Actor has no message to receive, its thread is blocked
- With many Actors, many threads may be blocked
- Each time a new message arrives, a thread is created again to handle it
- Frequent thread creation, destruction, and switching hurts performance
To address this, we can combine loop() with react() to reuse threads, which is more efficient than a while loop plus receive().
Requirements
- Create two Actors (ActorSender and ActorReceiver)
- ActorSender keeps sending an asynchronous string message to ActorReceiver
- ActorReceiver keeps receiving messages and prints them
Note: rewrite the previous case using loop + react.
Reference code
import java.util.concurrent.TimeUnit
import scala.actors.Actor
// Case study: receiving messages in a loop with loop + react.
object ClassDemo05 {
  //1. Create the sending Actor, ActorSender, which sends sentences to ActorReceiver
  object ActorSender extends Actor {
    override def act(): Unit = {
      while (true) {
        // Send one sentence to ActorReceiver
        ActorReceiver ! "Hello, I am ActorSender!"
        // Sleep for 3 seconds.
        TimeUnit.SECONDS.sleep(3)
      }
    }
  }
  //2. Create the receiving Actor, ActorReceiver
  object ActorReceiver extends Actor {
    override def act(): Unit = {
      // Keep receiving messages.
      loop {
        react {
          case x: String => println(x)
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors
    ActorSender.start()
    ActorReceiver.start()
  }
}
3.5 Case 4: sending and receiving custom messages
The messages we have sent so far were all strings. Actors also support custom messages, for example a case class that wraps the message data.
3.5.1 Example 1: sending a synchronous message with a return value
Requirements
- Create a MsgActor and send it a synchronous message with two fields (id, message)
- MsgActor replies with a message that has two fields (message, name)
- Print the reply
Note:
- Use !? to send a synchronous message
- Inside an Actor's act method, sender gives you a reference to the Actor that sent the message
Reference code
import scala.actors.Actor
// Case study: sending and receiving custom messages, synchronously with a return value
object ClassDemo06 {
  //1. Define two case classes: Message (the data sent) and ReplyMessage (the data returned).
  case class Message(id: Int, message: String) // custom message to send
  case class ReplyMessage(message: String, name: String) // custom reply message
  //2. Create MsgActor, which receives the message sent by MainActor and replies to it.
  object MsgActor extends Actor {
    override def act(): Unit = {
      //2.1 Receive the message sent by the main Actor (MainActor).
      loop {
        react {
          // Handled with a partial function
          case Message(id, message) =>
            println(s"I am MsgActor, the message I received was: ${id}, ${message}")
            //2.2 Reply to MainActor.
            //sender: the Actor that sent the message
            sender ! ReplyMessage("I'm very bad, smoked to death!...", "Che Lei")
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start MsgActor
    MsgActor.start()
    //4. From MainActor, send a Message object to MsgActor.
    // !? sends synchronously and waits for the return value.
    val reply: Any = MsgActor !? Message(1, "Hello, I am MainActor, I'm sending you a message!")
    // result is the reply that was finally received.
    val result = reply.asInstanceOf[ReplyMessage]
    //5. Print the result.
    println(result)
  }
}
3.5.2 Example 2: sending an asynchronous message with no return value
Requirements
Create a MsgActor and send it an asynchronous message with no return value. The message has two fields (id, message).
Note: use ! to send an asynchronous message with no return value.
Reference code
import scala.actors.Actor
// Case study: sending and receiving custom messages, asynchronously with no return value
object ClassDemo07 {
  //1. Define a case class Message (the data sent)
  case class Message(id: Int, message: String) // custom message to send
  //2. Create MsgActor, which receives the message sent by MainActor and prints it.
  object MsgActor extends Actor {
    override def act(): Unit = {
      //2.1 Receive the message sent by the main Actor (MainActor).
      loop {
        react {
          // Handled with a partial function
          case Message(id, message) =>
            println(s"I am MsgActor, the message I received was: ${id}, ${message}")
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start MsgActor
    MsgActor.start()
    //4. From MainActor, send a Message object to MsgActor.
    // ! sends asynchronously with no return value.
    MsgActor ! Message(1, "I am sent asynchronously with no return value!")
  }
}
3.5.3 Example 3: sending an asynchronous message with a return value
Requirements
- Create a MsgActor and send it an asynchronous message with a return value; the message has two fields (id, message)
- MsgActor replies with a message that has two fields (message, name)
- Print the reply
Note:
- Use !! to send an asynchronous message with a return value
- After sending, the call returns an object of type Future[Any]
- A Future wraps data that is returned asynchronously. We get the Future back immediately, but it may not hold a value yet; the reply may only arrive at some point in the future
- Future's isSet() checks whether the reply has been received, and apply() gets the returned data
Reference code
import scala.actors.{Actor, Future}
// Case study: sending and receiving custom messages, asynchronously with a return value
object ClassDemo08 {
  //1. Define two case classes: Message (the data sent) and ReplyMessage (the data returned).
  case class Message(id: Int, message: String) // custom message to send
  case class ReplyMessage(message: String, name: String) // custom reply message
  //2. Create MsgActor, which receives the message sent by MainActor and replies to it.
  object MsgActor extends Actor {
    override def act(): Unit = {
      //2.1 Receive the message sent by the main Actor (MainActor).
      loop {
        react {
          // Handled with a partial function
          case Message(id, message) =>
            println(s"I am MsgActor, the message I received was: ${id}, ${message}")
            //2.2 Reply to MainActor.
            //sender: the Actor that sent the message
            sender ! ReplyMessage("I'm very bad, smoked to death!...", "Sugar")
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start MsgActor
    MsgActor.start()
    //4. From MainActor, send a Message object to MsgActor.
    // !! sends asynchronously and returns a Future.
    val future: Future[Any] = MsgActor !! Message(1, "Hello, I am MainActor, I'm sending you a message!")
    //5. The Future may not hold data yet, so we check it first.
    // isSet() checks whether the reply has been received; apply() gets the returned data.
    // !future.isSet means no reply has arrived yet, so keep looping.
    while (!future.isSet) {
    }
    // Get the returned data with Future's apply() method.
    val result = future.apply().asInstanceOf[ReplyMessage]
    //6. Print the result.
    println(result)
  }
}
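The "send now, read the result later" pattern can also be tried with the Scala standard library. The sketch below uses scala.concurrent.Future, which is a different class from the scala.actors Future used in this chapter, and it blocks with a timeout instead of spinning on a flag. AwaitSketch, Reply, ask, and askBlocking are hypothetical names for illustration only.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitSketch {
  // Hypothetical reply type, shaped like ReplyMessage in the case above.
  final case class Reply(message: String, name: String)

  // Starts an asynchronous computation and returns a Future immediately.
  def ask(): Future[Reply] = Future {
    Thread.sleep(100) // simulate the receiver doing some work
    Reply("done", "worker")
  }

  // Waits for the reply, giving up after five seconds.
  def askBlocking(): Reply = Await.result(ask(), 5.seconds)

  def main(args: Array[String]): Unit = {
    val future = ask()
    println(future.isCompleted) // very likely false: the work has only just started
    println(askBlocking())
  }
}
```

Blocking with a timeout avoids keeping a CPU core busy in an empty loop and gives the caller a way out if the reply never arrives.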
4. Case study: WordCount
4.1 Requirements
Next, we use the Actor concurrent programming model to count the words in multiple files.
Case introduction
Given several text files whose words are separated by spaces, use Actor concurrent programming to count how many times each word occurs.
Approach
- MainActor gets the files to be counted
- Create one WordCountActor per file
- Wrap each file name in a message and send it to a WordCountActor
- Each WordCountActor receives its message and counts the words of that single file
- Each WordCountActor sends its word counts back to MainActor
- MainActor waits until all WordCountActors have returned their messages, then merges the results
4.2 Step 1: get the file list
Approach
The data folder of the current project contains two text files, 1.txt and 2.txt, with the following contents.
Contents of 1.txt:
hadoop sqoop hadoop hadoop hadoop flume hadoop hadoop hadoop spark
Contents of 2.txt:
flink hadoop hive hadoop sqoop hadoop hadoop hadoop hadoop spark
Get the paths of these two text files and print them to the console.
Reference code
import java.io.File
object MainActor {
  def main(args: Array[String]): Unit = {
    //1. Get the paths of all files to be counted.
    //1.1 Define dir, the folder that holds all the files: ./data/
    val dir = "./data/"
    //1.2 Get the names of all files in the folder.
    val fileNameList = new File(dir).list().toList //List("1.txt", "2.txt")
    //1.3 Prepend the folder to each file name to get its full path: ./data/1.txt ./data/2.txt
    val fileDirList = fileNameList.map(dir + _)
    println(fileDirList)
  }
}
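One detail worth knowing about the listing step: java.io.File.list() returns null when the path is not a directory or cannot be read, so calling .toList on it directly throws a NullPointerException if ./data/ is missing. A defensive variant might look like the sketch below; FileListSketch and listPaths are hypothetical names, not part of the tutorial's code.

```scala
import java.io.File

object FileListSketch {
  // Returns the full paths of the entries in `dir`, or Nil when `dir`
  // is not a readable directory (File.list() returns null in that case).
  def listPaths(dir: String): List[String] =
    Option(new File(dir).list())
      .map(_.toList.sorted.map(name => new File(dir, name).getPath))
      .getOrElse(Nil)

  def main(args: Array[String]): Unit =
    println(listPaths("./data/"))
}
```

Sorting the names is a choice made here so that the output order is stable; File.list() itself makes no ordering guarantee.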
4.3 Step 2: create the WordCountActors
Approach
- Create one WordCountActor object per file.
- To make it easy to send messages to the Actors later, associate each Actor with its file name.
Implementation steps
- Create the WordCountActor class
- Convert the file list into a list of WordCountActors
- Zip the Actor list with the file list so that messages can be sent to each Actor later
- Print the result as a test
Reference code
Code in WordCountActor.scala
import scala.actors.Actor
//2.1 Create the WordCountActor class. Each WordCountActor object counts the words of one file.
class WordCountActor extends Actor {
  override def act(): Unit = {
  }
}
Code in MainActor.scala
import java.io.File
object MainActor {
  def main(args: Array[String]): Unit = {
    //1. Get the paths of all files to be counted.
    //1.1 Define dir, the folder that holds all the files: ./data/
    val dir = "./data/"
    //1.2 Get the names of all files in the folder.
    val fileNameList = new File(dir).list().toList //List("1.txt", "2.txt")
    //1.3 Prepend the folder to each file name to get its full path: ./data/1.txt ./data/2.txt
    val fileDirList = fileNameList.map(dir + _)
    //println(fileDirList)
    //2. Create one WordCountActor object per file.
    //2.1 First create the WordCountActor class (see WordCountActor.scala).
    //2.2 Create the WordCountActor objects, one per file.
    val wordCountList = fileNameList.map(_ => new WordCountActor) // two txt files, so two WordCountActor objects
    //println(wordCountList)
    //2.3 Associate each WordCountActor with the full path of its file.
    val actorWithFile = wordCountList.zip(fileDirList) //WordCountActor -> ./data/1.txt, WordCountActor -> ./data/2.txt
    println(actorWithFile)
  }
}
4.4 Step 3: start the Actors and send and receive task messages
Approach
Start all WordCountActor objects and send a word-count task message to each of them.
Note: the messages here should be sent asynchronously with a return value (!!).
Implementation steps
- Create a WordCountTask case class that wraps the name of the file to be counted
- Start all WordCountActors and send each one an asynchronous message with a return value
- Collect the values returned by all WordCountActors in a list of Futures
- Receive and print the message in WordCountActor
Reference code
Code in MessagePackage.scala
/**
 * The format of the task that MainActor sends to each WordCountActor.
 * @param fileName path of the file to count
 */
case class WordCountTask(fileName: String)
Code in MainActor.scala
import java.io.File
import scala.actors.Future
object MainActor {
  def main(args: Array[String]): Unit = {
    //1. Get the paths of all files to be counted.
    //1.1 Define dir, the folder that holds all the files: ./data/
    val dir = "./data/"
    //1.2 Get the names of all files in the folder.
    val fileNameList = new File(dir).list().toList //List("1.txt", "2.txt")
    //1.3 Prepend the folder to each file name to get its full path: ./data/1.txt ./data/2.txt
    val fileDirList = fileNameList.map(dir + _)
    //println(fileDirList)
    //2. Create one WordCountActor object per file.
    //2.1 First create the WordCountActor class (see WordCountActor.scala).
    //2.2 Create the WordCountActor objects, one per file.
    val wordCountList = fileNameList.map(_ => new WordCountActor) // two txt files, so two WordCountActor objects
    //println(wordCountList)
    //2.3 Associate each WordCountActor with the full path of its file.
    val actorWithFile = wordCountList.zip(fileDirList) //WordCountActor -> ./data/1.txt, WordCountActor -> ./data/2.txt
    println(actorWithFile)
    //3. Start each WordCountActor and send it a task.
    /*
      Statistics the Actors will eventually return:
      Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1)
      Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1)
    */
    val futureList: List[Future[Any]] = actorWithFile.map { // futureList holds the results of all WordCountActors
      keyVal => // keyVal has the form: WordCountActor -> ./data/1.txt
        //3.1 Get the WordCountActor to start.
        val actor = keyVal._1
        //3.2 Start that WordCountActor.
        actor.start()
        //3.3 Send it its task (the file path) asynchronously with a return value.
        val future: Future[Any] = actor !! WordCountTask(keyVal._2)
        future // holds the result returned by this WordCountActor
    }
  }
}
Code in WordCountActor.scala
import scala.actors.Actor
//2.1 Create the WordCountActor class. Each WordCountActor object counts the words of one file.
class WordCountActor extends Actor {
  override def act(): Unit = {
    loop {
      react {
        //3.4 Receive the task
        case WordCountTask(fileName) =>
          //3.5 Print the task
          println(s"The task received is: ${fileName}")
      }
    }
  }
}
4.5 Step 4: count the words in a file
Approach
Read the text of the file and count how many times each word occurs, for example:
(hadoop, 3), (spark, 1)...
Implementation steps
- Read the file contents and convert them into a list of lines
- Split the text on spaces to get the individual words
- Convert each word into a tuple to make counting easier
- Group by word, then aggregate the counts
- Print the aggregated counts
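The steps above can be tried on in-memory lines before any file or Actor is involved. This is a minimal sketch of the same flatMap, map, groupBy, and sum pipeline; WordCountSketch and countWords are hypothetical names, and splitting on any whitespace rather than a single space is a choice made for this sketch.

```scala
object WordCountSketch {
  // Counts word occurrences across the given lines.
  def countWords(lines: List[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+"))   // lines -> words
      .filter(_.nonEmpty)         // drop empty tokens, e.g. from leading spaces
      .map(word => word -> 1)     // word -> (word, 1)
      .groupBy(_._1)              // word -> List((word, 1), ...)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // The contents of 1.txt from step 1.
    val lines = List("hadoop sqoop hadoop hadoop hadoop flume hadoop hadoop hadoop spark")
    println(countWords(lines)("hadoop")) // prints 7
  }
}
```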
Reference code
Code in WordCountActor.scala
import scala.actors.Actor
import scala.io.Source
class WordCountActor extends Actor {
  override def act(): Unit = {
    // Receive data with loop + react.
    loop {
      react {
        //3.4 Receive the task
        case WordCountTask(fileName) =>
          //3.5 Print the task
          println(s"The task received is: ${fileName}")
          //4. Count each word in the received file.
          //4.1 Read all lines of the file. List("hadoop sqoop hadoop", "hadoop hadoop flume")
          val lineList = Source.fromFile(fileName).getLines().toList
          //4.2 Split the lines into individual words.
          //List("hadoop", "sqoop", "hadoop", "hadoop", "hadoop", "flume")
          val strList = lineList.flatMap(_.split(" "))
          //4.3 Pair each word with a count, 1 by default.
          //List("hadoop"->1, "sqoop"->1, "hadoop"->1, "hadoop"->1, "hadoop"->1, "flume"->1)
          val wordAndCount = strList.map(_ -> 1)
          //4.4 Group by word.
          //"hadoop" -> List("hadoop"->1, "hadoop"->1, ...), "sqoop" -> List("sqoop"->1)
          val groupMap = wordAndCount.groupBy(_._1)
          //4.5 Sum the counts in each group to get the total per word. "hadoop" -> 4, "sqoop" -> 1
          val wordCountMap = groupMap.map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
          //4.6 Print the result.
          println(wordCountMap)
      }
    }
  }
}
4.6 Step 5: return the result to MainActor
Approach
- Wrap the word-count result in a case class message and send it to MainActor
- MainActor waits until all WordCountActors have returned, then gets the word-count result of each one
Implementation steps
- Define a case class that wraps the word-count result
- Send the word-count result to MainActor
- MainActor checks whether all WordCountActors have returned; once they all have, it gets and converts the results
- Print the results
Reference code
Code in MessagePackage.scala
/**
 * The format of the task that MainActor sends to each WordCountActor.
 * @param fileName path of the file to count
 */
case class WordCountTask(fileName: String)
/**
 * The format of the result each WordCountActor returns after counting.
 * @param wordCountMap the result, for example: Map("hadoop"->6, "sqoop"->1)
 */
case class WordCountResult(wordCountMap: Map[String, Int])
Code in WordCountActor.scala
import scala.actors.Actor
import scala.io.Source
class WordCountActor extends Actor {
  override def act(): Unit = {
    // Receive data with loop + react.
    loop {
      react {
        //3.4 Receive the task
        case WordCountTask(fileName) =>
          //3.5 Print the task
          println(s"The task received is: ${fileName}")
          //4. Count each word in the received file.
          //4.1 Read all lines of the file. List("hadoop sqoop hadoop", "hadoop hadoop flume")
          val lineList = Source.fromFile(fileName).getLines().toList
          //4.2 Split the lines into individual words.
          val strList = lineList.flatMap(_.split(" "))
          //4.3 Pair each word with a count, 1 by default.
          val wordAndCount = strList.map(_ -> 1)
          //4.4 Group by word.
          val groupMap = wordAndCount.groupBy(_._1)
          //4.5 Sum the counts in each group to get the total per word.
          val wordCountMap = groupMap.map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
          //4.6 Return the result to MainActor.
          sender ! WordCountResult(wordCountMap)
      }
    }
  }
}
4.7 Step 6: merge the results
Approach
Merge all the word counts received from the WordCountActors.
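Merging is again a plain collections operation, so it can be checked in isolation. The sketch below merges the two per-file results quoted in step 3; MergeSketch and merge are hypothetical names used only here.

```scala
object MergeSketch {
  // Merges per-file word counts by summing the counts of equal words.
  def merge(results: List[Map[String, Int]]): Map[String, Int] =
    results
      .flatMap(_.toList)          // all (word, count) pairs from every file
      .groupBy(_._1)              // word -> its pairs across files
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val file1 = Map("spark" -> 1, "hadoop" -> 7, "sqoop" -> 1, "flume" -> 1)
    val file2 = Map("sqoop" -> 1, "flink" -> 1, "hadoop" -> 6, "spark" -> 1, "hive" -> 1)
    println(merge(List(file1, file2))("hadoop")) // prints 13
  }
}
```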
Reference code
Code in MainActor.scala
import java.io.File
import scala.actors.Future
object MainActor {
  def main(args: Array[String]): Unit = {
    //1. Get the paths of all files to be counted.
    //1.1 Define dir, the folder that holds all the files: ./data/
    val dir = "./data/"
    //1.2 Get the names of all files in the folder.
    val fileNameList = new File(dir).list().toList //List("1.txt", "2.txt")
    //1.3 Prepend the folder to each file name to get its full path: ./data/1.txt ./data/2.txt
    val fileDirList = fileNameList.map(dir + _)
    //println(fileDirList)
    //2. Create one WordCountActor object per file.
    //2.1 First create the WordCountActor class (see WordCountActor.scala).
    //2.2 Create the WordCountActor objects, one per file.
    val wordCountList = fileNameList.map(_ => new WordCountActor) // two txt files, so two WordCountActor objects
    //println(wordCountList)
    //2.3 Associate each WordCountActor with the full path of its file.
    val actorWithFile = wordCountList.zip(fileDirList) //WordCountActor -> ./data/1.txt, WordCountActor -> ./data/2.txt
    println(actorWithFile)
    //3. Start each WordCountActor and send it a task.
    /*
      Statistics the Actors return:
      Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1)
      Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1)
    */
    val futureList: List[Future[Any]] = actorWithFile.map { // futureList holds the results of all WordCountActors
      keyVal => // keyVal has the form: WordCountActor -> ./data/1.txt
        //3.1 Get the WordCountActor to start.
        val actor = keyVal._1
        //3.2 Start that WordCountActor.
        actor.start()
        //3.3 Send it its task (the file path) asynchronously with a return value.
        val future: Future[Any] = actor !! WordCountTask(keyVal._2)
        future // holds the result returned by this WordCountActor
    }
    //5. MainActor merges the received data.
    //5.1 Continue only after every future has a return value.
    // While any future has not received its value yet, keep waiting.
    while (futureList.exists(!_.isSet)) {} //futureList: future1, future2
    //5.2 Get the data from each future.
    //wordCountMap: List(Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1), Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1))
    val wordCountMap = futureList.map(_.apply().asInstanceOf[WordCountResult].wordCountMap)
    //5.3 Apply flatten, groupBy, and map to the data, then sum the counts.
    val result = wordCountMap.flatten.groupBy(_._1).map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
    //5.4 Print the result.
    println(result)
  }
}