Java专题stormstorm

Storm学习笔记

2017-09-11  本文已影响680人  af771459b4d4

Storm简介

Storm是什么

JStorm

实时计算

Storm体系结构

Storm与Hadoop的对比

结构 Hadoop Storm
主节点 JobTracker Nimbus
从节点 TaskTracker Supervisor
应用程序 Job Topology
工作进程名称 Child Worker
计算模型 Map / Reduce Spout / Bolt

架构

Storm环境搭建

环境准备

集群搭建

Storm Hello World 示例

建立maven工程

本地模式

集群模式

Storm API详解

Storm的组件

Topology

拓扑

Stream grouping

流分组、数据的分发方式

// 分组模式
builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("spout");
builder.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");

Spout

喷口、消息源

Bolt

螺栓、处理器

Worker

工作进程

cfg.setNumWorkers(2); // 2个JVM

结构图

Executor

执行器、Task的线程

默认情况下,一个执行器执行一个任务,但是如果指定了任务的数目,则任务会平均分配到执行器中。但是实际使用中,即使设定了多个执行器,也有可能有的执行器被重复使用,有的执行器没有被使用。

Task

具体的执行任务

Configuration

配置

// 配置
Config cfg = new Config();
cfg.setNumWorkers(2); // 2个JVM
cfg.setDebug(true);

Strom的数据结构

tuple

tuple元组

tuple描述

API使用实例

设置多个执行器和工作进程

public static void main(String[] args) throws Exception {
    // 配置
    Config cfg = new Config();
    cfg.setNumWorkers(2); // 2个JVM
    cfg.setDebug(true);

    // 建立拓扑结构
    TopologyBuilder builder = new TopologyBuilder();
    // 设置两个执行器和2个任务
    builder.setSpout("spout", new PWSpout(), 2);// 默认是.setNumTasks(2)
    // 产生2个执行器和4个任务
    builder.setBolt("print-bolt", new PrintBolt(), 2).shuffleGrouping("spout").setNumTasks(4);
    // 设置6个执行器和6个任务
    builder.setBolt("write-bolt", new WriteBolt(), 6).shuffleGrouping("print-bolt");

//        localModel(cfg, builder);
    clusterModel(cfg, builder);
}

Storm流分组

  1. Shuffle Grouping :随机分组,尽量均匀分布到下游Bolt中。

    将流分组定义为混排。这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bolt中的任务。shuffle grouping对各个task的tuple分配的比较均匀。

  2. Fields Grouping :按字段分组,按数据中field值进行分组;相同field值的Tuple被发送到相同的Task(但是不同的field不一定就在不同的task中)。

    这种grouping机制保证相同field值的tuple会去同一个task,这对于WordCount来说非常关键,如果同一个单词不去同一个task,那么统计出来的单词次数就不对了。

    “if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”.

    —— 小示例

  3. All grouping :广播。

    广播发送, 对于每一个tuple将会复制到每一个bolt中处理。

  4. Global grouping :全局分组,Tuple被分配到一个Bolt中的一个Task,实现事务性的Topology。

    Stream中的所有的tuple都会发送给同一个bolt任务处理,所有的tuple将会发送给拥有最小task_id的bolt任务处理。

  5. None grouping :不分组。

    不关注并行处理负载均衡策略时使用该方式,目前等同于shuffle grouping,另外storm将会把bolt任务和他的上游提供数据的任务安排在同一个线程下

  6. Direct grouping :直接分组(指定分组)。

    由tuple的发射单元直接决定tuple将发射给那个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)。

  7. 本地分组:如果目标Bolt在同一个工作进程存在一个或多个任务,元祖数据会随机分配给执行任务,否则该分组方式与随机分组方式一样。

Strom WordCount

统计单词的小程序

数据源Sentence Spout获得数据(一个句子,包含多个单词)以后,发送给SplitBolt进行切分,然后由CountBolt进行统计结果,最后由ReportBolt记录结果。

59958F12-82E8-4642-9C02-B4998F4EC372

Storm的数据可靠性

