前提条件
- 购买LTS数据迁移同步服务,配置LTS操作页面账户密码,并登录LTS操作页面。
- 打通LTS到HBase和HDFS集群(OSS)的网络。
- 登录LTS操作页面, 创建HBase数据源。
- 登录LTS操作页面, 创建HDFS、OSS数据源。
版本
- HBase
- 自建HBase1.x、2.x (未开启kerberos)
- EMR HBase
- 标准版云HBase、增强版云HBase(集群版本, 暂不支持单机版)
- HDFS:HDFS 2.x
- OSS:阿里云OSS
全量任务
- 任务创建
- 进入LTS操作页面,点击HBase导入导出-> HBase全量数据导出。
- 点击创建迁移任务,选择对应的HBase数据源和HDFS、OSS数据源,指定导出的表,点击创建。
- 查看任务进度。
- 进入LTS操作页面,点击HBase导入导出-> HBase全量数据导出。
- 文件目录迁移完成后,目标HDFS制定目录下就会生成对应的parquet格式的文件
[root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test Found 2 items drwxr-xr-x - hadoop hadoop 0 2020-01-21 12:11 /test/table1 drwxr-xr-x - hadoop hadoop 0 2020-01-21 12:11 /test/table2
表文件[root@iZm5efk3ij3rq0jcfskjseZ ~]# hadoop fs -ls /test4/table1 Found 50 items -rw-r--r-- 2 hadoop hadoop 112574504 2020-01-21 12:11 /test4/table1/08efa07b46664576a969a9642d99badc -rw-r--r-- 2 hadoop hadoop 117327879 2020-01-21 12:11 /test4/table1/0f1223ebbfef49baba05c5f37fab88df -rw-r--r-- 2 hadoop hadoop 112009633 2020-01-21 12:11 /test4/table1/183e0580aa3b4373ad23089217d3d9f9 -rw-r--r-- 2 hadoop hadoop 112924525 2020-01-21 12:11 /test4/table1/184a92f35c5d4f4c917af30f07ff73e4
- 文件内容对于HBase表来说, 生成的parquet文件Schema如下:
- rowkey - HBase KV 的rowkey, byte数组
- version - HBase KV 的时间戳, long类型
- op - 标识 KV 的类型, 4 表示 put, 8、10、12、14 表示的delete, int 类型
- family - 列族, byte数组
- qualifier - 列名, byte数组
- val - KV的值, byte数组
scala> spark.read.parquet("/test/t1") res0: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 4 more fields] scala> res0.show() +----------------+-------------+---+-------+--------------------+--------------------+ | rowkey| version| op| family| qualifier| val| +----------------+-------------+---+-------+--------------------+--------------------+ |[69 64 5F 32 30]|1578465770000| 4|[66 31]| [66]| [31 2E 35]| |[69 64 5F 32 30]|1578465770000| 4|[66 31]| [69 64]| [32 30]| |[69 64 5F 32 30]|1578465770000| 4|[66 31]| [74 65]|[73 64 61 66 61 7...| |[69 64 5F 32 31]|1578466086000| 4|[66 31]| [66]| [31 2E 35]| |[69 64 5F 32 31]|1578466086000| 4|[66 31]| [69 64]| [32 31]| |[69 64 5F 32 31]|1578466086000| 4|[66 31]| [74 65]|[73 64 61 66 61 7...| |[69 64 5F 32 31]|1578466086000| 4|[66 31]| [74 69 6D 65]|[32 30 3A 31 34 3...| +----------------+-------------+---+-------+--------------------+--------------------+
对于Phoenix表来说,生成的parquet文件Schema相比HBase会添加主键列:
示例中,COL1、COL2是Phoenix表TABLE1的联合主键, 字段类型是string。
scala> val df = spark.read.parquet("/test/TABLE1/3d2a8c3d63bd4fec8dfb86594d3be27e"); df: org.apache.spark.sql.DataFrame = [rowkey: binary, version: bigint ... 6 more fields] scala> df.show(); +----------+-------------+---+----+----+------+---------+-------------------+ | rowkey| version| op|COL1|COL2|family|qualifier| val| +----------+-------------+---+----+----+------+---------+-------------------+ |[43 00 42]|1580786254253| 4| C| B| 0| 00000000| [78]| |[43 00 42]|1580786254253| 4| C| B| 0| NEW_COL1|[66 73 64 66 73 64]| |[43 00 42]|1580786254253| 4| C| B| 0| COL3| [80 00 00 01]| +----------+-------------+---+----+----+------+---------+-------------------+
增量任务
- 任务创建
- 进入LTS操作页面,点击HBase导入导出-> HBase实时数据归档。
- 点击创建同步任务, 指定源HBase集群和目标HDFS、OSS数据源,填写要导出的表。
- 参数说明高级配置说明
- ArchiveDir - 数据归档的目录
- FlushInterval - 实时数据flush到HDFS或者OSS的时间间隔
- 文件内容
HBase表和Phoenix表的文件内容格式在全量格式的基础之上增加了分区字段dt、hh、mm分别表示日期、小时、分钟
OPEN API
- 全量任务创建
地址:
http://{{LTS-master }}:12311/exporter/fulldata/create
参数:@POST @Path("/fulldata/create") @Produces("text/plain") public String createFullDataJob( @FormParam("src") String srcName, //源HBase集群 @FormParam("dst") String sinkName, // 目标HDFS集群 @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔 @FormParam("targetDir") String targetDir, // 指定数据导出到目标HDFS的指定目录 @FormParam("extraAttrs") String extraAttrs) // 可选可不填
例子:curl -d "src=hb-m5exyncdfv9l8lf9b&dst=hb-m5e23u1tx9zc2uep7-hdfs&tableNames=table1 {\"contentType\": \"KVType\"};table2 {\"contentType\": \"KVType\"}&targetDir=/test4" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{{LTS-master}}:12311/hdfs/fulldata/create # tableNames 传参为 table1 {"contentType": "KVType"} 表示以KV的格式导出table1的数据
返回值:{ "message": "workflow-e2203f973650430b9bb2e5e13f56e708", //任务id "success": "true" // 创建是否成功, 如果异常,此属性为false }
- 查看全量任务详情
地址:
http://{{LTS-master }}:12311/exporter/fulldata/{{任务id}}/detail
例子:curl http://{{ LTS-master }}:12311/hdfs/fulldata/workflow-e2203f973650430b9bb2e5e13f56e708/detail
返回值:{ "message": "ok", "success": "true", "tunnel": { "src": "hb-m5exyncdfv9l8lf9b", "dst": "hb-m5e23u1tx9zc2uep7-hdfs", "jobName": "workflow-e2203f973650430b9bb2e5e13f56e708", "state": "SUCCEEDED", "tableNames": [ "table1 {\"contentType\": \"KVType\"}" ], "fullJobs": [ { "jobName": "fulldata-68bd455003804efdbddfdc406220b7a4", "state": "SUCCEEDED", // 任务的状态, SUCCEEDED|RUNNING|FAILED|RETRY|KILLED|QUEUED|ASSIGNED "table": "table1 {\"contentType\": \"KVType\"}", "finishedTasks": 50, // 具体完成的task数 "totalTasks": 50 // 总共的task数 } ], "bulkloadJobs": [], "inrcJobs": [], "mergeTasks": [] } }
- 全量任务的终止、重试、删除
- 终止地址(POST):
http://{{LTS-master }}:12311/exporter/fulldata/{{任务id}}/abort
- 重试地址(POST):
http://{{LTS-master }}:12311/exporter/fulldata/{{任务id}}/retry
- 删除地址(POST):
http://{{LTS-master }}:12311/exporter/fulldata/{{任务id}}/del
返回值:{ "success": "true", "message": "ok" }
- 终止地址(POST):
- 实时归档任务的创建
地址:
http://{{LTS-master }}:12311/exporter/incr/create
参数:@POST @Path("/incr/create") @Produces("text/plain") public String createIncrJob( @FormParam("name") String name, // 通道名 @FormParam("src") String srcClusterName, //源HBase集群 @FormParam("dst") String dstClusteName, // 目标HDFS集群 @FormParam("tableNames") String tableNames, // 需要归档的表,多张表用;分隔 @FormParam("extraAttrs") String extraAttrs) // 可选,高级参数例如 ArchiveDir=/xxx,FlushInterval=60000
- 查看实时归档任务详情
地址:
http://{{LTS-master }}:12311/exporter/incr/{{任务id}}/detail
- 实时归档任务的终止、重试、删除
- 终止地址(POST):
http://{{LTS-master }}:12311/exporter/incr/{{任务id}}/abort
- 重试地址(POST):
http://{{LTS-master }}:12311/exporter/incr/{{任务id}}/retry
- 删除地址(POST):
http://{{LTS-master }}:12311/exporter/incr/{{任务id}}/del
- 终止地址(POST):
在文档使用中是否遇到以下问题
更多建议
匿名提交