
Real-Time Incremental Data Synchronization from Oracle to Kafka with OGG

Preface

OGG (Oracle GoldenGate) is Oracle's data replication tool. This article explains how to configure OGG to synchronize incremental data from an Oracle database to Kafka in real time, with the replicated messages formatted as JSON.
Here is the configuration of my source and target machines:

|        | Version                   | OGG Version                                                | IP             | Alias  |
|--------|---------------------------|------------------------------------------------------------|----------------|--------|
| Source | Oracle Release 11.2.0.1.0 | Oracle GoldenGate 11.2.1.0.3 for Oracle on Linux x86-64    | 192.168.44.128 | master |
| Target | kafka_2.11-1.1.0          | Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64  | 192.168.44.129 | slave1 |

1. Download

Older versions can be found and downloaded here.
Note: the source and target installers are different. The target side needs Oracle GoldenGate for Big Data, while the source side needs Oracle GoldenGate for Oracle. See the screenshots in the appendix at the end for the download steps.

2. Source (Oracle) Configuration

Note: the source is the machine where Oracle is installed, and the Oracle environment variables have already been configured there.

2.1 Unpack

First create the ogg directory

mkdir -p /opt/ogg
unzip V34339-01.zip

Unzipping produces a tar archive; extract that as well

tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
chown -R oracle:oinstall /opt/ogg (give the oracle user ownership of ogg; some of the later steps only succeed when run as the oracle user)

2.2 Configure the OGG environment variables

For simplicity I configured them in /etc/profile. In production it is recommended to put them in the oracle user's environment file /home/oracle/.bash_profile. To be safe, I also put OGG_HOME and the other variables in /etc/profile; I am not sure whether that is strictly necessary.

vim /etc/profile
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib
export PATH=$OGG_HOME:$PATH

Apply the changes

source /etc/profile

Test that the ggsci command works

ggsci

If the command succeeds, continue to the next step; if not, recheck the previous steps.

2.3 Enable archive log mode in Oracle

su - oracle
sqlplus / as sysdba

Run the following command to check whether the database is currently in archive log mode

archive log list 
SQL> archive log list 
Database log mode          No Archive Mode
Automatic archival         Disabled
Archive destination        USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     12
Current log sequence           14

If it shows Disabled, enable it manually

conn / as sysdba (connect to the database as SYSDBA)
shutdown immediate (shut the database down immediately)
startup mount (start the instance and mount the database without opening it)
alter database archivelog; (switch the database to archive log mode)
alter database open; (open the database)
alter system archive log start; (enable automatic archiving)

Run the check again

archive log list 
Database log mode          Archive Mode
Automatic archival         Enabled
Archive destination        USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     12
Next log sequence to archive   14
Current log sequence           14

It now shows Enabled, so archive log mode has been turned on successfully.

2.4 Enable the required logging in Oracle

OGG relies on supplemental logging for real-time capture, so the relevant logging must be enabled to make sure the transaction content can be obtained. Check the current state with the following command

select force_logging, supplemental_log_data_min from v$database;
FORCE_ SUPPLEMENTAL_LOG
------ ----------------
NO     NO

If both are NO, change them with the following commands

alter database force logging;
alter database add supplemental log data;

Check again; both values should now be YES

SQL> select force_logging, supplemental_log_data_min from v$database;

FORCE_ SUPPLEMENTAL_LOG
------ ----------------
YES    YES

2.5 Create the replication user in Oracle

First, as root, create the required directories and grant permissions

mkdir -p /u01/app/oracle/oggdata/orcl
chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl

Then run the following SQL

SQL> create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;

Tablespace created.

SQL>  create user ogg identified by ogg default tablespace oggtbs;

User created.

SQL> grant dba to ogg;

Grant succeeded.

2.6 Initialize OGG

ggsci
create subdirs
ggsci

Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21

Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.



GGSCI (ambari.master.com) 1> create subdirs

Creating subdirectories under current directory /root

Parameter files                /root/dirprm: created
Report files                   /root/dirrpt: created
Checkpoint files               /root/dirchk: created
Process status files           /root/dirpcs: created
SQL script files               /root/dirsql: created
Database definitions files     /root/dirdef: created
Extract data files             /root/dirdat: created
Temporary files                /root/dirtmp: created
Stdout files                   /root/dirout: created


GGSCI (ambari.master.com) 2> 

2.7 Create a test table in Oracle

Create a user and a test table under that user; the username, password, and table name are all test_ogg.

create user test_ogg  identified by test_ogg default tablespace users;
grant dba to test_ogg;
conn test_ogg/test_ogg;
create table test_ogg(id int ,name varchar(20),primary key(id));

3. Target (Kafka) Configuration

3.1 Unpack

mkdir -p /opt/ogg
unzip 123111_ggs_Adapters_Linux_x64.zip 
tar xf ggs_Adapters_Linux_x64.tar  -C /opt/ogg/

3.2 Environment variables

vim /etc/profile
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
source /etc/profile

Test the ggsci command in the same way

ggsci

3.3 Initialize the directories

create subdirs

4. OGG Source Configuration

4.1 Configure the OGG global parameters

First switch to the oracle user

su oracle
cd /opt/ogg
ggsci
GGSCI (ambari.master.com) 1> dblogin userid ogg password ogg
Successfully logged into database.

GGSCI (ambari.master.com) 2> edit param ./globals

Then add the following line, editing the file just as you would with vim

oggschema ogg

4.2 Configure the manager (mgr)

GGSCI (ambari.master.com) 3> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

Notes: PORT is the default listening port of mgr. DYNAMICPORTLIST is the dynamic port list: when the specified mgr port is unavailable, a port is picked from this list, which can cover at most 256 ports. AUTORESTART configures restarts: here it restarts all EXTRACT processes, at most 5 times, waiting 3 minutes between attempts. PURGEOLDEXTRACTS controls periodic cleanup of trail files.

4.3 Add the table for replication

GGSCI (ambari.master.com) 4> add trandata test_ogg.test_ogg

Logging of supplemental redo data enabled for table TEST_OGG.TEST_OGG.

GGSCI (ambari.master.com) 5> info trandata test_ogg.test_ogg

Logging of supplemental redo log data is enabled for table TEST_OGG.TEST_OGG.

Columns supplementally logged for table TEST_OGG.TEST_OGG: ID

4.4 Configure the extract process

GGSCI (ambari.master.com) 6> edit param extkafka
extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg,password ogg
exttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

Notes: the first line names the extract process. dynamicresolution enables dynamic resolution. SETENV sets environment variables, here the Oracle SID and the character set. userid ogg, password ogg are the credentials OGG uses to connect to the Oracle database, i.e. the replication account created in 2.5. exttrail defines the location and name of the trail files; note the file-name prefix can only be two characters, the rest is filled in by OGG. table is the table to replicate; the * wildcard is supported, and the line must end with a semicolon.

Add the extract process:

GGSCI (ambari.master.com) 16> add extract extkafka,tranlog,begin now
EXTRACT added.

(Note: if you hit the error

ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).

run the following command and then add the extract again.

create subdirs

)

Define the trail file and bind it to the extract process:

GGSCI (ambari.master.com) 17> add exttrail /opt/ogg/dirdat/to,extract extkafka
EXTTRAIL added.

4.5 Configure the pump process

The pump process is essentially also an extract; its only job is to deliver the trail files to the target. Its configuration is similar to the extract process; it is just referred to logically as a pump process.

GGSCI (ambari.master.com) 18> edit param pukafka
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost 192.168.44.129 mgrport 7809
rmttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

Notes: the first line names the extract process. passthru disables interaction between OGG and Oracle; since the pump is only used for transport here, it can be disabled. dynamicresolution enables dynamic resolution. userid ogg, password ogg are the credentials OGG uses to connect to the Oracle database. rmthost and mgrport are the address and listening port of the OGG mgr service on the target (Kafka) side. rmttrail is the location and name of the trail files on the target.

Bind the local trail file and the target-side trail file to the pump process:

GGSCI (ambari.master.com) 1> add extract pukafka,exttrailsource /opt/ogg/dirdat/to
EXTRACT added.
GGSCI (ambari.master.com) 2> add rmttrail /opt/ogg/dirdat/to,extract pukafka
RMTTRAIL added.

4.6 Configure the definitions file

Data transfer between Oracle and MySQL or Hadoop-ecosystem targets (HDFS, Hive, Kafka, etc.) can be regarded as transfer between heterogeneous data types, so the mapping between tables must be defined. Run the following in the OGG command line:

GGSCI (ambari.master.com) 3> edit param test_ogg
defsfile /opt/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;

Then run the following from the OGG home directory (as the oracle user):

./defgen paramfile dirprm/test_ogg.prm

***********************************************************************
        Oracle GoldenGate Table Definition Generator for Oracle
 Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258
   Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 16:58:29

Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.


                    Starting at 2018-05-23 05:03:04
***********************************************************************

Operating System Version:
Linux
Version #1 SMP Wed Apr 12 15:04:24 UTC 2017, Release 3.10.0-514.16.1.el7.x86_64
Node: ambari.master.com
Machine: x86_64
                         soft limit   hard limit
Address Space Size   :    unlimited    unlimited
Heap Size            :    unlimited    unlimited
File Size            :    unlimited    unlimited
CPU Time             :    unlimited    unlimited

Process id: 13126

***********************************************************************
**            Running with the following parameters                  **
***********************************************************************
defsfile /opt/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ***
table test_ogg.test_ogg;
Retrieving definition for TEST_OGG.TEST_OGG



Definitions generated for 1 table in /opt/ogg/dirdef/test_ogg.test_ogg

Send the generated /opt/ogg/dirdef/test_ogg.test_ogg to the dirdef directory under the OGG home on the target:

scp -r /opt/ogg/dirdef/test_ogg.test_ogg root@slave1:/opt/ogg/dirdef/ 

5. OGG Target Configuration

5.1 Start the Kafka services

cd /opt/kafka_2.11-1.1.0/
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

5.2 Configure the manager (mgr)

GGSCI (ambari.slave1.com) 1>  edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

5.3 Configure the checkpoint

The checkpoint is a traceable offset record for replication; just add the checkpoint table in the GLOBALS configuration.

edit  param  ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint

5.4 Configure the replicat process

GGSCI (ambari.slave1.com) 4> edit param rekafka
REPLICAT rekafka
sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

Notes: REPLICAT rekafka names the replicat process. sourcedefs is the table-mapping file generated on the source server in 4.6. TARGETDB LIBFILE points to the Kafka adapter library and its configuration file, located at dirprm/kafka.props under the OGG home. REPORTCOUNT is how often the replication task generates reports. GROUPTRANSOPS is the number of operations grouped into one transaction during transactional delivery, reducing I/O. MAP defines the mapping between source and target.

5.5 Configure kafka.props

cd /opt/ogg/dirprm/
vim kafka.props
gg.handlerlist=kafkahandler //handler type
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //Kafka producer configuration file
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //Kafka topic name; it does not need to be created manually
gg.handler.kafkahandler.format=json //message format; json, xml, etc. are supported
gg.handler.kafkahandler.mode=op  //delivery mode in OGG for Big Data: op sends one message per SQL operation, tx one per transaction
gg.classpath=dirprm/:/opt/kafka_2.11-1.1.0/libs/*:/opt/ogg/:/opt/ogg/lib/*
vim custom_kafka_producer.properties
bootstrap.servers=192.168.44.129:9092 //Kafka broker address
acks=1
compression.type=gzip //compression type
reconnect.backoff.ms=1000 //reconnect backoff in ms
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

Note that the trailing comments above must be removed; OGG does not recognize comments in these files, and leaving them in will cause errors (a comment-free version of both files is shown below).
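For reference, here are the same two files with the comments stripped, exactly as they should be saved (the content is identical to the annotated version above):

kafka.props:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/opt/kafka_2.11-1.1.0/libs/*:/opt/ogg/:/opt/ogg/lib/*

custom_kafka_producer.properties:

bootstrap.servers=192.168.44.129:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000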

5.6 Add the trail file to the replicat process

GGSCI (ambari.slave1.com) 2> add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
REPLICAT added.

6. Testing

6.1 Start all processes

Start all processes from the OGG command line on both the source and the target using start [process name].
The startup order is: source mgr, then target mgr, then source extract, then source pump, then target replicat.
In every case, run the ggsci command from the ogg directory to enter the OGG command line.
On the source, in order:

start mgr
start extkafka
start pukafka

On the target

start mgr
start rekafka

You can check the status with info all or info [process name]; the setup is only successful when all processes are RUNNING.
Source

GGSCI (ambari.master.com) 5> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
EXTRACT     RUNNING     EXTKAFKA    04:50:21      00:00:03    
EXTRACT     RUNNING     PUKAFKA     00:00:00      00:00:03    

Target

GGSCI (ambari.slave1.com) 3> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
REPLICAT    RUNNING     REKAFKA     00:00:00      00:00:01    

6.2 Troubleshooting

If any process is not RUNNING, you can diagnose and fix the problem by checking the logs, using either of the following two methods

vim ggserr.log

or from the OGG command line, taking the rekafka process as an example

GGSCI (ambari.slave1.com) 2> view report rekafka

Here is one of the problems I ran into.
Error message:

SEVERE: Unable to set property on handler 'kafkahandler' (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: TopicName:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
oracle.goldengate.util.ConfigException: Failed to set property: TopicName:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
at ......

The cause is that the tutorials circulating online target an older version, which sets the topic-name property as:

gg.handler.kafkahandler.topicName=test_ogg

In the newer version it is set like this

gg.handler.kafkahandler.topicMappingTemplate=test_ogg

Set it according to your own version. The original Stack Overflow answer is attached:

I tried to move data from Oracle Database to Kafka using Golden gate adapter Version 12.3.0.1.0

In new version there is no topicname

The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=test

In previous version we have gg.handler.kafkahandler.topicName=test

6.3 Test the synchronization of updates

Now run the following SQL on the source

conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;

Check the trail file on the source

ls -l /opt/ogg/dirdat/to*
-rw-rw-rw- 1 oracle oinstall 1464 May 23 10:31 /opt/ogg/dirdat/to000000

Check the trail file on the target

ls -l /opt/ogg/dirdat/to*
-rw-r----- 1 root root 1504 May 23 10:31 /opt/ogg/dirdat/to000000

Check whether Kafka has automatically created the corresponding topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

If test_ogg appears in the list, everything is fine.
Use a consumer to check whether the synchronized messages arrive

bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.129:9092 --topic test_ogg --from-beginning
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2018-05-23 10:31:28.000078","current_ts":"2018-05-23T10:36:48.525000","pos":"00000000000000001093","after":{"ID":1,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2018-05-23 10:31:36.000073","current_ts":"2018-05-23T10:36:48.874000","pos":"00000000000000001233","before":{},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2018-05-23 10:31:43.000107","current_ts":"2018-05-23T10:36:48.875000","pos":"00000000000000001376","before":{"ID":1}}

Clearly, the Oracle data has been synchronized to Kafka in near real time as JSON. op_type is the operation type; it is configurable, and since I did not configure it the defaults apply, which are

gg.handler.kafkahandler.format.insertOpKey = I  
gg.handler.kafkahandler.format.updateOpKey = U  
gg.handler.kafkahandler.format.deleteOpKey = D  

before holds the row data before the operation and after holds it afterwards. The synchronized JSON data can now be read from Kafka and then parsed with Spark Streaming, Storm, etc. and written into big-data platforms such as Hadoop.

6.4 Consuming the synchronized messages with Spark Streaming
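A minimal sketch of a Spark Streaming consumer for the test_ogg topic, assuming the spark-streaming-kafka-0-10 integration and the broker address used above; the object name and group id are illustrative, not part of the original setup:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object OggKafkaStreamingTest {
  def main(args: Array[String]): Unit = {
    // local[2] is enough for a quick test; use a real master in a cluster
    val conf = new SparkConf().setAppName("ogg-kafka-test").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "192.168.44.129:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "ogg_test_group",          // illustrative group id
      "auto.offset.reset"  -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Subscribe to the topic written by the OGG Kafka handler
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("test_ogg"), kafkaParams)
    )

    // Each record value is one OGG JSON message; just print it for the test
    stream.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Submitted with spark-submit (or run locally), each batch should print the same JSON records that the console consumer above showed.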

7. Update: Problems Encountered Later

During later use I found that the JSON data synchronized to Kafka as configured above was missing some of the information we wanted. Here is how I solved it.
First create a table:

CREATE TABLE "TCLOUD"."T_OGG2"
   (    "ID" NUMBER(*,0),
    "TEXT_NAME" VARCHAR2(20),
    "AGE" NUMBER(*,0),
    "ADD" VARCHAR2(100),
    "IDD" VARCHAR2(100),
     CONSTRAINT "T_OGG2_PK" PRIMARY KEY ("ID", "IDD")
   );

Why not reuse the table created earlier? Mainly because it had too few columns, which makes the problem hard to see. The new table adds a few more columns, and id and idd form a composite primary key.
Here is the data synchronized to Kafka with the previous configuration (partial excerpt)

{"table":"TCLOUD.T_OGG2","op_type":"I","op_ts":"2018-05-31 11:46:09.512672","current_ts":"2018-05-31T11:46:15.292000","pos":"00000000000000001903","after":{"ID":4,"TEXT_NAME":null,"AGE":0,"ADD":null,"IDD":"8"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 11:49:10.514549","current_ts":"2018-05-31T11:49:16.450000","pos":"00000000000000002227","before":{},"after":{"ID":4,"TEXT_NAME":"lisi","IDD":"7"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 11:49:48.514869","current_ts":"2018-05-31T11:49:54.481000","pos":"00000000000000002373","before":{"ID":4,"IDD":"7"},"after":{"ID":1,"IDD":"7"}}

{"table":"TCLOUD.T_OGG2","op_type":"D","op_ts":"2018-05-31 11:52:38.516877","current_ts":"2018-05-31T11:52:45.633000","pos":"00000000000000003161","before":{"ID":1,"IDD":"7"}}

At this point only the insert records are complete. For an update of non-key columns, before contains no data; for an update of key columns, before contains only the key columns; for a delete, before contains only the key columns. In other words, the update and delete messages are incomplete, and they carry no primary-key information (a program cannot tell which field is the key), which makes automatic parsing of the synchronized data awkward (requirements vary, so analyze your own case; I will not belabor it). Here I address the need for full before/after images plus primary-key information; a small parsing sketch based on those fields follows below.
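As a rough illustration of why those fields matter downstream, here is a minimal sketch (in Scala, using the Jackson library; the object name is illustrative) of how a consumer could turn one OGG JSON record into an operation keyed on its primary key, assuming the primary_keys field and before images enabled in 7.1 and 7.2 below:

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import scala.collection.JavaConverters._

object OggRecordParser {
  private val mapper = new ObjectMapper()

  // Returns the operation type and the primary-key values of one OGG JSON record
  def parse(json: String): (String, Map[String, String]) = {
    val node: JsonNode = mapper.readTree(json)
    val opType = node.get("op_type").asText()             // "I", "U" or "D"
    // primary_keys is only present when includePrimaryKeys=true (section 7.2)
    val keyNames = Option(node.get("primary_keys"))
      .map(_.elements().asScala.map(_.asText()).toSeq)
      .getOrElse(Seq.empty)
    // for deletes the key values are in "before"; otherwise take them from "after"
    val image = if (opType == "D") node.get("before") else node.get("after")
    val keyValues = keyNames.map(k => k -> image.get(k).asText()).toMap
    (opType, keyValues)
  }
}

Without primary_keys and full before images, a parser like this cannot reliably decide which row an update or delete applies to.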

7.1 Add before images

Add the following lines to the source-side extract parameters

GGSCI (ambari.master.com) 33> edit param extkafka
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES

Restart extkafka

stop extkafka
start extkafka

Then test

{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.709000","pos":"00000000000000003770","before":{"ID":1,"AGE":20,"IDD":"1"},"after":{"ID":1,"AGE":1,"IDD":"1"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.714000","pos":"00000000000000004009","before":{"ID":1,"AGE":20,"IDD":"2"},"after":{"ID":1,"AGE":1,"IDD":"2"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.715000","pos":"00000000000000004248","before":{"ID":1,"AGE":20,"IDD":"8"},"after":{"ID":1,"AGE":1,"IDD":"8"}}

After an update, before now contains data, which is what we wanted; however, both before and after are still incomplete (only some of the columns are present).

7.2 Add primary keys

Add the following to kafka.props

gg.handler.kafkahandler.format.includePrimaryKeys=true

Restart rekafka

stop rekafka
start rekafka

Test:

{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:58:57.637035","current_ts":"2018-05-31T14:59:03.401000","pos":"00000000000000004510","primary_keys":["ID","IDD"],"before":{"ID":1,"AGE":1,"IDD":"1"},"after":{"ID":1,"AGE":20,"IDD":"1"}}

Now there is a primary_keys field. Nice.

7.3 Include all columns

If the columns are still incomplete, Oracle probably does not have supplemental logging enabled for all columns

SQL> select supplemental_log_data_all from v$database;  

SUPPLE
------
NO

Enable it with the following command

SQL> alter database add supplemental log data(all) columns;

Database altered.

SQL> select supplemental_log_data_all from v$database;

SUPPLE
------
YES

SQL> 

Test once more

{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.891000","pos":"00000000000000006070","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"1"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"1"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.893000","pos":"00000000000000006341","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"2"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"2"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.895000","pos":"00000000000000006612","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"8"},"after":{"ID":1,"TEXT_NAME":null,"AGE":