///common.h
#ifndef EXP3_3_COMMON_H
#define EXP3_3_COMMON_H
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define QUEUE_ID 10086
#define MAX_SIZE 1024
#define MSG_STOP "exit"
//message trace
#define snd_to_rcv1 1
#define snd_to_rcv2 2
#define rcv_to_snd1 3
#define rcv_to_snd2 4
#define CHECK(x) \
do { \
if (!(x)) { \
fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
perror(#x); \
exit(-1); \
} \
} while (0) \
//stderr shu chu biaozhun cuowu
//perror shu chu cuowu yuan yin
/* global variable */
sem_t *w_mutex, *empty, *full, *over, *rcv_dp, *snd_dp, *finish;
char *SEM_NAME[7] = {"w_mutex", "empty","full", "over", "rcv_dp", "snd_dp", "finish"};
void create_sem()
{
w_mutex = sem_open(SEM_NAME[0], O_CREAT, 0666, 1);
empty = sem_open(SEM_NAME[1], O_CREAT, 0666, 10);
full = sem_open(SEM_NAME[2], O_CREAT, 0666, 0);
over = sem_open(SEM_NAME[3], O_CREAT, 0666, 0);
rcv_dp = sem_open(SEM_NAME[4], O_CREAT, 0666, 0);
snd_dp = sem_open(SEM_NAME[5], O_CREAT, 0666, 1);
finish = sem_open(SEM_NAME[6], O_CREAT, 0666, 0);
return ;
}
void release_sem()
{
int i;
sem_close(w_mutex);
for(i = 0; i < 10; ++i)
sem_close(empty);
sem_close(snd_dp);
for(i = 0; i < 7; ++i)
sem_unlink(SEM_NAME[i]);
return ;
}
#define P(x) sem_wait(x)
#define V(x) sem_post(x)
//消息缓冲区
struct msg_st {
long int message_type;//消息类型
char buffer[MAX_SIZE];//消息正文
};
//进程随机获得写数据的机会
int get_chance(int range)
{
srand((unsigned)time(NULL));
return rand()%range;
}//随机数
#endif //EXP3_3_COMMON_H
//sender1.c
#include "common.h"
int main(){
int mq;
struct msg_st buf;
ssize_t bytes_read;
//创建或者打开有名信号量
create_sem();
//创建消息队列
mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);//msgget返回一个消息队列标识符,
CHECK((key_t) -1 != mq);
do {
printf("sender1> ");
fflush(stdout);//冲刷输出缓冲区
fgets(buf.buffer, MAX_SIZE, stdin);
buf.message_type = snd_to_rcv1;
/* send the message */
P(empty);//队列有空闲
P(w_mutex);
CHECK(0 <= msgsnd(mq, (void*)&buf, MAX_SIZE, 0));//向标识符为mq的队列发送消息,成功返回1,失败返回-1。
V(full);
V(w_mutex);
} while (strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP)));//字符比较,结束条件即发送exit字符串。
/* wait for response */
P(over);
bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, rcv_to_snd1, 0);
/*
msgtype消息类型,即rcv_to_snd1.
0:接收消息队列中的第一个消息
>0:接受第一个为msgtype类型的消息
<0:接受第一个类型小于等于msgtype的绝对值的消息
*/
CHECK(bytes_read >= 0);
printf("%s", buf.buffer);
V(finish);
return 0;
}
//receive.c
#include "common.h"
int main()
{
create_sem();
struct msg_st buf, over1, over2;
int mq, must_stop = 2;
struct msqid_ds t;
over1.message_type = 3;
strcpy(over1.buffer, "over1\n");
over2.message_type = 4;
strcpy(over2.buffer, "over2\n");
/* open the mail queue */
mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
CHECK((key_t) -1 != mq);
do {
ssize_t bytes_read, bytes_write;
/* receive the message */
P(full);
bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
V(empty);
CHECK(bytes_read >= 0);
if (!strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP))) {
if (buf.message_type == 1) {
printf("send over1 to sender1\n");
bytes_write = msgsnd(mq, (void *) &over1, MAX_SIZE, 0);
CHECK(bytes_write >= 0);
V(over);
must_stop--;
} else if (buf.message_type == 2) {
printf("send over2 to sender2\n");
bytes_write = msgsnd(mq, (void *) &over2, MAX_SIZE, 0);
CHECK(bytes_write >= 0);
V(over);
must_stop--;
}
} else {
printf("Received%ld: %s", buf.message_type, buf.buffer);
}
} while (must_stop);
P(finish);
P(finish);
CHECK(!msgctl(mq, IPC_RMID, &t));
release_sem();
return 0;
}
实验三:利用Linux的消息队列通信机制实现三个线程间的通信
编写程序创建三个线程:sender1线程、sender2线程和receive线程,三个线程的功能描述如下:
①sender1线程:运行函数sender1(),它创建一个消息队列,然后等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver线程发送消息“end1”,并且等待receiver的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,结束线程的运行。
②sender2线程:运行函数sender2(),共享sender1创建的消息队列,等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver线程发送消息“end2”,并且等待receiver的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,结束线程的运行。
③Receiver线程:运行函数receive(),它通过消息队列接收来自sender1和sender2两个线程的消息,将消息显示在终端屏幕上,当收到内容为“end1”的消息时,就向sender1发送一个应答消息“over1”;当收到内容为“end2”的消息时,就向sender2发送一个应答消息“over2”;消息接收完成后删除消息队列,结束线程的运行。选择合适的信号量机制实现三个线程之间的同步与互斥。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)