storm第二天

2017-08-30  本文已影响0人  夙夜M

storm的集群提交方式

StormSubmitter.subnitTopology()方法

问题一、如何把storm的任务提交到集群上面?

     使用命令:storm jar jar包路径 类全限定名

问题二:查看任务日志

     凡是有supervisor的节点

     storm logviewer &

问题三:停止任务:在所有的supervisor

      storm kill topotest 或者在storm的ui界面直接kill掉topology

问题四、spout和bolt做的事情差不多,为什么要有spout来获取数据,而不直接让bolt获取并处理数据?

spout 

      数据源

       一般spout就只是获取数据,几乎不对数据进行处理和计算

问题五:并行度

   nimbus->zookeeper->supervisors

   supervisor->worker(最多四个,具体由配置的端口号决定,一个worker是一个进程)->executor(一个executor是一个线程,一个executor默认对于运行一个task)->task任务(spout,bolt)

1)默认情况下,一个topotest就分配一个worker

2)默认情况下,一台supervisor就会有四个slot,也就是能启动四个worker

3)默认情况下,一个executor线程对应一个task任务,一个spout或者一个bolt就是一个executor线程。

worker和executor

同一个worker里面的executor只会为同一个topology任务服务。

因此一个spout或者一个bolt就是一个线程。


设置executor个数:

设置executor和task的个数

多线程和单线程的区别:对CPU的利用率不一样

一个executor里面运行的task的类型要么都是spout,要么都是bolt,不可能出现既有spout又有bolt.

一个executor运行两个task(一个一个运行)

提高storm任务的并行度:

1)增加supervisor

2)增加worker(增加端口号)

3)增加executor(即增加java代码的线程)

在一个cpu core的情况下,两个executor线程对CPU利用率比较好。

我们一个任务提交了以后,我们可以动态的修改这个任务的worker的个数(一般不修改)和executor的个数。

一个executor对应两个task

修改bolt的executor的个数为2,这样就可以实现多线程并行运行bolt任务,对CPU利用率高。

一个executor对应一个task

默认情况下,一个worker会有一个ack线程(即一个executor,对应一个task);

该ack线程的作用是监控spout和bolt任务。

通过下面这种方式设置ack个数,0表示没有。便于我们观察executor和task的关系。

设置ack个数为0

我们将任务提交到集群之后,然后在集群上运行命令来动态调整executor的个数:

storm rebalance (--help)查看怎么使用?

参数:

topotest:topology任务的标识

-w:wait,等多久生效

 -n :number,代表worker个数

-e:executor,代表executor的个数 

-e myblot=2 -e myspout=1(通过代码中指定的spout和bolt的名字来 标识)

storm rebalance topotest -w 10 -n 1 -e mybolt=2 -e myspout=1

task的个数在写代码的时候就固定了,不可以动态修改。


线程安全:即多线程出来的计算结果跟单线程出来的计算结果是一致的。

分组策略:指的是上一级将数据传到下一级,以什么样的策略传数据呢?

需求:统计文本的行数。

从kafka里面读取数据

统计文本行数

首先开启kafka并创建一个topic

[hadoop@hadoop02 kafka_2.11-0.10.2.1]$ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

[hadoop@hadoop03 kafka_2.11-0.10.2.1]$ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

[hadoop@hadoop04 kafka_2.11-0.10.2.1]$ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

在任意一台服务器,这里选择了Hadoop02上创建一个topic

[hadoop@hadoop02 kafka_2.11-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 1 --partitions 1 --topic test

查看

[hadoop@hadoop02 kafka_2.11-0.10.2.1]$ bin/kafka-topics.sh --list --zookeeper hadoop02:2181

写代码生产数据到kafka的test主题(即一个文件)

打印线程数:Thread.currentThread().getName()

如果要求不高,可以这样统计结果:一个线程的结果*线程数

FieldGrouping:相同的单词必然分到同一组

AllGrouping:广播发送, 对于每一个tuple, 所有的Bolts都会收到。两个线程获取到的数据都是全部的,且一模一样。

分组策略在线程数大于1是展现出各自不同的效果。

小练习:单词计数/词频统计

1)进行单词计数 2)开启并行度 3)线程安全

思路:需要两个bolt,第一个bolt采取ShuffleGrouping策略,拆分单词;第二个bolt采取FieldsGrouping策略,相同的单词必然到同一个线程,保证了线程安全。

统计单词数

每个bolt开启三个线程。

注意使用FieldsGrouping时必须指定fields字段

注意最后结果:每一次统计结果都会打印,因为storm是实时统计。


淘宝大屏项目

