OGG for Bigdata是Oracle官方的将数据库数据实时流式传输到大数据系统的工具,对于Oracle19c及以下的版本均可以支持,目前OGG for Bigdata可以将数据写入Kafka,而DataHub已经兼容Kafka Producer协议,所以用户除了使用DataHub插件将Oracle数据写入DataHub之外,还可以使用OGG for Bigdata利用DataHub的Kafka接口写入DataHub。
OGG for Bigdata是Oracle官方的将数据库数据实时流式传输到大数据系统的工具,对于Oracle19c及以下的版本均可以支持,目前OGG for Bigdata可以将数据写入Kafka,而DataHub已经兼容Kafka Producer协议,所以用户除了使用DataHub插件将Oracle数据写入DataHub之外,还可以使用OGG for Bigdata利用DataHub的Kafka接口写入DataHub。
一、环境要求
Oracle数据库,只要最新版OGG支持即可
源端OGG,版本必须大于等于Oracle数据库版本,推荐使用最新版本
目标端OGG for Bigdata,版本和源端OGG保持一致,或者高于源端OGG版本,推荐使用最新版本
二、 安装步骤
(下面将介绍Oracle/OGG相关安装和配置过程,Oracle的安装将不做介绍,另外需要注意的是:Oracle/OGG相关参数配置以熟悉Oracle/OGG的运维人员配置为准,本示例只是提供一个可运行的样本)
OGG 源端配置
省略,Oracle 源端安装请参考官方文档,如果源端为Oracle11g,也可以参考OGG for Oracle。
OGG for Bigdata目标端配置
1. 目标端OGG for Bigdata安装
目标端的OGG是OGG for Bigdata,不需要安装,只需要解压即可。解压之后,需要创建必须目录,启动ggsci之后输入命令create subdirs
,成功之后便可以看到OGG目录下增加了dirxxx的几个目录。
2. 配置kafka相关参数
a. 配置custom_kafka_producer.properties
custom_kafka_producer.properties为Kafka Producer相关参数的配置,这里只给出一个可用示例,更多配置可参考Kafka官网。
在dirprm目录下编辑文件custom_kafka_producer.properties
,如果没有可以新建一个。
# kafka endpoint,这里为杭州region的endpoint,用户根据需要自行修改
bootstrap.servers=dh-cn-hangzhou.aliyuncs.com:9092
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 参数调优,用户可以自行配置,也可以全部使用默认值
# compression.type=lz4
# batch.size=1024000
# max.request.size=41943040
# message.max.bytes=41943040
# linger.ms=5000
# reconnect.backoff.ms=1000
# DataHub的kafka接口默认使用SASL_SSL认证,下面为必要配置
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
b. 配置 kafka_client_producer_jaas.conf
kafka_client_producer_jaas.conf主要用来填写DataHub的AK,在dirprm目录下新建文件并编辑kafka_client_producer_jaas.conf
。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};
c. 配置kafka.props
kafka.props用户配置写入Kafka的topic、数据格式、日志打印等等。这里只给出一个可用的简单示例,更多配置可参考OGG for bigdata 官方文档。
在dirrpm目录下编辑文件kafka.props
,如果没有可以新建一个。
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# 写入的kafka topic
gg.handler.kafkahandler.TopicName =kafka_topic_name
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.includeCurrentTimestamp=false
gg.handler.kafkahandler.BlockingSend =true
# token含有rowid相关信息以及用户添加的token信息
gg.handler.kafkahandler.includeTokens=true
# 主键更新设为update操作
gg.handler.kafkahandler.format.pkUpdateHandling =update
#gg.handler.kafkahandler.format.pkUpdateHandling =delete-insert
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
goldengate.userexit.nochkpt=FALSE
gg.log=log4j
gg.log.level=INFO
gg.report.time=120sec
###Kafka Classpath settings ###
gg.classpath=/xxx/lib/kafka/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.auth.login.config=dirprm/kafka_client_producer_jaas.conf
注意事项
目前DataHub暂时不支持Kafka Consumer协议,配置
gg.handler.kafkahandler.SchemaTopicName
后会用到Kafka Consumer协议,所以不要配置该参数。java.security.auth.login.config
为必要配置,如不配置将无法正常启动
3. 配置目标端mgr
编辑mgr配置edit params mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,USECHECKPOINTS, MINKEEPDAYS 3
启动mgrstart mgr
4. 配置目标端replicat
在ggsci下编辑配置,edit params mqkafka
这里mqkafka为replicat进程名称,用户可任意定义。
REPLICAT mqkafka
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
--MAP QASOURCE.*, TARGET QASOURCE.*;
MAP ogg_test.*, TARGET ogg_test.*;
添加并启动mqkafka
目前无法使用Kafka接口创建DataHub Topic,启动replicat进程之前需要保证DataHub Topic已存在,推荐使用Blob Topic,具体可参考兼容Kafka。
# 添加进程
add replicat mqkafka, exttrail ./dirdat/st
# 启动mqkafka
start mqkafka
三、数据示例
由于使用Kafka接口写入DataHub,所以写入DataHub的数据都是没有schema的,用户可以同步到ODPS之后,定义udf进行解析。下面以JSON
格式为例,介绍写入DataHub的数据。
1. Oracle表结构
SQL> desc orders
Name Null? Type
----------------------------------------- -------- ----------------------------
OID NUMBER(38)
PID VARCHAR2(511)
NUM VARCHAR2(511)
2. DataHub Topic结构
这里为了演示数据方便,使用了两列 STRING 的Tuple Topic,推荐使用Blob Topic,当数据含有二进制内容时,使用tuple可能会存在问题。
{
"fields":[
{
"name":"key",
"type":"STRING"
},
{
"name":"val",
"type":"STRING"
}
]
}
3. 写入数据
a. sql脚本
declare
i number;
op_num number;
begin
op_num := 1;
for i in 1..op_num loop
insert into orders(oid,pid,num) values(i,i+1,i+2);
end loop;
for i in 1..op_num loop
update orders set pid=i*2+1 where oid=i;
end loop;
for i in 1..op_num loop
delete from orders where oid=i;
end loop;
commit;
end;
b. 启动日志
配置好对应的表之后,启动目标端的replicat进程之后,就可以在日志文件dirrpt/MQKAFKA_info_log4j.log
中看到如下语句,表示已经生成表OGG_TEST.ORDERS
的 Json Schema。
INFO 2020-05-29 20:23:55,069 [main] Creating JSON schema for table OGG_TEST.ORDERS in file ./dirdef/OGG_TEST.ORDERS.schema.json
dirdef/OGG_TEST.ORDERS.schema.json文件内容为
$cat dirdef/OGG_TEST.ORDERS.schema.json
{
"$schema":"http://json-schema.org/draft-04/schema#",
"title":"OGG_TEST.ORDERS",
"description":"JSON schema for table OGG_TEST.ORDERS",
"definitions":{
"row":{
"type":"object",
"properties":{
"OID":{
"type":[
"string",
"null"
]
},
"PID":{
"type":[
"string",
"null"
]
},
"NUM":{
"type":[
"string",
"null"
]
}
},
"additionalProperties":false
},
"tokens":{
"type":"object",
"description":"Token keys and values are free form key value pairs.",
"properties":{
},
"additionalProperties":true
}
},
"type":"object",
"properties":{
"table":{
"description":"The fully qualified table name",
"type":"string"
},
"op_type":{
"description":"The operation type",
"type":"string"
},
"op_ts":{
"description":"The operation timestamp",
"type":"string"
},
"current_ts":{
"description":"The current processing timestamp",
"type":"string"
},
"pos":{
"description":"The position of the operation in the data source",
"type":"string"
},
"tokens":{
"$ref":"#/definitions/tokens"
},
"before":{
"$ref":"#/definitions/row"
},
"after":{
"$ref":"#/definitions/row"
}
},
"required":[
"table",
"op_type",
"op_ts",
"current_ts",
"pos"
],
"additionalProperties":false
}
c. 数据抽样
运行sql之后,就可以看到数据正常的写入DataHub中了,可以再DataHub Console页面进行抽样查看,上面sql共写入三条数据,数据抽样内容如下所示:
Shard ID System Time key (STRING) val (STRING)
0 2020年5月29日下午6:01:38 OGG_TEST.ORDERS {"table":"OGG_TEST.ORDERS","op_type":"I","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.062000","pos":"00000002790849451348","after":{"OID":"1","PID":"2","NUM":"3"}}
0 2020年5月29日下午6:01:38 OGG_TEST.ORDERS {"table":"OGG_TEST.ORDERS","op_type":"U","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064000","pos":"00000002790849451514","before":{"OID":"1","PID":"2","NUM":"3"},"after":{"OID":"1","PID":"3","NUM":"3"}}
0 2020年5月29日下午6:01:38 OGG_TEST.ORDERS {"table":"OGG_TEST.ORDERS","op_type":"D","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.064001","pos":"00000002790849451685","before":{"OID":"1","PID":"3","NUM":"3"}}