[Rust notes] 17 - Concurrency (Part 1)
2022-07-05 06:07:00 【phial03】
17 - Concurrency
- Common approaches to structuring concurrent code:
  - A background thread has a single job and periodically wakes up to perform it.
  - A worker pool communicates with clients through a task queue.
  - A pipeline feeds data from one thread to the next, with each thread doing a small part of the total work.
  - Data parallelism assumes the whole machine is devoted to one large computation, which is split into n small tasks executed on n threads, in the hope of keeping all n cores busy at once.
  - A sea of synchronized objects, where multiple threads have access to the same data and races are avoided by ad-hoc locking with low-level primitives such as mutexes.
  - Atomic integer operations let multiple cores communicate by passing information through fields that are one machine word in size.
- Three ways of using threads in Rust:
  - fork-join parallelism;
  - channels;
  - shared mutable state.
17.1 - Fork-Join Parallelism
Threads can be used to perform several completely unrelated tasks at the same time.
A single-threaded version of the program:

```rust
fn process_files(filenames: Vec<String>) -> io::Result<()> {
    for document in filenames {
        let text = load(&document)?;      // read the source file
        let results = process(text);      // compute statistics
        save(&document, results)?;        // write the output file
    }
    Ok(())
}
```

Using the "fork-join" pattern, the same work can be spread across multiple threads.
- The idea is to split the data into chunks and process each chunk on its own thread.
- Fork: start a new thread.
- Join: wait for a thread to finish.
- The units of work must be isolated from one another.
17.1.1 - spawn and join
Use the `std::thread::spawn` function to spawn a new thread:

```rust
spawn(|| {
    println!("hello from a child thread");
})
```

- It takes one argument: an `FnOnce` closure or function.
- The new thread it creates is a real operating-system thread with its own stack (a small spawn/join sketch follows this list).
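A minimal sketch, not part of the original notes, showing that `spawn` returns a `JoinHandle` whose `join()` hands back the closure's return value:

```rust
use std::thread::spawn;

fn main() {
    // spawn returns a JoinHandle<i32> because the closure returns an i32.
    let handle = spawn(|| {
        (1..=10).sum::<i32>()
    });

    // join() blocks until the child thread finishes; here it returns Ok(55).
    let total = handle.join().unwrap();
    println!("sum computed on the child thread: {}", total);
}
```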
Use `spawn` to implement a parallel version of the `process_files` function above:

```rust
use std::thread::spawn;

fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
    // Divide the work into several chunks
    const NTHREADS: usize = 8;
    let worklists = split_vec_into_chunks(filenames, NTHREADS);

    // Fork: spawn a thread to process each chunk
    let mut thread_handles = vec![];
    for worklist in worklists {
        thread_handles.push(spawn(move || process_files(worklist)));
    }

    // Join: wait for all threads to finish
    for handle in thread_handles {
        handle.join().unwrap()?;
    }

    Ok(())
}
```

The `split_vec_into_chunks` helper is sketched after the list below. How the list of file names is handed to the worker threads:
- In the parent thread, the `for` loop defines and owns `worklist`;
- when the `move` closure is created, `worklist` is moved into the closure;
- `spawn` then transfers the closure (and with it the `worklist` vector) to the new child thread.
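`split_vec_into_chunks` is used above but not defined in the notes. One plausible implementation, where only the name and signature come from the call site and the body is an assumption:

```rust
/// Split `v` into at most `nchunks` roughly equal pieces.
/// Assumed implementation for illustration only.
fn split_vec_into_chunks<T>(v: Vec<T>, nchunks: usize) -> Vec<Vec<T>> {
    // Round the chunk size up so that no more than `nchunks` chunks are produced.
    let chunk_size = (v.len() + nchunks - 1) / nchunks;
    if chunk_size == 0 {
        return vec![];    // the input was empty
    }

    let mut chunks = Vec::new();
    let mut rest = v;
    while rest.len() > chunk_size {
        let tail = rest.split_off(chunk_size); // keep the first chunk_size items in `rest`
        chunks.push(rest);
        rest = tail;
    }
    chunks.push(rest);
    chunks
}
```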
17.1.2 - Error Handling Across Threads
The `handle.join()` method:

- returns a `std::thread::Result`, which is an error if the child thread panicked;
- also passes the value returned by the child thread back to the parent thread.
A panic does not automatically propagate from one thread to the threads that depend on it.

Instead, a panic in one thread shows up in other threads as an error-carrying `Result`. In `handle.join().unwrap()`, the `.unwrap()` asserts that the result is `Ok`; if the child thread panicked, the result is an `Err`, so the assertion fails and the parent thread panics too. This explicitly propagates the panic from the child thread to the parent.
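A small self-contained illustration, not from the notes, of how a child panic surfaces through `join()`:

```rust
use std::thread::spawn;

fn main() {
    let handle = spawn(|| {
        panic!("something went wrong in the child");
    });

    // join() returns Err(_) because the child panicked.
    match handle.join() {
        Ok(()) => println!("child finished normally"),
        Err(_) => println!("child panicked; calling .unwrap() here would re-panic"),
    }
}
```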
17.1.3 - Sharing immutable data across threads
Suppose the shared data is passed to the worker threads by reference. If one thread hits an I/O error, the calling function may return before the other threads finish, and those child threads could go on using the reference after the caller has freed the underlying data, a use-after-free and a potential data race.
Rust does not allow this. With `Arc`, as long as any thread owns an `Arc<GigabyteMap>`, the map will not be freed, even if the parent thread has already exited. And because the data inside an `Arc` is immutable, there is no data race.

```rust
use std::sync::Arc;

fn process_files_in_parallel(filenames: Vec<String>,
                             glossary: Arc<GigabyteMap>)
    -> io::Result<()>
{
    ...
    for worklist in worklists {
        // Calling .clone() clones the Arc and bumps its reference count.
        // It does not clone the GigabyteMap itself.
        let glossary_for_child = glossary.clone();
        thread_handles.push(
            spawn(move || process_files(worklist, &glossary_for_child))
        );
    }
    ...
}
```
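A minimal self-contained sketch of the same Arc-sharing pattern, with a small vector standing in for `GigabyteMap` (illustrative, not from the notes):

```rust
use std::sync::Arc;
use std::thread::spawn;

fn main() {
    // Shared, read-only data; Arc keeps it alive as long as any thread holds a clone.
    let table = Arc::new(vec![2, 3, 5, 7, 11, 13]);

    let mut handles = Vec::new();
    for id in 0..3 {
        let table_for_child = Arc::clone(&table); // bumps the refcount only
        handles.push(spawn(move || {
            let sum: i32 = table_for_child.iter().sum();
            println!("thread {}: sum = {}", id, sum);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
```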
17.1.4 - Rayon
The Rayon library is designed specifically for the fork-join pattern and offers two ways of running tasks concurrently:

```rust
extern crate rayon;
use rayon::prelude::*;

// Run two tasks in parallel
let (v1, v2) = rayon::join(fn1, fn2);

// Run N tasks in parallel
giant_vector.par_iter().for_each(|value| {
    // par_iter() creates a ParallelIterator, analogous to an ordinary iterator.
    do_thing_with_value(value);
});
```
Rewriting `process_files_in_parallel` with Rayon:

```rust
extern crate rayon;
use rayon::prelude::*;

fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
    -> io::Result<()>
{
    filenames.par_iter()                            // create a parallel iterator
        // process each filename, producing one io::Result<()> per file
        .map(|filename| process_file(filename, glossary))
        // Combine the results into one. reduce_with returns an Option,
        // which is None only if filenames was empty; all the parallel
        // work in the background finishes before reduce_with returns.
        .reduce_with(|r1, r2| {
            if r1.is_err() { r1 } else { r2 }
        })
        .unwrap_or(Ok(()))                          // an empty file list counts as Ok(())
}
```

Rayon also supports sharing references between threads (note that `glossary` is passed here as a plain `&GigabyteMap`, not an `Arc`).
To use Rayon you need to add the following.

In main.rs:

```rust
extern crate rayon;
use rayon::prelude::*;
```

In Cargo.toml:

```toml
[dependencies]
rayon = "0.4"
```
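A tiny usage sketch in the standard parallel-iterator style, assuming Rayon is set up as above (not an example from the notes):

```rust
use rayon::prelude::*;

// Sum of squares computed across all available cores.
fn sum_of_squares(input: &[i64]) -> i64 {
    input.par_iter()
         .map(|&x| x * x)
         .sum()
}
```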
17.2 - Channels
A channel is a one-way conduit for sending values from one thread to another; it is essentially a thread-safe queue.
- A Unix pipe carries bytes;
- a Rust channel carries values. Channels are part of the `std::sync::mpsc` module.
- `sender.send(item)` puts a value into the channel; `receiver.recv()` takes one out.
- Ownership of the value is transferred from the sending thread to the receiving thread.
- If the channel is empty, `receiver.recv()` blocks until a value is sent.
Rust channels are faster than pipes:

- A pipe offers flexibility at the cost of complexity, and it is not in itself a concurrency feature.
- With channels, threads communicate by passing values to one another, with no locks or shared memory required.
- Sending a value moves it rather than copying it, and moves are fast no matter how much data the value owns (a minimal send/recv sketch follows this list).
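A minimal end-to-end sketch, not from the notes, of the send/recv behaviour just described:

```rust
use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        // Ownership of each String moves from this thread to the receiver.
        for word in ["one", "two", "three"] {
            sender.send(word.to_string()).unwrap();
        }
    });

    // recv() blocks while the channel is empty and returns Err once
    // every sender has been dropped.
    while let Ok(message) = receiver.recv() {
        println!("got: {}", message);
    }

    handle.join().unwrap();
}
```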
17.2.1 - Sending Values
An inverted index is a database that records where each keyword appears; it is one of the key building blocks of a search engine.
The code that starts the file-reading thread:

```rust
use std::fs::File;
use std::io::prelude::*;            // for Read::read_to_string
use std::thread::spawn;
use std::sync::mpsc::channel;

// channel() is a queue data structure; it returns a pair of values:
// the sender end and the receiver end.
let (sender, receiver) = channel();

// Start the thread with std::thread::spawn.
// Ownership of `sender` is transferred to the new thread by the move closure.
let handle = spawn(move || {
    // Rust infers the type carried by the channel from the send() call below.
    for filename in documents {
        let mut f = File::open(filename)?;          // read the file from disk
        let mut text = String::new();
        f.read_to_string(&mut text)?;

        if sender.send(text).is_err() {             // send the file's text into the channel
            break;
        }
    }
    Ok(())    // after all documents have been read, the thread returns Ok(())
});
```
17.2.2 - Receiving Values
Create a second thread that calls `receiver.recv()` in a loop:

```rust
// with a while loop
while let Ok(text) = receiver.recv() {
    do_something_with(text);
}

// or with a for loop
for text in receiver {
    do_something_with(text);
}
```

The receiver (indexing) thread:

```rust
fn start_file_indexing_thread(texts: Receiver<String>)
    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
{
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        for (doc_id, text) in texts.into_iter().enumerate() {
            let index = InMemoryIndex::from_single_document(doc_id, text);
            if sender.send(index).is_err() {
                break;
            }
        }
    });

    (receiver, handle)
}
```

If the reader thread hits an I/O error, it exits immediately and the error is stored in its `JoinHandle`. The code that packages up the receiver and the new thread's `JoinHandle` looks like this:

```rust
fn start_file_reader_thread(documents: Vec<PathBuf>)
    -> (Receiver<String>, JoinHandle<io::Result<()>>)
{
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        ...
    });

    (receiver, handle)
}
```
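For reference, here is the reader function with the closure from 17.2.1 dropped into the skeleton above, simply combining the two listings (no new behaviour):

```rust
use std::fs::File;
use std::io::{self, prelude::*};
use std::path::PathBuf;
use std::sync::mpsc::{channel, Receiver};
use std::thread::{spawn, JoinHandle};

fn start_file_reader_thread(documents: Vec<PathBuf>)
    -> (Receiver<String>, JoinHandle<io::Result<()>>)
{
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        for filename in documents {
            let mut f = File::open(filename)?;
            let mut text = String::new();
            f.read_to_string(&mut text)?;

            // Stop early if the receiving end has been dropped.
            if sender.send(text).is_err() {
                break;
            }
        }
        Ok(())
    });

    (receiver, handle)
}
```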
17.2.3 - Running the Pipeline
Merge in-memory indexes until they are large enough:

```rust
fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
```

Write the large indexes to disk:

```rust
fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
                             output_dir: &Path)
    -> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
```

If that produces several large files, combine them with a file-based merge algorithm:

```rust
fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
    -> io::Result<()>
```

Start the threads and check for errors:

```rust
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
    -> io::Result<()>
{
    // Launch all five stages of the pipeline
    let (texts,   h1) = start_file_reader_thread(documents);
    let (pints,   h2) = start_file_indexing_thread(texts);
    let (gallons, h3) = start_in_memory_merge_thread(pints);
    let (files,   h4) = start_index_writer_thread(gallons, &output_dir);
    let result = merge_index_files(files, &output_dir);

    // Wait for the threads to finish, keeping any errors they report
    let r1 = h1.join().unwrap();
    h2.join().unwrap();
    h3.join().unwrap();
    let r4 = h4.join().unwrap();

    // Return the first error encountered, if any.
    // h2 and h3 cannot fail: they do pure in-memory data processing.
    r1?;
    r4?;
    result
}
```

A pipeline's overall throughput is limited by the throughput of its slowest stage.
17.2.4 - Channel features and performance
- `std::sync::mpsc`: the name mpsc stands for multi-producer, single-consumer.
- `Sender<T>` implements `Clone`: to get a channel with multiple senders, create an ordinary channel and clone the sender. Each `Sender` value can be moved to a different thread (see the sketch after this list).
- `Receiver<T>` cannot be cloned; if several threads must receive values from the same channel, you need a `Mutex` around the receiver.
- Backpressure: if values are sent faster than they are received and processed, they pile up inside the channel.
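A small sketch of the multi-producer pattern, cloning the `Sender` once per producer thread (illustrative, not from the notes):

```rust
use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel();

    // Clone the Sender for each producer; each clone moves into its own thread.
    for id in 0..3 {
        let sender = sender.clone();
        spawn(move || {
            sender.send(format!("hello from producer {}", id)).unwrap();
        });
    }
    // Drop the original sender so the channel closes once all producers finish.
    drop(sender);

    // The single consumer drains the channel; the loop ends when it closes.
    for message in receiver {
        println!("{}", message);
    }
}
```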
Synchronous channels: each Unix pipe has a fixed capacity, and if a process tries to write to a pipe that is momentarily full, the system blocks the process until there is room in the pipe again. Rust's synchronous channels are analogous:

```rust
use std::sync::mpsc::sync_channel;

let (sender, receiver) = sync_channel(1000);
```

- A synchronous channel works like a regular channel, except that you specify how many values it can hold when you create it.
- On a synchronous channel, `sender.send(value)` is a potentially blocking operation (a small sketch of this follows).
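A small sketch, not from the notes, showing the bounded capacity in action; the sender stalls whenever it gets too far ahead of a deliberately slow receiver:

```rust
use std::sync::mpsc::sync_channel;
use std::thread::spawn;
use std::time::Duration;

fn main() {
    // Capacity 2: the sender can run at most two values ahead of the receiver.
    let (sender, receiver) = sync_channel(2);

    spawn(move || {
        for i in 0..5 {
            // Blocks whenever two values are already waiting in the channel.
            sender.send(i).unwrap();
            println!("sent {}", i);
        }
    });

    for value in receiver {
        std::thread::sleep(Duration::from_millis(50)); // slow consumer
        println!("received {}", value);
    }
}
```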
17.2.5 - Thread safety
- `std::marker::Send` marker trait: types that implement `Send` are safe to pass by value to another thread; they can be moved between threads.
- `std::marker::Sync` marker trait: types that implement `Sync` are safe to pass by non-mut reference to another thread; they can be shared between threads.
- Rust's thread-safety guarantee (no data races or other undefined behavior) rests on these two traits.
- If all of a struct's or enum's fields implement these traits, the struct or enum implements them too.
- Rust implements these traits automatically for your own types; they do not need to be derived with `#[derive]`.
- The few types that do not implement `Send` and `Sync` are mostly those that provide mutability in a non-thread-safe way, such as the reference-counted pointer type `std::rc::Rc<T>`. Rust requires the closure passed to `spawn` to be `Send` (see the sketch after this list).
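A quick illustration, not from the notes, of why `Rc` fails the `Send` requirement while `Arc` satisfies it:

```rust
use std::rc::Rc;
use std::sync::Arc;
use std::thread::spawn;

fn main() {
    let rc = Rc::new("not thread-safe".to_string());

    // This line would not compile: Rc<String> is neither Send nor Sync,
    // so the move closure is not Send and spawn rejects it.
    // spawn(move || println!("{}", rc));

    // Arc<String> is Send + Sync, so this works:
    let arc = Arc::new("thread-safe".to_string());
    spawn(move || println!("{}", arc)).join().unwrap();

    println!("{}", rc); // rc is still usable on the main thread
}
```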
17.2.6 - Connecting Any Iterator to a Channel
The following implementation applies to any iterator that is `Send + 'static` (and whose items are too):

```rust
use std::sync::mpsc;
use std::thread::spawn;

pub trait OffThreadExt: Iterator {
    /// Turn this iterator into an off-thread iterator: the iterator runs
    /// on a separate worker thread, so it and the consuming loop proceed
    /// concurrently.
    fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
}

impl<T> OffThreadExt for T
    where T: Iterator + Send + 'static,
          T::Item: Send + 'static
{
    fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
        // Create a channel to carry items out of the worker thread.
        let (sender, receiver) = mpsc::sync_channel(1024);

        // Move this iterator to a new thread and run it there.
        spawn(move || {
            for item in self {
                if sender.send(item).is_err() {
                    break;
                }
            }
        });

        // Return an iterator that pulls values from the channel.
        receiver.into_iter()
    }
}
```
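A hypothetical usage sketch (the numbers and closure are illustrative, not from the notes): everything upstream of `off_thread()` runs on the worker thread, while the consuming loop runs on the calling thread.

```rust
fn main() {
    // The map work runs on the worker thread created by off_thread();
    // the for loop below runs concurrently on the main thread.
    let squares = (0..1_000u64)
        .map(|n| n * n)
        .off_thread();          // assumes the OffThreadExt impl above is in scope

    for value in squares {
        // values arrive through the channel as the worker produces them
        println!("{}", value);
    }
}
```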
See Programming Rust by Jim Blandy and Jason Orendorff (Chinese translation by Li Songfeng), Chapter 19.