从API开始理解QNX

2019-11-22  本文已影响0人  jackniu_ae28

消息传递

QNX是个微内核结构的操作系统,靠的是进程间通讯来实现整个系统功能的。那么具体到写一个程序的时候,到底这个通讯是如何完成的呢?这章就是具体介绐最底层的消息传递API的。消息传递是通过内核进行的,所以所谓的API,实际也就是最底层的内核调用了。需要指出的是,真正在QNX上写程序的时候,很少会直接用到这些API,而是利用更高层的API,不过,知道这些底层的API对于将来理解建立在这些API上的界面,应该会有帮助的。


频道(Channel)与连接(Connect)

消息传递是基于服务器与客户端的模式来进行的,那么客户端怎样才能与服务器端通讯呢?最简单的,当然是指定对方的进程号。要发送的一方,将消息加一个头,告诉内核“把这个消息发给 pid 12345"就行了。其实这也是QNX4时候的做法。但QNX6开始完整支持POSIX线程后,这种方法似乎就不太适合了。如果服务器,有两个线程,分别进行不同的服务,那该怎么办呢?或者你会说“把这个消息发给pid 12345 tid 3"就行了。可是,如果某一个服务,不是由单一线程来进行服务的,而是有一组线程进行的,那又怎么办呢?

为此,QNX6抽象出了”频道“(Channel)这个概念。一个频道,就是一个服务的入口;至于这个频道到底具体有多少线程为其服务,那都是服务器端自己的事情。
一个服务器如果有多个服务,它也可以开多个频道。而客户端,在向“频道”发送消息前,需要先建立连接(Connection),然后将消息在连接上发出去。这样同一个客户端,如果需要,可以与同一个频道建立多个连接。所以,大致上通讯的准备过程是这样的:

服务代码:

  ChannelId = ChannelCreate(Flags); 

客户端代码:

ConnectionId = ConnectAttach(Node, Pid, Chid, Index, Flag);

服务器端就不用解释了,客户端要建立连接的话,它需要Node,这个就是机器号。如果过网络(透明分布处理)时这个值决定了哪一台机器;如果客户端与服务器在同一台机器里时,这个数字是0,或者说ND_LOCAL_NODE;pid是服备器的进程号;而chid就是服务器调用 ChannelCreate()后得到的频道号了。Index与Flag以后再讨论。基本上客户端就是同"Node这台机器里的,Pid这个进程的,Chid频道"做一个连接。有了连接以后,就可以进行消息传递了。

连接的终止是ConnectDetach(),而频道的结束则是ChannelDestroy()了。不过,一般服务器都是长久存在的,不大有需要ChannelDestroy()的时候。


发送(Send),接收(Receive)和应答(Reply)

QNX的消息传递,与我们传统常见的进程间通讯最大的不同,就是这是一个"同步的"消息传递。一个消息传递,都要经过发送,接收和应答三个部份,所谓的 SRR过程。具体来说,客户端在连接上"发送"消息,一旦发送,客户端会被阻塞,服务器端会接收到消息,进行处理,最后,将处理结果"应答"给客户端;只有服务器"应答"了以后,客户端的阻塞状态才会被解除。这种同步的过程,不但保证的客户端与服务器端的时序,也大大简化了编程。具体用API来说,就是这样。

服务代码:

 ReceiveId = MsgReceive(ChannelId, ReceiveBuffer, ReceiveBufLength, &MsgInfo);
  (... 检查Buffer里的消息进行处理 ...)
  MsgReply(RceeiveId, ReplyStatus, ReplyBuf, ReplyLen);

客户端代码:

 MsgSend(ConnectionId, SendBuf, SendLen, ReplyBuf, ReplyLen);
(... 由OS将这个线程挂起 ...)
(... 当服务器MsgReply()后,OS 解除线程的阻塞状态, 客户端可以检查自己的 ReceiveBuf 看看应答结果 ...)

服务器端在频道上进行接收,处理完后应答;客户端则是在连接上发送,要注意在发送的同时,客户端还提供了接收应答用的缓冲。如果你细心的话,或许你会问,服务器端的MsgReceive()与客户端的MsgSend()没有同步,会不会有问题呢?比如,如果MsgSend()时,服务器没有在 MsgReceive(),会出什么事呢?答案是OS依然会把发送线程挂起,发送线程从执行状态(RUNNING)转入“发送阻塞”状态(SEND BLOCK),一直等到服务器来MsgReceive()时,再将SendBuf里的东西复制到ReceiveBuffer里去,同时发送线程的状态变成 “应答阻塞”(REPLY BLOCK)。
同样的,如果服务器调用MsgReceive()时,没有客户端,服务器线程也会被挂起,进入“接收阻塞”状态(RECEIVE BLOCK)。

