OGG for Bigdata(Kafka)

OGG for Bigdata是Oracle官方的将数据库数据实时流式传输到大数据系统的工具,对于Oracle19c及以下的版本均可以支持,目前OGG for Bigdata可以将数据写入Kafka,而DataHub已经兼容Kafka Producer/Consumer协议,所以用户除了使用DataHub插件将Oracle数据写入DataHub之外,还可以使用OGG for Bigdata利用DataHub的Kafka接口写入DataHub。

一、环境要求

  • Oracle数据库,只要最新版OGG支持即可

  • 源端OGG,版本必须大于等于Oracle数据库版本,推荐使用最新版本

  • 目标端OGG for Bigdata,版本和源端OGG保持一致,或者高于源端OGG版本,推荐使用最新版本

  • OGG官方下载地址

二、 安装步骤

(下面将介绍Oracle/OGG相关安装和配置过程,Oracle的安装将不做介绍,另外需要注意的是:Oracle/OGG相关参数配置以熟悉Oracle/OGG的运维人员配置为准,本示例只是提供一个可运行的样本)

OGG 源端配置

省略,Oracle 源端安装请参考官方文档,如果源端为Oracle11g,也可以参考OGG for Oracle。

OGG for Bigdata目标端配置

1. 目标端OGG for Bigdata安装

目标端的OGG是OGG for Bigdata,不需要安装,只需要解压即可。解压之后,需要创建必须目录,启动ggsci之后输入命令create subdirs,成功之后便可以看到OGG目录下增加了dirxxx的几个目录。

2. 配置kafka相关参数

a. 配置custom_kafka_producer.properties

custom_kafka_producer.properties为Kafka Producer相关参数的配置,这里只给出一个可用示例,更多配置可参考Kafka官网

在dirprm目录下编辑文件custom_kafka_producer.properties,如果没有可以新建一个。

# kafka endpoint,这里为杭州region的endpoint,用户根据需要自行修改
bootstrap.servers=dh-cn-hangzhou.aliyuncs.com:9092

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# 参数调优,用户可以自行配置,也可以全部使用默认值
# compression.type=lz4
# batch.size=1024000
# max.request.size=41943040
# message.max.bytes=41943040
# linger.ms=5000
# reconnect.backoff.ms=1000

# DataHub的kafka接口默认使用SASL_SSL认证,下面为必要配置
security.protocol=SASL_SSL
sasl.mechanism=PLAIN

b. 配置 kafka_client_producer_jaas.conf

kafka_client_producer_jaas.conf主要用来填写DataHub的AK,在dirprm目录下新建文件并编辑kafka_client_producer_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="accessId"
    password="accessKey";
};

c. 配置kafka.props

kafka.props用户配置写入Kafka的topic、数据格式、日志打印等等。这里只给出一个可用的简单示例,更多配置可参考OGG for bigdata 官方文档

在dirrpm目录下编辑文件kafka.props,如果没有可以新建一个。

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# 写入的kafka topic
gg.handler.kafkahandler.TopicName =kafka_topic_name
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.includeCurrentTimestamp=false
gg.handler.kafkahandler.BlockingSend =true
# token含有rowid相关信息以及用户添加的token信息
gg.handler.kafkahandler.includeTokens=true

# 主键更新设为update操作
gg.handler.kafkahandler.format.pkUpdateHandling =update
#gg.handler.kafkahandler.format.pkUpdateHandling =delete-insert

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
goldengate.userexit.nochkpt=FALSE

gg.log=log4j
gg.log.level=INFO
gg.report.time=120sec

###Kafka Classpath settings ###
gg.classpath=/xxx/lib/kafka/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.auth.login.config=dirprm/kafka_client_producer_jaas.conf

注意事项

  • 目前DataHub暂时不支持Kafka Consumer协议,配置gg.handler.kafkahandler.SchemaTopicName后会用到Kafka Consumer协议,所以不要配置该参数。

  • java.security.auth.login.config为必要配置,如不配置将无法正常启动

3. 配置目标端mgr

编辑mgr配置edit params mgr

PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,USECHECKPOINTS, MINKEEPDAYS 3

启动mgrstart mgr

4. 配置目标端replicat

在ggsci下编辑配置,edit params mqkafka

这里mqkafka为replicat进程名称,用户可任意定义。

REPLICAT mqkafka
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
--MAP QASOURCE.*, TARGET QASOURCE.*;
MAP ogg_test.*, TARGET ogg_test.*;

添加并启动mqkafka

目前无法使用Kafka接口创建DataHub Topic,启动replicat进程之前需要保证DataHub Topic已存在,推荐使用Blob Topic,具体可参考兼容Kafka

