百亿级图数据JanusGraph迁移之旅

2019-08-20  本文已影响0人  BlackZhou

百亿级图数据 JanusGraph 迁移之旅

1. 迁移背景介绍

目前我们的图数据库数据量为 顶点 20 亿,边 200 亿的规模。在迁移之前我们使用的 AgensGraph 数据库
一个主库四个备库,机器的配置都比较高,256G 内存 SSD 的磁盘,单机数据量为 3T左右。
在数据量比较小的情况下 AgensGraph 表现非常稳定优异,我们之前一主一备的情况下支撑了很长一段时间。
但随着公司业务的急速发展,图越来越大,占用的磁盘越来越多,对应的查询量也越来越大,随之这种方案的问题就暴露出来了

由于上面的原因导致 AgensGraph 没办法继续支撑业务高速发展带来的性能要求。AgensGraph 底层基于 PostgreSQL 数据库使它在小数据量的情况下非常的稳定并且查询响应非常的迅速,在此感谢 AgensGraph 陪我们度过业务快速成长阶段。

为了寻找新的图数据库我们把目光投向了接受度和知名度都比较高的 JanusGraph。当然还有收费的图数据库 TigerGraph,暂时不做考虑
在此贴一张我们图的应用场景,查询用户之间的关系

用户关系

由于这不是一篇介绍 JanusGraph 文章,在此不对 JanusGraph 做过多的介绍,大家可以自行了解。这里主要列举下它的优点:

2. 数据导入方案探索

简单介绍完 JanusGraph 的优点,就正式开始迁移数据了。不得不说我们严重低估的数据的迁移难度,之前预估大概两周就能搞定,结果花了快两个月的时间。
方案一:利用 GremlimServer 批量插入
我们最开始采用的数据导入方式是连接 GremlinServer 批量插入顶点,然后再插入边,在插入边的同时需要检索到关联的顶点。
批量插入的优化方案主要参考下面这篇 blog 。批量插入顶点的时候还是比较慢 20亿顶点花了一周才搞定。这里说明下,我们底层存储用的是 HBase 集群,80多台机器。为了加快导入的速度我们的插入程序是用Spark 编写的,导入数据存放在 HDFS 集群上。值得注意的地方是数据写入需要使用同步方式,异步很快就会把GremlinServer 内存写满,然后出现连接异常。
导入完顶点导入边的时候才发现边的导入非常的慢,按照当时的导入速度计算 200 亿边预计需要 3个月的时间才能导入完成,这种速度是不能接受的。
插入边比较慢,最主要的原因是每插入一条边都需要检索两个顶点。社区里面建议是维持 name 索引到顶点id的一个 map 存放到内存中,我们没试过,主要感觉有两方面问题,第一20亿点的需要不少内存,其次因为我们顶点是批量插入的,构建这个 map 不是很方便,于是就放弃了这个方案。
方案二:生成 Cassandra SSTable 文件
只能尝试其他方案,尝试过网上生成 Cassandra SSTable 文件的方式导入数据,最后在建立索引的时候有问题,联系上原作者说不建议这种方式,说代码有bug数据有丢失。也只能放弃这种方案
方案三:生成 HBase Hfile 文件
想过自己写程序生成 HBase Hfile的形式快速导入数据,最大的困难是 JanusGraph 对 Hbase 表结构的介绍文档基本找不到,只能看源代码,这个在短时间内是比较难的。我们这边时间也不允许, AgensGraph 的磁盘很快就满了,查询压力也越来越大。另外这个也需要对 Hbase 有深入了解,团队中缺少这样的技术专家,大家都停留在使用层面。所以这个方案最终也选择放弃
最终方案:bulkLoader 方式
最终还是把目光放到了JanusGraph 官方提供的 bulkLoader 方式。其实最开始想到的就是这个方案,但是这个方案对导入的数据有非常严格的要求,它需要每个顶点一行数据,再把这个顶点关联的所有边都关联到这一行,中间用 tab 分隔,第一部分是顶点的属性,第二部分是顶点的入边,第三部分是顶点的出边。当时一看到这种结构就很头大,我们的顶点有3,4种关系,处理成这种格式感觉不可能,完全不知道怎么处理。以下就是 JanusGraph 官方提供的例子大家感受下