在应答时,还可以用MsgError()来告诉发送方有错误发生了。因为MsgReply()也可以返回一个状态,或许你会问这两者之间有什么区别?MsgReply(rcvid, EINVAL, 0, 0);的结果是,MsgSend() 这个函数的返回值是22(EINVAL);而MsgError(rcvid, EINVAL);的结果,是MsgSend()返回-1,而errno被设为EINVAL。


数据区与iov

除了用线性的缓冲区进行消息传递以外,为了方便使用,还提供了用iov_t来“汇集”数据。也就是说,可以一次传送几块数据。好象下面的图这样子。虽然在客户端蓝色的Header同红色的databuf是两块不相邻的内存,但传递到服务器端的ReceiveBuffer里,就是连续的了。也就是说在服务器端,要想得到原来databuf里的数据,只需要(ReceiveBuffer + sizeof(header))就可以了。(要注意数据结构对其)

SETIOV(&iov[0], &header, sizeof(header));
SETIOV(&iov[1], databuf, datalen);
MsgSendvs(ConnectionId, iov, 2, Replybf, ReplyLen);

"header" 与 "databuf" 是不连续的两块数据。
服务器接收后,"header"与"databuf"被连续地存在ReceiveBuffer里。

ReceiveId = MsgReceive(ChannelId, ReceiveBuffer, ReceiveBufLength, &MsgInfo); 
header = (struct header *)ReceiveBUffer;
databuf = (char *)((char *)header + sizeof(*header));

例子

好了,有了以上这些基本函数(内核调用),我们就可以写一个客户端和一个服务器端,进行最基本的通信了。
服务嚣:这个服务器,准务好频道后,就从频道上接收信息。如果信息是字符串”Hello“的话,这个服务器应答一个”World“字符串。如果收到的信处是字符串“Ni Hao", 那么它会应答”Zhong Guo",其它任何消息都用MsgError()回答一个错误。

$ cat simple_server.c

// Simple server
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/neutrino.h>

int main()
{
        int chid, rcvid, status;
        char buf[128];

        if ((chid = ChannelCreate(0)) == -1) {
                perror("ChannelCreate");
                return -1;
        }

        printf("Server is ready, pid = %d, chid = %d\n", getpid(), chid);

        for (;;) {
                if ((rcvid = MsgReceive(chid, buf, sizeof(buf), NULL)) == -1) {
                        perror("MsgReceive");
                        return -1;
                }

                printf("Server: Received '%s'\n", buf);

                /* Based on what we receive, return some message */
                if (strcmp(buf, "Hello") == 0) {
                        MsgReply(rcvid, 0, "World", strlen("World") + 1);
                } else if (strcmp(buf, "Ni Hao") == 0) {
                        MsgReply(rcvid, 0, "Zhong Guo", strlen("Zhong Guo") + 1);
                } else {
                        MsgError(rcvid, EINVAL);
                }
        }

        ChannelDestroy(chid);
        return 0;
}

客户端:客户端通过从命令行得到的服务器的进程号与频道号,与服务器建立连接。然后向服务器发送三遍"Hello"和”Ni Hao",并检查返回值。最后发一个“unknown"看是不是MsgSend()会得到一个出错返回。

$ cat simple_client.c

//simple client
#include <stdio.h>
#include <string.h>
#include <sys/neutrino.h>

int main(int argc, char **argv)
{
        pid_t spid;
        int chid, coid, i;
        char buf[128];

        if (argc < 3) {
                fprintf(stderr, "Usage: simple_client <pid> <chid>\n");
                return -1;
        }

        spid = atoi(argv[1]);
        chid = atoi(argv[2]);

        if ((coid = ConnectAttach(0, spid, chid, 0, 0)) == -1) {
                perror("ConnectAttach");
                return -1;
        }

        /* sent 3 pairs of "Hello" and "Ni Hao" */
        for (i = 0; i < 3; i++) {
                sprintf(buf, "Hello");
                printf("client: sent '%s'\n", buf);
                if (MsgSend(coid, buf, strlen(buf) + 1, buf, sizeof(buf)) != 0) {
                        perror("MsgSend");
                        return -1;
                }
                printf("client: returned '%s'\n", buf);

                sprintf(buf, "Ni Hao");
                printf("client: sent '%s'\n", buf);
                if (MsgSend(coid, buf, strlen(buf) + 1, buf, sizeof(buf)) != 0) {
                        perror("MsgSend");
                        return -1;
                }
                printf("client: returned '%s'\n", buf);
        }

        /* sent a bad message, see if we get an error */
        sprintf(buf, "Unknown");
        printf("client: sent '%s'\n", buf);
        if (MsgSend(coid, buf, strlen(buf) + 1, buf, sizeof(buf)) != 0) {
                perror("MsgSend");
                return -1;
        }

        ConnectDetach(coid);

        return 0;
}

