后端开发者Python消息队列

ZeroMQ指南:第一章——基础

2016-11-21  本文已影响2712人  lakerszhy
翻译自“http://zguide.zeromq.org/py:all”

拯救世界

开始的假设

我们假设你使用ZeroMQ 3.2以上的版本。我们假设你使用Linux或者类似的操作系统。我们假设你或多或少能看懂C语言,因为这是示例的默认语言。我们假设当看到类似PUSH或SUBSCRIBE这样的常亮时,你能知道它们的真名是ZMQ_PUSH或ZMQ_SUBSCRIBE。

获得示例

提问和回答

让我们开始写代码。我们从Hello World示例开始。我们会开发一个客户端和一个服务端。客户端发送“Hello”到服务端,服务端回复“World”。下面是Python的服务端,在5555端口启动一个ZeroMQ socket,从中读取请求,并回复“World”给每一个请求:

#   Hello World server in Python
#   Binds REP socket to tcp://*:5555
#   Expects b"Hello" from client, replies with b"World"

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: %s" % message)

    #  Do some 'work'
    time.sleep(1)

    #  Send reply back to client
    socket.send(b"World")

图2:请求——回复
<div align=center>


</div>

REQ-REP socket对是同步的。客户端在循环中(或者只发起一次)发起zmq_send(),然后发起zmq_recv()。其它任何顺序(比如,一次发送两个消息)都会导致发送或接收返回-1。同样的,服务端按顺序发起zmq_recv(),然后发起zmq_send()。

下面是客户端的代码:

#   Hello World client in Python
#   Connects REQ socket to tcp://localhost:5555
#   Sends "Hello" to server, expects "World" back

import zmq

context = zmq.Context()

#  Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

#  Do 10 requests, waiting each time for a response
for request in range(10):
    print("Sending request %s …" % request)
    socket.send(b"Hello")

    #  Get the reply.
    message = socket.recv()
    print("Received reply %s [ %s ]" % (request, message))

在实际应用看起来太简单了,但是向我们已经学过的那样,ZeroMQ有超能力。你可以向这个服务端一次发起几千个客户端,它也会继续快速的工作。为了好玩,先启动客户端,然后启动服务端,它仍然可以正常工作,然后想想这意味着什么。

让我们简单的解释一下这两个程序做了什么。它们创建了一个ZeroMQ的context和一个socket。不用担心这些名词的意思,稍后会解释。服务端绑定它的REP(回复)socket到5555端口。服务端在循环中等待请求,然后每次响应一个回复。客户端发送一个请求,并从服务端读取回复。

如果你杀死服务端(Ctrl-C),并重新启动,客户端不会正确的恢复。从崩溃进程中恢复很不容易。开发一个可靠的请求——回复流很复杂,我们会在第四章讨论。

这个场景背后发生了什么事情,但对于程序员来说代码很简洁,甚至在大量负载下也不会经常崩溃。这就是请求——回复(request-reply)模式,可能是使用ZeroMQ最简单的方式。它对应RPC和经典的客户端/服务端模型。

字符串小提示

除了发送的字节数,ZeroMQ不知道你发送的任何数据。这意味着你需要负责格式化数据,让应用程序可以取回数据。格式化对象和负责数据类型是专业库的工作,比如Protocol Buffers。但是对于字符串,你需要小心。

在C和其它一些语言中,字符串以null结尾。我们发送一个字符串,比如“HELLO”,会额外附加一个null字节:

zmq_send (requester, "Hello", 6, 0);

如果从其它语言发送字符串,可能不会包括null字节。例如,当用Python发送同样的字符串时:

socket.send ("Hello")

那么发送到网络上的是一个长度(较短的字符串只需要一个字节),以及字符串的内容作为单独的字符。

图3:一个ZeroMQ字符串
<div align=center>


</div>

如果你用C语言读取这个字符串,你会得到一个看起来像字符串的东西,可能偶尔行为像字符串(如果幸运的话,这5个字节发现它们后面跟着一个不知不觉潜伏的null),但它不是一个严格意义上的字符串。当你的客户端和服务端的字符串格式不同时,你会得到怪异的结果。

当你用C语言从ZeroMQ中接收字符串数据时,你不能简单的相信它安全的结束了。每次读取字符串,你应该分配一个新的buffer,包括一个额外的字节,拷贝字符串,并用null正确的结束。

因此,我们确定了规则,ZeroMQ的字符串是指定长度的,在网络上发送时不带结尾的null。最简单的情况下(我们会在示例中这么做),一个ZeroMQ字符串完整的对应一个ZeroMQ消息帧(message frame),就像图3所示——一个长度加一些字节。

