
[Rust notes] 17 Concurrency (Part 1)

2022-07-05 06:07:00 phial03

17 - Concurrency

  • Common ways to structure concurrent code in systems programming:
    • A background thread has a single job and wakes up periodically to do it.
    • A general-purpose worker pool communicates with clients through task queues.
    • A pipeline passes data from one thread to the next, with each thread doing a small part of the work.
    • Data parallelism assumes the whole machine is devoted mainly to one large computation, which is split into n smaller tasks running on n threads, in the hope that all n of the machine's cores work at once.
    • In a sea of synchronized objects, multiple threads have access to the same data, and races are avoided with ad hoc locking schemes built on low-level primitives such as mutexes.
    • Atomic integer operations let cores communicate by passing information through fields the size of a single machine word.
  • Three ways to use threads in Rust:
    • Fork-join parallelism;
    • Channels;
    • Shared mutable state.

17.1 - Fork-Join Parallelism

  • Threads can be used to perform several completely unrelated tasks at the same time.

  • A single-threaded program that processes a list of files:

    use std::io;

    fn process_files(filenames: Vec<String>) -> io::Result<()> {
        // load, process and save are helper functions not shown here
        for document in filenames {
            let text = load(&document)?;  // read the source file
            let results = process(text);  // compute the statistics
            save(&document, results)?;    // write the output file
        }
        Ok(())
    }
    
  • The fork-join pattern lets the same work run on multiple threads.

    • The idea is to split the data into several chunks and process each chunk on a separate thread.
    • Fork: start a new thread.
    • Join: wait for the thread to finish.
    • The units of work must be isolated from one another.

17.1.1 - spawn and join

  • The std::thread::spawn function starts a new thread:

    spawn(|| {
        println!("hello from a child thread");
    });
    
    • It takes a single argument: an FnOnce closure or function.
    • The new thread is a real operating-system thread with its own stack.
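As a minimal sketch of the point above: spawn returns a std::thread::JoinHandle<T>, where T is whatever the closure returns, and join() hands that value back to the parent thread. The helper name sum_in_background is hypothetical.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical helper: sums the numbers on a child thread.
fn sum_in_background(numbers: Vec<u64>) -> JoinHandle<u64> {
    // The move closure takes ownership of `numbers`, so the child thread
    // can keep using it even after this function has returned.
    thread::spawn(move || numbers.iter().sum())
}
```

The parent calls handle.join().unwrap() to wait for the child and collect the u64 it produced.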
  • A parallel version of the earlier process_files function, built on spawn:

    use std::io;
    use std::thread::spawn;

    fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
        // Divide the work into several chunks
        // (split_vec_into_chunks is a helper not shown here)
        const NTHREADS: usize = 8;
        let worklists = split_vec_into_chunks(filenames, NTHREADS);

        // Fork: spawn a thread to handle each chunk
        let mut thread_handles = vec![];
        for worklist in worklists {
            thread_handles.push(spawn(move || process_files(worklist)));
        }

        // Join: wait for all threads to finish
        for handle in thread_handles {
            handle.join().unwrap()?;
        }
        Ok(())
    }
    
  • How the list of filenames reaches the worker threads:

    • In the parent thread, the for loop takes each worklist out of worklists;
    • When the move closure is created, worklist is moved into the closure;
    • spawn then moves the closure (together with the worklist vector) to the new child thread.
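The notes call split_vec_into_chunks without showing it. Below is one possible version (an assumption, not the book's code), plus a small fork-join function that sums string lengths in place of real file processing; total_len_in_parallel is likewise a hypothetical name.

```rust
use std::thread::spawn;

/// Hypothetical helper: split `v` into at most `n` chunks of nearly equal size.
fn split_vec_into_chunks<T>(v: Vec<T>, n: usize) -> Vec<Vec<T>> {
    assert!(n > 0, "need at least one chunk");
    let chunk_size = ((v.len() + n - 1) / n).max(1); // ceiling division
    let mut chunks: Vec<Vec<T>> = Vec::new();
    let mut iter = v.into_iter().peekable();
    while iter.peek().is_some() {
        // Take up to chunk_size items from the shared iterator.
        chunks.push(iter.by_ref().take(chunk_size).collect());
    }
    chunks
}

/// Fork-join over string lengths, standing in for real file processing.
fn total_len_in_parallel(items: Vec<String>, nthreads: usize) -> usize {
    // Fork: one thread per chunk; each chunk is moved into its own closure.
    let handles: Vec<_> = split_vec_into_chunks(items, nthreads)
        .into_iter()
        .map(|chunk| spawn(move || chunk.iter().map(|s| s.len()).sum::<usize>()))
        .collect();
    // Join: wait for every thread and combine the partial results.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Using ceiling division means the last chunk may be shorter than the others, and fewer than n chunks are produced when there are fewer items than threads.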

17.1.2 - Error handling across threads

  • The handle.join() method:

    • Returns a std::thread::Result, which is an error if the child thread panicked.
    • Passes the value returned by the child thread back to the parent thread.
  • A panic does not automatically propagate from one thread to the other threads that depend on it.

  • Instead, a panic in one thread shows up in other threads as a Result containing an error.

  • In handle.join().unwrap()?, the .unwrap() acts as an assertion that propagates the panic: if the child thread panicked, join() returns an Err result, so unwrap() makes the parent thread panic too. This explicitly propagates the panic from the child thread to the parent.
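A small sketch of the behavior described above: a panicking child thread shows up in the parent as an Err from join(), rather than crashing the parent. run_child is a hypothetical helper.

```rust
use std::thread;

/// Hypothetical helper: runs `f` on a child thread and returns join()'s result.
fn run_child<F>(f: F) -> thread::Result<i32>
where
    F: FnOnce() -> i32 + Send + 'static,
{
    // join() returns Ok(value) if the child finished normally,
    // or Err(panic payload) if it panicked.
    thread::spawn(f).join()
}
```

Calling .unwrap() on that Err would make the parent panic in turn, which is the explicit propagation described in the last bullet.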

17.1.3 - Sharing immutable data across threads

  • Suppose each worker thread is handed a reference to a large, read-only data structure (for example, a glossary). If one thread hits an I/O error, the calling function may return before the other threads finish. The remaining child threads could then keep using the borrowed data after the main thread has freed it: a race with undefined behavior as the outcome.

  • Rust does not allow this. The solution is to wrap the data in an Arc: as long as any thread owns an Arc<GigabyteMap>, the map is not freed, even if the parent thread has already returned. And because the data inside an Arc cannot be modified, there are no data races.

    use std::sync::Arc;

    fn process_files_in_parallel(filenames: Vec<String>, glossary: Arc<GigabyteMap>)
        -> io::Result<()>
    {
        ...
        for worklist in worklists {
            // .clone() clones the Arc and bumps the reference count;
            // it does not clone the GigabyteMap
            let glossary_for_child = glossary.clone();
            thread_handles.push(
                spawn(move || process_files(worklist, &glossary_for_child))
            );
        }
        ...
    }
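A runnable sketch of the same idea, assuming a HashMap<String, String> as a stand-in for GigabyteMap and a hypothetical count_known_words function that counts how many words in each worklist appear in the shared glossary.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread::spawn;

/// Hypothetical example: the glossary is shared, read-only, by every thread.
fn count_known_words(
    worklists: Vec<Vec<String>>,
    glossary: Arc<HashMap<String, String>>,
) -> usize {
    let mut handles = Vec::new();
    for worklist in worklists {
        // Cloning the Arc only bumps the reference count; the map is not copied.
        let glossary_for_child = Arc::clone(&glossary);
        handles.push(spawn(move || {
            worklist
                .iter()
                .filter(|w| glossary_for_child.contains_key(*w))
                .count()
        }));
    }
    // The map is freed only when the last Arc (here or in a child) is dropped.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```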
    

17.1.4 - Rayon

  • The Rayon library is designed specifically for the fork-join pattern and offers two ways to run tasks concurrently:

    extern crate rayon;
    use rayon::prelude::*;

    // Run two tasks in parallel
    let (v1, v2) = rayon::join(fn1, fn2);

    // Run N tasks in parallel: par_iter() creates a ParallelIterator,
    // which works much like an ordinary iterator
    giant_vector.par_iter().for_each(|value| {
        do_thing_with_value(value);
    });
    
  • Rewriting process_files_in_parallel with Rayon:

    extern crate rayon;

    use rayon::prelude::*;
    use std::io;

    fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
        -> io::Result<()>
    {
        filenames.par_iter()    // create a parallel iterator
            // call process_file on each filename, giving another ParallelIterator
            .map(|filename| process_file(filename, glossary))

            // Combine the results. reduce_with returns an Option, which is None
            // only if filenames was empty. Whatever parallel processing happens
            // in the background is guaranteed to be done by the time reduce_with returns.
            .reduce_with(|r1, r2| {
                if r1.is_err() {
                    r1
                } else {
                    r2
                }
            })
            .unwrap_or(Ok(()))          // if filenames was empty, the result is Ok(())
    }
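The combining closure passed to reduce_with keeps the first error it sees. Since Rayon is an external crate, here is the same logic sketched with the standard library's sequential Iterator::reduce (first_error is a hypothetical name). Rayon may group the calls differently, which is fine because this closure is associative.

```rust
use std::io;

/// Hypothetical helper: keep the first Err; an empty input yields Ok(()).
fn first_error(results: Vec<io::Result<()>>) -> io::Result<()> {
    results
        .into_iter()
        // Same closure as the reduce_with call above.
        .reduce(|r1, r2| if r1.is_err() { r1 } else { r2 })
        // reduce returns None only for an empty input.
        .unwrap_or(Ok(()))
}
```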
    
  • Rayon also supports sharing references between threads, which is why glossary can be passed here as a plain &GigabyteMap instead of an Arc.

  • To use Rayon, add the following:

    • In main.rs:

      extern crate rayon;
      use rayon::prelude::*;
      
    • In Cargo.toml:

      [dependencies]
      rayon = "0.4"
      

17.2 - Channels

  • A channel is a one-way conduit for sending values from one thread to another; in essence, it is a thread-safe queue.

    • Unix pipes send bytes;
    • Rust channels send Rust values. Channels are part of the std::sync::mpsc module.
  • sender.send(item) puts a value into the channel, and receiver.recv() removes one.

    • Ownership of the value moves from the sending thread to the receiving thread.
    • If the channel is empty, receiver.recv() blocks until a value is sent.
  • Rust channels are faster than Unix pipes:

    • Pipes offer flexibility and composability without introducing the complexity of concurrent programming.
    • With channels, threads communicate by passing values to each other, without using locks or shared memory.
    • Sending a value moves it rather than copying it, and the move stays fast no matter how much data the value owns.
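A minimal example of the send/recv pair described above: a child thread moves a String into the channel and the parent receives it. send_one_message is a hypothetical name.

```rust
use std::sync::mpsc::channel;
use std::thread::spawn;

/// Hypothetical example: one message from a child thread to the parent.
fn send_one_message() -> String {
    let (sender, receiver) = channel();
    spawn(move || {
        let text = String::from("hello through a channel");
        // send moves `text` into the channel; the child can no longer use it.
        sender.send(text).unwrap();
    });
    // recv blocks until a value arrives (or every Sender is gone).
    receiver.recv().unwrap()
}
```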

17.2.1 - Sending values

  • An inverted index is a database that records which words appear where. It is one of the key pieces of a search engine.
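To make the data structure concrete, a toy inverted index can be a HashMap from each word to the list of document ids it appears in. This is a simplified sketch with a hypothetical function name, not the InMemoryIndex type used later in these notes.

```rust
use std::collections::HashMap;

/// Hypothetical toy index: lowercase word -> sorted ids of documents containing it.
fn build_inverted_index(docs: &[&str]) -> HashMap<String, Vec<usize>> {
    let mut index: HashMap<String, Vec<usize>> = HashMap::new();
    for (doc_id, text) in docs.iter().enumerate() {
        for word in text.split_whitespace() {
            let ids = index.entry(word.to_lowercase()).or_default();
            // Documents are visited in order, so checking the last id
            // is enough to record each document at most once per word.
            if ids.last() != Some(&doc_id) {
                ids.push(doc_id);
            }
        }
    }
    index
}
```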

  • Code that starts the file-reading thread (documents is the Vec<PathBuf> of files to read):

    use std::fs::File;
    use std::io;
    use std::io::prelude::*;  // for Read::read_to_string
    use std::thread::spawn;
    use std::sync::mpsc::channel;

    // channel() creates the queue and returns a pair of values: a sender and a receiver
    let (sender, receiver) = channel();

    let handle = spawn(  // start the thread with std::thread::spawn
        // The move closure takes ownership of sender (and documents),
        // moving them into the new thread
        move || -> io::Result<()> {
            // Rust infers the channel's element type (String) from the send call below
            for filename in documents {
                let mut f = File::open(filename)?;  // read the file from disk
                let mut text = String::new();
                f.read_to_string(&mut text)?;

                // Send the file's text into the channel; if the receiving end
                // has been dropped, send fails and the thread stops reading
                if sender.send(text).is_err() {
                    break;
                }
            }
            Ok(())   // after reading every document, the thread returns Ok(())
        }
    );
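The if sender.send(text).is_err() { break; } check above matters because send fails once the receiving end has been dropped. A sketch with an in-memory producer instead of files (produce_until_hangup is a hypothetical name):

```rust
use std::sync::mpsc::channel;
use std::thread::spawn;

/// Hypothetical example: the producer stops as soon as the receiver hangs up.
/// Returns how many values the producer managed to send.
fn produce_until_hangup(take: usize) -> usize {
    let (sender, receiver) = channel::<u32>();
    let producer = spawn(move || {
        let mut sent = 0;
        for n in 0.. {
            // send returns Err once the Receiver has been dropped.
            if sender.send(n).is_err() {
                break;
            }
            sent += 1;
        }
        sent
    });
    // Consume only `take` values, then hang up.
    for _ in 0..take {
        receiver.recv().unwrap();
    }
    drop(receiver);
    producer.join().unwrap()
}
```

The exact count depends on timing, since this channel is unbounded and the producer may run ahead of the consumer; it is always at least take.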
    

17.2.2 - Receiving values

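  • A second thread calls receiver.recv() in a loop:

    // Using a while let loop
    while let Ok(text) = receiver.recv() {
        do_something_with(text);
    }

    // Using a for loop: a Receiver can be iterated over directly.
    // Both loops exit once the channel is empty and every Sender has been dropped.
    for text in receiver {
        do_something_with(text);
    }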
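Both loops above stop once the channel is empty and every Sender has been dropped. A small sketch (collect_all is a hypothetical name) in which the producer's only Sender is dropped when its closure returns:

```rust
use std::sync::mpsc::channel;
use std::thread::spawn;

/// Hypothetical example: gather everything a producer thread sends.
fn collect_all(words: Vec<&'static str>) -> Vec<String> {
    let (sender, receiver) = channel();
    spawn(move || {
        for w in words {
            sender.send(w.to_uppercase()).unwrap();
        }
        // `sender` is dropped here, which closes the channel.
    });
    let mut out = Vec::new();
    // The for loop ends when the channel is closed and drained.
    for text in receiver {
        out.push(text);
    }
    out
}
```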
    
  • Example: the indexing thread, which receives texts and sends out one in-memory index per document:

    fn start_file_indexing_thread(texts: Receiver<String>)
        -> (Receiver<InMemoryIndex>, JoinHandle<()>)
    {
        let (sender, receiver) = channel();
        let handle = spawn(
            move || {
                for (doc_id, text) in texts.into_iter().enumerate() {
                    let index = InMemoryIndex::from_single_document(doc_id, text);
                    if sender.send(index).is_err() {
                        break;
                    }
                }
            }
        );
        (receiver, handle)
    }
    
  • If the file-reading thread hits an I/O error, it exits immediately and the error is stored in the thread's JoinHandle. The code that packages the receiver and the new thread's JoinHandle together and returns them:

    fn start_file_reader_thread(documents: Vec<PathBuf>)
        -> (Receiver<String>, JoinHandle<io::Result<()>>)
    {
        let (sender, receiver) = channel();

        let handle = spawn(
            move || {
               ... }
        );

        (receiver, handle)
    }
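Putting the reading loop from 17.2.1 together with this signature gives a runnable sketch (not the book's exact code). It uses std::fs::read_to_string for brevity and annotates the closure's return type so the ? operator knows which error type to produce.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::mpsc::{channel, Receiver};
use std::thread::{spawn, JoinHandle};

/// Sketch of the reader stage: sends each file's text downstream.
/// An I/O error ends the thread; the error comes back through the JoinHandle.
fn start_file_reader_thread(documents: Vec<PathBuf>)
    -> (Receiver<String>, JoinHandle<io::Result<()>>)
{
    let (sender, receiver) = channel();
    let handle = spawn(move || -> io::Result<()> {
        for filename in documents {
            let text = fs::read_to_string(filename)?;
            if sender.send(text).is_err() {
                break; // the receiver hung up; stop reading
            }
        }
        Ok(())
    });
    (receiver, handle)
}
```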
    

17.2.3 - Running the pipeline

  • Merge the in-memory indexes until they are big enough:

    fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
        -> (Receiver<InMemoryIndex>, JoinHandle<()>)
    
  • Write the large indexes to disk:

    fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>, output_dir: &Path)
        -> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
    
  • If there are several large files, merge them with a file-based merge algorithm:

    fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
        -> io::Result<()>
    
  • Start the threads and check for errors:

    fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
        -> io::Result<()>
    {
        // Launch all five stages of the pipeline
        let (texts,   h1) = start_file_reader_thread(documents);
        let (pints,   h2) = start_file_indexing_thread(texts);
        let (gallons, h3) = start_in_memory_merge_thread(pints);
        let (files,   h4) = start_index_writer_thread(gallons, &output_dir);
        let result = merge_index_files(files, &output_dir);

        // Wait for the threads to finish, holding on to any errors they report
        let r1 = h1.join().unwrap();
        h2.join().unwrap();
        h3.join().unwrap();
        let r4 = h4.join().unwrap();

        // Return the first error encountered, if any
        // (h2 and h3 can't fail: they are pure in-memory data processing)
        r1?;
        r4?;
        result
    }
    
  • The pipeline works like an assembly line: overall throughput is limited by the slowest stage.
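The stages in run_pipeline all follow the same shape: take a Receiver, start a thread, return a new Receiver plus a JoinHandle. A hypothetical generic start_stage captures that shape in a sketch:

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread::{spawn, JoinHandle};

/// Hypothetical generic stage: applies `f` to every value from `input`
/// on its own thread and forwards the results to the next stage.
fn start_stage<T, U, F>(input: Receiver<T>, f: F) -> (Receiver<U>, JoinHandle<()>)
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (sender, receiver) = channel();
    let handle = spawn(move || {
        // Ends when the upstream stage closes its channel.
        for item in input {
            if sender.send(f(item)).is_err() {
                break; // the downstream stage has gone away
            }
        }
    });
    (receiver, handle)
}
```

Chaining two stages gives a small assembly line; if one closure were much slower than the other, the whole chain would run at its pace.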

17.2.4 - Channel features and performance

  • The mpsc in std::sync::mpsc stands for multi-producer, single-consumer.

  • Sender<T> implements the Clone trait: to get a channel with multiple senders, create a regular channel and clone the sender as many times as needed. Each Sender value can then be moved to a different thread.
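A sketch of the multi-producer case (sum_from_producers is a hypothetical name): each thread gets its own clone of the Sender, and the original is dropped so that the receiving loop ends once every clone is gone.

```rust
use std::sync::mpsc::channel;
use std::thread::spawn;

/// Hypothetical example: several producers, one consumer.
fn sum_from_producers(nproducers: u64) -> u64 {
    let (sender, receiver) = channel();
    for id in 0..nproducers {
        let sender = sender.clone(); // each thread gets its own Sender
        spawn(move || sender.send(id).unwrap());
    }
    // Drop the original Sender; otherwise the loop below would never end.
    drop(sender);
    receiver.iter().sum()
}
```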

  • Receiver<T> can't be cloned. If several threads need to receive values from the same channel, you need a Mutex.
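One common way to share a Receiver, sketched here as an assumption rather than the book's code: wrap it in Arc<Mutex<Receiver<T>>> so worker threads take turns pulling jobs. process_with_workers is a hypothetical name, and each job is simply doubled.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::spawn;

/// Hypothetical worker pool: all workers pull from one shared Receiver.
fn process_with_workers(jobs: Vec<u32>, nworkers: usize) -> u32 {
    let (job_tx, job_rx) = channel::<u32>();
    let job_rx: Arc<Mutex<Receiver<u32>>> = Arc::new(Mutex::new(job_rx));
    let (result_tx, result_rx) = channel::<u32>();

    for _ in 0..nworkers {
        let job_rx = Arc::clone(&job_rx);
        let result_tx = result_tx.clone();
        spawn(move || loop {
            // The lock guard is a temporary, released at the end of this statement.
            let job = job_rx.lock().unwrap().recv();
            match job {
                Ok(n) => result_tx.send(n * 2).unwrap(),
                Err(_) => break, // job channel closed and drained
            }
        });
    }
    drop(result_tx);

    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // closing the job channel lets the workers exit

    result_rx.iter().sum()
}
```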

  • Backpressure: if values are sent faster than they can be received and processed, a backlog of values builds up inside the channel; applying backpressure means making the sender slow down instead.

  • Synchronous channels: each Unix pipe has a fixed size, and if a process tries to write to a pipe that is full, the system simply blocks that process until there is room in the pipe. Rust's equivalent is the synchronous channel:

    use std::sync::mpsc::sync_channel;
    let (sender, receiver) = sync_channel(1000);
    
    • A synchronous channel is just like a regular channel, except that you specify how many values it can hold when you create it.
    • For a synchronous channel, sender.send(value) is potentially a blocking operation.
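To see the bound in action without blocking, the sketch below uses SyncSender::try_send, the non-blocking counterpart of send, which returns TrySendError::Full when the buffer is full. capacity_before_full is a hypothetical helper.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical example: how many values fit before the channel is full.
fn capacity_before_full(bound: usize) -> usize {
    // `_receiver` keeps the receiving end alive, but nothing ever reads from it.
    let (sender, _receiver) = sync_channel::<usize>(bound);
    let mut accepted = 0;
    loop {
        match sender.try_send(accepted) {
            Ok(()) => accepted += 1,
            // With plain send(), the thread would block here until the receiver made room.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => panic!("receiver unexpectedly dropped"),
        }
    }
    accepted
}
```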

17.2.5 - Thread safety

  • std::marker::Send trait: types that implement Send are safe to pass by value to another thread; they can be moved across threads.
  • std::marker::Sync trait: types that implement Sync are safe to pass by non-mut reference to another thread; they can be shared across threads.
  • Through these traits, Rust guarantees thread safety: no data races and no other undefined behavior.
  • If all the fields of a struct or enum implement these traits, the struct or enum does too.
  • Rust implements these traits automatically for custom types; they don't need to be derived with #[derive].
  • The few types that are not Send or Sync are mostly ones that provide mutability in a non-thread-safe way, such as the reference-counting pointer type std::rc::Rc<T>. Rust requires the closure passed to spawn to be Send.
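Send and Sync can be checked at compile time with empty generic functions. In this sketch (all names are hypothetical), Stats never derives anything, yet it is Send and Sync because all of its fields are.

```rust
use std::sync::{Arc, Mutex};

// Empty generic functions: a call only compiles if T meets the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

// No #[derive] needed: Stats is Send and Sync because every field is.
#[allow(dead_code)]
struct Stats {
    name: String,
    counts: Vec<u64>,
    total: Arc<Mutex<u64>>,
}

// This would not compile if uncommented: Rc<T> is neither Send nor Sync.
// fn broken() { assert_send::<std::rc::Rc<u64>>(); }
```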

17.2.6 - Connecting any iterator to a channel

The following blanket implementation applies to every iterator that is Send + 'static and whose items are Send + 'static.

    use std::sync::mpsc;
    use std::thread::spawn;

    pub trait OffThreadExt: Iterator {
        /// Run this iterator on a separate worker thread and return an
        /// iterator that receives its items through a channel
        fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
    }

    impl<T> OffThreadExt for T
        where T: Iterator + Send + 'static,
              T::Item: Send + 'static
    {
        fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
            // Create a channel to carry items from the worker thread
            let (sender, receiver) = mpsc::sync_channel(1024);

            // Move this iterator to a new thread and run it there
            spawn(move || {
                for item in self {
                    if sender.send(item).is_err() {
                        break;
                    }
                }
            });

            // Return an iterator that pulls values from the channel
            receiver.into_iter()
        }
    }

See Programming Rust (by Jim Blandy and Jason Orendorff; Chinese edition translated by Li Songfeng), Chapter 19.


Copyright notice
This article was written by [phial03]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/186/202207050549561201.html