执行结果

$ ./simple_server
Server is ready, pid = 36409378, chid = 2
Server: Received 'Hello'
Server: Received 'Ni Hao'
Server: Received 'Hello'
Server: Received 'Ni Hao'
Server: Received 'Hello'
Server: Received 'Ni Hao'
Server: Received 'Unknown'
Server: Received ''
$ ./simple_client 36409378 2
client: sent 'Hello'
client: returned 'World'
client: sent 'Ni Hao'
client: returned 'Zhong Guo'
client: sent 'Hello'
client: returned 'World'
client: sent 'Ni Hao'
client: returned 'Zhong Guo'
client: sent 'Hello'
client: returned 'World'
client: sent 'Ni Hao'
client: returned 'Zhong Guo'
client: sent 'Unknown'
MsgSend: Invalid argument

可变消息长度

从上面的程序也可以看出来,消息传递的实质是把数据从一个缓冲,复制到(另一个进程的)另一个缓冲里去。问题是,如何确定缓冲的大小呢?上述的例子里,服务器端用了一个128字节的缓冲,万一客户端发送一个比如说512字节的消息,是不是消息传递就会出错了呢?

答案是,传递依然成功,但是,只有SendBuffer的最初的128个字节的数据会被复制。设计思想是,服务器必须发现这样的情形,并设法取得完整的数据。

在MsgRecieve()时,第四个参数是一个 struct _msg_info。内核会在进行消息传递的同时,填充这个结构,从而告诉让你得到一些信息。在这个结构中,"msglen"告诉你这次消息传递你实际收到了多少字节(在我们的例子里,就是128),而"srcmsglen"则告诉你发送方的实际Buffer会有多大(在我们的例子里,是512)。通过比较这两个值,服务器端就可以判断有没有收到全部数据。

一旦服务器知道了还有更多的数据没有收到,那该怎么办呢?QNX提供了 MsgRead()这个特殊函数。服务器端可以用这个函数,从发送缓冲中“读取”数据。MsgRead()基本上就是告诉内核,从发送缓冲的某个指定偏移开始,读取一定长的数据回来。所以服务器端这部份的代码基本上是这样的。

int rcvid;
struct _msg_info info;
char buf[128], *totalmsg;

...

rcvid = MsgReceive(chid, buf, 128, &info);
...
if (info->srcmsglen > info->msglen) {
    totalmsg = malloc(info->srcmsglen);
    if (!totalmsg) {
        MsgError(rcvid, ENOMEM);
        continue;
    } 
    memcpy(totalmsg, buf , 128);
    if (MsgRead(rcvid, &totalmsg[128], 128, info->srcmsglen - info->msglen) == -1) {
        MsgError(rcvid, EINVAL);
        continue;
    }
} else {
    totalmsg = buf;
}

/* Now totalmsg point to a full message, don't forget to free() it later on,
 * if totalmsg is malloc()'d here
 */

你或者会问,为什么消息接收都已经结束了,服务器端还能去读取客户端的数据?这是因为从一开始我们就提到的,QNX的消息传递是“同步”的。还记得吗?在服务器端“应答”之前,客户端是被阻塞的;也说是说客户端的发送缓冲会一直保留在那里,不会变化。(另外再开个线程去把这个缓冲搞乱甚至free掉?当然可以。不过,这是你客户端程序的BUG了)

与此相近的,有的时候,服务器需要返回大量的数据给客户端(比如说1M)。服务器不希望 malloc(1024 * 1024),然后MsgReply(),然后再free()。(在嵌入式程序里,经常地进行malloc()/free()不是一个很好的习惯)那么服务器也可以用一个小的定长缓冲,比方说16K,然后把数据“一部份一部份地写回”客户端的应答缓冲里。好象下面的样子。要记得最后还是要做一个 MsgReply() 以让客户端继续运行。

