OGG real-time sync to Kafka
Target side (Linux)
Install OGG for Big Data
Unzip the OGG Big Data package
unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
Extract the inner tarball
tar -vxf OGG_BigData_Linux_x64_12.3.2.1.1.tar
Install Java 1.8
See the Oracle OGG installation requirements:
https://docs.oracle.com/goldengate/bd123110/gg-bd/GBDIG/installing-oracle-goldengate-big-data.htm#GBDIG-GUID-8A857DA7-38DE-48DC-8946-FB590AFCD6C0
Oracle GoldenGate for Big Data is certified for Java 1.8. Before installing and running Oracle GoldenGate for Java, you must install Java (JDK or JRE) version 1.8 or later. Either the Java Runtime Environment (JRE) or the full Java Development Kit (which includes the JRE) may be used.
Java 1.8 download page:
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
Configure environment variables
vi /etc/profile
export JAVA_HOME=/opt/java/jdk1.8.0_211
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
source /etc/profile
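A quick sanity check after sourcing the profile, using the paths set above:
java -version                      # should report 1.8.x
echo $OGG_HOME $LD_LIBRARY_PATH    # confirm the OGG and JVM library paths are picked up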
Initialize OGG
cd /opt/ogg
ggsci
create subdirs
edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
Set up the checkpoint table
edit param ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
Configure the Replicat process
edit param rekafka
REPLICAT rekafka
sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;
Configure kafka.props
cd /opt/ogg/dirprm/
vi kafka.props
#The // comments below are explanatory only; remove them from the real file and watch for stray spaces
gg.handlerlist=kafkahandler //handler name
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //Kafka producer config file
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //Kafka topic name; created automatically, no need to create it by hand
gg.handler.kafkahandler.format=json //output format; json, xml, etc. are supported
gg.handler.kafkahandler.mode=op //delivery mode: op = one message per operation (SQL), tx = one message per transaction
gg.classpath=dirprm/:/opt/kafka/kafka_2.12-2.3.0/libs/*:/opt/ogg/:/opt/ogg/lib/*
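gg.classpath must resolve to the Kafka client jars; a quick way to confirm the path above really contains them:
ls /opt/kafka/kafka_2.12-2.3.0/libs/kafka-clients-*.jar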
vi custom_kafka_producer.properties
#Change the address below to your own environment; it must match the broker address configured in server.properties
bootstrap.servers=192.168.208.153:9092 //Kafka broker address
acks=1
compression.type=gzip //compression type
reconnect.backoff.ms=1000 //reconnect backoff in ms
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
Add the remote trail to the Replicat
add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
Install Kafka
cd /opt/kafka
tar -vxf kafka_2.12-2.3.0.tgz
Edit the configuration file
cd /opt/kafka/kafka_2.12-2.3.0/config
vi server.properties
listeners=PLAINTEXT://192.168.208.153:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.208.153:9092
Start Kafka
cd /opt/kafka/kafka_2.12-2.3.0
bin/zookeeper-server-start.sh config/zookeeper.properties
#open another terminal window
cd /opt/kafka/kafka_2.12-2.3.0
bin/kafka-server-start.sh config/server.properties
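Once the broker is up, a quick check that it answers on the configured address (the topic list stays empty until the Replicat has delivered something):
cd /opt/kafka/kafka_2.12-2.3.0
bin/kafka-topics.sh --list --bootstrap-server 192.168.208.153:9092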
Source side (Windows)
Switch the database to archivelog mode
conn / as sysdba
shutdown immediate
startup mount
alter database archivelog;
alter database open;
alter database force logging;
alter database add supplemental log data;
alter system switch logfile;
alter system set recyclebin=off scope=spfile;
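Verify the archivelog and logging settings (the recyclebin change only takes effect after the next restart):
archive log list;
select log_mode, force_logging, supplemental_log_data_min from v$database;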
Create the replication user
create user ogg identified by oracle;
grant dba to ogg;
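The steps below also assume a test_ogg schema (password test_ogg, as used in the conn commands later); if it does not exist yet, a minimal sketch:
create user test_ogg identified by test_ogg;
grant connect,resource to test_ogg;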
Create the test table
conn test_ogg/test_ogg;
create table test_ogg(id int ,name varchar(20),primary key(id));
Initialize OGG
OGG is installed under C:\ogg
ggsci
create subdirs
Configure the Extract
edit param ext
extract ext
--dynamicresolution
SETENV(ORACLE_HOME='D:\app\Administrator\product\11.2.0\dbhome_1')
setenv (NLS_LANG="american_america.AL32UTF8")
SETENV (ORACLE_SID="SWDS")
--ddl include all
userid ogg,password oracle
eofdelaycsecs 1
flushcsecs 1
--gettruncates
rmthost 192.168.208.153, mgrport 7809
rmttrail /opt/ogg/dirdat/to
--br broff
table test_ogg.test_ogg;
dblogin userid ogg,password oracle
add extract ext,tranlog,begin now
add rmttrail /opt/ogg/dirdat/to,extract ext
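To confirm the Extract and the remote trail are registered:
info extract ext
info rmttrail *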
Enable trandata for the replicated table
dblogin userid ogg password oracle
add trandata test_ogg.test_ogg
info trandata test_ogg.test_ogg
Configure the definitions file
edit param test_ogg
defsfile C:\ogg\dirdef\test_ogg.test_ogg
userid ogg,password oracle
table test_ogg.test_ogg;
Generate the definitions file
C:\ogg\defgen.exe paramfile C:\ogg\dirprm\test_ogg.prm
Copy the generated file to /opt/ogg/dirdef/ on the Kafka (target) side
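One way to copy it from the Windows box, assuming PuTTY's pscp is available (any SCP/SFTP client works just as well; host and paths are the ones used in this guide):
pscp C:\ogg\dirdef\test_ogg.test_ogg root@192.168.208.153:/opt/ogg/dirdef/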
Configure mgr
edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
Start
Start all processes on both the source and the target (see the commands below).
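A minimal start sequence in ggsci, using the process names defined above.
On the source (Windows):
start mgr
start ext
On the target (Linux):
start mgr
start rekafka
info all should then show every process as RUNNING.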
Test: source side
conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;
Target side
cd /opt/kafka/kafka_2.12-2.3.0/
bin/kafka-console-consumer.sh --bootstrap-server 192.168.208.153:9092 --topic test_ogg --from-beginning
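With format=json and mode=op, every operation arrives as one JSON message, roughly along these lines (field names follow the OGG JSON formatter defaults; exact fields and values depend on version and settings, this is only an illustration):
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"...","current_ts":"...","pos":"...","after":{"ID":1,"NAME":"test"}}
The update and delete show up as similar records with op_type "U" and "D".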
Troubleshooting
enable_goldengate_replication is not set to true
2015-08-24 14:39:21 ERROR OGG-02091 Operation not supported because enable_goldengate_replication is not set to true.
SQL> alter system set enable_goldengate_replication=true;
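The parameter can be checked afterwards from SQL*Plus, then restart the failed process:
show parameter enable_goldengate_replication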
References
https://www.jianshu.com/p/c06c9b0ff2f0
After finishing, double-check for any remaining pitfalls.