db

Spark JDBC 写 clickhouse 操作总结

2019-08-15  本文已影响0人  郭彦超

在中小业务数据规模上通过clickhouse进行数据分析很适合,维护简单操作方便,更主要的是快;接下来给大家分享下易企秀在from hive to clickhouse过程中的经验

概述

clickhouse对hadoop生态并不友好,官方也没有提供spark connector直接用于读写操作,好在双方都支持jdbc; clickhouse支持两种jdbc驱动实现,一种是官方自带的8123端口的,另一种是来自第三方实现的驱动,9000端口基于tcp协议

jdbc:8123端口

这种方式是http协议实现的,整体性能差了很多 经常会出现超时的问题,且对数据压缩支持不好,因压缩速度跟不上写入速度,数据写入的过程中数据目录会快速膨胀 导致磁盘空间打满。

jdbc:9000端口

这种方式支持高性能写入,数据按列组织并压缩。

因spark jdbc的方式不支持在clickhouse中自动创建表结构,这里在插入前需要提前创建表
考虑到clickhouse中的数据维度会经常新增和缩减,表结构维护仍需自动化,我们用了一种取巧的方式,借助mysql进行桥接,因为spark jdbc方式支持在mysql自动创建表,同时clickhouse也支持create table from mysql 。

# clickhouse jdbc驱动使用1.7的版本
/data/work/spark-2/bin/spark-shell --name "to_ck_scene_model"    --master yarn --packages com.github.housepower:clickhouse-native-jdbc:1.7-stable --jars /data/work/spark-2/mysql-connector-java-5.1.48/mysql-connector-java-5.1.48-bin.jar  

#读取hive中数据并转化为 dataframe
var df=spark.sql("select * from "+tableName )

//在mysql中创建表
val prop = new java.util.Properties
prop.setProperty("user", mysqlUser)
prop.setProperty("password", mysqlPwd)
prop.setProperty("driver",mysqlDriver)
df.where("1=0").write.mode("Overwrite").jdbc(mysqlUrl,tableName, prop)

//通过mysql桥接 在clickhouse中创建表 操作后两边数据结构会一致
val connection = DriverManager.getConnection(ckUrl,"default","")
var pst=connection.createStatement()
pst.execute("drop table if exists "+ckTableN)
pst.execute("create table "+ckTableN+" ENGINE = MergeTree partition by ifNull(toYYYYMM("+partitionField+"),1970-01) order by "+orderFieldAndDefauV+" as  SELECT * FROM mysql('#:3306', 'bigdata', "+tableName+", '"+mysqlUser+"', '"+mysqlPwd+"')")

ckDriver = "com.github.housepower.jdbc.ClickHouseDriver"
var pro = new java.util.Properties
pro.put("driver",ckDriver)
#默认写入批次是2w,可以调大至5w
df.write.mode("append").option("batchsize", "50000").option("isolationLevel", "NONE").option("numPartitions", "1").jdbc(ckUrl,ckTableN,pro)

总结

1、在mysql 中创建表时需注意,如果hive中存在一个以上的timestamp类型的字段时会创建失败,并报 Invalid default value for ‘update_time’ ,需要将字段先转成string类型写入mysql ,然后通过 alter table modify column 将string类型转成datetime就ok了

2、同时加载clickhouse与mysql的jdbc驱动可能会出现jar冲突的问题,出现 “Accept the id of response that is not recongnized by Server”的错误时,需先将clickhouse的驱动移除

val dv = DriverManager.getDriver(ckUrl)
DriverManager.deregisterDriver(dv)

当MySQL相关操作执行完毕后 ,再将clickhouse驱动重新注册一下

DriverManager.registerDriver(dv)

3、clickhouse不支持事务操作,需关闭事务 option("isolationLevel", "NONE") ,否则个别clickhouse的jdbc版本可能会报错

4、插入记录数偏大问题: 使用com.github.housepower:clickhouse-native-jdbc:1.6-stable版本的同学需要注意,这个版本对spark支持有问题,当单次插入数据小于默认batchsize时数据正常插入,当插入数据量超过一个batch时会出现数据不一致的问题,看了源码发现1.6版本执行完当前batch操作后未清除batch对象,导致后面数据一直在此基础上累加

5、如果觉得写入速度不够快,那么还可以通过调大num-executors或者增加batchsize;我们目前1亿数据写入用时不到20分钟

上一篇下一篇

猜你喜欢

热点阅读