Java实现JavaPairDStream写入HBase
2020-03-04 本文已影响0人
涓涓自然卷
热爱生活.png
好好吃饭好好睡觉.png
Spark整合HBase
- 1、添加pom依赖
<!-- hbase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>${hbase.version}</version>
<type>pom</type>
</dependency>
<!-- /hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- /hbase-server -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- /hbase-common -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
- 2、主程序代码:
/**
 * Spark Streaming demo job: reads string records from Kafka, derives one word per
 * record, aggregates counts over a window and writes the windowed counts to HBase.
 */
public class Demo04ReadFromKafkaToHbaseOk {
    private static final Logger logger = LoggerFactory.getLogger(Demo04ReadFromKafkaToHbaseOk.class);

    /** Name of the HBase table the windowed counts are written to. */
    private static final String HBASE_TABLE = "wxj-test4";
    /** Suffix appended to the word to build the HBase row key. */
    private static final String ROW_KEY_SUFFIX = "00000";
    /** Word used when a record contains nothing but space separators. */
    private static final String UNKNOWN_WORD = "未知";

    /**
     * Builds the streaming pipeline, starts it and blocks until it terminates.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Kryo buffer sizes carry explicit units; "spark.kryoserializer.buffer" replaces
        // the deprecated "spark.kryoserializer.buffer.mb" key.
        SparkConf conf = new SparkConf()
                .setAppName("Demo04ReadFromKafkaToHbaseOk")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class<?>[]{Demo04ReadFromKafkaToHbaseOk.class})
                .set("spark.kryoserializer.buffer", "256m")
                .set("spark.kryoserializer.buffer.max", "512m");
        SparkUtils.setMaster(conf);

        // Kafka consumer parameters (broker list, deserializers, consumer group).
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS));
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", ConfigurationManager.getProperty(Constants.GROUP_ID));
        kafkaParams.put("auto.offset.reset", "latest");
        // Disable the consumer's periodic automatic offset commit.
        kafkaParams.put("enable.auto.commit", false);

        // Topic set: comma-separated list from configuration, blanks ignored.
        String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
        if (kafkaTopics == null) {
            throw new IllegalStateException("Missing configuration property: " + Constants.KAFKA_TOPICS);
        }
        Collection<String> topics = new HashSet<>();
        for (String kafkaTopic : kafkaTopics.split(",")) {
            String trimmed = kafkaTopic.trim();
            if (!trimmed.isEmpty()) {
                topics.add(trimmed);
            }
        }

        // Streaming context with a 5-second batch interval.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        try {
            // Checkpoint directory (required by updateStateByKey).
            // Alternative: ConfigurationManager.getProperty(Constants.STREAMING_CHECKPOINT_DIR)
            jssc.checkpoint("./streaming-checkpoint");

            final JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                    );

            JavaDStream<String> words = getWords(stream);
            // (word, 1) pairs
            JavaPairDStream<String, Integer> wordsAndOne = getWordsAndOne(words);
            // Counts for the current 5-second batch; call wordsCount.print() to inspect.
            final JavaPairDStream<String, Integer> wordsCount = getWordsCount(wordsAndOne);
            // Running totals across batches; call result.print() to inspect.
            DStream<Tuple2<String, Integer>> result = getResults60s(wordsAndOne);
            // Windowed counts: 15-second window evaluated every 15 seconds.
            JavaPairDStream<String, Integer> result2 = getResults15s(wordsAndOne);
            logger.info("==result2 print==");
            result2.print();

            writeToHbase(result2);

            jssc.start();
            jssc.awaitTermination();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever runs this thread.
                Thread.currentThread().interrupt();
            }
            logger.error("Streaming job failed", e);
        } finally {
            // Always release the streaming context, including on failure.
            jssc.close();
        }
    }

    /**
     * Extracts the value of every Kafka record, logging each record on the way.
     * Records whose value is {@code null} are dropped so that later stages never
     * receive a null word.
     *
     * @param stream the Kafka input stream
     * @return a stream of record values
     */
    private static JavaDStream<String> getWords(JavaInputDStream<ConsumerRecord<String, String>> stream) {
        return stream.flatMap((FlatMapFunction<ConsumerRecord<String, String>, String>) record -> {
            logger.info("ConsumerRecord<String, String> s ={}", record);
            logger.info("offset = {}", record.offset());
            logger.info("*** {} ***", record.value());
            List<String> values = new ArrayList<>(1);
            String value = record.value();
            if (value != null) {
                values.add(value);
            }
            return values.iterator();
        });
    }

