RestAPI Reader最佳实践

数据集成RestAPI Reader插件提供了读取RESTful接口数据的能力,通过配置HTTP请求地址,可获取RestAPI类型的数据源数据(例如获取时间范围内的数据、获取分页数据、循环请求参数获取数据等),并转换为数据集成支持的数据类型,传递给下游Writer插件。本文为您举例介绍常见的RestAPI数据源使用示例。

背景信息

DataWorks数据集成RestAPI Reader在读取数据和返回读取结果的能力如下。

维度

能力支持

返回值类型

当前仅支持JSON格式的返回结果。

读取数据类型

支持读取INT、BOOLEAN、DATE、DOUBLE、FLOAT、LONG、STRING数据类型。

请求方式

支持请求方式为GET和POST的HTTP接口。

认证方式

支持无认证或者以下三种验证方式:Basic Auth、Token Auth和Aliyun API Signature。您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。

  • Basic Auth:基础验证。

    如果数据源API支持用户名和密码的方式进行验证,您可选择此种验证方式,并在选择完成后配置用于验证的用户名和密码,后续数据集成过程中对接数据源时,通过Basic Auth协议传递给RESTful地址,完成验证。

  • Token Auth:Token验证。

    如果数据源API支持Token的方式进行验证,您可选择此种验证方式,并在选择完成后配置用于验证的固定Token值,后续数据集成过程中对接数据源时,通过传入header中进行验证,例如:{"Authorization":"Bearer TokenXXXXXX"}。

  • Aliyun API Signature:阿里云API签名验证。

    如果数据源为阿里云产品,且此阿里云产品的API支持通过AccessKey和AccessSecret的方式进行验证,您可选择此种种验证方式,并在选择完成后配置用于验证的AccessKey和AccessSecret。

实践1:读取一个接口数据,该接口根据时间范围查询数据

示例场景:接口定义

本实践示例的场景为读取一个RESTful接口数据并写入一个MaxCompute分区表中,其中使用的示例RESTful接口为一个自建的测试GET接口,会根据接口输入的时间范围参数返回该时间范围内读取到的数据。本示例的接口详情如下。

说明

您实际操作时,可根据您使用的接口调整相关配置,本示例的接口仅作为本实践的示例,为您演示实践的操作流程。

  • 接口请求示例:

    http://TestAPIAddress:Port/rest/test2?startTime=<StartTime>&endTime=<EndTime>

    其中startTime和endTime为请求参数,表示读取数据的时间范围。

  • 返回结果示例:

    {
        "status": "success",
        "totalNum": 187,
        "data": [
            {
                "axis": "series1",
                "value": 9191352,
                "createTime": "2023-01-04 00:07:20"
            },
            {
                "axis": "series1",
                "value": 6645322,
                "createTime": "2023-01-04 00:14:47"
            },
            {
                "axis": "series1",
                "value": 2078369,
                "createTime": "2023-01-04 00:22:13"
            },
            {
                "axis": "series1",
                "value": 7325410,
                "createTime": "2023-01-04 00:29:30"
            },
            {
                "axis": "series1",
                "value": 7448456,
                "createTime": "2023-01-04 00:37:04"
            },
            {
                "axis": "series1",
                "value": 5808077,
                "createTime": "2023-01-04 00:44:30"
            },
            {
                "axis": "series1",
                "value": 5625821,
                "createTime": "2023-01-04 00:52:06"
            }
        ]
    }

    其中data为返回的数据存储路径,读取到的数据有3个字段:axisvaluecreateTime

  • 接口测试工具调用示例:apirest2

准备工作:创建MaxCompute分区表

本实践将从接口处读取的数据同步至MaxCompute分区表中,因此首先需要创建一张用于存储同步过来的数据的分区表。

说明

分区表配合覆盖写命令,可以实现分区覆盖写的效果,让数据同步任务具备可重跑性,重跑时数据不会重复,并且在进一步做数据分析时,分区表也更易于数据分析。