作业:一份数据,统计每天(日期)的pv(文本数据总数)和uv(用户数)

以京东的网站来解释以下名词:

如何判断京东网站火不火?可以通过以下三个变量来估计。

1)点击一下,会产生一个pv(pageView),pv越,网站火

2)UV(userView),同一个用户无论点击多少次,它的UV就是1,UV值高,网站火。

3)会话:一次会话表示一次有效的访问。会话与浏览器共存亡,但有时间周期,可能六分钟不点击这次会话就会过期。

4)跳出率:只浏览了一个页面的会话个数个数占总的会话数的比率,越小越好。会话的平均时间越长越好。

作业图

RPC:官网:storm.apache.org

Storm中有一个Distributed RPC:分布式的RPC 

RPC概念:remote procedure call protocol:远程过程(进程)调用(方法)协议

不同进程间的方法调用

好多网站上都会有天气预报,这些数据是从哪里来的?

RPC是一个CS架构:即client/server架构

理解RPC:

1)客户端(进程)去调用服务端(进程)的方法

2)方法的执行在服务端

第一个RPC案例:首先需要RPC的jar包,可以搭建一个maven项目

server服务端按照RPC协议的规范实现方法,并提供main方法给客户端提供一个IP地址和端口号,RPC的协议需要提供一个versionID,必须是long类型;服务端一直处在启动状态等待客户端调用。

client客户端通过服务端的IP地址和端口号,通过RPC协议代理类调用服务端方法,最终方法的执行在服务端。

HadoopRPC:NameNode、DataNode、resourcemanager、NodeManager都是服务端

阅读源码的三种方式:

1)maven项目中会自动下载源码,缺点:不能直接修改源代码

2)直接到官网下载 src源码包

3)在github网站去下载源码,直接在网站看也可以


通过查看源码来证明NameNode是HadoopRPC的服务端,证明在初始化NameNode的时候有如下这样一段代码,那么就可以证明它是hadoopRPC的服务端了。

怎么证明是RPC协议?RPC协议中必定会有versionID号。

快捷键:Ctrl+T、Ctrl+f、Ctrl+o


创建目录mkdir的真正的执行者:

clientProtocol协议实例NameNode==真正创建目录的实例对象。

证明是RPC服务端的代码 DataNode心跳机制的源码

storm的DRPC的实现

storm的DRPC实现过程的课堂讲解 storm的DPRC的实现过程

需求:客户端传参spark或者Hadoop,通过drpc输出hello spark,hello Hadoop

内置的spout发射的数据:0表示id号,1表示数据

自定义bolt发射数据通关过collector,发射数据有要求:第一个位置必须是id号(因为最后一个bolt是内置bolt,有规定),后面的参数是我们真正要发射的数据

出错:必须明确定义下一个bolt字段,即declare方法必须定义。


第一个案例是本地local

第二个案例是远程remote

服务端:提交topology任务到集群    注意:createRemoteTopology

客户端:host:drpc server主机名   port(drpc服务端口):3773


storm架构:

主节点nimbus:1)用户提交任务给主节点 2)任务运行失败,重新分配任务

           加入nimbus挂了的影响

             1)任务是可以照常运行

             2)不能提交新的任务

             3)任务运行失败不能重新分配任务

从节点supervisor:计算处理数据

nimbus和supervisor都把信息存储到zookeeper。

查看storm信息,在zookeeper,1)监控zookeeper的/storm/supervisors目录,监控事件类型:子节点变化事件。2)监控/storm/storms目录,记录的是topology任务分配信息,监控事件类型:子节点数据变化事件。

注意:spark和storm的进程都是worker,冲突


消息的可靠性:

IRichSpout:

ack方法:数据处理成功以后,会被调用

fail方法:数据处理失败以后,会被调用

数据处理失败之后有两种处理:1)重发几次 2)重发几次依然失败单独处理

实现消息的可靠性:通过ack方法和fail方法

数据发射成功,存到集合,ack方法移除该条数据

数据处理失败,先判断retry次数,重发,将messageID和retry次数存到hashmap集合。

在spout和bolt中回调ack方法。注意是BaseRichSpout和BasicRichBolt


消息可靠性原理:

默认情况下,一个worker有一个ack线程。

一个ack线程维护着 taskid,ack_value(默认情况下是0)

xor异或

一个tuple有一个tupleid,ack_value与每一个tupleid异或最终的结果赋值给ack_value

正常情况下最终的ack_vlaue依然是0.


什么是线程安全?简单来说,就是多线程计算结果跟单线程处理结果一致。

DRPC:本地和远程。

补充知识:Trident(框架)

上一篇 下一篇

猜你喜欢

热点阅读