Linux系统编程二十九基于信号量的环形队列生产消费模型

【Linux系统编程二十九】基于信号量的环形队列生产消费模型

  • 一.信号量
    • 1.P操作
    • 2.V操作
  • 二.环形队列
  • 三.单生产单消费场景
    • 1.信号量维持生产消费之间互斥同步
  • 四.多生产多消费场景
    • 1.加锁维持生产生产,消费消费互斥
  • 五.总结

一.信号量

当共享资源被当成整体使用时,则共享资源的数量要么是1,要么是0。
当被访问使用时,共享资源的数量就为0,当没有被使用时,数量就为1。

共享资源是可以被分成多份来使用的,只不过不同的线程访问的是该共享资源的不同的区域,它是允许多个线程并发访问的,只不过访问的是不同的区域。而信号量就是用来表示这个大的共享资源被分成多少份小的资源的个数。
信号量本质就是一把计数器,用来描述资源数量的多少。
信号量表示对资源的预定机制,只要你信号量申请成功了,就表明你肯定可以使用这块资源。不需要再判断资源是否就绪,因为在申请信号量的时候就已经判断好了,申请成功就可以使用,申请失败就不能使用就去挂起等待。
在这里插入图片描述

互斥锁的本质也就是信号量,只不过是二元信号量,结果要么是1要么是0。申请到锁了,就可以去访问共享资源,访问是能访问,单是使用是有使用条件的,如果使用条件不满足,也无法使用。
在这里插入图片描述

sem_t 就是信号量类型,使用它需要初始化:
在这里插入图片描述

1.P操作

P操作就是申请信号量,也就是先预定关心的资源。因为信号量本质就是一把计数器,用来描述临界资源数量的多少,而申请一个信号量,就是预定临界资源,对应的临界资源总数就要减一。
申请成功就可以使用该资源,申请失败,就说明没有资源给你使用,那么就等待。
在这里插入图片描述

2.V操作

V操作就是释放信号量,也就是释放对应的资源,对应的临界资源总数就要加一。在这里插入图片描述

二.环形队列

利用环形队列实现的生产消费模型需要满足几个条件:
首先生产者往队列里投入数据时,生产者就往后走。
消费者从队列里拿走数据时,消费者就往后走。
当队列为空,为满的时候,生产和消费是在同一个位置上的,而不为空,不为满的时候,生产和消费一定指向不同的位置。

1.当队列为空或者为满时,生产者和消费者都在同一个位置,而为了满足321原则,生产者和消费者之间必须互斥,所以只允许一个人访问队列。
并且为空的时候,肯定是生产者先访问,而消费后访问。当为满时,肯定是消费者先访问,生产者后访问。
2.而当队列不为空,不为满时,生产和消费一定指向的是不同的位置,这时生产和消费可以同时访问队列。

原则1:当实指向同一个位置时,只能有一个人访问
原则2:消费者是不能超过生产者的,生产者生产多少数据,消费者最多只能消费多少数据
原则3:生产者不能将消费者套一个圈,不然原先的数据就会被覆盖。

在这里插入图片描述
在模型中我们利用vector数组来模拟环形队列。

三.单生产单消费场景

在这里插入图片描述

生产者关注的资源是什么呢?消费者关心的资源是什么呢?

生产者关注的是队列里是否还有空间,关注的是空间资源。
消费者关注的是队列里是否还有数据,关注的是数据资源。
而一开始,队列里并没有数据资源,但空间资源是有的,就是队列的大小。

因为信号量是描述临界资源的数量,所以我们就可以定义两个信号量分别表示对应的,空间资源,数据资源的多少。

在这里插入图片描述

【1】生产者要想往队列里生产,需要先申请空间资源,也就是空间信号量。申请成功后,就可以进行生产,生产完毕后,当前的空间被生产者生产的数据着,并且队列数据多了一个,所以需要释放数据资源,也就是释放数据信号量。如果申请信号量失败,那么就等待挂起。
在这里插入图片描述

