当storm遇上python

2015-04-23  本文已影响0人  Mr_Child

storm是什么

他的官方文档是这样介绍的

Storm is a distributed realtime computation system.

关键词:分布式、实时、计算

你什么时候需要storm

当你有海量数据需要进行实时处理的时候,在这种场景下你往往需要利用到多台机器,而且让你关注的某一类数据按一定的规则路由到确切的节点,从而实现对信息流(往往需是有状态的)的连续计算。
实际上分布式计算就是一大堆节点(一般是在多台机器上)之间的互相通信,而storm管理了这些节点,定义了一个计算的模型(topology)让开发者可以忽略很多细节(比如集群管理、消息队列),从而把实现实时分布式计算任务简单化。

storm的哲学

storm的组件

Paste_Image.png

storm的计算模型

Paste_Image.png

storm的几种路由方式

路由(grouping)定义了stream如何在各个节点之中流动,下面只介绍几种常见方式,如下:
Shuffle grouping: 洗牌模式。随机平均地发配到下游节点上。
Fields grouping: 按照某一个字段来分配,拥有相同值的字段会分配到同一个节点上(即可连续跟踪某个固定特征的数据流)
Global grouping: 强制到某唯一的节点,实际上如果有多个节点去到任务号最低的节点。
all grouping: 强制到所有节点,需小心使用。
Partial Key grouping: 最新支持的,带负载均衡的Fields grouping。
Direct grouping: 手动指定要流动到的节点。

[关于storm的组成部分与计算哲学的更详细文档]

