前提条件

  1. 购买LTS数据迁移同步服务,配置LTS操作页面账户密码,并登录LTS操作页面。
  2. 打通LTS到HBase和HDFS集群(OSS)的网络。
  3. 登录LTS操作页面, 创建HBase数据源。
  4. 登录LTS操作页面, 创建HDFS、OSS数据源。

版本

  • HBase
    • 自建HBase1.x、2.x (未开启kerberos)
    • EMR HBase
    • 标准版云HBase、增强版云HBase(集群版本, 暂不支持单机版)
  • HDFS:HDFS 2.x
  • OSS:阿里云OSS

全量任务

  • 任务创建
    1. 进入LTS操作页面,点击HBase导入导出-> HBase全量数据导出。进入BDS操作页面
    2. 点击创建迁移任务,选择对应的HBase数据源和HDFS、OSS数据源,指定导出的表,点击创建。点击创建迁移任务
    3. 查看任务进度。查看任务进度
  • 文件目录
    迁移完成后,目标HDFS制定目录下就会生成对应的parquet格式的文件
    [root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test
    Found 2 items
    drwxr-xr-x   - hadoop hadoop          0 2020-01-21 12:11 /test/table1
    drwxr-xr-x   - hadoop hadoop          0 2020-01-21 12:11 /test/table2
    表文件
    [root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test4/table1
    Found 50 items
    -rw-r--r--   2 hadoop hadoop  112574504 2020-01-21 12:11 /test4/table1/08efa07b46664576a969a9642d99badc
    -rw-r--r--   2 hadoop hadoop  117327879 2020-01-21 12:11 /test4/table1/0f1223ebbfef49baba05c5f37fab88df
    -rw-r--r--   2 hadoop hadoop  112009633 2020-01-21 12:11 /test4/table1/183e0580aa3b4373ad23089217d3d9f9
    -rw-r--r--   2 hadoop hadoop  112924525 2020-01-21 12:11 /test4/table1/184a92f35c5d4f4c917af30f07ff73e4
  • 文件内容
    对于HBase表来说, 生成的parquet文件Schema如下:
    • rowkey - HBase KV 的rowkey, byte数组
    • version - HBase KV 的时间戳, long类型
    • op - 标识 KV 的类型, 4 表示 put, 8、10、12、14 表示的delete, int 类型
    • family - 列族, byte数组
    • qualifier - 列名, byte数组
    • val - KV的值, byte数组
    scala> spark.read.parquet("/test/t1")
    res0: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 4 more fields]
    
    scala> res0.show()
    +----------------+-------------+---+-------+--------------------+--------------------+
    |          rowkey|      version| op| family|           qualifier|                 val|
    +----------------+-------------+---+-------+--------------------+--------------------+
    |[69 64 5F 32 30]|1578465770000|  4|[66 31]|                [66]|          [31 2E 35]|
    |[69 64 5F 32 30]|1578465770000|  4|[66 31]|             [69 64]|             [32 30]|
    |[69 64 5F 32 30]|1578465770000|  4|[66 31]|             [74 65]|[73 64 61 66 61 7...|
    |[69 64 5F 32 31]|1578466086000|  4|[66 31]|                [66]|          [31 2E 35]|
    |[69 64 5F 32 31]|1578466086000|  4|[66 31]|             [69 64]|             [32 31]|
    |[69 64 5F 32 31]|1578466086000|  4|[66 31]|             [74 65]|[73 64 61 66 61 7...|
    |[69 64 5F 32 31]|1578466086000|  4|[66 31]|       [74 69 6D 65]|[32 30 3A 31 34 3...|
    +----------------+-------------+---+-------+--------------------+--------------------+

    对于Phoenix表来说,生成的parquet文件Schema相比HBase会添加主键列:

    示例中,COL1、COL2是Phoenix表TABLE1的联合主键, 字段类型是string。

    scala> val df = spark.read.parquet("/test/TABLE1/3d2a8c3d63bd4fec8dfb86594d3be27e");
    df: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 6 more fields]
    
    scala> df.show();
    +----------+-------------+---+----+----+------+---------+-------------------+
    |    rowkey|      version| op|COL1|COL2|family|qualifier|                val|
    +----------+-------------+---+----+----+------+---------+-------------------+
    |[43 00 42]|1580786254253|  4|   C|   B|     0| 00000000|               [78]|
    |[43 00 42]|1580786254253|  4|   C|   B|     0| NEW_COL1|[66 73 64 66 73 64]|
    |[43 00 42]|1580786254253|  4|   C|   B|     0|     COL3|      [80 00 00 01]|
    +----------+-------------+---+----+----+------+---------+-------------------+

增量任务

  • 任务创建
    1. 进入LTS操作页面,点击HBase导入导出-> HBase实时数据归档。
    2. 点击创建同步任务, 指定源HBase集群和目标HDFS、OSS数据源,填写要导出的表。增量任务创建同步任务
  • 参数说明
    高级配置说明
    • ArchiveDir - 数据归档的目录
    • FlushInterval - 实时数据flush到HDFS或者OSS的时间间隔
  • 文件内容

    HBase表和Phoenix表的文件内容格式在全量格式的基础之上增加了分区字段dt、hh、mm分别表示日期、小时、分钟

OPEN API

  • 全量任务创建

    地址:http://{{LTS-master }}:12311/exporter/fulldata/create

    参数:
     @POST
     @Path("/fulldata/create")
     @Produces("text/plain")
     public String createFullDataJob(
          @FormParam("src") String srcName, //源HBase集群
          @FormParam("dst") String sinkName, // 目标HDFS集群
          @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔
          @FormParam("targetDir") String targetDir, // 指定数据导出到目标HDFS的指定目录
          @FormParam("extraAttrs") String extraAttrs) // 可选可不填
    例子:
    curl -d "src=hb-m5exyncdfv9l8lf9b&dst=hb-m5e23u1tx9zc2uep7-hdfs&tableNames=table1 {\"contentType\": \"KVType\"};table2 {\"contentType\": \"KVType\"}&targetDir=/test4" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{{LTS-master}}:12311/hdfs/fulldata/create
    # tableNames 传参为 table1 {"contentType": "KVType"} 表示以KV的格式导出table1的数据
    返回值:
    {
      "message": "workflow-e2203f973650430b9bb2e5e13f56e708", //任务id
      "success": "true" // 创建是否成功, 如果异常,此属性为false
    }
  • 查看全量任务详情

    地址:http://{{LTS-master }}:12311/exporter/fulldata/{{任务id}}/detail

    例子:
    curl http://{{ LTS-master }}:12311/hdfs/fulldata/workflow-e2203f973650430b9bb2e5e13f56e708/detail
    返回值:
    {
      "message": "ok",
      "success": "true",
      "tunnel": {
        "src": "hb-m5exyncdfv9l8lf9b",
        "dst": "hb-m5e23u1tx9zc2uep7-hdfs",
        "jobName": "workflow-e2203f973650430b9bb2e5e13f56e708",
        "state": "SUCCEEDED",
        "tableNames": [
          "table1 {\"contentType\": \"KVType\"}"
        ],
        "fullJobs": [
          {
            "jobName": "fulldata-68bd455003804efdbddfdc406220b7a4",
            "state": "SUCCEEDED", // 任务的状态, SUCCEEDED|RUNNING|FAILED|RETRY|KILLED|QUEUED|ASSIGNED
            "table": "table1 {\"contentType\": \"KVType\"}",
            "finishedTasks": 50, // 具体完成的task数
            "totalTasks": 50 // 总共的task数
          }
        ],
        "bulkloadJobs": [],
        "inrcJobs": [],
        "mergeTasks": []
      }
    }
  • 全量任务的终止、重试、删除
    • 终止地址(POST):http://{{LTS-master }}:12311/exporter/fulldata/{{任务id}}/abort
    • 重试地址(POST):http://{{LTS-master }}:12311/exporter/fulldata/{{任务id}}/retry
    • 删除地址(POST):http://{{LTS-master }}:12311/exporter/fulldata/{{任务id}}/del
    返回值:
    {
        "success": "true",
        "message": "ok"
    }
  • 实时归档任务的创建

    地址:http://{{LTS-master }}:12311/exporter/incr/create

    参数:
      @POST
      @Path("/incr/create")
      @Produces("text/plain")
      public String createIncrJob(
          @FormParam("name") String name, // 通道名
          @FormParam("src") String srcClusterName, //源HBase集群
          @FormParam("dst") String dstClusteName, // 目标HDFS集群
          @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔
          @FormParam("extraAttrs") String extraAttrs) // 可选,高级参数例如 ArchiveDir=/xxx,FlushInterval=60000
  • 查看实时归档任务详情

    地址:http://{{LTS-master }}:12311/exporter/incr/{{任务id}}/detail

  • 实时归档任务的终止、重试、删除
    • 终止地址(POST):http://{{LTS-master }}:12311/exporter/incr/{{任务id}}/abort
    • 重试地址(POST):http://{{LTS-master }}:12311/exporter/incr/{{任务id}}/retry
    • 删除地址(POST):http://{{LTS-master }}:12311/exporter/incr/{{任务id}}/del