当前位置:网站首页>生产者/消费者模型

生产者/消费者模型

2022-06-24 08:37:00 HHYX.

生产者消费者模型意义

生产者消费者模式是通过一个容器来解决生产者和消费者之间强耦合的关系。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者进行消费,而是先存到阻塞队列中。消费者不直接找生产者要数据,而是从阻塞队列中拿数据。阻塞队列就相当于一个缓冲区,将生产者和消费者之间进行解耦。
如果有多个生产者或者多个消费者那么他们之间的关系如下:

  1. 生产者vs生产者:竞争和互斥
  2. 消费者vs消费者:竞争和互斥
  3. 生产者vs消费者:互斥和同步
    因此生产者和消费者模型可以总结为如下几点:
    生产者和消费者之间的三种关系
    生产者和消费者这两种角色
    类似阻塞队列的容器或者缓冲区等一种交易场所

下面来简单实现一下生产者和消费者模型

生产者和消费者模型建立

构建基本框架

这里是使用阻塞队列来作为生产者和消费者的“交易场所”。因此需要先创建一个类。由于其可能存放的各种类型的数据或者结构,因此需要使用模板来定义。
该类中需要包含以下内容:

  1. 一个存放数据的队列
  2. 队列的大小/容量
  3. 保护临界资源即缓冲区中内容的一把锁
  4. 定义信号量来判断阻塞队列为空/为满,如果为空则需要通知生产者进行生产,如果为满则需要通知消费者进行消费。
    接下来通过对该阻塞队列类进行定义构造函数和析构函数即可。成员对象中锁和信号量需要进行手动的初始化和销毁,其他对象可以直接使用默认的构造和析构。实现代码如下:
const int default_cap = 5;
    template<class T>
    class BlockQueue
    {
    
        private:
        std::queue<T> bq_;//阻塞队列
        int cap_;//队列的元素上限
        pthread_mutex_t mtx_;//保护临界资源的锁
        //1、当生产满了的时候,就应该不要生产(不要竞争锁了),应该让消费者进行消费
        //2、当消费空了,就不应该消费了(竞争锁),应该让生产者进行生产
        pthread_cond_t is_full_;//bq满了,消费者在条件变量下等待
        pthread_cond_t is_empty_;//bq空的,生产者在该条件变量下等待
        public:
        BlockQueue(const int cap = default_cap)
            :cap_(cap)
        {
    
            pthread_mutex_init(&mtx_,nullptr);
            pthread_cond_init(&is_empty_,nullptr);
            pthread_cond_init(&is_full_,nullptr);
        }
        ~BlockQueue()
        {
    
            pthread_mutex_destroy(&mtx_);
            pthread_cond_destroy(&is_empty_);
            pthread_cond_destroy(&is_full_);
        }
    };
} 

阻塞队列的基本功能

判断队列是否为空/为满

生产者消费者模型中,需要知道缓冲区中的情况,是否为空或者是否为满,从而使用信号量对生产者和消费者进行控制,这里判断为空为满比较简单,将这里的队列大小和容量进行比较即可,代码实现如下:

        bool IsFull()
        {
    
            return bq_.size() == cap_;
        }

        bool IsEmpty()
        {
    
            return bq_.size() == 0;
        }

加锁解锁操作

这里的阻塞队列可以有多个线程进行访问,因此需要保证其线程安全问题,因此需要对该临界资源进行加锁。该类中已经初始化了一把锁,因此只需要对其使用加锁解锁的函数操作即可,代码实现如下:

        void LockQueue()
        {
    
            pthread_mutex_lock(&mtx_);
        }

        void UnlockQueue()
        {
    
            pthread_mutex_unlock(&mtx_);
        }

信号量操作

信号量这里是用来实现同步的操作,让生产者和消费者有序的进行临界资源的访问,从而能够有效的避免了生产者在队列已经满的情况下还在继续生产,或者消费者在队列已经为空的情况下还在继续消费,因此可以通过两个信号量来进行空/满的操作,让生产者在队列满的情况下暂停生产将锁释放先让消费者进行消费,或者让消费者在队列为空的情况下先将锁释放让生产者先进行生产。函数实现如下:

        void ProductorWait()
        {
    
            //1、调用该函数的时候,会先自动释放锁,然后再挂起自己
            //2、返回的时候,首先会自动竞争锁,获得锁,才能返回
            pthread_cond_wait(&is_empty_,&mtx_);
        }

        void ConsumerWait()
        {
    
            pthread_cond_wait(&is_full_,&mtx_);

        }

        void WakeupConsumer()
        {
    
            pthread_cond_signal(&is_full_);
        }

        void WakeupProductor()
        {
    
            pthread_cond_signal(&is_empty_);

        }

生产/消费函数

