Storm基础开发案例-数字累加
2020-02-03 本文已影响0人
学术界末流打工人
需求
使用Storm实现累加求和的操作
开发过程
1. Spout 定义
首先 Spout需要继承BaseRichSpout
public class LocalSumStormTopology {
public static class DataSourceSpout extends BaseRichSpout{
}
}
接下来需要实现 BasheRichSpout的三个方法
- open
- nextTuple
- declareOutputFields
open方法实现
// 因为数据需要发出去所以两个地方需要用,在这里初始化一个
private SpoutOutputCollector collector;
/**
* 初始化只会被调用一次
* @param map 配置参数
* @param topologyContext 上下文
* @param spoutOutputCollector 数据发射器
*/
public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = collector;
}
nextTuple方法实现
int number = 0;
/**
* 产生数据,在生产上肯定从消息队列中获取数据
*
* 这个方法是个死循环,一直不停的执行
*/
public void nextTuple() {
this.collector.emit(new Values(number++));
System.out.println("Spout:"+number);
// 防止数据产生太快
Utils.sleep(1000);
}
declareOutputFields方法实现
/**
* 声明输出字段
* @param outputFieldsDeclarer
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// num字段是对应nextTuple中的 Values值得数量
outputFieldsDeclarer.declare(new Fields("num"));
}
2. Bolt 定义
需要继承BaseRichBolt类
public static class SumBolt extends BaseRichBolt{
}
接下来需要实现 BaseRichBolt的三个方法
- prepare
- execute
- declareOutputFields
prepare 方法实现
/**
* 初始化方法,会被执行一次
* @param map
* @param topologyContext
* @param outputCollector
*/
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
// 本场景不需要任何实际内容
}
execute 方法实现
int sum = 0;
/**
* 其实也是一个死循环,职责:获取Spout发送过来的数据
* @param tuple
*/
public void execute(Tuple tuple) {
// 获取发送过来的字段
//Bolt中获取值可以根据index获取,也可以根据上一个环节中
//定义的field的名称获取(建议使用该方式)
Integer val = tuple.getIntegerByField("num");
sum +=val;
System.out.println("Bolt: sum = ["+sum+"]");
}
declareOutputFields 方法实现
/**
* 目前需求不需要往下继续发,所以暂且为空
* @param outputFieldsDeclarer
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
测试
/**
* 测试
*/
public static void main(String[] args) {
// TopologyBuilder根据Spout和Bolt来构建出Topology
// Storm中任何一个作业都是通过Topology的方式提交的
// Topology中需要指定Spout和Bolt的执行顺序
TopologyBuilder builder = new TopologyBuilder();
//关联 Spout Bolt
builder.setSpout("DataSourceSpout",new DataSourceSpout());
// shuffleGrouping方法需要穿componentId,指的是去哪里拿数据,就指定哪个Id,这里是去DataSourceSpout拿
builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
// 创建一个本地Storm集群:本地模式运行,不需要搭建Storm集群
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalSumStormTopology",new Config(), builder.createTopology());
}
整体代码
package com.imooc.bigdata;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* 1. 使用Storm实现积累求和的操作
*/
public class LocalSumStormTopology {
/**
1.Spout 需要继承BaseRichSpout
数据源需要产生数据并发射
*/
public static class DataSourceSpout extends BaseRichSpout{
// 因为数据需要发出去所以两个地方需要用,在这里初始化一个
private SpoutOutputCollector collector;
/**
* 1.1
* 初始化只会被调用一次
* @param map 配置参数
* @param topologyContext 上下文
* @param spoutOutputCollector 数据发射器
*/
public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = collector;
}
int number = 0;
/**
* 1.2
* 产生数据,在生产上肯定从消息队列中获取数据
*
* 这个方法是个死循环,一直不停的执行
*/
public void nextTuple() {
this.collector.emit(new Values(number++));
System.out.println("Spout:"+number);
// 防止数据产生太快
Utils.sleep(1000);
}
/**
* 1.3
* 声明输出字段
* @param outputFieldsDeclarer
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// num字段是对应nextTuple中的 Values值得数量
outputFieldsDeclarer.declare(new Fields("num"));
}
}
/**
* 2
* 数据的累计求和Bolt:接收数据并处理
*/
public static class SumBolt extends BaseRichBolt{
/**
* 初始化方法,会被执行一次
* @param map
* @param topologyContext
* @param outputCollector
*/
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
// 本场景不需要任何实际内容
}
int sum = 0;
/**
* 其实也是一个死循环,职责:获取Spout发送过来的数据
* @param tuple
*/
public void execute(Tuple tuple) {
// 获取发送过来的字段
//Bolt中获取值可以根据index获取,也可以根据上一个环节中
//定义的field的名称获取(建议使用该方式)
Integer val = tuple.getIntegerByField("num");
sum +=val;
System.out.println("Bolt: sum = ["+sum+"]");
}
/**
* 目前需求不需要往下继续发,所以暂且为空
* @param outputFieldsDeclarer
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
/**
* 测试
*/
public static void main(String[] args) {
// TopologyBuilder根据Spout和Bolt来构建出Topology
// Storm中任何一个作业都是通过Topology的方式提交的
// Topology中需要指定Spout和Bolt的执行顺序
TopologyBuilder builder = new TopologyBuilder();
//关联 Spout Bolt
builder.setSpout("DataSourceSpout",new DataSourceSpout());
// shuffleGrouping方法需要穿componentId,指的是去哪里拿数据,就指定哪个Id,这里是去DataSourceSpout拿
builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
// 创建一个本地Storm集群:本地模式运行,不需要搭建Storm集群
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalSumStormTopology",new Config(), builder.createTopology());
}
}