[Linux] 线程同步分析总汇: 什么是条件变量?生产者消费者模型是什么?POSIX信号量怎么用?阻塞队列和环形队列模拟生产者消费者模型
只互斥的问题: 饥饿
线程同步
虽然, 同步是指让执行流访问临界资源有一定顺序性的机制, 但是 互斥其实也是同步机制的一种
虽然, 只采用互斥 执行流访问资源还是乱序的
但, 它还是在一定程度上协调了多个线程的执行, 因为 互斥锁可以保证同一时刻 只有一个执行流访问临界资源
不过本篇文章介绍时会将同步和互斥区别开, 即 同步不包括互斥, 不然非常容易混淆.
条件变量
pthread
库提供的一个结构体类型(pthread_cond_t)
的变量, 并且pthread
库中也提供的操作条件变量的一些接口cond
及接口
cond
即 condition
, 是条件的意思pthread_cond_t
即为定义条件变量的类型pthread_cond_t
类型定义的destroy
**了pthread_cond_init()
接口, 来初始化, 第一个参数是条件变量的地址, 第二个参数是条件变量的属性(可以不考虑)init
接口初始化的条件变量, 在不需要使用时, 需要调用pthread_cond_destroy()
接口进行销毁pthread_cond_wait()
是pthread
库提供的 使用条件变量进行等待的接口pthread_cond_timedwait()
也是pthread
库提供的 使用条件变量进行等待的接口, 不过 此接口是一种让线程定时等待的接口pthread_cond_signal()
, 调用此接口, 可以让某个 通过指定条件变量陷入等待的线程被唤醒pthread_cond_broadcast()
, 调用此接口, 则可以让所有 通过指定条件变量陷入等待的线程唤被醒cond
及接口的使用演示
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using std::cin;
using std::cout;
using std::endl;
// 定义并初始化全局互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 定义全局条件变量
pthread_cond_t cond;
void* waitCommand(void* args) {
pthread_detach(pthread_self());
// 先让线程自己分离自己, 我们就不在主线程中回收线程了
// 在此例中, 如果不分离, 线程回收会是个问题. 但具体问题后面再解释和解决
// 这里我们只是展示一下 接口的最基本的用法和现象
const char* name = (const char*)args;
while (true) {
pthread_cond_wait(&cond, &mutex);
// 此输出不表示任务执行, 只用于表示此线程被唤醒一次
cout << name << ", tid: " << pthread_self() << ", run……" << endl;
}
return nullptr;
}
int main() {
pthread_cond_init(&cond, nullptr);
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, nullptr, waitCommand, (void*)"Thread_1");
pthread_create(&tid2, nullptr, waitCommand, (void*)"Thread_2");
pthread_create(&tid3, nullptr, waitCommand, (void*)"Thread_3");
while (true) {
char c = 'a';
cout << "请输入你的命令(N/Q):: ";
cin >> c;
if (c == 'N' | c == 'n') {
pthread_cond_signal(&cond);
}
else
break;
usleep(1000); // 让主线程usleep一下, 防止线程之间在屏幕上打印干扰
}
pthread_cond_destroy(&cond);
return 0;
}
n
和N
来调用唤醒函数, 唤醒线程, 观察现象:N
和n
来唤醒等待的线程pthread_cond_signal()
来单个唤醒等待的线程pthread_cond_broadcast()
来广播唤醒所有等待的线程:cond
条件变量的没有场景的用法quit
, 为真时即为满足, 为假时即为不满足#include <iostream>
#include <unistd.h>
#include <pthread.h>
using std::cin;
using std::cout;
using std::endl;
// 定义并初始化全局互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 定义全局条件变量
pthread_cond_t cond;
// // 定义一个全局退出变量, 用于判断条件
volatile bool quit = false;
void* waitCommand(void* args) {
pthread_detach(pthread_self());
const char* name = (const char*)args;
while (!quit) {
// 不满足退出条件, 就进来等待
pthread_cond_wait(&cond, &mutex);
// 此输出不表示任务执行, 只用于表示此线程被唤醒一次
cout << name << ", tid: " << pthread_self() << ", run……" << endl;
}
pthread_mutex_unlock(&mutex); // 暂时不解释 这里解锁的原因
cout << name << ", tid: " << pthread_self() << ", end……" << endl;
return nullptr;
}
int main() {
pthread_cond_init(&cond, nullptr);
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, nullptr, waitCommand, (void*)"Thread_1");
pthread_create(&tid2, nullptr, waitCommand, (void*)"Thread_2");
pthread_create(&tid3, nullptr, waitCommand, (void*)"Thread_3");
while (true) {
char c = 'a';
cout << "请输入你的命令(N/Q):: ";
cin >> c;
if (c == 'N' | c == 'n') {
pthread_cond_broadcast(&cond);
}
else {
quit = true; // 修改条件为满足
pthread_cond_broadcast(&cond); // 然后唤醒线程, 再让线程判断条件是否满足
break;
}
usleep(1000); // 让主线程usleep一下, 防止线程之间在屏幕上打印干扰
}
pthread_cond_destroy(&cond);
return 0;
}
quit
, quit
为真时即为满足条件, quit
为假时即为不满足条件N
和n
时, 唤醒一下线程, 让线程继续判断条件是否满足, 非N
和n
时, 让退出条件被满足, 并唤醒线程为什么条件变量需要与互斥锁一起使用?
pthread_cond_wait()
的使用需要同时用到 条件变量和互斥锁.-
线程判断条件是否满足之前, 先上锁, 因为条件是可能被修改的临界资源
-
然后, 再判断是否条件是否满足, 一般是循环判断
-
如果不满足, 则进入循环体内, 调用
pthread_cond_wait()
此时,
pthread_cond_wait()
函数内部, 会先对 为保护临界资源而上的锁 解锁, 以确保其他线程能够正常访问到临界资源然后, 再通过条件变量陷入等待
如果满足, 线程就正常执行
-
线程通过条件变量等待时, 其他线程可以获取同一把锁, 然后访问临界资源
获取到锁的线程, 可以修改临界资源, 让条件变成满足
-
临界资源被某个线程修改, 即 条件变得满足时, 此线程会释放锁, 然后唤醒 因为条件不满足, 在条件变量上等待的线程
-
在条件变量上等待的线程被唤醒时, 首先需要再次获取锁
因为, 虽然 其他线程发起唤醒这个动作时, 条件是满足的, 但是 线程真正被唤醒时, 条件可能又不满足了
所以, 需要先获取锁, 然后再判断条件是否满足
为什么线程真正被唤醒时, 条件可能又不满足了?
首先, 确定一个点: 同一个条件, 不同线程会使用同一个条件变量来等待, 也会使用同一把锁来保护临界资源
所以, 由于不能保证只有一个线程在竞争同一条件, 也不能保证只有一个线程在竞争锁
条件变量的线程唤醒动作, 可能会唤醒多个在同一个条件变量上等待的线程, 然后被唤醒的线程会竞争同一把锁
但, 只有一个线程能够竞争到锁, 此时 其他被唤醒的线程又会等待锁
竞争到锁的线程, 能够访问临界资源, 处理完任务, 就可能使条件再次变为不满足
此时, 访问完临界资源的线程 释放锁, 就会有其他竞争锁的线程恢复执行, 此时就应该再次判断条件是否满足
所以, 条件是否满足一般会循环判断
pthread_cond_wait()
完成的pthread_cond_wait()
解锁并等待, 在线程被唤醒时, 会自动再去竞争锁pthread_cond_wait()
接口内部实现的第2行
, 我们让线程分离自己, 不用回收.第13行
, 我们执行了解锁操作pthread_cond_wait()
陷入等待时, 会释放锁, 然后被唤醒的时候, 又会会竞争锁pthread_cond_wait()
需要条件变量和互斥锁一起使用?pthread_cond_wait()
接口需要执行释放锁和竞争锁的操作, 所以 需要先看到锁生产者消费者模型介绍 **
这里的生产者和消费者, 不以生物学的角度看待
-
消费者与消费者之间是什么关系?
-
生产者与生产者之间是什么关系?
-
消费者与生产者之间需要什么关系?
消费者和生产者, 看似是没有之间的关系的
但是思考一个问题, 既然超市是临界资源, 那么消费者和生产者是可能在同一时间访问临界资源的
如果 供应商再给超市供货的时候, 货还没有供完, 货架上的东西还没有放完
在生活中我们可以直接拿走一个, 然后去超市结账
但是, 如果从计算机的角度来看, 生产线程还没有向临界资源内写完数据, 消费线程可以从临界资源中拿走数据吗?
很明显是不可以的, 因为 消费线程可能拿不到完整的资源
所以, 以计算机的角度来说, 消费者和生产者首先 要保持一个互斥关系
而除了互斥之外, 其实 还需要保持同步
因为消费者不能在超市没有商品的时候购买商品, 需要等待, 让生产者先向超市供货
生产者也不能在超市的空间资源已满的情况下继续向超市供货, 需要等待, 让消费者先购买商品
等到超市有商品了, 再通知消费者来购买
等到超市有空间了, 再通知生产者来供货
3
类2
种角色1
个交易场所3、2、1
的思想来理解3、2、1
的模型, 来解决问题生产者消费者模型的优点
- 解耦, 可以将两个角色之间的 强耦合关系 变为 松耦合关系
- 支持并发
- 支持忙闲不均
以阻塞队列模拟生产者消费者模型 **
uint32_t _cap
记录阻塞队列的容量queue<T> _bq
, 即为阻塞队列本身_mutex
、_conCond
、_proCond
一个互斥锁, 两个线程分别用的条件变量
blockQueue.hpp:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <cstdlib>
using std::queue;
using std::cout;
using std::endl;
const uint32_t gDefultCap = 5;
template <class T>
class blockQueue {
public:
// 构造函数
blockQueue(uint32_t cap = gDefultCap)
:_cap(cap) {
pthread_mutex_init(&_mutex, nullptr); // 初始化锁
pthread_cond_init(&_proCond, nullptr); // 初始化生产线程使用的条件变量
pthread_cond_init(&_conCond, nullptr); // 初始化消费线程使用的条件变量
}
// 析构函数
~blockQueue() {
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_conCond);
pthread_cond_destroy(&_proCond);
}
// 生产接口
void push(const T &in) {
// 生产的全过程为
// 1. 上锁
// 2. 判满. 满不生产 条件等待, 不满则生产.
// 3. 生产之后, 解锁
// 4. 唤醒消费接口
lockQueue(); // 上锁
while(isFull()) {
// 满 进入条件等待
condWait(_proCond); // 传入生产线程所用的条件变量, 让生产线程等待
}
// 不满 则生产
pushCore(in);
// 解锁
unlockQueue();
condWakeUp(_conCond); // 传入消费线程所用的条件变量, 唤醒消费线程
}
T pop() {
// 消费的全过程为
// 1. 上锁
// 2. 判空. 空则不消费 条件等待, 不空 则消费
// 3. 消费之后, 解锁
// 4. 唤醒生产接口
lockQueue();
while(isEmpty()) {
condWait(_conCond);
}
T tmp = popCore();
unlockQueue();
condWakeUp(_proCond);
return tmp;
}
private:
// 队列上锁
void lockQueue() {
pthread_mutex_lock(&_mutex);
}
// 队列解锁
void unlockQueue() {
pthread_mutex_unlock(&_mutex);
}
// 判空
bool isEmpty() {
return _bq.empty();
}
// 判满
bool isFull() {
return _bq.size() == _cap;
}
// 条件等待
void condWait(pthread_cond_t &cond) {
pthread_cond_wait(&cond, &_mutex);
}
// 唤醒等待
void condWakeUp(pthread_cond_t &cond) {
pthread_cond_signal(&cond);
}
// 生产任务
void pushCore(const T &in) {
// 即为向队列中添加任务
_bq.push(in);
}
// 消费任务
T popCore() {
// 即从队列中拿出任务
T tmp = _bq.front();
_bq.pop();
return tmp;
}
private:
uint32_t _cap; // 队列容量
queue<T> _bq; // 阻塞队列
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _conCond; // 消费线程使用的条件变量
pthread_cond_t _proCond; // 生产线程使用的条件变量
};
blockQueue.cc:
#include <iostream>
#include <ctime>
#include "blockQueue.hpp"
using std::cout;
using std::endl;
void* productor(void* args) {
blockQueue<int>* pBq = static_cast<blockQueue<int>*>(args);
while (true) {
// 制作数据
int data = rand() % 10;
// 向队列中生产数据
pBq->push(data);
cout << "productor 生产数据完成……" << data << endl;
sleep(2);
}
return nullptr;
}
void* consumer(void* args) {
blockQueue<int>* pBq = static_cast<blockQueue<int>*>(args);
while (true) {
int data = pBq->pop();
cout << "consumer 消费数据完成……" << data << endl;
}
return nullptr;
}
int main() {
// 设置一个随机数种子
srand((unsigned long)time(nullptr) ^ getpid());
// 定义阻塞队列
// 创建两个线程
blockQueue<int> bq;
pthread_t pro, con;
pthread_create(&pro, nullptr, productor, &bq); // 生产线程
pthread_create(&con, nullptr, consumer, &bq); // 消费线程
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
return 0;
}
productor
生产者, 每2s
创建随机数并push
入队列中consumer
消费者, 从队列中取数据, 不做间隔限制productor
生产线程 每2s, 生产一个数据, consumer
消费线程跟随生产的节奏来消费数据.1s
生成一个, 顺序为: 5 4 3 3 4 0 1 6
2s
一次的顺序为 5 4 3 3
问题1: 条件判断的语句
while()
而不是if()
为什么?pthread_cond_wait()
流程的时候问题2: 什么时候唤醒 或者 什么时候解锁?
理解生产者消费者模型的 并发
Task.hpp:
#pragma once
#include <iostream>
#include <string>
class Task {
public:
Task(int one = 0, int two = 0, char op = 0)
: elemOne_(one)
, elemTwo_(two)
, operator_(op) {}
// 仿函数定义
int operator()() {
return run();
}
int run() {
int result = 0;
switch (operator_) {
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
// 除0处理
if (elemTwo_ == 0) {
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else {
result = elemOne_ / elemTwo_;
}
break;
case '%':
// 除0处理
if (elemTwo_ == 0) {
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else {
result = elemOne_ % elemTwo_;
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int* e1, int* e2, char* op) {
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
return 0;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
blockQueue.cc:
#include <iostream>
#include <ctime>
#include "blockQueue.hpp"
#include "Task.hpp"
using std::cout;
using std::endl;
const std::string ops = "+-*/%";
// 生产任务接口
void* productor(void* args) {
blockQueue<Task>* pBq = static_cast<blockQueue<Task>*>(args);
while (true) {
// 制作任务
int elemOne = rand() % 50;
int elemTwo = rand() % 10;
char oper = ops[rand() % 4]; // 操作符
Task t(elemOne, elemTwo, oper);
// 生产任务
pBq->push(t);
cout << "producter[" << pthread_self() << "] " <<
(unsigned long)time(nullptr) << " 生产了一个任务: " <<
elemOne << oper << elemTwo << "=?" << endl;
sleep(1);
}
return nullptr;
}
void* consumer(void* args) {
blockQueue<Task>* pBq = static_cast<blockQueue<Task>*>(args);
while (true) {
// 消费任务
Task t = pBq->pop();
// 处理任务
int result = t();
int elemOne, elemTwo;
char oper;
t.get(&elemOne, &elemTwo, &oper);
cout << "consumer[" << pthread_self() << "] " <<
(unsigned long)time(nullptr) << " 消费了一个任务: " <<
elemOne << oper << elemTwo << "=" << result << endl;
}
return nullptr;
}
int main() {
// 设置一个随机数种子
srand((unsigned long)time(nullptr) ^ getpid());
// 定义阻塞队列
// 创建两个线程
blockQueue<Task> bq;
pthread_t pro, con;
pthread_create(&pro, nullptr, productor, &bq); // 生产线程
pthread_create(&con, nullptr, consumer, &bq); // 消费线程
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
return 0;
}
blockQueue.hpp
还是上面的内容push
入队列的过程pop
出来的过程POSIX信号量
信号量
也是同步的一种机制什么是信号量?
-1
--
++
0
, 其他人再想要买票, 就需要等有人退票-1
表示着临界资源中的一部分被选中了, 也就表示着之后只能选择临界资源的其他部分1
, 那么 信号量可以表示什么?1 –> 0
就是上锁的过程0 –> 1
就是解锁的过程.1
时, 此信号量被称为二元信号量--
和 ++
--
和 信号量的释放++
都是 原子性的信号量的接口
sem_t
1. sem_init()初始化:
2. sem_destroy()销毁:
3. sem_wait()等待, 即申请信号量:
4. sem_post()释放信号量:
pthread
库提供的, 需要使用的是semphore.h
头文件以环形队列模拟生产者消费者模型 **
环形队列
[0, 7]
一共8个空间 就能够模拟出一个环形的队列[0, 7]
来实现, 那么队头恒为0
0
位置出, 然后将后面的元素向前移动一位0
1
2
3
位置存储有数据0
, 队尾指针指向3
或4
0
位置的数据, 然后 队头指针++, 移动到1
, 新的队头就是1
位置++
改变, 即, 环形队列的任意位置都可能是队头++
++
++
就会回到0
位置- 如果刚创建的队列, 那就为空
- 如果队尾指针刚追上队头指针, 那就为满
- 如果队头指针刚追上队尾指针, 那就为空
模拟模型
思路
-
生产者需要的是什么资源?
需要空间资源, 因为需要向队列中, 入数据
-
消费者需要的是什么资源?
需要数据资源, 因为需要从队列中, 出数据
roomSem
, 另一个信号量 表示数据资源量dataSem
roomSem
, 即申请 空间资源信号量, 申请成功 则空间资源信号量--
++
, 因为 有数据入队列了dataSem
, 申请 数据资源信号量, 申请成功 则数据资源信号量--
++
**的roomSem
应该为多少? dataSem
应该为多少?roomSem
应该为N
, dataSem
应该为0
roomSem++
时, 对应的dataSem
需要--
; dataSem++
时, 对应的roomSem
需要--
roomSem
为N
, dataSem
为0
, 需要生产者先生产roomSem
为0
, dataSem
为N
, 需要消费者先消费使用信号量 模拟 生产者消费者模型
ringQueue.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
using std::cout;
using std::endl;
using std::vector;
const int gDefultCap = 10;
template <class T>
class ringQueue {
public:
// 构造函数
ringQueue(const int cap = gDefultCap)
: _ringQueue(cap)
, _pIndex(0)
, _cIndex(0) {
sem_init(&_roomSem, 0, _ringQueue.size());
sem_init(&_dataSem, 0, 0);
// sem_init() 接口的
// 第一个参数是 需要初始化的信号量,
// 第二个参数是 线程共享(0)还是进程共享
// 第三个参数是 需要初始化为多少
}
// 析构函数
~ringQueue() {
sem_destroy(&_roomSem);
sem_destroy(&_dataSem);
}
// 生产接口
void push(const T &in) {
// 生产数据
// 先申请空间信号量
sem_wait(&_roomSem); // 申请成功则 _roomSem--, 否则等待
_ringQueue[_pIndex] = in; // 将数据放入 数组
sem_post(&_dataSem); // 数组中数据+1, 那么 dataSem 需要++
_pIndex++; // 生产者下一次生产数据的位置 ++
_pIndex %= _ringQueue.size(); // 跟新下标, 保证环形特性
}
// 消费接口
T pop() {
// 消费数据
// 先申请数据信号量
sem_wait(&_dataSem); // 申请成功则 _dataSem--, 否则等待
T tmp = _ringQueue[_cIndex]; // 存储应拿到的数据
sem_post(&_roomSem); // 拿出了数据, 空间+1, 那么 _roomSem ++
_cIndex++;
_cIndex %= _ringQueue.size();
return tmp;
}
private:
vector<T> _ringQueue; // 模拟循环队列的数组
sem_t _roomSem; // 空间资源信号量, 生产者申请
sem_t _dataSem; // 数据资源信号量, 消费者申请
uint32_t _pIndex; // 生产者生产数据的索引下标, 即插入数据的下标
uint32_t _cIndex; // 消费者消费数据的索引下标, 即获取数据的下标
};
- 一个数组, 用来模拟环形队列
- 一个空间信号量、一个数据信号量. 分别用来控制生产者对空间的申请, 消费者对数据的申请
- 一个生产数据的索引下标, 一个消费数据的索引下标, 分别用来表示插入数据的下标, 和拿出数据的下标
sem_init()
的使用:int sem_init(sem_t *sem, int pshared, unsigned int value);
sem_t* sem
, 需要初始化的信号量int pshared
, 信号量的类型. 暂不考虑, 传入0
unsigned int value
, 信号量的初始值
_cIndex %= _ringQueue.size()
和_pIndex %= _ringQueue.size()
来控制环形特性_pIndex
和_cIndex
需要++
ringQueue.cc:
#include "ringQueue.hpp"
#include <iostream>
#include <ctime>
#include <unistd.h>
using std::cout;
// 消费线程调用函数
void* consumer(void* args) {
ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
while (true) {
sleep(3);
int data = ringQp->pop();
cout << "consumer_pthread[" << pthread_self() << "]"
<< " 消费了一个数据: " << data << endl;
}
}
// 生产线程调用函数
void* productor(void* args) {
ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
while (true) {
int data = rand() % 20;
ringQp->push(data);
cout << "productor_pthread[" << pthread_self() << "]"
<< " 生产了一个数据: " << data << endl;
sleep(1);
}
}
int main() {
srand((unsigned long)time(nullptr) ^ getpid());
ringQueue<int> ringQ;
pthread_t con, pro;
pthread_create(&con, nullptr, consumer, &ringQ);
pthread_create(&pro, nullptr, productor, &ringQ);
pthread_join(con, nullptr);
pthread_join(pro, nullptr);
return 0;
}
1s
生产一次数据, 3s
消费一次数据sem_wait(_roomSem)
是无法申请成功空间信号量的, 因为此时_roomSem
为0ringQueue
封装的成员就需要改进一下:#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
using std::cout;
using std::endl;
using std::vector;
const int gDefultCap = 30; // 实现了多线程, 适当的将队列放大
template <class T>
class ringQueue {
public:
// 构造函数
ringQueue(const int cap = gDefultCap)
: _ringQueue(cap)
, _pIndex(0)
, _cIndex(0) {
sem_init(&_roomSem, 0, _ringQueue.size());
sem_init(&_dataSem, 0, 0);
// sem_init() 接口的
// 第一个参数是 需要初始化的信号量,
// 第二个参数是
// 第三个参数是 需要初始化为多少
// 初始化锁
pthread_mutex_init(&_pMutex, nullptr);
pthread_mutex_init(&_cMutex, nullptr);
}
// 析构函数
~ringQueue() {
sem_destroy(&_roomSem);
sem_destroy(&_dataSem);
pthread_mutex_destroy(&_pMutex);
pthread_mutex_destroy(&_cMutex);
}
// 生产接口
void push(const T &in) {
// 生产数据
// 先申请空间信号量
sem_wait(&_roomSem); // 申请成功则 _roomSem--, 否则等待
pthread_mutex_lock(&_pMutex); // 申请信号量成功后, 加锁
_ringQueue[_pIndex] = in; // 将数据放入 数组
_pIndex++; // 生产者下一次生产数据的位置 ++
_pIndex %= _ringQueue.size(); // 跟新下标, 保证环形特性
pthread_mutex_unlock(&_pMutex); // 访问完临界资源, 解锁
sem_post(&_dataSem); // 数组中数据+1, 那么 dataSem 需要++
}
// 消费接口
T pop() {
// 消费数据
// 先申请数据信号量
sem_wait(&_dataSem); // 申请成功则 _dataSem--, 否则等待
pthread_mutex_lock(&_cMutex);
T tmp = _ringQueue[_cIndex]; // 存储应拿到的数据
_cIndex++;
_cIndex %= _ringQueue.size();
pthread_mutex_unlock(&_cMutex);
sem_post(&_roomSem); // 拿出了数据, 空间+1, 那么 _roomSem ++
return tmp;
}
private:
vector<T> _ringQueue; // 模拟循环队列的数组
sem_t _roomSem; // 空间资源信号量, 生产者申请
sem_t _dataSem; // 数据资源信号量, 消费者申请
uint32_t _pIndex; // 生产者生产数据的索引下标, 即插入数据的下标
uint32_t _cIndex; // 消费者消费数据的索引下标, 即获取数据的下标
// 保护索引下标的锁
pthread_mutex_t _cMutex; // 消费数据索引下标的锁
pthread_mutex_t _pMutex; // 生产数据索引下标的锁
};
#include "ringQueue.hpp"
#include <iostream>
#include <ctime>
#include <unistd.h>
using std::cout;
// 消费线程调用函数
void* consumer(void* args) {
sleep(10);
ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
while (true) {
sleep(1);
int data = ringQp->pop();
cout << "consumer_pthread[" << pthread_self() << "]"
<< " 消费了一个数据: " << data << endl;
}
}
// 生产线程调用函数
void* productor(void* args) {
ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
while (true) {
int data = rand() % 20;
ringQp->push(data);
cout << "productor_pthread[" << pthread_self() << "]"
<< " 生产了一个数据: " << data << endl;
usleep(500000);
}
}
int main() {
srand((unsigned long)time(nullptr) ^ getpid());
ringQueue<int> ringQ;
pthread_t con1, con2, con3, pro1, pro2, pro3;
pthread_create(&con1, nullptr, consumer, &ringQ);
pthread_create(&con2, nullptr, consumer, &ringQ);
pthread_create(&con3, nullptr, consumer, &ringQ);
pthread_create(&pro1, nullptr, productor, &ringQ);
pthread_create(&pro2, nullptr, productor, &ringQ);
pthread_create(&pro3, nullptr, productor, &ringQ);
pthread_join(con1, nullptr);
pthread_join(con2, nullptr);
pthread_join(con3, nullptr);
pthread_join(pro1, nullptr);
pthread_join(pro2, nullptr);
pthread_join(pro3, nullptr);
return 0;
}
作者: 哈米d1ch 发表日期:2023 年 4 月 19 日