ZUCC

ZUCC,第1张

lab9进程的通信–消息队列 一.两个进程并发执行,通过消息队列,分别进行消息的发送和接收

1.代码:

//接受消息
#include
#include
#include
#include
#include
#include
#include
#include

struct my_msg_st{
	long int my_msg_type;
	char some_text[BUFSIZ];
}; 

int main(){
	int running=1;
	int msgid;
	struct my_msg_st some_data;
	long int msg_to_receive=0;
	
//First,we set up thee message queue
	msgid=msgget((key_t)1234,0666|IPC_CREAT);
	
	if(msgid==-1){
		fprintf(stderr,"megget failed with error: %d\n",errno);
		exit(EXIT_FAILURE);
	}
	
//Then the message are retrieved from the queue,until an end message is encountered
//Lastly,the message queue id deleted
	while(running){
		if(msgrcv(msgid,(void*)&some_data,BUFSIZ,msg_to_receive,0)==-1){
			fprintf(stderr,"msgrcv failed with error: %d\n",errno);
			exit(EXIT_FAILURE);
		}
		printf("You wrote: %s ",some_data.some_text);
		if(strncmp(some_data.some_text,"end",3)==0)
			running=0;
	}
	
	if(msgctl(msgid,IPC_RMID,0)==-1){
		fprintf(stderr,"msgctl(IPC_RMID) failed\n");
		exit(EXIT_FAILURE);
	}
	
	exit(EXIT_SUCCESS);
}
///发送消息
#include
#include
#include
#include
#include
#include
#include
#include

#define MAX_TEXT 512
struct my_msg_st{
	long int my_msg_type;
	char some_text[MAX_TEXT];
}; 

int main(){
	int running=1;
	int msgid;
	struct my_msg_st some_data;
	char buffer[BUFSIZ];
	
	msgid=msgget((key_t)1234,0666|IPC_CREAT);
	
	if(msgid==-1){
		fprintf(stderr,"megget failed with error: %d\n",errno);
		exit(EXIT_FAILURE);
	}
	
	while(running){
		printf("Enter some text:");
		fgets(buffer,BUFSIZ,stdin);
		some_data.my_msg_type=1;
		strcpy(some_data.some_text,buffer);
			
	if(msgsnd(msgid,(void*)&some_data,MAX_TEXT,0)==-1){
		fprintf(stderr,"msgsnd failed\n");
		exit(EXIT_FAILURE);
	}
	if(strncmp(buffer,"end",3)==0)
		running=0;
	}
	
	exit(EXIT_SUCCESS);
}
//开两个终端分别运行

执行结果如图:

2)熟悉消息队列相关的系统调用

1.头文件:	#include
		   #include
		   #include
2. *** 作:
	创建队列:		msgget((key_t)10086,0666|IPC_CREAT);
	发送信息:		msgsnd(mq,(void*)&over2,MAX_SIZE,0)
	接收信息:		msgrcv(mq,(void*)&buf,1024,0,0)
	指定 *** 作:		msgctl(mq,IPC_RMID,&t)

3)尝试多个发送进程和接收进程,管擦进程的并发执行情况,并解释

现象:
	多个发送信息的终端在输入后,信息依次轮序出现在各个接收端进程中
解释:消息队列(MQ)由内核提供同步,互斥机制,可以翻阅msgsnd,msgrcv linux底层源代码一探究竟

链接

如图:



二.三个线程(两个发送者,一个接收者)并发执行。通过消息队列,分别进行消息的发送和接收

1.代码:

#include
#include
#include
#include
#include
#include
#include
#include
#include

