全部产品
云市场

增量归档OSS/HDFS

更新时间:2020-03-10 18:07:34

前提条件

  1. 购买BDS数据迁移同步服务,配置BDS操作页面账户密码,并登陆BDS操作页面
  2. 打通BDS到HBase和HDFS集群(OSS)的网络
  3. 登陆BDS操作页面, 创建HBase数据源
  4. 登陆BDS操作页面, 创建HDFS、OSS数据源

版本

HBase

  • 自建HBase1.x、2.x (未开启kerberos)
  • EMR HBase
  • 标准版云HBase、增强版云HBase(集群版本, 暂不支持单机版)

HDFS

  • HDFS 2.x

OSS

  • 阿里云OSS

全量任务

任务创建

  1. 进入BDS操作页面,点击HBase导入导出-> HBase全量数据导出
    1
  2. 点击创建迁移任务,选择对应的HBase数据源和HDFS、OSS数据源,指定导出的表,点击创建
    1
  3. 查看任务进度
    1

文件目录

迁移完成后,目标HDFS制定目录下就会生成对应的parquet格式的文件

  1. [root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test
  2. Found 2 items
  3. drwxr-xr-x - hadoop hadoop 0 2020-01-21 12:11 /test/table1
  4. drwxr-xr-x - hadoop hadoop 0 2020-01-21 12:11 /test/table2

表文件

  1. [root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test4/table1
  2. Found 50 items
  3. -rw-r--r-- 2 hadoop hadoop 112574504 2020-01-21 12:11 /test4/table1/08efa07b46664576a969a9642d99badc
  4. -rw-r--r-- 2 hadoop hadoop 117327879 2020-01-21 12:11 /test4/table1/0f1223ebbfef49baba05c5f37fab88df
  5. -rw-r--r-- 2 hadoop hadoop 112009633 2020-01-21 12:11 /test4/table1/183e0580aa3b4373ad23089217d3d9f9
  6. -rw-r--r-- 2 hadoop hadoop 112924525 2020-01-21 12:11 /test4/table1/184a92f35c5d4f4c917af30f07ff73e4

文件内容

对于HBase表来说, 生成的parquet文件Schema如下:
rowkey - HBase KV 的rowkey, byte数组
version - HBase KV 的时间戳, long类型
op - 标识 KV 的类型, 4 表示 put, 8、10、12、14 表示的delete, int 类型
family - 列族, byte数组
qualifier - 列名, byte数组
val - KV的值, byte数组

  1. scala> spark.read.parquet("/test/t1")
  2. res0: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 4 more fields]
  3. scala> res0.show()
  4. +----------------+-------------+---+-------+--------------------+--------------------+
  5. | rowkey| version| op| family| qualifier| val|
  6. +----------------+-------------+---+-------+--------------------+--------------------+
  7. |[69 64 5F 32 30]|1578465770000| 4|[66 31]| [66]| [31 2E 35]|
  8. |[69 64 5F 32 30]|1578465770000| 4|[66 31]| [69 64]| [32 30]|
  9. |[69 64 5F 32 30]|1578465770000| 4|[66 31]| [74 65]|[73 64 61 66 61 7...|
  10. |[69 64 5F 32 31]|1578466086000| 4|[66 31]| [66]| [31 2E 35]|
  11. |[69 64 5F 32 31]|1578466086000| 4|[66 31]| [69 64]| [32 31]|
  12. |[69 64 5F 32 31]|1578466086000| 4|[66 31]| [74 65]|[73 64 61 66 61 7...|
  13. |[69 64 5F 32 31]|1578466086000| 4|[66 31]| [74 69 6D 65]|[32 30 3A 31 34 3...|
  14. +----------------+-------------+---+-------+--------------------+--------------------+

对于Phoenix表来说,生成的parquet文件Schema相比HBase会添加主键列:
示例中,COL1、COL2是Phoenix表TABLE1的联合主键, 字段类型是string

  1. scala> val df = spark.read.parquet("/test/TABLE1/3d2a8c3d63bd4fec8dfb86594d3be27e");
  2. df: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 6 more fields]
  3. scala> df.show();
  4. +----------+-------------+---+----+----+------+---------+-------------------+
  5. | rowkey| version| op|COL1|COL2|family|qualifier| val|
  6. +----------+-------------+---+----+----+------+---------+-------------------+
  7. |[43 00 42]|1580786254253| 4| C| B| 0| 00000000| [78]|
  8. |[43 00 42]|1580786254253| 4| C| B| 0| NEW_COL1|[66 73 64 66 73 64]|
  9. |[43 00 42]|1580786254253| 4| C| B| 0| COL3| [80 00 00 01]|
  10. +----------+-------------+---+----+----+------+---------+-------------------+

增量任务

任务创建

  1. 进入BDS操作页面,点击HBase导入导出-> HBase实时数据归档
  2. 点击创建同步任务, 指定源HBase集群和目标HDFS、OSS数据源,填写要导出的表
    1

参数说明

高级配置说明

  • ArchiveDir - 数据归档的目录
  • FlushInterval - 实时数据flush到HDFS或者OSS的时间间隔

文件内容

HBase表和Phoenix表的文件内容格式在全量格式的基础之上增加了分区字段dt、hh、mm分别表示日期、小时、分钟

OPEN API

全量任务创建

地址: http://{{ bds-master }}:12311/exporter/fulldata/create
参数:

  1. @POST
  2. @Path("/fulldata/create")
  3. @Produces("text/plain")
  4. public String createFullDataJob(
  5. @FormParam("src") String srcName, //源HBase集群
  6. @FormParam("dst") String sinkName, // 目标HDFS集群
  7. @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔
  8. @FormParam("targetDir") String targetDir, // 指定数据导出到目标HDFS的指定目录
  9. @FormParam("extraAttrs") String extraAttrs) // 可选可不填

例子:

  1. curl -d "src=hb-m5exyncdfv9l8lf9b&dst=hb-m5e23u1tx9zc2uep7-hdfs&tableNames=table1 {\"contentType\": \"KVType\"};table2 {\"contentType\": \"KVType\"}&targetDir=/test4" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{{bds-master}}:12311/hdfs/fulldata/create
  2. # tableNames 传参为 table1 {"contentType": "KVType"} 表示以KV的格式导出table1的数据

返回值:

  1. {
  2. "message": "workflow-e2203f973650430b9bb2e5e13f56e708", //任务id
  3. "success": "true" // 创建是否成功, 如果异常,此属性为false
  4. }

查看全量任务详情

地址:
http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/detail
例子:

  1. curl http://{{ bds-master }}:12311/hdfs/fulldata/workflow-e2203f973650430b9bb2e5e13f56e708/detail

返回值:

  1. {
  2. "message": "ok",
  3. "success": "true",
  4. "tunnel": {
  5. "src": "hb-m5exyncdfv9l8lf9b",
  6. "dst": "hb-m5e23u1tx9zc2uep7-hdfs",
  7. "jobName": "workflow-e2203f973650430b9bb2e5e13f56e708",
  8. "state": "SUCCEEDED",
  9. "tableNames": [
  10. "table1 {\"contentType\": \"KVType\"}"
  11. ],
  12. "fullJobs": [
  13. {
  14. "jobName": "fulldata-68bd455003804efdbddfdc406220b7a4",
  15. "state": "SUCCEEDED", // 任务的状态, SUCCEEDED|RUNNING|FAILED|RETRY|KILLED|QUEUED|ASSIGNED
  16. "table": "table1 {\"contentType\": \"KVType\"}",
  17. "finishedTasks": 50, // 具体完成的task
  18. "totalTasks": 50 // 总共的task
  19. }
  20. ],
  21. "bulkloadJobs": [],
  22. "inrcJobs": [],
  23. "mergeTasks": []
  24. }
  25. }

全量任务的终止、重试、删除

终止地址(POST):
http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/abort
重试地址(POST):
http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/retry
删除地址(POST):
http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/del
返回值:

  1. {
  2. "success": "true",
  3. "message": "ok"
  4. }

实时归档任务的创建

地址: http://{{ bds-master }}:12311/exporter/incr/create
参数:

  1. @POST
  2. @Path("/incr/create")
  3. @Produces("text/plain")
  4. public String createIncrJob(
  5. @FormParam("name") String name, // 通道名
  6. @FormParam("src") String srcClusterName, //源HBase集群
  7. @FormParam("dst") String dstClusteName, // 目标HDFS集群
  8. @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔
  9. @FormParam("extraAttrs") String extraAttrs) // 可选,高级参数例如 ArchiveDir=/xxx,FlushInterval=60000

查看实时归档任务详情

地址:
http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/detail

实时归档任务的终止、重试、删除

终止地址(POST):
http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/abort
重试地址(POST):
http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/retry
删除地址(POST):
http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/del