玩转大数据程序员技术文

storm自定义实现wordcount

2016-12-14  本文已影响598人  心_的方向

storm中的任务

  1. storm中的任务的结构是Topology(拓扑图),这个拓扑图是一个有向无环图(DAG),DAG能够清楚的表达链式的任务,每一个节点都是一个任务,边的方向代表着数据流的方向。如下图


    Paste_Image.png
  2. storm任务中数据流的数据结构是一个个tuple,tuple元组是任意数据结构类型的键值对组合。例如:(k1:v1, k2:v2, k3:v3, ····)
  3. Spout是数据采集器,从数据源采集数据,转成tuple发射到后面的bolt处理
  4. Bolt是数据处理器,可执行数据过滤,分析等操作。

开发流程

  1. 设计Topology图


    Paste_Image.png
  2. 按照Topology图,创建maven项目后,依次写各个任务节点。首先写SentenceSpout节点。
package strom.strom;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class SentenceSpout extends BaseRichSpout {

    // tuple发射器
    private SpoutOutputCollector collector;

    private static final String[] SENTENCES = { "hadoop yarn mapreduce spark", "flume hadoop hive spark",
            "oozie yarn spark storm", "storm yarn mapreduce error", "error flume storm spark" };

    /*
     * 用于指定只针对本组件的一些特殊配置
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /*
     * spout组件的初始化方法 创建这个sentenceSpout组件实例时调用一次
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
        // 用实例变量接收发射器
        this.collector = arg2;
    }

    /*
     * 声明向后面的组件发送tuple的key是什么
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("sentence"));
    }

    /*
     * 1)指定tuple的value值,封装tuple后,并将其发射给后面的组件, 2) 会迭代式的循环调用这个方法
     */
    @Override
    public void nextTuple() {
        // 从数组中随意获取一个值
        String sentence = SENTENCES[new Random().nextInt(SENTENCES.length)];
        // 指定value值并封装为tuple后,把tuple发射给后面的组件
        this.collector.emit(new Values(sentence));
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 写splitbolt组件
package strom.strom;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class SplitBolt implements IRichBolt {

    // bolt组件中的发射器
    private OutputCollector collector;

    @Override
    public void cleanup() {

    }

    /*
     * 设置key名称
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("word"));
    }

    /*
     * 每次接受到前面组件发送的tuple调用一次 ,封装好tuple后发射
     */
    @Override
    public void execute(Tuple input) {
        // 获取key value对后,取出value值
        String values = input.getStringByField("sentence");
        if (values != null && !"".equals(values)) {
            // 按空格分割value
            String[] valuelist = values.split(" ");
            for (String value : valuelist) {
                // 向后面的组件发射封装好的tuple
                this.collector.emit(new Values(value));
            }
        }
    }

    /*
     * bolt组件初始化方法,只会调用一次
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

4.CountBolt组件实现计数逻辑

package strom.strom;
//

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class CountBolt extends BaseRichBolt {

    // 发射器
    private OutputCollector collector;
    // 为了计数
    private Map<String, Integer> counts;

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
        this.counts = new HashMap<String, Integer>();
    }

    /*
     * 声明key名称,可以同时声明多个
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("word", "count"));
    }

    /*
     * 统计单词
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");

        int count = 1;
        // 如果这个单词已经存在,则取出count再加一
        if (counts.containsKey(word)) {
            count = counts.get(word) + 1;
        }
        counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }
}

5 . PrintBolt组件

package strom.strom;
//

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {

    }

    /*
     * 打印到控制台
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        int count = input.getIntegerByField("count");
        System.out.println(word + "---->" + count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {

    }

}

6 . WordCountTopology类用来连接这些组件

package strom.strom;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {

    private static final String SPOUT_ID = "sentenceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String PRINT_BOLT = "printBolt";

    public static void main(String[] args) {
        // 构造Topology
        TopologyBuilder builder = new TopologyBuilder();
        // 指定spout
        builder.setSpout(SPOUT_ID, new SentenceSpout());
        // 指定bolt,并指定当有有多个bolt时,数据流发射的分组策略
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(SPOUT_ID);
        // 因为要保证正确的单词计数,同一个单词一定要划分到同一个CountBolt上,所以按照字段值分组
        builder.setBolt(COUNT_BOLT, new CountBolt()).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
        // 全局分组,所有tuple发射到一个printbolt,一般是id最小的那一个
        builder.setBolt(PRINT_BOLT, new PrintBolt()).globalGrouping(COUNT_BOLT);

        Config conf = new Config();

        if (args == null || args.length == 0) {
            // 本地执行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", conf, builder.createTopology());
        } else {
            // 提交到集群上执行
            // 指定使用多少个进程来执行该Topology
            conf.setNumWorkers(1);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 本地执行测试


    Paste_Image.png
  2. 打成jar包后上传到storm集群测试


    Paste_Image.png

    下面的jar包包含着依赖的包,上面的jar包中没有包括,所以我们选择使用下面这个jar包。
    上传到集群上然后执行

$ bin/storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar strom.strom.WordCountTopology wordcount

在UI中查看运行情况


Paste_Image.png

查看运行日志


Paste_Image.png
Paste_Image.png
Paste_Image.png

查看拓扑图


Paste_Image.png
Paste_Image.png
Paste_Image.png
上一篇下一篇

猜你喜欢

热点阅读