zeromq-4发布订阅模式demo

2021-05-11  本文已影响0人  小五愣
  1. 去官网下载zeromq4.1版本源码包

  2. 解压安装

A.执行configure文件:./configure
出现错误:
configure: error: Package requirements (libsodium) were not met:
No package ‘libsodium’ found
解决方案:忽略这个库
./configure --prefix=/home/ygy/zmq --without-libsodium(prefix中的路径是zmq存放的目录)
B.编译:make
C.安装:make install

publish.c

#include <stdlib.h> 
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h> 

int main(int argc, char* argv[]) {
    int loop = 1;

    // [0]创建对象
    void* ctx = zmq_ctx_new();
    void* publisher = zmq_socket(ctx, ZMQ_PUB);
   
    // [1]绑定到8090端口
    int rc = zmq_bind(publisher, "tcp://*:8090");
    if (-1 == rc) {
        printf("bind failed!\n");

        loop = 0;
    }

    int index = 0;

    while (loop) {
        //  Send message to all subscribers
        //每隔1s发送一次数据  
        char update [24];
        sprintf(update, "hello world: %d", index);

        printf("server send: %s\n", update);
 
        zmq_send (publisher, update, strlen (update), 0);

        index++;
        sleep(1);
    }

    zmq_close(publisher);
    zmq_ctx_destroy(ctx);

    return 0;
}

subscribe.c

#include <stdlib.h> 
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h> 
#include <assert.h>

int main (int argc, char *argv []) {
    //  [0]创建对象,连接到5566端口
    printf ("Collecting updates from server...\n");

    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
 
    int rc = zmq_connect (subscriber, "tcp://localhost:8090");
    assert (rc == 0);

    //  [1]设置过滤条件,设置为空,表示全订阅,“1”表示匹配开头为“1”的数据
    const char *filter =  "";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));

    assert (rc == 0);
    struct timeval t1, t2;

    int index = 0;

    //  [2]接受数据
    char buffer[256]    //根据需求设置长度
    memset(buffer , 0, 256 * sizeof(char));

    while(1) {
        gettimeofday(&t1, NULL);
        int size = zmq_recv (subscriber, buffer, LEN, 0);
        gettimeofday(&t2, NULL);
        
        //打印了接收时间和长度
        double recv_time = (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000.0;
        printf("mq_recv_time: %f ms\n", recv_time);

        printf("recv size: %d\n", size);
    }
   
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
  
    return 0;
}

上一篇下一篇

猜你喜欢

热点阅读