    /**
     * Maps every record value to a {@code (word, 1)} pair, where the word is the last
     * space-separated token of the value.
     *
     * @param words stream of record values
     * @return stream of {@code (word, 1)} pairs
     */
    private static JavaPairDStream<String, Integer> getWordsAndOne(JavaDStream<String> words) {
        return words.mapToPair(
                (PairFunction<String, String, Integer>) word -> new Tuple2<>(lastToken(word), 1));
    }

    /**
     * Returns the last space-separated token of {@code line}; the line itself when it
     * contains no space; {@link #UNKNOWN_WORD} when splitting yields no token at all.
     */
    private static String lastToken(String line) {
        if (!line.contains(" ")) {
            return line;
        }
        String[] tokens = line.split(" ");
        return tokens.length == 0 ? UNKNOWN_WORD : tokens[tokens.length - 1];
    }

    /**
     * Sums the counts per word within each batch.
     *
     * @param wordsAndOne stream of {@code (word, 1)} pairs
     * @return stream of per-batch {@code (word, count)} pairs
     */
    private static JavaPairDStream<String, Integer> getWordsCount(JavaPairDStream<String, Integer> wordsAndOne) {
        return wordsAndOne.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
    }

    /**
     * Keeps a running total per word across batches via {@code updateStateByKey}.
     * Note: despite the method name, the state stream's checkpoint interval is set to
     * 30 seconds.
     *
     * @param wordsAndOne stream of {@code (word, 1)} pairs
     * @return stream of {@code (word, runningTotal)} tuples
     */
    private static DStream<Tuple2<String, Integer>> getResults60s(JavaPairDStream<String, Integer> wordsAndOne) {
        return wordsAndOne.updateStateByKey(
                (Function2<List<Integer>, org.apache.spark.api.java.Optional<Integer>, org.apache.spark.api.java.Optional<Integer>>) (values, state) -> {
                    int total = 0;
                    if (state.isPresent()) {
                        total = state.get();
                        logger.info("updatedValue = {}", total);
                    }
                    for (Integer value : values) {
                        total += value;
                    }
                    return org.apache.spark.api.java.Optional.of(total);
                }).checkpoint(Durations.seconds(30));
    }

