[Rust Notes] 17 - Concurrency (Part 1)
2022-07-05 06:07:00 【phial03】
17 - Concurrency
- Common patterns for writing concurrent code in systems programming:
  - A background thread that has a single job and periodically wakes up to do it.
  - A worker pool of general-purpose threads that communicates with clients through a task queue.
  - A pipeline, where data flows from one thread to the next and each thread does a small part of the work.
  - Data parallelism, which assumes the whole machine is devoted to one large computation: the work is split into n small tasks run on n threads, in the hope of putting all n cores to work at once.
  - A sea of synchronized objects, where multiple threads have access to the same data and races are avoided with ad hoc locking based on low-level primitives such as mutexes.
  - Atomic integer operations, which let cores communicate through fields one machine word in size.
- Three ways to use threads in Rust:
  - fork-join parallelism;
  - channels;
  - shared mutable state.
17.1 - Fork-join parallelism
Threads can be used to perform several completely unrelated tasks at the same time .
Start with a single-threaded program:

```rust
fn process_files(filenames: Vec<String>) -> io::Result<()> {
    for document in filenames {
        let text = load(&document)?;   // read source file
        let results = process(text);   // compute statistics
        save(&document, results)?;     // write output file
    }
    Ok(())
}
```
The fork-join pattern turns this into a multithreaded job:

- The idea is to split the data into chunks and process each chunk on its own thread.
- Fork: start a new thread.
- Join: wait for that thread to finish.
- The units of work must be isolated from one another.
17.1.1 - spawn and join
The `std::thread::spawn` function starts a new thread:

```rust
spawn(|| {
    println!("hello from a child thread");
})
```

- It takes a single argument: an `FnOnce` closure or function.
- The thread it creates is a real operating-system thread with its own stack.
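For reference, a minimal complete program around that snippet (the `main` wrapper here is mine, not part of the notes): `spawn` returns a `JoinHandle`, and calling `join()` waits for the child thread to finish.

```rust
use std::thread::spawn;

fn main() {
    let handle = spawn(|| {
        println!("hello from a child thread");
    });

    // Wait for the child thread to finish.
    handle.join().unwrap();
}
```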
Using `spawn`, a parallel version of the earlier `process_files` function:

```rust
use std::thread::spawn;

fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
    // Divide the work into chunks.
    const NTHREADS: usize = 8;
    let worklists = split_vec_into_chunks(filenames, NTHREADS);

    // Fork: spawn a thread to handle each chunk.
    let mut thread_handles = vec![];
    for worklist in worklists {
        thread_handles.push(spawn(move || process_files(worklist)));
    }

    // Join: wait for all threads to finish.
    for handle in thread_handles {
        handle.join().unwrap()?;
    }

    Ok(())
}
```
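The notes never show `split_vec_into_chunks`. A minimal sketch with the signature implied by the call above (the chunk-size arithmetic is my own choice, not taken from the book):

```rust
/// Split `v` into at most `nchunks` pieces of roughly equal size.
fn split_vec_into_chunks(v: Vec<String>, nchunks: usize) -> Vec<Vec<String>> {
    let nchunks = nchunks.max(1);
    let chunk_size = (v.len() + nchunks - 1) / nchunks; // round up
    let mut chunks = Vec::new();
    let mut rest = v;
    while rest.len() > chunk_size {
        let tail = rest.split_off(chunk_size); // rest keeps the first chunk_size items
        chunks.push(rest);
        rest = tail;
    }
    if !rest.is_empty() {
        chunks.push(rest);
    }
    chunks
}
```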
How the list of filenames reaches a worker thread:

- in the parent thread, the `for` loop defines and owns `worklist`;
- when the `move` closure is created, `worklist` is moved into the closure;
- `spawn` then transfers the closure (and with it the `worklist` vector) to the new child thread.
17.1.2 - Error handling across threads
The `handle.join()` method:

- returns a `std::thread::Result`, which is an error if the child thread panicked;
- passes the value returned by the child thread back to the parent thread.
A panic does not automatically propagate from one thread to the threads that depend on it.
Instead, a panic in one thread shows up in dependent threads as an error-carrying `Result`. In `handle.join().unwrap()`, the `.unwrap()` asserts that the result is `Ok`: if the child thread panicked, `join()` returns an `Err`, so the `.unwrap()` makes the calling parent thread panic as well. This explicitly propagates the panic from the child thread to the parent.
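A tiny demonstration of this behavior (my own example, not from the notes): the child's panic is reported to the parent as an `Err` from `join()` rather than crashing the parent outright.

```rust
use std::thread::spawn;

fn main() {
    let handle = spawn(|| {
        panic!("something went wrong in the child");
    });

    match handle.join() {
        Ok(()) => println!("child finished normally"),
        Err(_) => println!("child panicked"), // this branch runs
    }
}
```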
17.1.3 - Sharing immutable data across threads
If we passed the shared data into the worker threads by plain reference and one thread hit an I/O error, the calling function could return before the other threads finished. The child threads would then keep using the borrowed data after the caller freed it, which would be a data race.
Rust simply does not allow this. The fix is `Arc`: as long as any thread owns an `Arc<GigabyteMap>`, the map stays alive, even if the parent thread has already exited. And because the data inside the `Arc` is immutable, there can be no data race.

```rust
use std::sync::Arc;

fn process_files_in_parallel(filenames: Vec<String>,
                             glossary: Arc<GigabyteMap>)
    -> io::Result<()>
{
    ...
    for worklist in worklists {
        // .clone() clones the Arc and bumps its reference count.
        // It does not clone the GigabyteMap itself.
        let glossary_for_child = glossary.clone();
        thread_handles.push(
            spawn(move || process_files(worklist, &glossary_for_child))
        );
    }
    ...
}
```
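A self-contained sketch of the same pattern, with a small `Vec` standing in for `GigabyteMap` (my own toy example): each child receives its own `Arc` clone, the vector itself is shared rather than copied, and it is freed only after the last clone is dropped.

```rust
use std::sync::Arc;
use std::thread::spawn;

fn main() {
    let shared = Arc::new(vec![1, 2, 3, 4]);

    let mut handles = vec![];
    for _ in 0..4 {
        let shared_for_child = shared.clone(); // bumps the reference count only
        handles.push(spawn(move || shared_for_child.iter().sum::<i32>()));
    }

    for handle in handles {
        println!("sum = {}", handle.join().unwrap());
    }
}
```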
17.1.4 - Rayon
The `Rayon` library is designed specifically for the fork-join pattern and offers two ways to run tasks concurrently:

```rust
extern crate rayon;
use rayon::prelude::*;

// Run two tasks in parallel.
let (v1, v2) = rayon::join(fn1, fn2);

// Run N tasks in parallel.
giant_vector.par_iter().for_each(|value| {
    // par_iter() creates a ParallelIterator, analogous to an ordinary iterator.
    do_thing_with_value(value);
});
```
Rewriting `process_files_in_parallel` with `Rayon`:

```rust
extern crate rayon;
use rayon::prelude::*;

fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
    -> io::Result<()>
{
    filenames.par_iter()            // create a parallel iterator
        .map(|filename| process_file(filename, glossary))
        // Combine the results. reduce_with returns an Option,
        // which is None only if filenames was empty.
        // All the parallel work happens in the background;
        // reduce_with returns only once it is all done.
        .reduce_with(|r1, r2| {
            if r1.is_err() { r1 } else { r2 }
        })
        .unwrap_or(Ok(()))          // an empty input counts as Ok(())
}
```
`Rayon` also supports sharing references across threads. To use `Rayon`, add the following:

In `main.rs`:

```rust
extern crate rayon;
use rayon::prelude::*;
```

In `Cargo.toml`:

```toml
[dependencies]
rayon = "0.4"
```
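As a quick self-contained illustration of the `par_iter` pattern (a toy example of my own; `par_iter`, `map`, and `sum` are all part of Rayon's prelude):

```rust
extern crate rayon;
use rayon::prelude::*;

fn main() {
    let numbers: Vec<u64> = (1..=1_000).collect();

    // The squares are computed across Rayon's worker threads.
    let sum_of_squares: u64 = numbers.par_iter().map(|n| n * n).sum();

    println!("{}", sum_of_squares);
}
```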
17.2 - Channels
A channel is a one-way conduit that sends values from one thread to another; in essence, it is a thread-safe queue.

- Unix pipes send bytes;
- Rust channels send Rust values. They are part of the `std::sync::mpsc` module.
`sender.send(item)` puts a value into the channel, and `receiver.recv()` takes one out.

- Ownership of the value is transferred from the sending thread to the receiving thread.
- If the channel is empty, `receiver.recv()` blocks until a value is sent.
Rust channels are faster than Unix pipes:

- Pipes offer flexibility at the cost of complexity, and they are not by themselves a concurrency mechanism inside a single process.
- With channels, threads communicate by passing values to one another; no locks or shared memory are needed.
- Sending a value moves it rather than copying it, and moves are fast no matter how much data the value owns.
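A minimal sketch of that last point (my own example): the `String` is moved into the channel by `send` and moved out again by `recv`; the text itself is never copied.

```rust
use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        let big_text = "lots of data ".repeat(1000);
        sender.send(big_text).unwrap(); // ownership moves into the channel
    });

    let received: String = receiver.recv().unwrap(); // and out to this thread
    println!("received {} bytes", received.len());

    handle.join().unwrap();
}
```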
17.2.1 - Sending values
An inverted index is a database that records which keywords appear where; it is one of the key building blocks of a search engine.
Code that starts the file-reading thread:

```rust
use std::fs::File;
use std::io::prelude::*;        // for Read::read_to_string
use std::thread::spawn;
use std::sync::mpsc::channel;

// channel() is backed by a queue; it returns a pair: a Sender and a Receiver.
let (sender, receiver) = channel();

// Start the thread with std::thread::spawn.
// Ownership of `sender` moves into the new thread via the move closure,
// and Rust infers the channel's type from how `sender` is used.
let handle = spawn(move || {
    for filename in documents {
        let mut f = File::open(filename)?;        // read the file from disk
        let mut text = String::new();
        f.read_to_string(&mut text)?;

        if sender.send(text).is_err() {           // send the file's text down the channel
            break;
        }
    }
    Ok(())   // once all documents are read, the thread returns Ok(())
});
```
17.2.2 - Receiving values
Create a second thread that calls `receiver.recv()` in a loop:

```rust
// With a while let loop:
while let Ok(text) = receiver.recv() {
    do_something_with(text);
}

// Or, equivalently, with a for loop:
for text in receiver {
    do_something_with(text);
}
```
Example of the receiving thread:

```rust
fn start_file_indexing_thread(texts: Receiver<String>)
    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
{
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        for (doc_id, text) in texts.into_iter().enumerate() {
            let index = InMemoryIndex::from_single_document(doc_id, text);
            if sender.send(index).is_err() {
                break;
            }
        }
    });

    (receiver, handle)
}
```
If the reading thread hits an I/O error, it exits immediately, and the error is stored in the thread's `JoinHandle`. The code that packages up the receiver and the new thread's `JoinHandle` looks like this:

```rust
fn start_file_reader_thread(documents: Vec<PathBuf>)
    -> (Receiver<String>, JoinHandle<io::Result<()>>)
{
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        ...
    });

    (receiver, handle)
}
```
17.2.3 - Running the pipeline
Merge indexes in memory until they are big enough:

```rust
fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
```
Write the large indexes to disk:

```rust
fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
                             output_dir: &Path)
    -> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
```
If that produces several large files, merge them with a file-based merge algorithm:

```rust
fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
    -> io::Result<()>
```
Start the threads and check for errors:

```rust
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf) -> io::Result<()> {
    // Launch all five stages of the pipeline.
    let (texts,   h1) = start_file_reader_thread(documents);
    let (pints,   h2) = start_file_indexing_thread(texts);
    let (gallons, h3) = start_in_memory_merge_thread(pints);
    let (files,   h4) = start_index_writer_thread(gallons, &output_dir);
    let result = merge_index_files(files, &output_dir);

    // Wait for the threads to finish, holding on to any errors they report.
    let r1 = h1.join().unwrap();
    h2.join().unwrap();
    h3.join().unwrap();
    let r4 = h4.join().unwrap();

    // Return the first error encountered.
    // (h2 and h3 cannot fail: they do pure in-memory data processing.)
    r1?;
    r4?;
    result
}
```
A pipeline overlaps its stages, but overall throughput is limited by the throughput of the slowest stage.
17.2.4 - Channel features and performance
`std::sync::mpsc`:

- `mpsc` stands for multi-producer, single-consumer.
- `Sender<T>` implements `Clone`: to get a channel with multiple senders, create an ordinary channel and clone the sender as many times as needed; each `Sender` value can then be moved to a different thread.
- `Receiver<T>` cannot be cloned. If multiple threads need to receive values from the same channel, the receiver must be wrapped in a `Mutex`.
- Backpressure: if values are sent faster than they are received and processed, they pile up inside the channel.
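A minimal sketch of the multi-producer arrangement described above (my own example): clone the `Sender`, move each clone into its own thread, and let a single `Receiver` collect everything.

```rust
use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel();

    let mut handles = vec![];
    for id in 0..3 {
        let sender = sender.clone();
        handles.push(spawn(move || {
            sender.send(format!("hello from producer {}", id)).unwrap();
        }));
    }
    drop(sender); // drop the original so the channel closes once the clones are done

    for msg in receiver {
        println!("{}", msg);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
```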
Synchronous channels: each Unix pipe has a fixed capacity; if a process tries to write to a pipe that is momentarily full, the system blocks that process until there is room in the pipe again.

```rust
use std::sync::mpsc::sync_channel;

let (sender, receiver) = sync_channel(1000);
```

- A synchronous channel works like a regular channel, except that you specify how many values it can hold when you create it.
- As a result, `sender.send(value)` is a potentially blocking operation.
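A small sketch of the resulting backpressure (my own example): with a bound of 2, the producer blocks inside `send()` whenever it gets more than two items ahead of the slow consumer, so the channel never buffers more than two values.

```rust
use std::sync::mpsc::sync_channel;
use std::thread::{sleep, spawn};
use std::time::Duration;

fn main() {
    let (sender, receiver) = sync_channel(2);

    let producer = spawn(move || {
        for i in 0..10 {
            sender.send(i).unwrap(); // may block until the consumer catches up
        }
    });

    for value in receiver {
        sleep(Duration::from_millis(10)); // simulate slow processing
        println!("got {}", value);
    }

    producer.join().unwrap();
}
```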
17.2.5 - Thread safety
- The `std::marker::Send` marker trait: types that implement `Send` can safely be passed by value to another thread, i.e. moved between threads.
- The `std::marker::Sync` marker trait: types that implement `Sync` can safely be passed by non-mut reference to another thread, i.e. shared between threads.
- Rust uses these traits to enforce thread safety: no data races or other undefined behavior.
- A struct or enum is `Send`/`Sync` if its fields are.
- Rust implements these traits for your own types automatically; they are not derived via `#[derive]`.
- The few types that are not `Send` or `Sync` mostly exist to provide mutability without thread-safety overhead, such as the reference-counted pointer `std::rc::Rc<T>`.
- Rust requires the closure passed to `spawn` when creating a thread to be `Send` (see the sketch after this list).
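A sketch contrasting `Rc` and `Arc` under that rule (my own example; the rejected code appears only in comments because it would not compile):

```rust
use std::sync::Arc;
use std::thread::spawn;

fn main() {
    // std::rc::Rc<T> is neither Send nor Sync, so moving one into a
    // spawned closure is rejected at compile time:
    //
    //     let rc = std::rc::Rc::new("hello".to_string());
    //     spawn(move || println!("{}", rc));
    //     // error: `Rc<String>` cannot be sent between threads safely
    //
    // Arc<T> is Send and Sync (when T is), so the equivalent code compiles:
    let arc = Arc::new("hello".to_string());
    let handle = spawn(move || println!("{}", arc));
    handle.join().unwrap();
}
```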
17.2.6 - Piping almost any iterator to a channel
The `OffThreadExt` trait and its blanket implementation, which applies to almost every iterator:

```rust
use std::sync::mpsc;
use std::thread::spawn;

pub trait OffThreadExt: Iterator {
    /// Transform this iterator into an off-thread iterator: the `next()`
    /// calls happen on a separate worker thread, so the iterator and the
    /// body of your loop run concurrently.
    fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
}

impl<T> OffThreadExt for T
    where T: Iterator + Send + 'static,
          T::Item: Send + 'static
{
    fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
        // Create a channel for transferring items from the worker thread.
        let (sender, receiver) = mpsc::sync_channel(1024);

        // Move this iterator to a new worker thread and run it there.
        spawn(move || {
            for item in self {
                if sender.send(item).is_err() {
                    break;
                }
            }
        });

        // Return an iterator that pulls values from the channel.
        receiver.into_iter()
    }
}
```
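A hypothetical use of the adapter (my own example, assuming the `OffThreadExt` trait above is in scope): everything before `.off_thread()` runs on the worker thread, while the final `sum()` runs on the calling thread.

```rust
fn main() {
    let documents = vec!["one".to_string(), "two".to_string(), "three".to_string()];

    let total_len: usize = documents
        .into_iter()
        .map(|text| text.len()) // executed on the worker thread
        .off_thread()           // moves the iterator chain to that thread
        .sum();                 // consumed on this thread

    println!("total length: {}", total_len);
}
```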
See Programming Rust by Jim Blandy and Jason Orendorff (Chinese translation by Li Songfeng), Chapter 19.