Storm学习笔记
Storm简介
Storm是什么
- Storm是Twitter开源的一个分布式的实时计算系统,用于数据的实时分析,持续计算,分布式RPC等等
- Storm是一个免费开源、分布式、高容错的实时计算系统。Storm令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求。Storm经常用于在实时分析、在线机器学习、持续计算、分布式远程调用和ETL等领域。Storm的部署管理非常简单,而且,在同类的流式计算工具,Storm的性能也是非常出众的
JStorm
- 阿里的JStorm:http://120.25.204.125/index_cn.html
实时计算
-
扩展了解
-
实时计算解决的问题
- 实时推荐系统,其中
Hadoop
只是做离线的数据分析(海量数据分析),无法做到实时分析计算 - 车流量实时计算,比如用
Storm
计算每一个路段的拥挤度等相关路况信息 - 股票系统
- 实时推荐系统,其中
-
实现实时计算系统
- 低延迟
- 高性能
- 分布式
- 可扩展
- 容错
- 可靠性
- 快速,系统的设计保证了消息能得到快速的处理,使用ZeroMQ作为其底层消息队列(0.9.0版本以前)
- 本地模式:Storm有一个本地模式,可以在处理过程中完全模拟Storm集群,可快速进行开发和单元测试
Storm体系结构
Storm与Hadoop的对比
结构 | Hadoop | Storm |
---|---|---|
主节点 | JobTracker | Nimbus |
从节点 | TaskTracker | Supervisor |
应用程序 | Job | Topology |
工作进程名称 | Child | Worker |
计算模型 | Map / Reduce | Spout / Bolt |
架构
-
架构结构图
img -
体系结构
Storm主要分为两种组件Nimbus和Supervisor。这两种组件都是快速失败的,没有状态。任务状态和心跳信息等都保存在Zookeeper上的,提交的代码资源都在本地机器的硬盘上。
-
Nimbus负责在集群里面发送代码,分配工作给机器,并且监控状态。全局只有一个。
-
Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程Worker。每一个要运行Storm的机器上都要部署一个,并且,按照机器的配置设定上面分配的槽位数。
-
Zookeeper是Storm重点依赖的外部资源。Nimbus和Supervisor甚至实际运行的Worker都是把心跳保存在Zookeeper上的。Nimbus也是根据Zookeerper上的心跳和任务运行状况,进行调度和任务分配的。
-
Storm提交运行的程序称为Topology。
-
Topology处理的最小的消息单位是一个Tuple,也就是一个任意对象的数组。
-
Topology由Spout和Bolt构成。Spout是发出Tuple的结点。Bolt可以随意订阅某个Spout或者Bolt发出的Tuple。Spout和Bolt都统称为component。
-
逻辑图(水管是
imgSpout
,闪电是Bolt
) -
提交流程图
img
-
-
Storm环境搭建
环境准备
-
关闭防火墙,修改/etc/hosts配置(3台机器的IP可以互相通信)
-
配置JDK环境
-
搭建Zookeeper集群(保证3台机器的Zookeeper都可用)
-
安装python(最好是2.6.6版本以上)
-
目前CentOS自带了python
[root@yann-centos-187 local]# python Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18) [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2 Type "help", "copyright", "credits" or "license" for more information.
集群搭建
-
下载并解压Storm发布版本
[root@yann-centos-187 software]# tar -zxf apache-storm-0.9.2-incubating.tar.gz -C /usr/local/
配置Storm环境变量
#set java environment JAVA_HOME=/usr/local/jdk1.7.0_25 JRE_HOME=/usr/local/jdk1.7.0_25/jre ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.9 STORM_HOME=/usr/local/apache-storm-0.9.2-incubating CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$ZOOKEEPER_HOME/bin:$STORM_HOME/bin export JAVA_HOME JRE_HOME CLASS_PATH PATH ZOOKEEPER_HOME STORM_HOME
-
修改storm.yaml配置文件
[root@yann-centos-187 apache-storm-0.9.2-incubating]# cd conf/ [root@yann-centos-187 conf]# ls storm_env.ini storm.yaml [root@yann-centos-187 conf]# vim storm.yaml
修改下面一段
########### These MUST be filled in for a storm configuration storm.zookeeper.servers: - "192.168.1.187" - "192.168.1.188" - "192.168.1.189" # 主节点 nimbus.host: "192.168.1.187" # 数据文件夹 storm.local.dir: "/usr/local/apache-storm-0.9.2-incubating/data" ui.port: 18080 # 配置supervisor的工作进程(slot槽) supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
把187机器上的Storm拷贝给其他两台机器
scp -r apache-storm-0.9.2-incubating/ root@192.168.1.188:/usr/local/ scp -r apache-storm-0.9.2-incubating/ root@192.168.1.189:/usr/local/
修改其他两台机器的Storm环境变量
刷新3台机器的环境变量
[root@yann-centos-187 local]# source /etc/profile
-
启动Strom各个后台进程
-
后台启动主节点
[root@yann-centos-187 local]# storm nimbus & [1] 5327
-
后台启动从节点
[root@yann-centos-188 local]# storm supervisor & [1] 6007
-
若此时没启动成功,则需要修改/etc/hosts文件
192.168.1.187 yann-centos-187 192.168.1.188 yann-centos-188 192.168.1.189 yann-centos-189
-
使用
jps
查看进程是否启动(QuorumPeerMain
为Zookeeper的进程)-
主节点
[root@yann-centos-187 conf]# jps 4369 QuorumPeerMain 5539 nimbus 5583 Jps
查看日志
[root@yann-centos-187 logs]# tail -f -n 10 nimbus.log 2017-07-10 06:31:49 o.a.z.ClientCnxn [INFO] EventThread shut down 2017-07-10 06:31:49 o.a.z.ZooKeeper [INFO] Session: 0x5d2859c6280003 closed 2017-07-10 06:31:49 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting 2017-07-10 06:31:49 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=192.168.1.187:2181,192.168.1.188:2181,192.168.1.189:2181/storm sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@4c9fec 2017-07-10 06:31:49 o.a.z.ClientCnxn [INFO] Opening socket connection to server yann-centos-187/192.168.1.187:2181. Will not attempt to authenticate using SASL (unknown error) 2017-07-10 06:31:49 o.a.z.ClientCnxn [INFO] Socket connection established to yann-centos-187/192.168.1.187:2181, initiating session 2017-07-10 06:31:49 o.a.z.ClientCnxn [INFO] Session establishment complete on server yann-centos-187/192.168.1.187:2181, sessionid = 0x5d2859c6280004, negotiated timeout = 20000 2017-07-10 06:31:49 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED 2017-07-10 06:31:49 o.a.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered. 2017-07-10 06:31:49 b.s.d.nimbus [INFO] Starting Nimbus server...
-
从节点
[root@yann-centos-188 local]# jps 6224 Jps 6181 supervisor 2851 QuorumPeerMain
查看日志
[root@yann-centos-188 logs]# tail -f -n 10 supervisor.log 2017-07-10 06:33:16 o.a.z.ClientCnxn [INFO] EventThread shut down 2017-07-10 06:33:16 o.a.z.ZooKeeper [INFO] Session: 0x15d285afbe70001 closed 2017-07-10 06:33:16 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting 2017-07-10 06:33:16 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=192.168.1.187:2181,192.168.1.188:2181,192.168.1.189:2181/storm sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@72345f3c 2017-07-10 06:33:16 o.a.z.ClientCnxn [INFO] Opening socket connection to server yann-centos-188/192.168.1.188:2181. Will not attempt to authenticate using SASL (unknown error) 2017-07-10 06:33:16 o.a.z.ClientCnxn [INFO] Socket connection established to yann-centos-188/192.168.1.188:2181, initiating session 2017-07-10 06:33:16 o.a.z.ClientCnxn [INFO] Session establishment complete on server yann-centos-188/192.168.1.188:2181, sessionid = 0x15d285afbe70002, negotiated timeout = 20000 2017-07-10 06:33:16 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED 2017-07-10 06:33:16 o.a.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered. 2017-07-10 06:33:16 b.s.d.supervisor [INFO] Starting supervisor with id dc312fc6-efb2-4170-be2b-c94a446fa2a4 at host yann-centos-188
-
在主节点的机器上启动
UI
[root@yann-centos-187 local]# storm ui & [1] 2940 [root@yann-centos-187 logs]# tail -f -n 10 ui.log 2017-07-10 06:41:26 o.m.log [INFO] Logging to Logger[org.mortbay.log] via org.mortbay.log.Slf4jLog 2017-07-10 06:41:26 o.m.log [INFO] jetty-6.1.26 2017-07-10 06:41:26 o.m.log [INFO] Started SocketConnector@0.0.0.0:18080
-
访问UI:http://192.168.1.187:18080
-
-
Storm Hello World 示例
建立maven工程
-
引入依赖
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.2-incubating</version> </dependency>
-
按如下流程开发java代码(Spout是数据来源,这个图整体就是一个Topology图)
img -
编写数据源类
Spout
,有如下两种方式- 继承BaseRichSpout类
- 实现IRichSpout接口
- 几个需要重写或实现的方法
- open
- nextTuple
- declareOutputFields
-
数据处理类Bolt,有两种方式
- 继承BaseBasicBolt类
- 实现IRichBolt接口
- 几个需要重写或实现的方法
- Execute
- declareOutputFields
-
最后编写主函数
Topology
去提交一个任务- Storm提供两种模式使用Topology
- 本地模式:无需Storm集群,直接在java中运行,一般用于测试和开发阶段,执行
main
函数即可 - 集群模式:需要Storm集群,把实现的java程序打成jar,然后使用Storm命令把Topology提交到集群中
- 本地模式:无需Storm集群,直接在java中运行,一般用于测试和开发阶段,执行
- Storm提供两种模式使用Topology
本地模式
- 直接运行main即可
集群模式
-
首先需要把可执行代码打成jar
- 注意jar的编译版本和linux的jdk版本一致
- 这里需要把3台机器的jdk都改为1.8
- 为何制定了maven的编译版本为1.7,打出来的包还是1.8点?
- 注意jar的编译版本和linux的jdk版本一致
-
把jar拷贝到linux系统中,执行
[root@yann-centos-187 local]# storm jar archi-storm-0.0.1-SNAPSHOT.jar cn.ares.cocoon.storm.simple.topology.PWTopology1 819 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar... 840 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar archi-storm-0.0.1-SNAPSHOT.jar to assigned location: /usr/local/apache-storm-0.9.2-incubating/data/nimbus/inbox/stormjar-8d26f835-df7e-45ae-a834-cb8b04cc4f94.jar 862 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/apache-storm-0.9.2-incubating/data/nimbus/inbox/stormjar-8d26f835-df7e-45ae-a834-cb8b04cc4f94.jar 862 [main] INFO backtype.storm.StormSubmitter - Submitting topology top1 in distributed mode with conf {"topology.workers":2,"topology.debug":true} 1410 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: top1
-
然后查看数据
- top1在运行
- 5个task
- 2个worker
[root@yann-centos-187 local]# storm list 1910 [main] INFO backtype.storm.thrift - Connecting to Nimbus at 192.168.1.187:6627 Topology_name Status Num_tasks Num_workers Uptime_secs ------------------------------------------------------------------- top1 ACTIVE 5 2 19
-
在ui上也可以看到这些信息
3377CC6C-4465-444A-999E-9566A6EB45B0 -
此时其它两个节点可以看到出现了worker进程
[root@yann-centos-188 local]# jps 2700 supervisor 2645 QuorumPeerMain 2901 worker 2911 Jps
-
查看188机器的日志,在不断的处理任务
[root@yann-centos-188 logs]# tail -f -n 100 worker-6700.log 2017-07-13 07:35:31 b.s.d.task [INFO] Emitting: spout default [python] 2017-07-13 07:35:32 b.s.d.task [INFO] Emitting: spout default [python] 2017-07-13 07:35:32 b.s.d.task [INFO] Emitting: spout default [groovy] 2017-07-13 07:35:33 b.s.d.task [INFO] Emitting: spout default [groovy] 2017-07-13 07:35:33 b.s.d.task [INFO] Emitting: spout default [php] 2017-07-13 07:35:34 b.s.d.task [INFO] Emitting: spout default [php] 2017-07-13 07:35:34 b.s.d.task [INFO] Emitting: spout default [php] 2017-07-13 07:35:35 b.s.d.task [INFO] Emitting: spout default [java] 2017-07-13 07:35:35 b.s.d.task [INFO] Emitting: spout default [php] 2017-07-13 07:35:36 b.s.d.task [INFO] Emitting: spout default [groovy] 2017-07-13 07:35:36 b.s.d.task [INFO] Emitting: spout default [groovy] 2017-07-13 07:35:37 b.s.d.task [INFO] Emitting: spout default [groovy] 2017-07-13 07:35:37 b.s.d.task [INFO] Emitting: spout default [python] 2017-07-13 07:35:38 b.s.d.task [INFO] Emitting: spout default [java] 2017-07-13 07:35:38 b.s.d.task [INFO] Emitting: spout default [php] 2017-07-13 07:35:39 b.s.d.task [INFO] Emitting: spout default [ruby] 2017-07-13 07:35:39 b.s.d.task [INFO] Emitting: spout default [php] 2017-07-13 07:35:40 b.s.d.task [INFO] Emitting: spout default [ruby] 2017-07-13 07:35:40 b.s.d.task [INFO] Emitting: spout default [java] 2017-07-13 07:35:41 b.s.d.task [INFO] Emitting: spout default [ruby] 2017-07-13 07:35:41 b.s.d.task [INFO] Emitting: spout default [groovy] 2017-07-13 07:35:42 b.s.d.task [INFO] Emitting: spout default [php] 2017-07-13 07:35:42 b.s.d.task [INFO] Emitting: spout default [python] 2017-07-13 07:35:43 b.s.d.task [INFO] Emitting: spout default [python] 2017-07-13 07:35:43 b.s.d.task [INFO] Emitting: spout default [groovy] 2017-07-13 07:35:44 b.s.d.task [INFO] Emitting: spout default [java] 2017-07-13 07:35:44 b.s.d.task [INFO] Emitting: spout default [java]
-
查看189的日志,正在不断的处理数据,并把数据写入文件中,注意189需要建立对应的目录,否则会报错
[root@yann-centos-189 logs]# tail -f -n 100 worker-6700.log 2017-07-13 07:38:28 b.s.d.executor [INFO] Processing received message source: spout:4, stream: default, id: {}, [java] 2017-07-13 07:38:28 c.a.c.s.b.PrintBolt [INFO] print:java 2017-07-13 07:38:28 b.s.d.task [INFO] Emitting: print-bolt default [java] 2017-07-13 07:38:28 b.s.d.executor [INFO] Processing received message source: print-bolt:3, stream: default, id: {}, [java] 2017-07-13 07:38:28 b.s.d.executor [INFO] Processing received message source: spout:4, stream: default, id: {}, [groovy] 2017-07-13 07:38:28 c.a.c.s.b.PrintBolt [INFO] print:groovy 2017-07-13 07:38:28 b.s.d.task [INFO] Emitting: print-bolt default [groovy]
-
查看189上产生的文件
[root@yann-centos-189 temp]# cat cn.ares.cocoon.storm.bolt.WriteBolt\@2913356b java/njava/njava/ngroovy/nphp/npython/npython/nphp/nphp/njava/npython/njava/nphp/nphp/ngroovy/npython/njava/nruby/npython/nphp/nphp/ngroovy/nphp/njava/njava/njava/ngroovy/nphp/nphp/njava/npython/nruby/njava/npython/nphp/npython/nruby/nphp/ngroovy/njava/ngroovy/njava/ngroovy/njava/ngroovy/njava/ngroovy/npython/nphp/npython/nphp/nphp/ngroovy/nruby/nphp/ngroovy/ngroovy/npython/ngroovy/nphp/nruby/npython/nruby/ngroovy/njava/nruby/nphp/nphp/nruby/nphp/nphp/ngroovy/njava/ngroovy/npython/nphp/nphp/nruby/ngroovy/ngroovy/nphp/nphp/ngroovy/nphp/npython/nruby/nruby/npython/njava/nphp/njava/nruby/njava/nruby/npython/npython/nphp/npython/nruby/ngroovy/nruby/njava/npython/ngroovy/nruby/ngroovy/nruby/nphp/njava/ngroovy/nphp/ngroovy/njava/ngroovy/ngroovy/nruby/ngroovy/nphp/nphp/npython/ngroovy/nruby/npython/npython/npython/ngroovy/nphp/njava/nphp/npython/ngroovy/njava/njava/npython/njava/njava/nruby/ngroovy/nruby/nphp/npython/njava/nphp/nphp/nruby/nphp/nruby/nphp/ngroovy/ngroovy/ngroovy/nruby/npython/ngroovy/npython/ngroovy/ngroovy/ngroovy/ngroovy/nphp/npython/ngroovy/njava/npython/nruby/njava/npython/npython/nruby/nruby/nruby/npython/nruby/npython/npython/njava/nruby/npython/ngroovy/nruby/njava/nphp/npython/nruby/npython/njava/ngroovy/npython/nphp/nphp/nphp/nphp/npython/nphp/nphp/nruby/nruby/npython/njava/njava/npython/npython/npython/ngroovy/nphp/npython/npython/njava/nruby/ngroovy/nphp/njava/njava/npython/npython/njava/ngroovy/nruby/ngroovy/nruby/npython/ngroovy/ngroovy/npython/njava/ngroovy/ngroovy/njava/nphp/n[root@yann-centos-189 temp]#
-
此时输入
storm kill
可以停止进程 -
此时jar会存在如下目录
[root@yann-centos-187 apache-storm-0.9.2-incubating]# cd data/ [root@yann-centos-187 data]# ls nimbus [root@yann-centos-187 data]# cd nimbus/ [root@yann-centos-187 nimbus]# ls inbox stormdist [root@yann-centos-187 nimbus]# cd inbox/ [root@yann-centos-187 inbox]# ls stormjar-04dc559f-9543-4c3d-bf3c-e696c878bf49.jar
-
也可以暂停,此时进程并没有结束,并且worker还存在,再点击
B3910A48-FC85-4308-A52B-79838447C689Activate
则会继续执行
Storm API详解
Storm的组件
Topology
拓扑
- 拓扑是一个有向图的计算。
- 创建拓扑步骤如下
- 构建TopologyBuilder对象
- 设置Spout数据源对象
- 设置Bolt数据处理对象
- 构建Config对象
- 提交拓扑
- 使用拓扑的具体流程是使用storm命令把一个jar提交给nimbus节点,然后nimbus会把任务分配给具体的子节点supervisor去工作
Stream grouping
流分组、数据的分发方式
// 分组模式
builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("spout");
builder.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");
Spout
喷口、消息源
Bolt
螺栓、处理器
Worker
工作进程
cfg.setNumWorkers(2); // 2个JVM
结构图
Executor
执行器、Task的线程
默认情况下,一个执行器执行一个任务,但是如果指定了任务的数目,则任务会平均分配到执行器中。但是实际使用中,即使设定了多个执行器,也有可能有的执行器被重复使用,有的执行器没有被使用。
Task
具体的执行任务
- 每个Task有自己独立的执行线程
Configuration
配置
// 配置
Config cfg = new Config();
cfg.setNumWorkers(2); // 2个JVM
cfg.setDebug(true);
Strom的数据结构
tuple
tuple元组
- tuple是storm的主要数据结构,并且是storm中使用的最基本单元、数据模型和元组
tuple描述
- tuple就是一个值列表,tuple中的值可以是任何类型的,动态类型的tuple的fields可以不用声明,默认情况下,storm中的tuple支持私有类型、字符串、字节数组等作为它的字段值,如果使用其他类型,就需要序列化该类型。
- tuple的字段默认类型有:integer、float、double、long、short、string、byte、binary
- tuple可以理解成键值对,例如、创建一个bolt要发送2个字段(命名为double和triple),其中键就是定义declareOutputFields方法中的fields对象,值就是在emit方法中发送的values对象
API使用实例
设置多个执行器和工作进程
public static void main(String[] args) throws Exception {
// 配置
Config cfg = new Config();
cfg.setNumWorkers(2); // 2个JVM
cfg.setDebug(true);
// 建立拓扑结构
TopologyBuilder builder = new TopologyBuilder();
// 设置两个执行器和2个任务
builder.setSpout("spout", new PWSpout(), 2);// 默认是.setNumTasks(2)
// 产生2个执行器和4个任务
builder.setBolt("print-bolt", new PrintBolt(), 2).shuffleGrouping("spout").setNumTasks(4);
// 设置6个执行器和6个任务
builder.setBolt("write-bolt", new WriteBolt(), 6).shuffleGrouping("print-bolt");
// localModel(cfg, builder);
clusterModel(cfg, builder);
}
-
此时我们看本地执行效果,可以看到
write-bolt
产生了6个文件
-
可以知道有6个实例去处理写入,有6个线程并发处理结果
-
其中
builder.setSpout("spout", new PWSpout(), 2)
效果如下,有2个并行的任务,上面6个线程一样的原理 -
builder.setBolt("x-bolt", new PrintBolt(), 2).shuffleGrouping("spout").setNumTasks(4)
效果如下
-
Storm流分组
-
Shuffle Grouping :随机分组,尽量均匀分布到下游Bolt中。
将流分组定义为混排。这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bolt中的任务。shuffle grouping对各个task的tuple分配的比较均匀。
-
Fields Grouping :按字段分组,按数据中field值进行分组;相同field值的Tuple被发送到相同的Task(但是不同的field不一定就在不同的task中)。
这种grouping机制保证相同field值的tuple会去同一个task,这对于WordCount来说非常关键,如果同一个单词不去同一个task,那么统计出来的单词次数就不对了。
“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”.
—— 小示例
-
All grouping :广播。
广播发送, 对于每一个tuple将会复制到每一个bolt中处理。
-
Global grouping :全局分组,Tuple被分配到一个Bolt中的一个Task,实现事务性的Topology。
Stream中的所有的tuple都会发送给同一个bolt任务处理,所有的tuple将会发送给拥有最小task_id的bolt任务处理。
-
None grouping :不分组。
不关注并行处理负载均衡策略时使用该方式,目前等同于shuffle grouping,另外storm将会把bolt任务和他的上游提供数据的任务安排在同一个线程下。
-
Direct grouping :直接分组(指定分组)。
由tuple的发射单元直接决定tuple将发射给那个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)。
-
本地分组:如果目标Bolt在同一个工作进程存在一个或多个任务,元祖数据会随机分配给执行任务,否则该分组方式与随机分组方式一样。
Strom WordCount
统计单词的小程序
数据源Sentence Spout
获得数据(一个句子,包含多个单词)以后,发送给SplitBolt
进行切分,然后由CountBolt
进行统计结果,最后由ReportBolt
记录结果。
Storm的数据可靠性
Spout是Storm数据流的入口,在设计拓扑时,一件很重要的事情就是需要考虑消息的可靠性。
-
利用Storm可靠性机制(源生的
ack
和fail
),可以很容易的提供至少一次的处理(at least once processing):也就是在一个tuple超时或者fail的时候,Storm会调用Spout的fail
函数,在这里,我们可以实现一个重发tuple的机制,当然,这种重发一般都建立在消息队列中间件的重发功能上的;- 如果在第一个bolt失败的时候,可以重试;
- 如果在第二个bolt失败的时候,重试就会出现事务问题,如果数据入库,则可以和数据库的id进行比对,或者尽量不要拆分tuple;
// 注意,此处的index放在第二个参数里,而不是放在new Values里,否则不会进行ack和fail collector.emit(new Values(sentence[index]), index);
Storm要求如果要track一个Tuple,必须要指定其messageId,也就是回调回ack和fail方法的参数。如果我们不指定,Storm是不会去track该tuple的,即不保证消息丢失!
-
可以使用
IBatchSpout
批量发送,如果有失败的,则一批都会滚; -
可以使用
Trident
框架,Trident是利用了幂等性进行对比;
acker
Storm有一组叫做acker的特殊任务,它们负责跟踪DAG(有向无环图)中的每个消息
-
Spout在初始化时会产生一个tasksId;
-
Spout中创建新的Tuple,其id是一个64位的随机数;
-
Spout将新建的Tuple发送出去(给出了messageId来开启Tuple的追踪), 同时会发送一个消息到某个acker,要求acker进行追踪。该消息包含两部分:
-
Spout的taskId:用户acker在整个tuple树被完全处理后找到原始的Spout进行回调ack或fail
-
一个64位的ack val值: 标志该tuple是否被完全处理。初始值为0。
-
一个Bolt在处理完Tuple后,如果发射了一个新的anchor tuple,Storm会维护anchor tuple的列表;
-
该Bolt调用OutputCollector.ack()时,Storm会做如下操作:
- 将anchor tuple列表中每个已经ack过的和新创建的Tuple的id做异或(XOR)。假定Spout发出的TupleID是tuple-id-0,该Bolt新生成的TupleID为tuple-id-1,那么,tuple-id-0XORtuple-id-0*XOR *tuple-id-1
- Storm根据该原始TupleID进行一致性hash算法,找到最开始Spout发送的那个acker,然后把上面异或后得出的ack val值发送给acker
-
acker收到新的ack val值后,与保存的原始的Tuple的id进行异或,如果为0,表示该Tuple已被完全处理,则根据其taskId找到原始的Spout,回调其ack()方法。
-
fail的机制类似,在发现fail后直接回调Spout的fail方法。
Storm就是通过这个acker的机制来保证数据不丢失。
参考文章
Storm DRPC 详解
RPC
参考文章
DRPC
分布式RPC,Distributed RPC。
引入DRPC主要是利用storm的实时计算能力来并行化CPU密集性的计算任务。
DRPC Server
工作过程
Distributed RPC是由一个”DPRC Server”协调的(storm自带了一个实现)。DRPC服务器工作过程如下:
- 接收一个RPC请求。
- 发送请求到storm topology
- 从storm topology接收结果。
- 把结果发回给等待的客户端。
工作流程
imgDRPC配置与示例
Storm提供了一个称作LinearDRPCTopologyBuilder的topology builder,它把实现DRPC的几乎所有步骤都简化了
官方示例
https://github.com/apache/storm/tree/master/examples
实现DRPC
-
修改storm配置文件(理论上只需要修改主节点的配置)
[root@yann-centos-187 conf]# vim storm.yaml
## Locations of the drpc servers drpc.servers: - "192.168.1.187"
- 需要注意的是:必须修改所有Nimbus和supervisor上的配置文件,设置drpc server地址。否则在运行过程中可能无法返回结果
-
启动storm的各个节点
[root@yann-centos-187 conf]# storm nimbus & [1] 3507 [root@yann-centos-187 conf]# storm ui & [2] 3567 [root@yann-centos-187 conf]# storm drpc & [3] 3600 [root@yann-centos-187 conf]# jps 3600 drpc 3507 nimbus 3636 Jps 3483 QuorumPeerMain 3567 core
-
编写测试代码
BasicDRPCTopology
,打成jar,传入服务器,执行这个jar[root@yann-centos-187 local]# storm jar archi-storm-0.0.1-SNAPSHOT.jar cn.ares.cocoon.storm.drpc.topology.BasicDRPCTopology drpc-top
- 其中
drpc-top
,是入参,指定topology的名字
- 其中
-
在UI中查看,已经出现了这个topology
9E7800CD-1072-4C8D-B7DE-9C9D058FFAEA -
执行客户端代码
DemoDRPCClient
,结果如下/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -Dfile.encoding=UTF-8 -classpath... hello-哈哈被处理啦 world-哈哈被处理啦 test-哈哈被处理啦 ha-哈哈被处理啦 client-哈哈被处理啦 Process finished with exit code 0
官方示例demo
- 疑问
- 为什么按id分组:根据测试,每次
drpc.execute
,GetTweeters
只执行一个,只有一个id。虽然开了4个并发,但是只处理了3个任务。如果开启4个执行器去处理,那么处理的线程是这4个中的,有可能一样,也有可能不同,如果只开启一个线程,那么只有一个线程处理了。
- 为什么按id分组:根据测试,每次
DRPC实际用途
- 用一个queue,实时的往queue里塞数据,然后这边实时的检查,queue啥时有数据了,就从queue中take一个数据,然后放到storm中的topology中去执行;
- 使用kafka做数据源,用kafka生产数据,然后通过kafka直接提交到topology中,然后就可以汇总一个结果了;
Storm Trident
Trident Function
09007B43-5758-4978-BD3F-9A43D89F1A1D官方文档
https://github.com/nathanmarz/storm/wiki/Trident-tutorial
参考文章
[翻译][Trident] Storm Trident 教程
Learning Storm - 第5章 - Trident functions
介绍
Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。 如果你对Pig和Cascading这种高级批量处理工具很了解的话,那么应该毕竟容易理解Trident,因为他们之间很多的概念和思想都是类似的。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理。
代码示例
-
创建代码
TridentFunctionDemo
- 在Trident中使用
BaseFunction
代替以前的bolt
- 拓扑使用
TridentTopology
- 在Trident中使用
-
本地执行,输出结果:
>>>>>>>>>>>>>>> a - b - c - d is 1 - 4 - 7 - 10 >>>>>>>>>>>>>>> sum is 5 >>>>>>>>>>>>>>> a - b - c - d is 1 - 1 - 3 - 11 >>>>>>>>>>>>>>> sum is 2 >>>>>>>>>>>>>>> a - b - c - d is 2 - 2 - 7 - 1 >>>>>>>>>>>>>>> sum is 4 >>>>>>>>>>>>>>> a - b - c - d is 2 - 5 - 7 - 2 >>>>>>>>>>>>>>> sum is 7
Trident Filters
052E55B1-BBE3-40AD-B8C7-D48FD82C5782参考文章
Learning Storm - 第5章 - Trident filters
代码示例
-
创建处理类
TridentFiltersDemo
;- 与function不同的是,fliter在each时不需要返回值
-
执行代码,输出结果如下:
>>>>>>>>>>>>>>>> a - b - c - d is 1 - 1 - 3 - 11 >>>>>>>>>>>>>>>> a - b - c - d is 2 - 2 - 7 - 1
Trident projection
参考文章
Learning Storm - 第5章 - Trident projection
Apache Storm 官方文档 —— Trident 教程
代码练习
==待补充==
Trident repartitioning operations
参考文章
Learning Storm - 第5章 - Trident repartitioning operations
Shuffle
使用随机轮询算法将tuple在目标分区之间均分
Broadcast
广播,每个元组都复制给所有目标分区
PartitionBy
paritionBy函数接收一组字段并根据这组字段做分区,具体是把这些字段做hash并对分区数取模,从而确定每个tuple落在哪个分区,能够保证同样字段的tuple落在同一分区
Global
所有tuple都去向同一个partition
batchGlobal
同一批的所有tuple落在相同分区,不同批的tuple可能去向不同的分区。这个可以保证同一批的事务一致?
partition
通过使用自定义的分区方法确定tuple落在哪个分区,实现backtype.storm.grouping.CustomStreamGrouping接口
Transactional
Batch与Spout
参考文章
Learning Storm - 第5章 - A transactional topology
功能实现
- 实现ITridentSpout接口:最通用的API,可以支持transaction or opaque transactional语义
- 实现IBatchSpout接口:一个non-transactional spout
- 实现IPartitionedTridentSpout接口:一个transactional spout,幂等性的
- 实现IOpaquePartitionedTridentSpout接口:一个opaque transactional spout