J3. 进程间通信 消息队列(System V标准)

2021-07-18  本文已影响0人  开源519

消息队列 是消息的链接表,存储内核中,由消息标识符标识。 --《UNIX环境高级编程》

简单理解,消息队列就是一堆消息的有序集合,并缓存于内核中。如此一来,多个进程就可通过访问内核来实现多个进程之间的通信。目前存在的消息队列有POSIX与System V标准的接口,本篇主要介绍System V接口的使用。

简介

消息队列的本质是位于内核空间的链表,其中每个节点都是一个独立的消息,每个消息都有类型,相同类型的消息组成一个链表。

当各种各样的消息发出时,就如同下图所示排列在内核空间中。形状看成消息的类型,相同的形状则表示相同的消息类型。


存在于内核空间中的消息.png

这些看似杂乱无章的消息,通过消息队列发出来后,跟据其发送的类型与发送的时间,在接收端中则是有规律的排序。


消息队列1.png 消息队列2.png

如上图,内核中杂乱无章的消息,接收端可通过消息类型与发送的顺序来逐一接收处理。可通过消息类型查看指定类型的消息,若指定类型为0,则按时间顺序输出所有接收到的消息。

接口

主要用到msgget、msgsnd、msgrcv和msgctl四个接口。其使用方式man手册说明的比较清晰了,这里简单描述一下函数形式及功能。

msgget

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

主要功能是根据key值获取一个消息队列的ID。msgflag主要有两个值IPC_CREAT 和IPC_EXC,指是需要新创建消息队列ID。

msgsnd、msgrcv

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
               int msgflg);

msgsnd与msgrcv主要用于消息队列的发送与接收。这里需要注意的是发送的msgp一般定义为结构体,首个成员为long型,表示消息的类型。如此msgrcv通过指定msgtype来筛选出需要的消息。

msgctl

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

msgctl是用来控制消息队列的,其中cmd指进行的操作,buf记录了消息队列的信息。
cmd:

buf:

struct msqid_ds {
    struct ipc_perm msg_perm;     /* Ownership and permissions */
    time_t          msg_stime;    /* Time of last msgsnd(2) */
    time_t          msg_rtime;    /* Time of last msgrcv(2) */
    time_t          msg_ctime;    /* Time of last change */
    unsigned long   __msg_cbytes; /* Current number of bytes in
                                    queue (nonstandard) */
    msgqnum_t       msg_qnum;     /* Current number of messages
                                    in queue */
    msglen_t        msg_qbytes;   /* Maximum number of bytes
                                    allowed in queue */
    pid_t           msg_lspid;    /* PID of last msgsnd(2) */
    pid_t           msg_lrpid;    /* PID of last msgrcv(2) */
};

如上信息可看到buf中存储了与消息队列相关的属性,设置cmd后,可通过buf拿到这些信息。

实例演示

功能: 用消息队列实现server接client的数据,server可筛选显示指定消息类型的数据。

效果:
server接收所有消息:

JDLAL$9E19$~SPBKV{N)DNN.png

server 筛选消息类型为2的数据:


image.png

注:代码里可将消息类型封装成枚举,此demo作为演示不做过多封装。

总结

消息队列在进程间通信的优势总结起来有以下几点:

代码

client.cpp

/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name   : client.cpp
* Author      :
* Version     : V1.0
* Description :
* Journal     : 2021-03-19 init v1.0
* Brief       : Blog address: https://blog.csdn.net/qq_38750572?spm=1001.2014.3001.5343
* Others      :
********************************************************************************
*/

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <errno.h>
#include <string.h>
#include "common.h"

const char TEXT[2][50] = {"this is client1!", "this is client2!"};

