Scala basic tutorial -- 19 -- actor
2022-07-04 18:53:00 【Empty.】
Chapter goals
- Understand what an Actor is
- Master sending and receiving messages with Actors
- Master the WordCount case study
1. Introduction to Actors
The Actor concurrent programming model in Scala can be used to write concurrent programs that can be more efficient than ones built directly on Java threads. We learn Scala Actors mainly to prepare for learning Akka later.
1.1 Problems with Java concurrent programming
In Java concurrent programming, every object has a logical monitor that can be used to control multi-threaded access to the object. We mark code that needs locked access with the synchronized keyword. The locking mechanism then guarantees that only one thread accesses the shared data at a time. However, this approach suffers from resource contention and deadlocks, and the larger the program, the harder these problems are to manage.
(Figure: thread deadlock)
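To make the lock-based model concrete, here is a minimal sketch of several threads updating one shared counter through synchronized. The names SharedCounter and countWith are hypothetical and exist only for this illustration.

```scala
// Hypothetical illustration of the "shared data + lock" model.
final class SharedCounter {
  private var count = 0 // shared mutable state, guarded by this object's monitor

  // Only one thread at a time may run a synchronized block on the same object.
  def increment(): Unit = this.synchronized { count += 1 }

  def current: Int = this.synchronized { count }

  // Starts `threads` threads that each call increment() `perThread` times,
  // waits for all of them, and returns the final count.
  def countWith(threads: Int, perThread: Int): Int = {
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = for (_ <- 1 to perThread) increment()
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    current
  }
}
```

Every increment has to acquire the same lock, so the threads contend with each other; removing synchronized would make the result unreliable because updates could be lost.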
1.2 The Actor concurrent programming model
The Actor concurrent programming model is a model that Scala offers programmers and that is completely different from Java's concurrent programming; it is a concurrency mechanism based on an event model. Actors do not share data and rely on message passing instead, which largely avoids problems such as resource contention and deadlock.
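The idea of "no shared data, only messages" can be sketched without any actor library. The example below is a hand-rolled stand-in built on java.util.concurrent, not the scala.actors API; SumWorker, send and result are hypothetical names. One thread owns the running total, and other threads can only put numbers into its mailbox.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

// Hypothetical stand-in for an actor (not the scala.actors API):
// a single thread owns the running sum and reads numbers from its mailbox.
final class SumWorker(expected: Int) {
  private val mailbox = new LinkedBlockingQueue[Int]()
  private val done = new CountDownLatch(1)
  private var total = 0 // written only by the worker thread, read after done.await()

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      for (_ <- 1 to expected) total += mailbox.take() // blocks until a message arrives
      done.countDown()
    }
  })
  worker.start()

  // Senders never touch `total`; they only enqueue a message.
  def send(n: Int): Unit = mailbox.put(n)

  // Blocks until all expected messages have been processed.
  def result(): Int = { done.await(); total }
}
```

Because only the worker thread ever modifies the total, no synchronized block is needed around it; the queue is the only point of contact between threads.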
1.3 Java concurrent programming vs. Actor concurrent programming
Java built-in threading model | Scala Actor model |
---|---|
"Shared data + lock" model (share data and lock) | Share nothing |
Every object has a monitor that guards thread access to shared data | No shared data; Actors communicate through messages |
Locked code is marked with synchronized | |
Prone to deadlock | |
Code inside each thread executes sequentially | Code inside each Actor executes sequentially |
Note:
Scala 2.11.x adopted the Akka concurrent programming framework, and the old scala.actors library is deprecated.
The Actor programming model is very similar to Akka's; we learn Actors here to prepare for learning Akka.
2. Creating Actors
We can create an Actor by having a class or a singleton object (object) extend the Actor trait.
2.1 Steps
- Define a class or object that extends the Actor trait
- Override the act method
- Call the Actor's start method to run the Actor
Note: Actors run in parallel and do not interfere with one another.
2.2 Case 1: Using a class
Requirements
- Create two Actors: one prints 1-10, the other prints 11-20
- Implement them with classes that extend Actor (useful when the program needs to create several Actors of the same kind)
Reference code
import scala.actors.Actor
// Case: getting started with Actor concurrent programming; creating Actors with class
object ClassDemo01 {
  // Requirement: create two Actors; one prints 1-10, the other prints 11-20
  //1. Create Actor1, which prints the numbers 1 to 10.
  class Actor1 extends Actor {
    override def act(): Unit = for (i <- 1 to 10) println("actor1: " + i)
  }
  //2. Create Actor2, which prints the numbers 11 to 20.
  class Actor2 extends Actor {
    override def act(): Unit = for (i <- 11 to 20) println("actor2: " + i)
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors.
    new Actor1().start()
    new Actor2().start()
  }
}
2.3 Case 2: Using an object
Requirements
- Create two Actors: one prints 1-10, the other prints 11-20
- Implement them with objects that extend Actor (useful when the program only needs one Actor of each kind)
Reference code
import scala.actors.Actor
// Case: getting started with Actor concurrent programming; creating Actors with object
object ClassDemo02 {
  // Requirement: create two Actors; one prints 1-10, the other prints 11-20
  //1. Create Actor1, which prints the numbers 1 to 10.
  object Actor1 extends Actor {
    override def act(): Unit = for (i <- 1 to 10) println("actor1: " + i)
  }
  //2. Create Actor2, which prints the numbers 11 to 20.
  object Actor2 extends Actor {
    override def act(): Unit = for (i <- 11 to 20) println("actor2: " + i)
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors.
    Actor1.start()
    Actor2.start()
  }
}
2.4 How an Actor program runs
- Call the start() method to start the Actor
- The act() method is executed automatically
- Send messages to the Actor
- When the act method finishes, the exit() method is called to end the Actor's execution.
3. Sending and receiving messages
When we introduced Actors, we said they are a concurrent programming model based on events (messages). So how does an Actor send and receive messages?
3.1 Usage
3.1.1 Sending messages
We can send messages in three ways:
Operator | Meaning |
---|---|
! | Send an asynchronous message; no return value |
!? | Send a synchronous message and wait for the return value |
!! | Send an asynchronous message; the return value is a Future[Any] |
For example, to send an asynchronous string message to actor1, use the following code:
actor1 ! "Hello!"
3.1.2 Receiving messages
An Actor uses the receive method to receive messages. receive takes a partial function as its argument:
{
  case variableName1: MessageType1 => handler1
  case variableName2: MessageType2 => handler2
  ...
}
Note: receive accepts only one message; once it has received a message, execution continues with the rest of the act method.
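The block of case clauses above is an ordinary Scala partial function, which can be tried out on its own without any actor. The following sketch uses only the standard library; ReceiveSketch, handler and describe are hypothetical names chosen for the illustration.

```scala
object ReceiveSketch {
  // A partial function is defined only for the inputs its case clauses match.
  val handler: PartialFunction[Any, String] = {
    case text: String => s"string message: $text"
    case number: Int  => s"int message: $number"
  }

  // Hypothetical helper: apply the handler if it matches, otherwise report the miss.
  def describe(message: Any): String =
    if (handler.isDefinedAt(message)) handler(message) else "unhandled message"
}
```

For example, ReceiveSketch.describe("hi") takes the String branch, while a Double matches neither clause and falls through to the "unhandled message" result.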
3.2 Case 1: Sending and receiving a sentence
Requirements
- Create two Actors (ActorSender and ActorReceiver)
- ActorSender sends an asynchronous string message to ActorReceiver
- ActorReceiver prints the message after receiving it
Reference code
import scala.actors.Actor
// Case: send messages asynchronously, with no return value.
object ClassDemo03 {
  //1. Create the Actor that sends messages, ActorSender, which sends a sentence to ActorReceiver
  object ActorSender extends Actor {
    override def act(): Unit = {
      // Send a sentence to ActorReceiver
      ActorReceiver ! "Hello, I am ActorSender!"
      // Send a second sentence
      ActorReceiver ! "What's your name?"
    }
  }
  //2. Create the Actor that receives messages, ActorReceiver
  object ActorReceiver extends Actor {
    override def act(): Unit = {
      // Receive a message. receive is called only once here, so only one message is printed.
      receive {
        case x: String => println(x)
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors
    ActorSender.start()
    ActorReceiver.start()
  }
}
3.3 Case 2: Continuously sending and receiving messages
If we want ActorSender to keep sending messages and ActorReceiver to keep receiving them, how do we do it?
Answer: we only need a while(true) loop that keeps calling receive to receive messages.
Requirements
- Create two Actors (ActorSender and ActorReceiver)
- ActorSender keeps sending an asynchronous string message to ActorReceiver
- ActorReceiver keeps receiving messages and prints them
Reference code
import java.util.concurrent.TimeUnit
import scala.actors.Actor
// Case: Actors continuously send and receive messages.
object ClassDemo04 {
  //1. Create the Actor that sends messages, ActorSender, which sends a sentence to ActorReceiver
  object ActorSender extends Actor {
    override def act(): Unit = {
      while (true) {
        // Send a sentence to ActorReceiver
        ActorReceiver ! "Hello, I am ActorSender!"
        // Sleep for 3 seconds.
        TimeUnit.SECONDS.sleep(3) // unit: seconds
      }
    }
  }
  //2. Create the Actor that receives messages, ActorReceiver
  object ActorReceiver extends Actor {
    override def act(): Unit = {
      // Keep receiving the messages that are sent.
      while (true) {
        receive {
          case x: String => println(x)
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors
    ActorSender.start()
    ActorReceiver.start()
  }
}
3.4 Case 3: Optimizing continuous message reception
The code above uses a while loop to receive messages continuously, which can cause the following problems:
- If the current Actor has not received a message, its thread blocks
- If there are many Actors, many threads may be blocked
- Each time a new message arrives, a thread is created again to handle it
- Frequent thread creation, destruction and switching hurts runtime efficiency
To address this, we can use loop() together with react() to reuse threads, which is more efficient than a while loop + receive().
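The benefit of reusing threads can be illustrated with a plain thread pool. This is only an analogy built on java.util.concurrent, not a description of how scala.actors schedules react internally; ThreadReuseSketch and distinctThreads are hypothetical names.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object ThreadReuseSketch {
  // Runs `tasks` small jobs on a pool of `poolSize` threads and returns
  // how many distinct threads actually executed them.
  def distinctThreads(tasks: Int, poolSize: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val seen = ConcurrentHashMap.newKeySet[String]() // thread-safe set of thread names
    for (_ <- 1 to tasks) {
      pool.execute(new Runnable {
        override def run(): Unit = seen.add(Thread.currentThread().getName)
      })
    }
    pool.shutdown()                             // accept no new jobs
    pool.awaitTermination(10, TimeUnit.SECONDS) // wait for queued jobs to finish
    seen.size()
  }
}
```

Calling ThreadReuseSketch.distinctThreads(20, 2) handles twenty jobs with at most two threads, which is the kind of saving that thread reuse is meant to provide compared with dedicating a blocked thread to every receiver.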
Requirements
- Create two Actors (ActorSender and ActorReceiver)
- ActorSender keeps sending an asynchronous string message to ActorReceiver
- ActorReceiver keeps receiving messages and prints them
Note: rewrite the previous case using loop + react.
Reference code
import java.util.concurrent.TimeUnit
import scala.actors.Actor
// Case: receive messages in a loop using loop + react.
object ClassDemo05 {
  //1. Create the Actor that sends messages, ActorSender, which sends a sentence to ActorReceiver
  object ActorSender extends Actor {
    override def act(): Unit = {
      while (true) {
        // Send a sentence to ActorReceiver
        ActorReceiver ! "Hello, I am ActorSender!"
        // Sleep for 3 seconds.
        TimeUnit.SECONDS.sleep(3) // unit: seconds
      }
    }
  }
  //2. Create the Actor that receives messages, ActorReceiver
  object ActorReceiver extends Actor {
    override def act(): Unit = {
      // Keep receiving the messages that are sent.
      loop {
        react {
          case x: String => println(x)
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start both Actors
    ActorSender.start()
    ActorReceiver.start()
  }
}
3.5 Case 4: Sending and receiving custom messages
The messages we sent earlier were all strings. Actors also support custom messages; for example, we can wrap a message in a case class and then send and handle it.
3.5.1 Example 1: Sending a synchronous message with a reply
Requirements
- Create a MsgActor and send it a synchronous message containing two fields (id, message)
- MsgActor replies with a message containing two fields (message, name)
- Print the reply message
Note:
- Use !? to send a synchronous message
- Inside an Actor's act method, sender gives you a reference to the Actor that sent the message
Reference code
import scala.actors.Actor
// Case: Actors send and receive custom messages, synchronously with a reply
object ClassDemo06 {
  //1. Define two case classes: Message (the data sent) and ReplyMessage (the data returned).
  case class Message(id: Int, message: String) // case class for the message sent
  case class ReplyMessage(message: String, name: String) // case class for the reply received
  //2. Create a MsgActor that receives the message sent by the main Actor (MainActor) and replies to it.
  object MsgActor extends Actor {
    override def act(): Unit = {
      //2.1 Receive the message sent by the main Actor (MainActor).
      loop {
        react {
          // Handled with a partial function
          case Message(id, message) => println(s"I am MsgActor, the message I received is: ${id}, ${message}")
            //2.2 Reply to MainActor.
            //sender: the Actor object that sent the message
            sender ! ReplyMessage("I'm very bad, smoked to death!...", "Che Lei")
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start MsgActor
    MsgActor.start()
    //4. From MainActor, send a Message object to MsgActor.
    // Use !? (synchronous, with a reply).
    val reply: Any = MsgActor !? Message(1, "Hello, I am MainActor, I'm sending you a message!")
    //result is the reply message that was finally received.
    val result = reply.asInstanceOf[ReplyMessage]
    //5. Print the result.
    println(result)
  }
}
3.5.2 Example 2: Sending an asynchronous message without a reply
Requirements
Create a MsgActor and send it an asynchronous message without a reply; the message contains two fields (id, message)
Note: use ! to send an asynchronous message without a reply
Reference code
import scala.actors.Actor
// Case: Actors send and receive custom messages, asynchronously without a reply
object ClassDemo07 {
  //1. Define a case class Message (the data sent)
  case class Message(id: Int, message: String) // case class for the message sent
  //2. Create a MsgActor that receives the message sent by the main Actor (MainActor) and prints it.
  object MsgActor extends Actor {
    override def act(): Unit = {
      //2.1 Receive the message sent by the main Actor (MainActor).
      loop {
        react {
          // Handled with a partial function
          case Message(id, message) => println(s"I am MsgActor, the message I received is: ${id}, ${message}")
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start MsgActor
    MsgActor.start()
    //4. From MainActor, send a Message object to MsgActor.
    // Use ! (asynchronous, no reply)
    MsgActor ! Message(1, "I am sending this message asynchronously without a reply!")
  }
}
3.5.3 Example 3: Sending an asynchronous message with a reply
Requirements
- Create a MsgActor and send it an asynchronous message with a reply; the message contains two fields (id, message)
- MsgActor replies with a message containing two fields (message, name)
- Print the reply message
Note:
- Use !! to send an asynchronous message with a reply
- After sending, the return value is an object of type Future[Any]
- A Future wraps data that is returned asynchronously. Although you get the Future back immediately, it does not necessarily hold a value yet; the reply may only arrive at some point in the future
- Future's isSet() checks whether the reply has been received, and its apply() method gets the returned data
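The same "value that arrives later" idea exists in the standard library as scala.concurrent.Future, which is a different class from scala.actors.Future. The sketch below uses the standard-library version, where isCompleted plays roughly the role of isSet; FutureSketch, Reply and askAndWait are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureSketch {
  // Hypothetical reply type for the illustration.
  final case class Reply(message: String, name: String)

  def askAndWait(): Reply = {
    // The Future is returned immediately; its value arrives later.
    val future: Future[Reply] = Future {
      Thread.sleep(100) // simulate work done elsewhere
      Reply("done", "worker")
    }
    // Await.result blocks until the value is available (or the timeout expires),
    // so no hand-written polling loop on isCompleted is needed.
    Await.result(future, 5.seconds)
  }
}
```

Blocking with a timeout avoids spinning a CPU core in an empty loop while waiting for the reply.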
Reference code
import scala.actors.{Actor, Future}
// Case: Actors send and receive custom messages, asynchronously with a reply
object ClassDemo08 {
  //1. Define two case classes: Message (the data sent) and ReplyMessage (the data returned).
  case class Message(id: Int, message: String) // case class for the message sent
  case class ReplyMessage(message: String, name: String) // case class for the reply received
  //2. Create a MsgActor that receives the message sent by the main Actor (MainActor) and replies to it.
  object MsgActor extends Actor {
    override def act(): Unit = {
      //2.1 Receive the message sent by the main Actor (MainActor).
      loop {
        react {
          // Handled with a partial function
          case Message(id, message) => println(s"I am MsgActor, the message I received is: ${id}, ${message}")
            //2.2 Reply to MainActor.
            //sender: the Actor object that sent the message
            sender ! ReplyMessage("I'm very bad, smoked to death!...", "Sugar")
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    //3. Start MsgActor
    MsgActor.start()
    //4. From MainActor, send a Message object to MsgActor.
    // Use !! (asynchronous, with a reply).
    val future: Future[Any] = MsgActor !! Message(1, "Hello, I am MainActor, I'm sending you a message!")
    //5. The future may not hold data immediately, so we need to check it.
    //Future's isSet() checks whether the reply has been received; apply() gets the returned data.
    //!future.isSet means no reply has been received yet, so keep looping.
    while (!future.isSet) {
    }
    // Get the returned data through Future's apply() method.
    val result = future.apply().asInstanceOf[ReplyMessage]
    //6. Print the result.
    println(result)
  }
}
4. Case study: WordCount
4.1 Requirements
Next, we use the Actor concurrent programming model to count the words in multiple files.
Case introduction
Given several text files (the words in each file are separated by spaces), use Actor concurrent programming to count how many times each word occurs.
Analysis
Approach
- MainActor gets the files whose words are to be counted
- Create one WordCountActor per file
- Wrap each file name in a message and send it to a WordCountActor
- Each WordCountActor receives its message and counts the words of that single file
- Each WordCountActor sends its word count result back to MainActor
- MainActor waits until all WordCountActors have returned their messages, then merges the results
4.2 Step 1: Get the file list
Approach
The data folder of the current project contains two text files, 1.txt and 2.txt, with the following contents.
Contents of 1.txt:
hadoop sqoop hadoop hadoop hadoop flume hadoop hadoop hadoop spark
Contents of 2.txt:
flink hadoop hive hadoop sqoop hadoop hadoop hadoop hadoop spark
Get the paths of these two text files and print them to the console.
Reference code
import java.io.File
object MainActor {
  def main(args: Array[String]): Unit = {
    //1. Get the paths of all files to be counted.
    //1.1 Define the variable dir, the folder that holds all the files: ./data/
    val dir = "./data/"
    //1.2 Get the names of all files in the folder.
    val fileNameList = new File(dir).list().toList //List("1.txt", "2.txt")
    //1.3 Prepend the folder to each file name to get its full path: ./data/1.txt ./data/2.txt
    val fileDirList = fileNameList.map(dir + _)
    //println(fileDirList)
  }
}
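The listing step can be made a little more defensive. java.io.File.list() returns null when the path is not a readable directory, so the sketch below wraps it in Option and also keeps only .txt files. FileListSketch and textFiles are hypothetical names, and filtering by extension is an assumption that goes beyond the original requirement.

```scala
import java.io.File

object FileListSketch {
  // Returns the paths of the .txt files directly inside `dir`, sorted by name.
  // File.list() returns null when `dir` is not a readable directory,
  // so the result is wrapped in Option to avoid a NullPointerException.
  def textFiles(dir: String): List[String] =
    Option(new File(dir).list())
      .map(_.toList)
      .getOrElse(Nil)
      .filter(_.endsWith(".txt"))
      .sorted
      .map(name => new File(dir, name).getPath)
}
```

With the data folder described above, FileListSketch.textFiles("./data") would list the paths of 1.txt and 2.txt, and a missing folder yields an empty list instead of an exception.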
4.3 Step 2: Create the WordCountActors
Approach
- Create one WordCountActor object per file.
- To make it easier to send messages to the Actors later, associate each Actor with a file name
Implementation steps
- Create the WordCountActor class
- Convert the file list into a list of WordCountActors
- To make it easier to send messages to the Actors later, zip the Actor list with the file list
- Print the result as a test
Reference code
Code in WordCountActor.scala
import scala.actors.Actor
//2.1 First create the WordCountActor class, used to obtain WordCountActor objects.
// Each WordCountActor object counts the words of one file.
class WordCountActor extends Actor {
  override def act(): Unit = {
  }
}
Code in MainActor.scala
import java.io.File
object MainActor {
  def main(args: Array[String]): Unit = {
    //1. Get the paths of all files to be counted.
    //1.1 Define the variable dir, the folder that holds all the files: ./data/
    val dir = "./data/"
    //1.2 Get the names of all files in the folder.
    val fileNameList = new File(dir).list().toList //List("1.txt", "2.txt")
    //1.3 Prepend the folder to each file name to get its full path: ./data/1.txt ./data/2.txt
    val fileDirList = fileNameList.map(dir + _)
    //println(fileDirList)
    //2. Create one WordCountActor object per file.
    //2.1 First create the WordCountActor class, used to obtain WordCountActor objects.
    //2.2 Create the corresponding WordCountActor objects according to the number of files.
    val wordCountList = fileNameList.map(_ => new WordCountActor) // Two txt files, so two WordCountActor objects are created.
    //println(wordCountList)
    //2.3 Associate each WordCountActor with the full path of its file.
    val actorWithFile = wordCountList.zip(fileDirList) //WordCountActor -> ./data/1.txt, WordCountActor -> ./data/2.txt
    println(actorWithFile)
  }
}
4.4 Step 3: Start the Actors and send/receive task messages
Approach
Start all WordCountActor objects and send a word count task message to each of them.
Note: send an asynchronous message with a reply (!!) here.
Implementation steps
- Create a WordCountTask case class message that wraps the name of the file to be counted
- Start all WordCountActors and send each one an asynchronous message with a reply
- Collect the messages returned by all WordCountActors (wrapped in a list of Futures)
- Receive and print the message in WordCountActor
Reference code
Code in MessagePackage.scala
/**
 * The format of the task that MainActor sends to each WordCountActor.
 * @param fileName path of the file to be counted.
 */
case class WordCountTask(fileName: String)
Code in MainActor.scala
import java.io.File
import scala.actors.Future
object MainActor {
  def main(args: Array[String]): Unit = {
    //1. Get the paths of all files to be counted.
    //1.1 Define the variable dir, the folder that holds all the files: ./data/
    val dir = "./data/"
    //1.2 Get the names of all files in the folder.
    val fileNameList = new File(dir).list().toList //List("1.txt", "2.txt")
    //1.3 Prepend the folder to each file name to get its full path: ./data/1.txt ./data/2.txt
    val fileDirList = fileNameList.map(dir + _)
    //println(fileDirList)
    //2. Create one WordCountActor object per file.
    //2.1 First create the WordCountActor class, used to obtain WordCountActor objects.
    //2.2 Create the corresponding WordCountActor objects according to the number of files.
    val wordCountList = fileNameList.map(_ => new WordCountActor) // Two txt files, so two WordCountActor objects are created.
    //println(wordCountList)
    //2.3 Associate each WordCountActor with the full path of its file.
    val actorWithFile = wordCountList.zip(fileDirList) //WordCountActor -> ./data/1.txt, WordCountActor -> ./data/2.txt
    println(actorWithFile)
    //3. Start each WordCountActor and send it a task.
    /*
      Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1)
      Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1)
     */
    val futureList: List[Future[Any]] = actorWithFile.map { //futureList records the statistics returned by all WordCountActors.
      keyVal => //keyVal has the form: WordCountActor -> ./data/1.txt
        //3.1 Get the specific WordCountActor object to start.
        val actor = keyVal._1 //actor: WordCountActor
        //3.2 Start the WordCountActor.
        actor.start()
        //3.3 Send the task (file path) to the WordCountActor as an asynchronous message with a reply.
        val future: Future[Any] = actor !! WordCountTask(keyVal._2)
        future // Records the statistics returned by one WordCountActor.
    }
  }
}
Code in WordCountActor.scala
import scala.actors.Actor
//2.1 First create the WordCountActor class, used to obtain WordCountActor objects.
// Each WordCountActor object counts the words of one file.
class WordCountActor extends Actor {
  override def act(): Unit = {
    loop {
      react {
        //3.4 Receive the task
        case WordCountTask(fileName) =>
          //3.5 Print the task
          println(s"The task received is: ${fileName}")
      }
    }
  }
}
4.5 Step 4: Count the words in a file
Approach
Read the text of the file and count how many times each word occurs. For example:
(hadoop, 3), (spark, 1)...
Implementation steps
- Read the file contents and convert them into a list of lines
- Split the text on spaces to turn it into individual words
- To make counting easier, convert each word into a tuple
- Group by word, then aggregate the counts
- Print the aggregated counts
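The steps above can be tried on in-memory lines before wiring them into an Actor. This is a minimal sketch using only the standard collections; WordCountSketch and countWords are hypothetical names. Splitting on the regex \s+ and dropping empty strings are small assumptions that make the count tolerant of repeated or leading spaces.

```scala
object WordCountSketch {
  // Counts how often each word occurs in the given lines.
  def countWords(lines: List[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+"))  // split every line on whitespace
      .filter(_.nonEmpty)        // drop the empty string produced by leading spaces
      .map(word => word -> 1)    // pair each word with a count of 1
      .groupBy(_._1)             // group the pairs by word
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }
}
```

Applied to the single line stored in 1.txt, countWords yields hadoop -> 7, sqoop -> 1, flume -> 1 and spark -> 1.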
Reference code
Code in WordCountActor.scala
import scala.actors.Actor
import scala.io.Source
class WordCountActor extends Actor {
  override def act(): Unit = {
    // Receive data with loop + react.
    loop {
      react {
        //3.4 Receive the task
        case WordCountTask(fileName) =>
          //3.5 Print the task
          println(s"The task received is: ${fileName}")
          //4. Count how many times each word occurs in the received file.
          //4.1 Read all lines of the given file. List("hadoop sqoop hadoop", "hadoop hadoop flume")
          val lineList = Source.fromFile(fileName).getLines().toList
          //4.2 Split the lines into individual words.
          //List("hadoop", "sqoop", "hadoop", "hadoop", "hadoop", "flume")
          val strList = lineList.flatMap(_.split(" "))
          //4.3 Attach a count to each word; the default is 1.
          //List("hadoop"->1, "sqoop"->1, "hadoop"->1, "hadoop"->1, "hadoop"->1, "flume"->1)
          val wordAndCount = strList.map(_ -> 1)
          //4.4 Group by word.
          //"hadoop" -> List("hadoop"->1, "hadoop"->1), "sqoop" -> List("sqoop"->1)
          val groupMap = wordAndCount.groupBy(_._1)
          //4.5 Sum the counts in each group to get the total for each word. "hadoop" -> 2, "sqoop" -> 1
          val wordCountMap = groupMap.map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
          //4.6 Print the statistics.
          println(wordCountMap)
      }
    }
  }
}
4.6 Step 5: Return the result to MainActor
Approach
- Wrap the word count result in a case class message and send it to MainActor
- MainActor waits until all WordCountActors have returned, then gets each WordCountActor's word count result
Implementation steps
- Define a case class that wraps the word count result
- Send the word count result to MainActor
- MainActor checks whether all WordCountActors have returned; once they all have, it gets and converts the results
- Print the results
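Waiting for every worker before combining results is a common pattern. As a point of comparison, the sketch below shows it with the standard library's scala.concurrent.Future rather than scala.actors.Future; WaitForAllSketch and lengthsOf are hypothetical names, and counting space-separated tokens merely stands in for a real task.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object WaitForAllSketch {
  // Runs one (hypothetical) counting job per input and waits for all results.
  def lengthsOf(inputs: List[String]): List[Int] = {
    val futures: List[Future[Int]] = inputs.map(text => Future(text.split(" ").length))
    // Future.sequence turns List[Future[Int]] into Future[List[Int]],
    // which completes once every job has finished; input order is preserved.
    Await.result(Future.sequence(futures), 5.seconds)
  }
}
```

A single blocking wait on the combined future replaces a loop that repeatedly checks each future in turn.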
Reference code
Code in MessagePackage.scala
/**
 * The format of the task that MainActor sends to each WordCountActor.
 * @param fileName path of the file to be counted.
 */
case class WordCountTask(fileName: String)
/**
 * The format of the result that each WordCountActor returns after counting.
 * @param wordCountMap the returned result, for example: Map("hadoop"->6, "sqoop"->1)
 */
case class WordCountResult(wordCountMap: Map[String, Int])
Code in WordCountActor.scala
import scala.actors.Actor
import scala.io.Source
class WordCountActor extends Actor {
  override def act(): Unit = {
    // Receive data with loop + react.
    loop {
      react {
        //3.4 Receive the task
        case WordCountTask(fileName) =>
          //3.5 Print the task
          println(s"The task received is: ${fileName}")
          //4. Count how many times each word occurs in the received file.
          //4.1 Read all lines of the given file. List("hadoop sqoop hadoop", "hadoop hadoop flume")
          val lineList = Source.fromFile(fileName).getLines().toList
          //4.2 Split the lines into individual words. List("hadoop", "sqoop", "hadoop", "hadoop", "hadoop", "flume")
          val strList = lineList.flatMap(_.split(" "))
          //4.3 Attach a count to each word; the default is 1. List("hadoop"->1, "sqoop"->1, "hadoop"->1, "hadoop"->1, "hadoop"->1, "flume"->1)
          val wordAndCount = strList.map(_ -> 1)
          //4.4 Group by word. "hadoop" -> List("hadoop"->1, "hadoop"->1), "sqoop" -> List("sqoop"->1)
          val groupMap = wordAndCount.groupBy(_._1)
          //4.5 Sum the counts in each group to get the total for each word. "hadoop" -> 2, "sqoop" -> 1
          val wordCountMap = groupMap.map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
          //4.6 Return the statistics to MainActor.
          sender ! WordCountResult(wordCountMap)
      }
    }
  }
}
4.7 Step 6: Merge the results
Approach
Merge all the word counts that were received.
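The merge itself needs no Actors and can be checked in isolation. The following sketch merges a list of per-file count maps by flattening them into pairs, grouping by word and summing; MergeSketch and merge are hypothetical names.

```scala
object MergeSketch {
  // Merges per-file word counts into one map by summing the counts per word.
  def merge(results: List[Map[String, Int]]): Map[String, Int] =
    results.flatten  // List[(word, count)] across all files
      .groupBy(_._1) // word -> all of its (word, count) pairs
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }
}
```

For the two sample files, merging Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1) with Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1) gives hadoop -> 13, spark -> 2, sqoop -> 2 and a count of 1 for flume, flink and hive.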
Reference code
Code in MainActor.scala
import java.io.File
import scala.actors.Future
object MainActor {
  def main(args: Array[String]): Unit = {
    //1. Get the paths of all files to be counted.
    //1.1 Define the variable dir, the folder that holds all the files: ./data/
    val dir = "./data/"
    //1.2 Get the names of all files in the folder.
    val fileNameList = new File(dir).list().toList //List("1.txt", "2.txt")
    //1.3 Prepend the folder to each file name to get its full path: ./data/1.txt ./data/2.txt
    val fileDirList = fileNameList.map(dir + _)
    //println(fileDirList)
    //2. Create one WordCountActor object per file.
    //2.1 First create the WordCountActor class, used to obtain WordCountActor objects.
    //2.2 Create the corresponding WordCountActor objects according to the number of files.
    val wordCountList = fileNameList.map(_ => new WordCountActor) // Two txt files, so two WordCountActor objects are created.
    //println(wordCountList)
    //2.3 Associate each WordCountActor with the full path of its file.
    val actorWithFile = wordCountList.zip(fileDirList) //WordCountActor -> ./data/1.txt, WordCountActor -> ./data/2.txt
    println(actorWithFile)
    //3. Start each WordCountActor and send it a task.
    /*
      Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1)
      Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1)
     */
    val futureList: List[Future[Any]] = actorWithFile.map { //futureList records the statistics returned by all WordCountActors.
      keyVal => //keyVal has the form: WordCountActor -> ./data/1.txt
        //3.1 Get the specific WordCountActor object to start.
        val actor = keyVal._1 //actor: WordCountActor
        //3.2 Start the WordCountActor.
        actor.start()
        //3.3 Send the task (file path) to the WordCountActor as an asynchronous message with a reply.
        val future: Future[Any] = actor !! WordCountTask(keyVal._2)
        future // Records the statistics returned by one WordCountActor.
    }
    //5. MainActor merges the received data.
    //5.1 Only continue once every future holds a return value.
    // While any future has not yet received a value, keep waiting.
    while (futureList.exists(!_.isSet)) {} //futureList: future1, future2
    //5.2 Get the data from each future.
    //wordCountMap: List(Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1), Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1))
    val wordCountMap = futureList.map(_.apply().asInstanceOf[WordCountResult].wordCountMap)
    //5.3 Apply flatten, groupBy and map to the data, then sum the counts.
    val result = wordCountMap.flatten.groupBy(_._1).map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
    //5.4 Print the result
    println(result)
  }
}