建表语句如下。

CREATE TABLE IF NOT EXISTS ods_xiaobo_rest2
(
  `axis`  STRING
  ,`value` BIGINT
  ,`createTime` STRING
)
PARTITIONED BY
(
  ds  STRING
)
LIFECYCLE 3650;

如果您使用的是标准版的DataWorks,并将创建的分区表提交到生产环境,后续您可以在数据地图中查看到创建的此表。

配置同步任务

  1. 添加RestAPI数据源。

    在DataWorks的工作空间中添加一个RestAPI类型的数据源,操作详情请参见配置RestAPI数据源image.png核心配置要点如下。

    • url:配置为RESTful接口的地址。

    • 验证方法:您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。

    • 资源组连通性:RestAPI数据源仅支持使用独享数据集成资源组,您需要选择一个独享数据集成资源组,测试数据源与此资源组的连通性。

  2. 创建离线同步节点并配置同步任务。

    在DataWorks的DataStudio中创建一个离线同步节点,操作详情请参见通过向导模式配置离线同步任务,其中核心配置要点如下。restapi2mc配置同步任务

    • 数据来源配置要点:

      • 数据源:选择上述步骤中创建的RestAPI数据源。

      • 请求Method:本示例接口为GET接口,此处选择GET

      • 返回数据结构:本示例接口的返回结果为一个JSON数组,此处选择数组数据

      • 数据存储json路径:本示例接口获取的数据存储于data下,此处配置为data

      • 请求参数:请求参数与调度参数搭配使用,实现每天同步当天数据的。

        • 请求参数配置为startTime=${extract_day} ${start_time}&endTime=${extract_day} ${end_time}

        • 后续调度配置的调度参数中,新增三个调度参数:extract_day=${yyyy-mm-dd}start_time=00:00:00end_time=23:59:59

        假设运行日期为2023-01-05,则extract_day取值为2023-01-04,请求参数将拼接为:startTime=2023-01-04 00:00:00&endTime=2023-01-04 23:59:59

    • 数据去向配置要点:

      • 数据源:选择上述步骤中创建的MaxCompute分区表。

      • 分区信息:分区信息和调度参数搭配使用。

        • 分区信息配置为${bizdate}

        • 后续调度配置的调度参数中,新增1个调度参数:bizdate=$bizdate

        假设运行日期为2023-01-05,则分区信息的取值为20230104。

    • 字段映射配置要点:根据接口中的数据定义,填写RestAPI接口中的字段,注意字段名为大小写敏感。添加好字段后,可以使用同名映射或者手动连线的方式,建立列映射。

测试运行

本实践使用了调度参数,因此完成离线同步任务的配置后,您可以在离线同步节点页面顶部单击带参运行,根据界面提示填写测试的调度参数取值,进行离线同步任务测试。带参运行测试运行完成后,您可以在界面下方查看运行日程,检查调度参数取值是否符合预期。

数据检查

您可以在DataStudio的临时查询中查看数据是否正确同步到了MaxCompute,临时查询的示例语句如下。

select * from ods_xiaobo_rest2 where ds='20230104' order by createtime;

其中ods_xiaobo_rest2为上述步骤中创建的MaxCompute分区表,20230104为测试运行时的分区取值。

运行完成后,您可以在下方的查询结果处查看数据是否正确同步到了MaxCompute。结果1

提交发布与补数据

测试运行与数据检查完成后,您可将离线同步任务提交发布到生产环境,操作详情请参见标准模式工作空间任务发布流程提交发布提交发布成功后,您可以在DataWorks的运维中心中找到这个周期任务,此时可通过补数据的方式,将历史时间段内的数据补上。补数据的功能介绍和操作指导请参见执行补数据并查看补数据实例

实践2:读取一个接口数据,该接口为一个分页的RestAPI接口

示例场景:接口定义

