Integrating Storm with HDFS/HBase
2020-03-27
TZX_0710
1. Integrating with HDFS
Add the HDFS-related dependencies to the POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <storm.version>1.2.2</storm.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
        </dependency>
        <!-- Storm HDFS integration dependency -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>${storm.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.15.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-cdh5.15.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.15.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <repositories>
        <!-- Cloudera repository for the CDH Hadoop artifacts -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
</project>
Modify the WordCount example covered earlier:
package example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.topology.TopologyBuilder;

public class DataToHdfsApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String HDFS_BOLT = "hdfsBolt";

    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");

        // Delimiter placed between the output fields
        RecordFormat recordFormat = new DelimitedRecordFormat().withFieldDelimiter("|");

        // Sync policy: flush the buffer to HDFS every 200 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(200);

        // Rotation policy: cap each file at 20 MB, then roll over to a new file
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(20.0f, FileSizeRotationPolicy.Units.MB);

        // Target path in HDFS
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/HdfsFile");

        // Assemble the HdfsBolt
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://node01:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // Build the topology
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout(DATA_SOURCE_SPOUT, new StormWordCount());
        // Persist to HDFS
        topologyBuilder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);

        // Submit to the cluster if arguments were passed, otherwise run locally
        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), topologyBuilder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalDataToHdfsApp", new Config(), topologyBuilder.createTopology());
        }
    }
}
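The StormWordCount spout above comes from the earlier WordCount article and is not reproduced here. To make this topology self-contained, a minimal stand-in might look like the following sketch (the sentence list and the "line" field name are assumptions, not the original code):

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

// Hypothetical stand-in for the StormWordCount spout from the earlier article
public class StormWordCount extends BaseRichSpout {

    private static final String[] SENTENCES = {"hello storm", "hello hdfs", "storm to hdfs"};
    private SpoutOutputCollector collector;
    private Random random = new Random();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Emit one sample line per second; HdfsBolt joins the tuple fields with "|"
        collector.emit(new Values(SENTENCES[random.nextInt(SENTENCES.length)]));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}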
Figure: data files written to HDFS.
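To check the output from code rather than the screenshot, a small sketch using the Hadoop FileSystem API (the node01 address and root user mirror the topology configuration above):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListHdfsOutput {
    public static void main(String[] args) throws Exception {
        // Connect as the same user the topology writes with
        FileSystem fs = FileSystem.get(URI.create("hdfs://node01:8020"), new Configuration(), "root");
        // Print every file the HdfsBolt has produced so far
        for (FileStatus status : fs.listStatus(new Path("/storm/HdfsFile"))) {
            System.out.println(status.getPath() + "\t" + status.getLen() + " bytes");
        }
        fs.close();
    }
}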
2. Integrating with HBase
Add the related dependency:
<!-- Storm HBase integration dependency -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>1.2.2</version>
</dependency>
Write the example. First, the spout that produces sample data:
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;

// commons-lang StringUtils as shaded inside storm-core
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Data source that produces word-frequency samples
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Simulate incoming data, one line per second
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Simulate a line of data: shuffle the word list, then join a random-length prefix of it with tabs
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
Next, process the data by splitting each line into words:
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Split each line on the tab delimiter
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            // Emit each word with an initial count of 1
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
Then count the words:
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Accumulate a running count per word
 */
public class CountBolt extends BaseRichBolt {

    // In-memory counts; they live in the task and are lost on restart
    private Map<String, Integer> counts = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Emit the count as a String so it is stored as readable bytes in HBase
        collector.emit(new Values(word, String.valueOf(count)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
Finally, write the main method:
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Run the word count and store the results in HBase
 */
public class WordCountToHBaseApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String HBASE_BOLT = "hbaseBolt";

    public static void main(String[] args) {
        // Storm configuration
        Config config = new Config();
        // HBase configuration
        Map<String, Object> hbConf = new HashMap<>();
        hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
        hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");
        // Pass the HBase configuration into the Storm configuration
        config.put("hbase.conf", hbConf);

        // Map stream fields to HBase: row key from "word", columns under family "info"
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word", "count"))
                .withColumnFamily("info");

        /*
         * Give HBaseBolt the table name, the field mapping, and the key of the HBase config.
         * The table must be created beforehand: create 'WordCount','info'
         */
        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
                .withConfigKey("hbase.conf");

        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(), 1);
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
        // count
        builder.setBolt(COUNT_BOLT, new CountBolt(), 1).shuffleGrouping(SPLIT_BOLT);
        // save to HBase
        builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);

        // Passing "cluster" as the first argument submits to the cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToHBaseApp", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToHBaseApp", config, builder.createTopology());
        }
    }
}
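As the comment above notes, the WordCount table must exist before the topology starts. Besides the HBase shell command create 'WordCount','info', a sketch of creating it with the HBase 1.x client API (assuming hbase-client is on the classpath; the ZooKeeper address mirrors the config above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateWordCountTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop001:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("WordCount");
            if (!admin.tableExists(tableName)) {
                // Equivalent to the shell command: create 'WordCount','info'
                HTableDescriptor table = new HTableDescriptor(tableName);
                table.addFamily(new HColumnDescriptor("info"));
                admin.createTable(table);
            }
        }
    }
}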
Flow
DATA_SOURCE_SPOUT produces the data ===> SPLIT_BOLT splits each line on \t ===>
COUNT_BOLT accumulates the per-word counts and emits each updated count to the HBase bolt, which writes the results into the WordCount table.
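One note on the grouping: with COUNT_BOLT at parallelism 1, shuffleGrouping is safe, but at higher parallelism the same word must always reach the same task for the counts to stay correct. fieldsGrouping on the word field guarantees that; a sketch of the replacement wiring in main (not from the original article):

// Route tuples by "word" so each CountBolt task owns a disjoint subset of words
builder.setBolt(COUNT_BOLT, new CountBolt(), 4)
       .fieldsGrouping(SPLIT_BOLT, new Fields("word"));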