ZeroMQ分享-part2

2016-09-28  本文已影响722人  分享放大价值

ZeroMQ API Reference


void *zmq_ctx_new();

创建一个ZMQ的上下文环境,是ZMQ一切的开始。

int zmq_ctx_shutdown(void *context);

关闭ZMQ的上下文环境。

注意 该函数并没有释放ZMQ分配的资源。因此,对于彻底关闭ZMQ通讯而言,执行该函数是可选的,调用之后仍需调用zmq_ctx_term释放ZMQ分配的所有资源。

int zmq_ctx_term(void *context);

销毁ZMQ的上下文环境,释放ZMQ分配的所有资源,是ZMQ一切的结束。执行zmq_ctx_shutdown的所有操作。

void *zmq_socket(void *context, int type);

创建一个给定上下文环境下的ZMQ socket,type指定了ZMQ通讯的模式。

注意

  1. 传统的socket提供的是同步的字节流或数据包通讯接口;而ZMQ socket提供的是异步的消息队列,传递的不是离散的数据包或连续的字节流,而是离散的“消息”。
  2. 传统的socket提供的是一对一或多对一(多个客户端,一个服务器),特殊情况下会有一对多(多播);而ZMQ socket提供的是多对多模型,客户端可以通过zmq_connect连接多个服务器,服务器端可以通过zmq_bind连接多个客户端。
int zmq_close(void *socket);

关闭指定的socket上的所有连接,未到达的数据会被丢弃。

int zmq_connect(void *socket, const char *endpoint);

(客户端)与指定的端点建立连接,通过endpoint参数指定了通讯协议。

注意

  1. 传统的connect函数一般不马上返回(三次握手),一旦超时则socket不再可用,必须关闭;zmq_connect总是立即返回,但是成功调用函数并不表明已经建立连接。因此,绝大多数情况下,服务器调用zmq_bind与客户端调用zmq_connect的顺序无所谓。
  2. 除了ZMQ_ROUTER模式外,调用zmq_connect后socket进入就绪状态;而调用zmq_bind后socket进入静音状态,根据zmq_socket类型会阻塞或返回并丢弃消息。
int zmq_disconnect(void *socket, const char *endpoint);

(客户端)关闭指定的socket上的指定连接,未到达的数据会被丢弃。

int zmq_bind(void *socket, const char *endpoint);

(服务器)将socket与指定的端点绑定,接收客户端的连接。

注意 调用zmq_bind后的socket进入静音状态,根据zmq_socket类型会阻塞或立即返回并丢弃消息

int zmq_unbind(void *socket, const char *endpoint);

(服务器)将socket与指定的端点解绑,不再接收向该端点提出连接请求的客户端连接。

/* Create a ZMQ_SUB socket */
void *socket = zmq_socket(context, ZMQ_SUB);
assert(socket);
/* Bind it to the system-assigned ephemeral port using a TCP transport */
rc = zmq_bind(socket, "tcp://127.0.0.1:*");
assert(rc == 0);
/* Obtain real endpoint */
const size_t buf_size = 32;
char buf[buf_size];
rc = zmq_getsockopt(socket, ZMQ_LAST_ENDPOINT, buf,
                       (size_t *)&buf_size);
assert (rc == 0);
/* Unbind socket by real endpoint */
rc = zmq_unbind(socket, buf); 
assert (rc == 0);
int zmq_setsockopt(void *socket, int option_name,
                    const void *option_value,size_t option_len);

为指定的socket设定option_name,值由option_value指定。为了使选项生效,要在建立连接(如zmq_connect)之前调用zmq_setsockopt。

注意 该选项会被zmq_recv的flag参数ZMQ_DONTWAIT屏蔽。

注意 该选项会被zmq_send的flag参数ZMQ_DONTWAIT屏蔽。

int zmq_getsockopt(void *socket, int option_name,
                    void *option_value,size_t *option_len);

从指定的socket中提取指定option_name,保存在option_value中。

int zmq_recv(void *socket, void *buf,size_t len, int flags);

从指定的socket接收消息,并存储在buf中。

注意 当socket无数据接收时,若flags参数设置为ZMQ_DONTWAIT则zmq_recv立即返回-1,忽略zmq_setsockopt的ZMQ_RCVTIMEO设置的超时时间。

int zmq_send(void *socket, void *buf,size_t len, int flags);

将buf中的消息放入指定socket的发送队列。

注意

  1. 成功调用zmq_send并不能说明消息已经传输到网络,只能保证消息被放入消息队列,而ZMQ对消息队列中的消息负责。
    对于多片的消息,ZMQ会“原子”地发送。即要么消息全部送达,要么消息全部未送达。
  2. 当socket无数据发送时,若flags参数设置为ZMQ_DONTWAIT则zmq_send立即返回-1,忽略zmq_setsockopt的ZMQ_SNDTIMEO设置的超时时间。
const char *zmq_strerror(int errnum);

根据给定的errno序号得到相应的出错信息。

int zmq_poll(zmq_pollitem_t *items, int nitems,long timeout);

提供ZMQ的多路I/O复用机制。

typedef struct{
    void *socket;
    int fd;
    short events;
    short revents;
} zmq_pollitem_t;

注意

  1. zmq_poll提供了与传统的select和poll类似的“轮询”功能。在单线程下可以同时监控多个套接字的响应状态,使得服务端能够同时处理多个客户端的请求,也可以使客户端同时向多个服务端发送请求。
  2. 其实,对于ZMQ来说,zmq_poll是不必要的,因为zmq_connect和zmq_bind支持连接/绑定多个端点。完全可以使用它们取代zmq_poll的功能。例如:
// using zmq_poll implements I/O multiplexing
vector<string> zmq_url/* */;
int port_num/* */;
vector<void *> receiver;
vector<void *> context;
zmq_pollitem_t *poll_item;int ret;
poll_item = (zmq_pollitem_t*)malloc(sizeof(zmq_pollitem_t) * port_num);
for (int i = 0; i < port_num; ++i) {
        void *aContext, *aReceiver;
        aContext = zmq_ctx_new();
        aReceiver = zmq_socket(aContext, ZMQ_PULL);
        int timeout = 1000;  // milliseconds 
        zmq_setsockopt(aReceiver, ZMQ_RCVTIMEO, &timeout, sizeof(int)); 
        ret = zmq_connect(aReceiver, zmq_url[i].c_str());    
        assert(0 == ret);    
        receiver.push_back(aReceiver);    
        context.push_back(aContext);    
        poll_item[i].socket = receiver[i];    
        poll_item[i].fd = 0;    
        poll_item[i].events = ZMQ_POLLIN;    
        poll_item[i].revents = 0;
}
ret = zmq_poll(poll_item, port_num, 0);
assert(-1 != ret);
for (int i = 0; i < port_num; ++i) {  
        if (poll_item[i].revents & ZMQ_POLLIN) {        
            ret = zmq_recv(receiver[i] ...       
             ...    
        }
}
// using zmq_connect implements I/O multiplexing
vector<string> zmq_url/* */;
int port_num/* */;
void *context = zmq_ctx_new();
void *receiver = zmq_socket(context, ZMQ_PULL);
zmq_setsockopt(receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int));
int timeout = 1000;  // milliseconds
int ret;
for (int i = 0; i < port_num; ++i) {  
        ret = zmq_connect(receiver, zmq_url[i].c_str());    
        assert(0 == ret);
}
ret = zmq_recv(receiver[i] ...
...
上一篇下一篇

猜你喜欢

热点阅读