#define QUEUE_ID 10086
#define MAX_SIZE 1024
#define MSG_STOP "exit"
#define snd_to_rcv1 1
#define snd_to_rcv2 2
#define rcv_to_snd1 3
#define rcv_to_snd2 4
#define CHECK(x) \
	do{\
		if(!(x)){\
			fprintf(stderr,"%s:%d:",__func__,__LINE__);\
			perror(#x);\
			exit(-1);\
		}\
	}while(0)\

#define P(x) sem_wait(&x)
#define V(x) sem_post(&x)

struct msg_st{					//
	long int message_type;
	char buffer[MAX_SIZE+1];
};

//function
void *sender1();
void *sender2();
void *receiver();

//global variable
sem_t w_mutex,empty,full,over,rcv_dp,snd_dp;

void *sender1(){
	int mq;
	struct msg_st buf;
	ssize_t bytes_read;
	
	//open the mail queue
	mq=msgget((key_t)QUEUE_ID,0666|IPC_CREAT);
	CHECK((key_t)-1!=mq);
	
	do{			
		P(w_mutex);
//				printf("11\n");
		P(snd_dp);
//				printf("12\n");
		printf("sender1>");
		V(rcv_dp);
		fflush(stdout);
		fgets(buf.buffer,BUFSIZ,stdin);
		buf.message_type=1;
		//send the message
				
		P(empty);
//				printf("13\n");
		CHECK(0<=msgsnd(mq,(void*)&buf,MAX_SIZE,0));
		V(full);
		V(w_mutex);
		usleep(100);
	}
	while(strncmp(buf.buffer,MSG_STOP,strlen(MSG_STOP)));
	
	//wait for response
	P(over);
//				printf("14\n");
	bytes_read=msgrcv(mq,(void*)&buf,MAX_SIZE,rcv_to_snd1,0);
	CHECK(bytes_read>=0);
	printf("%s",buf.buffer);
	printf("--------------------------\n");
	V(snd_dp);
	pthread_exit(NULL);
}
void *sender2(){
	int mq;
	struct msg_st buf;
	ssize_t bytes_read;
	
	//open the mail queue
	mq=msgget((key_t)QUEUE_ID,0666|IPC_CREAT);
	CHECK((key_t)-1!=mq);
	
	do{
		P(w_mutex);
//				printf("21\n");
		P(snd_dp);
//				printf("22\n");
		printf("sender2>");
		V(rcv_dp);
		fflush(stdout);
		fgets(buf.buffer,BUFSIZ,stdin);
		buf.message_type=2;
		//send the message
		P(empty);
//				printf("23\n");
		CHECK(0<=msgsnd(mq,(void*)&buf,MAX_SIZE,0));
		V(full);
		V(w_mutex);
		usleep(100);
	}
	while(strncmp(buf.buffer,MSG_STOP,strlen(MSG_STOP)));
	
	//wait for response
	P(over);
//				printf("24\n");
	bytes_read=msgrcv(mq,(void*)&buf,MAX_SIZE,rcv_to_snd2,0);
	CHECK(bytes_read>=0);								//
	printf("%s",buf.buffer);
	printf("--------------------------\n");
	V(snd_dp);
	pthread_exit(NULL);
}
void *receiver(){
	struct msg_st buf,over1,over2;
	int mq,must_stop=2;
	struct msqid_ds t;
	over1.message_type=3;
	strcpy(over1.buffer,"over1\n");
	over2.message_type=4;
	strcpy(over2.buffer,"over2\n");
	
	//open the mail queue
	mq=msgget((key_t)QUEUE_ID,0666|IPC_CREAT);
	CHECK((key_t)-1!=mq);
	
	do{
		ssize_t bytes_read,bytes_write;
		//receive the message
		P(full);
//				printf("31\n");			
		bytes_read=msgrcv(mq,(void*)&buf,MAX_SIZE,0,0);
		V(empty);
		CHECK(bytes_read>=0);

		if(!strncmp(buf.buffer,MSG_STOP,strlen(MSG_STOP))){
			if(buf.message_type==1){
//					printf("321\n");						
				bytes_write=msgsnd(mq,(void*)&over1,MAX_SIZE,0);
				CHECK(bytes_write>=0);
				V(over);
				must_stop--;
			}
			else if(buf.message_type==2){
//					printf("322\n");						
				bytes_write=msgsnd(mq,(void*)&over2,MAX_SIZE,0);		//
				CHECK(bytes_write>=0);
				V(over);
				must_stop--;
			}
			else{
//					printf("323\n");			
				P(rcv_dp);
				printf("Received %d:%s",buf.message_type,buf.buffer);
				printf("------------------------------------------\n");
				V(snd_dp); 
			}
		}

	}
	while(must_stop);
	
	//clean up
	P(snd_dp);
//				printf("33/n");
	CHECK(!msgctl(mq,IPC_RMID,&t));		//
	pthread_exit(NULL);
}
 
int main(int argc,char **argv){
	pthread_t t1,t2,t3;
	int state;
	
	sem_init(&snd_dp,1,1);
	sem_init(&rcv_dp,1,0);
	sem_init(&empty,1,10);
	sem_init(&full,1,0);
	sem_init(&w_mutex,1,1);
	sem_init(&over,1,0);
	
	state=pthread_create(&t1,NULL,receiver,NULL);
	CHECK(state==0);
	state=pthread_create(&t3,NULL,sender1,NULL);
	CHECK(state==0);
	state=pthread_create(&t2,NULL,sender2,NULL);
	CHECK(state==0);
	
	pthread_join(t3,NULL);
	pthread_join(t2,NULL);
	pthread_join(t1,NULL);
	return 0; 
}
注意:运行后,两次均输入exit,才能满足receive解锁条件,实现同步

运行如图:

2)熟悉消息队列相关的系统调用

