1.代码:
//接收消息
#include
#include
#include
#include
#include
#include
#include
#include
/* Message record passed to msgrcv(): a long type tag followed by the text payload. */
struct my_msg_st{
/* Message type tag stored with each queued message. */
long int my_msg_type;
/* Text payload; the receiver accepts up to BUFSIZ bytes per message. */
char some_text[BUFSIZ];
};
/*
 * Receiver program: opens (creating if necessary) the System V message queue
 * with key 1234, prints every message it receives, and stops after a message
 * whose text starts with "end". The queue is removed before exiting.
 *
 * Exits with EXIT_FAILURE if the queue cannot be opened, read, or removed.
 */
int main(void)
{
    int running = 1;
    int msgid;
    struct my_msg_st some_data;
    long int msg_to_receive = 0; /* 0: accept the first message of any type */

    /* First, set up the message queue. */
    msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        int saved = errno;
        fprintf(stderr, "msgget failed with error: %d (%s)\n", saved, strerror(saved));
        exit(EXIT_FAILURE);
    }

    /* Then retrieve messages until an end message is encountered. */
    while (running) {
        ssize_t received = msgrcv(msgid, (void *)&some_data, BUFSIZ, msg_to_receive, 0);

        if (received == -1) {
            int saved = errno;
            fprintf(stderr, "msgrcv failed with error: %d (%s)\n", saved, strerror(saved));
            exit(EXIT_FAILURE);
        }

        /* The payload is raw bytes: make sure it is a terminated string
         * before handing it to printf/strncmp. */
        if (received >= BUFSIZ) {
            received = BUFSIZ - 1;
        }
        some_data.some_text[received] = '\0';

        printf("You wrote: %s ", some_data.some_text);
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            running = 0;
        }
    }

    /* Lastly, delete the message queue. */
    if (msgctl(msgid, IPC_RMID, NULL) == -1) {
        fprintf(stderr, "msgctl(IPC_RMID) failed\n");
        exit(EXIT_FAILURE);
    }
    exit(EXIT_SUCCESS);
}
///发送消息
#include
#include
#include
#include
#include
#include
#include
#include
/* Size in bytes of the text payload of one message. */
#define MAX_TEXT 512
/* Message record passed to msgsnd(): a long type tag followed by the text payload. */
struct my_msg_st{
/* Message type tag; this sender always sets it to 1. */
long int my_msg_type;
/* Text payload of at most MAX_TEXT bytes. */
char some_text[MAX_TEXT];
};
/*
 * Sender program: opens (creating if necessary) the System V message queue
 * with key 1234 and sends each line typed on stdin as a type-1 message.
 * Stops after sending a line that starts with "end", or when stdin reaches
 * end-of-file.
 *
 * Exits with EXIT_FAILURE if the queue cannot be opened or a send fails.
 */
