spark数据导入踩坑记

2017-04-17  本文已影响0人  醉人的小巷

最近公司有不少关于数据同步的业务需求,比如需要将mysql表同步到hive中或者同步为parquet等格式存放在HDFS中,这种类型的需求一般不是简单的同步,而是需要将mysql的数据进行处理,然后将结果表的数据放入指定的数据源中。得益于威廉大哥开发的Streamingpro这一利器,同步数据,数据处理,最后放入指定的数据源中,这一连串的工作变得十分简单,只需要一个json格式的配置文件即可轻松搞定,实在是大大的提高了工作效率。关于streamingPro的使用请参考威廉的简书相关的文章,下文主要介绍遇到的一些问题:

mysql timestamp类型转换错误

需求是要将mysql的表数据同步至hive中,利用streamingPro是很容易实现的,只需要配置一个输入源,一个处理SQL语句,配置文件大概如下:

{
  "chinaDrug2hive": {
    "desc": "",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "batch.sources",
        "params": [
          {
            "url": "jdbc:mysql://localhost :3306/DB?user=username&password=password",
            "dbtable": "mysqlTableName",
            "driver": "com.mysql.jdbc.Driver",
            "path": "-",
            "format": "jdbc",
            "outputTable": "tableName"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "drop table db.hivetableName",
            "outputTableName": "-"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "create table if not exists db.hivetableName as select * from tableName",
            "outputTableName": "-"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}

关于mysql相关的配置参数的配置,其实就是spark访问mysql需要配置的几个参数。上面的示例中,考虑到与mysql表解耦,即当mysql表结构由于业务或其他原因发生变化,配置文件不需要发生任何变化,故而用到了create table as select 语句,而没有用insert into语句。本来跟容易搞定的事情,因为mysql表中有字段类型是datetime,且未设置为not null。在运行时,会出现:
Cause: java.sql.SQLException: Value '0000-00-00 00:00:00' can not be represented as java.sql.Timestamp。
解决的办法是在URL中添加一个参数:zeroDateTimeBehavior=convertToNull
问题得到解决。

tinyint类型自动转换成boolean类型

datetime类型得到了解决,数据也顺利写到hive表中了,原以为大功告成了。使用hive表的数据进行测试时,同事反应,tinyint类型的被转换成了boolean型。导致原本写好的SQL脚本不能运行,tinyint中存储的也不只有0和1两个值,所以转换成boolean类型是会导致错误的。解决此问题的方法也是在URL添加一个参数:tinyInt1isBit=false,再次运行重新同步数据,问题得到解决。

分区表问题

这是在使用streamingPro将表数据存为parquet文件,但是结果表是按日期进行分区的分区表。这种情况可以分为两种情况来考虑:
如果分区列本身就是表中列,那么可以使用如下方法:

{
        "name": "batch.outputs",
        "params": [
          {
            "name": "outName",
            "format": "parquet",
            "inputTableName": "inputTableName",
            "path": "/user/zhang/Data/inputTableName",
            "partitionBy":"hp_stat_date",
            "mode": "Overwrite"
          }
        ]
      }

如果分区列不是表的中列,那么只需要将路径通过参数动态传入:

YESTERDAY=$(date -d "@$i" "+%Y-%m-%d")
HiveOutputTable=/user/zhang/Data/tableName/hp_stat_date=$YESTERDAY
spark-submit   \
--class streaming.core.StreamingApp \
--master yarn-cluster \
--num-executors 4 \
--executor-memory 12G \
--executor-cores 1 \
--driver-memory 10G \
--name result_table \
/home/zhangzl/streamingpro/streamingpro-spark-0.4.14-SNAPSHOT.jar \
-streaming.name result_table    \
-streaming.platform spark \
-streaming.jobs XXX \
-streaming.enableHiveSupport true \
-streaming.sql.params.YESTERDAY $YESTERDAY \
-streaming.sql.out.outName.path $HiveOutputTable \
-streaming.job.file.path /user/zhang/test.json

在tableName的文件夹路径下,会生成
hp_stat_date=$YESTERDAY一系列的子目录。以上的提交命令中包含了如何向streamingPro中添加参数,-streaming.sql.params.YESTERDAY $YESTERDAY
代表在SQL语句中传入参数,-streaming.sql.out.outName.path $HiveOutputTable \表示的是在输出中添加参数。更多关于streamingPro的文章,请参看威廉的的简书,里面还有大量Spark,ES等相关的优质文章,满满的干货。

上一篇下一篇

猜你喜欢

热点阅读