1.头文件:	 
        #include
        #include
        #include
2. *** 作:
	创建队列:		msgget((key_t)10086,0666|IPC_CREAT);
	发送信息:		msgsnd(mq,(void*)&over2,MAX_SIZE,0)
	接收信息:		msgrcv(mq,(void*)&buf,1024,0,0)
	指定 *** 作:		msgctl(mq,IPC_RMID,&t)

3)回顾posix线程控制和信号量相关的函数

1.头文件: #include
2. *** 作:
	声明:    sem_t sem1
	初始:    sem_init(&sem1,1,1)
	p *** 作:   sem_wait(&sem1)
	v *** 作:   sem_post(&sem1)
3.互斥/同步: pv/vp

4)删除信号量的并发控制,观察混乱情况

代码中注释掉所有P *** 作

5)理清并发线程中同步和互斥关系

1. *** 作:去掉原文代码所有注释		作用:在所有P *** 作后进行输出,来观察进程并发执行情况

运行如图:

流程图:并发线程同步,互斥关系:(sender1与sender2互斥)
移动端图片看不清,可以用pc端浏览;欢迎访问gitee仓库
ZUCC_ *** 作系统原理实验_Lab9进程的通信消息队列



三.编写程序

1.修改例程1,模仿例程2,在原有的MSG1向MSG2发送消息的基础上,实现MSG2也能向MSG1发送消息,即两并发进程能通过消息队列,实现双向对话。

构造核心:
    1.使用System V信号量实现两个进程间的通信
    2.使用消息队列实现两个进程间消息的接收与发送
    3.确定流程图顺序,从而确定信号量个数与初值的设定,同时流程图拆分确定两个进程的PV *** 作和核心代码

代码:

///发送消息
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include


struct my_msg_st{
	long int my_msg_type;
	char some_text[BUFSIZ];
}; 
union semun{
	int val;				//value for setval
	struct semid_ds *buf;	//buffer for IPC_STAT,IPC_SET
	unsigned short int *array; //array for GETALL,SETALL
	struct seminfo *_buf;  //buffer for IPC_INFO
};

static int sem_id=0;
static int set_semvalue(int semnum,int sem_value);
static void del_semvalue(int semnum);
static int semaphore_p(int semnum);
static int semaphore_v(int semnum);

