本文将为您介绍如何通过DataWorks数据同步功能,将HDFS上的数据迁移至MaxCompute,或从MaxCompute将数据迁移至HDFS。无论您使用Hadoop还是Spark,均可以与MaxCompute之间的数据进行双向同步。

环境准备

  1. Hadoop集群搭建

    进行数据迁移前,您需要保证自己的Hadoop集群环境正常。本文使用阿里云EMR服务自动化搭建Hadoop集群,详细过程请参见创建集群

    本文使用的EMR Hadoop版本信息如下:

    EMR版本: EMR-3.11.0

    集群类型: HADOOP

    软件信息: HDFS2.7.2 / YARN2.7.2 / Hive2.3.3 / Ganglia3.7.2 / Spark2.2.1 / HUE4.1.0 / Zeppelin0.7.3 / Tez0.9.1 / Sqoop1.4.6 / Pig0.14.0 / ApacheDS2.0.0 / Knox0.13.0

    Hadoop集群使用经典网络,区域为华东1(杭州),主实例组ECS计算资源配置公网及内网IP,高可用选择为否(非HA模式)。

  2. MaxCompute

    开通MaxCompute并创建好项目。本文以在华东1(杭州)区域创建项目bigdata_DOC为例,同时启动DataWorks相关服务。

数据准备

  1. Hadoop集群创建测试数据
    1. 进入EMR Hadoop集群控制台界面,使用交互式工作台,新建交互式任务doc。本例中HIVE建表语句如下。
      CREATE TABLE IF NOT
      EXISTS hive_doc_good_sale(
         create_time timestamp,
         category STRING,
         brand STRING,
         buyer_id STRING,
         trans_num BIGINT,
         trans_amount DOUBLE,
         click_cnt BIGINT
         )
         PARTITIONED BY (pt string) ROW FORMAT
      DELIMITED FIELDS TERMINATED BY ',' lines terminated by '\n'
    2. 选择运行,出现Query executed successfully提示,则说明成功在EMR Hadoop集群上创建了表hive_doc_good_sale。

    3. 插入测试数据。您可以选择从OSS或其他数据源导入测试数据,也可以手动插入少量的测试数据。本文中手动插入数据如下。
      insert into
      hive_doc_good_sale PARTITION(pt =1 ) values('2018-08-21','外套','品牌A','lilei',3,500.6,7),('2018-08-22','生鲜','品牌B','lilei',1,303,8),('2018-08-22','外套','品牌C','hanmeimei',2,510,2),(2018-08-22,'卫浴','品牌A','hanmeimei',1,442.5,1),('2018-08-22','生鲜','品牌D','hanmeimei',2,234,3),('2018-08-23','外套','品牌B','jimmy',9,2000,7),('2018-08-23','生鲜','品牌A','jimmy',5,45.1,5),('2018-08-23','外套','品牌E','jimmy',5,100.2,4),('2018-08-24','生鲜','品牌G','peiqi',10,5560,7),('2018-08-24','卫浴','品牌F','peiqi',1,445.6,2),('2018-08-24','外套','品牌A','ray',3,777,3),('2018-08-24','卫浴','品牌G','ray',3,122,3),('2018-08-24','外套','品牌C','ray',1,62,7) ;
    4. 完成插入数据后,您可以执行select * from hive_doc_good_sale where pt =1;语句,检查Hadoop集群表中是否已存在数据可以用于迁移。

  2. 利用DataWorks新建目标表
    1. 登录DataWorks控制台,单击相应工作空间操作栏下的进入数据开发
    2. 进入DataStudio(数据开发)页面,选择新建 >

    3. 新建表对话框中,填写表名,并单击提交
    4. 进入新建表页面,选择DDL模式
    5. DDL模式对话框中输入建表语句,单击生成表结构,并确认操作。本示例的建表语句如下所示:
      CREATE TABLE IF NOT EXISTS hive_doc_good_sale(
         create_time string,
         category STRING,
         brand STRING,
         buyer_id STRING,
         trans_num BIGINT,
         trans_amount DOUBLE,
         click_cnt BIGINT
         )
         PARTITIONED BY (pt string) ;

      在建表过程中,需要考虑HIVE数据类型与MaxCompute数据类型的映射,当前数据映射关系请参见与Hive数据类型映射表

      由于本文使用DataWorks进行数据迁移,而DataWorks数据同步功能暂不支持TIMESTAMP类型数据。因此在DataWorks建表语句中,将create_time设置为STRING类型。上述步骤同样可通过odpscmd命令行工具完成,命令行工具安装和配置请参见安装并配置客户端

      说明 考虑到部分HIVE与MaxCompute数据类型的兼容问题,建议在odpscmd客户端上执行以下命令。
      set odps.sql.type.system.odps2=true;
      set odps.sql.hive.compatible=true;
    6. 完成建表后,单击左侧导航栏中的表管理,即可查看当前创建的MaxCompute表。