int main(void)
{
    int running = 1;
    int msgid;
    struct my_msg_st some_data;

    msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        int saved = errno;
        fprintf(stderr, "msgget failed with error: %d (%s)\n", saved, strerror(saved));
        exit(EXIT_FAILURE);
    }

    some_data.my_msg_type = 1;
    while (running) {
        size_t len;

        printf("Enter some text:");
        fflush(stdout);

        /* Read straight into the payload, bounded by its real size, so a long
         * input line can never overflow some_text. */
        if (fgets(some_data.some_text, MAX_TEXT, stdin) == NULL) {
            break; /* EOF or read error: nothing more to send */
        }

        /* Send only the string and its terminator, not the unused tail. */
        len = strlen(some_data.some_text) + 1;
        if (msgsnd(msgid, (void *)&some_data, len, 0) == -1) {
            fprintf(stderr, "msgsnd failed\n");
            exit(EXIT_FAILURE);
        }
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            running = 0;
        }
    }
    exit(EXIT_SUCCESS);
}
//开两个终端分别运行
执行结果如图:
2)熟悉消息队列相关的系统调用
1.头文件: #include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
2.操作:
创建队列: msgget((key_t)10086,0666|IPC_CREAT);
发送信息: msgsnd(mq,(void*)&over2,MAX_SIZE,0)
接收信息: msgrcv(mq,(void*)&buf,1024,0,0)
指定操作: msgctl(mq,IPC_RMID,&t)
3)尝试多个发送进程和接收进程,观察进程的并发执行情况,并解释
现象:
多个发送信息的终端在输入后,信息依次轮流出现在各个接收端进程中
解释:消息队列(MQ)由内核提供同步,互斥机制,可以翻阅msgsnd,msgrcv linux底层源代码一探究竟
链接
如图:
二.三个线程(两个发送者,一个接收者)并发执行。通过消息队列,分别进行消息的发送和接收
1.代码:
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Key of the System V message queue shared by all three threads. */
#define QUEUE_ID 10086
/* Number of payload bytes transferred by every msgsnd()/msgrcv() call. */
#define MAX_SIZE 1024
/* A message starting with this text asks the receiver to finish that sender. */
#define MSG_STOP "exit"
/* Message types: sender -> receiver (1, 2) and receiver -> sender replies (3, 4). */
#define snd_to_rcv1 1
#define snd_to_rcv2 2
#define rcv_to_snd1 3
#define rcv_to_snd2 4

/*
 * Abort the process when condition x is false: prints the enclosing function
 * name and line number, then perror() with the text of the condition, and
 * exits with status -1.
 *
 * The macro ends at "while (0)" with no trailing line continuation, so the
 * P/V definitions below are separate macros and CHECK(...) is used as one
 * statement followed by a semicolon.
 */
