Spark + Hbase 自定义读取分片数据、深挖内部原理
大猪 见很多文章都写了
Hbase
如何设计rowkey
避免热点问题,就连 大猪 的文章也写过这样的优化,但是只说到了优化的点上,那如何方便读取呢?刚才就有一位老朋友跟我说他的方案,他是做了16个预分区,然后就把16个分区的数据使用spark的union起来,组成16个RDD,牛批的孩子,看到他这么干,我得写篇文章出来探讨一下这个问题了。
Rowkey设计
在设计Hbase的rowkey的时候,我们往往会在高位上设置加上数字或者是Hash用来打散数据,特别是日志数据。
举个例子:
00|2019010100000|ab2d3c
41|2019010100001|ab2d3c
ee|2019010100002|ab2d3c
假设有8台RS,表创建的时候就要使用预分区,就像下面一样。
create 'logTable',{NAME => 'info',
CONFIGURATION => {
'SPLIT_POLICY' =>'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy',
'KeyPrefixRegionSplitPolicy.prefix_length'=>'2'
},COMPRESSION=>'SNAPPY'},
SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0']
实际的表生成rowkey范围就会像下面这样
Hbase 自定义读取分片数据
上面我们的三条就会根据rowkey前2个位自动选择分区
这样就达到打散的效果,热点问题就不会产生了。
但是:我们如何同一分钟的数据会打到不同的分区,我们不能预先知道数据在哪些分区,通过一个Scan是查不完的,必要把所有分区都查下遍,才知道分区中有没有我们想的数据。
00 => 会放在第一个分区 空 ~ 20
41 => 会放在第三个分区 40 ~ 60
e0 => 会放在最后一个分区 e0 ~ 空
Hbase给我们的 TableInputFormat API里面只有设置start
跟end
,那让我怎么去读取这些分区的数据?
我的那个老朋友的做法我已经猜到了他是怎么玩的了,因为他预分区是16个,就是0~f,也就是他去按照分区加start跟end去scan16次,每次得到一个RDD,再union起来就是他要的数据结果。
var unionRdd:RDD[_] = null
Array("1","2"...)
.foreach(index => {
val conf = new Configuration
conf.set(TableInputFormat.SCAN_ROW_START,index + "|20190101000000")
conf.set(TableInputFormat.SCAN_ROW_STOP,index + "|20190101000000")
val rdd = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
unionRdd = logRdd.union(rdd)
})
大概就是这样写,unionRdd是最后合成的一个大RDD,后面用来计算。
其实我的老朋友这样写其实也是可以的。我只想说,你真会玩。
Hbase 自定义读取分片数据
不知道union在spark上,是会产生shuffle操作的么?
源码分析
来来来,我们来看一下TableInputFormat的源码到底是怎么处理读取Hbase的分区数据的:
我们看TableInputFormat
类中,从getSplits
=> oneInputSplitPerRegion
=> 挖出这个方法
private List<InputSplit> oneInputSplitPerRegion() throws IOException {
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(this.getRegionLocator(), this.getAdmin());
TableName tableName = this.getTable().getName();
Pair<byte[][], byte[][]> keys = this.getStartEndKeys();
if (keys != null && keys.getFirst() != null && ((byte[][])keys.getFirst()).length != 0) {
List<InputSplit> splits = new ArrayList(((byte[][])keys.getFirst()).length);
for(int i = 0; i < ((byte[][])keys.getFirst()).length; ++i) {
if (this.includeRegionInSplit(((byte[][])keys.getFirst())[i], ((byte[][])keys.getSecond())[i])) {
byte[] startRow = this.scan.getStartRow();
byte[] stopRow = this.scan.getStopRow();
...
// ============= 重点 ===============
TableSplit split = new TableSplit(tableName, this.scan, splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
splits.add(split);
// ============= 重点 ===============
}
}
}
return splits;
}
- 其实从前三句就可以看出来了,通admin去拿到hbase表的所有分片信息。
返回的多个InputSplit
对应上的就是Spark的多个分区,如果有Hbase16个分片就会有16个分区。
我们可以从NewHadoopRDD
类中的getPartitions
挖出来确实是这样子的。
override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(_conf)
case _ =>
}
// =========== T =========
val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
// =========== T =========
val rawSplits = if (ignoreEmptySplits) {
allRowSplits.filter(_.getLength > 0)
} else {
allRowSplits
}
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
result
}
- 看看
oneInputSplitPerRegion
方法上面注释重点的地方,其实就是你们写在程序conf配置上的 start = splitStart 跟 end = splitStop ,或者还有scan的各种过滤器等,再加的Hbase的 regionLocation 就组成一个分区查询了,你们的数据就是这么被Spark在每个分区上查出来给你们的。
TableSplit split = new TableSplit(tableName, this.scan, splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
splits.add(split);
看到文章这里进度的小伙伴们,是不是已经想到怎么做了?
结论
既然的数据在分区上,我们重写TableInputFormat的getSplits
获取分区就行了。
scala版本TableInputFormat2
class TableInputFormat2 extends TableInputFormat {
@throws(classOf[IOException])
override def getSplits(context: JobContext): util.List[InputSplit] = {
this.initialize(context)
val conf = context.getConfiguration
//scan 的rowkey start 开始范围
val start = conf.get(TableInputFormat.SCAN_ROW_START, "")
//scan 的rowkey end 结束范围
val end = conf.get(TableInputFormat.SCAN_ROW_STOP, "")
//预分区起始,例如:0
val splitStart = conf.get("hbase.table.split.startkey", "0")
//邓分区结束,例如:f
val splitEnd = conf.get("hbase.table.split.endkey", "")
//预分区进制
val rowkeyRadix = conf.getInt("hbase.table.split.radix", 10)
//连接符号,例如:00|20190101
val rowkeyConcat = conf.get("hbase.table.split.concat", "")
//直接读取指定分区
val regionSplits = conf.get("hbase.table.split.list", "")
val numLength = Math.max(splitEnd.length, 1)
val preString = "000000000000000000000000000000"
val scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN))
if (StringUtils.isNotBlank(regionSplits) || StringUtils.isNoneBlank(splitEnd)) {
var repls: Array[String] = null
if (StringUtils.isNotBlank(regionSplits)) {
repls = regionSplits.trim.split(",", -1)
} else {
if (rowkeyRadix == 10 || rowkeyRadix == 16) {
repls = (lang.Long.valueOf(splitStart, rowkeyRadix).toInt to lang.Long.valueOf(splitEnd, rowkeyRadix).toInt)
.map(x => if (rowkeyRadix == 16) Integer.toHexString(x) else x.toString)
.map(i => s"$preString$i".takeRight(numLength))
.toArray
} else throw new Exception(rowkeyRadix + " => radix only working in ( 16 | 8 )")
}
repls
.map {
prefix =>
val location = getRegionLocator.getRegionLocation(Bytes.toBytes(prefix))
val splitStart: Array[Byte] = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(start))
val splitStop: Array[Byte] = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(end))
new TableSplit(getTable.getName, scan, splitStart, splitStop, location.getHostname)
}.toList
} else {
super.getSplits(context)
}
}
}
- 以上实现了10进制与16进制读取分区操作
- 也可以直接指定分区读取
要上scala
的代码了
//比如要查一天的数据
val conf: Configuration = new Configuration()
conf.set("hbase.mapreduce.inputtable", "logTable")
conf.set(TableInputFormat.SCAN_ROW_START, "20190101000000")
conf.set(TableInputFormat.SCAN_ROW_STOP, "20190102000000")
conf.set("hbase.table.split.startkey", "0")
conf.set("hbase.table.split.endkey", "f")
conf.set("hbase.table.split.radix", 16)
conf.set("hbase.table.split.concat", "|")
val logRdd = sc.newAPIHadoopRDD(
conf,
//classOf[TableInputFormat],
classOf[TableInputFormat2],
classOf[ImmutableBytesWritable],
classOf[Result]
)
println(logRdd.count())
不会scala?
Spark 读取 Hbase 自定义读取分片数据给你个Java版本的好了
public class TableInputFormat2 extends TableInputFormat {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
this.initialize(context);
Configuration conf = context.getConfiguration();
String start = conf.get(TableInputFormat.SCAN_ROW_START, "");
String end = conf.get(TableInputFormat.SCAN_ROW_STOP, "");
String splitStart = conf.get("hbase.table.split.startkey", "0");
String splitEnd = conf.get("hbase.table.split.endkey", "");
int rowkeyRadix = conf.getInt("hbase.table.split.radix", 10);
String rowkeyConcat = conf.get("hbase.table.split.concat", "");
String regionSplits = conf.get("hbase.table.split.list", "");
int numLength = Math.max(splitEnd.length(), 1);
String preString = "000000000000000000000000000000";
Scan scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
if (StringUtils.isNotBlank(regionSplits) || StringUtils.isNoneBlank(splitEnd)) {
String[] repls = null;
if (StringUtils.isNotBlank(regionSplits)) {
repls = regionSplits.trim().split(",", -1);
} else {
Integer s = Integer.parseInt(splitStart);
Integer e = Long.valueOf(end, rowkeyRadix).intValue() + 1;
repls = IntStream
.range(s, e)
.mapToObj(value -> {
if (rowkeyRadix == 16) {
String ss = preString + Integer.toHexString(value);
return StringUtils.substring(ss, ss.length() - numLength, ss.length());
} else {
String ss = String.valueOf(value);
return StringUtils.substring(ss, ss.length() - numLength, ss.length());
}
})
.collect(Collectors.toList()).toArray(new String[]{});
}
List<InputSplit> splitLists = new ArrayList<>();
for (String prefix : repls) {
HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(prefix));
byte[] _splitStart = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(start));
byte[] _splitStop = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(end));
new TableSplit(getTable().getName(), scan, _splitStart, _splitStop, location.getHostname());
}
return splitLists;
}
return super.getSplits(context);
}
}
程序
SparkSession spark = SparkSession.builder()
.appName("test")
.master("local")
.getOrCreate();
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.mapreduce.inputtable", "logTable");
conf.set(TableInputFormat.SCAN_ROW_START, "20190101000000");
conf.set(TableInputFormat.SCAN_ROW_STOP, "20190102000000");
conf.set("hbase.table.split.startkey", "0");
conf.set("hbase.table.split.endkey", "f");
conf.setInt("hbase.table.split.radix", 16);
conf.set("hbase.table.split.concat", "|");
RDD<Tuple2<ImmutableBytesWritable, Result>> logRdd = spark.sparkContext()
.newAPIHadoopRDD(
conf,
TableInputFormat3.class,
ImmutableBytesWritable.class,
Result.class
);
System.out.println(logRdd.count());
运行没错请告诉,反正我没运行过。
Hbase TableInputFormat