[Rust notes] 17 - Concurrency (Part 1)
2022-07-05 06:07:00 【phial03】
17 - Concurrency
- Common idioms for writing concurrent code in systems programming:
- A background thread has a single job and wakes up periodically to do it.
- A general-purpose worker pool communicates with clients through task queues.
- A pipeline passes data from one thread to the next, with each thread doing a small part of the work.
- Data parallelism assumes the whole machine is devoted mainly to one large computation, which is split into n pieces and run on n threads in the hope that all n cores work at once.
- A sea of synchronized objects: multiple threads have access to the same data, and contention is avoided with ad hoc locking schemes built on low-level primitives such as mutexes.
- Atomic integer operations let multiple cores communicate by passing information through fields the size of a machine word.
- Three ways to use threads in Rust:
- Fork-join parallelism;
- Channels;
- Shared mutable state.
17.1 - Fork-join parallelism
Threads can be used to perform several completely unrelated tasks at the same time.
A single-threaded program:
fn process_files(filenames: Vec<String>) -> io::Result<()> {
    for document in filenames {
        let text = load(&document)?;    // Read the source file
        let results = process(text);    // Compute the statistics
        save(&document, results)?;      // Write the output file
    }
    Ok(())
}
The fork-join pattern turns this into a multithreaded task.
- The idea is to divide the data into chunks and process each chunk on its own thread.
- Fork: start a new thread.
- Join: wait for a thread to finish.
- The units of work must be isolated from one another.
17.1.1 - spawn and join
The std::thread::spawn function starts a new thread:
spawn(|| {
    println!("hello from a child thread");
})
- It takes one argument, a FnOnce closure or function.
- The new thread is a real operating system thread with its own stack.
The parallel version of the earlier process_files function, written with spawn:
use std::thread::spawn;

fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
    // Divide the work into several chunks
    const NTHREADS: usize = 8;
    let worklists = split_vec_into_chunks(filenames, NTHREADS);

    // Fork: spawn a thread to handle each chunk
    let mut thread_handles = vec![];
    for worklist in worklists {
        thread_handles.push(spawn(move || process_files(worklist)));
    }

    // Join: wait for all threads to finish
    for handle in thread_handles {
        handle.join().unwrap()?;
    }
    Ok(())
}
How the list of filenames reaches a worker thread:
- In the parent thread, the for loop defines worklist and moves it;
- When the move closure is created, worklist is moved into the closure;
- spawn then moves the closure (including the worklist vector) to the new child thread.
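The fork and join steps can be tried out on a smaller problem. The following is a minimal sketch, not the code from the book: parallel_sum is a hypothetical helper that splits a vector of numbers into chunks, sums each chunk on its own thread, and adds up the partial sums after joining.

```rust
use std::thread;

/// Hypothetical helper: sum `numbers` using at most `nthreads` worker threads.
fn parallel_sum(numbers: Vec<u64>, nthreads: usize) -> u64 {
    // Guard against zero threads, then round the chunk size up so that
    // at most `nthreads` chunks are produced.
    let nthreads = nthreads.max(1);
    let chunk_size = ((numbers.len() + nthreads - 1) / nthreads).max(1);

    // Each worker gets an owned Vec so that its `move` closure is 'static.
    let worklists: Vec<Vec<u64>> = numbers
        .chunks(chunk_size)
        .map(|chunk| chunk.to_vec())
        .collect();

    // Fork: one thread per chunk.
    let mut handles = Vec::new();
    for worklist in worklists {
        handles.push(thread::spawn(move || worklist.iter().sum::<u64>()));
    }

    // Join: wait for every thread and combine the partial results.
    let mut total = 0;
    for handle in handles {
        total += handle.join().unwrap();
    }
    total
}
```

Calling parallel_sum((1..=100).collect(), 8) forks eight threads and joins them in order. Copying each chunk into its own Vec keeps the sketch simple at the cost of one extra allocation per worker.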
17.1.2 - Error handling across threads
The handle.join() method:
- Returns a std::thread::Result, which is an error if the child thread panicked.
- Passes the value returned by the child thread back to the parent thread.
A panic does not automatically spread from one thread to the threads that depend on it.
Instead, a panic in one thread is reported to other threads as a Result containing an error.
handle.join().unwrap(): .unwrap() performs the assertion and thereby propagates the panic. If the child thread panicked, join() returns an Err result, so the parent thread that calls unwrap() panics too. This explicitly propagates the panic from the child thread to the parent thread.
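To see that a child's panic arrives in the parent as an ordinary Err, the result of join() can be matched instead of unwrapped. This is a small sketch under stated assumptions: run_child is a hypothetical function, and the panic payload is assumed to be a string, which is the case for panic! with a message.

```rust
use std::thread;

/// Hypothetical helper: run a child thread that either returns 42 or panics,
/// and report the outcome to the caller without panicking in the parent.
fn run_child(should_panic: bool) -> Result<i32, String> {
    let handle = thread::spawn(move || {
        if should_panic {
            panic!("child failed");
        }
        42
    });

    match handle.join() {
        // The child returned normally: its value is passed to the parent.
        Ok(value) => Ok(value),
        // The child panicked: the payload is a Box<dyn Any + Send>.
        Err(payload) => {
            let message = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            Err(message)
        }
    }
}
```

The parent keeps running in both cases. Replacing the match with handle.join().unwrap() would instead make the parent panic whenever the child did.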
17.1.3 - Sharing immutable data across threads
Suppose a reference is passed to the function. If one thread hits an I/O error, the calling function may return before the other threads have finished. The remaining child threads could then keep using the argument after the main thread has freed it, which would be a data race.
Rust does not allow this. As long as any thread owns an Arc<GigabyteMap>, the map is kept alive, even if the parent thread has already exited. Because the data in an Arc cannot be modified, there can be no data race.
use std::sync::Arc;

fn process_files_in_parallel(filenames: Vec<String>, glossary: Arc<GigabyteMap>) -> io::Result<()> {
    ...
    for worklist in worklists {
        // .clone() clones the Arc and bumps the reference count. It does not clone the GigabyteMap.
        let glossary_for_child = glossary.clone();
        thread_handles.push(
            spawn(move || process_files(worklist, &glossary_for_child))
        );
    }
    ...
}
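A self-contained variant of the same idea, with a HashMap standing in for GigabyteMap. The function name count_known_words and the glossary contents are assumptions made for this sketch only.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: count how many words across all texts appear in the
/// shared glossary, handling each text on its own thread.
fn count_known_words(texts: Vec<String>, glossary: Arc<HashMap<String, u32>>) -> usize {
    let mut handles = Vec::new();
    for text in texts {
        // Cloning the Arc only bumps the reference count; the map is not copied.
        let glossary_for_child = Arc::clone(&glossary);
        handles.push(thread::spawn(move || {
            text.split_whitespace()
                .filter(|word| glossary_for_child.contains_key(*word))
                .count()
        }));
    }
    // Join every thread and add up the per-text counts.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Each child owns its own Arc handle, so the map stays alive for as long as any child still needs it, regardless of what the parent does.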
17.1.4 - Rayon
The Rayon library is designed specifically for fork-join parallelism and offers two ways to run tasks concurrently:
extern crate rayon;
use rayon::prelude::*;

// Run two tasks in parallel
let (v1, v2) = rayon::join(fn1, fn2);

// Run N tasks in parallel
giant_vector.par_iter().for_each(|value| {    // par_iter() creates a ParallelIterator, which is similar to an iterator
    do_thing_with_value(value);
});
Rewriting process_files_in_parallel with Rayon:
extern crate rayon;
use rayon::prelude::*;

fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap) -> io::Result<()> {
    filenames.par_iter()    // Create a parallel iterator
        // Call process_file on each filename, producing a ParallelIterator over the results
        .map(|filename| process_file(filename, glossary))
        // Combine the results. reduce_with returns an Option that is None only if filenames is empty.
        // Any parallel processing happening in the background is guaranteed to be finished by the time reduce_with returns.
        .reduce_with(|r1, r2| {
            if r1.is_err() { r1 } else { r2 }
        })
        .unwrap_or(Ok(()))    // An empty list of filenames yields Ok(())
}
Rayon also supports sharing references across threads.
To use Rayon, add the following.
In main.rs:
extern crate rayon;
use rayon::prelude::*;
In Cargo.toml:
[dependencies]
rayon = "0.4"
17.2 - Channels
A channel is a one-way conduit for sending values from one thread to another; in essence it is a thread-safe queue.
- Unix pipes send bytes;
- Rust channels send values. They are part of the std::sync::mpsc module.
sender.send(item) puts a value into the channel, and receiver.recv() removes one.
- Ownership of the value moves from the sending thread to the receiving thread.
- If the channel is empty, receiver.recv() blocks until a value is sent.
Rust channels are faster than Unix pipes:
- Pipes are usually thought of as providing flexibility and composability rather than concurrency.
- With channels, threads communicate by passing values, with no need for locks or shared memory.
- Sending a value moves it rather than copying it, and a move is fast no matter how much data the value holds.
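The send and recv behaviour described above can be sketched in a few lines. send_lines is a hypothetical function: a producer thread sends owned Strings, and the calling thread receives them until the channel closes.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Hypothetical helper: send each line through a channel from a producer
/// thread and return the length of every line received.
fn send_lines(lines: Vec<String>) -> Vec<usize> {
    let (sender, receiver) = channel();

    let producer = thread::spawn(move || {
        for line in lines {
            // Ownership of `line` moves into the channel here.
            if sender.send(line).is_err() {
                break; // The receiver was dropped; stop sending.
            }
        }
        // `sender` is dropped when the closure ends, which closes the channel.
    });

    let mut lengths = Vec::new();
    // recv() blocks while the channel is empty and returns Err once it is
    // closed and drained.
    while let Ok(line) = receiver.recv() {
        lengths.push(line.len());
    }

    producer.join().unwrap();
    lengths
}
```

With a single producer, values arrive in the order they were sent, so the lengths line up with the input.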
17.2.1 - Sending values
An inverted index is a database that records which keywords appear where. It is one of the key ingredients of a search engine.
The code that starts the file-reading thread:
use std::fs::File;
use std::io::prelude::*;    // For Read::read_to_string
use std::thread::spawn;
use std::sync::mpsc::channel;

// channel() creates the queue data structure and returns a pair: a sender and a receiver.
// Rust uses type inference to determine the channel's element type.
let (sender, receiver) = channel();

// Start the thread with std::thread::spawn.
// Ownership of sender moves into the new thread through the move closure.
let handle = spawn(move || {
    for filename in documents {
        let mut f = File::open(filename)?;    // Read the file from disk
        let mut text = String::new();
        f.read_to_string(&mut text)?;

        // After reading the file, send its text into the channel
        if sender.send(text).is_err() {
            break;
        }
    }
    Ok(())    // Once all documents have been read, the thread returns Ok(())
});
17.2.2 - Receiving values
A second thread calls receiver.recv() in a loop.
// With a while loop
while let Ok(text) = receiver.recv() {
    do_something_with(text);
}

// With a for loop
for text in receiver {
    do_something_with(text);
}
Receiver thread example:
fn start_file_indexing_thread(texts: Receiver<String>) -> (Receiver<InMemoryIndex>, JoinHandle<()>) {
    let (sender, receiver) = channel();
    let handle = spawn(move || {
        for (doc_id, text) in texts.into_iter().enumerate() {
            let index = InMemoryIndex::from_single_document(doc_id, text);
            if sender.send(index).is_err() {
                break;
            }
        }
    });
    (receiver, handle)
}
On the reading side, if the thread hits an I/O error it exits immediately, and the error is stored in the thread's JoinHandle. The code that wraps this up in a function returning the receiver and the new thread's JoinHandle:
fn start_file_reader_thread(documents: Vec<PathBuf>) -> (Receiver<String>, JoinHandle<io::Result<()>>) {
    let (sender, receiver) = channel();
    let handle = spawn(move || {
        ...
    });
    (receiver, handle)
}
17.2.3 - Running the pipeline
Merge indexes in memory until they are big enough:
fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>) -> (Receiver<InMemoryIndex>, JoinHandle<()>)
Write the large indexes to disk:
fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>, output_dir: &Path) -> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
If there are several large files, combine them with a file-based merge algorithm:
fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path) -> io::Result<()>
Start the threads and check for errors:
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf) -> io::Result<()> {
    // Launch all five stages of the pipeline
    let (texts, h1) = start_file_reader_thread(documents);
    let (pints, h2) = start_file_indexing_thread(texts);
    let (gallons, h3) = start_in_memory_merge_thread(pints);
    let (files, h4) = start_index_writer_thread(gallons, &output_dir);
    let result = merge_index_files(files, &output_dir);

    // Wait for the threads to finish, keeping any errors they report
    let r1 = h1.join().unwrap();
    h2.join().unwrap();
    h3.join().unwrap();
    let r4 = h4.join().unwrap();

    // Return the first error encountered.
    // h2 and h3 cannot fail, because they only process data in memory.
    r1?;
    r4?;
    result
}
Because the stages run as a pipeline, overall throughput is limited by the slowest stage.
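The stage functions above depend on types that are not shown in these notes (InMemoryIndex and the file helpers). As a rough, runnable miniature of the same shape, the sketch below chains two hypothetical stages, start_source_thread and start_uppercase_thread, each returning a receiver and a JoinHandle just like the real stages.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread::{spawn, JoinHandle};

/// Stage 1 (hypothetical): feed the given strings into a channel.
fn start_source_thread(items: Vec<String>) -> (Receiver<String>, JoinHandle<()>) {
    let (sender, receiver) = channel();
    let handle = spawn(move || {
        for item in items {
            if sender.send(item).is_err() {
                break;
            }
        }
    });
    (receiver, handle)
}

/// Stage 2 (hypothetical): uppercase every string received from stage 1.
fn start_uppercase_thread(texts: Receiver<String>) -> (Receiver<String>, JoinHandle<()>) {
    let (sender, receiver) = channel();
    let handle = spawn(move || {
        for text in texts {
            if sender.send(text.to_uppercase()).is_err() {
                break;
            }
        }
    });
    (receiver, handle)
}

/// Launch both stages, drain the final receiver, then join the threads.
fn run_mini_pipeline(items: Vec<String>) -> Vec<String> {
    let (texts, h1) = start_source_thread(items);
    let (shouted, h2) = start_uppercase_thread(texts);
    let output: Vec<String> = shouted.into_iter().collect();
    h1.join().unwrap();
    h2.join().unwrap();
    output
}
```

Each stage ends when the receiver it reads from is closed, and dropping its own sender closes the next channel in turn, so shutdown ripples through the pipeline without any extra signalling.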
17.2.4 - Channel features and performance
The mpsc in std::sync::mpsc stands for multi-producer, single-consumer.
Sender<T> implements the Clone trait: create a regular channel, then clone the sender as many times as needed. Each Sender value can be moved to a different thread.
Receiver<T> cannot be cloned. If multiple threads need to receive values from the same channel, a Mutex is required.
Backpressure: if values are sent faster than they are received and processed, they pile up inside the channel.
Synchronous channel: every Unix pipe has a fixed size. If a process tries to write to a pipe that is momentarily full, the system simply blocks the process until there is room in the pipe. Rust's counterpart is the synchronous channel:
use std::sync::mpsc::sync_channel;
let (sender, receiver) = sync_channel(1000);
- A synchronous channel is just like a regular channel, except that you specify how many values it can hold when creating it.
- sender.send(value) is potentially a blocking operation.
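Both features in this subsection can be checked with a short sketch. The function names are hypothetical. The first function clones a Sender for several producer threads; the second uses try_send, the non-blocking counterpart of send on a synchronous channel, to show where the capacity limit kicks in without actually blocking.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};
use std::thread;

/// Multi-producer: every worker thread gets its own clone of the Sender.
fn collect_from_producers(nproducers: u32) -> Vec<u32> {
    let (sender, receiver) = channel();
    let mut handles = Vec::new();
    for id in 0..nproducers {
        let sender = sender.clone();
        handles.push(thread::spawn(move || {
            sender.send(id).unwrap();
        }));
    }
    // Drop the original sender so the channel closes once all clones are gone.
    drop(sender);

    let mut received: Vec<u32> = receiver.into_iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Arrival order across producers is not deterministic, so sort.
    received.sort();
    received
}

/// Bounded channel: count how many values fit before the channel is full.
fn fill_until_full(capacity: usize) -> usize {
    // Keep the receiver alive so the channel stays connected.
    let (sender, _receiver) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match sender.try_send(0) {
            Ok(()) => accepted += 1,
            // A blocking send() would wait here until the receiver made room.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

In the bounded case a plain send() would block at the point where try_send reports Full, which is how a synchronous channel applies backpressure to a fast producer.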
17.2.5 - Thread safety
The std::marker::Send trait: types that implement Send are safe to pass by value to another thread, that is, they can be moved across threads.
The std::marker::Sync trait: types that implement Sync are safe to pass by non-mut reference to another thread, that is, they can be shared across threads.
- Rust achieves thread safety through these traits: no data races and no other undefined behavior.
- A struct or enum whose fields all implement these traits implements them too.
- Rust implements these traits automatically for user-defined types; no #[derive] is needed.
- The few types that are not Send and Sync are mostly those that provide mutability in a way that is not thread-safe, for example the reference-counted pointer type std::rc::Rc<T>.
Rust requires the closure passed to spawn to be Send.
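These rules are enforced at compile time, which can be demonstrated with two generic functions that accept only Send or only Sync types. Everything named here (Settings, require_send, require_sync, total_in_child) is made up for illustration.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical user-defined type. All of its fields are Send and Sync,
/// so the compiler implements both traits for it automatically.
struct Settings {
    name: String,
    retries: u32,
}

// These functions do nothing at run time; calling them only compiles if the
// type argument satisfies the bound.
fn require_send<T: Send>() {}
fn require_sync<T: Sync>() {}

fn check_marker_traits() {
    require_send::<Settings>();
    require_sync::<Settings>();
    require_send::<Arc<Settings>>();
    // The next line would fail to compile, because Rc is not Send:
    // require_send::<std::rc::Rc<Settings>>();
}

/// Moving an Arc<Settings> into a spawned closure works because it is Send.
fn total_in_child(settings: Arc<Settings>) -> usize {
    thread::spawn(move || settings.name.len() + settings.retries as usize)
        .join()
        .unwrap()
}
```

Uncommenting the Rc line turns the thread-safety violation into a compile error rather than a run-time bug.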
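17.2.6 - Piping any iterator to a channel
The following implementation works for any iterator that is Send and 'static and whose items are Send and 'static.
use std::sync::mpsc;
use std::thread::spawn;

pub trait OffThreadExt: Iterator {
    // Turn this iterator into an off-thread iterator: the next() calls
    // happen on a separate worker thread.
    fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
}
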
impl<T> OffThreadExt for T where T: Iterator + Send + 'static, T::Item: Send + 'static {
fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
// Create a channel and pass items from the worker thread
let (sender, receiver) = mpsc::sync_channel(1024);
// Move this iterator to a new thread and run it there
spawn(
move || {
for item in self {
if sender.send(item).is_err() {
break;
}
}
}
);
// Returns an iterator that extracts values from the channel
receiver.into_iter()
}
}
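A possible way to use the adapter is sketched below. The trait and impl are repeated so that the block compiles on its own; squares_even_off_thread is a hypothetical function that runs two iterator stages on two worker threads.

```rust
use std::sync::mpsc;
use std::thread::spawn;

trait OffThreadExt: Iterator {
    fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
}

impl<T> OffThreadExt for T
where
    T: Iterator + Send + 'static,
    T::Item: Send + 'static,
{
    fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
        // Bounded channel, so a fast producer cannot run arbitrarily far ahead.
        let (sender, receiver) = mpsc::sync_channel(1024);
        spawn(move || {
            for item in self {
                if sender.send(item).is_err() {
                    break;
                }
            }
        });
        receiver.into_iter()
    }
}

/// Squares are computed on one worker thread, the even ones are filtered on
/// a second worker thread, and the caller collects the result.
fn squares_even_off_thread(n: u32) -> Vec<u32> {
    (1..=n)
        .map(|x| x * x)
        .off_thread()
        .filter(|x| *x % 2 == 0)
        .off_thread()
        .collect()
}
```

Because each off_thread() call returns an ordinary iterator, further adapters can be chained after it, and each call adds one more pipeline stage.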
See 《Rust Programming 》( Jim - Brandy 、 Jason, - By orendov , Translated by lisongfeng ) Chapter 19