Spout是Storm数据流的入口,在设计拓扑时,一件很重要的事情就是需要考虑消息的可靠性。

  1. 利用Storm可靠性机制(源生的ackfail),可以很容易的提供至少一次的处理(at least once processing):也就是在一个tuple超时或者fail的时候,Storm会调用Spout的fail函数,在这里,我们可以实现一个重发tuple的机制,当然,这种重发一般都建立在消息队列中间件的重发功能上的;

    1. 如果在第一个bolt失败的时候,可以重试;
    2. 如果在第二个bolt失败的时候,重试就会出现事务问题,如果数据入库,则可以和数据库的id进行比对,或者尽量不要拆分tuple;
    // 注意,此处的index放在第二个参数里,而不是放在new Values里,否则不会进行ack和fail
    collector.emit(new Values(sentence[index]), index);
    

    Storm要求如果要track一个Tuple,必须要指定其messageId,也就是回调回ack和fail方法的参数。如果我们不指定,Storm是不会去track该tuple的,即不保证消息丢失!

  2. 可以使用IBatchSpout批量发送,如果有失败的,则一批都会滚;

  3. 可以使用Trident框架,Trident是利用了幂等性进行对比;

acker

Storm有一组叫做acker的特殊任务,它们负责跟踪DAG(有向无环图)中的每个消息

  1. Spout在初始化时会产生一个tasksId;

  2. Spout中创建新的Tuple,其id是一个64位的随机数;

  3. Spout将新建的Tuple发送出去(给出了messageId来开启Tuple的追踪), 同时会发送一个消息到某个acker,要求acker进行追踪。该消息包含两部分:

  4. Spout的taskId:用户acker在整个tuple树被完全处理后找到原始的Spout进行回调ack或fail

  5. 一个64位的ack val值: 标志该tuple是否被完全处理。初始值为0。

  6. 一个Bolt在处理完Tuple后,如果发射了一个新的anchor tuple,Storm会维护anchor tuple的列表;

  7. 该Bolt调用OutputCollector.ack()时,Storm会做如下操作:

    1. 将anchor tuple列表中每个已经ack过的和新创建的Tuple的id做异或(XOR)。假定Spout发出的TupleID是tuple-id-0,该Bolt新生成的TupleID为tuple-id-1,那么,tuple-id-0XORtuple-id-0*XOR *tuple-id-1
    2. Storm根据该原始TupleID进行一致性hash算法,找到最开始Spout发送的那个acker,然后把上面异或后得出的ack val值发送给acker
  8. acker收到新的ack val值后,与保存的原始的Tuple的id进行异或,如果为0,表示该Tuple已被完全处理,则根据其taskId找到原始的Spout,回调其ack()方法。

  9. fail的机制类似,在发现fail后直接回调Spout的fail方法。

Storm就是通过这个acker的机制来保证数据不丢失。

参考文章

可靠性与acker机制

Storm DRPC 详解

RPC

参考文章

深入浅出 RPC - 浅出篇

深入浅出 RPC - 深入篇

DRPC

分布式RPC,Distributed RPC。

引入DRPC主要是利用storm的实时计算能力来并行化CPU密集性的计算任务。

DRPC Server

工作过程

Distributed RPC是由一个”DPRC Server”协调的(storm自带了一个实现)。DRPC服务器工作过程如下:

  1. 接收一个RPC请求。
  2. 发送请求到storm topology
  3. 从storm topology接收结果。
  4. 把结果发回给等待的客户端。

工作流程

img

DRPC配置与示例

Storm提供了一个称作LinearDRPCTopologyBuilder的topology builder,它把实现DRPC的几乎所有步骤都简化了

官方示例

https://github.com/apache/storm/tree/master/examples

