本文档主要介绍如何在HBase中增量归档OSS/HDFS。

前提条件

  • 已购买BDS数据迁移同步服务,配置BDS操作页面账户密码,并登录BDS操作页面。
  • 已打通BDS到HBase和HDFS集群(OSS)的网络。
  • 登录BDS操作页面已创建HBase、HDFS和OSS数据源。

支持版本

HBase
  • 自建HBase1.x、2.x (未开启kerberos)。
  • MR HBase
  • 标准版云HBase、增强版云HBase(集群版本, 暂不支持单机版)。
HDFS
HDFS 2.x
OSS
阿里云OSS

全量任务

任务创建
  1. 进入BDS操作页面,单击HBase导入导出 > HBase全量数据导出
  2. 单击创建迁移任务,选择对应的HBase数据源和HDFS、OSS数据源,指定导出的表,单击创建
  3. 查看任务进度。

文件目录

[root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test
Found 2 items
drwxr-xr-x   - hadoop hadoop          0 2020-01-21 12:11 /test/table1
drwxr-xr-x   - hadoop hadoop          0 2020-01-21 12:11 /test/table2

表文件

[root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test4/table1
Found 50 items
-rw-r--r--   2 hadoop hadoop  112574504 2020-01-21 12:11 /test4/table1/08efa07b46664576a969a9642d99badc
-rw-r--r--   2 hadoop hadoop  117327879 2020-01-21 12:11 /test4/table1/0f1223ebbfef49baba05c5f37fab88df
-rw-r--r--   2 hadoop hadoop  112009633 2020-01-21 12:11 /test4/table1/183e0580aa3b4373ad23089217d3d9f9
-rw-r--r--   2 hadoop hadoop  112924525 2020-01-21 12:11 /test4/table1/184a92f35c5d4f4c917af30f07ff73e4

文件内容

对于HBase表来说, 生成的parquet文件Schema如下:rowkey - HBase KV 的rowkey, byte数组version - HBase KV 的时间戳, long类型op - 标识 KV 的类型, 4 表示 put, 8、10、12、14 表示的delete, int 类型family - 列族, byte数组qualifier - 列名, byte数组val - KV的值, byte数组

scala> spark.read.parquet("/test/t1")
res0: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 4 more fields]

scala> res0.show()
+----------------+-------------+---+-------+--------------------+--------------------+
|          rowkey|      version| op| family|           qualifier|                 val|
+----------------+-------------+---+-------+--------------------+--------------------+
|[69 64 5F 32 30]|1578465770000|  4|[66 31]|                [66]|          [31 2E 35]|
|[69 64 5F 32 30]|1578465770000|  4|[66 31]|             [69 64]|             [32 30]|
|[69 64 5F 32 30]|1578465770000|  4|[66 31]|             [74 65]|[73 64 61 66 61 7...|
|[69 64 5F 32 31]|1578466086000|  4|[66 31]|                [66]|          [31 2E 35]|
|[69 64 5F 32 31]|1578466086000|  4|[66 31]|             [69 64]|             [32 31]|
|[69 64 5F 32 31]|1578466086000|  4|[66 31]|             [74 65]|[73 64 61 66 61 7...|
|[69 64 5F 32 31]|1578466086000|  4|[66 31]|       [74 69 6D 65]|[32 30 3A 31 34 3...|
+----------------+-------------+---+-------+--------------------+--------------------+

对于Phoenix表来说,生成的parquet文件Schema相比HBase会添加主键列:示例中,COL1、COL2是Phoenix表TABLE1的联合主键, 字段类型是string

scala> val df = spark.read.parquet("/test/TABLE1/3d2a8c3d63bd4fec8dfb86594d3be27e");
df: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 6 more fields]

scala> df.show();
+----------+-------------+---+----+----+------+---------+-------------------+
|    rowkey|      version| op|COL1|COL2|family|qualifier|                val|
+----------+-------------+---+----+----+------+---------+-------------------+
|[43 00 42]|1580786254253|  4|   C|   B|     0| 00000000|               [78]|
|[43 00 42]|1580786254253|  4|   C|   B|     0| NEW_COL1|[66 73 64 66 73 64]|
|[43 00 42]|1580786254253|  4|   C|   B|     0|     COL3|      [80 00 00 01]|
+----------+-------------+---+----+----+------+---------+-------------------+

增量任务

任务创建
  1. 进入BDS操作页面,在左侧导航栏选择HBase导入导出 >HBase实时数据归档
  2. 单击创建同步任务, 指定源HBase集群和目标HDFS、OSS数据源,填写要导出的表
参数说明
高级配置说明
  • ArchiveDir:数据归档的目录。
  • FlushInterval:实时数据flush到HDFS或者OSS的时间间隔。
文件内容
HBase表和Phoenix表的文件内容格式在全量格式的基础之上增加了分区字段dt、hh、mm分别表示日期、小时、分钟。

OPEN API

全量任务创建
地址:http://{{bds-master }}:12311/exporter/fulldata/create参数:
 @POST
 @Path("/fulldata/create")
 @Produces("text/plain")
 public String createFullDataJob(
      @FormParam("src") String srcName, //源HBase集群
      @FormParam("dst") String sinkName, // 目标HDFS集群
      @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔
      @FormParam("targetDir") String targetDir, // 指定数据导出到目标HDFS的指定目录
      @FormParam("extraAttrs") String extraAttrs) // 可选可不填
例子:
curl -d "src=hb-m5exyncdfv9l8lf9b&dst=hb-m5e23u1tx9zc2uep7-hdfs&tableNames=table1 {\"contentType\": \"KVType\"};table2 {\"contentType\": \"KVType\"}&targetDir=/test4" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{{bds-master}}:12311/hdfs/fulldata/create
# tableNames 传参为 table1 {"contentType": "KVType"} 表示以KV的格式导出table1的数据
返回值:
{
  "message": "workflow-e2203f973650430b9bb2e5e13f56e708", //任务id
  "success": "true" // 创建是否成功, 如果异常,此属性为false
}
查看全量任务详情
地址:http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/detail例子:
curl http://{{ bds-master }}:12311/hdfs/fulldata/workflow-e2203f973650430b9bb2e5e13f56e708/detail
返回值:
{
  "message": "ok",
  "success": "true",
  "tunnel": {
    "src": "hb-m5exyncdfv9l8lf9b",
    "dst": "hb-m5e23u1tx9zc2uep7-hdfs",
    "jobName": "workflow-e2203f973650430b9bb2e5e13f56e708",
    "state": "SUCCEEDED",
    "tableNames": [
      "table1 {\"contentType\": \"KVType\"}"
    ],
    "fullJobs": [
      {
        "jobName": "fulldata-68bd455003804efdbddfdc406220b7a4",
        "state": "SUCCEEDED", // 任务的状态, SUCCEEDED|RUNNING|FAILED|RETRY|KILLED|QUEUED|ASSIGNED
        "table": "table1 {\"contentType\": \"KVType\"}",
        "finishedTasks": 50, // 具体完成的task数
        "totalTasks": 50 // 总共的task数
      }
    ],
    "bulkloadJobs": [],
    "inrcJobs": [],
    "mergeTasks": []
  }
}
全量任务的终止、重试、删除
终止地址(POST):http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/abort重试地址(POST):http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/retry删除地址(POST):http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/del返回值:
{
    "success": "true",
    "message": "ok"
}
实时归档任务的创建
地址:http://{{ bds-master }}:12311/exporter/incr/create参数:
  @POST
  @Path("/incr/create")
  @Produces("text/plain")
  public String createIncrJob(
      @FormParam("name") String name, // 通道名
      @FormParam("src") String srcClusterName, //源HBase集群
      @FormParam("dst") String dstClusteName, // 目标HDFS集群
      @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔
      @FormParam("extraAttrs") String extraAttrs) // 可选,高级参数例如 ArchiveDir=/xxx,FlushInterval=60000
查看实时归档任务详情
地址:http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/detail
实时归档任务的终止、重试、删除

终止地址(POST):http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/abort重试地址(POST):http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/retry删除地址(POST):http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/del