当前位置:网站首页>【Rust 笔记】17-并发(下)

【Rust 笔记】17-并发(下)

2022-07-05 05:50:00 phial03

17.3 - 共享可修改状态

17.3.1 - 互斥量

  • 互斥量(或称锁):用于强制多线程依次访问特定的数据。

  • C++ 的互斥量实现举例:

    void FernEngine::JoinWaitingList(PlayerId player) {
        mutex.Acquire();  // 临界区(critical section):开始
    
        waitingList.push_back(player);
    
        // 如果等待的玩家满足条件则开始游戏
        if (waitingList.length() >= GAME_SIZE) {
            vector<PlayerId> players;
            waitingList.swap(players);
            StartGame(players);
        }
        mutex.Release();  // 临界区(critical section):结束
    }
    
  • 互斥量的作用:

    • 防止数据争用,避免多个线程并发读写同一块内存。
    • 防止不同线程的操作出现相互交错。
    • 互斥量支持通过不变性(invariant)编程,受保护数据由执行者负责初始化,由每个临界区来维护。

17.3.2-Mutex<T>

  • Rust 中受保护的数据保存在 Mutex 内部:

    let app = Arc::new(FernEmpireApp {
           // 创建整个应用,分配在堆上
        ...
        waiting_list: Mutex::new(vec![]),
        ...
    });
    
    • Arc 方便跨线程共享数据。
    • Mutex 方便跨线程共享可修改数据。
  • 举例:使用互斥量

    impl FernEmpireApp {
          
        fn join_waiting_list(&self, player: PlayerId) {
          
            let mut guard = self.waiting_list.lock().unwrap();
    
            guard.push(player);
            if guard.len() == GAME_SIZE {
          
                let players = guard.split_off(0);
                self.start_game(players);
            }
        }
    }
    
    • 取得互斥量的数据唯一的方式是调用.lock() 方法。

    • 在阻塞结束,guard 被清除后,锁也会被释放。但也可以手工清除:

      if guard.len() == GAME_SIZE {
              
          let players = guard.split_off(0);
          drop(guard);
          self.start_game(players);
      }
      

17.3.3-mutMutex

  • mut:意味着专有、排他访问(exclusive access)。
  • mut:意味着共享访问(shared access)。
  • Mutex 互斥量,提供对数据的专有 mut 访问权,即使很多线程有对 Mutex 本身的共享(非 mut)访问权。
  • Rust 编译器在编译时,通过类型系统可以动态控制专有访问。

17.3.4 - 互斥量的问题

  • 只依赖 “并行分叉 — 合并” 的程序具有确定性,不可能死锁。
  • 专门使用通道实现管道操作的程序也具有确定性,虽然消息传输的时间可能不同,但不会影响输出。
  • 数据争用:多线程并发读取同一块内存导致产生无意义的结果。安全的 Rust 代码不会触发数据争用。
  • 互斥量可能存在问题:
    • 有效的 Rust 程序不会出现数据争用,但仍然可能存在竞态条件(race condition),即程序行为取决于线程的执行时间,因此每次运行的结果可能都不相同。以非结构化方式使用互斥量会导致竞态条件。
    • 共享的可修改状态会影响程序设计。通道可以作为代码中抽象的边界。而互斥量鼓励添加一个方法解决问题,可能导致代码纠缠,难以剥离。
    • 互斥量的实现较为复杂。
  • 尽可能使用结构化方式编程,在必需时再使用互斥量 Mutex

17.3.5 - 死锁

  • 线程在尝试读取自己已经持有的锁时可能会造成死锁。

    let mut guard1 = self.waiting_list.lock().unwrap();
    let mut guard2 = self.waiting_list.lock().unwrap(); // 死锁
    
  • 使用通道也有可能导致死锁。如两个线程相互阻塞,每个都等待从另一个接收消息。

17.3.6 - 中毒的互斥量

  • 如果线程在持有

    Mutex
    

    时诧异了,那么 Rust 会将

    Mutex
    

    标记为已中毒。

    • 后续想要锁住这个受污染的 Mutex 的尝试都会得到一个错误结果。
    • .unwrap() 调用告诉 Rust 在这种情况下要诧异,把其他线程的诧异传播到当前线程。
    • 诧异的线程保证了程序其余部分出在安全状态。
  • Rust 通过毒化这个互斥量来防止其他线程在不经意间也出现这种局面。

    • 在完全互斥的情况下,可以锁住中毒的互斥量,同时访问其中的数据。
    • 详见 PoisonError::into_inner()

