大数据分析大数据玩转大数据

Spark + Hbase 自定义读取分片数据、深挖内部原理

2019-03-24  本文已影响9人  大猪大猪

大猪 见很多文章都写了Hbase如何设计rowkey避免热点问题,就连 大猪 的文章也写过这样的优化,但是只说到了优化的点上,那如何方便读取呢?刚才就有一位老朋友跟我说他的方案,他是做了16个预分区,然后就把16个分区的数据使用spark的union起来,组成16个RDD,牛批的孩子,看到他这么干,我得写篇文章出来探讨一下这个问题了。

Rowkey设计

在设计Hbase的rowkey的时候,我们往往会在高位上设置加上数字或者是Hash用来打散数据,特别是日志数据。
举个例子:

00|2019010100000|ab2d3c
41|2019010100001|ab2d3c
ee|2019010100002|ab2d3c

假设有8台RS,表创建的时候就要使用预分区,就像下面一样。

create 'logTable',{NAME => 'info',
CONFIGURATION => {
'SPLIT_POLICY' =>'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy',
'KeyPrefixRegionSplitPolicy.prefix_length'=>'2'
},COMPRESSION=>'SNAPPY'},
SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0']

实际的表生成rowkey范围就会像下面这样


Hbase 自定义读取分片数据

上面我们的三条就会根据rowkey前2个位自动选择分区
这样就达到打散的效果,热点问题就不会产生了。

但是:我们如何同一分钟的数据会打到不同的分区,我们不能预先知道数据在哪些分区,通过一个Scan是查不完的,必要把所有分区都查下遍,才知道分区中有没有我们想的数据。

00 => 会放在第一个分区    空 ~ 20
41 => 会放在第三个分区    40 ~ 60
e0 => 会放在最后一个分区   e0 ~ 空

Hbase给我们的 TableInputFormat API里面只有设置startend,那让我怎么去读取这些分区的数据?

我的那个老朋友的做法我已经猜到了他是怎么玩的了,因为他预分区是16个,就是0~f,也就是他去按照分区加start跟end去scan16次,每次得到一个RDD,再union起来就是他要的数据结果。

var unionRdd:RDD[_] = null
Array("1","2"...)
      .foreach(index => {
          val conf = new Configuration
          conf.set(TableInputFormat.SCAN_ROW_START,index + "|20190101000000")
          conf.set(TableInputFormat.SCAN_ROW_STOP,index + "|20190101000000")
          val rdd = sc.newAPIHadoopRDD(
          conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result]
          )
         unionRdd = logRdd.union(rdd)
      })

大概就是这样写,unionRdd是最后合成的一个大RDD,后面用来计算。
其实我的老朋友这样写其实也是可以的。我只想说,你真会玩。


Hbase 自定义读取分片数据

不知道union在spark上,是会产生shuffle操作的么?

源码分析

来来来,我们来看一下TableInputFormat的源码到底是怎么处理读取Hbase的分区数据的:
我们看TableInputFormat类中,从getSplits => oneInputSplitPerRegion => 挖出这个方法

