Storm | WordCount
2019-07-05 本文已影响0人
icebreakeros
wordcount
应用设计
storm jar jar路径 拓扑包名.拓扑类名 拓扑名称
storm kill 拓扑名称
消息源Spout,继承BaseRichSpout类/实现IRichSpout接口
open():初始化动作
nextTuple():消息接入,执行数据发射
ack():tuple成功处理后调用
fail():tuple处理失败时调用
declareOutputFields():通常声明输出字段
处理单元Bolt,继承BaseBasicBolt类/实现IRichBolt接口
prepare():worker启动时初始化
execute():接收一个tuple并执行逻辑处理,发射出去
cleanup():关闭前调用
declareOutputFields():字段声明
编码
vim pom.xml
<!-- Build section: configures the Java compiler level and a jar-with-dependencies assembly. -->
<build>
<plugins>
<!-- Compiler plugin: source and target levels are both set to 1.8. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Assembly plugin: uses the predefined jar-with-dependencies descriptor and sets mainClass in the jar manifest. -->
<!-- NOTE(review): no version or executions element is declared, so the assembly presumably has to be invoked explicitly (e.g. mvn assembly:single); confirm. -->
<!-- NOTE(review): mainClass is com.icebreakeros.boot.Application, while the class shown in this article is WordCountTopology; confirm the intended entry point. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.icebreakeros.boot.Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<!-- Dependencies: SLF4J API 1.7.25 and Storm core 1.1.0. -->
<!-- NOTE(review): storm-core has no scope element, so it uses the default scope and would presumably be bundled into the jar-with-dependencies artifact; Storm's documentation suggests provided scope for jars submitted to a cluster. Confirm before changing, since local-mode runs need it on the classpath. -->
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
# WordCountTopology.java
/**
 * Builds the word-count topology and submits it either to a Storm cluster
 * or to an in-process {@code LocalCluster}.
 *
 * <p>Topology wiring:
 * <ul>
 *   <li>spout {@code "RandomSentence"} ({@code RandomSentenceSpout}, parallelism hint 2)</li>
 *   <li>bolt {@code "WordNormalizer"} ({@code WordNormalizerBolt}, hint 2),
 *       shuffle-grouped on {@code "RandomSentence"}</li>
 *   <li>bolt {@code "WordCount"} ({@code WordCountBolt}, hint 2),
 *       fields-grouped on {@code "WordNormalizer"} by the {@code "word"} field</li>
 *   <li>bolt {@code "Print"} ({@code PrintBolt}, hint 1),
 *       shuffle-grouped on {@code "WordCount"}</li>
 * </ul>
 */
public class WordCountTopology {

    /**
     * Assembles the topology and submits it.
     *
     * <p>When at least one argument is given, the topology is submitted through
     * {@code StormSubmitter} with one worker, using {@code args[0]} as the
     * topology name. Otherwise it is submitted to a new {@code LocalCluster}
     * under the name {@code "wordcount"} with max task parallelism 1.
     *
     * @param args optional command-line arguments; {@code args[0]}, if present,
     *             is the topology name used for cluster submission
     * @throws IllegalStateException if submitting to the cluster fails
     */
    public static void main(String[] args) {
        // The builder is only needed while wiring the topology, so keep it local
        // instead of holding it in mutable static state.
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        topologyBuilder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
        topologyBuilder.setBolt("WordNormalizer", new WordNormalizerBolt(), 2)
                .shuffleGrouping("RandomSentence");
        // Fields grouping on "word" routes tuples with the same word value
        // to the same WordCount task.
        topologyBuilder.setBolt("WordCount", new WordCountBolt(), 2)
                .fieldsGrouping("WordNormalizer", new Fields("word"));
        topologyBuilder.setBolt("Print", new PrintBolt(), 1)
                .shuffleGrouping("WordCount");

        Config config = new Config();
        config.setDebug(false);

        if (args != null && args.length > 0) {
            // Cluster mode: args[0] is the topology name.
            config.setNumWorkers(1);
            try {
                StormSubmitter.submitTopology(args[0],
                        config, topologyBuilder.createTopology());
            } catch (Exception e) {
                // Previously swallowed silently; surface the failure with its cause
                // so a failed submission is visible to whoever ran the command.
                throw new IllegalStateException(
                        "Failed to submit topology '" + args[0] + "'", e);
            }
        } else {
            // Local mode: run inside this JVM.
            config.setMaxTaskParallelism(1);
            // NOTE(review): the local cluster is never shut down here, so the process
            // presumably keeps running until terminated externally; confirm this is intended.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordcount",
                    config, topologyBuilder.createTopology());
        }
    }
}