比pgload更快更方便写入大数据量至Greenplum的Gre
前序
Greenplum是目前比较优秀的mpp数据库,其官方推荐了几种将外部数据写入Greenplum方式,包含:通用的Jdbc,pgcopy和pgload以及Pivotal Greenplum-Spark Connector等。
- Jdbc:Jdbc方式,写大数据量会很慢。
- pgcopy:其中pgcopy是及其不推荐的一种,因为其写数据必须经过Greenplum的master,因此也只建议小数据量使用。
- pgload:适合写大数据量数据,能并行写入。但其缺点是需要安装客户端,包括gpfdist等依赖,安装起来很麻烦。需要了解可以参考pgload。
- Greenplum-Spark Connector:基于Spark并行处理,并行写入Greenplum,并提供了并行读取的接口。也是接下来该文重点介绍的部分。
2. Greenplum-Spark Connector读数据架构
一个Spark application,是由Driver和Executor节点构成。当Spark application使用Greenplum-Spark Connector加载Greenplum数据时,其Driver端会通过JDBC的方式请求Greenplum的master节点获取相关的元数据信息。Connector将会根据这些元数据信息去决定Spark的Executor去怎样去并行的读取该表的数据。
Greenplum数据库存储数据是按segment组织的,Greenplum-Spark Connector在加载Greenplum数据时,需要指定Greenplum表的一个字段作为Spark的partition字段,Connector会使用这个字段的值来计算,该Greenplum表的某个segment该被哪一个或多个Spark partition读取。
其读取过程如下:
- Spark Driver通过Jdbc的方式连接Greenplum master,并读取指定表的相关元数据信息。然后根据指定的分区字段以及分区个数去决定segment怎么分配。
- Spark Executor端会通过Jdbc的方式连接Greenplum master,创建Greenplum外部表。
- 然后Spark Executor通过Http方式连接Greenplum的数据节点,获取指定的segment的数据。该获取数据的操作在Spark Executor并行执行。
其示意流程图如下:
Greenplum-Spark Connector
3. Greenplum-Spark Connector写数据流程
- GSC在Spark Executor端通过Jetty启动一个Http服务,将该服务封装为支持Greenplum的gpfdist协议。
- GSC在Spark Executor端通过Jdbc方式连接Greenplum master,创建Greenplum外部表,该外部表文件地址指向该Executor所启动的gpfdist协议地址。SQL示例如下:
CREATE READABLE EXTERNAL TABLE
"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")
LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')
FORMAT 'CSV'
(DELIMITER AS '|'
NULL AS '')
ENCODING 'UTF-8'
- GSC在Spark Executor端通过Jdbc方式连接Greenplum master,然后执行insert语句至真实的表中,数据来源于这张外部表。SQL示例如下:
INSERT INTO "public"."rank_a1"
SELECT *
FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"
至于这张外部表的数据,是否落地当前Executor服务器,不清楚。猜测不会落地,而是直接通过Http直接传递给了Greenplum对应的Segment。
- GSC监听onApplicationEnd事件,在Spark application结束后,删除创建的外部表。
4. Greenplum-Spark Connector使用
- 下载GSC Jar包。
下载地址:Pivotal Network。
可直接下载最新版本的GSC即1.6.2,支持Greenplum5.0之后的版本。greenplum-spark_<spark-version>-<gsc-version>.jar,如:
greenplum-spark_2.11-1.6.2.jar
- maven中引入:
<dependency>
<groupId>io.pivotal.greenplum.spark</groupId>
<artifactId>greenplum-spark_2.11</artifactId>
<version>1.6.2</version>
</dependency>
- spark提交引入:
- spark-shell或spark-submit时候,通过--jars加入greenplum-spark_2.11-1.6.2.jar。
- 将greenplum-spark_2.11-1.6.2.jar与Spark application包打成 uber jar 提交。
5. Greenplum-Spark Connector参数
参数名 | 参数描述 | 作用域 |
---|---|---|
url | Jdbc连接的url。 | 读,写 |
dbschema | Greenplum数据库的schema,GSC创建的临时外部表也在该schema下,默认值为public。 | 读,写 |
dbtable | Greenplum数据库的表名,GSC在读取时,会读取dbschema下的表。GSC在写数据时,如果该表不存在会自动创建。 | 读,写 |
driver | Jdbc driver全类名,非必填,在GSC Jar包中已经包含了driver包。 | 读,写 |
user | 用户名 | 读,写 |
password | 密码 | 读,写 |
partitionColumn | Greenplum数据表的字段,该字段将作为Spark分区的字段,支持integer, bigint, serial, bigserial4中类型,该字段名需小写。该字段为必填,且必须是Greenplum表建表时 DISTRIBUTED BY (<column>)语句中的字段。 | 读 |
partitions | Spark分区数,非必填,其默认值为Greenplum的primary segments数量。 | 读 |
truncate | 当在Spark中指定了输出模式为SaveMode.Overwrite时候,写的目标表存在的时候的策略,非必填。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。 | 写 |
iteratorOptimization | 指定写数据时内存模式,非必填。默认指为true,GSC将会使用 Iterator 方式。当为false时,GSC将会在写数据时将数据存储在内存中。 | 写 |
server.port | 指定在Spark Worker端启动gpfdist服务的端口号,非必填。默认情况下会使用随机的端口号。 | 读,写 |
server.useHostname | 指定是否使用Spark Worker节点的host name为gpfdis服务的地址,非必填。默认为false。 | 读,写 |
pool.maxSize | GSC连接Greenplum的连接池的最大连接数,默认为64。 | 读,写 |
pool.timeoutMs | 非活动连接被认为是空闲连接的时间,毫秒值。默认为10000(10秒)。 | 读,写 |
pool.minIdle | GSC连接Greenplum的连接池的最小空闲连接数,默认为0。 | 读,写 |
6. 从Greenplum读取数据
- DataFrameReader.load()方式:
val gscReadOptionMap = Map(
"url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
"user" -> "bill",
"password" -> "changeme",
"dbschema" -> "myschema",
"dbtable" -> "table1",
"partitionColumn" -> "id"
)
val gpdf = spark.read.format("greenplum")
.options(gscReadOptionMap)
.load()
- spark.read.greenplum()方式:
val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"
val tblname = "avgdelay"
val jprops = new Properties()
jprops.put("user", "user2")
jprops.put("password", "changeme")
jprops.put("partitionColumn", "airlineid")
val gpdf = spark.read.greenplum(url, tblname, jprops)
然鹅,这种方式必然需要引入一个隐式转换,官网也没介绍。
7. 写数据至Greenplum
7.1. 写数据示例:
val gscWriteOptionMap = Map(
"url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
"user" -> "bill",
"password" -> "changeme",
"dbschema" -> "myschema",
"dbtable" -> "table2",
)
dfToWrite.write.format("greenplum")
.options(gscWriteOptionMap)
.save()
在通过GSC写到Greenplum表时,如果表已经存在或表中已经存在数据,可通过DataFrameWriter.mode(SaveMode savemode)方式指定其输出模式。相关模式行为如下:
SaveMode | 行为 |
---|---|
ErrorIfExists | 如果Greenplum数据表已经存在则GSC直接返回错误,该策略为默认策略。 |
Append | 直接将Spark中数据追加至表中。 |
Ignore | 如果Greenplum数据表已经存在,GSC将不会写数据至表中也不会去修改已经存在的数据。 |
Overwrite | 如果Greenplum数据表已经存在,则truncate参数将会生效。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。 |
7.2. GSC自动建表:
- 创建的Greenplum表将不会有distribution列,如下为GSC生成的建表语句:
CREATE TABLE "public"."rank_a1"
("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);
- 创建的Greenplum表的字段名将会使用Spark DataFrame中的字段名。
- 在GSC自动建表时,将会为字段名加上双引号,这将使Greenplum区分大小写。
- 当Spark DataFrame的字段不为nullable时,GSC自动建表的字段将是 NOT NULL。
- 将会对应的Spark DataFrame字段类型映射为Greenplum的字段类型。参考,字段类型映射表。
7.3. 提前手动建表:
- 将Spark DataFrame的字段名的数据写至Greenplum表的对应的字段中。值得注意的是,GSC在做映射的时候,是严格区分大小写的。
- 写至Greenplum的字段的数据类型,与对应的Spark DataFrame一致,具体参见字段类型映射。
- 如果Spark数据中某列包含空数据,需确保对应的Greenplum表的列没有被指定为NOT NULL。
- Greenplum表中建表时其字段顺序可以与Spark DataFrame中不一致。但Greenplum表中不能出现不存在在Spark DataFrame中的字段。如下例子:
// Greenplum 中的字段
CREATE TABLE public.rank_a1 (
id int4 NOT NULL,
"rank" text NULL,
"year" int4 NOT NULL,
gender int4 NOT NULL,
count int4 NOT NULL
)
DISTRIBUTED BY (id);
// Spark DataFrame中的字段
var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")
// 在写数据至public.rank_a1表时,将会报错如下
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (5): _1, _2, _3, _4, _5
New column names (4): id, rank, year, gender
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435)
at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44)
at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14)
at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)
- 确保指定的用户对于该表有读写的权限,自动建表,需要有建表的权限。
8. Troubleshooting
8.1. 端口相关问题
错误信息 | 原因 | 解决办法 |
---|---|---|
java.lang.RuntimeException:<port-number> is not a valid port number. | 通过server.port所指定的端口无效,比如1024以内,为系统使用端口 | 指定端口在[1024-65535]之间 |
java.lang.RuntimeException:Unable to start GpfdistService on any of ports=<list-of-port-numbers> | 通过server.port指定的端口已经被占用 | 从新指定一个未被占用的端口,或不指定该参数 |
8.2. Greenplum连接数问题
当连接Greenplum的连接数接近Greenplum数据库配置的最大连接数(max_connections)时。Spark application将会抛出 connection limit exceeded 错误。
排查过程:
- 查询Greenplum数据的最大连接数:
postgres=# show max_connections;
max_connections
-----------------
250
(1 row)
- 查询当前连接Greenplum数据库的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity;
- 查询指定的用户连接Greenplum数据的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';
postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';
- 查询Greenplum数据库空闲和活动的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<IDLE>';
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<IDLE>';
- 查询连接Greenplum数据库名,用户名,客户端地址,客户端ip,当前查询语句:
postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;
如果确认是Spark application使用连接数过多,则配置JDBC Connection Pooling相关参数,减少连接数。
8.3. Greenplum Database Data Length Errors
在使用Greenplum 4.x或5.x的时候,可能会报出“data line too long”错误。这是因为在Greenplum数据库中参数项“gp_max_csv_line_length”默认值是1M。需要登陆Greenplum master修改这个参数值,示例如下,通过gpconfig修改该参数的值为5M:
gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880
gpadmin@gpmaster$ gpstop -u
9. 类型映射表
9.1. Greenplum to Spark
Greenplum Data Type | Spark Data Type |
---|---|
bigint | LongType |
bigSerial | LongType |
boolean | BooleanType |
char | StringType |
date | DateType |
decimal | DecimalType |
float4 | FloatType |
float8 | DoubleType |
int | IntegerType |
serial | IntegerType |
smallInt | ShortType |
text | StringType |
time | TimeStampType |
timestamp | TimeStampType |
timestamptz | TimeStampType |
timetz | TimeStampType |
varchar | StringType |
9.2. Spark to Greenplum
Spark Data Type | Greenplum Data Type |
---|---|
BinaryType | bytea |
BooleanType | boolean |
DateType | date |
DecimalType | numeric |
DoubleType | float8 |
FloatType | float4 |
IntegerType | int |
LongType | bigint |
ShortType | smallInt |
StringType | text |
TimeStampType | timestamp |