本文档介绍如何使用DataWorks实现MaxCompute与文件存储HDFS之间的双向数据同步。您可以将MaxCompute数据同步至文件存储HDFS,也可以将文件存储HDFS数据同步至MaxCompute。

注意 请确保Maxcompute、文件存储HDFS、Dataworks在同一个地域(region)下。

准备工作

  • 开通文件存储HDFS服务并创建文件系统实例和挂载点,详情请参见HDFS快速入门
  • 在Hadoop集群所有节点上安装JDK,版本不能低于1.8。
  • 在Hadoop集群中配置文件存储HDFS实例,详情请参见挂载文件系统
  • 开通MaxCompute,详情请参见配置MaxCompute
  • 开通DataWorks,详情请参见配置DataWorks

配置步骤

  1. 配置DataWorks独享数据集成资源组

    开通配置及使用步骤请参见购买独享资源,并且注意以下几方面配置。

    • 配置的独享数据集成资源组,需要与文件存储HDFS在同一region的同一可用区(AZ)下。
    • 如果文件存储HDFS相对应的AZ下的DataWorks无资源,或者文件存储HDFS与独享数据集成资源组在相同region不同AZ下,则需要提交工单联系DataWorks工作人员获取支持。
    • 在配置DataWorks专有网络及交换机地址时,要与文件存储HDFS挂载点所绑定的专有网络和交换机地址一致,详情请参见操作独享资源
  2. 在独享数据集成资源组下,放置文件存储HDFS依赖。
    在放置文件存储HDFS依赖时,需要提交工单联系DataWorks工作人员获取支持。
  3. 配置数据同步任务。
    配置数据任务的步骤请参见通过向导模式配置任务,详细参数设置如下:
    1. 配置数据走向。
      • 如果配置从MaxCompute数据同步到文件存储HDFS时,配置的数据去向的数据源为HDFS 。
      • 如果配置从文件存储HDFS同步数据到MaxCompute时,配置的数据来源的数据源为HDFS。
    2. 根据提示使用脚本模式配置同步任务。
    3. 在配置脚本中,添加文件存储HDFS配置参数。
      • 同步数据到文件存储HDFS时,需要配置HDFS Writer,详情请参见配置HDFS Writer
      • 从文件存储HDFS同步数据时,需要配置HDFS Reader,详情请参见配置HDFS Reader
      在配置HDFS Writer和HDFS Reader时,需在同步脚本中添加hadoopConfig参数并配置文件存储HDFS的实现类。如下图所示。
      说明
      • defaultFS需配置为文件存储HDFS的挂载点地址。
      • hadoopConfig需添加文件存储HDFS的实现类。
      "hadoopConfig": {
                          "fs.dfs.impl": "com.alibaba.dfs.DistributedFileSystem",
                          "fs.AbstractFileSystem.dfs.impl" : "com.alibaba.dfs.DFS"
                      }
      HDFS_最佳实践_5_1
  4. 执行数据脚本。
    执行数据脚本时,需要选用在步骤1中配置的独享数据集成资源组。保存数据同步脚本并运行。

验证MaxCompute数据同步至文件存储HDFS