(http://storm.apache.org/documentation/Concepts.html)

hand on the code

假设你已经安装好了storm(请参照官方文档,或者其他一切所能参照的文档,而且请装0.9.2版本,下面会有说明)。
这时候一般的入门会让你开始你的第一个java程序来提交topology,但如标题所预示, 我们这里会使用python(对,只需要python)来进行示例。
首先我们需要一个开源项目的支持,它叫pyleus(yelp公司出品),这里有一个不幸的消息,它对storm的支持仅到0.9.2(最新版本0.9.4的支持正在开发中)。

第一次的提交

$ pip install pyleus
$ git clone https://github.com/Yelp/pyleus.git
$ pyleus build  pyleus/examples/exclamation_topology/pyleus_topology.yaml
$ pyleus local exclamation_topology.jar

只要以上简单几个操作,即可把这个topology提交到本地,如果没有任何错误我们就可以继续接下来的实际例子。

更有意义的例子-数单词

这个例子是pyleus项目自带的,examples目录下还有其他详细的例子可以参考。

这个Topology的目录树
word_count/
|-- word_count/
|   |-- __init__.py
|   |-- count_words.py
|   |-- line_spout.py
|   |-- log_results.py
|   |--split_words.py
|-- pyleus_topology.yaml
pyleus_topology.yaml

此文件定义了这个拓扑基本组成与数据流动

# An ultra-simple topology which shows off Storm and the pyleus.storm library

name: word_count # 自定义拓扑的名字

topology:

    - spout:
        name: line-spout # 自定义spout组件的名字
        module: word_count.line_spout # 代码是word_count文件夹下的line_spout.py

    - bolt:
        name: split-words # 自定义bolt组件的名字
        module: word_count.split_words # 代码是word_count文件夹下的split_words.py
        parallelism_hint: 3 # 并发的节点数
        groupings: 
            - shuffle_grouping: line-spout # 以洗牌模式接收来自line-spout组件的数据流

    - bolt:
        name: count-words # 自定义bolt的名字
        module: word_count.count_words # 代码是word_count文件夹下的count_words.py
        parallelism_hint: 3 # 并发的节点数
        groupings:
            - fields_grouping:
                component: split-words
                fields:
                    - word # 以filed grouping模式接收来自split-words组件的数据流,field字段为word。

    - bolt:
        name: log-results # 自定义bolt的名字
        module: word_count.log_results # 代码是word_count文件夹下的log_results.py文件
        groupings:
            - global_grouping: count-words # 以global grouping方式接收来自count-words组件的数据流

这里数据的流动可以描述为

line-spout > split-words > count-words > log-results

line_spout.py
import logging
import random

from pyleus.storm import Spout

log = logging.getLogger('counter')

LINES = """
Lorem ipsum dolor sit amet, consectetur
adipiscing elit. Curabitur pharetra ante eget
nunc blandit vestibulum. Curabitur tempus mi
a risus lacinia egestas. Nulla faucibus
elit vitae dignissim euismod. Fusce ac
elementum leo, ut elementum dui. Ut
consequat est magna, eu posuere mi
pulvinar eget. Integer adipiscing, quam vitae
pretium facilisis, mi ligula viverra sapien,
nec elementum lacus metus ac mi.
Morbi sodales diam non velit accumsan
mollis. Donec eleifend quam in metus
faucibus auctor. Cras auctor sapien non
mauris vehicula, vel aliquam libero luctus.
Sed eu lobortis sapien. Maecenas eu
fringilla enim. Ut in velit nec
lectus tincidunt varius. Sed vel dictum
nunc. Morbi mollis nunc augue, eget
sagittis libero laoreet id. Suspendisse lobortis
nibh mauris, non bibendum magna iaculis
sed. Mauris interdum massa ut sagittis
vestibulum. In ipsum lacus, faucibus eu
hendrerit at, egestas non nisi. Duis
erat mauris, aliquam in hendrerit eget,
aliquam vel nibh. Proin molestie porta
imperdiet. Interdum et malesuada fames ac
ante ipsum primis in faucibus. Praesent
vitae cursus leo, a congue justo.
Ut interdum tellus non odio adipiscing
malesuada. Mauris in ante nec erat
lobortis eleifend. Morbi condimentum interdum elit,
quis iaculis ante pharetra id. In
""".strip().split('\n')


class LineSpout(Spout):

    OUTPUT_FIELDS = ["line"] # 定义要输出的字段名与数量

    def next_tuple(self):
        line = random.choice(LINES)
        log.debug(line)
        # 这里tup_id是可选的
        # 如果你要让storm跟踪你的tuple流动的话需要加上
        # storm的可靠性保证需要这个
        # (line,) 这个tuple (刚好python的属于与storm的属于对上了)
        # 对应之前设置的OUTPUT_FIELDS
        self.emit((line,), tup_id=random.randrange(999999999))


if __name__ == '__main__':
    # 这里是无法通过print的方式从终端输出调试结果的
    # 所以这里采取的方式是写临时文件
    # 实际上如果多个节点同时写一个文件会存在竞争的情况
    # 不过这里仅供调试,所以暂时忽略这个问题
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_lines.log',
        format="%(message)s",
        filemode='a',
    )

    LineSpout().run()

这个spout的作用是把一个文本分拆成行,每行作为一个tuple输出给下游的bolt。
看下word_count_lines.log的内容:

$ head word_count_lines.log
lobortis eleifend. Morbi condimentum interdum elit,
erat mauris, aliquam in hendrerit eget,
vestibulum. In ipsum lacus, faucibus eu
quis iaculis ante pharetra id. In
Ut interdum tellus non odio adipiscing
nunc. Morbi mollis nunc augue, eget
elit vitae dignissim euismod. Fusce ac
nunc. Morbi mollis nunc augue, eget
lectus tincidunt varius. Sed vel dictum
lobortis eleifend. Morbi condimentum interdum elit,
split_words.py

import logging

from pyleus.storm import SimpleBolt

log = logging.getLogger('splitter')


class SplitWordsBolt(SimpleBolt):

    OUTPUT_FIELDS = ["word"] # 定义输出的字段只有一个,名为word

    def process_tuple(self, tup):
        line, = tup.values # 接收到上游的tuple
        log.debug(line)
        for word in line.split():
            log.debug(word)
            # 这里bolt用于跟踪tuple的参数是anchors
            # 并且需要把上游的tuple传入
            # 把word传给下游
            self.emit((word,), anchors=[tup]) 


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_split_words.log',
        format="%(message)s",
        filemode='a',
    )
    SplitWordsBolt().run()

这个bolt的作用是,对收到的line拆解成word单词,并传给下游
他的输出是 word_count_split_words.log