#define CHECK(x) \
    do { \
        if (!(x)) { \
            fprintf(stderr, "%s:%d:", __func__, __LINE__); \
            perror(#x); \
            exit(-1); \
        } \
    } while (0)

/* Semaphore wait (P) and post (V) on a sem_t object named x. */
#define P(x) sem_wait(&(x))
#define V(x) sem_post(&(x))
struct msg_st{ // queue message: a long type tag followed by the text payload
long int message_type;
// one byte larger than MAX_SIZE, the byte count used for every send/receive
char buffer[MAX_SIZE+1];
};
// thread entry points, started from main() via pthread_create()
void *sender1();
void *sender2();
void *receiver();
// semaphores shared by the three threads (initial values are set in main):
//   w_mutex - held by a sender for one whole prompt/read/send round
//   empty   - waited on by a sender before msgsnd, posted by the receiver after msgrcv
//   full    - posted by a sender after msgsnd, waited on by the receiver before msgrcv
//   over    - posted by the receiver after it queued the reply to an "exit" message
//   rcv_dp  - posted by a sender after printing its prompt, waited on by the receiver before printing
//   snd_dp  - waited on by a sender before prompting, posted by the receiver after printing
sem_t w_mutex,empty,full,over,rcv_dp,snd_dp;
void *sender1(){
int mq;
struct msg_st buf;
ssize_t bytes_read;
//open the mail queue
mq=msgget((key_t)QUEUE_ID,0666|IPC_CREAT);
CHECK((key_t)-1!=mq);
do{
P(w_mutex);
// printf("11\n");
P(snd_dp);
// printf("12\n");
printf("sender1>");
V(rcv_dp);
fflush(stdout);
fgets(buf.buffer,BUFSIZ,stdin);
buf.message_type=1;
//send the message
P(empty);
// printf("13\n");
CHECK(0<=msgsnd(mq,(void*)&buf,MAX_SIZE,0));
V(full);
V(w_mutex);
usleep(100);
}
while(strncmp(buf.buffer,MSG_STOP,strlen(MSG_STOP)));
//wait for response
P(over);
// printf("14\n");
bytes_read=msgrcv(mq,(void*)&buf,MAX_SIZE,rcv_to_snd1,0);
CHECK(bytes_read>=0);
printf("%s",buf.buffer);
printf("--------------------------\n");
V(snd_dp);
pthread_exit(NULL);
}
void *sender2(){
int mq;
struct msg_st buf;
ssize_t bytes_read;
//open the mail queue
mq=msgget((key_t)QUEUE_ID,0666|IPC_CREAT);
CHECK((key_t)-1!=mq);
do{
P(w_mutex);
// printf("21\n");
P(snd_dp);
// printf("22\n");
printf("sender2>");
V(rcv_dp);
fflush(stdout);
fgets(buf.buffer,BUFSIZ,stdin);
buf.message_type=2;
//send the message
P(empty);
// printf("23\n");
CHECK(0<=msgsnd(mq,(void*)&buf,MAX_SIZE,0));
V(full);
V(w_mutex);
usleep(100);
}
while(strncmp(buf.buffer,MSG_STOP,strlen(MSG_STOP)));
//wait for response
P(over);
// printf("24\n");
bytes_read=msgrcv(mq,(void*)&buf,MAX_SIZE,rcv_to_snd2,0);
CHECK(bytes_read>=0); //
printf("%s",buf.buffer);
printf("--------------------------\n");
V(snd_dp);
pthread_exit(NULL);
}
void *receiver(){
struct msg_st buf,over1,over2;
int mq,must_stop=2;
struct msqid_ds t;
over1.message_type=3;
strcpy(over1.buffer,"over1\n");
over2.message_type=4;
strcpy(over2.buffer,"over2\n");
//open the mail queue
mq=msgget((key_t)QUEUE_ID,0666|IPC_CREAT);
CHECK((key_t)-1!=mq);
do{
ssize_t bytes_read,bytes_write;
//receive the message
P(full);
// printf("31\n");
bytes_read=msgrcv(mq,(void*)&buf,MAX_SIZE,0,0);
V(empty);
CHECK(bytes_read>=0);
if(!strncmp(buf.buffer,MSG_STOP,strlen(MSG_STOP))){
if(buf.message_type==1){
// printf("321\n");
bytes_write=msgsnd(mq,(void*)&over1,MAX_SIZE,0);
CHECK(bytes_write>=0);
V(over);
must_stop--;
}
else if(buf.message_type==2){
// printf("322\n");
bytes_write=msgsnd(mq,(void*)&over2,MAX_SIZE,0); //
CHECK(bytes_write>=0);
V(over);
must_stop--;
}
else{
// printf("323\n");
P(rcv_dp);
printf("Received %d:%s",buf.message_type,buf.buffer);
printf("------------------------------------------\n");
V(snd_dp);
}
}
}
while(must_stop);
//clean up
P(snd_dp);
// printf("33/n");
CHECK(!msgctl(mq,IPC_RMID,&t)); //
pthread_exit(NULL);
}
/*
 * Initialises the six semaphores, starts the receiver and the two sender
 * threads, and waits for all three to finish.
 *
 * Initial values: snd_dp=1, rcv_dp=0, empty=10, full=0, w_mutex=1, over=0.
 * The semaphores are only used by threads of this process, so they are
 * created with pshared = 0. Any initialisation or thread-creation failure
 * aborts the process via CHECK.
 */
int main(int argc, char **argv)
{
    pthread_t t1, t2, t3;
    int state;

    (void)argc;
    (void)argv;

    CHECK(sem_init(&snd_dp, 0, 1) == 0);
    CHECK(sem_init(&rcv_dp, 0, 0) == 0);
    CHECK(sem_init(&empty, 0, 10) == 0);
    CHECK(sem_init(&full, 0, 0) == 0);
    CHECK(sem_init(&w_mutex, 0, 1) == 0);
    CHECK(sem_init(&over, 0, 0) == 0);

    state = pthread_create(&t1, NULL, receiver, NULL);
    CHECK(state == 0);
    state = pthread_create(&t3, NULL, sender1, NULL);
    CHECK(state == 0);
    state = pthread_create(&t2, NULL, sender2, NULL);
    CHECK(state == 0);

    pthread_join(t3, NULL);
    pthread_join(t2, NULL);
    pthread_join(t1, NULL);

    /* All threads are done; release the semaphores. */
    sem_destroy(&snd_dp);
    sem_destroy(&rcv_dp);
    sem_destroy(&empty);
    sem_destroy(&full);
    sem_destroy(&w_mutex);
    sem_destroy(&over);
    return 0;
}
注意:运行后,两次均输入exit,才能满足receiver解锁条件,实现同步
运行如图:
2)熟悉消息队列相关的系统调用
1.头文件:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
2.操作:
创建队列: msgget((key_t)10086,0666|IPC_CREAT);
发送信息: msgsnd(mq,(void*)&over2,MAX_SIZE,0)
接收信息: msgrcv(mq,(void*)&buf,1024,0,0)
指定操作: msgctl(mq,IPC_RMID,&t)
3)回顾posix线程控制和信号量相关的函数
1.头文件: #include <semaphore.h>
2.操作:
声明: sem_t sem1
初始: sem_init(&sem1,1,1)
p操作: sem_wait(&sem1)
v操作: sem_post(&sem1)
3.互斥/同步: pv/vp
4)删除信号量的并发控制,观察混乱情况
代码中注释掉所有P操作
5)理清并发线程中同步和互斥关系
1.操作:去掉原文代码所有注释 作用:在所有P操作后进行输出,来观察线程并发执行情况
运行如图:
流程图:并发线程同步,互斥关系:(sender1与sender2互斥)
移动端图片看不清,可以用pc端浏览;欢迎访问gitee仓库
ZUCC_操作系统原理实验_Lab9进程的通信消息队列
三.编写程序
1.修改例程1,模仿例程2,在原有的MSG1向MSG2发送消息的基础上,实现MSG2也能向MSG1发送消息,即两并发进程能通过消息队列,实现双向对话。
构造核心:
1.使用System V信号量实现两个进程间的通信
2.使用消息队列实现两个进程间消息的接收与发送
3.确定流程图顺序,从而确定信号量个数与初值的设定,同时流程图拆分确定两个进程的PV操作和核心代码
代码:
///发送消息
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Message record used with msgsnd()/msgrcv(): a long type tag followed by the text payload. */
struct my_msg_st{
long int my_msg_type;
char some_text[BUFSIZ];
};
/* Argument type for semctl(); this program only uses the `val` member (with SETVAL). */
union semun{
int val; //value for SETVAL
struct semid_ds *buf; //buffer for IPC_STAT,IPC_SET
unsigned short int *array; //array for GETALL,SETALL
struct seminfo *_buf; //buffer for IPC_INFO
};
/* Identifier of the semaphore set, assigned from semget() in main(). */
static int sem_id=0;
/* Semaphore helpers; each takes the index of a semaphore inside the set. */
static int set_semvalue(int semnum,int sem_value);
static void del_semvalue(int semnum);
static int semaphore_p(int semnum);
static int semaphore_v(int semnum);
/*
 * Dialogue program, sending side. Run it in one terminal and the receiving
 * program in another.
 *
 * Each round it sends one line typed by the user and then prints two replies
 * from the peer. Four semaphores (indices 0..3 of the set with key 1234, all
 * reset to 0 here) order the steps:
 *   V(0) after sending, P(1) before the first reply,
 *   V(2) after the first reply, P(3) before the second reply.
 * The dialogue ends as soon as a sent or received line starts with "end".
 * End-of-file on stdin is sent as "end" so the peer is not left waiting.
 *
 * Exits with EXIT_FAILURE if any queue or semaphore operation fails.
 */
