全部产品
云市场

HBase实时数据导出RocketMQ

更新时间:2019-11-14 16:07:02

前置条件

  1. 购买BDS数据迁移同步服务,配置BDS操作页面账户密码,并登陆BDS操作页面
  2. 打通BDS和HBase迁移集群的网络
  3. 添加HBase集群数据源
  4. 添加RocketMQ数据源

版本支持

  • 开源RocketMQ 4.5
  • 阿里云消息队列 RocketMQ

限制

  • HBase实时数据同步MQ目前还不支持保序
  • BDS任务重试, MQ自身问题导致消息会被重复消费,需要业务方做到消费幂等
  • BDS同步过程中不支持点位回退
  • 只同步更新的列,相同Rowkey的其他未更新的列不会被同步
  • BDS不支持同步Bulkload进来的数据
  • BDS不支持Phoenix表数据的转码,需要客户自行处理Phoenix表的原生数据

任务创建

  1. 点击 任务管理 -> HBase实时数据同步RocketMQ -> 创建通道 1

  2. 选择源HBase集群、目标MQ数据源,指定默认同步到MQ的Topic,以及同步通道所属的Group1

  3. 查看任务执行情况1

  4. 订阅MQ指定Topic的消息,对消息中body进行转化,转化成字符串,就能得到KV对应的JSON格式的数据11

参数说明

任务创建同步表支持如下的填写格式

  1. t1 {"tag": "xxxx"} // t1表的message带上xxxx的标签,方便业务消息订阅时对tag标签进行消息过滤
  2. t2 {"tag": "xxxx", "topic": "topic1"} // t2表的数据同步到topic1中,不特殊指定就走默认topic
  3. t3 // t3表默认tag为空字符串

同步行为

  • BDS解析HBase WAL日志中的数据, 并对每个KV转化成对应的JSON格式写入到MQ,JSON格式如下所示

    1. {
    2. "namespace": "default",
    3. "tableName": "t1",
    4. "rowKey": "727878",
    5. "opCode": 4,
    6. "family": "6631",
    7. "qualifier": "61",
    8. "value": "7631",
    9. "timestamp": 1572503986205
    10. }
  • namespace表示的是HBase表的namespace,tableName表示的是表名

  • KV所包含的rowkey、family、qualifier、value在HBase都是byte数组形式进行存储,BDS同步KV默认会把byte数组转化成HexString,可以通过如下方式将HexString转化会原生的数组

    1. import org.apache.commons.codec.binary.Hex;
    2. ...
    3. Hex.decodeHex(rowkey.toCharArray())
    4. Hex.decodeHex(family.toCharArray())
    5. ...
  • timestamp 表示的是KV的版本,业务没有特殊指定,默认是KV入库的时间戳

  • opCode 表示的是KV的类型,不同的值对应如下信息

    1. enum Type {
    2. Put((byte) 4),
    3. Delete((byte) 8),
    4. DeleteFamilyVersion((byte) 10),
    5. DeleteColumn((byte) 12),
    6. DeleteFamily((byte) 14);
    7. }