【2】消费者想要从队列里消费,需要先申请数据资源,也就是数据信号量。申请成功后,就可以进行消费,消费完毕后,数据被消费者拿走,而当前的空间多了一个,所以需要释放空间资源,也就是释放空间信号量。如果申请失败,那么就等待挂起。
在这里插入图片描述

1.信号量维持生产消费之间互斥同步

在环形队列生产模型中,我们要探讨的三种关系,其中生产者和消费者之间的互斥和同步关系是由信号量来维持的。

在这里插入图片描述

#include <pthread.h>
#include <iostream>
#include <string>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "TASK.hpp"
struct ThreadData
{
    RingQueue<TASK>* RQ;
    std::string name;
};
void *Consumer(void *args)
{
   
    ThreadData *td=static_cast<ThreadData*>(args);
    RingQueue<TASK> *rq=td->RQ;

    while (true)
    {
        //sleep(2);
        // 1.消费数据
        TASK t;
        rq->Pop(&t);

        // 2.处理数据
        t.run();
        std::cout << "Comsumer get task,task is: " << t.GetTASK() <<"who: "<<td->name<< " reslut :" << t.Getresult() << std::endl;
    }
    return nullptr;
}

void *Producer(void *args)
{
    ThreadData *td=static_cast<ThreadData*>(args);
    RingQueue<TASK> *rq=td->RQ;

    int len = opera.size();
    while (true)
    {
       
        // 1.获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opera[rand() % len];
        // 2.生产数据
        TASK t(data1, data2, op);
        rq->Push(t);
        std::cout << "Productor task done,task is: " << t.GetTASK() <<" who: "<<td->name<< std::endl;
         sleep(2);
    }
    return nullptr;
}


int main()
{

       srand(time(nullptr));
        RingQueue<TASK> *rq = new RingQueue<TASK>();
        pthread_t c, p;
        ThreadData* td=new ThreadData();
        td->name="Procductor-"+std::to_string(i);
        td->RQ=rq;
        pthread_create(p + i, nullptr, Producer, td);
  //在创建线程的时候可以给线程定义基本的属性比如名字,以参数的形式传给线程
        pthread_create(c + i, nullptr, Consumer, td);
   
        pthread_join(c[i], nullptr);
        pthread_join(p[i], nullptr);
    
}
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
// 先模拟单生产单消费
const static int defaultcap = 5; // 默认大小
template <class T>
class RingQueue
{

public:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    RingQueue(int cap = defaultcap) : _ringqueue(cap), _cap(cap), c_step(0), p_step(0)
    {
        sem_init(&cdata_sem, 0, 0);
        sem_init(&pspace_sem, 0, cap);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);

        pthread_mutex_destroy(&c_mutex);
        pthread_mutex_destroy(&p_mutex);
    }
//注意的是,对下标加锁,应该在申请信号量后面,从两个方面解释:信号量不需要保护,本身就是原子的。
//先申请信号量,后加锁可以提高效率,一个并发度,别人正在执行时,其他线程还可以申请信号量
    void Push(const T &in)
    {
        // 生产者在生产数据之前,需要先申请对应的信号量,申请成功说明可以生产数据
        P(pspace_sem); // 生产者申请空间资源
     
        _ringqueue[p_step] = in;
        p_step++;
        p_step %= _cap; // 维持环形
       
        V(cdata_sem); // 格子已经被占了,数据多了一个,所以释放数据资源
    }

    void Pop(T *out)
    {
        // 消费者在消费数据之前,也需要申请对应的信号量,申请成功,说明可以消费

        P(cdata_sem); // 消费者申请数据资源
        *out = _ringqueue[c_step];
        c_step++;
        c_step %= _cap; // 维持环形
        V(pspace_sem); // 拿走一个数据,空间资源多出来一个
    }