int main(void)
{
    int msgid;
    int i;
    ssize_t received;
    struct my_msg_st some_data;

    msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        perror("msgget");
        exit(EXIT_FAILURE);
    }
    /* Message queues and semaphore sets have separate key spaces, so reusing
     * key 1234 here does not clash with msgget above. */
    sem_id = semget((key_t)1234, 4, 0666 | IPC_CREAT);
    if (sem_id == -1) {
        perror("semget");
        exit(EXIT_FAILURE);
    }
    for (i = 0; i < 4; i++) {
        if (!set_semvalue(i, 0)) {
            exit(EXIT_FAILURE);
        }
    }

    for (;;) {
        /* read a line and send it */
        printf("Enter some text:");
        fflush(stdout);
        if (fgets(some_data.some_text, BUFSIZ, stdin) == NULL) {
            strcpy(some_data.some_text, "end\n");
        }
        some_data.my_msg_type = 1;
        if (msgsnd(msgid, (void *)&some_data, strlen(some_data.some_text) + 1, 0) == -1) {
            perror("msgsnd");
            exit(EXIT_FAILURE);
        }
        if (!semaphore_v(0)) {
            exit(EXIT_FAILURE);
        }
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            break;
        }

        /* first reply */
        if (!semaphore_p(1)) {
            exit(EXIT_FAILURE);
        }
        received = msgrcv(msgid, (void *)&some_data, BUFSIZ, 0, 0);
        if (received == -1) {
            perror("msgrcv");
            exit(EXIT_FAILURE);
        }
        /* the payload is raw bytes: guarantee a terminated string */
        some_data.some_text[received >= BUFSIZ ? BUFSIZ - 1 : received] = '\0';
        printf("You wrote: %s", some_data.some_text);
        if (!semaphore_v(2)) {
            exit(EXIT_FAILURE);
        }
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            break;
        }

        /* second reply */
        if (!semaphore_p(3)) {
            exit(EXIT_FAILURE);
        }
        received = msgrcv(msgid, (void *)&some_data, BUFSIZ, 0, 0);
        if (received == -1) {
            perror("msgrcv");
            exit(EXIT_FAILURE);
        }
        some_data.some_text[received >= BUFSIZ ? BUFSIZ - 1 : received] = '\0';
        printf("You wrote: %s", some_data.some_text);
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            break;
        }
    }
    exit(EXIT_SUCCESS);
}
//开两个终端分别运行
static int set_semvalue(int semnum,int sem_value){ //初始化信号量,使用信号量前必须这样做
union semun sem_union;
sem_union.val=sem_value;
if(semctl(sem_id,semnum,SETVAL,sem_union)==-1){
fprintf(stderr,"Failed to set the semaphore\n");
return 0;
}
return 1;
}
static void del_semvalue(int semnum){ //删除信号量
union semun sem_union;
if(semctl(sem_id,semnum,IPC_RMID,sem_union)==-1){
fprintf(stderr,"Failed to delete the semaphore\n");
}
}
static int semaphore_p(int semnum){ //对信号量做-1,即等待P
struct sembuf sem_b;
sem_b.sem_num=semnum;
sem_b.sem_op=-1;
sem_b.sem_flg=SEM_UNDO;
if(semop(sem_id,&sem_b,1)==-1){
fprintf(stderr,"semaphore_p failed\n");
return 0;
}
return 1;
}
/*
 * V (signal): increment semaphore `semnum` of the set `sem_id` by one, waking
 * a process blocked in the matching P.
 *
 * The increment is deliberately issued WITHOUT SEM_UNDO. These semaphores are
 * used to signal the peer process, not as a lock released by the same process;
 * with SEM_UNDO the kernel would take the increment back when this process
 * exits, so a signal the peer has not consumed yet (for example the one that
 * follows the final "end" message) could be lost.
 *
 * Returns 1 on success; on failure prints a message to stderr and returns 0.
 */