int main(int argc, char *argv[])
{
    int msg_id, key, ret = 0;
    struct MsgFrame msg_buf = {0, {0}};

    if (argc < 2) {
        PRINT_ERR("usage: %s [msgid]\n", argv[0]);
        goto exit;
    }

    if (strspn(argv[1], "0123456789") != strlen(argv[1])) {
        PRINT_ERR("Params invalid!\n");
        goto exit;
    }

    /* Obtain the standard key according to the file path */
    key = ftok(MSGQ_FILE_PATH, MSGQ_ID);
    if (key < 0) {
        PRINT_ERR("ftok failed! errno = %d(%s)\n", errno, strerror(errno));
        goto exit;
    }

    msg_id = msgget(key, IPC_EXCL);
    if (msg_id < 0) {
        PRINT_ERR("msgget failed! errno = %d(%s)\n", errno, strerror(errno));
        goto exit;
    }

    do {
        memset(&msg_buf, 0x00, sizeof(msg_buf));
#if 0
        if (fgets(msg_buf.buffer, sizeof(msg_buf.buffer), stdin) == NULL) {
            PRINT_ERR("scanf failed! errno = %d(%s)\n", errno, strerror(errno));
            goto exit_msgid;
        }
#else
        msg_buf.type = atoi(argv[1]);
        strncpy(msg_buf.buffer, TEXT[msg_buf.type - 1], strlen(TEXT[msg_buf.type-1]));
#endif
        ret = msgsnd(msg_id, &msg_buf, sizeof(msg_buf.buffer), IPC_NOWAIT);
        if (ret < 0) {
            PRINT_ERR("msgnd failed! errno = %d(%s)\n", errno, strerror(errno));
            goto exit_msgid;
        } else {
            PRINT_INFO("[Send %ld %ld] %s\n", msg_buf.type,
                                        strlen(msg_buf.buffer), msg_buf.buffer);
        }
        sleep(1);
    } while (strncmp(msg_buf.buffer, "end", msg_buf.type) != 0);

exit_msgid:
    ret = msgctl(msg_id, IPC_RMID, 0);
    if (ret < 0) {
        PRINT_ERR("msgctl failed! errno = %d(%s)\n", errno, strerror(errno));
    }
exit:
    return 0;
}

server.cpp

/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name   : server.cpp
* Author      :
* Version     : V1.0
* Description :
* Journal     : 2021-03-21 init v1.0
* Brief       : Blog address: https://blog.csdn.net/qq_38750572?spm=1001.2014.3001.5343
* Others      :
********************************************************************************
*/

#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include "common.h"

static int msg_id;

static void SignalHandler(int sig)
{
    switch (sig)
    {
      case SIGSTOP:
        if (msg_id != 0) {
            msgctl(msg_id, IPC_RMID, 0);
        }
        break;
      default:
        break;
    }
}

int main(int argc, char *argv[])
{
    int key, ret = 0;
    long msg_type = 0;
    struct MsgFrame msg_recv = {0, {0}};

    signal(SIGSTOP, SignalHandler);

    if (argc == 1) {
       msg_type = 0;
    } else if (argc == 2) {
        if (strspn(argv[1], "0123456789") != strlen(argv[1])) {
            PRINT_ERR("Params invalid!\n");
            goto exit;
        } else {
            msg_type = atoi(argv[1]);
            //PRINT_INFO("Receive msg type is %ld\n", msg_type);
        }
    } else {
        PRINT_ERR("Params invalid\n");
        goto exit;
    }

    key = ftok(MSGQ_FILE_PATH, MSGQ_ID);
    if (key < 0)
    {
        PRINT_ERR("ftok failed! errno = %d(%s)\n", errno, strerror(errno));
        goto exit;
    }

    msg_id = msgget(key, IPC_CREAT|0666);
    if (msg_id < 0) {
        PRINT_ERR("msgget failed! errno = %d(%s)\n", errno, strerror(errno));
        goto exit;
    }

    do {
        memset(msg_recv.buffer, 0x00, MAX_SIZE);
        ret = msgrcv(msg_id,(void*)&msg_recv, MAX_SIZE, msg_type, 0);
        if (ret != -1) {
            PRINT_INFO("[Receive %ld %ld] %s\n",
                       msg_recv.type, strlen(msg_recv.buffer), msg_recv.buffer);
        } else {
            PRINT_ERR("msgrcv failed!\n");
        }
    } while (1);

exit:
    PRINT_INFO("Exit from %s\n", __func__);
    return 0;
}
上一篇 下一篇

猜你喜欢

热点阅读