全部产品
云市场

OGG插件

更新时间:2020-01-17 21:36:43

OGG采集工具


一、背景介绍

随着数据规模的不断扩大,传统的RDBMS难以满足OLAP的需求,本文将介绍如何将Oracle的数据实时同步到阿里云的大数据处理平台当中,并利用大数据工具对数据进行分析。

OGG(Oracle GoldenGate)是一个基于日志的结构化数据备份工具,一般用于Oracle数据库之间的主从备份以及Oracle数据库到其他数据库(DB2, MySQL等)的同步。下面是Oracle官方提供的一个OGG的整体架构图,从图中可以看出OGG的部署分为源端和目标端两部分组成,主要有Manager,Extract,Pump,Collector,Replicat这么一些组件。屏幕快照 2016-11-24 下午3.31.29.png

  • Manager:在源端和目标端都会有且只有一个Manager进程存在,负责管理其他进程的启停和监控等;
  • Extract:负责从源端数据库表或者事务日志中捕获数据,有初始加载和增量同步两种模式可以配置,初始加载模式是直接将源表数据同步到目标端,而增量同步就是分析源端数据库的日志,将变动的记录传到目标端,本文介绍的是增量同步的模式;
  • Pump:Extract从源端抽取的数据会先写到本地磁盘的Trail文件,Pump进程会负责将Trail文件的数据投递到目标端;
  • Collector:目标端负责接收来自源端的数据,生成Trail文件
  • Replicat:负责读取目标端的Trail文件,转化为相应的DDL和DML语句作用到目标数据库,实现数据同步。

本文介绍的Oracle数据同步是通过OGG的Datahub插件实现的,该Datahub插件在架构图中处于Replicat的位置,会分析Trail文件,将数据的变化记录写入Datahub中,可以使用流计算对datahub中的数据进行实时分析,也可以将数据归档到MaxCompute中进行离线处理。

当前插件版本为2.0.3,用户可以直接下载,也可以在github上下载源码自行打包。

版本更新说明:2.0.2 => 2.0.3,历史版本可以在文末进行下载。

  • 支持采集oracle表的rowid。
  • 支持DataHub的DECIMAL、TIMESTAMP等类型。
  • 修改配置参数的拼写错误(defalutDatahubConfigure => defaultDatahubConfigure),如果在升级插件版本之后,依旧使用旧的配置文件,则需要修改此参数
  • 支持压缩、protobuf传输等功能。

二、安装步骤

0. 环境要求

  • 源端已安装好Oracle11g(当前插件版本仅支持ORA11g数据库)
  • 源端已安装好OGG(建议版本Oracle GoldenGate V12.2.0.2)
  • 目标端已安装好OGG Adapters(建议版本Oracle GoldenGate Application Adapters V12.2.0.1)
  • java 7(推荐JDK 1.8)

(下面将介绍Oracle/OGG相关安装和配置过程,Oracle的安装将不做介绍,另外需要注意的是:Oracle/OGG相关参数配置以熟悉Oracle/OGG的运维人员配置为准,本示例只是提供一个可运行的样本)

1. 源端OGG安装

下载OGG安装包解压后有如下目录:

  1. drwxr-xr-x install
  2. drwxrwxr-x response
  3. -rwxr-xr-x runInstaller
  4. drwxr-xr-x stage

目前oracle一般采取response安装的方式,在response/oggcore.rsp中配置安装依赖,具体如下:

  1. oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2
  2. # 需要目前与oracle版本对应
  3. INSTALL_OPTION=ORA11g
  4. # goldegate主目录
  5. SOFTWARE_LOCATION=/home/oracle/u01/ggate
  6. # 初始不启动manager
  7. START_MANAGER=false
  8. # manger端口
  9. MANAGER_PORT=7839
  10. # 对应oracle的主目录
  11. DATABASE_LOCATION=/home/oracle/u01/app/oracle/product/11.2.0/dbhome_1
  12. # 暂可不配置
  13. INVENTORY_LOCATION=
  14. # 分组(目前暂时将oracle和ogg用同一个账号ogg_test,实际可以给ogg单独账号)
  15. UNIX_GROUP_NAME=oinstall

