这个问题需要的知识主要包括:
1 多进程间进行通信;
2 使用同步信号量(semaphore)和互斥信号量(mutex)进行数据保护。
参考代码如下,可以参照注释辅助理解:
#include <stdio.h>#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#define N 2 // 消费者或者生产者的数目
#define M 10 // 缓冲数目
int in = 0 // 生产者放置产品的位置
int out = 0 // 消费者取产品的位置
int buff[M] = {0} // 缓冲初始化为0, 开始时没有产品
sem_t empty_sem // 同步信号量, 当满了时阻止生产者放产品
sem_t full_sem // 同步信号量, 当没产品时阻止消费者消费
pthread_mutex_t mutex // 互斥信号量, 一次只有一个线程访问缓冲
int product_id = 0 //生产者id
int prochase_id = 0 //消费者id
/* 打印缓冲情况 */
void print()
{
int i
for(i = 0 i < M i++)
printf("%d ", buff[i])
printf("\n")
}
/* 生产者方法 */
void *product()
{
int id = ++product_id
while(1)
{
// 用sleep的数量可以调节生产和消费的速度,便于观察
sleep(1)
//sleep(1)
sem_wait(&empty_sem)
pthread_mutex_lock(&mutex)
in = in % M
printf("product%d in %d. like: \t", id, in)
buff[in] = 1
print()
++in
pthread_mutex_unlock(&mutex)
sem_post(&full_sem)
}
}
/* 消费者方法 */
void *prochase()
{
int id = ++prochase_id
while(1)
{
// 用sleep的数量可以调节生产和消费的速度,便于观察
sleep(1)
//sleep(1)
sem_wait(&full_sem)
pthread_mutex_lock(&mutex)
out = out % M
printf("prochase%d in %d. like: \t", id, out)
buff[out] = 0
print()
++out
pthread_mutex_unlock(&mutex)
sem_post(&empty_sem)
}
}
int main()
{
pthread_t id1[N]
pthread_t id2[N]
int i
int ret[N]
// 初始化同步信号量
int ini1 = sem_init(&empty_sem, 0, M)
int ini2 = sem_init(&full_sem, 0, 0)
if(ini1 && ini2 != 0)
{
printf("sem init failed \n")
exit(1)
}
//初始化互斥信号量
int ini3 = pthread_mutex_init(&mutex, NULL)
if(ini3 != 0)
{
printf("mutex init failed \n")
exit(1)
}
// 创建N个生产者线程
for(i = 0 i < N i++)
{
ret[i] = pthread_create(&id1[i], NULL, product, (void *)(&i))
if(ret[i] != 0)
{
printf("product%d creation failed \n", i)
exit(1)
}
}
//创建N个消费者线程
for(i = 0 i < N i++)
{
ret[i] = pthread_create(&id2[i], NULL, prochase, NULL)
if(ret[i] != 0)
{
printf("prochase%d creation failed \n", i)
exit(1)
}
}
//销毁线程
for(i = 0 i < N i++)
{
pthread_join(id1[i],NULL)
pthread_join(id2[i],NULL)
}
exit(0)
}
在Linux下编译的时候,要在编译命令中加入选项-lpthread以包含多线程支持。比如存储的C文件为demo.c,要生成的可执行文件为demo。可以使用命令:
gcc demo.c -o demo -lpthread
程序中为便于观察,使用了sleep(1)来暂停运行,所以查看输出的时候可以看到,输出是每秒打印一次的。
#i nclude<stdio.h>#i nclude<iostream.h>
#i nclude<windows.h>
#define BufferSize 15
char Buffer[BufferSize]
int head,tail=0//Buffer数组下标
int count//被使用的缓冲区数量
HANDLE hMutex
HANDLE hNotFullEvent, hNotEmptyEvent//用来同步生产者和消费者线程
////////缓冲区存储情况
display(char a[15])
{
int i
cout<<"缓冲区存储情况为:"<<endl
for (i=14i>=0i--){
cout<<"\t|----"<<a<<"----|"<<endl
}
}
//p1
void p1_Producer()
{
int i
char ch
char p1[]={'a','A','b','B','c','C','D','d','E','e'}
if(tail<15){
for(i=0i<10i++){
while(1) {
WaitForSingleObject(hMutex,INFINITE)
if(count==BufferSize){ //缓冲区满
ReleaseMutex(hMutex)
//等待直到缓冲区非满
WaitForSingleObject(hNotFullEvent,INFINITE)
continue
}
//得到互斥锁且缓冲区非满,跳出while循环
break
}
if (tail>14){
cout<<"缓冲区已满,不能再存入数据!"<<endl
ReleaseMutex(hMutex)//结束临界区
PulseEvent(hNotEmptyEvent)//唤醒消费者线程
}
else{
//得到互斥锁且缓冲区非满,开始产生新数据
cout<<"Producer p1:\t"<<p1<<endl
Buffer[tail]=p1
//tail=(tail+1)%BufferSize///存放于缓冲区的位置
display(Buffer)
tail++
count++
cout<<"按ENTER继续...."<<endl
ch=getchar()
ReleaseMutex(hMutex)//结束临界区
PulseEvent(hNotEmptyEvent)//唤醒消费者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//p2
void p2_Producer()
{
int i
char ch
char p2[]={'0','1','2','3','4','5','6','7','8','9'}
if(tail<15){
for(i=0i<10i++){
while(1) {
ch=getchar()
WaitForSingleObject(hMutex,INFINITE)
if(count==BufferSize){ // 缓冲区满
ReleaseMutex(hMutex)
// 等待直到缓冲区非满
WaitForSingleObject(hNotFullEvent,INFINITE)
continue
}
// 得到互斥锁且缓冲区非满,跳出while循环
break
}
if (tail>14){
cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl
ReleaseMutex(hMutex)//结束临界区
PulseEvent(hNotEmptyEvent)//唤醒消费者线程
}
else{
// 得到互斥锁且缓冲区非满,开始产生新数据
cout<<"Producer p2:\t"<<p2<<endl
Buffer[tail]=p2
//tail=(tail+1)%BufferSize
display(Buffer)
tail++
count++
cout<<"按ENTER继续...."<<endl
ch=getchar()
ReleaseMutex(hMutex)// 结束临界区
PulseEvent(hNotEmptyEvent)// 唤醒消费者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//p3
void p3_Producer()
{
int i
char ch
char p3[]={'!','#','$','%','&','*','+','-','.','/'}
if(tail<15){
for(i=0i<10i++){
while(1) {
ch=getchar()
WaitForSingleObject(hMutex,INFINITE)
if(count==BufferSize){ // 缓冲区满
ReleaseMutex(hMutex)
// 等待直到缓冲区非满
WaitForSingleObject(hNotFullEvent,INFINITE)
continue
}
// 得到互斥锁且缓冲区非满,跳出while循环
break
}
if (tail>14){
cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl
ReleaseMutex(hMutex)//结束临界区
PulseEvent(hNotEmptyEvent)//唤醒消费者线程
}
else{
// 得到互斥锁且缓冲区非满,开始产生新数据
cout<<"Producer p3:\t"<<p3<<endl
Buffer[tail]=p3
//tail=(tail+1)%BufferSize
display(Buffer)
tail++
count++
cout<<"按ENTER继续...."<<endl
ch=getchar()
ReleaseMutex(hMutex)// 结束临界区
PulseEvent(hNotEmptyEvent)// 唤醒消费者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//c1
void c1_Consumer()
{
int i,j,k
char result,ch
while(1){
ch=getchar()
WaitForSingleObject(hMutex,INFINITE)
if(count==0){ // 没有可以处理的数据
ReleaseMutex(hMutex)// 释放互斥锁且等待
// 等待直到缓冲区非空
WaitForSingleObject(hNotEmptyEvent,INFINITE)
}
else {if(Buffer[head]==0) {
cout<<"Consumer 0: 缓冲区的数据已全消费过一次,消费完毕!"<<endl
ReleaseMutex(hMutex)// 结束临界区
ExitThread(0)
}
else { // 获得互斥锁且缓冲区有数据,开始处理
result=Buffer[head]
if(result>64&&result<70){
result=result+32
cout<<"Consumer c1:(大写->小写)\t "<<result<<endl
Buffer[head]='^'// '^'表示数据已被消费
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)
}
if(result>96&&result<102){
result=result-32
cout<<"Consumer c1:(小写->大写)\t "<<result<<endl
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)
}
if(result>47&&result<58){
cout<<"Consumer c1:(显示字符)\t "<<result<<endl
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)}
if(result>32&&result<48){
cout<<"Consumer c1:(用符号打印出菱形) "<<endl
for(i=1i<=(9+1)/2i++)
{
for(j=1j<=40-ij++)
cout<<" "
for(k=1k<=2*i-1k++)
cout<<result
cout<<endl
}
for(i=1i<=9/2i++)
{
for(j=1j<=40-(9+1)/2+ij++)
cout<<" "
for(k=1k<=9-2*ik++)
cout<<result
cout<<endl
}
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)
}
head=(head+1)%BufferSize
count--
cout<<"按ENTER继续...."<<endl
ch=getchar()
ReleaseMutex(hMutex)// 结束临界区
PulseEvent(hNotFullEvent)// 唤醒生产者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//c2
void c2_Consumer()
{
int i,j,k
char result,ch
while(1){
WaitForSingleObject(hMutex,INFINITE)
if(count==0){ // 没有可以处理的数据
ReleaseMutex(hMutex)// 释放互斥锁且等待
// 等待直到缓冲区非空
WaitForSingleObject(hNotEmptyEvent,INFINITE)
}
else {if(Buffer[head]==0) {
cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl
ReleaseMutex(hMutex)// 结束临界区
ExitThread(0)
}
else { // 获得互斥锁且缓冲区有数据,开始处理
result=Buffer[head]
if(result>64&&result<90){
result=result+32
cout<<"Consumer c2:(大写->小写)\t "<<result<<endl
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)
}
if(result>96&&result<102){
result=result-32
cout<<"Consumer c2:(小写->大写)\t "<<result<<endl
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)}
if(result>47&&result<58){
cout<<"Consumed c2:(显示字符)\t "<<result<<endl
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)}
if(result>32&&result<48){
cout<<"Consumer c2:(用符号打印出菱形) "<<endl
for(i=1i<=(9+1)/2i++)
{
for(j=1j<=40-ij++)
cout<<" "
for(k=1k<=2*i-1k++)
cout<<result
cout<<endl
}
for(i=1i<=9/2i++)
{
for(j=1j<=40-(9+1)/2+ij++)
cout<<" "
for(k=1k<=9-2*ik++)
cout<<result
cout<<endl
}
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)
}
head=(head+1)%BufferSize
count--
cout<<"按ENTER继续...."<<endl
ch=getchar()
ReleaseMutex(hMutex)// 结束临界区
PulseEvent(hNotFullEvent)// 唤醒生产者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//c3
void c3_Consumer()
{
int i,j,k
char result,ch
while(1){
WaitForSingleObject(hMutex,INFINITE)
if(count==0){ // 没有可以处理的数据
ReleaseMutex(hMutex)// 释放互斥锁且等待
// 等待直到缓冲区非空
WaitForSingleObject(hNotEmptyEvent,INFINITE)
}
else {if(Buffer[head]==0) {
cout<<"Consumer 0: 缓冲区的数据已全消费过一次,消费完毕!"<<endl
ReleaseMutex(hMutex)// 结束临界区
ExitThread(0)
}
else { // 获得互斥锁且缓冲区有数据,开始处理
result=Buffer[head]
if(result>64&&result<70){
result=result+32
cout<<"Consumer c3:(大写->小写)\t "<<result<<endl
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)}
if(result>96&&result<102){
result=result-32
cout<<"Consumer c3:(小写->大写)\t "<<result<<endl
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)}
if(result>47&&result<58){
cout<<"Consumer c1:(显示字符)\t "<<result<<endl
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)
}
if(result>32&&result<48){
cout<<"Consumer c3:(用符号打印出菱形) "<<endl
for(i=1i<=(7+1)/2i++)
{
for(j=1j<=40-ij++)
cout<<" "
for(k=1k<=2*i-1k++)
cout<<result
cout<<endl
}
for(i=1i<=7/2i++)
{
for(j=1j<=40-(7+1)/2+ij++)
cout<<" "
for(k=1k<=7-2*ik++)
cout<<result
cout<<endl
}
Buffer[head]='^'
cout<<"'^'表示数据已被消费"<<endl
display(Buffer)
}
head=(head+1)%BufferSize
count--
cout<<"按ENTER继续...."<<endl
ch=getchar()
ReleaseMutex(hMutex)// 结束临界区
PulseEvent(hNotFullEvent)// 唤醒生产者线程
}
}
}
}
//////////////////////////////////////////////////////////////////
//主函数
void main()
{
HANDLE hThreadVector[6]
DWORD ThreadID
count = 0
head = 0
tail = 0
hMutex=CreateMutex(NULL,FALSE,NULL)
hNotFullEvent=CreateEvent(NULL,TRUE,FALSE,NULL)
hNotEmptyEvent=CreateEvent(NULL,TRUE,FALSE,NULL)
hThreadVector[0]=CreateThread (NULL, 0,(LPTHREAD_START_ROUTINE) p1_Producer,NULL, 0, (LPDWORD)&ThreadID)
hThreadVector[1]=CreateThread (NULL, 0,(LPTHREAD_START_ROUTINE) c1_Consumer,NULL, 0, (LPDWORD)&ThreadID)
hThreadVector[3]=CreateThread (NULL, 0,(LPTHREAD_START_ROUTINE) p2_Producer,NULL, 0, (LPDWORD)&ThreadID)
hThreadVector[4]=CreateThread (NULL, 0,(LPTHREAD_START_ROUTINE) c2_Consumer,NULL, 0, (LPDWORD)&ThreadID)
hThreadVector[5]=CreateThread (NULL, 0,(LPTHREAD_START_ROUTINE) p3_Producer,NULL, 0, (LPDWORD)&ThreadID)
hThreadVector[5]=CreateThread (NULL, 0,(LPTHREAD_START_ROUTINE) c3_Consumer,NULL, 0, (LPDWORD)&ThreadID)
WaitForMultipleObjects(2,hThreadVector,TRUE,INFINITE)
//cout<<"**********************Finish*************************"<<endl
}
我最近也在学 *** 作系统,PV好麻烦的
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)