一、问题描述
生产者-消费者问题是一个经典的进程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制。本作业要求设计在同一个进程地址空间内执行的两个线程。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。
二、实现代码
#include <windows.h>
#include <iostream>
const unsigned short SIZE_OF_BUFFER = 10//缓冲区长度
unsigned short ProductID = 0 //产品号
unsigned short ConsumeID = 0 //将被消耗的产品号
unsigned short in = 0 //产品进缓冲区时的缓冲区下标
unsigned short out = 0 //产品出缓冲区时的缓冲区下标
int g_buffer[SIZE_OF_BUFFER] //缓冲区是个循环队列
bool g_continue = true //控制程序结束
HANDLE g_hMutex //用于线程间的互斥
HANDLE g_hFullSemaphore//当缓冲区满时迫使生产者等待
HANDLE g_hEmptySemaphore//当缓冲区空时迫使消费者等待
DWORD WINAPI Producer(LPVOID) //生产者线程
DWORD WINAPI Consumer(LPVOID) //消费者线程
int main()
{
//创建各个互斥信号
g_hMutex = CreateMutex(NULL,FALSE,NULL)
g_hEmptySemaphore = CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL)
//调整下面的数值,可以发现,当生产者个数多于消费者个数时,
//生产速度快,生产者经常等待消费者;反之,消费者经常等待
const unsigned short PRODUCERS_COUNT = 3 //生产者的个数
const unsigned short CONSUMERS_COUNT = 1 //消费者的个数
//总的线程数
const unsigned short THREADS_COUNT = PRODUCERS_COUNT+CONSUMERS_COUNT
DWORD producerID[CONSUMERS_COUNT]//生产者线程的标识符
DWORD consumerID[THREADS_COUNT]//消费者线程的标识符
//创建生产者线程
for (int i=0i<PRODUCERS_COUNT++i){
hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i])
if (hThreads[i]==NULL) return -1
}
//创建消费者线程
for (int i=0i<CONSUMERS_COUNT++i){
hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i])
if (hThreads[i]==NULL) return -1
}
while(g_continue){
if(getchar()){ //按回车后终止程序运行
g_continue = false
}
}
return 0
}
//生产一个产品。简单模拟了一下,仅输出新产品的ID号
void Produce()
{
std::cerr <<"Producing " <<++ProductID <<" ... "
std::cerr <<"Succeed" <<std::endl
}
//把新生产的产品放入缓冲区
void Append()
{
std::cerr <<"Appending a product ... "
g_buffer[in] = ProductID
in = (in+1)%SIZE_OF_BUFFER
std::cerr <<"Succeed" <<std::endl
//输出缓冲区当前的状态
for (int i=0i<SIZE_OF_BUFFER++i){
std::cout <<i <<": " <<g_buffer[i]
if (i==in) std::cout <<" <-- 生产"
if (i==out) std::cout <<" <-- 消费"
std::cout <<std::endl
}
}
//从缓冲区中取出一个产品
void Take()
{
std::cerr <<"Taking a product ... "
ConsumeID = g_buffer[out]
out = (out+1)%SIZE_OF_BUFFER
std::cerr <<"Succeed" <<std::endl
//输出缓冲区当前的状态
for (int i=0i<SIZE_OF_BUFFER++i){
std::cout <<i <<": " <<g_buffer[i]
if (i==in) std::cout <<" <-- 生产"
if (i==out) std::cout <<" <-- 消费"
std::cout <<std::endl
}
}
//消耗一个产品
void Consume()
{
std::cerr <<"Consuming " <<ConsumeID <<" ... "
std::cerr <<"Succeed" <<std::endl
}
//生产者
DWORD WINAPI Producer(LPVOID lpPara)
{
while(g_continue){
WaitForSingleObject(g_hFullSemaphore,INFINITE)
WaitForSingleObject(g_hMutex,INFINITE)
Produce()
Append()
Sleep(1500)
ReleaseMutex(g_hMutex)
ReleaseSemaphore(g_hEmptySemaphore,1,NULL)
}
return 0
}
//消费者
DWORD WINAPI Consumer(LPVOID lpPara)
{
while(g_continue){
WaitForSingleObject(g_hEmptySemaphore,INFINITE)
WaitForSingleObject(g_hMutex,INFINITE)
Take()
Consume()
Sleep(1500)
ReleaseMutex(g_hMutex)
ReleaseSemaphore(g_hFullSemaphore,1,NULL)
}
return 0
}
【Linux多线程】三个经典同步问题标签: 多线程同步生产者与消费者写者与读者
目录(?)[+]
在了解了《同步与互斥的区别 》之后,我们来看看几个经典的线程同步的例子。相信通过具体场景可以让我们学会分析和解决这类线程同步的问题,以便以后应用在实际的项目中。
一、生产者-消费者问题
问题描述:
一组生产者进程和一组消费者进程共享一个初始为空、大小为 n 的缓冲区,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待;只有缓冲区不空时,消费者才能从中取出消息,否则必须等待。由于缓冲区是临界资源,它只允许一个生产者放入消息,或者一个消费者从中取出消息。
分析:
关系分析:生产者和消费者对缓冲区互斥访问是互斥关系,同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,它们也是同步关系。
整理思路:这里比较简单,只有生产者和消费者两个进程,且这两个进程存在着互斥关系和同步关系。那么需要解决的是互斥和同步的PV *** 作的位置。
信号量设置:信号量mutex作为互斥信号量,用于控制互斥访问缓冲池,初值为1;信号量full用于记录当前缓冲池中“满”缓冲区数,初值为 0;信号量empty用于记录当前缓冲池中“空”缓冲区数,初值为n。
代码示例:(semaphore类的封装见下文)
#include<iostream>
#include<unistd.h> // sleep
#include<pthread.h>
#include"semaphore.h"
using namespace std
#define N 5
semaphore mutex("/", 1) // 临界区互斥信号量
semaphore empty("/home", N) // 记录空缓冲区数,初值为N
semaphore full("/home/songlee",0)// 记录满缓冲区数,初值为0
int buffer[N]// 缓冲区,大小为N
int i=0
int j=0
void* producer(void* arg)
{
empty.P()// empty减1
mutex.P()
buffer[i] = 10 + rand() % 90
printf("Producer %d write Buffer[%d]: %d\n",arg,i+1,buffer[i])
i = (i+1) % N
mutex.V()
full.V() // full加1
}
void* consumer(void* arg)
{
full.P() // full减1
mutex.P()
printf(" \033[131m")
printf("Consumer %d read Buffer[%d]: %d\n",arg,j+1,buffer[j])
printf("\033[0m")
j = (j+1) % N
mutex.V()
empty.V()// empty加1
}
int main()
{
pthread_t id[10]
// 开10个生产者线程,10个消费者线程
for(int k=0k<10++k)
pthread_create(&id[k], NULL, producer, (void*)(k+1))
for(int k=0k<10++k)
pthread_create(&id[k], NULL, consumer, (void*)(k+1))
sleep(1)
return 0
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455561234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
编译运行输出结果:
Producer 1 write Buffer[1]: 83
Producer 2 write Buffer[2]: 26
Producer 3 write Buffer[3]: 37
Producer 5 write Buffer[4]: 35
Producer 4 write Buffer[5]: 33
Consumer 1 read Buffer[1]: 83
Producer 6 write Buffer[1]: 35
Consumer 2 read Buffer[2]: 26
Consumer 3 read Buffer[3]: 37
Consumer 4 read Buffer[4]: 35
Consumer 5 read Buffer[5]: 33
Consumer 6 read Buffer[1]: 35
Producer 7 write Buffer[2]: 56
Producer 8 write Buffer[3]: 22
Producer 10 write Buffer[4]: 79
Consumer 9 read Buffer[2]: 56
Consumer 10 read Buffer[3]: 22
Producer 9 write Buffer[5]: 11
Consumer 7 read Buffer[4]: 79
Consumer 8 read Buffer[5]: 1112345678910111213141516171819201234567891011121314151617181920
二、读者-写者问题
问题描述:
有读者和写者两组并发线程,共享一个文件,当两个或以上的读线程同时访问共享数据时不会产生副作用,但若某个写线程和其他线程(读线程或写线程)同时访问共享数据时则可能导致数据不一致的错误。因此要求:
允许多个读者可以同时对文件执行读 *** 作;
只允许一个写者往文件中写信息;
任一写者在完成写 *** 作之前不允许其他读者或写者工作;
写者执行写 *** 作前,应让已有的读者和写者全部退出。
分析:
关系分析:由题目分析可知,读者和写者是互斥的,写者和写者也是互斥的,而读者和读者不存在互斥问题。
整理思路:写者是比较简单的,它与任何线程互斥,用互斥信号量的 PV *** 作即可解决。读者的问题比较复杂,它必须实现与写者的互斥,多个读者还可以同时读。所以,在这里用到了一个计数器,用它来判断当前是否有读者读文件。当有读者的时候写者是无法写文件的,此时读者会一直占用文件,当没有读者的时候写者才可以写文件。同时,不同的读者对计数器的访问也应该是互斥的。
信号量设置:首先设置一个计数器count,用来记录当前的读者数量,初值为0;设置互斥信号量mutex,用于保护更新 count 变量时的互斥;设置互斥信号量rw用于保证读者和写者的互斥访问。
代码示例:
#include<iostream>
#include<unistd.h> // sleep
#include<pthread.h>
#include"semaphore.h"
using namespace std
int count = 0 // 记录当前的读者数量
semaphore mutex("/",1) // 用于保护更新count变量时的互斥
semaphore rw("/home",1)// 用于保证读者和写者的互斥
void* writer(void* arg)
{
rw.P() // 互斥访问共享文件
printf(" Writer %d start writing...\n", arg)
sleep(1)
printf(" Writer %d finish writing...\n", arg)
rw.V() // 释放共享文件
}
void* reader(void* arg)
{
mutex.P() // 互斥访问count变量
if(count == 0) // 当第一个读线程读文件时
rw.P() // 阻止写线程写
++count// 读者计数器加1
mutex.V() // 释放count变量
printf("Reader %d start reading...\n", arg)
sleep(1)
printf("Reader %d finish reading...\n", arg)
mutex.P() // 互斥访问count变量
--count// 读者计数器减1
if(count == 0) // 当最后一个读线程读完文件
rw.V() // 允许写线程写
mutex.V() // 释放count变量
}
int main()
{
pthread_t id[8]// 开6个读线程,2个写线程
pthread_create(&id[0], NULL, reader, (void*)1)
pthread_create(&id[1], NULL, reader, (void*)2)
pthread_create(&id[2], NULL, writer, (void*)1)
pthread_create(&id[3], NULL, writer, (void*)2)
pthread_create(&id[4], NULL, reader, (void*)3)
pthread_create(&id[5], NULL ,reader, (void*)4)
sleep(2)
pthread_create(&id[6], NULL, reader, (void*)5)
pthread_create(&id[7], NULL ,reader, (void*)6)
sleep(4)
return 0
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
编译运行的结果如下:
Reader 2 start reading...
Reader 1 start reading...
Reader 3 start reading...
Reader 4 start reading...
Reader 1 finish reading...
Reader 2 finish reading...
Reader 3 finish reading...
Reader 4 finish reading...
Writer 1 start writing...
Writer 1 finish writing...
Writer 2 start writing...
Writer 2 finish writing...
Reader 5 start reading...
Reader 6 start reading...
Reader 5 finish reading...
Reader 6 finish reading...1234567891011121314151612345678910111213141516
三、哲学家进餐问题
问题描述:
一张圆桌上坐着 5 名哲学家,桌子上每两个哲学家之间摆了一根筷子,桌子的中间是一碗米饭,如图所示:
哲学家们倾注毕生精力用于思考和进餐,哲学家在思考时,并不影响他人。只有当哲学家饥饿的时候,才试图拿起左、右两根筷子(一根一根拿起)。如果筷子已在他人手上,则需等待。饥饿的哲学家只有同时拿到了两根筷子才可以开始进餐,当进餐完毕后,放下筷子继续思考。
分析:
关系分析:5名哲学家与左右邻居对其中间筷子的访问是互斥关系。
整理思路:显然这里有 5 个线程,那么要如何让一个哲学家拿到左右两个筷子而不造成死锁或饥饿现象?解决方法有两个,一个是让他们同时拿两个筷子;二是对每个哲学家的动作制定规则,避免饥饿或死锁现象的发生。
信号量设置:定义互斥信号量数组chopstick[5] = {1,1,1,1,1}用于对 5 根筷子的互斥访问。
示例代码:
RCU, Read-Copy-Update,是Linux内核中的一种同步机制。RCU常被描述为读写锁的替代品,它的特点是读者并不需要直接与写者进行同步,读者与写者也能并发的执行。
来一张图片来描述下大体的 *** 作吧:
多个读者可以并发访问临界资源,同时使用rcu_read_lock/rcu_read_unlock来标定临界区;
写者(updater)在更新临界资源的时候,拷贝一份副本作为基础进行修改,当所有读者离开临界区后,把指向旧临界资源的指针指向更新后的副本,并对旧资源进行回收处理;
图中只显示一个写者,当存在多个写者的时候,需要在写者之间进行互斥处理。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)