    /**
     * Sums the counts per word over a 15-second window that slides every 15 seconds.
     *
     * @param wordsAndOne stream of {@code (word, 1)} pairs
     * @return stream of windowed {@code (word, count)} pairs
     */
    private static JavaPairDStream<String, Integer> getResults15s(JavaPairDStream<String, Integer> wordsAndOne) {
        return wordsAndOne.reduceByKeyAndWindow(
                (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2,
                Durations.seconds(15),
                Durations.seconds(15));
    }

    /**
     * Registers an output operation that writes every {@code (word, count)} pair to HBase.
     * Records are written one batch per RDD partition.
     *
     * @param result2 stream of {@code (word, count)} pairs to persist
     */
    private static void writeToHbase(JavaPairDStream<String, Integer> result2) {
        result2.foreachRDD((VoidFunction2<JavaPairRDD<String, Integer>, Time>) (rdd, batchTime) ->
                rdd.foreachPartition(
                        (VoidFunction<Iterator<Tuple2<String, Integer>>>) partition -> writePartition(partition)));
    }

    /**
     * Writes all records of one partition to HBase with a single batched put.
     * The table handle is closed after the write; the shared connection stays open.
     */
    private static void writePartition(Iterator<Tuple2<String, Integer>> records) throws Exception {
        if (!records.hasNext()) {
            return; // nothing to write, avoid touching HBase at all
        }
        List<Put> puts = new ArrayList<>();
        while (records.hasNext()) {
            Tuple2<String, Integer> record = records.next();
            String word = record._1;
            String count = record._2.toString();
            Put put = new Put(Bytes.toBytes(word + ROW_KEY_SUFFIX));
            put.addColumn(Bytes.toBytes("word"), Bytes.toBytes("name"), Bytes.toBytes(word));
            put.addColumn(Bytes.toBytes("count"), Bytes.toBytes("number"), Bytes.toBytes(count));
            puts.add(put);
        }
        TableName tableName = TableName.valueOf(HBASE_TABLE);
        try (Table table = HBasePoolConnection.getConnection().getTable(tableName)) {
            table.put(puts);
        }
        logger.info("write to hbase success!");
    }
}
- 3、SparkUtils代码:
public class SparkUtils {
    /**
     * Sets the master of {@code conf} to {@code "local"} when the
     * {@code Constants.SPARK_LOCAL} configuration flag is true; otherwise leaves
     * {@code conf} untouched.
     *
     * @param conf the Spark configuration to adjust
     */
    public static void setMaster(SparkConf conf) {
        if (!ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)) {
            return;
        }
        conf.setMaster("local");
    }
}
- 4、HBasePoolConnection类:
/**
 * Holder of a single, lazily created HBase {@code Connection} that is shared by all
 * callers in the JVM.
 */
public class HBasePoolConnection {
    private HBasePoolConnection() {
    }

    // The shared connection; created on first use.
    private static Connection connection = null;
    // HBase client configuration.
    static Configuration hbaseConfiguration = HBaseConfiguration.create();

    /**
     * Returns the shared connection, creating it on the first call.
     * The method is synchronized so that concurrent first calls create only one
     * connection and one thread pool.
     *
     * @return the shared HBase connection, never {@code null}
     * @throws IllegalStateException if the connection cannot be created
     */
    public static synchronized Connection getConnection() {
        if (connection == null) {
            hbaseConfiguration.addResource("hbase-site.xml");
            // Fixed-size thread pool handed to the connection.
            ExecutorService pool = Executors.newFixedThreadPool(10);
            try {
                connection = ConnectionFactory.createConnection(hbaseConfiguration, pool);
            } catch (IOException | RuntimeException e) {
                // Do not leak the pool's threads when the connection could not be set up.
                pool.shutdown();
                throw new IllegalStateException("Failed to create HBase connection", e);
            }
        }
        return connection;
    }
}
- 5、配置管理组件
/**
 * Configuration accessor backed by {@code application.properties} on the classpath.
 * Loading is best-effort: if the resource is missing or unreadable the problem is
 * reported on standard error and every lookup behaves as if the key were absent.
 */
public class ConfigurationManager {
    private static final String CONFIG_RESOURCE = "application.properties";

    private static Properties prop = new Properties();