在C语言中接收一个ZeroMQ字符串,并传递给应用程序一个有效的C语言字符串,我们需要这么做:

//  Receive ZeroMQ string from socket and convert into C string
//  Chops string at 255 chars, if it's longer
static char *
s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    if (size > 255)
        size = 255;
    buffer [size] = 0;
    return strdup (buffer);
}

这是一个便捷函数,本着复用的原则,我们写了一个类似的s_send函数,以正确的ZeroMQ格式发送字符串,并打包到头文件中。

在zhelpers.h中,可以用C语言写一些简短实用的ZeroMQ程序。它的源码很长,只提供给C语言开发者。

获得版本号

ZeroMQ does come in several versions and quite often, if you hit a problem, it'll be something that's been fixed in a later version. So it's a useful trick to know exactly what version of ZeroMQ you're actually linking with.
ZeroMQ有很多版本,并且经常更新。如果你遇到了问题,可能在新版本中已经修复了。因此,了解你正在使用的ZeroMQ版本是一个很有用的技巧。

以下是获得ZeroMQ版本的代码:

# Report 0MQ version
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import zmq

print("Current libzmq version is %s" % zmq.zmq_version())
print("Current  pyzmq version is %s" % zmq.__version__)

对外发送消息

第二个经典模式是单向数据分发,其中服务端推送更新到一组客户端。让我们看一个推送天气更新的示例,其中包括邮编,温度和相对湿度。我们会随机生成这些值。

以下是服务端,我们使用5556端口:

#   Weather update server
#   Binds PUB socket to tcp://*:5556
#   Publishes random weather updates

import zmq
from random import randrange

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

while True:
    zipcode = randrange(1, 100000)
    temperature = randrange(-80, 135)
    relhumidity = randrange(10, 60)

    socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))

There's no start and no end to this stream of updates, it's like a never ending broadcast.
这个更新流没有开始和结束,像一个永远不会终止的广播。

以下是客户端程序,监听更新流,并抓取指定邮编的数据,默认值是纽约:

#   Weather update client
#   Connects SUB socket to tcp://localhost:5556
#   Collects weather updates and finds avg temp in zipcode

import sys
import zmq

#  Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server…")
socket.connect("tcp://localhost:5556")

# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"

# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
    zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

# Process 5 updates
total_temp = 0
for update_nbr in range(5):
    string = socket.recv_string()
    zipcode, temperature, relhumidity = string.split()
    total_temp += int(temperature)

print("Average temperature for zipcode '%s' was %dF" % (
      zip_filter, total_temp / (update_nbr+1))
)

图4:发布——订阅
<div align=center>


</div>

注意,当你使用SUB socket时,你需要设置使用zmq_setsockopt()和SUBSCRIBE设置一个订阅。如果没有设置任何订阅,你不会收到任何消息。对于初学者,这是一个常见的错误。订阅者可以设置多个订阅。如果一个更新匹配任何一个订阅,订阅者就会收到更新。订阅者也可以取消指定的订阅。一个订阅通常,但不是必须的,是一个可打印的字符串。

PUB-SUB socket对是异步的。客户端在循环中调用zmq_recv()(或者只调用一次)。发送消息给SUB socket会导致错误。类似的,服务端调用zmq_send(),但是不能在PUB socket上调用zmq_recv()。

在ZeroMQ的理论中,它不关心哪个终端连接,哪个终端绑定。但是实际中存在不成文的区别,以后会讨论。现在,绑定PUB,连接SUB,除非你的网络设计不支持。

关于PUB-SUB sockets,还有一件更重要的事情需要知道:你不知道订阅者什么时候开始获取消息。甚至你启动一个订阅者,等待一会儿,然后启动发布者,订阅者总会错过发布者发送的第一条消息。这是因为订阅者连接到发布者(需要花费一点时间),发布者可能已经发送了消息出去。

很多开发者都遇到了这个“缓慢加入者”(slow joiner)的症状,之后我们会详细解释。记住,ZeroMQ是异步 I/O,比如在后台。你有两个节点做这件事,以如下顺序:

然后订阅者很可能收不到任何消息。你会眨眼,检查你设置了正确的过滤器,然后再次尝试,但是订阅者仍然收不到任何消息。

一个TCP连接涉及到握手,根据网络和节点之间的跳转数量,需要花费几毫秒时间。这个时间中,ZeroMQ可以发送很多消息。为了讨论,假设它需要5毫秒建立连接,同一个链接每秒可以处理1M消息。在这5毫秒中,订阅者正在连接到发布者,它让发布者只有1毫秒时间发送1K消息。

