Phoenix&HBase数据类型转换
HBase 是什么?
在HBase官网中对HBase的描述为“ Apache HBase™ is the Hadoop database, a distributed, scalable, big data store”,直译过来就是HBase是基于Hadoop的数据库具有分布式、可伸缩、大数据存储特点。如果对HBase感兴趣可以直接去HBase官网查看。
PHOENIX是什么?
PHOENIX官网中的描述为“Phoenix is an open source SQL skin for HBase. You use the standard JDBC APIs instead of the regular HBase client APIs to create tables, insert data, and query your HBase data.”,翻译过来可以直接理解为PHOENIX是构建于HBase之上的一款插件,这款插件使得可以通过标准的JDBC接口代替HBase API直接在HBase上建表、插入数据以及查询数据。如果感兴趣可以直接去官网查看介绍。
问题由来
最近本菜参加部门一个数据存储相关的项目,主要是采用Oracle/MySql + Redis + HBase作为底层数据存储。由于项目组其他人对Hadoop以及HBase不太清楚加上PHOENIX在支持二级索引等等原因,最终决定不直接使用HBase提供的API而是采用PHOENIX通过标准的JDBC接口执行SQL来实现。
随着项目的进展,后来遇到一个需求"历史数据的导入",这个历史数据指的是十几年的气象数据(报文、雷达、卫星数据等等)。十几年的数据比较大采用JDBC或者HBase自身的API入库太慢了,随后本菜在相关的书籍以及博客中发现了BulkLoad的存在。BulkLoad的本质就是通过MapReduce直接生成HBase的数据文件HFile,然后由HBase加载到自身的数据目录中。在数据表设计阶段有一些字段使用了PHOENIX支持但是HBase无法支持的数据类型,所以就需要面对一个很现实的问题“数据类型转换”。
解决问题
PHOENIX支持的数据类型可以参照官网介绍,HBase中的类型直接参照工具类Bytes。下面举个例子简单写一个HBase插入操作
Put put = new Put(RowKey.getBytes());//RowKey 是一个字符串 put.addColumn(ColumnFamily.getBytes(),QUALIFIER.getBytes(),Bytes.toBytes(Value));//ColumnFamily,QUALIFIER,Value是字符串,
Table table = conn.getTable(TableName.valueOf(TableName));//TableName 是字符串
table.put(put);
其实看了这一小段代码就能发现HBase API中数据相关用的都是byte[] 而不是什么int/float/double什么的,实际HBase支持的类型完全就是看Bytes.toBytes的参数。因为HBase底层就是存储的byte数组,具体你想要存什么类型只要你写两个方法,一个是把数据转成byte数组,一个是把byte数组转回来。其实PHOENIX底层数据类型就是这么干的,下面粘贴PHOENIX中关于TimeStamp类型的定义以及关键代码:
public class PTimestamp extends PDataType<Timestamp> {
public static final int MAX_NANOS_VALUE_EXCLUSIVE = 1000000;
public static final PTimestamp INSTANCE = new PTimestamp();
private PTimestamp() {
super("TIMESTAMP", Types.TIMESTAMP, java.sql.Timestamp.class,
new PDate.DateCodec(), 9);
}
@Override
public byte[] toBytes(Object object) {
byte[] bytes = new byte[getByteSize()];
toBytes(object, bytes, 0);
return bytes;
}
@Override
public int toBytes(Object object, byte[] bytes, int offset) {
if (object == null) {
// Create the byte[] of size MAX_TIMESTAMP_BYTES
if(bytes.length != getByteSize()) {
bytes = Bytes.padTail(bytes, (getByteSize() - bytes.length));
}
PDate.INSTANCE.getCodec().encodeLong(0l, bytes, offset);
Bytes.putInt(bytes, offset + Bytes.SIZEOF_LONG, 0);
return getByteSize();
}
java.sql.Timestamp value = (java.sql.Timestamp) object;
// For Timestamp, the getTime() method includes milliseconds that may
// be stored in the nanos part as well.
PDate.INSTANCE.getCodec().encodeLong(value.getTime(), bytes, offset);
/*
* By not getting the stuff that got spilled over from the millis part,
* it leaves the timestamp's byte representation saner - 8 bytes of millis | 4 bytes of nanos.
* Also, it enables timestamp bytes to be directly compared with date/time bytes.
*/
Bytes.putInt(bytes, offset + Bytes.SIZEOF_LONG, value.getNanos() % MAX_NANOS_VALUE_EXCLUSIVE);
return getByteSize();
}
@Override
public Object toObject(Object object, PDataType actualType) {
if (object == null) {
return null;
}
if (equalsAny(actualType, PDate.INSTANCE, PUnsignedDate.INSTANCE, PTime.INSTANCE,
PUnsignedTime.INSTANCE)) {
return new java.sql.Timestamp(((java.util.Date) object).getTime());
} else if (equalsAny(actualType, PTimestamp.INSTANCE, PUnsignedTimestamp.INSTANCE)) {
return object;
} else if (equalsAny(actualType, PLong.INSTANCE, PUnsignedLong.INSTANCE)) {
return new java.sql.Timestamp((Long) object);
} else if (actualType == PDecimal.INSTANCE) {
BigDecimal bd = (BigDecimal) object;
long ms = bd.longValue();
int nanos =
(bd.remainder(BigDecimal.ONE).multiply(QueryConstants.BD_MILLIS_NANOS_CONVERSION))
.intValue();
return DateUtil.getTimestamp(ms, nanos);
} else if (actualType == PVarchar.INSTANCE) {
return DateUtil.parseTimestamp((String) object);
}
return throwConstraintViolationException(actualType, this);
}
@Override
public java.sql.Timestamp toObject(byte[] b, int o, int l, PDataType actualType,
SortOrder sortOrder, Integer maxLength, Integer scale) {
if (actualType == null || l == 0) {
return null;
}
java.sql.Timestamp v;
if (equalsAny(actualType, PTimestamp.INSTANCE, PUnsignedTimestamp.INSTANCE)) {
long millisDeserialized =
(actualType == PTimestamp.INSTANCE ? PDate.INSTANCE : PUnsignedDate.INSTANCE).getCodec()
.decodeLong(b, o, sortOrder);
v = new java.sql.Timestamp(millisDeserialized);
int nanosDeserialized =
PUnsignedInt.INSTANCE.getCodec().decodeInt(b, o + Bytes.SIZEOF_LONG, sortOrder);
/*
* There was a bug in serialization of timestamps which was causing the sub-second millis part
* of time stamp to be present both in the LONG and INT bytes. Having the <100000 check
* makes this serialization fix backward compatible.
*/
v.setNanos(
nanosDeserialized < MAX_NANOS_VALUE_EXCLUSIVE ? v.getNanos() + nanosDeserialized : nanosDeserialized);
return v;
} else if (equalsAny(actualType, PDate.INSTANCE, PUnsignedDate.INSTANCE, PTime.INSTANCE,
PUnsignedTime.INSTANCE, PLong.INSTANCE, PUnsignedLong.INSTANCE)) {
return new java.sql.Timestamp(actualType.getCodec().decodeLong(b, o, sortOrder));
} else if (actualType == PDecimal.INSTANCE) {
BigDecimal bd = (BigDecimal) actualType.toObject(b, o, l, actualType, sortOrder);
long ms = bd.longValue();
int nanos = (bd.remainder(BigDecimal.ONE).multiply(QueryConstants.BD_MILLIS_NANOS_CONVERSION))
.intValue();
v = DateUtil.getTimestamp(ms, nanos);
return v;
}
throwConstraintViolationException(actualType, this);
return null;
}
......后续代码省略
}
通过这个代码就能很清楚看懂PHOENIX对时间类型的是如何实现支持的。实际PHOENIX就是把时间戳拆成了两部分,毫秒部分采用long类型保存,纳秒部分用int类型来保存。
其实剩下的数据类型都是类似的,直接去查看PHOENIX源码中org.apache.phoenix.schema.types这个包下的内容就清楚了。
最后直接在列举一个小示例:
Put put = new Put("111".getBytes());
put.addColumn("0".getBytes(),"TRANSACTION_TIME".getBytes(), PTimestamp.toBytes(new Timestamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-09-09 11:11:11.111").getTime())));
Table table = conn.getTable(TableName.valueOf("EXCHANGE"));
table.put(put);
末尾
以上便是这次的所有内容,本菜第一次正儿八经的写的第一篇文章。希望能坚持下去吧!如果有啥问题欢迎留言到公众号“喜欢打游戏的摸鱼怪”留言。