本文档主要介绍如何在HBase中增量归档OSS/HDFS。
前提条件
- 已购买BDS数据迁移同步服务,配置BDS操作页面账户密码,并登录BDS操作页面。
- 已打通BDS到HBase和HDFS集群(OSS)的网络。
- 登录BDS操作页面已创建HBase、HDFS和OSS数据源。
支持版本
- HBase
-
- 自建HBase1.x、2.x (未开启kerberos)。
- MR HBase
- 标准版云HBase、增强版云HBase(集群版本, 暂不支持单机版)。
- HDFS
- HDFS 2.x
- OSS
- 阿里云OSS
全量任务
- 任务创建
-
- 进入BDS操作页面,单击HBase导入导出 > HBase全量数据导出
- 单击创建迁移任务,选择对应的HBase数据源和HDFS、OSS数据源,指定导出的表,单击创建
- 查看任务进度。
- 进入BDS操作页面,单击HBase导入导出 > HBase全量数据导出
文件目录
[root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test
Found 2 items
drwxr-xr-x - hadoop hadoop 0 2020-01-21 12:11 /test/table1
drwxr-xr-x - hadoop hadoop 0 2020-01-21 12:11 /test/table2
表文件
[root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test4/table1
Found 50 items
-rw-r--r-- 2 hadoop hadoop 112574504 2020-01-21 12:11 /test4/table1/08efa07b46664576a969a9642d99badc
-rw-r--r-- 2 hadoop hadoop 117327879 2020-01-21 12:11 /test4/table1/0f1223ebbfef49baba05c5f37fab88df
-rw-r--r-- 2 hadoop hadoop 112009633 2020-01-21 12:11 /test4/table1/183e0580aa3b4373ad23089217d3d9f9
-rw-r--r-- 2 hadoop hadoop 112924525 2020-01-21 12:11 /test4/table1/184a92f35c5d4f4c917af30f07ff73e4
文件内容
对于HBase表来说, 生成的parquet文件Schema如下:rowkey - HBase KV 的rowkey, byte数组version - HBase KV 的时间戳, long类型op - 标识 KV 的类型, 4 表示 put, 8、10、12、14 表示的delete, int 类型family - 列族, byte数组qualifier - 列名, byte数组val - KV的值, byte数组
scala> spark.read.parquet("/test/t1")
res0: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 4 more fields]
scala> res0.show()
+----------------+-------------+---+-------+--------------------+--------------------+
| rowkey| version| op| family| qualifier| val|
+----------------+-------------+---+-------+--------------------+--------------------+
|[69 64 5F 32 30]|1578465770000| 4|[66 31]| [66]| [31 2E 35]|
|[69 64 5F 32 30]|1578465770000| 4|[66 31]| [69 64]| [32 30]|
|[69 64 5F 32 30]|1578465770000| 4|[66 31]| [74 65]|[73 64 61 66 61 7...|
|[69 64 5F 32 31]|1578466086000| 4|[66 31]| [66]| [31 2E 35]|
|[69 64 5F 32 31]|1578466086000| 4|[66 31]| [69 64]| [32 31]|
|[69 64 5F 32 31]|1578466086000| 4|[66 31]| [74 65]|[73 64 61 66 61 7...|
|[69 64 5F 32 31]|1578466086000| 4|[66 31]| [74 69 6D 65]|[32 30 3A 31 34 3...|
+----------------+-------------+---+-------+--------------------+--------------------+
对于Phoenix表来说,生成的parquet文件Schema相比HBase会添加主键列:示例中,COL1、COL2是Phoenix表TABLE1的联合主键, 字段类型是string
scala> val df = spark.read.parquet("/test/TABLE1/3d2a8c3d63bd4fec8dfb86594d3be27e");
df: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 6 more fields]
scala> df.show();
+----------+-------------+---+----+----+------+---------+-------------------+
| rowkey| version| op|COL1|COL2|family|qualifier| val|
+----------+-------------+---+----+----+------+---------+-------------------+
|[43 00 42]|1580786254253| 4| C| B| 0| 00000000| [78]|
|[43 00 42]|1580786254253| 4| C| B| 0| NEW_COL1|[66 73 64 66 73 64]|
|[43 00 42]|1580786254253| 4| C| B| 0| COL3| [80 00 00 01]|
+----------+-------------+---+----+----+------+---------+-------------------+
增量任务
- 任务创建
-
- 进入BDS操作页面,在左侧导航栏选择HBase导入导出 >HBase实时数据归档
- 单击创建同步任务, 指定源HBase集群和目标HDFS、OSS数据源,填写要导出的表
- 参数说明
- 高级配置说明
- ArchiveDir:数据归档的目录。
- FlushInterval:实时数据flush到HDFS或者OSS的时间间隔。
- 文件内容
- HBase表和Phoenix表的文件内容格式在全量格式的基础之上增加了分区字段dt、hh、mm分别表示日期、小时、分钟。
OPEN API
- 全量任务创建
-
地址:http://{{bds-master }}:12311/exporter/fulldata/create参数:
@POST @Path("/fulldata/create") @Produces("text/plain") public String createFullDataJob( @FormParam("src") String srcName, //源HBase集群 @FormParam("dst") String sinkName, // 目标HDFS集群 @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔 @FormParam("targetDir") String targetDir, // 指定数据导出到目标HDFS的指定目录 @FormParam("extraAttrs") String extraAttrs) // 可选可不填
例子:curl -d "src=hb-m5exyncdfv9l8lf9b&dst=hb-m5e23u1tx9zc2uep7-hdfs&tableNames=table1 {\"contentType\": \"KVType\"};table2 {\"contentType\": \"KVType\"}&targetDir=/test4" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{{bds-master}}:12311/hdfs/fulldata/create # tableNames 传参为 table1 {"contentType": "KVType"} 表示以KV的格式导出table1的数据
返回值:{ "message": "workflow-e2203f973650430b9bb2e5e13f56e708", //任务id "success": "true" // 创建是否成功, 如果异常,此属性为false }
- 查看全量任务详情
-
地址:http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/detail例子:
curl http://{{ bds-master }}:12311/hdfs/fulldata/workflow-e2203f973650430b9bb2e5e13f56e708/detail
返回值:{ "message": "ok", "success": "true", "tunnel": { "src": "hb-m5exyncdfv9l8lf9b", "dst": "hb-m5e23u1tx9zc2uep7-hdfs", "jobName": "workflow-e2203f973650430b9bb2e5e13f56e708", "state": "SUCCEEDED", "tableNames": [ "table1 {\"contentType\": \"KVType\"}" ], "fullJobs": [ { "jobName": "fulldata-68bd455003804efdbddfdc406220b7a4", "state": "SUCCEEDED", // 任务的状态, SUCCEEDED|RUNNING|FAILED|RETRY|KILLED|QUEUED|ASSIGNED "table": "table1 {\"contentType\": \"KVType\"}", "finishedTasks": 50, // 具体完成的task数 "totalTasks": 50 // 总共的task数 } ], "bulkloadJobs": [], "inrcJobs": [], "mergeTasks": [] } }
- 全量任务的终止、重试、删除
-
终止地址(POST):http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/abort重试地址(POST):http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/retry删除地址(POST):http://{{ bds-master }}:12311/exporter/fulldata/{{任务id}}/del返回值:
{ "success": "true", "message": "ok" }
- 实时归档任务的创建
-
地址:http://{{ bds-master }}:12311/exporter/incr/create参数:
@POST @Path("/incr/create") @Produces("text/plain") public String createIncrJob( @FormParam("name") String name, // 通道名 @FormParam("src") String srcClusterName, //源HBase集群 @FormParam("dst") String dstClusteName, // 目标HDFS集群 @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔 @FormParam("extraAttrs") String extraAttrs) // 可选,高级参数例如 ArchiveDir=/xxx,FlushInterval=60000
- 查看实时归档任务详情
- 地址:http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/detail
- 实时归档任务的终止、重试、删除
-
终止地址(POST):http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/abort重试地址(POST):http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/retry删除地址(POST):http://{{ bds-master }}:12311/exporter/incr/{{任务id}}/del
在文档使用中是否遇到以下问题
更多建议
匿名提交