数据同步

  1. 新建自定义资源组

    由于MaxCompute项目所处的网络环境与Hadoop集群中的数据节点(data node)网络通常不可达,您可以通过自定义资源组的方式,将DataWorks的同步任务运行在Hadoop集群的Master节点上(Hadoop集群内Master节点和数据节点通常可达)。

    1. 查看Hadoop集群data node
      进入EMR控制台,选择首页 > 集群管理 > 集群 > 主机列表

      您也可以通过单击上图中Master节点的ECS ID,进入ECS实例详情页。然后单击远程连接进入ECS,执行hadoop dfsadmin –report命令查看data node。

      本示例的data node只具有内网地址,很难与DataWorks默认资源组互通,所以需要设置自定义资源组,将master node设置为执行DataWorks数据同步任务的节点。

    2. 新建自定义资源组
      1. 进入数据集成 > 资源组页面,单击右上角的新增自定义资源组。关于自定义资源组的详细信息请参见新增任务资源
        说明 目前仅专业版及以上版本方可使用此入口。
      2. 添加服务器时,需要输入ECS UUID和机器IP等信息(对于经典网络类型,需要输入服务器名称。对于专有网络类型,需要输入服务器UUID)。目前仅DataWorks V2.0华东2区支持经典网络类型的调度资源添加,对于其他区域,无论您使用的是经典网络还是专有网络类型,在添加调度资源组时都请选择专有网络类型。
        机器IP需要填写master node公网IP(内网IP有可能不可达)。ECS的UUID需要进入master node管理终端,通过命令dmidecode | grep UUID获取(如果您的hadoop集群并非搭建在EMR环境上,也可以通过该命令获取)。

      3. 添加服务器后,需要保证master node与DataWorks网络可达。如果您使用的是ECS服务器,需要设置服务器安全组。
        • 如果您使用的内网IP互通,请参见添加安全组
        • 如果您使用的是公网IP,可以直接设置安全组公网出入方向规则。本文中设置公网入方向放通所有端口(实际应用场景中,为了您的数据安全,强烈建议设置详细的放通规则)。

      4. 完成上述步骤后,按照提示安装自定义资源组agent。当前状态显示为可用时,则新增自定义资源组成功。

        如果状态为不可用,您可以登录master node,执行tail –f/home/admin/alisatasknode/logs/heartbeat.log命令查看DataWorks与master node之间心跳报文是否超时。

  2. 新建数据源

    DataWorks新建工作空间后,默认设置自己为数据源odps_first。因此只需要添加Hadoop集群数据源。更多详情请参见配置HDFS数据源

    1. 进入数据集成页面,选择同步资源管理 > 数据源,单击新增数据源
    2. 新增数据源弹出框中,选择数据源类型为HDFS

    3. 填写HDFS数据源的各配置项。

      配置 说明
      数据源名称 数据源名称必须以字母、数字、下划线组合,且不能以数字和下划线开头。
      数据源描述 对数据源进行简单描述,不得超过80个字符。
      适用环境 可以选择开发生产环境。
      说明 仅标准模式工作空间会显示此配置。
      DefaultFS 对于EMR Hadoop集群而言,如果Hadoop集群为HA集群,则此处地址为hdfs://emr-header-1的IP:8020。如果Hadoop集群为非HA集群,则此处地址为hdfs://emr-header-1的IP:9000

      本实验中的emr-header-1与DataWorks通过公网连接,因此此处填写公网IP并放通安全组。

    4. 完成配置后,单击测试连通性
    5. 测试连通性通过后,单击完成
      说明 如果EMR Hadoop集群设置网络类型为专有网络,则不支持连通性测试。
  3. 配置数据同步任务
    1. 进入数据开发页面,选择新建 > 数据集成 > 数据同步

    2. 新建节点对话框中,输入节点名称,单击提交
    3. 成功创建数据同步节点后,单击工具栏中的转换脚本按钮。

    4. 单击提示对话框中的确认,即可进入脚本模式进行开发。

    5. 单击工具栏中的导入模板按钮。

    6. 导入模板对话框中,选择来源类型数据源目标类型数据源,单击确认

    7. 新建同步任务完成后,通过导入模板已生成了基本的读取端配置。此时您可以继续手动配置数据同步任务的读取端数据源,以及需要同步的表信息等。本示例的代码如下所示,更多详情请参见配置HDFS Reader
      {
        "configuration": {
          "reader": {
            "plugin": "hdfs",
            "parameter": {
              "path": "/user/hive/warehouse/hive_doc_good_sale/", 
              "datasource": "HDFS1",
              "column": [
                {
                  "index": 0,
                  "type": "string"
                },
                {
                  "index": 1,
                  "type": "string"
                },
                {
                  "index": 2,
                  "type": "string"
                },
                {
                  "index": 3,
                  "type": "string"
                },
                {
                  "index": 4,
                  "type": "long"
                },
                {
                  "index": 5,
                  "type": "double"
                },
                {
                  "index": 6,
                  "type": "long"
                }
              ],
              "defaultFS": "hdfs://121.199.11.138:9000",
              "fieldDelimiter": ",",
              "encoding": "UTF-8",
              "fileType": "text"
            }
          },
          "writer": {
            "plugin": "odps",
            "parameter": {
              "partition": "pt=1",
              "truncate": false,
              "datasource": "odps_first",
              "column": [
                "create_time",
                "category",
                "brand",
                "buyer_id",
                "trans_num",
                "trans_amount",
                "click_cnt"
              ],
              "table": "hive_doc_good_sale"
            }
          },
          "setting": {
            "errorLimit": {
              "record": "1000"
            },
            "speed": {
              "throttle": false,
              "concurrent": 1,
              "mbps": "1",
            }
          }
        },
        "type": "job",
        "version": "1.0"
      }
      其中,path参数为数据在Hadoop集群中存放的位置.您可以在登录master node后,执行hdfs dfs –ls /user/hive/warehouse/hive_doc_good_sale命令确认。对于分区表,您可以不指定分区,DataWorks数据同步会自动递归到分区路径。

    8. 完成配置后,单击运行。如果提示任务运行成功,则说明同步任务已完成。如果运行失败,可以通过日志进行排查。