int main(){
	int running=1;
	int msgid;
	struct my_msg_st some_data;
	char buffer[BUFSIZ];
	
	msgid=msgget((key_t)1234,0666|IPC_CREAT);
	sem_id=semget((key_t)1234,4,0666|IPC_CREAT);		//创建信号量 		//会与msgget冲突吗? 
	set_semvalue(0,0);
	set_semvalue(1,0);
	set_semvalue(2,0);
	set_semvalue(3,0);


	
	while(running){
        //输入发送
		printf("Enter some text:");
		fgets(buffer,BUFSIZ,stdin);
		some_data.my_msg_type=1;
		strcpy(some_data.some_text,buffer);
		msgsnd(msgid,(void*)&some_data,BUFSIZ,0);
		semaphore_v(0);
		if(strncmp(some_data.some_text,"end",3)==0)
			break;
		
		semaphore_p(1);
        //接收输出
		msgrcv(msgid,(void*)&some_data,BUFSIZ,0,0);
		printf("You wrote: %s",some_data.some_text);
		semaphore_v(2);
		if(strncmp(some_data.some_text,"end",3)==0)
			break;
		
		semaphore_p(3);
        //接收输出
		msgrcv(msgid,(void*)&some_data,BUFSIZ,0,0);
		printf("You wrote: %s",some_data.some_text);
		if(strncmp(some_data.some_text,"end",3)==0)
			break;		 
	}
	exit(EXIT_SUCCESS);
}
//开两个终端分别运行


static int set_semvalue(int semnum,int sem_value){			//初始化信号量,使用信号量前必须这样做 
	union semun sem_union;
	sem_union.val=sem_value;
	if(semctl(sem_id,semnum,SETVAL,sem_union)==-1){
		fprintf(stderr,"Failed to set the semaphore\n");
		return 0;
	}
	return 1;
}
static void del_semvalue(int semnum){			//删除信号量 
	union semun sem_union;
	if(semctl(sem_id,semnum,IPC_RMID,sem_union)==-1){
		fprintf(stderr,"Failed to delete the semaphore\n");
	}
}
static int semaphore_p(int semnum){			//对信号量做-1,即等待P 
	struct sembuf sem_b;
	sem_b.sem_num=semnum;
	sem_b.sem_op=-1;
	sem_b.sem_flg=SEM_UNDO;
	if(semop(sem_id,&sem_b,1)==-1){
		fprintf(stderr,"semaphore_p failed\n");
		return 0;
	}
	return 1;
}
static int semaphore_v(int semnum){		//释放 *** 作,使得信号量可用,发送V信号 
	struct sembuf sem_b;
	sem_b.sem_num=semnum;
	sem_b.sem_op=1;
	sem_b.sem_flg=SEM_UNDO;
	if(semop(sem_id,&sem_b,1)==-1){
		fprintf(stderr,"semaphore_v failed\n");
		return 0;
	}
	return 1;
}
//接受消息
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

union semun{
	int val;				//value for setval
	struct semid_ds *buf;	//buffer for IPC_STAT,IPC_SET
	unsigned short int *array; //array for GETALL,SETALL
	struct seminfo *_buf;  //buffer for IPC_INFO
};

static int sem_id=0;
static int set_semvalue(int semnum,int sem_value);
static void del_semvalue(int semnum);
static int semaphore_p(int semnum);
static int semaphore_v(int semnum);



struct my_msg_st{
	long int my_msg_type;
	char some_text[BUFSIZ];
}; 

