ZeroMQ学习笔记

2018-07-19  本文已影响0人  VienFu

1. 关于消息

ZMQ的传输单位是消息,即一个二进制块。你可以使用任意的序列化工具,如谷歌的Protocal Buffers、XDR、JSON等,将内容转化成ZMQ消息。不过这种转化工具最好是便捷和快速的,请注意衡量。

消息使用的几点注意

2. 多套接字处理

一般地,主程序的循环体内都会包括如下三个部分:

下面以同时处理任务分发和天气消息订阅套接字为例展示以上两种方式的具体处理过程。

2.1 非阻塞并行处理

# -*- coding: utf-8 -*-
import zmq,time

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.connect('tcp://localhost:5557')

subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://localhost:5556')
subscriber.setsockopt(zmq.SUBSCRIBE, b'10001')

while True:

    while True:  # 任务分发优先级提前
        try:
            msg = receiver.recv(zmq.DONTWAIT)
        except zmq.Again:
            break
        #处理任务

        try:
            msg = subscriber.recv(zmq.DONTWAIT)
        except zmq.Again
            break
        #天气更新

        time.sleep(0.001)

上面方法的缺点是没有收到任何消息之前会有1毫秒的延迟,这在高压力的程序中还是会有问题的。并且程序中有意把任务分发的优先级提升,可以用轮询的方式改进一下,类似于ZMQ中的公平队列机制。

2.2 ZMQ轮询

#-*- coding: utf-8 -*-
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.connect('tcp://localhost:5557')

subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://localhost:5556')
subscriber.setsockopt(zmq.SUBSCRIBE, b'10001')

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

while True:
    try:
        socks = dict(poller.poll())
    except KeyboardInterrupt:
        break

    if receiver in socks:
        msg = receiver.recv()
        # 处理任务

    if subscriber in socks:
        msg = subscriber.recv()
        # 更新天气信息

3.

上一篇 下一篇

猜你喜欢

热点阅读