生产和消费实际上就是往队列里面入数据和出数据的功能,生产者主要是向队列里面入数据,消费者主要是从队列里面出数据,这样可以使用如下代码实现:
入数据:该函数为了临界资源的安全,先进行加锁解锁操作。在临界区中首先要进行判断此时队列是否为满,如果为满则需要进行等待,先释放锁再将自己先暂时挂起,这里需要使用循环来判断,从而能保证退出循环一定是因为条件不满足导致的即队列不为满了。
后面正常使用stl容器队列的插入操作,将数据入队列。最后可以进行设置合适时候来唤醒消费者进行消费,这里是设置为当生产的数据大于容量的一半的时候唤醒消费者。

        void Push(const T& in)
        {
    
            LockQueue();
            //需要进行条件检测的时候,这里需要使用循环方式
            //来保证退出循环一定是因为条件不满足导致的
            while(IsFull())
            {
    
                ProductorWait();
            }

            //向队列中放数据,生产函数
            bq_.push(in);
            
            if(bq_.size()>cap_/2)
            {
    
                WakeupConsumer();
            }

            UnlockQueue();

        }

出数据:这里出数据和上面的入数据流程类似,先对临界区进行加锁,使用循环判断是否为空,为空则先释放锁进行等待。然后使用输出型参数out来拿到出队列的值,最后设置合适时间唤醒生产者进行生产,这里设置为队列中数据个数小于一半时进行唤醒生产者。

        void Pop(T* out)
        {
    
            LockQueue();
            //从队列中拿数据,消费函数
            while(IsEmpty())
            {
    
                //无法消费
                ConsumerWait();
            }
            *out=bq_.front();
            bq_.pop();

            if(bq_.size()<cap_/2)
            {
    
                WakeupProductor();
            }
            UnlockQueue();

        }

总体实现

如上便是生产者消费者模型的阻塞队列的基本实现,如下是整体代码:

namespace ns_blockqueue
{
    
    const int default_cap = 5;
    template<class T>
    class BlockQueue
    {
    
        private:
        std::queue<T> bq_;//阻塞队列
        int cap_;//队列的元素上限
        pthread_mutex_t mtx_;//保护临界资源的锁
        //1、当生产满了的时候,就应该不要生产(不要竞争锁了),应该让消费者进行消费
        //2、当消费空了,就不应该消费了(竞争锁),应该让生产者进行生产
        pthread_cond_t is_full_;//bq满了,消费者在条件变量下等待
        pthread_cond_t is_empty_;//bq空的,生产者在该条件变量下等待
        public:
        BlockQueue(const int cap = default_cap)
            :cap_(cap)
        {
    
            pthread_mutex_init(&mtx_,nullptr);
            pthread_cond_init(&is_empty_,nullptr);
            pthread_cond_init(&is_full_,nullptr);
        }
        ~BlockQueue()
        {
    
            pthread_mutex_destroy(&mtx_);
            pthread_cond_destroy(&is_empty_);
            pthread_cond_destroy(&is_full_);
        }
        private:
        bool IsFull()
        {
    
            return bq_.size() == cap_;
        }

        bool IsEmpty()
        {
    
            return bq_.size() == 0;
        }

        void LockQueue()
        {
    
            pthread_mutex_lock(&mtx_);
        }

        void UnlockQueue()
        {
    
            pthread_mutex_unlock(&mtx_);
        }
        void ProductorWait()
        {
    
            //1、调用该函数的时候,会先自动释放锁,然后再挂起自己
            //2、返回的时候,首先会自动竞争锁,获得锁,才能返回
            pthread_cond_wait(&is_empty_,&mtx_);
        }

        void ConsumerWait()
        {
    
            pthread_cond_wait(&is_full_,&mtx_);

        }

        void WakeupConsumer()
        {
    
            pthread_cond_signal(&is_full_);
        }

        void WakeupProductor()
        {
    
            pthread_cond_signal(&is_empty_);

        }



        public:
        //const& :输入
        //*:输出
        //&:输入输出

        void Push(const T& in)
        {
    
            LockQueue();
            //需要进行条件检测的时候,这里需要使用循环方式
            //来保证退出循环一定是因为条件不满足导致的
            while(IsFull())
            {
    
                ProductorWait();
                //return;
            }

            //向队列中放数据,生产函数
            bq_.push(in);
            
            if(bq_.size()>cap_/2)
            {
    
                WakeupConsumer();
            }

            UnlockQueue();

        }

        void Pop(T* out)
        {
    
            LockQueue();
            //从队列中拿数据,消费函数
            while(IsEmpty())
            {
    
                //无法消费
                ConsumerWait();
            }
            *out=bq_.front();
            bq_.pop();

            if(bq_.size()<cap_/2)
            {
    
                WakeupProductor();
            }
            UnlockQueue();

        }
    };
} 

生产者消费者模式的测试

生产者消费者模式的测试这里是创建多个消费者和一个生产者进行相关测试,其中给阻塞队列中传入的可以是正常的数据类型让他们进行相关数据传输,也可以传入相关任务,让生产者产生任务,消费者去执行任务,这里测试后一种情况:
这里先创建了一个生产者线程和五个消费者线程