$head word_count_split_words.log
erat mauris, aliquam in hendrerit eget,
erat
lobortis eleifend. Morbi condimentum interdum elit,
lobortis
mauris,
vestibulum. In ipsum lacus, faucibus eu
aliquam
vestibulum.
in
eleifend.
count_words.py

count-words这个组件使用了field grouping
以field grouping模式接收来自split-words组件的数据流,field字段为word
所以,相同的word单词,会流动到同一个节点。

from collections import defaultdict
from collections import namedtuple
import logging

from pyleus.storm import SimpleBolt

log = logging.getLogger('counter')

Counter = namedtuple("Counter", "word count") # 输出是两个字段


class CountWordsBolt(SimpleBolt):

    OUTPUT_FIELDS = Counter # 输出是两个字段

    def initialize(self):
        # 在bolt启动的时候初始化
        # bolt是作为单例一直运行的
        self.words = defaultdict(int) 


    def process_tuple(self, tup):
        word, = tup.values # 获得上游的word
        self.words[word] += 1 # 计数
        log.debug("{0} {1}".format(word, self.words[word]))
        # 注意这里输出到下游的是两个字段 word 与 word的计数
        self.emit((word, self.words[word]), anchors=[tup])


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_count_words.log',
        format="%(message)s",
        filemode='a',
    )

    CountWordsBolt().run()

这个bolt的作用是对通用的单词进行计数,并且把两个字段:单词本身跟单词计数传递给下游
word_count_count_words.log的内容:

$ tail word_count_count_words.log
Ut 35894
laoreet 9472
Maecenas 8294
id. 19047
sapien, 11816
Suspendisse 9472
blandit 9599
Mauris 24100
erat 19047
a 16677
log_results.py

log-results组件用的是global grouping
在这种情况下所有来自上游的数据流都会到同一个节点,这样log写文件的话就不会有竞争问题了

import logging

from pyleus.storm import SimpleBolt

log = logging.getLogger('log_results')


class LogResultsBolt(SimpleBolt):

    def process_tuple(self, tup):
        word, count = tup.values # 从上游接收两个字段
        log.debug("%s: %d", word, count)


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_results.log',
        format="%(message)s",
        filemode='a',
    )

    LogResultsBolt().run()

这个bolt的作用只是落地,这里落地的方式是写log文件(一般生产场景会落地到数据库),落地文件内容:

tail word_count_results.log
faucibus: 28713
adipiscing,: 8526
molestie: 11949
at,: 9742
Maecenas: 8392
diam: 11934
eget.: 9714
quam: 17203
mauris,: 24189
tincidunt: 9662

提交此topology到远程

pyleus build examples/word_count/pyleus_topology.yaml
pyleus submit -n NIMBUS_HOST exclamation_topology.jar

这里NIMBUS_HOST为远程的nimbus地址

pyleus的详细文档

关于spout

可以看到上面数单词的例子实际上数据流的来源是一个测试程序随机产生的,在实际生产环境中,我们一般会采用kafka来作为数据产生的源头,
一个kafka的spout定义如下
(https://github.com/Yelp/pyleus/tree/develop/examples/kafka_spout)

# 这里的定义是没有任何操作的,可以通过为它增加bolt来实现功能
name: kafka_spout_example # 自定义topology名字

topology:

    - spout:
        name: kafka-my_topic # 自定义spout名字
        type: kafka # 制定类型为kafka
        options:
            # 配置kafka的topic
            topic: my_topic

            # 配置zookeeper地址,多个用逗号隔开
            zk_hosts: zookeeper1:2181,zookeeper2:2181

            # 配置给kafka存储consumer offsets 的ZooKeeper Root path
            # 默认为: /pyleus-kafka-offsets/<topology name>
            zk_root: /pyleus-kafka-offsets/kafka_spout_example

            # Kafka consumer ID.
            # 默认为: pyleus-<topology name>
            consumer_id: pyleus-kafka_spout_example

            # 需要从某个offset开始吗
            # 默认是false.
            from_start: false

            # 如果需要从某个offset开始则定义该offset
            start_offset_time: 1398971060
上一篇下一篇

猜你喜欢

热点阅读