static int semaphore_v(int semnum)
{
    struct sembuf sem_b;

    sem_b.sem_num = semnum;
    sem_b.sem_op = 1;
    sem_b.sem_flg = 0;
    if (semop(sem_id, &sem_b, 1) == -1) {
        fprintf(stderr, "semaphore_v failed\n");
        return 0;
    }
    return 1;
}
//接收消息
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Argument type for semctl(); the helpers in this program use it with SETVAL and IPC_RMID. */
union semun{
int val; //value for SETVAL
struct semid_ds *buf; //buffer for IPC_STAT,IPC_SET
unsigned short int *array; //array for GETALL,SETALL
struct seminfo *_buf; //buffer for IPC_INFO
};
/* Identifier of the semaphore set, assigned from semget() in main(). */
static int sem_id=0;
/* Semaphore helpers; each takes the index of a semaphore inside the set. */
static int set_semvalue(int semnum,int sem_value);
static void del_semvalue(int semnum);
static int semaphore_p(int semnum);
static int semaphore_v(int semnum);
/* Message record used with msgsnd()/msgrcv(): a long type tag followed by the text payload. */
struct my_msg_st{
long int my_msg_type;
char some_text[BUFSIZ];
};
/*
 * Dialogue program, receiving side. Run it in one terminal and the sending
 * program in another.
 *
 * Each round it prints one message from the peer and then sends two lines
 * typed by the user. Semaphores 0..3 of the set with key 1234 order the steps:
 *   P(0) before receiving, V(1) after the first reply,
 *   P(2) before the second reply, V(3) after the second reply.
 * The dialogue ends as soon as a received or sent line starts with "end".
 * End-of-file on stdin is sent as "end" so the peer is not left waiting.
 * On the way out it waits one second, giving the peer time to read the last
 * message, and then removes the message queue and the semaphore set.
 *
 * Exits with EXIT_FAILURE if any queue or semaphore operation fails.
 */