在第2章,我们会解释如何同步发布者和订阅者,这样你不会开始发布数据,知道订阅者真正完成连接。有一个简单愚蠢的方式延迟发布者,也就是sleep。在实际应用程序中不要这么做。因为它极其脆弱,不雅和缓慢。使用sleep验证发生了什么,然后等到第2章看如何正确的解决。

同步的另一种选择是简单的假设发布的数据流是无限的,没有开始,没有结束。还假设订阅者不关心它启动之前丢失的数据。我们的天气客户端示例基于这种假设。

客户端订阅它选择的邮编,收集该邮编的100次更新。如果邮编是随机分布的,这意味着服务端大约发布一千万次更新。你可以启动客户端,然后启动服务端,客户端会继续工作。你可以停止和重启服务端,客户端会继续工作。当客户端收到100次更新,它会计算并打印平均值,然后退出。

发布——订阅(pub-sub)模式的一些要点:

这是在我笔记本电脑上(2011年 Inter i5处理器)接收和过滤10M消息花费的时间,不错,但没什么特别的:

$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 28F

real    0m4.470s
user    0m0.000s
sys     0m0.008s

分而治之

图5:并行管道
<div align=center>


</div>

作为最后一个示例,让我们做一点超级运算,然后喝一杯咖啡。我们的超级计算程序是一个相当典型的并行处理模型。我们有:

实际中,workers运行在超快的盒子中,可能使用GPUs完成大量的运算。以下是ventilator。它产生100个任务,每个消息告诉worker休眠几毫米:

# Task ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import zmq
import random
import time

try:
    raw_input
except NameError:
    # Python 3
    raw_input = input

context = zmq.Context()

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")

print("Press Enter when the workers are ready: ")
_ = raw_input()
print("Sending tasks to workers…")

# The first message is "0" and signals start of batch
sink.send(b'0')

# Initialize random number generator
random.seed()

# Send 100 tasks
total_msec = 0
for task_nbr in range(100):

    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload

    sender.send_string(u'%i' % workload)

print("Total expected cost: %s msec" % total_msec)

# Give 0MQ time to deliver
time.sleep(1)

以下是worker程序。它接收一个消息,休眠接收到的秒数,然后发出信号,它完成了:

# Task worker
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Process tasks forever
while True:
    s = receiver.recv()

    # Simple progress indicator for the viewer
    sys.stdout.write('.')
    sys.stdout.flush()

    # Do the work
    time.sleep(int(s)*0.001)

    # Send results to sink
    sender.send(b'')

以下是sink程序。它收集100个任务,然后计算整个处理时间,因此我们可以确定workers是并行运行:

