全部产品

OGG for Bigdata(Kafka)

OGG for Bigdata是Oracle官方的 将数据库数据实时流式传输到大数据系统的工具,对于Oracle19c及以下的版本均可以支持,目前OGG for Bigdata可以将数据写入Kafka,而DataHub已经兼容Kafka Producer协议,所以用户除了使用DataHub插件将Oracle数据写入DataHub之外,还可以使用OGG for Bigdata利用DataHub的Kafka接口写入DataHub。

一、环境要求

  • Oracle数据库,只要最新版OGG支持支持即可
  • 源端OGG,版本必须大于大于等于Oracle数据库版本,推荐使用最新版本
  • 目标端OGG for Bigdata,版本和源端OGG保持一致,或者高于源端OGG版本,推荐使用最新版本
  • OGG官方下载地址

二、 安装步骤

(下面将介绍Oracle/OGG相关安装和配置过程,Oracle的安装将不做介绍,另外需要注意的是:Oracle/OGG相关参数配置以熟悉Oracle/OGG的运维人员配置为准,本示例只是提供一个可运行的样本)

OGG 源端配置

省略,Oracle 源端安装请参考官方文档,如果源端为Oracle11g,也可以参考OGG for Oracle。

OGG for Bigdata目标端配置

1. 目标端OGG for Bigdata安装

目标端的OGG是OGG for Bigdata,不需要安装,只需要解压即可。解压之后,需要创建必须目录,启动ggsci之后输入命令create subdirs,成功之后便可以看到OGG目录下增加了dirxxx的几个目录。

2. 配置kafka相关参数

a. 配置custom_kafka_producer.properties

custom_kafka_producer.properties为Kafka Producer相关参数的配置,这里只给出一个可用示例,更多配置可参考Kafka官网

在dirprm目录下编辑文件custom_kafka_producer.properties,如果没有可以新建一个。

   
  1. # kafka endpoint,这里为杭州region的endpoint,用户根据需要自行修改
  2. bootstrap.servers=dh-cn-hangzhou.aliyuncs.com:9092
  3. value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
  4. key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
  5. # 参数调优,用户可以自行配置,也可以全部使用默认值
  6. # compression.type=lz4
  7. # batch.size=1024000
  8. # max.request.size=41943040
  9. # message.max.bytes=41943040
  10. # linger.ms=5000
  11. # reconnect.backoff.ms=1000
  12. # DataHub的kafka接口默认使用SASL_SSL认证,下面为必要配置
  13. security.protocol=SASL_SSL
  14. sasl.mechanism=PLAIN

b. 配置 kafka_client_producer_jaas.conf

kafka_client_producer_jaas.conf主要用来填写DataHub的AK,在dirprm目录下新建文件并编辑kafka_client_producer_jaas.conf

   
  1. KafkaClient {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="accessId"
  4. password="accessKey";
  5. };

c. 配置kafka.props

kafka.props用户配置写入Kafka的topic、数据格式、日志打印等等。这里只给出一个可用的简单示例,更多配置可参考OGG for bigdata 官方文档