以下示例用来验证MaxCompute数据是否同步到了文件存储HDFS上。

  1. 在MaxCompute中创建测试表。
    在MaxCompute中的创建表,详情请参见在MaxCompute创建表
    CREATE TABLE IF NOT EXISTS maxcompute2df  
    (                                         
     id               BIGINT COMMENT '编号',  
     name             STRING COMMENT '姓名',  
     gender           STRING COMMENT '性别',  
     age              BIGINT COMMENT '年龄',  
     birth            STRING COMMENT '生日'   
    );
  2. 在测试表中插入测试数据。
    insert into maxcompute2dfs values(1,'测试用户1','男',20,'2000-1-1');
    insert into maxcompute2dfs values(2,'测试用户2','男',20,'2000-1-1');
    insert into maxcompute2dfs values(3,'测试用户3','女',20,'2000-1-1');
    insert into maxcompute2dfs values(4,'测试用户4','女',20,'2000-1-1');
  3. 在文件存储HDFS上创建目录。
    hadoop fs -mkdir dfs://f-xxxxx.cn-xxxx.dfs.aliyuncs.com:10290/maxcompute2dfs
  4. 在DataWorks中编写数据同步脚本。
    配置MaxCompute Reader和HDFS Writer脚本,详情请参见配置MaxCompute Reader配置HDFS Writer
    {
        "type": "job",
        "steps": [
            {
                "stepType": "odps",
                "parameter": {
                    "tableType": null,
                    "partition": [],
                    "datasource": "odps_first", 
                    "column": [
                        "*"
                    ],
                    "guid": null,
                    "emptyAsNull": false,
                    "table": "maxcompute2dfs"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "hdfs",
                "parameter": {
                    "path": "/maxcompute2dfs",
                    "fileName": "maxcompute2dfs",
                    "datasource": "xxx",
                    "column": [
                        {
                            "name": "id",
                            "type": "long"
                        },
                        {
                            "name": "name",
                            "type": "string"
                        },
                        {
                            "name": "gender",
                            "type": "string"
                        },
                        {
                            "name": "age",
                            "type": "long"
                        },
                        {
                            "name": "birth",
                            "type": "string"
                        }
                    ],
                    "defaultFS": "dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290",
                    "writeMode": "append",
                    "encoding": "UTF-8",
                    "fieldDelimiter": ",",
                    "fileType": "text",
                    "hadoopConfig": {
                        "fs.dfs.impl": "com.alibaba.dfs.DistributedFileSystem",
                        "fs.AbstractFileSystem.dfs.impl" : "com.alibaba.dfs.DFS"
                    }
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "concurrent": 2,
                "throttle": false
            }
        }
    }
  5. 加载创建的独享数据集成资源组并执行数据脚本。
  6. 查看MaxCompute是否成功同步数据到文件存储HDFS。
    hadoop fs -cat dfs://f-xxxxx.cn-xxxx.dfs.aliyuncs.com:10290/maxcompute2dfs/*

验证文件存储HDFS数据同步至MaxCompute

以下示例用来验证文件存储HDFS数据是否同步到了MaxCompute上。
说明 在验证文件存储HDFS数据同步到MaxCompute中时,文件存储HDFS上的测试数据是使用验证MaxCompute数据同步至文件存储HDFS章节中由MaxCompute同步过去的数据,将该数据再同步到MaxCompute的另外一张表中。
  1. 在MaxCompute中创建新的测试表。
    在MaxCompute中的创建表,详情请参见在MaxCompute创建表
    CREATE TABLE IF NOT EXISTS dfs2maxcompute  
    (                                         
     id               BIGINT COMMENT '编号',  
     name             STRING COMMENT '姓名',  
     gender           STRING COMMENT '性别',  
     age              BIGINT COMMENT '年龄',  
     birth            STRING COMMENT '生日'   
    );
  2. 在DataWorks中编写数据同步脚本。
    配置MaxCompute Reader和HDFS Writer脚本,详情请参见配置MaxCompute Reader配置HDFS Writer
    {
        "type": "job",
        "steps": [
            {
                "stepType": "hdfs",
                "parameter": {
                    "path": "/maxcompute2dfs",
                    "fileName": "maxcompute2dfs*",
                    "datasource": "xxx",
                    "column": [
                        {
                            "index": 0,
                            "type": "long"
                        },
                        {
                            "index": 1,
                            "type": "string"
                        },
                        {
                            "index": 2,
                            "type": "string"
                        },
                        {
                            "index": 3,
                            "type": "long"
                        },
                        {
                            "index": 4,
                            "type": "string"
                        }
                    ],
                    "defaultFS": "dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290",
                    "encoding": "UTF-8",
                    "fieldDelimiter": ",",
                    "fileType": "text",
                    "hadoopConfig": {
                        "fs.dfs.impl": "com.alibaba.dfs.DistributedFileSystem",
                        "fs.AbstractFileSystem.dfs.impl" : "com.alibaba.dfs.DFS"
                    }
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter":{
                    "partition":"",
                    "truncate":true,
                    "compress":false,
                    "datasource":"odps_first",
                    "column": [
                            "id",
                            "name",
                            "gender",
                            "age",
                            "birth"
                    ],
                    "guid": null,
                    "emptyAsNull": false,
                    "table": "dfs2maxcompute"
                },
                "name": "Writer",
                "category": "writer"
            }        
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "concurrent": 2,
                "throttle": false
            }
        }
    }
  3. 加载创建的独享数据集成资源组并执行数据脚本。
  4. 查看文件存储HDFS是否成功同步数据到MaxCompute。