private:                       // 环形队列的基本属性
    std::vector<T> _ringqueue; // 用vector来模拟环形队列
    int _cap;                  // 该环形队列的最大值

    int c_step; // 消费者下标
    int p_step; // 生产者下标

    // 消费者和生产者各自关心的资源

    sem_t cdata_sem;  // 消费者关心的数据资源
    sem_t pspace_sem; // 生产者关心的空间资源
};
#pragma once
#include <iostream>
#include <string>

std::string opera="+-*/%";
class TASK
{

public:
    TASK()
    {}
    TASK(int data1, int data2, char op) : _data1(data1), _data2(data2), _oper(op)
    {
    }
    void run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
        {
            if (_data2 == 0)
                _exitcode = 1;
            else
                _result = _data1 / _data2;
        }
        break;
        case '%':
        {
            if (_data2 == 0)
                _exitcode = 2;
            else
                _result = _data1 % _data2;
        }
        break;

        default:
        _exitcode=3;
            break;
        }
    }

    std::string GetTASK()
    {
       std::string r=std::to_string(_data1);
       r+=_oper;
       r+=std::to_string(_data2);
       r+="=?";
       return r;
    }
    std::string Getresult()
    {
        std::string result=std::to_string(_data1);
        result+=_oper;
        result+=std::to_string(_data2);
        result+='=';
        result+=std::to_string(_result);
        result+="[code:";
        result+=std::to_string(_exitcode);
        result+=']';
        return result;
    }

private:
    int _data1;
    int _data2;
    char _oper;

    int _result;
    int _exitcode;
};

四.多生产多消费场景

在单生产单消费模型中,生产者只有一个,而当多生产时,就可能出现问题。
比如生产过程中Push,同时进入多个生产者,集中申请信号量,因为下标只有一个,那么线程就会对同一个下标进行竞争,A生产者要往队列该下标里写,B生产者也要往队列该下标里写,最后就会造成数据的覆盖丢失。
在这里插入图片描述

在这里插入图片描述

在环形队列里,不能存在多个生产者和多个消费者同时在环形队列的情况。这就要求生产者和生产者之间要进行互斥,消费者和消费者之间要进行互斥。
所以需要锁来实现它们之间的互斥,因为在大部分情况下,生产者和消费者不是在同一个位置上,所以可以并发执行,这时如果只设置一把锁,就会降低并发度,因为这两种关系的线程是关系不同的资源,所以使用两把锁,来加锁不同的资源。
在这里插入图片描述

1.加锁维持生产生产,消费消费互斥

所以加锁是为了保护环形队列下标不被多线程并发访问。
不过这里就存在一个问题:加锁是在申请信号量之前加锁呢?还是在申请信号量之后加锁呢?

答案:在申请信号量之后加锁。为什么?
原因1:因为申请信号量本身就是原子的,不需要保护。
原因2:因为在申请信号之后加锁,可以提高效率,因为当线程持有锁执行期间,其他线程还可以进行继续申请信号量(申请成功后,就在等待锁资源,申请失败,挂起等待),等持有锁线程释放锁后,就可以直接去竞争锁。也就是多线程可以集中在申请信号量这里,先将资源预定好了,等有锁资源时,就可以直接访问临界资源。
所以在生产者生产的过程中,其他生产者还可以去申请信号量,提高并发度。
在这里插入图片描述

#include <pthread.h>
#include <iostream>
#include <string>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "TASK.hpp"
struct ThreadData
{
    RingQueue<TASK>* RQ;
    std::string name;
};
void *Consumer(void *args)
{
   
    ThreadData *td=static_cast<ThreadData*>(args);
    RingQueue<TASK> *rq=td->RQ;

    while (true)
    {
        //sleep(2);
        // 1.消费数据
        TASK t;
        rq->Pop(&t);

        // 2.处理数据
        t.run();
        std::cout << "Comsumer get task,task is: " << t.GetTASK() <<"who: "<<td->name<< " reslut :" << t.Getresult() << std::endl;
    }
    return nullptr;
}

