技术栈

Storm从入门到精通-Storm WordCount案例及常用

2019-06-29  本文已影响8人  ebd4ce191be4

1、WordCount案例分析

1.1、功能说明

设计一个topology,来实现对一个句子里面的单词出现的频率进行统计。

整个topology分为三个部分:

RandomSentenceSpout:数据源,在已知的英文句子中,随机发送一条句子出去。

SplitSentenceBolt:负责将单行文本记录(句子)切分成单词

WordCountBolt:负责对单词的频率进行累加

1.2、TopologyMain 驱动类

public static void main(String[] args) throws Exception {
        // Storm框架支持多语言,在JAVA环境下创建一个拓扑,需要使用TopologyBuilder进行构建
        TopologyBuilder builder = new TopologyBuilder();
        //RandomSentenceSpout类,在已知的英文句子中,随机发送一条句子出去。
        builder.setSpout("spout1", new RandomSentenceSpout(), 3);
        // SplitSentenceBolt类,主要是将一行一行的文本内容切割成单词
        builder.setBolt("split1", new SplitSentenceBolt(), 9).shuffleGrouping("spout1");
        // WordCountBolt类,对单词出现的次数进行统计
        builder.setBolt("count2", new WordCountBolt(),3).fieldsGrouping("split1",new Fields("word"));
        //启动topology的配置信息
        Config conf = new Config();
        //TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每个组件所发射的每条消息。
        //这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。
//        conf.setDebug(true);
        conf.setDebug(false);
        //storm的运行有两种模式: 本地模式和分布式模式.
        if (args != null && args.length > 0) {
            //定义你希望集群分配多少个工作进程给你来执行这个topology
            conf.setNumWorkers(3);
            //向集群提交topology
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            //指定本地模式运行多长时间之后停止,如果不显式的关系程序将一直运行下去
            //Utils.sleep(10000);
            //cluster.shutdown();
        }
    }

1.3、RandomSentenceSpout

public class RandomSentenceSpout extends BaseRichSpout {
   private static final long serialVersionUID = 5028304756439810609L;
   //用来收集Spout输出的tuple
   SpoutOutputCollector collector;
   Random rand;
   //该方法调用一次,主要由storm框架传入SpoutOutputCollector
   public void open(Map conf, TopologyContext context,
         SpoutOutputCollector collector) {
      this.collector = collector;
      rand = new Random();
       //连接kafka  mysql ,打开本地文件 
   }

   /**
    * 上帝之手
    *  while(true){
    *      spout.nexTuple()
    *  }
    */
   public void nextTuple() {
      String[] sentences = new String[] { "the cow jumped over the moon",
            "the cow jumped over the moon",
            "the cow jumped over the moon",
            "the cow jumped over the moon", "the cow jumped over the moon" };
      String sentence = sentences[rand.nextInt(sentences.length)];
      collector.emit(new Values(sentence));
      System.out.println("RandomSentenceSpout 发送数据:"+sentence);
   }
   //消息源可以发射多条消息流stream。多条消息流可以理解为多中类型的数据。
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("sentence"));
   }
}

1.4、SplitSentenceBolt

public class SplitSentenceBolt extends BaseBasicBolt {
   private static final long serialVersionUID = -5283595260540124273L;
   //该方法只会被调用一次,用来初始化
   public void prepare(Map stormConf, TopologyContext context) {
      super.prepare(stormConf, context);
   }
   /**
    * 接受的参数是RandomSentenceSpout发出的句子,即input的内容是句子 execute方法,将句子切割形成的单词发出
    */
   public void execute(Tuple input, BasicOutputCollector collector) {
      String sentence = (String)input.getValueByField("sentence");
      String[] words = sentence.split(" ");
      for (String word : words) {
         word = word.trim();
         if (!word.isEmpty()) {
            word = word.toLowerCase();
            System.out.println("SplitSentenceBolt 切割单词:"+word);
            collector.emit(new Values(word,1));
         }
      }
   }
   //消息源可以发射多条消息流stream。多条消息流可以理解为多种类型的数据。
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word","num"));
   }
}

1.5、WordCountBolt

public class WordCountBolt extends BaseBasicBolt {
   private static final long serialVersionUID = 5678586644899822142L;
   // 用来保存最后计算的结果key=单词,value=单词个数
   Map<String, Integer> counters = new HashMap<String, Integer>();

   //该方法只会被调用一次,用来初始化
   public void prepare(Map stormConf, TopologyContext context) {
       super.prepare(stormConf, context);
   }

   /*
    * 将collector中的元素存放在成员变量counters(Map)中.
    * 如果counters(Map)中已经存在该元素,getValule并对Value进行累加操作。
    */
   public void execute(Tuple input, BasicOutputCollector collector) {
       String str = (String)input.getValueByField("word");
       Integer num =input.getIntegerByField("num");
       System.out.println("----------------"+Thread.currentThread().getId()+"     "+str);
       if (!counters.containsKey(str)) {
           counters.put(str, num);
       } else {
           Integer c = counters.get(str) + num;
           counters.put(str, c);
       }
       System.out.println("WordCountBolt 统计单词:"+counters);
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       // TODO Auto-generated method stub
   }

1.6、pom依赖

上一篇 下一篇

猜你喜欢

热点阅读