    static {
        // try-with-resources closes the stream; a null resource is tolerated by the construct.
        try (InputStream in = ConfigurationManager.class
                .getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                System.err.println("ConfigurationManager: resource '" + CONFIG_RESOURCE
                        + "' not found on the classpath");
            } else {
                prop.load(in);
            }
        } catch (Exception e) {
            // Best-effort by design: keep the class usable with an empty property set.
            System.err.println("ConfigurationManager: failed to load '" + CONFIG_RESOURCE + "': " + e);
        }
    }

    /**
     * Returns the raw value of a property.
     *
     * @param key property name
     * @return the value exactly as stored, or {@code null} if the key is absent
     */
    public static String getProperty(String key) {
        return prop.getProperty(key);
    }

    /**
     * Returns a property parsed as an integer. Surrounding whitespace is ignored.
     *
     * @param key property name
     * @return the parsed value, or {@code 0} if the key is absent or not a valid integer
     */
    public static Integer getInteger(String key) {
        String value = trimmedValue(key);
        if (value != null) {
            try {
                return Integer.valueOf(value);
            } catch (NumberFormatException e) {
                // fall through to the default below
            }
        }
        reportFallback(key, value, "integer");
        return 0;
    }

    /**
     * Returns a property parsed as a boolean. Surrounding whitespace is ignored.
     *
     * @param key property name
     * @return {@code true} only if the value equals "true" ignoring case; {@code false}
     *         otherwise, including when the key is absent
     */
    public static Boolean getBoolean(String key) {
        return Boolean.valueOf(trimmedValue(key));
    }

    /**
     * Returns a property parsed as a long. Surrounding whitespace is ignored.
     *
     * @param key property name
     * @return the parsed value, or {@code 0L} if the key is absent or not a valid long
     */
    public static Long getLong(String key) {
        String value = trimmedValue(key);
        if (value != null) {
            try {
                return Long.valueOf(value);
            } catch (NumberFormatException e) {
                // fall through to the default below
            }
        }
        reportFallback(key, value, "long");
        return 0L;
    }

    /** Returns the property value with surrounding whitespace removed, or null if absent. */
    private static String trimmedValue(String key) {
        String value = getProperty(key);
        return value == null ? null : value.trim();
    }

    /** Reports on standard error that a default is used in place of a missing or invalid value. */
    private static void reportFallback(String key, String value, String type) {
        System.err.println("ConfigurationManager: property '" + key + "' has no valid " + type
                + " value (" + value + "); using default");
    }
}
- 6、常量接口
/**
 * Names of the configuration keys read through {@code ConfigurationManager}.
 */
public interface Constants {

    // ---- Project / cluster mode ----

    /** Key of the flag that selects local execution. */
    String SPARK_LOCAL = "spark.local";

    // ---- Spark job ----

    /** Key of the Spark serializer class setting. */
    String SPARK_SERIALIZER = "spark.serializer";

    /** Key of the streaming checkpoint directory. */
    String STREAMING_CHECKPOINT_DIR = "streaming.checkpoint.dir";

    // ---- Kafka ----

    /** Key of the Kafka broker address list. */
    String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";

    /** Key of the Kafka consumer group id. */
    String GROUP_ID = "group.id";

    /** Key of the comma-separated Kafka topic list. */
    String KAFKA_TOPICS = "kafka.topics";

    // ---- HBase ----

    /** Key of the ZooKeeper quorum used by HBase. */
    String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";

    /** Key of the ZooKeeper client port used by HBase. */
    String HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "hbase.zookeeper.property.clientPort";
}
- 7、application.properties
# Spark Config #
spark.local = ${spark.local}
spark.serializer = ${spark.serializer}
# KAFKA
kafka.bootstrap.servers = ${bootstrap.servers}
group.id = ${group.id}
kafka.topics = ${kafka.topics}
streaming.checkpoint.dir = ${streaming.checkpoint.dir}
# HBASE
hbase.zookeeper.quorum = ${hbase.zookeeper.quorum}
hbase.zookeeper.property.clientPort = ${hbase.zookeeper.property.clientPort}
# END Config #
myvalue=${myvalue}
- 8、dev.properties
# Spark Config #
spark.local = true
spark.serializer = org.apache.spark.serializer.KryoSerializer
# KAFKA
bootstrap.servers=×××.×××.×××.×××:9092,×××.×××.×××.×××:9092,×××.×××.×××.×××:9092
#bootstrap.servers=localhost:9092
kafka.topics=test-wxj1
#kafka.topics=myFlumeKafka
group.id=wxj
streaming.checkpoint.dir=hdfs://×××.×××.×××.×××:8022/streaming-checkpoint
# HBASE
hbase.zookeeper.quorum=cdh1,cdh2,cdh3
hbase.zookeeper.property.clientPort=2181
# end config #
myvalue=dev
好好吃饭好好睡觉.png