本实践示例的场景为读取一个RESTful接口数据并写入一个MaxCompute分区表中,其中使用的示例RESTful接口为一个自建的测试GET接口,本示例的接口详情如下。

说明

您实际操作时,可根据您使用的接口调整相关配置,本示例的接口仅作为本实践的示例,为您演示实践的操作流程。

  • 接口请求示例:

    http://TestAPIAddress:Port/rest/test1?pageSize=5&pageNum=1

    其中pageSize和pageNum为请求参数,表示页长和页号。

  • 返回结果示例:

    {
        "status": "success",
        "totalNum": 304,
        "data": [
            {
                "id": 6,
                "name": "测试用户6"
            },
            {
                "id": 7,
                "name": "测试用户7"
            },
            {
                "id": 8,
                "name": "测试用户8"
            },
            {
                "id": 9,
                "name": "测试用户9"
            },
            {
                "id": 10,
                "name": "测试用户10"
            }
        ]
    }

    其中data为返回的数据存储路径,读取到的数据有2个字段:idname

  • 接口测试工具调用示例:apirest1

准备工作:创建MaxCompute分区表

本实践将从接口处读取的数据同步至MaxCompute分区表中,因此首先需要创建一张用于存储同步过来的数据的分区表。

说明

分区表配合覆盖写命令,可以实现分区覆盖写的效果,让数据同步任务具备可重跑性,重跑时数据不会重复,并且在进一步做数据分析时,分区表也更易于数据分析。

建表语句如下。

CREATE TABLE IF NOT EXISTS ods_xiaobo_rest1
(
  `id` BIGINT
  ,`name` STRING
)
PARTITIONED BY
(
  ds  STRING
)
LIFECYCLE 3650;

如果您使用的是标准版的DataWorks,并将创建的分区表提交到生产环境,后续您可以在数据地图中查看到创建的此表。

配置同步任务

  1. 添加RestAPI数据源。

    在DataWorks的工作空间中添加一个RestAPI类型的数据源,操作详情请参见配置RestAPI数据源image.png核心配置要点如下。

    • url:配置为RESTful接口的地址。

    • 验证方法:您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。

    • 资源组连通性:RestAPI数据源仅支持使用独享数据集成资源组,您需要选择一个独享数据集成资源组,测试数据源与此资源组的连通性。

  2. 创建离线同步节点并配置同步任务。

    在DataWorks的DataStudio中创建一个离线同步节点,操作详情请参见通过向导模式配置离线同步任务,其中核心配置要点如下。实践2

    • 数据来源配置要点:

      • 数据源:选择上述步骤中创建的RestAPI数据源。

      • 请求Method:本示例接口为GET接口,此处选择GET

      • 返回数据结构:本示例接口的返回结果为一个JSON数组,此处选择数组数据

      • 数据存储json路径:本示例接口获取的数据存储于data下,此处配置为data

      • 请求参数:页长固定,此处配置为pageSize=50,建议不要单页特别大,会给RestAPI服务端和同步任务带来压力。

      • 请求次数:本示例选择多次请求

        本接口的分页参数是pageNum,选择多次请求后,下方的相关参数配置为:

        • 多次请求对应参数:配置为pageNum。

        • StartIndex:配置为1。

        • Step:配置为1。

        • EndIndex:配置为100。

    • 数据去向配置要点:

      • 数据源:选择上述步骤中创建的MaxCompute分区表。

      • 分区信息:分区信息和调度参数搭配使用。

        • 分区信息配置为${bizdate}

        • 后续调度配置的调度参数中,新增1个调度参数:bizdate=$bizdate

        假设运行日期为2023-01-05,则分区信息的取值为20230104。

    • 字段映射配置要点:根据接口中的数据定义,填写RestAPI接口中的字段,注意字段名为大小写敏感。添加好字段后,可以使用同名映射或者手动连线的方式,建立列映射。

测试运行