char *buf[16 * 1024];
unsigned offset;

    for  (offset = 0; offset < 1024 * 1024; offset += 16 * 1024) {
        /* moving data into buffer */
        MsgWrite(rcvid, buffer, 16 * 1024, offset);
    }
    /* 1MB returned, Reply() to let client go */
    MsgReply(rcvid, 0, 0, 0);

实例

以下是QNX的C库中的read()和write()函数实装,有了前面的基础,应该很好理解了。先不管fd是如何得到的,只要理解fd就是 ConnectAttach()返加的连接号就可以了。虽然read()是从服务器取得数据,而write()是向服务器输出数据,但实质上,它们都是向服务器提出一个请求,由服务器来应答。而对于write()来说,这是一个io_write_t,一个MsgWritev()把请求与要传递的数据一起发给服务器;而对于read()来说,请求被封装在 io_read_t 里,MsgSend()把这请求传给服务器,read()的结果缓冲,则做为应答缓冲,由服务器MsgReply()时填入。
read():

/*
 * $QNXLicenseC:
 * Copyright 2007, QNX Software Systems. All Rights Reserved.
 * 
 * You must obtain a written license from and pay applicable license fees to QNX 
 * Software Systems before you may reproduce, modify or distribute this software, 
 * or any work that includes all or part of this software. Free development 
 * licenses are available for evaluation and non-commercial purposes. For more 
 * information visit http://licensing.qnx.com or email licensing@qnx.com.
 * 
 * This file may contain contributions from others. Please review this entire 
 * file for other proprietary rights or license notices, as well as the QNX 
 * Development Suite License Guide at http://licensing.qnx.com/license-guide/ 
 * for other information.
 * $
 */




#include <unistd.h>
#include <sys/iomsg.h>

ssize_t read(int fd, void *buff, size_t nbytes) {
    io_read_t   msg;

    msg.i.type = _IO_READ;
    msg.i.combine_len = sizeof msg.i;
    msg.i.nbytes = nbytes;
    msg.i.xtype = _IO_XTYPE_NONE;
    msg.i.zero = 0;
    return MsgSend(fd, &msg.i, sizeof msg.i, buff, nbytes); 
}

/*
 * $QNXLicenseC:
 * Copyright 2007, QNX Software Systems. All Rights Reserved.
 * 
 * You must obtain a written license from and pay applicable license fees to QNX 
 * Software Systems before you may reproduce, modify or distribute this software, 
 * or any work that includes all or part of this software. Free development 
 * licenses are available for evaluation and non-commercial purposes. For more 
 * information visit http://licensing.qnx.com or email licensing@qnx.com.
 * 
 * This file may contain contributions from others. Please review this entire 
 * file for other proprietary rights or license notices, as well as the QNX 
 * Development Suite License Guide at http://licensing.qnx.com/license-guide/ 
 * for other information.
 * $
 */




#include <unistd.h>
#include <sys/iomsg.h>

ssize_t write(int fd, const void *buff, size_t nbytes) {
    io_write_t  msg;
    iov_t   iov[2];

    msg.i.type = _IO_WRITE;
    msg.i.combine_len = sizeof msg.i;
    msg.i.xtype = _IO_XTYPE_NONE;
    msg.i.nbytes = nbytes;
    msg.i.zero = 0;
    SETIOV(iov + 0, &msg.i, sizeof msg.i);
    SETIOV(iov + 1, buff, nbytes);
    return MsgSendv(fd, iov, 2, 0, 0);
}

服务器端应该是怎样进行处理的?想想MsgRead()/MsgWrite(),你应该不难想像服务器端是如何工作的吧。

脉冲(Pulse)

脉冲其实更像一个短消息,也是在“连接”上发送的。脉冲最大的特点是它是异步的。发送方不必要等接收方应答,直接可以继续执行。但是,这种异步性也给脉冲带来了限制。脉冲能携带的数据量有限,只有一个8位的"code"域用来区分不同的脉冲,和一个32位的“value"域来携带数据。脉冲最主要的用途就是用来进行“通知”(Notification)。不仅是用户程序,内核也会生成发送特殊的“系统脉冲”到用户程序,以通知某一特殊情况的发生。

脉冲的接收比较简单,如果你知道频道上不会有别的消息,只有脉冲的话,可以用MsgReceivePulse()来只接收脉冲;如果频道既可以接收消息,也可以接收脉冲时,就直接用MsgReceive(),只要确保接收缓冲(ReveiveBuf)至少可以容下一个脉冲(sizeof struct _pulse)就可以了。在后一种情况下,如果MsgReceive()返回的rcvid是0,就代表接收到了一个脉冲,反之,则收到了一个消息。所以,一个既接收脉冲,又接收消息的服务器,可以是这样的。