private List<InputSplit> oneInputSplitPerRegion() throws IOException {
        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(this.getRegionLocator(), this.getAdmin());
        TableName tableName = this.getTable().getName();
        Pair<byte[][], byte[][]> keys = this.getStartEndKeys();
        if (keys != null && keys.getFirst() != null && ((byte[][])keys.getFirst()).length != 0) {
            List<InputSplit> splits = new ArrayList(((byte[][])keys.getFirst()).length);

            for(int i = 0; i < ((byte[][])keys.getFirst()).length; ++i) {
                if (this.includeRegionInSplit(((byte[][])keys.getFirst())[i], ((byte[][])keys.getSecond())[i])) {
                    byte[] startRow = this.scan.getStartRow();
                    byte[] stopRow = this.scan.getStopRow();
                    ...
                    // ============= 重点 ===============
                    TableSplit split = new TableSplit(tableName, this.scan, splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
                    splits.add(split);
                    // ============= 重点 ===============
                    }
                }
            }

            return splits;
        }
  1. 其实从前三句就可以看出来了,通admin去拿到hbase表的所有分片信息。
    返回的多个InputSplit对应上的就是Spark的多个分区,如果有Hbase16个分片就会有16个分区。

我们可以从NewHadoopRDD类中的getPartitions挖出来确实是这样子的。

override def getPartitions: Array[Partition] = {
    val inputFormat = inputFormatClass.newInstance
    inputFormat match {
      case configurable: Configurable =>
        configurable.setConf(_conf)
      case _ =>
    }
    // =========== T =========
    val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
    // =========== T =========
    val rawSplits = if (ignoreEmptySplits) {
      allRowSplits.filter(_.getLength > 0)
    } else {
      allRowSplits
    }
    val result = new Array[Partition](rawSplits.size)
    for (i <- 0 until rawSplits.size) {
      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    }
    result
  }
  1. 看看oneInputSplitPerRegion方法上面注释重点的地方,其实就是你们写在程序conf配置上的 start = splitStartend = splitStop ,或者还有scan的各种过滤器等,再加的Hbase的 regionLocation 就组成一个分区查询了,你们的数据就是这么被Spark在每个分区上查出来给你们的。
TableSplit split = new TableSplit(tableName, this.scan, splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
splits.add(split);

看到文章这里进度的小伙伴们,是不是已经想到怎么做了?

结论

既然的数据在分区上,我们重写TableInputFormat的getSplits获取分区就行了。

scala版本TableInputFormat2

class TableInputFormat2 extends TableInputFormat {

  @throws(classOf[IOException])
  override def getSplits(context: JobContext): util.List[InputSplit] = {
    this.initialize(context)
    val conf = context.getConfiguration
    //scan 的rowkey start 开始范围
    val start = conf.get(TableInputFormat.SCAN_ROW_START, "")
    //scan 的rowkey end 结束范围
    val end = conf.get(TableInputFormat.SCAN_ROW_STOP, "")
    //预分区起始,例如:0
    val splitStart = conf.get("hbase.table.split.startkey", "0")
    //邓分区结束,例如:f  
    val splitEnd = conf.get("hbase.table.split.endkey", "")
    //预分区进制
    val rowkeyRadix = conf.getInt("hbase.table.split.radix", 10)
    //连接符号,例如:00|20190101
    val rowkeyConcat = conf.get("hbase.table.split.concat", "")
    //直接读取指定分区
    val regionSplits = conf.get("hbase.table.split.list", "")
    val numLength = Math.max(splitEnd.length, 1)
    val preString = "000000000000000000000000000000"
    val scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN))
    if (StringUtils.isNotBlank(regionSplits) || StringUtils.isNoneBlank(splitEnd)) {
      var repls: Array[String] = null
      if (StringUtils.isNotBlank(regionSplits)) {
        repls = regionSplits.trim.split(",", -1)
      } else {
        if (rowkeyRadix == 10 || rowkeyRadix == 16) {
          repls = (lang.Long.valueOf(splitStart, rowkeyRadix).toInt to lang.Long.valueOf(splitEnd, rowkeyRadix).toInt)
            .map(x => if (rowkeyRadix == 16) Integer.toHexString(x) else x.toString)
            .map(i => s"$preString$i".takeRight(numLength))
            .toArray
        } else throw new Exception(rowkeyRadix + " => radix only working in ( 16 | 8 )")
      }
      repls
        .map {
          prefix =>
            val location = getRegionLocator.getRegionLocation(Bytes.toBytes(prefix))

            val splitStart: Array[Byte] = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(start))
            val splitStop: Array[Byte] = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(end))
            new TableSplit(getTable.getName, scan, splitStart, splitStop, location.getHostname)
        }.toList
    } else {
      super.getSplits(context)
    }
  }
}
  1. 以上实现了10进制与16进制读取分区操作
  2. 也可以直接指定分区读取