执行命令:

  1. runInstaller -silent -responseFile {YOUR_OGG_INSTALL_FILE_PATH}/response/oggcore.rsp

本示例中,安装后OGG的目录在/home/oracle/u01/ggate,安装日志在/home/oracle/u01/ggate/cfgtoollogs/oui目录下,当silentInstall{时间}.log文件里出现如下提示,表明安装成功:

The installation of Oracle GoldenGate Core was successful.

执行/home/oracle/u01/ggate/ggsci命令,并在提示符下键入命令:CREATE SUBDIRS,从而生成ogg需要的各种目录(dir打头的那些)。至此,源端OGG安装完成。

2. 源端Oracle配置

注意:以下操作如遗漏配置,可能会造成UPDATE操作的更新前值为空

以dba分身进入sqlplus:sqlplus / as sysdba

  1. # 创建独立的表空间
  2. create tablespace ATMV datafile '/home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf' size 100m autoextend on next 50m maxsize unlimited;
  3. # 创建ogg_test用户,密码也为ogg_test
  4. create user ogg_test identified by ogg_test default tablespace ATMV;
  5. # 给ogg_test赋予充分的权限
  6. grant connect,resource,dba to ogg_test;
  7. # 检查附加日志情况
  8. Select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK, SUPPLEMENTAL_LOG_DATA_ALL from v$database;
  9. # 增加数据库附加日志
  10. alter database add supplemental log data;
  11. alter database add supplemental log data (primary key, unique,foreign key) columns;
  12. # rollback
  13. alter database drop supplemental log data (primary key, unique,foreign key) columns;
  14. alter database drop supplemental log data;
  15. # 全字段模式,注意:在该模式下的delete操作也只有主键值,如果需要其他值,须在source端extract配置NOCOMPRESSDELETES
  16. ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  17. # 开启数据库强制日志模式
  18. alter database force logging;
  19. # 安装sequence support
  20. @sequence.sql
  21. #
  22. alter table sys.seq$ add supplemental log data (primary key) columns;

3. OGG源端mgr配置

以下是通过ggsci对ogg进行配置

配置mgredit params mgr

  1. PORT 7839
  2. DYNAMICPORTLIST 7840-7849
  3. USERID ogg_test, PASSWORD ogg_test
  4. PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
  5. LAGREPORTHOURS 1
  6. LAGINFOMINUTES 30
  7. LAGCRITICALMINUTES 45
  8. PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
  9. PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7

启动mgr(运行日志在ggate/dirrpt中)

start mgr

查看mgr状态

info mgr

查看mgr配置

view params mgr

4. OGG源端extract配置

以下是通过ggsci对ogg进行配置

配置extract(名字可任取,这里名字为dhext)edit params dhext

  1. EXTRACT dhext
  2. SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
  3. DBOPTIONS ALLOWUNUSEDCOLUMN
  4. USERID ogg_test, PASSWORD ogg_test
  5. REPORTCOUNT EVERY 1 MINUTES, RATE
  6. NUMFILES 5000
  7. DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 100
  8. DISCARDROLLOVER AT 2:00
  9. WARNLONGTRANS 2h, CHECKINTERVAL 3m
  10. EXTTRAIL ./dirdat/st, MEGABYTES 200
  11. DDL &
  12. INCLUDE MAPPED OBJTYPE 'table' &
  13. INCLUDE MAPPED OBJTYPE 'index' &
  14. INCLUDE MAPPED OBJTYPE 'SEQUENCE' &
  15. EXCLUDE OPTYPE COMMENT
  16. DDLOPTIONS NOCROSSRENAME REPORT
  17. TABLE OGG_TEST.*,tokens (TKN-ROWID=@GETENV('RECORD','rowid'));
  18. SEQUENCE OGG_TEST.*;
  19. GETUPDATEBEFORES

备注: TABLE OGG_TEST.*,tokens (TKN-ROWID=@GETENV('RECORD','rowid'));可以用来采集源表的rowdid,如果不需要采集rowid可以修改为TABLE OGG_TEST.*;

增加extract进程(extract后的名字要跟上面dhext对应)

add extract dhext,tranlog, begin now

添加抽取进程,每个队列文件大小为200m