union {
    struct _pulse pulse;
    msg_header   header;
} msgs;

...

if ((rcvid = MsgReceive(chid, &msgs, sizeof(msgs), &info)) == -1) {
    perror("MsgReceive");
    continue;
}

if (rcvid == 0) {
    process_pulse(&msgs, &info);
} else {
    process_message(&msgs, &info);
}

脉冲的发送,最直接的就是MsgSendPulse()。不过,这个函数通常只在一个进程中,用在一个线程要通知另一个线程的情形。在跨进程的时候,通常不会用到这个函数,而是用到下面将要提到的 MsgDeliverEvent()。

与消息传递相比,消息传递永远是在进程间进行的。也就是说,不会有一个进程向内核发送数据的情形。而脉冲就不一样,除了用户进程间可以发脉冲以外,内核也会向用户进程发送“系统脉冲”来通知某一事件的发生.

消息传递的方向与MsgDeliverEvent()

从一开始就提到,QNX的消息传递是客户、服务器型的。也就是说,总是由客户端向服务器端发送请求,等待被回复的。但在现实情况中,客户端与服务器端并不是很容易区分开来的。有的服务器端为了处理客户端的请求,本身就需要向别的服务器发送消息;有的客户端需要从不同的服务器那里得到服务,而不能阻塞在某一特定的服务器上;还有的时候,两个进程间的数据是互相流动的,这应该怎么办呢?

也许有人认为,两个进程互为通讯就可以了。每个进程都建立自己的频道,然后都与对方的频道建一个连接就好了;这样,需要的时候,就可以直接通过连接向对方发送消息了。就好象管道(pipe)或是socketpair一样。请注意,这种设计在QNX的消息传递中是应该避免的。因为很容易就造成死锁。一个常见的情形是这样的。

进程A:MsgSend() 到进程B
进程B:MsgReceive()接收到消息
进程B:处理消息,然后MsgSend()给进程A

因为进程A正在阻塞状态中,无法接收并处理B的请求;所以A会在STATE_REPLY里,而B则会因MsgSend()而进入STATE_SEND,两个进程就互为死锁住了。当然,如果A和B都使用多线程,专门用一个线程来MsgReceive(),这个情形或许可以避免;但你要保证 MsgReceive()的线程不会去MsgSend(),否则一样会死锁。在程序简单的时候或许你还有控制,如果程序变得复杂,又或者你写的只是一个程序库,别人怎么来用你完全没有控制,那么最好还是不要用这种设计。

在QNX中,正确的方法是这样的。

客户端: 准备一个“通知事件”(Notification Event),并把这个事件用MsgSend()发给服务器端,意思是:“如果xxx情况发生的话,请用这个事件通知我”。
服务器: 收到这个消息后,记录下当时的rcvid,和传过来的事件,然后应答“好的,知道了”。
客户端: 因为有了服务器的应答,客户端不再阻塞,可以去做别的事
......
服务器: 在某个时刻,客户端所要求的“xxx情况”满足了,服务器调用 MsgDeliverEvent(rcvid, event);以通知客户端
客户端: 收到通知,再用MsgSend()发关“xxx 情况的数据在哪里?”
服务器: 用MsgReply()把数据返回给客户端

具体的例子,可以参考MsgDeliverEvent()的文档说明。

路径名(Path Name)

现在来回想一下我们最初的例子,客户端与服务器是怎样取得连接的?客户端需要服务器的 nd, pid, chid,才能与服务器正确地建立连接。在我们的例子里,我们是让服务器显示这几个数,然后在客户端的启动时,通过命令行里传给客户端。但是,在一个现实的系统里,进程不断地启动、终止;服务器与客户端的起动过程也无法控制,这种方法显然是行不通的。

QNX的解决办法,是把“路径名”与上述的“服务频道”概念巧妙地结合起来。让服务器进程可以注册一个路径名,与服务频道的nd, pid, chid关联起来。这样,客户端就不需要知道服务器的nd, pid, chid,而只要请求连接版务器路径名就可以了。具体来说 name_attach()就是用来建立一个频道,并为频道注册一个名字的;而name_open()则是用来连接注册过的服务器频道;具体的例子,可以在name_attach()的文档里找到,这里就不再重复了。

上一篇下一篇

猜你喜欢

热点阅读