zeromq-4发布订阅模式demo
2021-05-11 本文已影响0人
小五愣
-
去官网下载zeromq4.1版本源码包
-
解压安装
A.执行configure文件:./configure
出现错误:
configure: error: Package requirements (libsodium) were not met:
No package ‘libsodium’ found
解决方案:忽略这个库
./configure --prefix=/home/ygy/zmq --without-libsodium(prefix中的路径是zmq存放的目录)
B.编译:make
C.安装:make install
publish.c
#include <stdlib.h>
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
int main(int argc, char* argv[]) {
int loop = 1;
// [0]创建对象
void* ctx = zmq_ctx_new();
void* publisher = zmq_socket(ctx, ZMQ_PUB);
// [1]绑定到8090端口
int rc = zmq_bind(publisher, "tcp://*:8090");
if (-1 == rc) {
printf("bind failed!\n");
loop = 0;
}
int index = 0;
while (loop) {
// Send message to all subscribers
//每隔1s发送一次数据
char update [24];
sprintf(update, "hello world: %d", index);
printf("server send: %s\n", update);
zmq_send (publisher, update, strlen (update), 0);
index++;
sleep(1);
}
zmq_close(publisher);
zmq_ctx_destroy(ctx);
return 0;
}
subscribe.c
#include <stdlib.h>
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <assert.h>
int main (int argc, char *argv []) {
// [0]创建对象,连接到5566端口
printf ("Collecting updates from server...\n");
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:8090");
assert (rc == 0);
// [1]设置过滤条件,设置为空,表示全订阅,“1”表示匹配开头为“1”的数据
const char *filter = "";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
assert (rc == 0);
struct timeval t1, t2;
int index = 0;
// [2]接受数据
char buffer[256] //根据需求设置长度
memset(buffer , 0, 256 * sizeof(char));
while(1) {
gettimeofday(&t1, NULL);
int size = zmq_recv (subscriber, buffer, LEN, 0);
gettimeofday(&t2, NULL);
//打印了接收时间和长度
double recv_time = (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000.0;
printf("mq_recv_time: %f ms\n", recv_time);
printf("recv size: %d\n", size);
}
zmq_close (subscriber);
zmq_ctx_destroy (context);
return 0;
}