179,song,COSMIC CHARLIE,,0 followedBy,130,1|followedBy,82,1|followedBy,76,1|followedBy,101,1|followedBy,3,1|followedBy,25,1|followedBy,215,1 followedBy,178,1|followedBy,148,1|followedBy,76,1|followedBy,110,3|followedBy,101,1

和团队成员讨论,看有什么办法能转换成这种格式,同事提醒说 Spark 有个 cogroup 操作应该可以达到这个目的。深入了解之后发现这就是我们要找的,cogroup 的核心思想就是将多个 RDD 根据相同的 key 可以 jion成一行,下面是个简单的例子

val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))
val rdd2 = sc.parallelize(Array(("aa",3),("dd",4),("aa",5)))
rdd1.cogroup(rdd2).collect()

output:
(aa,(CompactBuffer(1),CompactBuffer(3, 5)))
(dd,(CompactBuffer(),CompactBuffer(4)))
(bb,(CompactBuffer(2),CompactBuffer()))
(cc,(CompactBuffer(6),CompactBuffer()))

我们转换为目标导入格式的代码已经放到 github上了,大家可以参考,没有封装成通用的工具,大家借鉴按自己的数据结构进行修改。

3. 数据导入过程

接下来就是按需要的格式生成导入数据,这中间有个值得注意的地方就是确保顶点 ID 的唯一性,确保数据没有重复,不然会导入失败。

我们还是低估了这种 bulkLoader 导入数据的难度,导入花了比较长的时间,最主要的问题分为两部分,一部分是 Hbase 相关参数调整的问题,另外一部分是 Spark 任务的内存优化问题。最痛苦的还是这种 bulkLoader 导入方式如果过程中出现问题,失败了,只能将数据清理掉重新导入。

先说 Hbase 参数相关的问题,JanusGraph 导入的过程中会往Hbase中写入大量数据,这个时候 Hbase 会有很多的异常情况出现。

下列参数就是导入过程中和 Hbase 相关的参数,这些参数都是从一次次失败中提炼总结出来的。当然这些参数都是根据我们自己的环境设置的,大家应该做相应调整

# 这个参数批量导入需要设置
storage.batch-loading=true

# 这个参数 经过调试,这个值比较合理
ids.block-size=20000000
ids.renew-timeout=3600000
storage.buffer-size=20240
# 使插入数据更 robust
storage.read-attempts=100
storage.write-attempts=100
storage.attempt-wait=1000

# 分区数,最好设置为 hbase 机器数 的2到 3倍
storage.hbase.region-count = 150

# hbase 超时时间,这个非常重要,不然导入会因为超时报错
# 需要hbase 服务器端同步设置,取客服端和服务器端的最小值
# 这些参数的只是是 看 janusgraph 源码才发现可以设置的
storage.hbase.ext.hbase.rpc.timeout = 300000
storage.hbase.ext.hbase.client.operation.timeout = 300000
storage.hbase.ext.hbase.client.scanner.timeout.period = 300000

再来说导入过程中 Spark 相关的问题。JanusGraph 官方集成 Spark的时候只提供了单机模式和 standalone cluster 模式的配置方式,没有提供如何集成 Spark on Yarn 的文档。这就导致一个问题,我们是有 Spark on Yarn 环境的并且集群性能和资源都很好。现在利用不上这部分资源需要重新申请机器再搭建一个 standalone cluster 的 Spark 集群。好在我们当时有部分机器空闲搭建了 standalone cluster 集群。并且我们也通过其他同事的努力解决了 JanusGraph 如何集成 Spark on Yarn

