Storm从入门到精通-Storm WordCount案例及常用
2019-06-29 本文已影响8人
ebd4ce191be4
1、WordCount案例分析
1.1、功能说明
设计一个topology,统计句子中每个单词出现的频率。
整个topology分为三个部分:
RandomSentenceSpout:数据源,在已知的英文句子中,随机发送一条句子出去。
SplitSentenceBolt:负责将单行文本记录(句子)切分成单词
WordCountBolt:负责对单词的频率进行累加
1.2、TopologyMain 驱动类
/**
 * Wires up the word-count topology and launches it.
 *
 * <p>When at least one command-line argument is given, the topology is submitted to a
 * cluster under the name {@code args[0]}; otherwise it is run in an in-process
 * {@code LocalCluster} under the name {@code "word-count"}.
 *
 * @param args optional; {@code args[0]} is the topology name used for cluster submission
 * @throws Exception propagated from topology submission
 */
public static void main(String[] args) throws Exception {
    // In Java, a topology is assembled through TopologyBuilder.
    TopologyBuilder builder = new TopologyBuilder();
    // Source: RandomSentenceSpout emits one of its known English sentences.
    builder.setSpout("spout1", new RandomSentenceSpout(), 3);
    // SplitSentenceBolt cuts each sentence into words; fed from the spout via shuffle grouping.
    builder.setBolt("split1", new SplitSentenceBolt(), 9).shuffleGrouping("spout1");
    // WordCountBolt tallies word occurrences; fed from the splitter, grouped on the "word" field.
    builder.setBolt("count2", new WordCountBolt(), 3).fieldsGrouping("split1", new Fields("word"));

    // Launch configuration for the topology.
    Config conf = new Config();
    // With debug set to true Storm logs every message emitted by every component:
    // handy when debugging locally, but it hurts performance in production.
    conf.setDebug(false);

    // Storm runs in one of two modes: local or distributed.
    boolean localMode = (args == null || args.length == 0);
    if (localMode) {
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        // NOTE: the local cluster is never shut down here, so local mode keeps running
        // until the process is stopped externally.
        return;
    }

    // Number of worker processes requested from the cluster for this topology.
    conf.setNumWorkers(3);
    // Submit the topology to the cluster.
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
1.3、RandomSentenceSpout
/**
 * Data source of the word-count topology: each call to {@link #nextTuple()} picks one entry
 * from a fixed list of English sentences at random and emits it as a one-field tuple
 * ({@code "sentence"}).
 */
public class RandomSentenceSpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L;

    /**
     * Candidate sentences. Kept as a constant so that {@link #nextTuple()} does not rebuild
     * the array on every call. Note that all entries currently hold the same text, so the
     * random pick always yields the same sentence.
     */
    private static final String[] SENTENCES = {
        "the cow jumped over the moon",
        "the cow jumped over the moon",
        "the cow jumped over the moon",
        "the cow jumped over the moon",
        "the cow jumped over the moon"
    };

    // Collects the tuples this spout emits.
    SpoutOutputCollector collector;
    Random rand;

    /**
     * Initialisation hook; per the original author's note it is called once, with the
     * framework handing in the {@code SpoutOutputCollector}. This is also the place where
     * external resources (Kafka, MySQL, a local file, ...) would be opened.
     *
     * @param conf      configuration passed in by the framework (unused here)
     * @param context   topology context passed in by the framework (unused here)
     * @param collector collector used later by {@link #nextTuple()} to emit tuples
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
        rand = new Random();
    }

    /**
     * Emits one randomly chosen sentence and logs it to standard output.
     *
     * <p>Per the original author's note, the framework drives this method in an endless
     * loop, conceptually {@code while (true) { spout.nextTuple(); }}.
     */
    public void nextTuple() {
        String sentence = SENTENCES[rand.nextInt(SENTENCES.length)];
        collector.emit(new Values(sentence));
        System.out.println("RandomSentenceSpout 发送数据:"+sentence);
    }

    /**
     * Declares the single output field {@code "sentence"} of the emitted tuples.
     *
     * @param declarer receives the field declaration
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
1.4、SplitSentenceBolt
/**
 * Splits each incoming sentence into words and emits one {@code ("word", "num")} tuple per
 * non-empty word, with {@code num} fixed at 1.
 */
public class SplitSentenceBolt extends BaseBasicBolt {
    private static final long serialVersionUID = -5283595260540124273L;

    /**
     * Initialisation hook; per the original author's note it is called only once.
     * Nothing is set up here beyond delegating to the superclass.
     *
     * @param stormConf configuration passed in by the framework
     * @param context   topology context passed in by the framework
     */
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }

    /**
     * Reads the {@code "sentence"} field of the input tuple, splits it on single spaces and
     * emits every non-empty, trimmed, lower-cased word together with the count 1.
     *
     * @param input     tuple carrying the sentence in its {@code "sentence"} field
     * @param collector collector the word tuples are emitted to
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = (String)input.getValueByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            // Consecutive spaces produce empty fragments; skip them.
            if (!word.isEmpty()) {
                // Lower-case with a fixed locale: the word is the grouping/counting key, so
                // it must not depend on the JVM's default locale (e.g. Turkish dotless i).
                word = word.toLowerCase(java.util.Locale.ROOT);
                System.out.println("SplitSentenceBolt 切割单词:"+word);
                collector.emit(new Values(word,1));
            }
        }
    }

    /**
     * Declares the two output fields of the emitted tuples: {@code "word"} and {@code "num"}.
     *
     * @param declarer receives the field declaration
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","num"));
    }
}
1.5、WordCountBolt
public class WordCountBolt extends BaseBasicBolt {
private static final long serialVersionUID = 5678586644899822142L;
// 用来保存最后计算的结果key=单词,value=单词个数
Map<String, Integer> counters = new HashMap<String, Integer>();
/**
 * Initialisation hook; per the original author's note it is called only once.
 * This bolt needs no setup of its own, so the call is simply forwarded to the superclass.
 *
 * @param stormConf configuration passed in by the framework
 * @param context   topology context passed in by the framework
 */
public void prepare(Map stormConf, TopologyContext context) {
    super.prepare(stormConf, context);
}
/**
 * Adds the incoming tuple's {@code "num"} value to the running total kept for its
 * {@code "word"} in the {@code counters} map, creating the entry on first sight, and
 * prints the current thread id, the word and the full map to standard output.
 *
 * @param input     tuple with a {@code "word"} (String) and a {@code "num"} (Integer) field
 * @param collector unused; this method emits nothing
 */
public void execute(Tuple input, BasicOutputCollector collector) {
    String word = (String) input.getValueByField("word");
    Integer increment = input.getIntegerByField("num");
    System.out.println("----------------" + Thread.currentThread().getId() + " " + word);
    if (counters.containsKey(word)) {
        // Word seen before: accumulate onto the stored total.
        Integer total = counters.get(word) + increment;
        counters.put(word, total);
    } else {
        // First occurrence: the increment becomes the initial total.
        counters.put(word, increment);
    }
    System.out.println("WordCountBolt 统计单词:" + counters);
}
/**
 * Declares no output fields; the body is intentionally left empty.
 *
 * @param declarer unused
 */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Nothing to declare.
}