在dirrpm目录下编辑文件kafka.props,如果没有可以新建一个。

   
  1. gg.handlerlist = kafkahandler
  2. gg.handler.kafkahandler.type = kafka
  3. gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
  4. # 写入的kafka topic
  5. gg.handler.kafkahandler.TopicName =kafka_topic_name
  6. gg.handler.kafkahandler.format =json
  7. gg.handler.kafkahandler.mode=op
  8. gg.handler.kafkahandler.format.includeCurrentTimestamp=false
  9. gg.handler.kafkahandler.BlockingSend =true
  10. # token含有rowid相关信息以及用户添加的token信息
  11. gg.handler.kafkahandler.includeTokens=true
  12. # 主键更新设为update操作
  13. gg.handler.kafkahandler.format.pkUpdateHandling =update
  14. #gg.handler.kafkahandler.format.pkUpdateHandling =delete-insert
  15. goldengate.userexit.timestamp=utc
  16. goldengate.userexit.writers=javawriter
  17. javawriter.stats.display=TRUE
  18. javawriter.stats.full=TRUE
  19. goldengate.userexit.nochkpt=FALSE
  20. gg.log=log4j
  21. gg.log.level=INFO
  22. gg.report.time=120sec
  23. ###Kafka Classpath settings ###
  24. gg.classpath=/xxx/lib/kafka/libs/*
  25. javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.auth.login.config=dirprm/kafka_client_producer_jaas.conf

注意事项

  • 目前DataHub暂时不支持Kafka Consumer协议,配置gg.handler.kafkahandler.SchemaTopicName后会用到Kafka Consumer协议,所以不要配置该参数。
  • java.security.auth.login.config为必要配置,如不配置将无法正常启动

3. 配置目标端mgr

编辑mgr配置edit params mgr

   
  1. PORT 7809
  2. DYNAMICPORTLIST 7810-7909
  3. AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
  4. PURGEOLDEXTRACTS ./dirdat/*,USECHECKPOINTS, MINKEEPDAYS 3

启动mgrstart mgr

4. 配置目标端replicat

在ggsci下编辑配置,edit params mqkafka

这里mqkafka为replicat进程名称,用户可任意定义。

   
  1. REPLICAT mqkafka
  2. TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
  3. REPORTCOUNT EVERY 1 MINUTES, RATE
  4. GROUPTRANSOPS 10000
  5. --MAP QASOURCE.*, TARGET QASOURCE.*;
  6. MAP ogg_test.*, TARGET ogg_test.*;

添加并启动mqkafka

目前无法使用Kafka接口创建DataHub Topic,启动replicat进程之前需要保证DataHub Topic已存在,推荐使用Blob Topic,具体可参考兼容Kafka

   
  1. # 添加进程
  2. add replicat mqkafka, exttrail ./dirdat/st
  3. # 启动mqkafka
  4. start mqkafka

三、数据示例

由于使用Kafka接口写入DataHub,所以写入DataHub的数据都是没有schema的,用户可以同步到ODPS之后,定义udf进行解析。下面以JSON格式为例,介绍写入DataHub的数据。

1. Oracle表结构

   
  1. SQL> desc orders
  2. Name Null? Type
  3. ----------------------------------------- -------- ----------------------------
  4. OID NUMBER(38)
  5. PID VARCHAR2(511)
  6. NUM VARCHAR2(511)

2. DataHub Topic结构

这里为了演示数据方便,使用了两列 STRING 的Tuple Topic,推荐使用Blob Topic,当数据含有二进制内容时,使用tuple可能会存在问题。

   
  1. {
  2. "fields":[
  3. {
  4. "name":"key",
  5. "type":"STRING"
  6. },
  7. {
  8. "name":"val",
  9. "type":"STRING"
  10. }
  11. ]
  12. }

3. 写入数据

a. sql脚本

   
  1. declare
  2. i number;
  3. op_num number;
  4. begin
  5. op_num := 1;
  6. for i in 1..op_num loop
  7. insert into orders(oid,pid,num) values(i,i+1,i+2);
  8. end loop;
  9. for i in 1..op_num loop
  10. update orders set pid=i*2+1 where oid=i;
  11. end loop;
  12. for i in 1..op_num loop
  13. delete from orders where oid=i;
  14. end loop;
  15. commit;
  16. end;

b. 启动日志

配置好对应的表之后,启动目标端的replicat进程之后,就可以在日志文件dirrpt/MQKAFKA_info_log4j.log中看到如下语句,表示已经生成表OGG_TEST.ORDERS的 Json Schema。

   
  1. INFO 2020-05-29 20:23:55,069 [main] Creating JSON schema for table OGG_TEST.ORDERS in file ./dirdef/OGG_TEST.ORDERS.schema.json

dirdef/OGG_TEST.ORDERS.schema.json文件内容为

   
  1. $cat dirdef/OGG_TEST.ORDERS.schema.json
  2. {
  3. "$schema":"http://json-schema.org/draft-04/schema#",
  4. "title":"OGG_TEST.ORDERS",
  5. "description":"JSON schema for table OGG_TEST.ORDERS",
  6. "definitions":{
  7. "row":{
  8. "type":"object",
  9. "properties":{
  10. "OID":{
  11. "type":[
  12. "string",
  13. "null"
  14. ]
  15. },
  16. "PID":{
  17. "type":[
  18. "string",
  19. "null"
  20. ]
  21. },
  22. "NUM":{
  23. "type":[
  24. "string",
  25. "null"
  26. ]
  27. }
  28. },
  29. "additionalProperties":false
  30. },
  31. "tokens":{
  32. "type":"object",
  33. "description":"Token keys and values are free form key value pairs.",
  34. "properties":{
  35. },
  36. "additionalProperties":true
  37. }
  38. },
  39. "type":"object",
  40. "properties":{
  41. "table":{
  42. "description":"The fully qualified table name",
  43. "type":"string"
  44. },
  45. "op_type":{
  46. "description":"The operation type",
  47. "type":"string"
  48. },
  49. "op_ts":{
  50. "description":"The operation timestamp",
  51. "type":"string"
  52. },
  53. "current_ts":{
  54. "description":"The current processing timestamp",
  55. "type":"string"
  56. },
  57. "pos":{
  58. "description":"The position of the operation in the data source",
  59. "type":"string"
  60. },
  61. "tokens":{
  62. "$ref":"#/definitions/tokens"
  63. },
  64. "before":{
  65. "$ref":"#/definitions/row"
  66. },
  67. "after":{
  68. "$ref":"#/definitions/row"
  69. }
  70. },
  71. "required":[
  72. "table",
  73. "op_type",
  74. "op_ts",
  75. "current_ts",
  76. "pos"
  77. ],
  78. "additionalProperties":false
  79. }

c. 数据抽样

运行sql之后,就可以看到数据正常的写入DataHub中了,可以再DataHub Console页面进行抽样查看,上面sql共写入三条数据,数据抽样内容如下所示:

   
  1. Shard ID System Time key (STRING) val (STRING)
  2. 0 2020529 下午6:01:38 OGG_TEST.ORDERS {"table":"OGG_TEST.ORDERS","op_type":"I","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.062000","pos":"00000002790849451348","after":{"OID":"1","PID":"2","NUM":"3"}}
  3. 0 2020529 下午6:01:38 OGG_TEST.ORDERS {"table":"OGG_TEST.ORDERS","op_type":"U","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064000","pos":"00000002790849451514","before":{"OID":"1","PID":"2","NUM":"3"},"after":{"OID":"1","PID":"3","NUM":"3"}}
  4. 0 2020529 下午6:01:38 OGG_TEST.ORDERS {"table":"OGG_TEST.ORDERS","op_type":"D","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064001","pos":"00000002790849451685","before":{"OID":"1","PID":"3","NUM":"3"}}