本实践使用了调度参数,因此完成离线同步任务的配置后,您可以在离线同步节点页面顶部单击带参运行,根据界面提示填写测试的调度参数取值,进行离线同步任务测试。带参运行测试运行完成后,您可以在界面下方查看运行日程,检查调度参数取值是否符合预期。

数据检查

您可以在DataStudio的临时查询中查看数据是否正确同步到了MaxCompute,临时查询的示例语句如下。

select * from ods_xiaobo_rest1 where ds='20230104' order by id;

其中ods_xiaobo_rest1为上述步骤中创建的MaxCompute分区表,20230104为测试运行时的分区取值。

运行完成后,您可以在下方的查询结果处查看数据是否正确同步到了MaxCompute。结果1

实践3:读取一个接口数据,该接口为POST类型的RestAPI接口

示例场景:接口定义

本实践示例的场景为读取一个RESTful接口数据并写入一个MaxCompute分区表中,其中使用的示例RESTful接口为一个自建的测试POST接口,本示例的接口详情如下。

说明

您实际操作时,可根据您使用的接口调整相关配置,本示例的接口仅作为本实践的示例,为您演示实践的操作流程。

  • 接口请求示例:

    http://TestAPIAddress:Port/rest/test3

    requestBody的格式为JSON。

    {
      "userId":16,
      "startTime":"2023-01-04 00:00:00",
      "endTime":"2023-01-04 23:59:59"
    }
  • 返回结果示例:

    {
        "status": "success",
        "totalNum": 289,
        "data": [
            {
                "user": {
                    "id": 16,
                    "name": "用户16"
                },
                "axis": "series1",
                "value": 8231053,
                "createTime": "2023-01-04 00:04:57"
            },
            {
                "user": {
                    "id": 16,
                    "name": "用户16"
                },
                "axis": "series1",
                "value": 6519928,
                "createTime": "2023-01-04 00:09:51"
            },
            {
                "user": {
                    "id": 16,
                    "name": "用户16"
                },
                "axis": "series1",
                "value": 2915920,
                "createTime": "2023-01-04 00:14:36"
            },
            {
                "user": {
                    "id": 16,
                    "name": "用户16"
                },
                "axis": "series1",
                "value": 7971851,
                "createTime": "2023-01-04 00:19:51"
            },
            {
                "user": {
                    "id": 16,
                    "name": "用户16"
                },
                "axis": "series1",
                "value": 6598996,
                "createTime": "2023-01-04 00:24:30"
            }
        ]
    }

    其中data为返回的数据存储路径,读取到的数据有5个字段:user.iduser.nameaxisvaluecreateTime

  • 接口测试工具调用示例:apirest3

准备工作:创建MaxCompute分区表

本实践将从接口处读取的数据同步至MaxCompute分区表中,因此首先需要创建一张用于存储同步过来的数据的分区表。

说明

分区表配合覆盖写命令,可以实现分区覆盖写的效果,让数据同步任务具备可重跑性,重跑时数据不会重复,并且在进一步做数据分析时,分区表也更易于数据分析。

建表语句如下。

CREATE TABLE IF NOT EXISTS ods_xiaobo_rest3
(
  `user_id` BIGINT
  ,`name` STRING
  ,`axis`  STRING
  ,`value` BIGINT
  ,`create_time` STRING
)
PARTITIONED BY
(
  ds  STRING
)
LIFECYCLE 3650;

如果您使用的是标准版的DataWorks,并将创建的分区表提交到生产环境,后续您可以在数据地图中查看到创建的此表。

