Storm从入门到精通11:Storm与JDBC集成
2020-04-04 本文已影响0人
金字塔下的小蜗牛
1.导入Jar包
将Storm处理的结果存放到MySQL数据库中,需要依赖下面一些Jar包:
$STORM_HOME\external\sql\storm-sql-core*.jar
$STORM_HOME\external\storm-jdbc\storm-jdbc-1.0.3.jar
mysql-connector-java-5.1.7-bin.jar
commons-lang3-3.1.jar
2.示例
将Storm的计算结果存入MySQL:以Storm的WordCount程序为例
2.1创建Spout
创建Spout(WordCountSpout)组件采集数据,作为整个Topology的数据源
public class WordCountSpout extends BaseRichSpout{
//模拟数据
private String[] data = {"I love Beijing",
"I love China",
"Beijing is the capital of China"};
//用于往下一个组件发送消息
private SpoutOutputCollector collector;
@Override
public void nextTuple(){
Utils.sleep(3000);
//由Storm框架调用,用于接收外部数据源的数据
int random = (new Random()).nextInt(3);
String sentence = data[random];
//System.out.println("发送数据:"+sentence);
this.collector.emit(new Values(sentence));
}
@Override
public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector){
//Spout初始化方法
this.collector = collector;
} @Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("sentence"));
}
}
2.2创建Bolt
创建Bolt(WordCountSplitBolt)组件进行分词操作
public class WordCountSplitBolt extends BaseRichBolt{
private OutputCollector collector;
@Override
public void execute(Tuple tuple){
String sentence = tuple.getStringByField("sentence");
//分词
String[] words = sentence.split(" ");
for(String word:words){
this.collector.emit(new Values(word,1));
}
}
@Override
public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector){
this.collector = collector;
} @Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("word","count"));
}
}
2.3创建Bolt
创建Bolt(WordCountBoltCount)组件进行单词计数操作
public class WordCountBoltCount extends BaseRichBolt{
private Map<String, Integer> result = new HashMap<String, Integer>();
private OutputCollector collector;
@Override
public void execute(Tuple tuple){
String word = tuple.getStringByField("word");
int count = tuple.getIntegerByField("count");
if(result.containsKey(word)){
int total = result.get(word);
result.put(word,total+count);
}else{
result.put(word,1);
}
//输出结果到屏幕
//System.out.println("输出的结果是:"+result);
//将统计结果发送给下一个Bolt,即插入MySQL数据库
this.collector.emit(new Values(word,result.get(word)));
}
@Override
public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector){
this.collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("word","total"));
}
}
2.4创建主程序Topology
创建主程序Topology(WordCountTopology)
public static class WordCountTopology{
//创建JDBC Insert Bolt组件,需要实现在MySQL中创建对应的表:result
private static IRichBolt createJDBCBolt(){
ConnectionProvider connectionProvider = new MyConnectionProvider();
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper("aaa",connectionProvider);
return new JdbcInsertBolt(connectionProvider,simpleJdbcMapper)
.withTableName("result").withQueryTimeoutSecs(30);
}
public static void main(String[] args){
TopologyBuilder builder = new TopologyBuilder();
//设置任务的Spout组件
builder.setSpout("wordcount_spout",new WordCountSpout());
//设置任务的第一个Bolt组件
builder.setBolt("wordcount_splitbolt",new WordCountSplitBolt())
.shuffleGrouping("wordcount_spout");
//设置任务的第二个Bolt组件
builder.setBolt("wordcount_count",new WordCountBoltCount())
.filedsGrouping("wordcount_splitbolt",new Fields("word"));
//创建Topology任务
StormTopology wc = builder.createTopology();
Config config = new Config();
//提交到Storm集群运行
StormSubmitter.submitTopology(args[0],config,wc);
}
}
2.5实现ConnectionProvider接口
class MyConnectionProvider implements ConnectionProvider{
private static String driver = "com.mysql.jdbc.Driver";
private static String url = "jdbc:mysql://192.168.126.110:3306/demo";
private static String user = "root";
private static String password = "123456";
static{
try{
Class.forName(dirver);
}catch(ClassNotFoundException e){
throw new ExecptionInInitializerError(e);
}
}
@Override
public Connection getConnection(){
try{
return DriverManager.getConnection(usl,user,password);
}catch(SQLException e){
e.printStackTrace();
}
return null;
}
public void cleanup(){}
public void prepare(){}
}