验证结果

  1. 单击左侧导航栏中的临时查询
  2. 选择新建 > ODPS SQL

  3. 编写并执行SQL语句,查看导入hive_doc_good_sale的数据。SQL语句如下所示:
    --查看是否成功写入MaxCompute
    select * from hive_doc_good_sale where pt=1;

    您也可以在odpscmd命令行工具中输入select * FROM hive_doc_good_sale where pt =1;,查询表结果。

MaxCompute数据迁移到Hadoop

如果您想实现MaxCompute数据迁移至Hadoop,步骤与上述步骤类似,不同的是同步脚本内的reader和writer对象需要对调,具体实现脚本如下。

{
  "configuration": {
    "reader": {
      "plugin": "odps",
      "parameter": {
      "partition": "pt=1",
      "isCompress": false,
      "datasource": "odps_first",
      "column": [
        "create_time",
        "category",
        "brand",
      "buyer_id",
      "trans_num",
      "trans_amount",
      "click_cnt"
    ],
    "table": "hive_doc_good_sale"
    }
  },
  "writer": {
    "plugin": "hdfs",
    "parameter": {
    "path": "/user/hive/warehouse/hive_doc_good_sale",
    "fileName": "pt=1",
    "datasource": "HDFS_data_source",
    "column": [
      {
        "name": "create_time",
        "type": "string"
      },
      {
        "name": "category",
        "type": "string"
      },
      {
        "name": "brand",
        "type": "string"
      },
      {
        "name": "buyer_id",
        "type": "string"
      },
      {
        "name": "trans_num",
        "type": "BIGINT"
      },
      {
        "name": "trans_amount",
        "type": "DOUBLE"
      },
      {
        "name": "click_cnt",
        "type": "BIGINT"
      }
    ],
    "defaultFS": "hdfs://47.99.162.100:9000",
    "writeMode": "append",
    "fieldDelimiter": ",",
    "encoding": "UTF-8",
    "fileType": "text"
    }
  },
  "setting": {
    "errorLimit": {
      "record": "1000"
  },
  "speed": {
    "throttle": false,
    "concurrent": 1,
    "mbps": "1",
  }
  }
},
"type": "job",
"version": "1.0"
}

您需要参见配置HDFS Writer,在运行上述同步任务前,对Hadoop集群进行设置。在运行同步任务后,手动复制同步过去的文件。