add exttrail ./dirdat/st,extract dhext, megabytes 200

启动抽取进程(运行日志在ggate/dirrpt中)

start dhext

至此,extract配置完成,数据库的一条变更可以在ggate/dirdat目录下的文件中看到

5. 生成def文件

源端ggsci起来后执行如下命令,生成defgen文件,并且拷贝到目标端dirdef下

edit params defgen

  1. DEFSFILE ./dirdef/ogg_test.def
  2. USERID ogg_test, PASSWORD ogg_test
  3. table OGG_TEST.*;

在shell中执行如下命令,生成ogg_test.def

./defgen paramfile ./dirprm/defgen.prm

6. 目标端OGG安装和配置

解压adapter包将源端中dirdef/ogg_test.def文件拷贝到adapter的dirdef下

执行ggsci起来后执行如下命令,创建必须目录create subdirs

编辑mgr配置edit params mgr

  1. PORT 7839
  2. DYNAMICPORTLIST 7840-7849
  3. PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
  4. LAGREPORTHOURS 1
  5. LAGINFOMINUTES 30
  6. LAGCRITICALMINUTES 45
  7. PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
  8. PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7

启动mgrstart mgr

7. 源端ogg pump配置

启动ggsci后执行如下操作:

编辑pump配置edit params pump

  1. EXTRACT pump
  2. RMTHOST xx.xx.xx.xx, MGRPORT 7839, COMPRESS
  3. PASSTHRU
  4. NUMFILES 5000
  5. RMTTRAIL ./dirdat/st
  6. DYNAMICRESOLUTION
  7. TABLE OGG_TEST.*;
  8. SEQUENCE OGG_TEST.*;

添加投递进程add extract pump,exttrailsource ./dirdat/st

备注:投递进程,每个队文件大小为200madd rmttrail ./dirdat/st,extract pump,megabytes 200

启动pumpstart pump

启动后,结合上面adapter的配置,可以在目标端的dirdat目录下看到过来的trailfile

8. Datahub插件安装和配置

依赖环境:JDK >= 1.7。

配置好JAVA_HOME, LD_LIBRARY_PATH,可以将环境变量配置到~/.bash_profile中,例如

  1. export JAVA_HOME=/xxx/xxx
  2. export JRE_HOME=/xxx/xxx/jrexx
  3. export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JRE_HOME/lib/amd64:$JRE_HOME/lib/amd64/server

修改环境变量后,请重启adapter的mgr进程下载datahub-ogg-plugin.tar.gz并解压:

修改conf路径下的javaue.properties文件,将{YOUR_HOME}替换为解压后的路径

  1. gg.handlerlist=ggdatahub
  2. gg.handler.ggdatahub.type=com.aliyun.odps.ogg.handler.datahub.DatahubHandler
  3. gg.handler.ggdatahub.configureFileName={YOUR_HOME}/aliyun-datahub-ogg-plugin/conf/configure.xml
  4. goldengate.userexit.nochkpt=false
  5. goldengate.userexit.timestamp=utc+8
  6. gg.classpath={YOUR_HOME}/aliyun-datahub-ogg-plugin/lib/*
  7. gg.log.level=debug
  8. jvm.bootoptions=-Xmx512m -Dlog4j.configuration=file:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.path=ggjava/ggjava.jar

修改conf路径下的log4j.properties文件,将{YOUR_HOME}替换为解压后的路径

修改conf路径下的configure.xml文件,修改方式见文件中的注释

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <configue>
  3. <defaultOracleConfigure>
  4. <!-- oracle sid, 必选-->
  5. <sid>100</sid>
  6. <!-- oracle schema, 可以被mapping中的oracleSchema覆盖, 两者必须有一个非空-->
  7. <schema>ogg_test</schema>
  8. </defaultOracleConfigure>
  9. <defalutDatahubConfigure>
  10. <!-- datahub endpoint, 必填-->
  11. <endPoint>YOUR_DATAHUB_ENDPOINT</endPoint>
  12. <!-- datahub project, 可以被mapping中的datahubProject, 两者必须有一个非空-->
  13. <project>YOUR_DATAHUB_PROJECT</project>
  14. <!-- datahub accessId, 可以被mapping中的datahubAccessId覆盖, 两者必须有一个非空-->
  15. <accessId>YOUR_DATAHUB_ACCESS_ID</accessId>
  16. <!-- datahub accessKey, 可以被mapping中的datahubAccessKey覆盖, 两者必须有一个非空-->
  17. <accessKey>YOUR_DATAHUB_ACCESS_KEY</accessKey>
  18. <!-- 数据变更类型同步到datahub对应的字段,可以被columnMapping中的ctypeColumn覆盖 -->
  19. <ctypeColumn>optype</ctypeColumn>
  20. <!-- 数据变更时间同步到datahub对应的字段,可以被columnMapping中的ctimeColumn覆盖 -->
  21. <ctimeColumn>readtime</ctimeColumn>
  22. <!-- 数据变更序号同步到datahub对应的字段, 按数据变更先后递增, 不保证连续, 可以被columnMapping中的cidColumn覆盖 -->
  23. <cidColumn>record_id</cidColumn>
  24. </defalutDatahubConfigure>
  25. <!-- 默认最严格,不落文件 直接退出 无限重试-->
  26. <!-- 运行每批上次的最多纪录数, 可选, 默认1000-->
  27. <batchSize>1000</batchSize>
  28. <!-- 默认时间字段转换格式, 可选, 默认yyyy-MM-dd HH:mm:ss-->
  29. <defaultDateFormat>yyyy-MM-dd HH:mm:ss</defaultDateFormat>
  30. <!-- 脏数据是否继续, 可选, 默认false-->
  31. <dirtyDataContinue>true</dirtyDataContinue>
  32. <!-- 脏数据文件, 可选, 默认datahub_ogg_plugin.dirty-->
  33. <dirtyDataFile>datahub_ogg_plugin.dirty</dirtyDataFile>
  34. <!-- 脏数据文件最大size, 单位M, 可选, 默认500-->
  35. <dirtyDataFileMaxSize>200</dirtyDataFileMaxSize>
  36. <!-- 重试次数, -1:无限重试 0:不重试 n:重试次数, 可选, 默认-1-->
  37. <retryTimes>0</retryTimes>
  38. <!--指定shard id列表, 优先生效, 可选,例如0,1-->
  39. <shardId>0,1</shardId>
  40. <!-- 重试间隔, 单位毫秒, 可选, 默认3000-->
  41. <retryInterval>4000</retryInterval>
  42. <!-- 点位文件, 可选, 默认datahub_ogg_plugin.chk-->
  43. <checkPointFileName>datahub_ogg_plugin.chk</checkPointFileName>
  44. <mappings>
  45. <mapping>
  46. <!-- oracle schema, 见上描述-->
  47. <oracleSchema></oracleSchema>
  48. <!-- oracle table, 必选-->
  49. <oracleTable>t_person</oracleTable>
  50. <!-- datahub project, 见上描述-->
  51. <datahubProject></datahubProject>
  52. <!-- datahub topic, 必选-->
  53. <datahubTopic>t_person</datahubTopic>
  54. <!-- oracle表rowid同步到DataHub的字段,可选 -->
  55. <rowIdColumn></rowIdColumn>
  56. <ctypeColumn></ctypeColumn>
  57. <ctimeColumn></ctimeColumn>
  58. <cidColumn></cidColumn>
  59. <columnMapping>
  60. <!--
  61. src:oracle字段名称, 必须;
  62. dest:datahub field, 必须;
  63. destOld:变更前数据落到datahub的field, 可选;
  64. isShardColumn: 是否作为shard的hashkey, 可选, 默认为false, 可以被shardId覆盖
  65. isDateFormat: timestamp字段是否采用DateFormat格式转换, 默认true. 如果是false, 源端数据必须是long
  66. dateFormat: timestamp字段的转换格式, 不填就用默认值
  67. -->
  68. <column src="id" dest="id" isShardColumn="true" isDateFormat="false" dateFormat="yyyy-MM-dd HH:mm:ss"/>
  69. <column src="name" dest="name" isShardColumn="true"/>
  70. <column src="age" dest="age"/>
  71. <column src="address" dest="address"/>
  72. <column src="comments" dest="comments"/>
  73. <column src="sex" dest="sex"/>
  74. <column src="temp" dest="temp" destOld="temp1"/>
  75. </columnMapping>
  76. </mapping>
  77. </mappings>
  78. </configue>

在ggsci下编辑datahub writer

edit params dhwriter

  1. extract dhwriter
  2. getEnv (JAVA_HOME)
  3. getEnv (LD_LIBRARY_PATH)
  4. getEnv (PATH)
  5. CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES, PARAMS "{YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties"
  6. sourcedefs ./dirdef/ogg_test.def
  7. table OGG_TEST.*;

添加dhwriteradd extract dhwriter, exttrailsource ./dirdat/st

启动dhwriterstart dhwriter

三、使用场景

这里会用一个简单的示例来说明数据的使用方法,例如我们在Oracle数据库有一张商品订单表orders(oid int, pid int, num int),该表有三列,分别为订单ID, 商品ID和商品数量。将这个表通过OGG Datahub进行增量数据同步之前,我们需要先将源表已有的数据通过DataX同步到MaxCompute中。增量同步的关键步骤如下:

(1)在Datahub上创建相应的Topic,Topic的schema为(string record_id, string optype, string readtime, bigint oid_before, bigint oid_after, bigint pid_before, bigint pid_after, bigint num_before, bigint num_after);

(2)OGG Datahub的插件按照上述的安装流程部署配置,其中列的Mapping配置如下:

  1. <ctypeColumn>optype</ctypeColumn>
  2. <ctimeColumn>readtime</ctimeColumn>
  3. <cidColumn>record_id</cidColumn>
  4. <columnMapping>
  5. <column src="oid" dest="oid_after" destOld="oid_before" isShardColumn="true"/>
  6. <column src="pid" dest="pid_after" destOld="pid_before"/>
  7. <column src="num" dest="num_after" destOld="num_before"/>
  8. </columnMapping>

其中optype和readtime字段是记录数据库的数据变更类型和时间,optype有”I”, “D”, “U”三种取值,分别对应为“增”,“删”,“改”三种数据变更操作。

(3)OGG Datahub插件部署好成功运行后,插件会源源不断的将源表的数据变更记录输送至datahub中,例如我们在源订单表中新增一条记录(1,2,1),datahub里收到的记录如下:

  1. +--------+------------+------------+------------+------------+------------+------------+------------+------------+
  2. | record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after |
  3. +-------+------------+------------+------------+------------+------------+------------+------------+------------+
  4. | 14810373343020000 | I | 2016-12-06 15:15:28.000141 | NULL | 1 | NULL | 2 | NULL | 1 |

修改这条数据,比如把num改为20,datahub则会收到的一条变更数据记录,如下:

  1. +-------+------------+------------+------------+------------+------------+------------+------------+------------+
  2. | record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after |
  3. +--------+------------+------------+------------+------------+------------+------------+------------+------------+
  4. | 14810373343080000 | U | 2016-12-06 15:15:58.000253 | 1 | 1 | 2 | 2 | 1 | 20 |

(4)在源端OGG运行ggsci,然后运行stats dhext可以查看每个表的操作统计,在目标端同样可以使用,可以查看采集端和目标端操作统计是否一致。

实时计算

在前一天的离线计算的基础数据上,我们可以写一个StreamCompute流计算的分析程序,很容易地对数据进行实时汇总,例如实时统计当前总的订单数,每种商品的销售量等。处理思路就是对于每一条到来的变更数据,可以拿到变化的数值,实时更新统计变量即可。

离线处理

为了便于后续的离线分析,我们也可以将Datahub里的数据归档到MaxCompute中,在MaxCompute中创建相应Schema的表:

  1. create table orders_log(record_id string, optype string, readtime string, oid_before bigint, oid_after bigint, pid_before bigint, pid_after bigint, num_before bigint, num_after bigint);

在Datahub上创建MaxCompute的数据归档,上述流入Datahub里的数据将自动同步到MaxCompute当中。建议将同步到MaxCompute中的数据按照时间段进行划分,比如每一天的增量数据都对应一个独立分区。这样当天的数据同步完成后,我们可以处理对应的分区,拿到当天所有的数据变更,而与和前一天的全量数据进行合并后,即可得到当天的全量数据。为了简单起见,先不考虑分区表的情况,以2016-12-06这天的增量数据为例,假设前一天的全量数据在表orders_base里面,datahub同步过来的增量数据在orders_log表中,将orders_base与orders_log做合并操作,可以得到2016-12-06这天的最终全量数据写入表orders_result中。这个过程可以在MaxCompute上用如下这样一条SQL完成。

  1. INSERT OVERWRITE TABLE orders_result
  2. SELECT t.oid, t.pid, t.num
  3. FROM
  4. (
  5. SELECT oid, pid, num, '0' x_record_id, 1 AS x_optype
  6. FROM
  7. orders_base
  8. UNION ALL
  9. SELECT decode(optype,'D',oid_before,oid_after) AS oid
  10. , decode(optype,'D', pid_before,pid_after) AS pid
  11. , num_after AS num
  12. , record_id x_record_id
  13. , decode(optype, 'D', 0, 1) AS x_optype
  14. FROM
  15. orders_log
  16. ) t
  17. JOIN
  18. (
  19. SELECT
  20. oid
  21. , pid
  22. , max(record_id) x_max_modified
  23. FROM
  24. (
  25. SELECT
  26. oid
  27. , pid
  28. , '0' record_id
  29. FROM
  30. orders_base UNION ALL SELECT
  31. decode(optype,'D',oid_before,oid_after) AS oid
  32. , decode(optype,'D', pid_before,pid_after) AS pid
  33. , record_id
  34. FROM
  35. orders_log ) g
  36. GROUP BY oid , pid
  37. ) s
  38. ON
  39. t.oid = s.oid AND t.pid = s.pid AND t.x_record_id = s.x_max_modified AND t.x_optype <> 0;

四、参数详细介绍

configure

名称 默认值 是否必须 描述
batchSize 1000 可选 上传到datahub, 一批的最多纪录数
dirtyDataContinue false 可选 脏数据是否继续
dirtyDataFile datahub_ogg_plugin.dirty 可选 脏数据文件
dirtyDataFileMaxSize 500 可选 脏数据文件最大size,单位:MB
retryTimes -1 可选 重试次数, -1:无限重试 0:不重试 n:重试次数
retryInterval 3000 可选 重试间隔, 单位:毫秒
disableCheckPointFile false 可选 是否禁用checkpoint file
checkPointFileName datahub_ogg_plugin.chk 可选 adapter进程的点位文件名
storageCtimeColumnAsTimestamp
(Deprecated)
false 可选 弃用之后,会根据DataHub中的字段类型进行转换,timestamp存储微秒时间戳,STRING存储为yyyy-MM-dd HH:mm:ss.SSSSSS

defaultOracleConfigure

名称 默认值 是否必须 描述
sid - 必须 oracle 数据库的SID
schema - 必须 oracle schema
dateFormat(Deprecated) yyyy-MM-dd HH:mm:ss 可选 默认时间字段转换格式

defaultDatahubConfigure

名称 默认值 是否必须 描述
endPoint - 必须 DataHub endpoint
project - 必须 DataHub project
accessId - 必须 DataHub accessId
accessKey - 必须 DataHub accessKey
compressType - 可选 数据传输压缩格式,现支持DEFLATE和LZ4,默认不压缩
enablePb false 可选 数据传输是否使用protobuf
ctypeColumn - 可选 数据变更类型同步到datahub对应的字段,可以被columnMapping中的ctypeColumn覆盖,必须为String类型
ctimeColumn - 可选 数据变更时间同步到datahub对应的字段,可以被columnMapping中的ctimeColumn覆盖,必须为String或者Timestamp类型
cidColumn - 可选 数据变更序号同步到datahub对应的字段, 按数据变更先后递增, 不保证连续, 可以被columnMapping中的cidColumn覆盖,必须为String类型
constColumnMap - 可选 额外增加的常量列,每条record该列值为指定值,格式为c1=xxx,c2=xxx,可以被columnMapping中的constColumnMap覆盖,必须为String类型

mapping

名称 默认值 是否必须 描述
oracleSchema - 可选 如果不配置,则采用defaultOracleConfigure中的schema
oracleTable - 必须 oracle table
datahubProject - 可选 如果不配置,则采用defaultDatahubConfigure中的project
datahubAccessId
(Deprecated)
- 可选 弃用,默认采用defaultDatahubConfigure中的accessId
datahubAccessKey
(Deprecated)
- 可选 弃用,默认采用defaultDatahubConfigure中的accessKey
datahubTopic - 必须 DataHub topic
shardId - 可选 shardId列表,如果指定,则不会自动更新shardId列表。例如0,1,2
rowIdColumn - 可选 oracle表中的rowid与DataHub相对应的字段,必须为String类型,一般在没有主键的oracle表中使用
ctypeColumn - 可选 同defaultDatahubConfigure中的ctypeColumn
ctimeColumn - 可选 同defaultDatahubConfigure中的ctimeColumn
cidColumn - 可选 同defaultDatahubConfigure中的cidColumn
constColumnMap - 可选 同defaultDatahubConfigure中的constColumnMap

columnMapping中的column

名称 默认值 是否必须 描述
src - 必须 oracle字段名称
dest - 必须 DataHub topic 字段名称
destOld - 可选 变更前数据落到DataHub topic 字段名称
isShardColumn false 可选 是否作为shard的hashkey,会被shardId覆盖
isDateFormat true 可选 只有在DataHub中的字段为TIMESTAMP时才会生效,是否采用DateFormat将时间格式转换,如果oracle字段是date或者timestamp,则不需要进行设置,如果是false, 源端数据必须是long
isKeyColumn false 可选 对于有主键的表,update操作对没有更新的列after值为””,设置为true之后就会把before的值放入到after
dateFormat yyyy-MM-dd HH:mm:ss[.fffffffff] 可选 一般不需要进行设置,当时间格式不符合默认值格式时需要进行设置

五、oracle类型与DataHub类型对应说明

DataHub的TIMESTAMP类型存储微秒时间戳,字段映射到TIMESTAMP类型时,都会被转为微秒时间戳。

oracle DataHub 备注
char STRING
varchar / varchar2 STRING
number BIGINT / DOUBLE / DECIMAL number表示整数时,可以使用BIGINT;number为浮点数时,根据精度选择DOUBLE或者DECIMAL。
INTEGER BIGINT
BINARY_FLOAT / BINARY_DOUBLE DOUBLE / DECIMAL
FLOAT DOUBLE / DECIMAL
date / timestamp TIMESTAMP / STRING 如果DataHub数据类型为TIMESTAMP,会自动转换为微秒时间戳

六、常见问题

Q:目标端报错 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.

A:目标端机器hostname在/etc/hosts里面重新设置localhost对应的ip

Q:找不到jvm相关的so包

A:将jvm的so路径添加到LD_LIBRARY_PATH后,重启mgr

  1. 例如:export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server

Q:有了DDL语句,比如增加一列,源端ogg没有问题,但是adapter端的ffwriter和jmswriter进程退出,且报错: 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON, max columns = 5.

A:由于表结构改变,需要重做def文件,将重做的def文件放入dirdef后重启即可

Q: 写入DataHub时间不对,与正常时间相差8个小时

A: 修改javaue.properties中的参数goldengate.userexit.timestamp为utc+8,然后重启dhwriter。

Q: ogg和ogg Adapter是否可以在同一台机器上

A: 可以,但是要修改以下ogg或者ogg Adapter的端口号,保证两者端口号不冲突。

Q: 对于无主键表,如果标识每条数据

A: datahub-ogg-plugin从2.0.3版本开始支持rowid的传输,可以通过rowid来标识每条数据。

Q: 如何采集多个表的数据

A: 在configure中的mappings下可以添加多个mapping,如果表特别多的时候,可以多配置源端的extract和pump进程, 以及目标端的dhwriter,需要注意的是,每个dhwriter都对应一个javaue.properties和configure.xml文件。

历史版本下载

datahub-ogg-plugin OGG OGG Adapter 支持数据库版本
2.0.2 12.1.2.1 12.1.2.1 ORA11g