Storm从入门到精通8:Storm实现WordCount程序

2020-04-03  本文已影响0人  金字塔下的小蜗牛

1.Storm流式数据处理流程

Storm处理流式数据的一般架构如下图所示:

image
  1. Flume用来获取数据。
  2. Kafka用来临时保存数据。
  3. Storm用来计算数据。
  4. Redis是个内存数据库,用来保存数据。

2.Storm实现WordCount程序

新建Java工程StormWordCount,将$STORM_HOME/lib下面的所有Jar包加到工程的buildpath中,环境就搭建好了。

2.1创建Spout(WordCountSpout)组件采集数据,作为整个Topology的数据源

package demo.wordcount;
import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
//采集数据spout组件
public class WordCountSpout extends BaseRichSpout {
// 模拟产生一些数据
private String[] data = { "I love Beijing", "I love China", "Beijing is the capital of China" };
// 定义spout的输出流
private SpoutOutputCollector collector;
@Override
public void nextTuple() {
// 每隔3秒钟采集一次数据
Utils.sleep(3000);
// 由storm框架调用,每次调用进行数据采集
// 随机产生一个字符串
int random = (new Random()).nextInt(3);
// 打印
System.out.println("采集的数据是:" + data[random]);
// 将采集到的数据发送给下一个组件进行处理 
this.collector.emit(new Values(data[random]));
}
@Override
public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
// 初始化spout组件时调用
this.collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 声明输出的Tuple的格式
declarer.declare(new Fields("sentence"));

 }

2.2创建Bolt(WordCountSplitBolt)组件进行分词操作

package demo.wordcount;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
// Bolt component: splits each incoming sentence into words and emits one
// (word, 1) pair per word for the downstream counting bolt.
public class WordCountSplitBolt extends BaseRichBolt {

    // Output stream of this bolt; handed to us by the framework in prepare().
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        // Take the sentence emitted by the upstream spout.
        String str = tuple.getStringByField("sentence");
        // Split on runs of whitespace: split(" ") would yield empty-string
        // tokens for consecutive or leading spaces, which would then be
        // counted as words downstream.
        String[] words = str.trim().split("\\s+");
        // Emit each word in (word, 1) form to the next component.
        for (String w : words) {
            if (!w.isEmpty()) {
                this.collector.emit(new Values(w, 1));
            }
        }
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        // Called once at initialization; the OutputCollector represents this
        // bolt's output stream.
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the schema of tuples emitted by this bolt.
        declarer.declare(new Fields("word", "count"));
    }
}

2.3创建Bolt(WordCountTotalBolt)组件进行单词计数操作

package demo.wordcount;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
// Bolt component: maintains a running count per word and emits the updated
// (word, total) pair after each incoming tuple.
public class WordCountTotalBolt extends BaseRichBolt {

    // Output stream of this bolt; handed to us by the framework in prepare().
    private OutputCollector collector;

    // Running word counts, local to this bolt task. Correct totals rely on the
    // topology using fieldsGrouping on "word" so the same word always reaches
    // the same task instance.
    private Map<String, Integer> result = new HashMap<>();

    @Override
    public void execute(Tuple tuple) {
        // Take the (word, count) pair emitted by the split bolt.
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");
        // Accumulate: Map.merge replaces the manual containsKey/get/put dance
        // and returns the new total directly, avoiding a second map lookup.
        int total = result.merge(word, count, Integer::sum);
        System.out.println("单词统计的结果:" + result);
        // Emit the updated total for this word.
        this.collector.emit(new Values(word, total));
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        // Called once at initialization; keep the collector for emitting.
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the schema of tuples emitted by this bolt.
        declarer.declare(new Fields("word", "total"));
    }
}

2.4创建主程序Topology(WordCountTopology)

package demo.wordcount;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
// Driver program: wires the spout and the two bolts into a topology and
// submits it, either to an in-process local cluster or to a real cluster.
public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        // Assemble the processing pipeline: spout -> split bolt -> count bolt.
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // Data source of the topology.
        topologyBuilder.setSpout("wordcount_spout", new WordCountSpout());

        // First bolt: word splitting. Sentences are distributed randomly
        // across split-bolt tasks.
        topologyBuilder.setBolt("wordcount_split_bolt", new WordCountSplitBolt())
                .shuffleGrouping("wordcount_spout");

        // Second bolt: counting. Group by "word" so every occurrence of a
        // given word is routed to the same counting task.
        topologyBuilder.setBolt("wordcount_count_bolt", new WordCountTotalBolt())
                .fieldsGrouping("wordcount_split_bolt", new Fields("word"));

        // Build the topology and its (default) configuration.
        StormTopology topology = topologyBuilder.createTopology();
        Config config = new Config();

        // Option 1: run in-process on a local pseudo-cluster.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("MyStormWordCount", config, topology);

        // Option 2: submit to a real Storm cluster (topology name from args).
        //StormSubmitter.submitTopology(args[0], config, topology);
    }
}

3.执行Storm程序

3.1本地模式执行

在上面的主程序中,修改为以方式1:本地模式运行,结果如下:

image

3.2集群模式执行

将上面的主程序中,修改为以方式2:集群模式运行,打包成StormWordCount.jar,上传到Storm集群运行,结果如下:

[root@master input]# ls StormWordCount.jar
StormWordCount.jar
[root@master input]# storm jar StormWordCount.jar demo.wordcount.WordCountTopology MyStormWordCount
***log***
641 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: MyStormWordCount

image image image
上一篇下一篇

猜你喜欢

热点阅读