MLSQL读写Phoenix

2019-07-23  本文已影响0人  hongshen

简介

apache Phoenix是什么东西,不用多说了,它支持jdbc连接,使用mlsql的jdbc插件就能直接load,像这样:

connect jdbc where
url="jdbc:phoenix:192.168.1.102;2181;/hbase"
and driver="org.apache.phoenix.jdbc.PhoenixDriver"
and `phoenix.schema.isNamespaceMappingEnabled`="true"
as p;

load jdbc.`p.TEST.TEST_TABLE` as tb1;

不过需要你将client的jar包放到spark的jars目录。

但是如果要往里写,我试了下jdbc好像不能直接支持,因为Phoenix都是upsert,如果要支持,可能需要模仿mysql的upsert实现一些自定义的statement,http://docs.mlsql.tech/en/guide/datasource/jdbc.html
但是我觉得依赖一个driver的jar包是比较麻烦的,后来看了下官网支持spark,因此这里将Phoenix整合进mlsql

遇到的小问题

mlsql用的是spark2.4.3,但是Phoenix比较老版本是4.8.0,跟上次搞hive一样,还要处理高版本spark匹配低版本数据源的问题。
phoenix-spark的官网文档在这里哈http://phoenix.apache.org/phoenix_spark.html,文档中比较有用的信息是phoenix从4.10才提供spark2.0的官网版本,相关的jar包https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-spark
我本来想直接用高版本的,经过实践错误提示不能用Phoenix高版本的客户端去连低版本的server
那就只能重写phoenix-spark模块了。
在没有版本兼容问题的情况下,只需要依赖2个东西,core包和spark包,那么有了兼容问题,只需要依赖core包,然后自己将spark包写到代码里。

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>4.14.0-HBase-1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.14.0-HBase-1.2</version>
</dependency>

具体实现

自己凭空写没有那个必要,直接找个高于4.10版本的phoenix-spark参考下即可,打开一看,很少的几个类,直接抄过来,改一改即可。
主要改动是增加phoenix.schema.isNamespaceMappingEnabled配置为true
改动点:
1、ConfigurationUtil类的getOutputConfiguration方法返回的那个配置增加:
config.setBoolean("phoenix.schema.isNamespaceMappingEnabled", true),不然写的时候会报错
2、读的时候增加这个配置,在PhoenixRelation类中哦

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val hconf = new Configuration()
    hconf.setBoolean("phoenix.schema.isNamespaceMappingEnabled", true)
    new PhoenixRDD(
      sqlContext.sparkContext,
      tableName,
      requiredColumns,
      Some(buildFilter(filters)),
      Some(zkUrl),
      hconf,
      dateAsTimestamp
    ).toDataFrame(sqlContext).rdd
  }

  // Required by BaseRelation, this will return the full schema for a table
  override def schema: StructType = {
    val hconf = new Configuration()
    hconf.setBoolean("phoenix.schema.isNamespaceMappingEnabled", true)
    new PhoenixRDD(
      sqlContext.sparkContext,
      tableName,
      Seq(),
      None,
      Some(zkUrl),
      hconf,
      dateAsTimestamp
    ).toDataFrame(sqlContext).schema
  }

其余的没什么要改的了,包名类名都没有变化,这时候,增加一个MLSQLPhoenix类,位置mlsql.core.datasource.impl,参考MLSQLHbase即可,其中dbSplitter我设置为override def dbSplitter: String = "."

  override def fullFormat: String = "org.apache.phoenix.spark"
  override def shortFormat: String = "phoenix"

phoenix要求的配置参数比较少,重点就2个,一个是table,table不要用冒号分隔namespace和表名,要用点,一个是zkUrl,格式参考192.168.1.1,192.168.1.2,192.168.1.3:2181,如果znode默认不是/hbase,应该可以这样写格式参考192.168.3.122,192.168.3.123,192.168.3.124:2181/znode,没试过,应该是跟hbase用户一样的

用法

由于mlsql设计理念忒屌,忒强大,用起来简单的一笔
你可以这样加载:

connect phoenix where
zkUrl="192.168.1.1,192.168.1.2,192.168.1.3:2181"
and namespace="testNamespace"
as p;
load phoenix.`p.testName` as t1;

你可以这样写,比如我创建了一张Phoenix的表,三个字段,ID,NAME,AGE,其中前两个是varchar,AGE为integer

set data='''
{"ID":"hello1","NAME":"name1","AGE":"16"}
{"ID":"hello2","NAME":"name2","AGE":"17"}
{"ID":"hello3","NAME":"name3","AGE":"18"}
''';
load jsonStr.`data` as t1;
connect phoenix where
zkUrl="192.168.1.1,192.168.1.2,192.168.1.3:2181"
and namespace="testNamespace"
as p;
select ID,NAME,cast(AGE AS int) AS AGE from t1 as t2;

save overwrite t2 as phoenix.`p.testName`;

注意点:写Phoenix不支持直接将varchar转成数值类型,所以要先cast,还有就是save时,只支持save overwrite,注意哦
是不是很简单。

上一篇下一篇

猜你喜欢

热点阅读