int main()
{
    
    srand((long long)time(nullptr));
    BlockQueue<Task> *bq=new BlockQueue<Task>();
    pthread_t c,p;
    pthread_t c1,c2,c3,c4;
    pthread_create(&c,nullptr,consumer,(void*)bq);
    pthread_create(&c1,nullptr,consumer,(void*)bq);
    pthread_create(&c2,nullptr,consumer,(void*)bq);
    pthread_create(&c3,nullptr,consumer,(void*)bq);
    pthread_create(&c4,nullptr,consumer,(void*)bq);

    pthread_create(&p,nullptr,productor,(void*)bq);

    pthread_join(c,nullptr);
    pthread_join(c1,nullptr);
    pthread_join(c2,nullptr);
    pthread_join(c3,nullptr);
    pthread_join(c4,nullptr);

    pthread_join(p,nullptr);
    return 0;
}

其中给他们传入的是任务也是一种结构类型,该类型是用来进行简单的±*/%操作,任务代码如下:

namespace ns_task
{
    
    class Task
    {
    
    private:
        int x_;
        int y_;
        char op_;//+-*/%
    public:
        Task(){
    }
        Task(int x ,int y ,char op)
            :x_(x)
            ,y_(y)
            ,op_(op)
        {
    
            
        }
        int Run()
        {
    
            int res = 0;
            switch (op_)
            {
    
            case '+':
                res = x_ + y_;
                break;
            case '-':
                res = x_ - y_;
                break;
            case '*':
                res = x_ * y_;
                break;
            case '/':
                res = x_ / y_;
                break;    
            case '%':
                res = x_ % y_;
                break;  
            default:
                std::cout<<"bug"<<std::endl;
                break;
            }
            std::cout<<"当前任务正在被:"<< pthread_self()<<"处理:"<< x_ << op_ << y_ << "="<< res <<std::endl;
            return res;
        }

        int operator()()
        {
    
            return Run();
        }
        ~Task(){
    }
    };
}

生产者主要是用来进行任务的发布,即这里是产生两个数运算的数字以及运算符,代码实现如下:

void* productor(void* args)
{
    
    BlockQueue<Task> *bq =(BlockQueue<Task>*)args;
    std::string ops = "+-*/%";
    while(true)
    {
    
        int x = rand()%20 + 1;//1-20
        int y = rand()%20 + 1;//1-20
        char op = ops[rand()%5];
        Task t(x,y,op);
        std::cout<<"生产者派发了一个任务:"<< x << op << y << "=?" <<std::endl; 
        bq->Push(t);
    }
}

消费者主要是进行数据的处理以及计算出结构,这里是通过Task类中的仿函数来实现的,消费者函数实现如下:

void* consumer(void* args)
{
    
    BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
    sleep(2);
    while(true)
    {
    
        Task t;
        bq->Pop(&t);//这里完成了任务消费的第一步
        t();        //这里完成了任务处理的第二步
    }
}

运行结果如下:
在这里插入图片描述
整体代码实现如下:

void* consumer(void* args)
{
    
    BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
    sleep(2);
    while(true)
    {
    
        Task t;
        bq->Pop(&t);//这里完成了任务消费的第一步
        t();        //这里完成了任务处理的第二步
    }
}

void* productor(void* args)
{
    
    BlockQueue<Task> *bq =(BlockQueue<Task>*)args;
    std::string ops = "+-*/%";
    while(true)
    {
    
        int x = rand()%20 + 1;//1-20
        int y = rand()%20 + 1;//1-20

        char op = ops[rand()%5];
        Task t(x,y,op);
        std::cout<<"生产者派发了一个任务:"<< x << op << y << "=?" <<std::endl; 
        bq->Push(t);

        sleep(1);

    }
}


int main()
{
    
    srand((long long)time(nullptr));
    BlockQueue<Task> *bq=new BlockQueue<Task>();
    pthread_t c,p;
    pthread_t c1,c2,c3,c4;
    pthread_create(&c,nullptr,consumer,(void*)bq);
    pthread_create(&c1,nullptr,consumer,(void*)bq);
    pthread_create(&c2,nullptr,consumer,(void*)bq);
    pthread_create(&c3,nullptr,consumer,(void*)bq);
    pthread_create(&c4,nullptr,consumer,(void*)bq);

    pthread_create(&p,nullptr,productor,(void*)bq);

    pthread_join(c,nullptr);
    pthread_join(c1,nullptr);
    pthread_join(c2,nullptr);
    pthread_join(c3,nullptr);
    pthread_join(c4,nullptr);

    pthread_join(p,nullptr);
    return 0;
}

原网站

版权声明
本文为[HHYX.]所创,转载请带上原文链接,感谢
https://blog.csdn.net/h1091068389/article/details/125427287