要上scala的代码了

Hbase 自定义读取分片数据
//比如要查一天的数据
val conf: Configuration = new Configuration()
conf.set("hbase.mapreduce.inputtable", "logTable")
conf.set(TableInputFormat.SCAN_ROW_START, "20190101000000")
conf.set(TableInputFormat.SCAN_ROW_STOP, "20190102000000")
conf.set("hbase.table.split.startkey", "0")
conf.set("hbase.table.split.endkey", "f")
conf.set("hbase.table.split.radix", 16)
conf.set("hbase.table.split.concat", "|")

val logRdd = sc.newAPIHadoopRDD(
          conf,
          //classOf[TableInputFormat],
          classOf[TableInputFormat2],
          classOf[ImmutableBytesWritable],
          classOf[Result]
        )
println(logRdd.count())

不会scala?

Spark 读取 Hbase 自定义读取分片数据

给你个Java版本的好了

public class TableInputFormat2 extends TableInputFormat {

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        this.initialize(context);
        Configuration conf = context.getConfiguration();
        String start = conf.get(TableInputFormat.SCAN_ROW_START, "");
        String end = conf.get(TableInputFormat.SCAN_ROW_STOP, "");
        String splitStart = conf.get("hbase.table.split.startkey", "0");
        String splitEnd = conf.get("hbase.table.split.endkey", "");
        int rowkeyRadix = conf.getInt("hbase.table.split.radix", 10);
        String rowkeyConcat = conf.get("hbase.table.split.concat", "");
        String regionSplits = conf.get("hbase.table.split.list", "");
        int numLength = Math.max(splitEnd.length(), 1);
        String preString = "000000000000000000000000000000";
        Scan scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
        if (StringUtils.isNotBlank(regionSplits) || StringUtils.isNoneBlank(splitEnd)) {
            String[] repls = null;
            if (StringUtils.isNotBlank(regionSplits)) {
                repls = regionSplits.trim().split(",", -1);
            } else {
                Integer s = Integer.parseInt(splitStart);
                Integer e = Long.valueOf(end, rowkeyRadix).intValue() + 1;
                repls = IntStream
                        .range(s, e)
                        .mapToObj(value -> {
                            if (rowkeyRadix == 16) {
                                String ss = preString + Integer.toHexString(value);
                                return StringUtils.substring(ss, ss.length() - numLength, ss.length());
                            } else {
                                String ss = String.valueOf(value);
                                return StringUtils.substring(ss, ss.length() - numLength, ss.length());
                            }
                        })
                        .collect(Collectors.toList()).toArray(new String[]{});

            }
            List<InputSplit> splitLists = new ArrayList<>();
            for (String prefix : repls) {
                HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(prefix));
                byte[] _splitStart = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(start));
                byte[] _splitStop = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(end));
                new TableSplit(getTable().getName(), scan, _splitStart, _splitStop, location.getHostname());
            }
            return splitLists;
        }
        return super.getSplits(context);
    }
}

程序

SparkSession spark = SparkSession.builder()
                .appName("test")
                .master("local")
                .getOrCreate();

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.mapreduce.inputtable", "logTable");
        conf.set(TableInputFormat.SCAN_ROW_START, "20190101000000");
        conf.set(TableInputFormat.SCAN_ROW_STOP, "20190102000000");
        conf.set("hbase.table.split.startkey", "0");
        conf.set("hbase.table.split.endkey", "f");
        conf.setInt("hbase.table.split.radix", 16);
        conf.set("hbase.table.split.concat", "|");

        RDD<Tuple2<ImmutableBytesWritable, Result>> logRdd = spark.sparkContext()
                .newAPIHadoopRDD(
                        conf,
                        TableInputFormat3.class,
                        ImmutableBytesWritable.class,
                        Result.class
                );

System.out.println(logRdd.count());

运行没错请告诉,反正我没运行过。


Hbase TableInputFormat
上一篇下一篇

猜你喜欢

热点阅读