# Task sink
# Binds PULL socket to tcp://localhost:5558
# Collects results from workers via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmations
for task_nbr in range(100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
print("Total elapsed time: %d msec" % ((tend-tstart)*1000))

一次的平均耗时是5秒。当启动1,2或4个works时,从sink获得的结果如下:

Let's look at some aspects of this code in more detail:
让我们看下这些代码的细节:

图6:Fair Queuing
<div align=center>


</div>

管道模式也有“缓慢加入者”(slow joiner)症状,导致PUSH sockets不能正确的负载均衡。如果你正在使用PUSH和PULL,如果其中一个worker比其它的接收到更多的消息,这是因为该PULL socket比其它的更快加入,在其它workers连上之前获得更多消息。如果你想完全的负载均衡,你可能想要阅读第3章的负载均衡模式。

使用ZeroMQ编程

看完一些示例后,你肯定急于使用ZeroMQ。在你开始之前,深呼吸,放轻松,并仔细考虑一些基础的建议,这会让减轻你的压力和困惑。

正确获得Context

ZeroMQ程序总是从创建一个context开始,然后使用它创建sockets。在C语言中,调用zmq_ctx_new()。你应该在线程中正确创建和使用一个context。从技术上讲,context是单个线程中所有sockets的容器,并且是inproc sockets的传送带。如果运行时,一个进程中有两个context,它们就像单独的ZeroMQ实例。如果这确实是你想要的,可以,否则请记住:

在一个进程启动时调用zmq_ctx_new(),并在结束时调用一次zmq_ctx_destroy()。

如果你正在使用fork()系统调用,在fork之后,其它子进程代码之前调用zmq_ctx_new()。通常,你想要在子进程中做有趣的事情,在父进程中做无聊的进程管理。

干净的退出

优秀的程序员和优秀的杀手有同样的座右铭:完成工作后总是清理干净。当你使用类似Python语言时,资源会自动释放。使用C语言时,当你使用完对象后,你需要小心的释放它们,否则你会获得内存泄漏,不稳定的程序,通常会有不好的报应。

内存泄漏是一件事,但是ZeroMQ对于如何退出程序十分讲究。原因是技术上的,并且很痛苦,结果是,如果你让任何sockets打开,zmq_ctx_destroy()函数会永远挂起。默认情况下,即使你关闭所有sockets,如果有挂起的连接或者发送,zmq_ctx_destroy()会永远等待,除非在关闭sockets之前设置LINGER为零。

我们需要关心ZeroMQ的messages,sockets和contexts对象。幸运的是,起码在简单的程序中,它很简单:

最起码用C语言开发时需要小心。对于自动释放对象的语言,sockets和contexts会在离开对象域时销毁。如果你没有使用这类语言,你需要在类似“final”块中完成清理工作。

如果你正在使用多线程,情况会更复杂。我们会在下一章介绍多线程,但是有些人会在完全理解之前试图使用多线程,以下是退出ZeroMQ多线程程序的快速和不讨好的指南。

首先,不要再多个线程中使用同一个socket。请不要解释你认为这会很有趣,请不要这么做。接着,你需要关闭每个继续请求的socket。正确的方式是设置一个低的LINGER值(1秒),并且关闭socket。当你销毁一个context时,如果你使用的语言绑定没有自动这么做,请发送一个patch。

最后,销毁context。这会导致附属线程中(比如,共享同一个context)所有阻塞的接收,轮询或者发送返回一个错误。捕获该错误,然后设置一个持续时间,并在该线程中关闭socket,最后退出。不要销毁同一个context两次。主线程中的zmq_ctx_destroy会阻塞,直到所有sockets安全关闭。

瞧!这足够复杂和痛苦。所有语言绑定作者都会自动完成这项工作,不用再做额外的工作。

为什么需要ZeroMQ

你已经使用过ZeroMQ,让我们回头看看为什么使用它。

现在,很多程序由横跨多种网络(局域网或者互联网)的组件构成。很多开发者倒在消息通讯上。有些开发者使用消息队列产品,但是大部分情况他们自己使用TCP或UDP。这些协议不难使用,但是从A发送一些字节到B,以及通过各种可靠方式传递消息之间,有很大的差异。

让我们看看使用原始TCP时,会遇到的典型问题。所有可复用的消息层都需要解决所有或大部分这些问题:

打开一个典型的开源项目,比如Hadoop Zookeeper,阅读src/c/src/zookeeper.c中的C API。当我2013年1月阅读代码时,里面有4200行没有注释的,客户端/服务端网络通信协议代码。它很高效,因为使用poll代替了select。但是,Zookeeper应该使用通用消息层和注释清楚的网络层协议。重复造轮子很浪费团队的时间。

但是如何开发一个可复用的消息层?这么多项目需要这项技术,为什么人们仍然使用TCP sockets这种困难的方式,并且一次次解决列表中这些问题?

事实证明,开发一个可复用的消息系统真的很难,这也是为什么几乎没有FOSS项目尝试,以及为什么商业消息产品很复杂,很贵,很顽固和很脆弱。在2006年,iMatix设计了AMQP,让FOSS开发者第一次用上了可复用的消息系统。AMQP比其它设计更好,但仍然很复杂,昂贵和脆弱。它需要几周时间学习如何使用,几个月时间创建稳定的架构。

图7:最初的消息传输
<div align=center>


</div>

绝大部分消息项目,比如AMQP,试图通过创造一个可复用的新概念——“broker”,完成寻址,路由和队列功能来解决列表中的问题。结果是,在一些未经文档化的协议顶部的客户端/服务端协议,或者一组APIs中,允许程序与broker交互。Brokers在降低网络通讯的复杂性上是卓越的。但是在产品(比如Zookeeper)中增加基于broker的消息通信会更糟糕。这意味着增加了一个额外的大盒子,和一个新的故障点。Broker迅速成为瓶颈和风险。如果软件支持它,我们可以增加第二个,第三个和第四个broker,以及故障转移计划。需要开发者完成这项工作。它创建了更多可移动组件,更复杂,更容易发生故障。

一个broker-centric设置需要自己的运行团队。你需要不加夸张的日夜监控brokers,当它们不正常工作时,你需要解决问题。你需要盒子,需要备份盒子,还需要人员管理这些盒子。有很多可移动组件的,几个团队,几年时间开发的大型程序才值得这么做。

图8:之后的消息传输
<div align=center>


</div>

所以中小型应用程序开发者被困住了。不管是避免网络编程,或者开发一个不能扩展得应用程序。还是一头扎进网络编程,开发脆弱的,复杂的,难以维护的应用程序。或者他们投注在一个消息产品上,依赖昂贵的,容易出问题的技术上。没有真正好的选择,这也是为什么消息传输还停留在上个世纪,并激起了强烈的情绪:用户的负面情绪,销售许可证和技术支持人员却幸灾乐祸。

我们需要的是完成消息传输工作,但在任何程序中都可以简单,方便完成工作的方式。它应该是一个库,不需要其他依赖。没有额外的组件,也就没有额外的风险。它应该可以在所有操作系统和编程语言工作。

这就是ZeroMQ:一个高效的,可嵌入的库,不需要多少成本,就漂亮的解决了网络的大部分问题。

尤其是:

实际上,ZeroMQ比这做得更多。它在开发网络应用程序上有颠覆性的影响。表面上看,它是一个受socket启发的API,你在socket上调用zmq_recv()和zmq_send()。但是消息处理迅速成为中心点,你的应用程序很快分解为一组消息处理任务。它是优雅和自然的。它可扩展:这些每个任务对应一个节点,节点之间通过任意传输协议通讯。两个节点在一个进程(节点是线程),两个节点在同一个机器(节点是进程),或者两个在同一个网络(节点是机器),都是一样的,程序代码不用改变。

Socket的可扩展性

让我们看看实际中ZeroMQ的扩展性。这是一个启动了一个天气服务端和几个并行客户端的脚本:

wuserver &
wuclient 12345 &
wuclient 23456 &
wuclient 34567 &
wuclient 45678 &
wuclient 56789 &

当客户端运行,我们使用top命令查看激活的进程,看起来像这样(在四核机器上):

PID  USER  PR  NI  VIRT  RES  SHR S %CPU %MEM   TIME+  COMMAND
7136  ph   20   0 1040m 959m 1156 R  157 12.0 16:25.47 wuserver
7966  ph   20   0 98608 1804 1372 S   33  0.0  0:03.94 wuclient
7963  ph   20   0 33116 1748 1372 S   14  0.0  0:00.76 wuclient
7965  ph   20   0 33116 1784 1372 S    6  0.0  0:00.47 wuclient
7964  ph   20   0 33116 1788 1372 S    5  0.0  0:00.25 wuclient
7967  ph   20   0 33072 1740 1372 S    5  0.0  0:00.35 wuclient

让我们想想这里发生了什么。天气服务端只有一个socket,这里我们让它并行给5个客户端发送数据。我们可以有几千个并行客户端。服务端应用程序看不见它们,不会直接与它们通讯。所以ZeroMQ socket像一个小型服务端,悄悄地接收客户端请求,只要网络可以处理,就会尽快推送数据到客户端。它是一个多线程服务端,充分利用你的CPU。

从ZeroMQ v2.2升级到ZeroMQ v3.2

警告:不稳定的示范

典型的网络编程建立在一个普遍的假设基础上:一个socket与一个连接,一个终端通讯。有多路广播协议,但它们是独特的。当我们假设一个socket等于一个连接时,我们以特定方式扩展架构。我们创建逻辑线程,每个线程处理一个socket,一个终端。我们在这些线程中处理逻辑和状态。

在ZeroMQ世界中,sockets是进入小而快的后台通讯引擎的入口,该引擎自动管理所有连接。你不能查看,操作,打开,关闭,或者附加状态到这些连接。不管你使用阻塞发送或接收,或者poll,你只能与socket交互,而不是socket为你管理的连接。连接是私有的,不可见的,这是ZeroMQ可扩展性的关键。

因为你的代码与socket交互,所以可以处理任何网络协议的任意连接数量。ZeroMQ中的消息传输模式比你的应用程序代码中的消息传输模式更容易扩展。

所以,普遍的建设不再成立。当你阅读代码示例时,你的代码试图映射你已经知道的知识。当你看到“socket”,会想“哦,这表示到另一个节点的连接”。这是错误的。当你看到“线程”,会再次想”哦,线程表示到另一个节点的连接“,你的大脑再次错了。

如果你第一次阅读这篇指南,当你真正编写一段时间ZeroMQ代码后,才会认识到这一点。你可能会感到困惑,尤其是ZeroMQ让事情变得这么简单,你可能试图强加这些假设在ZeroMQ上,但这不对。

上一篇下一篇

猜你喜欢

热点阅读