# 添加进程
add replicat mqkafka, exttrail ./dirdat/st

# 启动mqkafka
start mqkafka

三、数据示例

由于使用Kafka接口写入DataHub,所以写入DataHub的数据都是没有schema的,用户可以同步到ODPS之后,定义udf进行解析。下面以JSON格式为例,介绍写入DataHub的数据。

1. Oracle表结构

  SQL> desc orders
   Name                                      Null?    Type
   ----------------------------------------- -------- ----------------------------
   OID                                                NUMBER(38)
   PID                                                VARCHAR2(511)
   NUM                                                VARCHAR2(511)

2. DataHub Topic结构

这里为了演示数据方便,使用了两列 STRING 的Tuple Topic,推荐使用Blob Topic,当数据含有二进制内容时,使用tuple可能会存在问题。

{
    "fields":[
        {
            "name":"key",
            "type":"STRING"
        },
        {
            "name":"val",
            "type":"STRING"
        }
    ]
}

3. 写入数据

a. sql脚本

declare
i number;
op_num number;
begin
          op_num := 1;
          for i in 1..op_num loop
        insert into orders(oid,pid,num) values(i,i+1,i+2);
      end loop;

      for i in 1..op_num loop
        update orders set pid=i*2+1 where oid=i;
      end loop;

      for i in 1..op_num loop
        delete from orders where oid=i;
      end loop;
          commit;
end;

b. 启动日志

配置好对应的表之后,启动目标端的replicat进程之后,就可以在日志文件dirrpt/MQKAFKA_info_log4j.log中看到如下语句,表示已经生成表OGG_TEST.ORDERS的 Json Schema。

INFO 2020-05-29 20:23:55,069 [main] Creating JSON schema for table OGG_TEST.ORDERS in file ./dirdef/OGG_TEST.ORDERS.schema.json

dirdef/OGG_TEST.ORDERS.schema.json文件内容为

$cat dirdef/OGG_TEST.ORDERS.schema.json

{
    "$schema":"http://json-schema.org/draft-04/schema#",
    "title":"OGG_TEST.ORDERS",
    "description":"JSON schema for table OGG_TEST.ORDERS",
    "definitions":{
        "row":{
            "type":"object",
            "properties":{
                "OID":{
                    "type":[
                        "string",
                        "null"
                    ]
                },
                "PID":{
                    "type":[
                        "string",
                        "null"
                    ]
                },
                "NUM":{
                    "type":[
                        "string",
                        "null"
                    ]
                }
            },
            "additionalProperties":false
        },
        "tokens":{
            "type":"object",
            "description":"Token keys and values are free form key value pairs.",
            "properties":{
            },
            "additionalProperties":true
        }
    },
    "type":"object",
    "properties":{
        "table":{
            "description":"The fully qualified table name",
            "type":"string"
        },
        "op_type":{
            "description":"The operation type",
            "type":"string"
        },
        "op_ts":{
            "description":"The operation timestamp",
            "type":"string"
        },
        "current_ts":{
            "description":"The current processing timestamp",
            "type":"string"
        },
        "pos":{
            "description":"The position of the operation in the data source",
            "type":"string"
        },
        "tokens":{
            "$ref":"#/definitions/tokens"
        },
        "before":{
            "$ref":"#/definitions/row"
        },
        "after":{
            "$ref":"#/definitions/row"
        }
    },
    "required":[
        "table",
        "op_type",
        "op_ts",
        "current_ts",
        "pos"
    ],
    "additionalProperties":false
}

c. 数据抽样

运行sql之后,就可以看到数据正常的写入DataHub中了,可以再DataHub Console页面进行抽样查看,上面sql共写入三条数据,数据抽样内容如下所示:

Shard ID    System Time    key (STRING)    val (STRING)
0    2020年5月29日下午6:01:38    OGG_TEST.ORDERS    {"table":"OGG_TEST.ORDERS","op_type":"I","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.062000","pos":"00000002790849451348","after":{"OID":"1","PID":"2","NUM":"3"}}
0    2020年5月29日下午6:01:38    OGG_TEST.ORDERS    {"table":"OGG_TEST.ORDERS","op_type":"U","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064000","pos":"00000002790849451514","before":{"OID":"1","PID":"2","NUM":"3"},"after":{"OID":"1","PID":"3","NUM":"3"}}
0    2020年5月29日下午6:01:38    OGG_TEST.ORDERS    {"table":"OGG_TEST.ORDERS","op_type":"D","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064001","pos":"00000002790849451685","before":{"OID":"1","PID":"3","NUM":"3"}}