int main(void)
{
    int msgid;
    ssize_t received;
    struct my_msg_st some_data;

    msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        perror("msgget");
        exit(EXIT_FAILURE);
    }
    /* Message queues and semaphore sets have separate key spaces, so reusing
     * key 1234 here does not clash with msgget above. */
    sem_id = semget((key_t)1234, 4, 0666 | IPC_CREAT);
    if (sem_id == -1) {
        perror("semget");
        exit(EXIT_FAILURE);
    }

    for (;;) {
        /* receive and print */
        if (!semaphore_p(0)) {
            exit(EXIT_FAILURE);
        }
        received = msgrcv(msgid, (void *)&some_data, BUFSIZ, 0, 0);
        if (received == -1) {
            perror("msgrcv");
            exit(EXIT_FAILURE);
        }
        /* the payload is raw bytes: guarantee a terminated string */
        some_data.some_text[received >= BUFSIZ ? BUFSIZ - 1 : received] = '\0';
        printf("You wrote: %s", some_data.some_text);
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            break;
        }

        /* first reply */
        printf("Enter some text:");
        fflush(stdout);
        if (fgets(some_data.some_text, BUFSIZ, stdin) == NULL) {
            strcpy(some_data.some_text, "end\n");
        }
        some_data.my_msg_type = 1;
        if (msgsnd(msgid, (void *)&some_data, strlen(some_data.some_text) + 1, 0) == -1) {
            perror("msgsnd");
            exit(EXIT_FAILURE);
        }
        if (!semaphore_v(1)) {
            exit(EXIT_FAILURE);
        }
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            break;
        }

        /* second reply, once the peer has shown the first one */
        if (!semaphore_p(2)) {
            exit(EXIT_FAILURE);
        }
        printf("Enter some text:");
        fflush(stdout);
        if (fgets(some_data.some_text, BUFSIZ, stdin) == NULL) {
            strcpy(some_data.some_text, "end\n");
        }
        some_data.my_msg_type = 1;
        if (msgsnd(msgid, (void *)&some_data, strlen(some_data.some_text) + 1, 0) == -1) {
            perror("msgsnd");
            exit(EXIT_FAILURE);
        }
        if (!semaphore_v(3)) {
            exit(EXIT_FAILURE);
        }
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            break;
        }
    }

    sleep(1);
    if (msgctl(msgid, IPC_RMID, NULL) == -1) {
        perror("msgctl(IPC_RMID)");
    }
    del_semvalue(0);
    exit(EXIT_SUCCESS);
}
static int set_semvalue(int semnum,int sem_value){ //初始化信号量,使用信号量前必须这样做
union semun sem_union;
sem_union.val=sem_value;
if(semctl(sem_id,semnum,SETVAL,sem_union)==-1){
fprintf(stderr,"Failed to set the semaphore\n");
return 0;
}
return 1;
}
static void del_semvalue(int semnum){ //删除信号量
union semun sem_union;
if(semctl(sem_id,semnum,IPC_RMID,sem_union)==-1){
fprintf(stderr,"Failed to delete the semaphore\n");
}
}
static int semaphore_p(int semnum){ //对信号量做-1,即等待P
struct sembuf sem_b;
sem_b.sem_num=semnum;
sem_b.sem_op=-1;
sem_b.sem_flg=SEM_UNDO;
if(semop(sem_id,&sem_b,1)==-1){
fprintf(stderr,"semaphore_p failed\n");
return 0;
}
return 1;
}
/*
 * V (signal): add one to semaphore `semnum` of the set `sem_id`, releasing a
 * process blocked in the matching P.
 *
 * No SEM_UNDO here: the increment is a notification for the other process and
 * must outlive this one. With SEM_UNDO the kernel would subtract it again when
 * this process terminates, which can swallow a signal the peer has not yet
 * waited for (such as the one sent together with the final "end" message).
 *
 * Returns 1 on success; on failure reports on stderr and returns 0.
 */