说回 Spark 导入过程中相关的问题,最主要的问题就是如何平衡 executor 内存和并行度的问题。executor 内存配置的小能够增加并行度但是会出现 OutOfMemoryError,如果把内存调整的很大并行度又下来了,导入时间会很长,不确定性增加。另一个问题就是如果并行度过高 Hbase 集群能否支撑的住。最终需要在这些问题中找到平衡。

下面是我们生产环境配置的 Spark 相关参数

spark.network.timeout=7600

spark.master=yarn
spark.deploy-mode=client
#  spark.executor.memory/spark.executor.cores 保障在8g左右,具体看数据量
spark.executor.memory=20g
spark.executor.cores=2
spark.yarn.queue=root.graph
spark.executor.instances=70

spark.executor.extraJavaOptions=-XX:+UseG1GC
spark.shuffle.io.retryWait=120s

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator

gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
gremlin.spark.persistContext=true
gremlin.spark.graphWriter=org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
gremlin.spark.persistStorageLevel=DISK_ONLY

以上相关的参数在我上面提到的 github仓库中都有做相关说明,大家可以根据自己的情况自行做相应调整。

4.JanusGraph 查询优化

本来以为经历完漫长的数据导入过程,后面会顺利很多,但是现实和期望还是有差距。问题是 JanusGraph 在大数据量情况下,查询性能达不到生产要求,查询需要几十秒。相同的功能在 AgensGraph 查询都是秒级。

好在 JanusGraph 查询语句都可以用 profile 功能进行分析调试,通过分析的结果能明确知道那些地方有性能问题。

经过分析发现慢的最主要的原因就是 JanusGraph 获取顶点属性特别慢,默认居然不是并行获取而是逐条获取。我们的应用场景属性都是放到顶点上,例如:如果我要查询一个用户的通话关系,但是需要过滤只要相关注册用户,查询语句像下面这样

g.V().has("name","138xxxx4444").both("CALL").has("is_register","true")

上面的查询语句假设这个用户和 1000 个人有通话关系,但是我只关心和他相关的注册用户 100 人。JanusGraph 默认的做法是逐条获取这个1000 个用户的所有属性,再在内存中做过滤最后获得这 100 个用户,这就导致关联的顶点数量比较大的时候,直接不可用。

好在 JanusGraph 在最新的 0.4 版本中提供了一个 _multiPreFetch 的优化功能,能在属性过滤的时候批量并行获取所有关联顶点的属性,再在内存做属性过滤,关于这个功能的详细介绍可以看这里。个人感觉在没有这个优化功能的情况下 JanusGraph 基本不具备在生产环境使用的条件。并且这个功能并不是很完善,当你的过滤条件是 hasNot, 或者返回边的属性,或者语句后有 limit 操作都会使这个优化失效。而你能做的只能是想尽办法绕开,例如:has("is_exception", neq("true"))

另一个问题就是 JanusGraph 查询的数据如何返回的问题,Gremlin 返回数据支持多种写法。最常用的就是使用 valueMap 的方式,但是这里面有两个比较大的坑,第一个是返回的属性值默认是list类型,第二个是如果返回结果使用多个 valueMap 导致特别消耗内存。

这两个问题好在都能找到解决方法,详细情况不在这里做过多说明请参考这里。这些问题JanusGraph 都没有做很好的说明,并且默认也没做规避,都是一次次痛苦的经历得出来的经验,所以说 JanusGraph目前还不是特别成熟。

5.未来

虽然经过上面的优化,我们发现在数据量比较大的情况下,查询还是比较慢。经过分析发现主要从 Hbase 获取大量数据比较慢。分析 Hbase Region Server 的负载情况,发现磁盘IO 负载比较高。所以我们下一步的策略是搭建 一套基于 SSD 磁盘的 Hbase 集群来加速查询性能。

同时也期待 JanusGraph 开源社区的快速发展,为大家提供性能更高效的图数据库。同时也希望我们自己能对 JanusGraph 做一些优化并且回馈社区。希望大家一起为 JanusGraph 图数据库社区的发展助力

上一篇 下一篇

猜你喜欢

热点阅读