Kettle

kettle 同步Oracle 与 Postgres

2020-08-26  本文已影响0人  不玩了啊

kettle 同步Oracle 与 Postgres

分类专栏: Kettle

版权

环境: PDI 8.2  ,windows, oracle 12C,  postgres 12

整个过程性能很好,10分钟+1分钟完成。接下是动态更新任务。

在spoon上,建立一个数据库连接postgres_150,并共享出来。

建议使用jndi的方式,下面更新后,需要重启spoon(这是它的缺点)

simple-jndi/jdbc.properties的设置如下:

postgres_150/type=javax.sql.DataSource

postgres_150/driver=org.postgresql.Driver

postgres_150/url=jdbc:postgresql://ip:ports/yourdatabase

postgres_150/user=username

postgres_150/password=****

1.建立时间中间表

createtableD_BZDZ_MLP_TIMES

(

idNUMERICnotnull,

last_loadTIMESTAMP(6),

current_loadTIMESTAMP(6)

)

插入初始化的数据

INSERTINTOtopology.d_bzdz_mlp_times(

id, last_load, current_load)

VALUES(1, to_timestamp('1971-01-01 01:01:01','YYYY-MM-DD HH24:MI:SS'),

to_timestamp('1971-02-02 01:01:01','YYYY-MM-DD HH24:MI:SS') );

# 或者

INSERTINTOtopology.d_bzdz_mlp_times(

id, last_load, current_load)

VALUES(1,'1971-01-01 01:01:01'::timestamp,'1971-01-01 01:01:01'::timestamp);

2..先把数据一次性从Oralce导入postgres,采用表输入和表输出

在表输出中通过SQL建立表,添加gemo字段,注意Oracle与postgres数据类型的不一样。

如果目标表和源表的字段类型不一致,需要在select * ^语句中转换,比如to_number("string")把字符串转成数字

createtableD_BZDZ_MLP_new

(

systemidVARCHAR(50),

sssqcjwhdmVARCHAR(13),

ssjlxdmVARCHAR(20),

sspcsdmVARCHAR(12),

dzxxdTEXT,

...........      ......

zxjdNUMERIC(30,20),

zxwdNUMERIC(30,20),

geom            GEOMETRY(Point,4326),

zxztTEXT,

smztTEXT,

sffwTEXT,

uuidVARCHAR(50),

cccjsjTIMESTAMP(6),

lastupdatedtimeTIMESTAMP(6)

)

Postgres中对表建立索引

不建立索引的话,后面的插入/更新转换步骤会非常慢。

因为插入/更新都需要进行select操作(这里是select systemid ***),再决定是插入还是更新。

createindexd_bzdz_mlp_idx_sysidond_bzdz_mlp(systemid);

然后建立一个转换,从Oralce中输入,输出到Postgres,一次性批量输出数据。

具体步骤包括:获取系统时间、更新时间中间表、获取时间中间表数据、查询数据、表输出。

记下上面同步完毕的时间,对时间中间表进行更新:

updatetopology.D_MLP_TIMESsetlast_load = current_loadwhereid=1;

3.建立四个转换,用一个作业把这四个转换运转起来。

3.1时间同步转换

从systemdate 获取当前时间,插入更新时间中间表的 当前时间

3.2 数据同步转换。

按照时间中间表,从oracle中查询变化的数据

SELECT

  LAST_LOAD last_load

, CURRENT_LOAD current_load

FROMtopology.D_BZDZ_MLP_TIMES

last_load 和 current_load 作为参数传入下面的查询

SELECT

  systemid,

.....

  zxjd,

  zxwd,

......

  cccjsj,

  lastupdatedtime

FROMLG.D_MLP

WHERE(x >112.916andx <114.082andy>22.526andy>24.005)ANDlastupdatedtime >= ?ANDlastupdatedtime < ?

把上面表输入的数据,插入/更新到postgres

以systemid作为查询的关键字

3.3 时间中间表同步转换

这是一个SQL脚本转换

updatetopology.D_BZDZ_MLP_TIMESsetlast_load = current_loadwhereid=1;

3.4 更新geom字段

update tablesetgeom=ST_SetSRID(st_point(x,y),4326)wheregeom is null ;

上一篇 下一篇

猜你喜欢

热点阅读