int main(){
	int running=1;
	int msgid;
	struct my_msg_st some_data;
	char buffer[BUFSIZ];
	
	msgid=msgget((key_t)1234,0666|IPC_CREAT);
	sem_id=semget((key_t)1234,4,0666|IPC_CREAT);		//创建信号量 		//会与msgget冲突吗? 
	while(running){
		semaphore_p(0);
        //接收输出
		msgrcv(msgid,(void*)&some_data,BUFSIZ,0,0);
		printf("You wrote: %s",some_data.some_text);
		if(strncmp(some_data.some_text,"end",3)==0)
			break;
        //输入发送
		printf("Enter some text:");
		fgets(buffer,BUFSIZ,stdin);
		some_data.my_msg_type=1;
		strcpy(some_data.some_text,buffer);
		msgsnd(msgid,(void*)&some_data,BUFSIZ,0);
		semaphore_v(1);
		if(strncmp(some_data.some_text,"end",3)==0)
			break;

		
		semaphore_p(2);
        //输入发送
		printf("Enter some text:");
		fgets(buffer,BUFSIZ,stdin);
		some_data.my_msg_type=1;
		strcpy(some_data.some_text,buffer);
		msgsnd(msgid,(void*)&some_data,BUFSIZ,0);
		semaphore_v(3);	
		if(strncmp(some_data.some_text,"end",3)==0)
			break;
	}
	sleep(1);
	msgctl(msgid,IPC_RMID,0);
	del_semvalue(0); 
	exit(EXIT_SUCCESS);
}


static int set_semvalue(int semnum,int sem_value){			//初始化信号量,使用信号量前必须这样做 
	union semun sem_union;
	sem_union.val=sem_value;
	if(semctl(sem_id,semnum,SETVAL,sem_union)==-1){
		fprintf(stderr,"Failed to set the semaphore\n");
		return 0;
	}
	return 1;
}
static void del_semvalue(int semnum){			//删除信号量 
	union semun sem_union;
	if(semctl(sem_id,semnum,IPC_RMID,sem_union)==-1){
		fprintf(stderr,"Failed to delete the semaphore\n");
	}
}
static int semaphore_p(int semnum){			//对信号量做-1,即等待P 
	struct sembuf sem_b;
	sem_b.sem_num=semnum;
	sem_b.sem_op=-1;
	sem_b.sem_flg=SEM_UNDO;
	if(semop(sem_id,&sem_b,1)==-1){
		fprintf(stderr,"semaphore_p failed\n");
		return 0;
	}
	return 1;
}
static int semaphore_v(int semnum){		//释放 *** 作,使得信号量可用,发送V信号 
	struct sembuf sem_b;
	sem_b.sem_num=semnum;
	sem_b.sem_op=1;
	sem_b.sem_flg=SEM_UNDO;
	if(semop(sem_id,&sem_b,1)==-1){
		fprintf(stderr,"semaphore_v failed\n");
		return 0;
	}
	return 1;
}

sender,receiver进程并发控制的流程图,代码构成

运行如图:



四.消息队列常用模块
#include
#include
#include
#include
#include
#include
#include
#include


struct my_msg_st{
	long int my_msg_type;
	char some_text[BUFSIZ];
}; 

int main(){
	int running=1;
	int msgid;
	struct my_msg_st some_data;
	char buffer[BUFSIZ];
	
	//创建
	msgid=msgget((key_t)1234,0666|IPC_CREAT);
	if(msgid==-1){
		fprintf(stderr,"megget failed with error: %d\n",errno);
		exit(EXIT_FAILURE);
	}
	
	while(running){
        //输入发送
		printf("Enter some text:");
		fgets(buffer,BUFSIZ,stdin);
		some_data.my_msg_type=1;
		strcpy(some_data.some_text,buffer);
		if(msgrcv(msgid,(void*)&some_data,BUFSIZ,msg_to_receive,0)==-1){
			fprintf(stderr,"msgrcv failed with error: %d\n",errno);
			exit(EXIT_FAILURE);
		}
		if(strncmp(some_data.some_text,"end",3)==0)
			break;
		
        //接收输出
		if(msgrcv(msgid,(void*)&some_data,BUFSIZ,msg_to_receive,0)==-1){
			fprintf(stderr,"msgrcv failed with error: %d\n",errno);
			exit(EXIT_FAILURE);
		}
		printf("You wrote: %s",some_data.some_text);
	}
	exit(EXIT_SUCCESS);
}

1

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/789853.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-05
下一篇 2022-05-05

发表评论

登录后才能评论

评论列表(0条)

保存