当前位置:网站首页>多线程之生产者和消费者模型
多线程之生产者和消费者模型
2022-06-09 10:00:00 【大道一支菜鸟】
这么经典的问题,要多看几次,多思考。
https://blog.csdn.net/chenxun_2010/article/details/49848865
(1)单消费者单生产者模式
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
std::mutex mtx; //互斥变量
std::condition_variable repo_not_full;//条件变量
std::condition_variable repo_not_empty;//条件变量
static const int item_total = 20;//一共要生产的个数
static const int repository_size = 10;//仓库的能够存储的个数
int product_position = 0;//buf中Product位置
int consume_position = 0;//buf中consume位置
int item_buffer[repository_size]; //仓库buffer
void produter_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);//锁住
//如果不加一 会直接死锁
while((product_position + 1) % repository_size == consume_position ){
std::cout << "The item repository is full;" << std::endl;
repo_not_full.wait(lck);//等待repo_not_full的notify命令
}
item_buffer[product_position] = i;
product_position++;
if(product_position == repository_size){
product_position = 0;
}
repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
}
int consumer_item()
{
std::unique_lock<std::mutex>lck(mtx);
int data;
while(product_position == consume_position)
{
std::cout << "The item repository is empty." << std::endl;
repo_not_empty.wait(lck);
}
data = item_buffer[consume_position];
consume_position++;
if(consume_position == repository_size){
consume_position = 0;
}
repo_not_full.notify_all();
return data;
}
//生产线程
void Producter_thread()
{
for(int i = 0;i < item_total; i++){
std::cout << "生产者生产第"<< i <<"个产品."<< std::endl;
produter_item(i);//执行producter动作
}
}
//消费线程
void Consumer_thread()
{
int cnt = 0;
while(1){
int item = consumer_item();//执行consume动作
std::cout << "消费者消费第"<< item << "个产品." << std::endl;
cnt++;//个数+1
if(cnt == item_total){
std::cout << "consumer_thread is finish." << std::endl;
break; //结束
}
}
}
int main ()
{
std::thread producer(Producter_thread);//开启一个producer线程,入口是Producter_thread函数
std::thread consumer(Consumer_thread);//同上
producer.join();//等producer结束
std::cout << "Producter_thread is end;" << std::endl;
consumer.join();//等consumer结束再结束main函数
std::cout << "consumer_thread is end;" << std::endl;
}
(2) 多消费者,单生产者模型
多消费者模型,需要在单消费者模型上添加一个控制消费产品总数的变量,也是互斥访问的,有线程访问时候就加1。
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <vector>
std::mutex mtx; //互斥变量
std::mutex consumer_count_mtx;//消费者互斥变量
std::condition_variable repo_not_full;//条件变量
std::condition_variable repo_not_empty;//条件变量
static const int item_total = 20;//一共要生产的个数
static const int repository_size = 10;//仓库的能够存储的个数
static std::size_t consumer_items = 0; //消费者线程得到的产品数量
static std::size_t product_position = 0;//buf中Product位置
static std::size_t consume_position = 0;//buf中consume位置
int item_buffer[repository_size]; //仓库buffer
void produter_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);//锁住
//如果不加一 会直接死锁
while((product_position + 1) % repository_size == consume_position ){
std::cout << "仓库满了." << std::endl;
repo_not_full.wait(lck);//等待repo_not_full的notify命令
}
item_buffer[product_position] = i;
product_position++;
if(product_position == repository_size){
product_position = 0;
}
repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
}
int consumer_item()
{
std::unique_lock<std::mutex>lck(mtx);
int data;
while(product_position == consume_position)
{
std::cout << "仓库空了." << std::endl;
repo_not_empty.wait(lck);
}
data = item_buffer[consume_position];
consume_position++;
if(consume_position == repository_size){
consume_position = 0;
}
repo_not_full.notify_all();
return data;
}
//生产线程
void Producter_thread()
{
for(int i = 0;i < item_total; i++){
std::cout << "生产者生产第"<< i <<"个产品."<< std::endl;
produter_item(i);//执行producter动作
}
}
//消费线程
void Consumer_thread()
{
int exit = 0;
while(1){
std::unique_lock<std::mutex> lck(consumer_count_mtx);
consumer_items++;
if(consumer_items <= item_total){
int item = consumer_item();//执行consume动作
std::cout << "消费者消费第"<< item << "个产品." << std::endl;
}else{
exit = 1;
}
if(exit == 1){
std::cout << "消费完了所有的产品。." << std::endl;
break; //结束
}
}
std::cout << "消费者线程:" << std::this_thread::get_id() << "结束." << std::endl;
}
int main ()
{
std::thread producer(Producter_thread);//开启一个producer线程,入口是Producter_thread函数
std::cout << "生产者线程id是:" << producer.get_id() << std::endl;
std::vector<std::thread> thread_vector;//定义一个线程向量
//将开启5个Consumer_thread线程
for(int i = 0; i < 5; i++){
thread_vector.push_back(std::thread(Consumer_thread));
}
producer.join();//等producer结束
std::cout << "生产者线程结束." << std::endl;
for(auto &thr:thread_vector){
thr.join();//等consumer结束再结束main函数
}
}
3、多生产者单消费者模型
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <vector>
std::mutex mtx; //互斥变量
std::mutex producer_count_mtx;//消费者互斥变量
std::condition_variable repo_not_full;//条件变量
std::condition_variable repo_not_empty;//条件变量
static const int item_total = 20;//一共要生产的个数
static const int repository_size = 10;//仓库的能够存储的个数
static std::size_t consumer_items = 0; //消费者线程得到的产品数量
static std::size_t producter_items = 0;//生产者线程生产的产品数量
static std::size_t product_position = 0;//buf中Product位置
static std::size_t consume_position = 0;//buf中consume位置
int item_buffer[repository_size]; //仓库buffer
std::chrono::seconds t(1);
void produter_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);//锁住
//如果不加一 会直接死锁
while((product_position + 1) % repository_size == consume_position ){
std::cout << "仓库满了." << std::endl;
repo_not_full.wait(lck);//等待repo_not_full的notify命令
}
item_buffer[product_position] = i;
product_position++;
if(product_position == repository_size){
product_position = 0;
}
repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
}
int consumer_item()
{
std::unique_lock<std::mutex>lck(mtx);
int data;
while(product_position == consume_position)
{
std::cout << "仓库空了." << std::endl;
repo_not_empty.wait(lck);
}
data = item_buffer[consume_position];
consume_position++;
if(consume_position == repository_size){
consume_position = 0;
}
repo_not_full.notify_all();
return data;
}
//生产线程
void Producter_thread()
{
int exit = 0;
while(1){
std::unique_lock<std::mutex> lck(producer_count_mtx);
producter_items++;
if(producter_items <= item_total){
std::cout << "生产第" << producter_items << "产品." << std::endl;
produter_item(producter_items - 1);
}else{
exit = 1;
}
if(exit == 1){
std::cout << "生产数量已经达到要求." << std::endl;
break;
}
}
std::cout << "生产者线程" << std::this_thread::get_id() << "结束." << std::endl;
}
//消费线程
void Consumer_thread()
{
while(1){
std::this_thread::sleep_for(t);
if(consumer_items <= item_total){
int item = consumer_item();//执行consume动作
std::cout << "消费者消费"<< item << "产品." << std::endl;
}
consumer_items++;
if(consumer_items == item_total){
break;
}
}
}
int main ()
{
//定义5个生产者线程和一个消费者线程
std::vector<std::thread> thread_vector;
for(int i = 0; i < 5; i++){
thread_vector.push_back(std::thread(Producter_thread));
}
std::thread consumer(Consumer_thread);
for(auto &thr: thread_vector){
thr.join();
}
consumer.join();
std::cout << "消费者线程结束." << std::endl;
}
4、多生产者多消费者模型
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <vector>
std::mutex mtx; //互斥变量
std::mutex consumer_count_mtx;//消费者互斥变量
std::mutex producer_count_mtx;//生产者互斥变量
std::condition_variable repo_not_full;//条件变量
std::condition_variable repo_not_empty;//条件变量
static const int item_total = 20;//一共要生产的个数
static const int repository_size = 10;//仓库的能够存储的个数
static std::size_t producer_items = 0;//生产者线程生产的产品数量
static std::size_t consumer_items = 0; //消费者线程得到的产品数量
static std::size_t product_position = 0;//buf中Product位置
static std::size_t consume_position = 0;//buf中consume位置
int item_buffer[repository_size]; //仓库buffer
std::chrono::seconds t(1);
void produter_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);//锁住
//如果不加一 会直接死锁
while((product_position + 1) % repository_size == consume_position ){
std::cout << "仓库满了." << std::endl;
repo_not_full.wait(lck);//等待repo_not_full的notify命令
}
item_buffer[product_position] = i;
product_position++;
if(product_position == repository_size){
product_position = 0;
}
repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
}
int consumer_item()
{
std::unique_lock<std::mutex>lck(mtx);
int data;
while(product_position == consume_position)
{
std::cout << "仓库空了." << std::endl;
repo_not_empty.wait(lck);
}
data = item_buffer[consume_position];
consume_position++;
if(consume_position == repository_size){
consume_position = 0;
}
repo_not_full.notify_all();
return data;
}
//生产线程
void Producter_thread()
{
int exit = 0;
while(1){
std::unique_lock<std::mutex> lck(producer_count_mtx);
producer_items ++;
if(producer_items <= item_total){
std::cout << "生产者生产第"<< producer_items <<"个产品."<< std::endl;
produter_item(producer_items);//执行producter动作
}else{
exit = 1;
}
if(exit == 1){
std::cout << "生产完了所有产品." << std::endl;
break;
}
}
std::cout << "生产者线程:" << std::this_thread::get_id() <<"结束." << std::endl;
}
//消费线程
void Consumer_thread()
{
int exit = 0;
while(1){
std::this_thread::sleep_for(t);
std::unique_lock<std::mutex> lck(consumer_count_mtx);
consumer_items++;
if(consumer_items <= item_total){
int item = consumer_item();//执行consume动作
std::cout << "消费者消费第"<< item << "个产品." << std::endl;
}else{
exit = 1;
}
if(exit == 1){
std::cout << "消费完了所有的产品." << std::endl;
break; //结束
}
}
std::cout << "消费者线程:" << std::this_thread::get_id() << "结束." << std::endl;
}
int main ()
{
std::vector<std::thread> producer_vector;//定义一个生产者线程向量
std::vector<std::thread> consumer_vector;//定义一个消费者线程向量
//将开启5个Consumer_thread线程
for(int i = 0; i < 5; i++){
producer_vector.push_back(std::thread(Producter_thread));
consumer_vector.push_back(std::thread(Consumer_thread));
}
//等待生产者线程结束
for(auto &thr:producer_vector){
thr.join();
}
//等待消费者线程结束
for(auto &thr:consumer_vector){
thr.join();//等consumer结束再结束main函数
}
}
总结:
多线程的时候,要在两个以上线程公用的变量前面加锁,阻止访问冲突。要像更加灵活的运用锁,就要和条件变量配合来使用。
边栏推荐
- 基于配置的权限控制
- 登出成功处理器
- 1108. IP address invalidation
- Some instructions in dict intersect with the difference sum in set, and increase or decrease elements
- 损失 3 亿美元后,IBM 宣布退出俄罗斯!
- 106. construct binary tree from middle order and post order traversal sequence
- Interview question 04.02 Minimum height tree - depth first traversal, plus tree divide and conquer
- 肆拾贰- JS 告诉你,到底你是贫穷还是富贵
- UnsupportedOperationException异常解决
- MicroNet:以极低的 FLOP 实现图像识别
猜你喜欢
![[model deployment and business implementation] model transformation of AI framework deployment scheme](/img/ea/8ca6bc6ae16ba1f90f6a5a38be184d.jpg)
[model deployment and business implementation] model transformation of AI framework deployment scheme

