Scala Basic Tutorial (19): Actor

2022-07-04 18:53:00 Empty.

Chapter goals

  1. Understand what an Actor is
  2. Learn how Actors send and receive messages
  3. Work through the WordCount case study

1. Introduction to Actors

Scala's Actor concurrency model can be used to write concurrent programs that are more efficient than ones built directly on Java threads. We study Scala Actors mainly as preparation for learning Akka later.

1.1 Problems with Java concurrent programming

In Java concurrent programming, every object has a logical monitor that can be used to control multi-threaded access to it. We mark code with the synchronized keyword to indicate that it must be accessed under a lock. The locking mechanism ensures that only one thread accesses the shared data at a time. However, this approach suffers from resource contention and deadlocks, and the larger the program, the harder these problems become.
[Figure: thread deadlock]
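
To make the "shared data + lock" style concrete, here is a minimal sketch in Scala using only JVM threads. The names Counter and countWithLock are hypothetical and exist only for this illustration. Every access to the shared variable has to go through the object's monitor.

```scala
object SharedCounterSketch {
  // Hypothetical shared state, guarded by the object's own monitor.
  final class Counter {
    private var value = 0
    def increment(): Unit = this.synchronized { value += 1 }
    def get: Int = this.synchronized { value }
  }

  // Starts `threads` threads that each increment the same counter `perThread` times.
  def countWithLock(threads: Int, perThread: Int): Int = {
    val counter = new Counter
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = for (_ <- 1 to perThread) counter.increment()
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join()) // wait for every worker to finish
    counter.get
  }
}
```

Without the synchronized blocks the final count could come out lower than expected, which is the kind of problem the lock is there to prevent.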

1.2 The Actor concurrency model

The Actor concurrency model is a model Scala offers programmers that is completely different from Java's concurrency model. It is an event-based concurrency mechanism: Actors share no data and rely on message passing, which effectively avoids resource contention, deadlocks, and similar problems.
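
The idea of "no shared data, only messages" can be sketched without any actor library. The following is only an illustration built on a JVM blocking queue, not how scala.actors is implemented; Msg, Add, Stop and sumViaMessages are hypothetical names. A single worker thread owns the running total, and other threads influence it only by putting messages into its mailbox.

```scala
import java.util.concurrent.LinkedBlockingQueue

object MailboxSketch {
  // Hypothetical message types for this sketch.
  sealed trait Msg
  final case class Add(n: Int) extends Msg
  case object Stop extends Msg

  // Sends the given numbers to a single worker thread and returns its total.
  def sumViaMessages(numbers: Seq[Int]): Int = {
    val mailbox = new LinkedBlockingQueue[Msg]()
    var total = 0 // written only by the worker thread, read only after join()

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        var running = true
        while (running) {
          mailbox.take() match { // blocks until a message arrives
            case Add(n) => total += n
            case Stop   => running = false
          }
        }
      }
    })
    worker.start()

    numbers.foreach(n => mailbox.put(Add(n)))
    mailbox.put(Stop)
    worker.join() // join() makes the worker's writes visible to this thread
    total
  }
}
```

No lock is needed around total because only one thread ever touches it while messages are being processed.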

1.3 Java concurrency compared with Actor concurrency

| Java built-in threading model | Scala Actor model |
| --- | --- |
| "Shared data + lock" model (share data and lock) | share nothing |
| Every object has a monitor that controls thread access to shared data | No shared data; Actors communicate through messages |
| Locked code is marked with synchronized | |
| Deadlock problems | |
| Each thread executes its code sequentially | Each Actor executes its code sequentially |

Note:

  1. As of Scala 2.11.x, Akka is the concurrency framework to use, and the old scala.actors library is deprecated.

  2. The Actor programming model is very similar to Akka's, and we study Actors here as preparation for learning Akka.

2. Creating an Actor

We can create an Actor by having a class or a singleton object (object) extend the Actor trait.

2.1 Steps

  1. Define a class or object that extends the Actor trait
  2. Override the act method
  3. Call the Actor's start method to run it

Note: Actors run in parallel and do not interfere with one another.

2.2 Case 1: Using a class

Requirements

  1. Create two Actors: one prints 1-10, the other prints 11-20
  2. Implement them as classes that extend Actor. (Use this when the program needs several instances of the same Actor.)

Reference code

import scala.actors.Actor

// Example: getting started with Actors, created from classes.
object ClassDemo01 {

  // Requirement: create two Actors; one prints 1-10, the other prints 11-20.
  // 1. Actor1 prints the numbers 1 to 10.
  class Actor1 extends Actor {
    override def act(): Unit = for (i <- 1 to 10) println("actor1: " + i)
  }

  // 2. Actor2 prints the numbers 11 to 20.
  class Actor2 extends Actor {
    override def act(): Unit = for (i <- 11 to 20) println("actor2: " + i)
  }

  def main(args: Array[String]): Unit = {
    // 3. Start both Actors.
    new Actor1().start()
    new Actor2().start()
  }
}

2.3 Case 2: Using an object

Requirements

  1. Create two Actors: one prints 1-10, the other prints 11-20
  2. Implement them as objects that extend Actor. (Use this when the program needs only a single instance of the Actor.)

Reference code

import scala.actors.Actor

// Example: getting started with Actors, created from singleton objects.
object ClassDemo02 {

  // Requirement: create two Actors; one prints 1-10, the other prints 11-20.

  // 1. Actor1 prints the numbers 1 to 10.
  object Actor1 extends Actor {
    override def act(): Unit = for (i <- 1 to 10) println("actor1: " + i)
  }

  // 2. Actor2 prints the numbers 11 to 20.
  object Actor2 extends Actor {
    override def act(): Unit = for (i <- 11 to 20) println("actor2: " + i)
  }

  def main(args: Array[String]): Unit = {
    // 3. Start both Actors.
    Actor1.start()
    Actor2.start()
  }
}

2.4 How an Actor program runs

  1. Call start() to start the Actor
  2. The act() method is executed automatically
  3. Send messages to the Actor
  4. After act() finishes, the program calls exit() to end execution.

3. Sending and receiving messages

When we introduced Actors, we said that they are a concurrency model based on events (messages). So how does an Actor send and receive messages?

3.1 Usage

3.1.1 Sending messages

We can send messages in three ways:

| Operator | Meaning |
| --- | --- |
| ! | Sends an asynchronous message; no return value |
| !? | Sends a synchronous message and waits for the return value |
| !! | Sends an asynchronous message; the return value is a Future[Any] |

For example, to send an asynchronous string message to actor1, use the following code:

actor1 ! "Hello!"

3.1.2 Receiving messages

An Actor receives messages with the receive method, which takes a partial function:

{
    case variableName1: MessageType1 => handler1
    case variableName2: MessageType2 => handler2
    ...
}

Note: each call to receive handles only one message. Once it has been handled, execution continues with the rest of the act method.
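
The block of case clauses above is an ordinary Scala partial function, so its behavior can be tried out on its own. A minimal sketch follows; the handler name and the two message types are assumptions chosen for illustration, and no actor library is involved.

```scala
object PartialFunctionSketch {
  // A handler with the same shape as the block passed to receive.
  val handler: PartialFunction[Any, String] = {
    case s: String => "text: " + s
    case n: Int    => "number: " + n
  }

  // lift turns "not defined for this input" into None instead of a MatchError.
  def tryHandle(msg: Any): Option[String] = handler.lift(msg)
}
```

Here handler.isDefinedAt(3.14) is false, because no case matches a Double. That is what makes the function "partial": it only covers the message types it lists.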

3.2 Case 1: Sending and receiving a sentence

Requirements

  1. Create two Actors (ActorSender and ActorReceiver)
  2. ActorSender sends an asynchronous string message to ActorReceiver
  3. ActorReceiver prints the message after receiving it

Reference code

import scala.actors.Actor

// Example: sending messages asynchronously with no reply (!).
object ClassDemo03 {

  // 1. ActorSender sends messages to ActorReceiver.
  object ActorSender extends Actor {
    override def act(): Unit = {
      // Send the first sentence to ActorReceiver.
      ActorReceiver ! "Hello, I am ActorSender!"

      // Send the second sentence.
      ActorReceiver ! "What's your name?"
    }
  }

  // 2. ActorReceiver receives messages.
  object ActorReceiver extends Actor {
    override def act(): Unit = {
      // receive handles a single message, so only the first sentence is printed.
      receive {
        case x: String => println(x)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 3. Start both Actors.
    ActorSender.start()
    ActorReceiver.start()
  }
}

3.3 Case 2: Sending and receiving messages continuously

What if we want ActorSender to keep sending messages and ActorReceiver to keep receiving them?

Answer: wrap the call in a while(true) loop so that receive is called again and again.

Requirements

  1. Create two Actors (ActorSender and ActorReceiver)
  2. ActorSender keeps sending asynchronous string messages to ActorReceiver
  3. ActorReceiver keeps receiving the messages and prints them

Reference code

import java.util.concurrent.TimeUnit
import scala.actors.Actor

// Example: sending and receiving messages continuously.
object ClassDemo04 {

  // 1. ActorSender keeps sending a sentence to ActorReceiver.
  object ActorSender extends Actor {
    override def act(): Unit = {
      while (true) {
        // Send a sentence to ActorReceiver.
        ActorReceiver ! "Hello, I am ActorSender!"
        // Sleep for 3 seconds.
        TimeUnit.SECONDS.sleep(3)
      }
    }
  }

  // 2. ActorReceiver keeps receiving messages.
  object ActorReceiver extends Actor {
    override def act(): Unit = {
      // Call receive in a loop so that every message is handled.
      while (true) {
        receive {
          case x: String => println(x)
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 3. Start both Actors.
    ActorSender.start()
    ActorReceiver.start()
  }
}

3.4 Case 3: Optimizing continuous message reception

The code above uses a while loop to receive messages continuously, which can run into the following problems:

  • If the current Actor has not received a message, its thread is blocked
  • If there are many Actors, many threads may end up blocked
  • Each time a new message arrives, a thread is created again to handle it
  • Frequently creating, destroying, and switching threads hurts performance

To address this, we can combine loop() with react() to reuse threads, which is more efficient than a while loop with receive().
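
The benefit of reusing threads can be illustrated with a plain JVM thread pool. This is only an analogy under stated assumptions: it does not show how loop and react are scheduled inside scala.actors, and distinctThreads is a hypothetical helper. Many small handlers are run, yet only a fixed number of threads ever exist.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object ThreadReuseSketch {
  // Runs `tasks` small handlers on a pool of `poolSize` threads and
  // returns how many distinct threads actually did the work.
  def distinctThreads(tasks: Int, poolSize: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val seen = ConcurrentHashMap.newKeySet[String]() // thread-safe set of thread names
    for (_ <- 1 to tasks) {
      pool.execute(new Runnable {
        override def run(): Unit = {
          seen.add(Thread.currentThread().getName)
          ()
        }
      })
    }
    pool.shutdown()                             // accept no new tasks
    pool.awaitTermination(10, TimeUnit.SECONDS) // wait for queued tasks to finish
    seen.size()
  }
}
```

Calling distinctThreads(100, 2) runs one hundred handlers on at most two threads, so no thread is created or destroyed per message.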

Requirements

  1. Create two Actors (ActorSender and ActorReceiver)
  2. ActorSender keeps sending asynchronous string messages to ActorReceiver
  3. ActorReceiver keeps receiving the messages and prints them

Note: rewrite the previous case using loop + react.

Reference code

import java.util.concurrent.TimeUnit
import scala.actors.Actor

// Example: receiving messages in a loop with loop + react.
object ClassDemo05 {

  // 1. ActorSender keeps sending a sentence to ActorReceiver.
  object ActorSender extends Actor {
    override def act(): Unit = {
      while (true) {
        // Send a sentence to ActorReceiver.
        ActorReceiver ! "Hello, I am ActorSender!"
        // Sleep for 3 seconds.
        TimeUnit.SECONDS.sleep(3)
      }
    }
  }

  // 2. ActorReceiver keeps receiving messages.
  object ActorReceiver extends Actor {
    override def act(): Unit = {
      // loop + react handles every incoming message.
      loop {
        react {
          case x: String => println(x)
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 3. Start both Actors.
    ActorSender.start()
    ActorReceiver.start()
  }
}

3.5 Case 4: Sending and receiving custom messages

All the messages sent so far were strings. Actors can also send custom messages, for example by wrapping the data in a case class and sending that.
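
Case classes work well as messages because they can be taken apart again with pattern matching. The sketch below uses case classes with the same shape as those in the examples that follow; the describe function is hypothetical and only illustrates the matching, without any Actor involved.

```scala
object CaseClassMessageSketch {
  // Same shape as the messages used in the examples that follow.
  case class Message(id: Int, message: String)
  case class ReplyMessage(message: String, name: String)

  // Extracts the fields of whichever message type arrives.
  def describe(msg: Any): String = msg match {
    case Message(id, text)        => s"request #$id: $text"
    case ReplyMessage(text, name) => s"reply from $name: $text"
    case other                    => s"unknown message: $other"
  }
}
```

For example, describe(Message(1, "hi")) yields "request #1: hi", and anything that is not one of the two case classes falls through to the last case.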

3.5.1 Example 1: Sending a synchronous message with a reply

Requirements

  1. Create a MsgActor and send it a synchronous message containing two fields (id, message)
  2. MsgActor replies with a message containing two fields (message, name)
  3. Print the reply

Note:

  • Use !? to send a synchronous message
  • Inside an Actor's act method, sender gives you a reference to the Actor that sent the message

Reference code

import scala.actors.Actor

// Example: custom messages, sent synchronously with a reply (!?).
object ClassDemo06 {

  // 1. Two case classes: Message (the data sent) and ReplyMessage (the data returned).
  case class Message(id: Int, message: String)           // custom request message
  case class ReplyMessage(message: String, name: String) // custom reply message

  // 2. MsgActor receives the message sent by MainActor (the main thread) and replies to it.
  object MsgActor extends Actor {
    override def act(): Unit = {
      // 2.1 Receive the messages sent by MainActor.
      loop {
        react {
          // Pattern-match on the case class inside the partial function.
          case Message(id, message) =>
            println(s"I am MsgActor, the message I received was: $id, $message")

            // 2.2 Reply to MainActor.
            // sender: the Actor reference of whoever sent the current message.
            sender ! ReplyMessage("I'm very bad, smoked to death!...", "Che Lei")
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 3. Start MsgActor.
    MsgActor.start()

    // 4. Send a Message object to MsgActor with !? (synchronous, waits for the reply).
    val reply: Any = MsgActor !? Message(1, "Hello, I am MainActor, I'm sending you a message!")
    // result is the reply that was finally received.
    val result = reply.asInstanceOf[ReplyMessage]
    // 5. Print the result.
    println(result)
  }
}

3.5.2 Example 2: Sending an asynchronous message with no reply

Requirements

Create a MsgActor and send it an asynchronous message with no reply. The message contains two fields (id, message).

Note: use ! to send an asynchronous message with no reply.

Reference code

import scala.actors.Actor

// Example: custom messages, sent asynchronously with no reply (!).
object ClassDemo07 {

  // 1. A case class Message for the data being sent.
  case class Message(id: Int, message: String) // custom request message

  // 2. MsgActor receives the message sent by MainActor (the main thread) and prints it.
  object MsgActor extends Actor {
    override def act(): Unit = {
      // 2.1 Receive the messages sent by MainActor.
      loop {
        react {
          // Pattern-match on the case class inside the partial function.
          case Message(id, message) =>
            println(s"I am MsgActor, the message I received was: $id, $message")
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 3. Start MsgActor.
    MsgActor.start()

    // 4. Send a Message object to MsgActor with ! (asynchronous, no reply).
    MsgActor ! Message(1, "I am sent asynchronously with no reply!")
  }
}

3.5.3 Example 3: Sending an asynchronous message with a reply

Requirements

  1. Create a MsgActor and send it an asynchronous message that expects a reply. The message contains two fields (id, message)
  2. MsgActor replies with a message containing two fields (message, name)
  3. Print the reply

Note:

  • Use !! to send an asynchronous message that expects a reply
  • The send returns an object of type Future[Any]
  • A Future wraps data that is returned asynchronously. Holding a Future does not mean the value is available yet; the reply may only arrive at some point in the future
  • Future's isSet() checks whether the reply has been received, and its apply() method returns the reply data
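
For comparison, the standard library's scala.concurrent.Future follows the same idea of a value that arrives later. The sketch below uses that class, not the scala.actors Future described above; Reply, ask and askAndWait are hypothetical names. It waits with Await.result rather than polling a flag in an empty loop.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureSketch {
  // Hypothetical reply type for this sketch.
  case class Reply(message: String, name: String)

  // Starts some work on another thread and returns immediately.
  def ask(): Future[Any] = Future {
    Thread.sleep(100) // simulate the time needed to produce a reply
    Reply("done", "worker")
  }

  def askAndWait(): Reply = {
    val future = ask()
    // Blocks for at most 5 seconds until the value is available.
    Await.result(future, 5.seconds).asInstanceOf[Reply]
  }
}
```

Blocking with a timeout avoids burning CPU while waiting and also bounds how long the caller can hang if no reply ever arrives.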

Reference code

import scala.actors.{Actor, Future}

// Example: custom messages, sent asynchronously with a reply (!!).
object ClassDemo08 {

  // 1. Two case classes: Message (the data sent) and ReplyMessage (the data returned).
  case class Message(id: Int, message: String)           // custom request message
  case class ReplyMessage(message: String, name: String) // custom reply message

  // 2. MsgActor receives the message sent by MainActor (the main thread) and replies to it.
  object MsgActor extends Actor {
    override def act(): Unit = {
      // 2.1 Receive the messages sent by MainActor.
      loop {
        react {
          // Pattern-match on the case class inside the partial function.
          case Message(id, message) =>
            println(s"I am MsgActor, the message I received was: $id, $message")

            // 2.2 Reply to MainActor.
            // sender: the Actor reference of whoever sent the current message.
            sender ! ReplyMessage("I'm very bad, smoked to death!...", "Sugar")
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 3. Start MsgActor.
    MsgActor.start()

    // 4. Send a Message object to MsgActor with !! (asynchronous, returns a Future).
    val future: Future[Any] = MsgActor !! Message(1, "Hello, I am MainActor, I'm sending you a message!")

    // 5. The Future may not hold a value yet, so check before reading it.
    // isSet reports whether the reply has arrived; !future.isSet means it has not,
    // so this loop keeps polling until it does.
    while (!future.isSet) {
    }

    // 6. Read the reply with the Future's apply() method.
    val result = future.apply().asInstanceOf[ReplyMessage]
    // 7. Print the result.
    println(result)
  }
}

4. Case study: WordCount

4.1 Requirements

Next, we use the Actor concurrency model to count the words in several files.

Case introduction

Given several text files (the words in each file are separated by spaces), use Actors to count how often each word occurs.

Approach

  1. MainActor gets the files to run the word count on
  2. Create one WordCountActor per file
  3. Wrap each file name in a message and send it to a WordCountActor
  4. Each WordCountActor receives its message and counts the words in its single file
  5. The WordCountActor sends its word count result to MainActor
  6. MainActor waits until every WordCountActor has replied, then merges the results

4.2 Step 1: Get the file list

Approach

  1. The data folder of the current project contains two text files, 1.txt and 2.txt, with the following contents:

    Contents of 1.txt:

    hadoop sqoop hadoop
    hadoop hadoop flume
    hadoop hadoop hadoop
    spark

    Contents of 2.txt:

    flink hadoop hive
    hadoop sqoop hadoop
    hadoop hadoop hadoop
    spark

  2. Get the paths of these two text files and print them to the console.

Reference code

import java.io.File

object MainActor {

  def main(args: Array[String]): Unit = {
    // 1. Get the paths of all files to count.
    // 1.1 dir is the folder that holds the files: ./data/
    val dir = "./data/"
    // 1.2 Get the names of all files in the folder.
    val fileNameList = new File(dir).list().toList // List("1.txt", "2.txt")
    // 1.3 Prefix each file name with the folder to get its full path: ./data/1.txt ./data/2.txt
    val fileDirList = fileNameList.map(dir + _)
    println(fileDirList)
  }
}
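
One detail worth knowing: java.io.File's list() and listFiles() return null when the path does not exist or is not a directory, so the code above fails with a NullPointerException if ./data/ is missing. A more defensive variant might look like the following sketch, where listFiles as a helper name is an assumption for illustration.

```scala
import java.io.File

object FileListSketch {
  // Returns the paths of the regular files directly under `dir`,
  // or an empty list when `dir` is missing or not a directory.
  def listFiles(dir: String): List[String] =
    Option(new File(dir).listFiles()) // null becomes None
      .map(_.toList)
      .getOrElse(Nil)
      .filter(_.isFile)               // skip sub-directories
      .map(_.getPath)
      .sorted
}
```

Sorting the paths keeps the order stable across platforms, since the JDK does not guarantee any particular order for directory listings.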

4.3 Step 2: Create the WordCountActors

Approach

  1. Create one WordCountActor object per file.
  2. To make it easier to send messages to the Actors later, associate each Actor with a file name.

Implementation steps

  1. Create the WordCountActor class
  2. Convert the file list into a list of WordCountActors
  3. Zip the Actor list with the file list so that messages can be sent easily later
  4. Print the result as a test

Reference code

  • Code in WordCountActor.scala

    import scala.actors.Actor

    // 2.1 The WordCountActor class. Each WordCountActor object counts the words of one file.
    class WordCountActor extends Actor {
      override def act(): Unit = {
      }
    }

  • Code in MainActor.scala

    import java.io.File

    object MainActor {

      def main(args: Array[String]): Unit = {
        // 1. Get the paths of all files to count.
        // 1.1 dir is the folder that holds the files: ./data/
        val dir = "./data/"
        // 1.2 Get the names of all files in the folder.
        val fileNameList = new File(dir).list().toList // List("1.txt", "2.txt")
        // 1.3 Prefix each file name with the folder to get its full path: ./data/1.txt ./data/2.txt
        val fileDirList = fileNameList.map(dir + _)
        //println(fileDirList)

        // 2. Create one WordCountActor per file.
        // 2.1 The WordCountActor class is defined in WordCountActor.scala.
        // 2.2 Two txt files give two WordCountActor objects.
        val wordCountList = fileNameList.map(_ => new WordCountActor)
        //println(wordCountList)
        // 2.3 Pair each WordCountActor with the full path of its file.
        // WordCountActor -> ./data/1.txt, WordCountActor -> ./data/2.txt
        val actorWithFile = wordCountList.zip(fileDirList)
        println(actorWithFile)
      }
    }
    

4.4 Step 3: Start the Actors and send and receive task messages

Approach

Start all WordCountActor objects and send a word count task message to each of them.

Note: the messages here should be asynchronous messages that expect a reply (!!).

Implementation steps

  1. Create a WordCountTask case class message that wraps the name of the file to count
  2. Start all WordCountActors and send each an asynchronous message that expects a reply
  3. Collect the replies of all WordCountActors (as a list of Futures)
  4. Receive the message in WordCountActor and print it

Reference code

  • Code in MessagePackage.scala

    /**
     * The task that MainActor sends to each WordCountActor.
     * @param fileName path of the file to count
     */
    case class WordCountTask(fileName: String)

  • Code in MainActor.scala

    import java.io.File
    import scala.actors.Future

    object MainActor {

      def main(args: Array[String]): Unit = {
        // 1. Get the paths of all files to count.
        // 1.1 dir is the folder that holds the files: ./data/
        val dir = "./data/"
        // 1.2 Get the names of all files in the folder.
        val fileNameList = new File(dir).list().toList // List("1.txt", "2.txt")
        // 1.3 Prefix each file name with the folder to get its full path: ./data/1.txt ./data/2.txt
        val fileDirList = fileNameList.map(dir + _)
        //println(fileDirList)

        // 2. Create one WordCountActor per file.
        // 2.1 The WordCountActor class is defined in WordCountActor.scala.
        // 2.2 Two txt files give two WordCountActor objects.
        val wordCountList = fileNameList.map(_ => new WordCountActor)
        //println(wordCountList)
        // 2.3 Pair each WordCountActor with the full path of its file.
        // WordCountActor -> ./data/1.txt, WordCountActor -> ./data/2.txt
        val actorWithFile = wordCountList.zip(fileDirList)
        println(actorWithFile)

        // 3. Start every WordCountActor and send it its task.
        // Expected statistics once counting is implemented:
        /* Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1) Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1) */
        // futureList holds the pending results of all WordCountActors.
        val futureList: List[Future[Any]] = actorWithFile.map {
          keyVal => // keyVal has the form: WordCountActor -> ./data/1.txt
            // 3.1 Get the WordCountActor to start.
            val actor = keyVal._1
            // 3.2 Start it.
            actor.start()

            // 3.3 Send the task (the file path) asynchronously, expecting a reply.
            val future: Future[Any] = actor !! WordCountTask(keyVal._2)
            future // the pending result of this WordCountActor
        }
      }
    }

  • Code in WordCountActor.scala

    import scala.actors.Actor

    // Each WordCountActor object counts the words of one file.
    class WordCountActor extends Actor {
      override def act(): Unit = {
        loop {
          react {
            // 3.4 Receive a task.
            case WordCountTask(fileName) =>
              // 3.5 Print the task.
              println(s"The specific task received is: $fileName")
          }
        }
      }
    }
    

4.5 Step 4: Count the words in a file

Approach

Read the text of the file and count how often each word occurs, for example:

(hadoop, 3), (spark, 1)...

Implementation steps

  1. Read the file contents and convert them into a list of lines
  2. Split the text on spaces to get the individual words
  3. To make counting easier, convert each word into a tuple
  4. Group by word, then aggregate the counts
  5. Print the aggregated counts

Reference code

  • Code in WordCountActor.scala

    import scala.actors.Actor
    import scala.io.Source

    class WordCountActor extends Actor {
      override def act(): Unit = {
        // Use loop + react to receive messages.
        loop {
          react {
            // 3.4 Receive a task.
            case WordCountTask(fileName) =>
              // 3.5 Print the task.
              println(s"The specific task received is: $fileName")

              // 4. Count how often each word occurs in the file.
              // 4.1 Read all lines of the file and close it afterwards.
              // List("hadoop sqoop hadoop", "hadoop hadoop flume")
              val source = Source.fromFile(fileName)
              val lineList = try source.getLines().toList finally source.close()
              // 4.2 Split the lines into individual words.
              // List("hadoop", "sqoop", "hadoop", "hadoop", "hadoop", "flume")
              val strList = lineList.flatMap(_.split(" "))
              // 4.3 Pair every word with an initial count of 1.
              // List("hadoop"->1, "sqoop"->1, "hadoop"->1, "hadoop"->1, "hadoop"->1, "flume"->1)
              val wordAndCount = strList.map(_ -> 1)
              // 4.4 Group the pairs by word.
              // "hadoop" -> List("hadoop"->1, "hadoop"->1), "sqoop" -> List("sqoop"->1)
              val groupMap = wordAndCount.groupBy(_._1)
              // 4.5 Sum the counts in each group. "hadoop" -> 2, "sqoop" -> 1
              val wordCountMap = groupMap.map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
              // 4.6 Print the statistics.
              println(wordCountMap)
          }
        }
      }
    }
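
The counting steps do not depend on Actors at all, so they can be pulled out into a pure function and checked in isolation. The following is a minimal sketch with a hypothetical countWords helper. Unlike the code above, it splits on any run of whitespace and drops empty tokens, which is an assumption about how blank lines should be treated.

```scala
object WordCountSketch {
  // Counts how often each word occurs in the given lines.
  def countWords(lines: List[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+")) // split every line into words
      .filter(_.nonEmpty)       // drop empty tokens from blank lines
      .groupBy(identity)        // word -> all of its occurrences
      .map { case (word, occurrences) => word -> occurrences.size }
}
```

Applied to the four lines of 1.txt shown earlier, this returns hadoop -> 7, sqoop -> 1, flume -> 1 and spark -> 1, matching the statistics quoted in the MainActor comments.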
    

4.6 Step 5: Return the result to MainActor

Approach

  • Wrap the word count result in a case class message and send it to MainActor
  • MainActor waits until all WordCountActors have replied, then reads the word count result of each one

Implementation steps

  1. Define a case class that wraps the word count result
  2. Send the word count result to MainActor
  3. MainActor checks whether all WordCountActors have replied. Once they all have, it reads and converts the results
  4. Print the results

Reference code

  • Code in MessagePackage.scala

    /**
     * The task that MainActor sends to each WordCountActor.
     * @param fileName path of the file to count
     */
    case class WordCountTask(fileName: String)

    /**
     * The result each WordCountActor returns after counting.
     * @param wordCountMap the counts, for example: Map("hadoop" -> 6, "sqoop" -> 1)
     */
    case class WordCountResult(wordCountMap: Map[String, Int])

  • Code in WordCountActor.scala

    import scala.actors.Actor
    import scala.io.Source

    class WordCountActor extends Actor {
      override def act(): Unit = {
        // Use loop + react to receive messages.
        loop {
          react {
            // 3.4 Receive a task.
            case WordCountTask(fileName) =>
              // 3.5 Print the task.
              println(s"The specific task received is: $fileName")

              // 4. Count how often each word occurs in the file.
              // 4.1 Read all lines of the file and close it afterwards.
              // List("hadoop sqoop hadoop", "hadoop hadoop flume")
              val source = Source.fromFile(fileName)
              val lineList = try source.getLines().toList finally source.close()
              // 4.2 Split the lines into individual words.
              // List("hadoop", "sqoop", "hadoop", "hadoop", "hadoop", "flume")
              val strList = lineList.flatMap(_.split(" "))
              // 4.3 Pair every word with an initial count of 1.
              // List("hadoop"->1, "sqoop"->1, "hadoop"->1, "hadoop"->1, "hadoop"->1, "flume"->1)
              val wordAndCount = strList.map(_ -> 1)
              // 4.4 Group the pairs by word.
              // "hadoop" -> List("hadoop"->1, "hadoop"->1), "sqoop" -> List("sqoop"->1)
              val groupMap = wordAndCount.groupBy(_._1)
              // 4.5 Sum the counts in each group. "hadoop" -> 2, "sqoop" -> 1
              val wordCountMap = groupMap.map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
              // 4.6 Return the statistics to MainActor.
              sender ! WordCountResult(wordCountMap)
          }
        }
      }
    }
    

4.7 Step 6: Merge the results

Approach

Merge all the word counts that were received.

Reference code

  • Code in MainActor.scala

    import java.io.File
    import scala.actors.Future

    object MainActor {

      def main(args: Array[String]): Unit = {
        // 1. Get the paths of all files to count.
        // 1.1 dir is the folder that holds the files: ./data/
        val dir = "./data/"
        // 1.2 Get the names of all files in the folder.
        val fileNameList = new File(dir).list().toList // List("1.txt", "2.txt")
        // 1.3 Prefix each file name with the folder to get its full path: ./data/1.txt ./data/2.txt
        val fileDirList = fileNameList.map(dir + _)
        //println(fileDirList)

        // 2. Create one WordCountActor per file.
        // 2.1 The WordCountActor class is defined in WordCountActor.scala.
        // 2.2 Two txt files give two WordCountActor objects.
        val wordCountList = fileNameList.map(_ => new WordCountActor)
        //println(wordCountList)
        // 2.3 Pair each WordCountActor with the full path of its file.
        // WordCountActor -> ./data/1.txt, WordCountActor -> ./data/2.txt
        val actorWithFile = wordCountList.zip(fileDirList)
        println(actorWithFile)

        // 3. Start every WordCountActor and send it its task.
        /* Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1) Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1) */
        // futureList holds the pending results of all WordCountActors.
        val futureList: List[Future[Any]] = actorWithFile.map {
          keyVal => // keyVal has the form: WordCountActor -> ./data/1.txt
            // 3.1 Get the WordCountActor to start.
            val actor = keyVal._1
            // 3.2 Start it.
            actor.start()

            // 3.3 Send the task (the file path) asynchronously, expecting a reply.
            val future: Future[Any] = actor !! WordCountTask(keyVal._2)
            future // the pending result of this WordCountActor
        }

        // 5. MainActor merges the received data.
        // 5.1 Keep waiting while at least one future has not received its value yet.
        while (futureList.exists(!_.isSet)) {
        } // futureList: future1, future2
        // 5.2 Read the data from every future.
        // wordCountMap: List(Map(spark -> 1, hadoop -> 7, sqoop -> 1, flume -> 1), Map(sqoop -> 1, flink -> 1, hadoop -> 6, spark -> 1, hive -> 1))
        val wordCountMap = futureList.map(_.apply().asInstanceOf[WordCountResult].wordCountMap)
        // 5.3 Flatten the maps, group by word, and sum the counts per word.
        val result = wordCountMap.flatten.groupBy(_._1).map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
        // 5.4 Print the result.
        println(result)
      }
    }
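
    The merge in step 5.3 can also be written as a fold over the per-file maps. The sketch below is an alternative formulation with a hypothetical merge helper, not a replacement for the flatten, groupBy and map pipeline above. It adds each word's count into an accumulator map.

```scala
object MergeSketch {
  // Merges per-file word counts into one map by summing counts per word.
  def merge(results: List[Map[String, Int]]): Map[String, Int] =
    results.foldLeft(Map.empty[String, Int]) { (acc, counts) =>
      counts.foldLeft(acc) { case (m, (word, n)) =>
        m.updated(word, m.getOrElse(word, 0) + n)
      }
    }
}
```

    Merging the two maps from the comments above gives hadoop -> 13, spark -> 2 and sqoop -> 2, while flume, flink and hive each stay at 1.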
    

Original site

Copyright notice
This article was written by [Empty.]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/185/202207041654425696.html