配置同步任务

  1. 添加RestAPI数据源。

    在DataWorks的工作空间中添加一个RestAPI类型的数据源,操作详情请参见配置RestAPI数据源image.png核心配置要点如下。

    • url:配置为RESTful接口的地址。

    • 验证方法:您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。

    • 资源组连通性:RestAPI数据源仅支持使用独享数据集成资源组,您需要选择一个独享数据集成资源组,测试数据源与此资源组的连通性。

  2. 创建离线同步节点并配置同步任务。

    在DataWorks的DataStudio中创建一个离线同步节点,操作详情请参见通过向导模式配置离线同步任务,其中核心配置要点如下。实践3

    • 数据来源配置要点:

      • 数据源:选择上述步骤中创建的RestAPI数据源。

      • 请求Method:本示例接口为POST接口,此处选择POST

      • 返回数据结构:本示例接口的返回结果为一个JSON数组,此处选择数组数据

      • 数据存储json路径:本示例接口获取的数据存储于data下,此处配置为data

      • Header:本示例的POST接口接受的请求体是JSON格式的,此处配置为{"Content-Type":"application/json"}

      • 请求参数:请求参数与调度参数搭配使用,实现每天同步当天数据的。

        • 请求参数配置为

          {
          
              "userId":16,
          
              "startTime":"${extract_day} 00:00:00",
          
              "endTime":"${extract_day} 23:59:59"
          
          }
        • 后续调度配置的调度参数中,新增1个调度参数:extract_day=${yyyy-mm-dd}

    • 数据去向配置要点:

      • 数据源:选择上述步骤中创建的MaxCompute分区表。

      • 分区信息:分区信息和调度参数搭配使用。

        • 分区信息配置为${bizdate}

        • 后续调度配置的调度参数中,新增1个调度参数:bizdate=$bizdate

        假设运行日期为2023-01-05,则分区信息的取值为20230104。

    • 字段映射配置要点:根据接口中的数据定义,填写RestAPI接口中的字段,多个字段可以使用英文点号分隔,注意字段名为大小写敏感。添加好字段后,可以使用同名映射或者手动连线的方式,建立列映射。

测试运行

本实践使用了调度参数,因此完成离线同步任务的配置后,您可以在离线同步节点页面顶部单击带参运行,根据界面提示填写测试的调度参数取值,进行离线同步任务测试。带参运行测试运行完成后,您可以在界面下方查看运行日程,检查调度参数取值是否符合预期。

数据检查

您可以在DataStudio的临时查询中查看数据是否正确同步到了MaxCompute,临时查询的示例语句如下。

select * from ods_xiaobo_rest3 where ds='20230105' order by create_time;

其中ods_xiaobo_rest3为上述步骤中创建的MaxCompute分区表,20230105为测试运行时的分区取值。

运行完成后,您可以在下方的查询结果处查看数据是否正确同步到了MaxCompute。结果3

实践4:循环某请求参数读取RestAPI接口

示例场景:接口定义

本实践示例的场景为循环读取RESTful接口数据并写入一个MaxCompute分区表中,其中使用的示例RESTful接口为一个自建的测试GET接口,会根据日期(date)、省份(province)、城市(city)入参返回相关的温度数据。

说明

您实际操作时,可根据您使用的接口调整相关配置,本示例的接口仅作为本实践的示例,为您演示实践的操作流程。

  • 请求示例:

    http://TestAPIAddress:Port/rest/test5?date=2023-01-04&province=zhejiang&city=hangzhou
  • 返回示例:

    {
      "province": "P1",
      "city": "hz",
      "date": "2023-01-04",
      "minTemperature": "-14",
      "maxTemperature": "-7",
      "unit": "℃",
      "weather": "cool"
    }
  • 接口测试工具调用示例:image.png

准备工作:创建参数表和MaxCompute分区表

本实践将从接口处读取的数据同步至MaxCompute分区表中,因此首先需要创建一张参数表,用于存储需要循环的province、city,再创建一张用于存储同步过来的数据的分区表。

说明

分区表配合覆盖写命令,可以实现分区覆盖写的效果,让数据同步任务具备可重跑性,重跑时数据不会重复,并且在进一步做数据分析时,分区表也更易于数据分析。

建表语句如下:

创建参数表

CREATE TABLE IF NOT EXISTS `citys`
(
  `province` STRING ,
  `city` STRING
);