Terrain learning summary (6) -- terrain practice based on Alibaba cloud platform
![[genius_platform software platform development] lesson 36: definition of maximum value macro of built-in data type](/img/f1/ca57934507bb0758b8bb0a52606a10.jpg)
[genius_platform software platform development] lesson 36: definition of maximum value macro of built-in data type

Application of ebpf in cloud native environment

損失 3 億美元後,IBM 宣布退出俄羅斯!

【图像增强】基于稀疏表示和正则化实现图像增强附matlab代码

IBM announced its withdrawal from Russia after a loss of $300million!

Machine learning housing rental price forecasting: exploratory data analysis + Feature Engineering + modeling + reporting
![[optics] double slit interference with GUI Based on MATLAB simulation light](/img/9a/0c33e71111b878e48ef3a1cc77ab3c.png)
[optics] double slit interference with GUI Based on MATLAB simulation light

叁拾壹- NodeJS简单代理池(合) 之 MongoDB 链接数爆炸了
随机推荐
leetcode. 36 --- effective Sudoku
PIC simulation (particle in cell codes) (task a and task C)
Getting started with cloud based LDAP (Part 1)
Yantingli, a famous person in Jinan, Shandong Province, is a famous Oriental philosopher and his thoughts. China needs such a thinker
Flitter generate Poster
With good conditional access, remote office is more secure and efficient
flutter 生成海报
go-zero 微服务实战系列(二、服务拆分)
认证成功处理器
Blazor University (27) routing - detect navigation events
历史上的今天:PHP公开发布;iPhone 4 问世;万维网之父诞生
WPF implements ring chart with details
人大金仓再次荣获“国家鼓励的重点软件企业”认定
叁拾贰- NodeJS简单代理池(有完没完?) 之 SuperAgent 使用代理不是 Timeout 的 Timeout
同花顺app交易安全吗
叁拾玖- SQL对数据内容 分段/分组 汇总
What kind of digital collection platform is a good platform?
Other permission verification methods
[leetcode] [Niuke] brush questions on binary tree
InfoQ 极客传媒 15 周年庆征文| 迁移 Eureka 到 Nacos 之双注册双订阅模式