void *Producer(void *args)
{
    ThreadData *td=static_cast<ThreadData*>(args);
    RingQueue<TASK> *rq=td->RQ;

    int len = opera.size();
    while (true)
    {
       
        // 1.获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opera[rand() % len];
        // 2.生产数据
        TASK t(data1, data2, op);
        rq->Push(t);
        std::cout << "Productor task done,task is: " << t.GetTASK() <<" who: "<<td->name<< std::endl;
         sleep(2);
    }
    return nullptr;
}


int main()
{

    srand(time(nullptr));
    RingQueue<TASK> *rq = new RingQueue<TASK>();
    pthread_t c[5], p[3];
    for (int i = 0; i < 3; i++)
    {
        ThreadData* td=new ThreadData();
        td->name="Procductor-"+std::to_string(i);
        td->RQ=rq;
        pthread_create(p + i, nullptr, Producer, td);
    }
    for (int i = 0; i < 5; i++)//在创建线程的时候可以给线程定义基本的属性比如名字,以参数的形式传给线程
    {
         
        ThreadData* td=new ThreadData();
        td->name="Consumer-"+std::to_string(i);
        td->RQ=rq;
        pthread_create(c + i, nullptr, Consumer, td);
    }
    

    for (int i = 0; i < 1; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 1; i++)
    {
        pthread_join(p[i], nullptr);
    }
}
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
// 先模拟单生产单消费
const static int defaultcap = 5; // 默认大小
template <class T>
class RingQueue
{

public:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t &mutex)
    {
      pthread_mutex_lock(&mutex);
    }

    void Unlock(pthread_mutex_t &mutex)
    {
      pthread_mutex_unlock(&mutex);
    }
    RingQueue(int cap = defaultcap) : _ringqueue(cap), _cap(cap), c_step(0), p_step(0)
    {
        sem_init(&cdata_sem, 0, 0);
        sem_init(&pspace_sem, 0, cap);

        pthread_mutex_init(&c_mutex, nullptr);
        pthread_mutex_init(&p_mutex, nullptr);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);

        pthread_mutex_destroy(&c_mutex);
        pthread_mutex_destroy(&p_mutex);
    }
//注意的是,对下标加锁,应该在申请信号量后面,从两个方面解释:信号量不需要保护,本身就是原子的。
//先申请信号量,后加锁可以提高效率,一个并发度,别人正在执行时,其他线程还可以申请信号量
    void Push(const T &in)
    {
        // 生产者在生产数据之前,需要先申请对应的信号量,申请成功说明可以生产数据
        P(pspace_sem); // 生产者申请空间资源
        
        Lock(p_mutex);
        _ringqueue[p_step] = in;
        p_step++;
        p_step %= _cap; // 维持环形
        Unlock(p_mutex);
       
        V(cdata_sem); // 格子已经被占了,数据多了一个,所以释放数据资源
    }

    void Pop(T *out)
    {
        // 消费者在消费数据之前,也需要申请对应的信号量,申请成功,说明可以消费

        P(cdata_sem); // 消费者申请数据资源
        Lock(c_mutex);
        *out = _ringqueue[c_step];
        c_step++;
        c_step %= _cap; // 维持环形
        Unlock(c_mutex);

        V(pspace_sem); // 拿走一个数据,空间资源多出来一个
    }

private:                       // 环形队列的基本属性
    std::vector<T> _ringqueue; // 用vector来模拟环形队列
    int _cap;                  // 该环形队列的最大值

    int c_step; // 消费者下标
    int p_step; // 生产者下标

    // 消费者和生产者各自关心的资源

    sem_t cdata_sem;  // 消费者关心的数据资源
    sem_t pspace_sem; // 生产者关心的空间资源

    // 如果是多个生产者,多个消费者,那么就需要用到锁
    // 因为生产者与生产者直接会竞争下标,而下标只有一个,所以需要对下标进行加锁保护,实现生产者与生产者之间的互斥。
    // 而消费者之间也会竞争下标,生产消费因为关心的是不同资源,所以各自保护各自的下标。
    pthread_mutex_t c_mutex;
    pthread_mutex_t p_mutex;
};

五.总结

在这里插入图片描述