前提条件

  1. 购买LTS数据迁移同步服务
  2. 打通LTS和HBase迁移集群的网络
  3. 添加HBase集群数据源
  4. 添加RocketMQ数据源

版本支持

  • 开源RocketMQ 4.5
  • 阿里云消息队列 RocketMQ

限制

  • HBase实时数据同步MQ目前还不支持保序。
  • LTS任务重试, MQ自身问题导致消息会被重复消费,需要业务方做到消费幂等。
  • LTS同步过程中不支持点位回退。
  • 只同步更新的列,相同Rowkey的其他未更新的列不会被同步。
  • LTS不支持同步Bulkload进来的数据。
  • LTS不支持Phoenix表数据的转码,需要客户自行处理Phoenix表的原生数据。

任务创建

  1. 点击 任务管理 -> HBase实时数据同步RocketMQ -> 创建通道rocketMQ
  2. 选择源HBase集群、目标MQ数据源,指定默认同步到MQ的Topic,以及同步通道所属的Group。选择目标MQ数据源
  3. 查看任务执行情况。查看任务执行情况
  4. 订阅MQ指定Topic的消息,对消息中body进行转化,转化成字符串,就能得到KV对应的JSON格式的数据。订阅MQ指定Topic消息

参数说明

任务创建同步表支持如下的填写格式
t1 {"tag": "xxxx"} // t1表的message带上xxxx的标签,方便业务消息订阅时对tag标签进行消息过滤
t2 {"tag": "xxxx", "topic": "topic1"} // t2表的数据同步到topic1中,不特殊指定就走默认topic
t3 // t3表默认tag为空字符串

同步行为

  • LTS解析HBase WAL日志中的数据, 并对每个KV转化成对应的JSON格式写入到MQ,JSON格式如下所示
    {
    "namespace": "default",
    "tableName": "t1",
    "rowKey": "727878",
    "opCode": 4,
    "family": "6631",
    "qualifier": "61",
    "value": "7631",
    "timestamp": 1572503986205
    }
  • namespace表示的是HBase表的namespace,tableName表示的是表名。
  • KV所包含的rowkey、family、qualifier、value在HBase都是byte数组形式进行存储,LTS同步KV默认会把byte数组转化成HexString,可以通过如下方式将HexString转化会原生的数组。
    import org.apache.commons.codec.binary.Hex;
    ...
    Hex.decodeHex(rowkey.toCharArray())
    Hex.decodeHex(family.toCharArray())
    ...
  • timestamp 表示的是KV的版本,业务没有特殊指定,默认是KV入库的时间戳。
  • opCode 表示的是KV的类型,不同的值对应如下信息
    enum Type {
      Put((byte) 4),
    
      Delete((byte) 8),
    
      DeleteFamilyVersion((byte) 10),
    
      DeleteColumn((byte) 12),
    
      DeleteFamily((byte) 14);
    }