insert into citys
select 'shanghai','shanghai'
union all select 'zhejiang','hangzhou'
union all select 'sichuan','chengdu';

创建MaxCompute分区表

CREATE TABLE IF NOT EXISTS ods_xiaobo_rest5
(
    `minTemperature` STRING ,
    `maxTemperature` STRING ,
    `unit` STRING ,
    `weather` STRING 
)
PARTITIONED BY 
(
    `province` STRING ,
    `city` STRING ,
    `ds`  STRING
)
LIFECYCLE 3650;

如果您使用的是标准版的DataWorks,并将创建的表提交到生产环境,后续您可以在数据地图中查看到创建的此表。

配置同步任务

  1. 添加RestAPI数据源。

    在DataWorks的工作空间中添加一个RestAPI类型的数据源,操作详情请参见配置RestAPI数据源image.png核心配置要点如下:

    • url:配置为RESTful接口的地址。

    • 验证方法:您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。

    • 资源组连通性:选择指定的资源组,并进行连通性测试。

  2. 数据开发中创建赋值节点setval_citys。具体操作,请参见赋值节点image.png

    核心配置要点如下:

    序号

    说明

    • 赋值语言:ODPS SQL

    • 赋值代码:

      SELECT  province
              ,city
      FROM    citys;

    重跑属性:配置为运行成功或失败后皆可重跑

    配置完成后提交发布赋值节点。

  3. 数据开发中创建for-each节点。具体操作,请参见for-each节点image.png核心配置要点如下:

    序号

    说明

    重跑属性:配置为运行成功或失败后皆可重跑

    依赖的上游节点:选择前面步骤的节点,即setval_citys节点。

    节点上下文参数:选择输入参数的来源。

    离线同步节点:配置for-each内部离线同步节点,详情请参见下面步骤。

  4. 创建离线同步节点并配置同步任务。具体操作,请参见通过向导模式配置离线同步任务image.png

    核心配置要点如下:

    序号

    说明

    调度参数配置如下:

    bizdate=$[yyyymmdd-1] 
    bizdate_year=$[yyyy-1] 
    bizdate_month=$[mm-1] 
    bizdate_day=$[dd-1]

    配置RestAPI请求参数。其中province、city参数来自循环节点。

    date=${bizdate_year}-${bizdate_month}-${bizdate_day}&province=${dag.foreach.current[0]}&city=${dag.foreach.current[1]}

    配置MaxCompute分区参数,其中province参数来自循环节点。

    province=${dag.foreach.current[0]}

    配置MaxCompute分区参数,其中city参数来自循环节点。

    city=${dag.foreach.current[1]}

    配置MaxCompute分区参数,其中ds参数来自调度参数。

    ds=${bizdate}

    根据接口中的数据定义,填写RestAPI中的字段,注意字段名为大小写敏感。添加好字段后,可以使用同名映射或者手动连线的方式,建立列映射。

    配置完成后提交发布for-each节点。

测试运行

  1. 在成功提交发布赋值节点和for-each节点后,在运维中心的周期任务中,对赋值节点进行补数据操作。具体操作,请参见执行补数据并查看补数据实例image.png

  1. 选择补数据的业务日期,选中运行的节点。image.png

  2. 任务运行后,在运行日志中查看调度参数渲染是否符合预期。image.png

    如图中所示,调度参数渲染正确,数据将被写入MaxCompute表的province=shanghai,city=shanghai,ds=20231215分区中。

数据检查

您可以在数据开发的临时查询功能中查看数据是否正确同步到了MaxCompute,临时查询的示例如下:

示例中ods_xiaobo_rest5为上述准备工作中创建的MaxCompute分区表。

SELECT  weather
        ,mintemperature
        ,maxtemperature
        ,unit
        ,province
        ,city
        ,ds
FROM    ods_xiaobo_rest5
WHERE   ds != 1
ORDER BY ds,province,city;

运行完成后,查看数据是否正确同步到MaxCompute中。image.png