17.3.7 - 使用互斥量的多消费者通道

  • 一个通道只有一个 Receiver

    • 任何线程池都不能有多个线程使用一个 mpsc 通道共享工作成功。

    • 绕过这个限制的例外方法:可以为 Receiver 添加一个 Mutex,然后再共享。

      pub mod shared_channel {
              
          use std::sync::{
              Arc, Mutex};
          use std::sync::mpsc::{
              channel, Sender, Receiver};
      
          /// 对Receiver的线程安全的封装
          #[derive(Clone)]
          pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>); 
          // Arc<Mutex<Receiver<T>>>是嵌套的泛型
          impl <T> Iterator for SharedReceiver<T> {
              
              type Item = T;
      
              /// 从封装的接收者获取下一项
              fn next(&mut self) -> Option<T> {
              
                  let guard = self.0.lock().unwrap();
                  guard.recv().ok()
              }
          }
      
          /// 创建一个新通道,其接收者可以跨线程共享。
          /// 返回一个发送者和一个接收者,与stdlib的channel()类似。
          /// 有时候可以直接代替它使用。
          pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
              
              let (sender, receiver) = channel();
              (sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
          }
      }
      

17.3.8 - 读写锁与 RwLock<T>

  • 互斥量使用 lock 方法读写数据。

  • 读写锁则使用 readwrite 两个方法读写数据。

    • RwLock::write 方法:与 Mutex::lock 类似,都是等待获取对受保护数据的专有 mut 访问。
    • RwLock::read 方法提供了非 mut 访问,几乎不需要等待,因为多个线程可以同时安全读取数据。
  • 互斥量与读写锁的区别:

    • 任意给定时刻,受保护数据只能有一个读取器或写入器。
    • 在使用读写锁的情况下,则可以有一个写入器或多个读取器,类似于 Rust 引用。
    • 优先推荐使用读写锁,而不是互斥量。
  • 优化上述 FernEmpireApp 程序:

    • 创建一个结构体保存配置信息,并由 RwLock 保护。

      use std::sync::RwLock;
      struct FernEmpireApp {
              
          ...
          config: RwLock<AppConfig>,
          ...
      }
      
    • 读取这个配置的方法可以使用 RwLock::read()

      fn mushrooms_enabled(&self) -> bool {
              
          let config_guard = self.config.read().unwrap();
          config_guard.mushrooms_enabled
      }
      
    • 重新加载这个配置的方法,则使用 RwLock::write()

      fn reload_config(&self) -> io::Result<()> {
              
          let new_config = AppConfig::load()?;
          let mut config_guard = self.config.write().unwrap();
          *config_guard = new_config;
          Ok(())
      }
      
    • self.config.read() 返回一个守卫,可以提供对 AppConfig 的非 mut(即共享)访问;

    • self.config.write() 返回一个不同类型的守卫,可以提供 mut(即专有)访问。

17.3.9 - 条件变量(Condvar

  • 一个线程经常需要等待某个条件变为

    true
    

    • 在服务器关机期间,主线程可能需要等待所有其他线程完全退出。
    • 工作线程在闲置时,需要等待要处理的数据。
    • 实现分布式共识协议的线程,需要等待足够多对等线程的响应。
  • 针对需要等待的条件,会有方便的阻塞 API,如果没有,那么可以使用条件变量(condition variable)来构建自己的 API。

    • std::sync::Condvar 类型实现了条件变量。
    • 它的.wait() 方法可以阻塞到某些线程调用其.notify_all()
  • MutexCondvar 有直接关联:条件变量始终代表由某个 Mutex 保护的数据或真或假的条件。

17.3.10 - 原子类型

  • std::sync::atomio
    

    模块包含无锁并发编程所使用的原子类型。

    • AtomicIsizeAtomicUsize:是共享的整数类型,对应单线程的 isizeusize 类型。
    • AtomicBool:是一个共享的 bool 值。
    • AtomicPtr<T>:是不安全的指针类型 *mut T 的共享值。
  • 原子的最简单应用是取消操作。假设有一个线程正在执行某个耗时的计算任何,比如渲染视频,而我们希望能够异步取消这个操作。那么可以通过一个共享的 AtomicBool 来实现。渲染完每个像素,线程都会调用.load() 方法检查取消标志的值。

  • 原子操作永远不会使用系统调用。加载和存储会编译为一个 CPU 指令。

  • 原子、MutexRwLock 的方法可以用 self 的共享引用为参数。它们也可以作为简单的全局变量来使用。

17.3.11 - 全局变量

  • 对于全局变量:必须通过某种方式保证线程安全。静态变量必须既是 Sync,又是非 mut
  • 静态初始化器不能调用函数。可以使用 lazy_staic 包来实现。

详见《Rust 程序设计》(吉姆 - 布兰迪、贾森 - 奥伦多夫著,李松峰译)第十九章
原文地址

原网站

版权声明
本文为[phial03]所创,转载请带上原文链接,感谢
https://blog.csdn.net/feiyanaffection/article/details/125575417