实现DRPC

  1. 修改storm配置文件(理论上只需要修改主节点的配置)

    [root@yann-centos-187 conf]# vim storm.yaml
    
    ## Locations of the drpc servers
     drpc.servers:
         - "192.168.1.187"
    
    • 需要注意的是:必须修改所有Nimbus和supervisor上的配置文件,设置drpc server地址。否则在运行过程中可能无法返回结果
  2. 启动storm的各个节点

    [root@yann-centos-187 conf]# storm nimbus &
    [1] 3507
    [root@yann-centos-187 conf]# storm ui &
    [2] 3567
    [root@yann-centos-187 conf]# storm drpc &
    [3] 3600
    [root@yann-centos-187 conf]# jps
    3600 drpc
    3507 nimbus
    3636 Jps
    3483 QuorumPeerMain
    3567 core
    
  3. 编写测试代码BasicDRPCTopology,打成jar,传入服务器,执行这个jar

    [root@yann-centos-187 local]# storm jar archi-storm-0.0.1-SNAPSHOT.jar cn.ares.cocoon.storm.drpc.topology.BasicDRPCTopology drpc-top
    
    • 其中drpc-top,是入参,指定topology的名字
  4. 在UI中查看,已经出现了这个topology

    9E7800CD-1072-4C8D-B7DE-9C9D058FFAEA
  5. 执行客户端代码DemoDRPCClient,结果如下

    /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -Dfile.encoding=UTF-8 -classpath...
    hello-哈哈被处理啦
    world-哈哈被处理啦
    test-哈哈被处理啦
    ha-哈哈被处理啦
    client-哈哈被处理啦
    
    Process finished with exit code 0
    

官方示例demo

ReachTopology

DRPC实际用途

  1. 用一个queue,实时的往queue里塞数据,然后这边实时的检查,queue啥时有数据了,就从queue中take一个数据,然后放到storm中的topology中去执行;
  2. 使用kafka做数据源,用kafka生产数据,然后通过kafka直接提交到topology中,然后就可以汇总一个结果了;

Storm Trident

Trident Function

09007B43-5758-4978-BD3F-9A43D89F1A1D

官方文档

https://github.com/nathanmarz/storm/wiki/Trident-tutorial

参考文章

[翻译][Trident] Storm Trident 教程

Learning Storm - 第5章 - Trident functions

介绍

Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。 如果你对Pig和Cascading这种高级批量处理工具很了解的话,那么应该毕竟容易理解Trident,因为他们之间很多的概念和思想都是类似的。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理。

代码示例

  1. 创建代码TridentFunctionDemo

    • 在Trident中使用BaseFunction代替以前的bolt
    • 拓扑使用TridentTopology
  2. 本地执行,输出结果:

    >>>>>>>>>>>>>>> a - b - c - d is 1 - 4 - 7 - 10
    >>>>>>>>>>>>>>> sum is 5
    >>>>>>>>>>>>>>> a - b - c - d is 1 - 1 - 3 - 11
    >>>>>>>>>>>>>>> sum is 2
    >>>>>>>>>>>>>>> a - b - c - d is 2 - 2 - 7 - 1
    >>>>>>>>>>>>>>> sum is 4
    >>>>>>>>>>>>>>> a - b - c - d is 2 - 5 - 7 - 2
    >>>>>>>>>>>>>>> sum is 7
    

Trident Filters

052E55B1-BBE3-40AD-B8C7-D48FD82C5782

参考文章

Learning Storm - 第5章 - Trident filters

代码示例

  1. 创建处理类TridentFiltersDemo

    • 与function不同的是,fliter在each时不需要返回值
  2. 执行代码,输出结果如下:

    >>>>>>>>>>>>>>>> a - b - c - d is 1 - 1 - 3 - 11
    >>>>>>>>>>>>>>>> a - b - c - d is 2 - 2 - 7 - 1
    

Trident projection

参考文章

Learning Storm - 第5章 - Trident projection

Apache Storm 官方文档 —— Trident 教程

代码练习

==待补充==

Trident repartitioning operations

参考文章

Learning Storm - 第5章 - Trident repartitioning operations

Shuffle

使用随机轮询算法将tuple在目标分区之间均分

Broadcast

广播,每个元组都复制给所有目标分区

PartitionBy

paritionBy函数接收一组字段并根据这组字段做分区,具体是把这些字段做hash并对分区数取模,从而确定每个tuple落在哪个分区,能够保证同样字段的tuple落在同一分区

Global

所有tuple都去向同一个partition

batchGlobal

同一批的所有tuple落在相同分区,不同批的tuple可能去向不同的分区。这个可以保证同一批的事务一致?

partition

通过使用自定义的分区方法确定tuple落在哪个分区,实现backtype.storm.grouping.CustomStreamGrouping接口

Transactional

Batch与Spout

参考文章

Learning Storm - 第5章 - A transactional topology

功能实现

Storm与Kafka

Storm与Redis

上一篇下一篇

猜你喜欢

热点阅读