Storm | WordCount

2019-07-05  icebreakeros

wordcount

Application design

storm jar <jar-path> <topology-package>.<topology-class> <topology-name>
storm kill <topology-name>

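Spout (message source): extend the BaseRichSpout class or implement the IRichSpout interface
open(): initialization
nextTuple(): pulls in messages and emits the data as tuples
ack(): called after a tuple has been processed successfully
fail(): called when processing of a tuple fails
declareOutputFields(): usually declares the output fields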
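
To make the callback order concrete, here is a minimal sketch of the bookkeeping a spout might do across open(), nextTuple(), ack() and fail(). It is a plain-Java model with no Storm dependency, so the method signatures differ from the real IRichSpout ones, and every name in it (SentenceSource, pendingCount, queuedCount) is a hypothetical chosen for illustration. It assumes that a failed message should simply be emitted again.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Plain-Java model of a spout's callbacks; it does NOT use Storm classes.
// All names here are hypothetical and only mirror the method list above.
class SentenceSource {
    private final Queue<String> toSend = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    // open(): one-time initialization, here loading the sentences to emit.
    void open(String... sentences) {
        for (String s : sentences) {
            toSend.add(s);
        }
    }

    // nextTuple(): emit at most one message and remember it until it is acked.
    // Returns the message id, or -1 when there is nothing to emit.
    long nextTuple() {
        String sentence = toSend.poll();
        if (sentence == null) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, sentence);
        return id;
    }

    // ack(): the message was processed successfully, so it can be forgotten.
    void ack(long id) {
        pending.remove(id);
    }

    // fail(): processing failed, so queue the sentence for re-emission.
    void fail(long id) {
        String sentence = pending.remove(id);
        if (sentence != null) {
            toSend.add(sentence);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int queuedCount() {
        return toSend.size();
    }
}
```

In a real spout the same idea would likely live inside a BaseRichSpout subclass, with the message id passed to the collector when emitting so that Storm can call ack() or fail() with it later.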

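Bolt (processing unit): extend the BaseBasicBolt class or implement the IRichBolt interface
prepare(): initialization when the worker starts
execute(): receives one tuple, applies the processing logic, and emits the result
cleanup(): called before shutdown
declareOutputFields(): declares the output fields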
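
The original post does not show the bolt classes, so the following is only a sketch of the per-tuple work that the execute() methods of a word normalizer and a word counter could perform. It is plain Java without Storm types; WordLogic, normalize and count are hypothetical names, and splitting on whitespace plus lower-casing is an assumed definition of "normalizing".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Plain-Java model of the per-tuple work in two bolts; no Storm classes used.
// WordLogic and its methods are hypothetical, not part of the original project.
class WordLogic {
    private final Map<String, Integer> counts = new HashMap<>();

    // Normalizer step: split a sentence on whitespace and lower-case each word.
    static List<String> normalize(String sentence) {
        List<String> words = new ArrayList<>();
        for (String raw : sentence.trim().split("\\s+")) {
            if (!raw.isEmpty()) {
                words.add(raw.toLowerCase(Locale.ROOT));
            }
        }
        return words;
    }

    // Counter step: increment the running count for a word and return the new total.
    int count(String word) {
        return counts.merge(word, 1, Integer::sum);
    }
}
```

In a bolt, normalize would run once per incoming sentence tuple and emit one tuple per word, while the counter's map would be per-task state, created in prepare() and updated in execute().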

Implementation

vim pom.xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.icebreakeros.boot.Application</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>

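# WordCountTopology.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {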

    private static TopologyBuilder topologyBuilder = new TopologyBuilder();

    public static void main(String[] args) {
        Config config = new Config();
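        // Spout "RandomSentence" with a parallelism hint of 2
        topologyBuilder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
        // Shuffle grouping: tuples are spread randomly across the WordNormalizer tasks
        topologyBuilder.setBolt("WordNormalizer", 
            new WordNormalizerBolt(), 
            2).shuffleGrouping("RandomSentence");
        
        // Fields grouping on "word": equal words always go to the same WordCount task
        topologyBuilder.setBolt("WordCount", 
            new WordCountBolt(), 
            2).fieldsGrouping("WordNormalizer", new Fields("word"));

        // A single Print task receives the output of WordCount
        topologyBuilder.setBolt("Print", 
            new PrintBolt(), 1).shuffleGrouping("WordCount");
        config.setDebug(false);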

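        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name
            config.setNumWorkers(1);
            try {
                StormSubmitter.submitTopology(args[0], 
                    config, topologyBuilder.createTopology());
            } catch (Exception e) {
                // Report submission failures instead of silently swallowing them
                e.printStackTrace();
            }
        } else {
            // Local mode: run the topology in an in-process cluster
            config.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordcount", 
                config, topologyBuilder.createTopology());
        }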
    }
}
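
The topology uses fieldsGrouping on "word" for the WordCount bolt, which matters because each task keeps its own counts: every occurrence of a word has to reach the same task. The sketch below is a simplified model of that routing idea, not Storm's actual implementation. GroupingModel and chooseTask are hypothetical names, and hashing the field values modulo the task count is an assumption made for illustration.

```java
import java.util.Arrays;

// Simplified model of fields grouping; Storm's real task selection differs in detail.
class GroupingModel {
    // Pick a task index from the values of the grouping fields.
    // Equal field values always produce the same index for a given task count.
    static int chooseTask(int numTasks, Object... fieldValues) {
        return Math.floorMod(Arrays.hashCode(fieldValues), numTasks);
    }
}
```

With shuffleGrouping, by contrast, the same word could land on either of the two WordCount tasks, and each task would hold only a partial count.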