static int semaphore_v(int semnum)
{
    struct sembuf sem_b;

    sem_b.sem_num = semnum;
    sem_b.sem_op = 1;
    sem_b.sem_flg = 0;
    if (semop(sem_id, &sem_b, 1) == -1) {
        fprintf(stderr, "semaphore_v failed\n");
        return 0;
    }
    return 1;
}
sender,receiver进程并发控制的流程图,代码构成
运行如图:
四.消息队列常用模块
#include
#include
#include
#include
#include
#include
#include
#include
/* Message record used with the queue calls: a long type tag followed by the text payload. */
struct my_msg_st{
/* Message type tag; main() sets it to 1 before sending. */
long int my_msg_type;
/* Text payload of up to BUFSIZ bytes. */
char some_text[BUFSIZ];
};
/*
 * Template showing the common message-queue steps in one program:
 * create/open the queue with key 1234, then loop
 *   1. read a line from stdin and SEND it as a type-1 message,
 *   2. stop if the line starts with "end",
 *   3. RECEIVE the next message of any type and print it.
 * The loop also stops when stdin reaches end-of-file.
 *
 * Exits with EXIT_FAILURE if the queue cannot be opened or a send/receive fails.
 */
int main(void)
{
    int msgid;
    struct my_msg_st some_data;
    long int msg_to_receive = 0; /* 0: accept the first message of any type */
    ssize_t received;

    /* create */
    msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        fprintf(stderr, "msgget failed with error: %d\n", errno);
        exit(EXIT_FAILURE);
    }

    for (;;) {
        /* read input and send it */
        printf("Enter some text:");
        fflush(stdout);
        if (fgets(some_data.some_text, BUFSIZ, stdin) == NULL) {
            break; /* EOF or read error: nothing more to send */
        }
        some_data.my_msg_type = 1;
        /* This step must put the message on the queue, so it uses msgsnd. */
        if (msgsnd(msgid, (void *)&some_data, strlen(some_data.some_text) + 1, 0) == -1) {
            fprintf(stderr, "msgsnd failed with error: %d\n", errno);
            exit(EXIT_FAILURE);
        }
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            break;
        }

        /* receive and print */
        received = msgrcv(msgid, (void *)&some_data, BUFSIZ, msg_to_receive, 0);
        if (received == -1) {
            fprintf(stderr, "msgrcv failed with error: %d\n", errno);
            exit(EXIT_FAILURE);
        }
        /* the payload is raw bytes: guarantee a terminated string */
        some_data.some_text[received >= BUFSIZ ? BUFSIZ - 1 : received] = '\0';
        printf("You wrote: %s", some_data.some_text);
    }
    exit(EXIT_SUCCESS);
}
1
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)