数据集成RestAPI Reader插件提供了读取RESTful接口数据的能力,通过配置HTTP请求地址,可获取RestAPI类型的数据源数据(例如获取时间范围内的数据、获取分页数据、循环请求参数获取数据等),并转换为数据集成支持的数据类型,传递给下游Writer插件。本文为您举例介绍常见的RestAPI数据源使用示例。
背景信息
DataWorks数据集成RestAPI Reader在读取数据和返回读取结果的能力如下。
维度 | 能力支持 |
返回值类型 | 当前仅支持JSON格式的返回结果。 |
读取数据类型 | 支持读取INT、BOOLEAN、DATE、DOUBLE、FLOAT、LONG、STRING数据类型。 |
请求方式 | 支持请求方式为GET和POST的HTTP接口。 |
认证方式 | 支持无认证或者以下三种验证方式:Basic Auth、Token Auth和Aliyun API Signature。您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。
|
实践1:读取一个接口数据,该接口根据时间范围查询数据
示例场景:接口定义
本实践示例的场景为读取一个RESTful接口数据并写入一个MaxCompute分区表中,其中使用的示例RESTful接口为一个自建的测试GET接口,会根据接口输入的时间范围参数返回该时间范围内读取到的数据。本示例的接口详情如下。
您实际操作时,可根据您使用的接口调整相关配置,本示例的接口仅作为本实践的示例,为您演示实践的操作流程。
接口请求示例:
http://TestAPIAddress:Port/rest/test2?startTime=<StartTime>&endTime=<EndTime>
其中startTime和endTime为请求参数,表示读取数据的时间范围。
返回结果示例:
{ "status": "success", "totalNum": 187, "data": [ { "axis": "series1", "value": 9191352, "createTime": "2023-01-04 00:07:20" }, { "axis": "series1", "value": 6645322, "createTime": "2023-01-04 00:14:47" }, { "axis": "series1", "value": 2078369, "createTime": "2023-01-04 00:22:13" }, { "axis": "series1", "value": 7325410, "createTime": "2023-01-04 00:29:30" }, { "axis": "series1", "value": 7448456, "createTime": "2023-01-04 00:37:04" }, { "axis": "series1", "value": 5808077, "createTime": "2023-01-04 00:44:30" }, { "axis": "series1", "value": 5625821, "createTime": "2023-01-04 00:52:06" } ] }
其中data为返回的数据存储路径,读取到的数据有3个字段:
axis
、value
、createTime
。接口测试工具调用示例:
准备工作:创建MaxCompute分区表
本实践将从接口处读取的数据同步至MaxCompute分区表中,因此首先需要创建一张用于存储同步过来的数据的分区表。
分区表配合覆盖写命令,可以实现分区覆盖写的效果,让数据同步任务具备可重跑性,重跑时数据不会重复,并且在进一步做数据分析时,分区表也更易于数据分析。
建表语句如下。
CREATE TABLE IF NOT EXISTS ods_xiaobo_rest2
(
`axis` STRING
,`value` BIGINT
,`createTime` STRING
)
PARTITIONED BY
(
ds STRING
)
LIFECYCLE 3650;
如果您使用的是标准版的DataWorks,并将创建的分区表提交到生产环境,后续您可以在数据地图中查看到创建的此表。
配置同步任务
添加RestAPI数据源。
在DataWorks的工作空间中添加一个RestAPI类型的数据源,操作详情请参见配置RestAPI数据源。核心配置要点如下。
url:配置为RESTful接口的地址。
验证方法:您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。
资源组连通性:RestAPI数据源仅支持使用独享数据集成资源组,您需要选择一个独享数据集成资源组,测试数据源与此资源组的连通性。
创建离线同步节点并配置同步任务。
在DataWorks的DataStudio中创建一个离线同步节点,操作详情请参见通过向导模式配置离线同步任务,其中核心配置要点如下。
数据来源配置要点:
数据源:选择上述步骤中创建的RestAPI数据源。
请求Method:本示例接口为GET接口,此处选择GET。
返回数据结构:本示例接口的返回结果为一个JSON数组,此处选择数组数据。
数据存储json路径:本示例接口获取的数据存储于data下,此处配置为data。
请求参数:请求参数与调度参数搭配使用,实现每天同步当天数据的。
请求参数配置为
startTime=${extract_day} ${start_time}&endTime=${extract_day} ${end_time}
后续调度配置的调度参数中,新增三个调度参数:
extract_day=${yyyy-mm-dd}
、start_time=00:00:00
、end_time=23:59:59
。
假设运行日期为2023-01-05,则
extract_day
取值为2023-01-04,请求参数将拼接为:startTime=2023-01-04 00:00:00&endTime=2023-01-04 23:59:59
。
数据去向配置要点:
数据源、表:选择上述步骤中创建的MaxCompute分区表。
分区信息:分区信息和调度参数搭配使用。
分区信息配置为
${bizdate}
。后续调度配置的调度参数中,新增1个调度参数:
bizdate=$bizdate
。
假设运行日期为2023-01-05,则分区信息的取值为20230104。
字段映射配置要点:根据接口中的数据定义,填写RestAPI接口中的字段,注意字段名为大小写敏感。添加好字段后,可以使用同名映射或者手动连线的方式,建立列映射。
测试运行
本实践使用了调度参数,因此完成离线同步任务的配置后,您可以在离线同步节点页面顶部单击带参运行,根据界面提示填写测试的调度参数取值,进行离线同步任务测试。测试运行完成后,您可以在界面下方查看运行日程,检查调度参数取值是否符合预期。
数据检查
您可以在DataStudio的临时查询中查看数据是否正确同步到了MaxCompute,临时查询的示例语句如下。
select * from ods_xiaobo_rest2 where ds='20230104' order by createtime;
其中ods_xiaobo_rest2为上述步骤中创建的MaxCompute分区表,20230104为测试运行时的分区取值。
运行完成后,您可以在下方的查询结果处查看数据是否正确同步到了MaxCompute。
提交发布与补数据
测试运行与数据检查完成后,您可将离线同步任务提交发布到生产环境,操作详情请参见标准模式工作空间任务发布流程。提交发布成功后,您可以在DataWorks的运维中心中找到这个周期任务,此时可通过补数据的方式,将历史时间段内的数据补上。补数据的功能介绍和操作指导请参见执行补数据并查看补数据实例。
实践2:读取一个接口数据,该接口为一个分页的RestAPI接口
示例场景:接口定义
本实践示例的场景为读取一个RESTful接口数据并写入一个MaxCompute分区表中,其中使用的示例RESTful接口为一个自建的测试GET接口,本示例的接口详情如下。
您实际操作时,可根据您使用的接口调整相关配置,本示例的接口仅作为本实践的示例,为您演示实践的操作流程。
接口请求示例:
http://TestAPIAddress:Port/rest/test1?pageSize=5&pageNum=1
其中pageSize和pageNum为请求参数,表示页长和页号。
返回结果示例:
{ "status": "success", "totalNum": 304, "data": [ { "id": 6, "name": "测试用户6" }, { "id": 7, "name": "测试用户7" }, { "id": 8, "name": "测试用户8" }, { "id": 9, "name": "测试用户9" }, { "id": 10, "name": "测试用户10" } ] }
其中data为返回的数据存储路径,读取到的数据有2个字段:
id
、name
。接口测试工具调用示例:
准备工作:创建MaxCompute分区表
本实践将从接口处读取的数据同步至MaxCompute分区表中,因此首先需要创建一张用于存储同步过来的数据的分区表。
分区表配合覆盖写命令,可以实现分区覆盖写的效果,让数据同步任务具备可重跑性,重跑时数据不会重复,并且在进一步做数据分析时,分区表也更易于数据分析。
建表语句如下。
CREATE TABLE IF NOT EXISTS ods_xiaobo_rest1
(
`id` BIGINT
,`name` STRING
)
PARTITIONED BY
(
ds STRING
)
LIFECYCLE 3650;
如果您使用的是标准版的DataWorks,并将创建的分区表提交到生产环境,后续您可以在数据地图中查看到创建的此表。
配置同步任务
添加RestAPI数据源。
在DataWorks的工作空间中添加一个RestAPI类型的数据源,操作详情请参见配置RestAPI数据源。核心配置要点如下。
url:配置为RESTful接口的地址。
验证方法:您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。
资源组连通性:RestAPI数据源仅支持使用独享数据集成资源组,您需要选择一个独享数据集成资源组,测试数据源与此资源组的连通性。
创建离线同步节点并配置同步任务。
在DataWorks的DataStudio中创建一个离线同步节点,操作详情请参见通过向导模式配置离线同步任务,其中核心配置要点如下。
数据来源配置要点:
数据源:选择上述步骤中创建的RestAPI数据源。
请求Method:本示例接口为GET接口,此处选择GET。
返回数据结构:本示例接口的返回结果为一个JSON数组,此处选择数组数据。
数据存储json路径:本示例接口获取的数据存储于data下,此处配置为data。
请求参数:页长固定,此处配置为
pageSize=50
,建议不要单页特别大,会给RestAPI服务端和同步任务带来压力。请求次数:本示例选择多次请求。
本接口的分页参数是pageNum,选择多次请求后,下方的相关参数配置为:
多次请求对应参数:配置为pageNum。
StartIndex:配置为1。
Step:配置为1。
EndIndex:配置为100。
数据去向配置要点:
数据源、表:选择上述步骤中创建的MaxCompute分区表。
分区信息:分区信息和调度参数搭配使用。
分区信息配置为
${bizdate}
。后续调度配置的调度参数中,新增1个调度参数:
bizdate=$bizdate
。
假设运行日期为2023-01-05,则分区信息的取值为20230104。
字段映射配置要点:根据接口中的数据定义,填写RestAPI接口中的字段,注意字段名为大小写敏感。添加好字段后,可以使用同名映射或者手动连线的方式,建立列映射。
测试运行
本实践使用了调度参数,因此完成离线同步任务的配置后,您可以在离线同步节点页面顶部单击带参运行,根据界面提示填写测试的调度参数取值,进行离线同步任务测试。测试运行完成后,您可以在界面下方查看运行日程,检查调度参数取值是否符合预期。
数据检查
您可以在DataStudio的临时查询中查看数据是否正确同步到了MaxCompute,临时查询的示例语句如下。
select * from ods_xiaobo_rest1 where ds='20230104' order by id;
其中ods_xiaobo_rest1为上述步骤中创建的MaxCompute分区表,20230104为测试运行时的分区取值。
运行完成后,您可以在下方的查询结果处查看数据是否正确同步到了MaxCompute。
实践3:读取一个接口数据,该接口为POST类型的RestAPI接口
示例场景:接口定义
本实践示例的场景为读取一个RESTful接口数据并写入一个MaxCompute分区表中,其中使用的示例RESTful接口为一个自建的测试POST接口,本示例的接口详情如下。
您实际操作时,可根据您使用的接口调整相关配置,本示例的接口仅作为本实践的示例,为您演示实践的操作流程。
接口请求示例:
http://TestAPIAddress:Port/rest/test3
requestBody的格式为JSON。
{ "userId":16, "startTime":"2023-01-04 00:00:00", "endTime":"2023-01-04 23:59:59" }
返回结果示例:
{ "status": "success", "totalNum": 289, "data": [ { "user": { "id": 16, "name": "用户16" }, "axis": "series1", "value": 8231053, "createTime": "2023-01-04 00:04:57" }, { "user": { "id": 16, "name": "用户16" }, "axis": "series1", "value": 6519928, "createTime": "2023-01-04 00:09:51" }, { "user": { "id": 16, "name": "用户16" }, "axis": "series1", "value": 2915920, "createTime": "2023-01-04 00:14:36" }, { "user": { "id": 16, "name": "用户16" }, "axis": "series1", "value": 7971851, "createTime": "2023-01-04 00:19:51" }, { "user": { "id": 16, "name": "用户16" }, "axis": "series1", "value": 6598996, "createTime": "2023-01-04 00:24:30" } ] }
其中data为返回的数据存储路径,读取到的数据有5个字段:
user.id
、user.name
、axis
、value
、createTime
。接口测试工具调用示例:
准备工作:创建MaxCompute分区表
本实践将从接口处读取的数据同步至MaxCompute分区表中,因此首先需要创建一张用于存储同步过来的数据的分区表。
分区表配合覆盖写命令,可以实现分区覆盖写的效果,让数据同步任务具备可重跑性,重跑时数据不会重复,并且在进一步做数据分析时,分区表也更易于数据分析。
建表语句如下。
CREATE TABLE IF NOT EXISTS ods_xiaobo_rest3
(
`user_id` BIGINT
,`name` STRING
,`axis` STRING
,`value` BIGINT
,`create_time` STRING
)
PARTITIONED BY
(
ds STRING
)
LIFECYCLE 3650;
如果您使用的是标准版的DataWorks,并将创建的分区表提交到生产环境,后续您可以在数据地图中查看到创建的此表。
配置同步任务
添加RestAPI数据源。
在DataWorks的工作空间中添加一个RestAPI类型的数据源,操作详情请参见配置RestAPI数据源。核心配置要点如下。
url:配置为RESTful接口的地址。
验证方法:您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。
资源组连通性:RestAPI数据源仅支持使用独享数据集成资源组,您需要选择一个独享数据集成资源组,测试数据源与此资源组的连通性。
创建离线同步节点并配置同步任务。
在DataWorks的DataStudio中创建一个离线同步节点,操作详情请参见通过向导模式配置离线同步任务,其中核心配置要点如下。
数据来源配置要点:
数据源:选择上述步骤中创建的RestAPI数据源。
请求Method:本示例接口为POST接口,此处选择POST。
返回数据结构:本示例接口的返回结果为一个JSON数组,此处选择数组数据。
数据存储json路径:本示例接口获取的数据存储于data下,此处配置为data。
Header:本示例的POST接口接受的请求体是JSON格式的,此处配置为
{"Content-Type":"application/json"}
。请求参数:请求参数与调度参数搭配使用,实现每天同步当天数据的。
请求参数配置为
{ "userId":16, "startTime":"${extract_day} 00:00:00", "endTime":"${extract_day} 23:59:59" }
后续调度配置的调度参数中,新增1个调度参数:
extract_day=${yyyy-mm-dd}
。
数据去向配置要点:
数据源、表:选择上述步骤中创建的MaxCompute分区表。
分区信息:分区信息和调度参数搭配使用。
分区信息配置为
${bizdate}
。后续调度配置的调度参数中,新增1个调度参数:
bizdate=$bizdate
。
假设运行日期为2023-01-05,则分区信息的取值为20230104。
字段映射配置要点:根据接口中的数据定义,填写RestAPI接口中的字段,多个字段可以使用英文点号分隔,注意字段名为大小写敏感。添加好字段后,可以使用同名映射或者手动连线的方式,建立列映射。
测试运行
本实践使用了调度参数,因此完成离线同步任务的配置后,您可以在离线同步节点页面顶部单击带参运行,根据界面提示填写测试的调度参数取值,进行离线同步任务测试。测试运行完成后,您可以在界面下方查看运行日程,检查调度参数取值是否符合预期。
数据检查
您可以在DataStudio的临时查询中查看数据是否正确同步到了MaxCompute,临时查询的示例语句如下。
select * from ods_xiaobo_rest3 where ds='20230105' order by create_time;
其中ods_xiaobo_rest3为上述步骤中创建的MaxCompute分区表,20230105为测试运行时的分区取值。
运行完成后,您可以在下方的查询结果处查看数据是否正确同步到了MaxCompute。
实践4:循环某请求参数读取RestAPI接口
示例场景:接口定义
本实践示例的场景为循环读取RESTful接口数据并写入一个MaxCompute分区表中,其中使用的示例RESTful接口为一个自建的测试GET接口,会根据日期(date)、省份(province)、城市(city)入参返回相关的温度数据。
您实际操作时,可根据您使用的接口调整相关配置,本示例的接口仅作为本实践的示例,为您演示实践的操作流程。
请求示例:
http://TestAPIAddress:Port/rest/test5?date=2023-01-04&province=zhejiang&city=hangzhou
返回示例:
{ "province": "P1", "city": "hz", "date": "2023-01-04", "minTemperature": "-14", "maxTemperature": "-7", "unit": "℃", "weather": "cool" }
接口测试工具调用示例:
准备工作:创建参数表和MaxCompute分区表
本实践将从接口处读取的数据同步至MaxCompute分区表中,因此首先需要创建一张参数表,用于存储需要循环的province、city,再创建一张用于存储同步过来的数据的分区表。
分区表配合覆盖写命令,可以实现分区覆盖写的效果,让数据同步任务具备可重跑性,重跑时数据不会重复,并且在进一步做数据分析时,分区表也更易于数据分析。
建表语句如下:
创建参数表
CREATE TABLE IF NOT EXISTS `citys`
(
`province` STRING ,
`city` STRING
);
insert into citys
select 'shanghai','shanghai'
union all select 'zhejiang','hangzhou'
union all select 'sichuan','chengdu';
创建MaxCompute分区表
CREATE TABLE IF NOT EXISTS ods_xiaobo_rest5
(
`minTemperature` STRING ,
`maxTemperature` STRING ,
`unit` STRING ,
`weather` STRING
)
PARTITIONED BY
(
`province` STRING ,
`city` STRING ,
`ds` STRING
)
LIFECYCLE 3650;
如果您使用的是标准版的DataWorks,并将创建的表提交到生产环境,后续您可以在数据地图中查看到创建的此表。
配置同步任务
添加RestAPI数据源。
在DataWorks的工作空间中添加一个RestAPI类型的数据源,操作详情请参见配置RestAPI数据源。核心配置要点如下:
url:配置为RESTful接口的地址。
验证方法:您可以根据数据源API实际支持的验证方式选择对应的验证方式并配置验证参数。
资源组连通性:选择指定的资源组,并进行连通性测试。
在数据开发中创建赋值节点setval_citys。具体操作,请参见赋值节点。
核心配置要点如下:
序号
说明
①
赋值语言:ODPS SQL
赋值代码:
SELECT province ,city FROM citys;
②
重跑属性:配置为运行成功或失败后皆可重跑。
配置完成后提交发布赋值节点。
在数据开发中创建for-each节点。具体操作,请参见for-each节点。核心配置要点如下:
序号
说明
①
重跑属性:配置为运行成功或失败后皆可重跑。
②
依赖的上游节点:选择前面步骤的节点,即setval_citys节点。
③
节点上下文参数:选择输入参数的来源。
④
离线同步节点:配置for-each内部离线同步节点,详情请参见下面步骤。
创建离线同步节点并配置同步任务。具体操作,请参见通过向导模式配置离线同步任务。
核心配置要点如下:
序号
说明
①
调度参数配置如下:
bizdate=$[yyyymmdd-1] bizdate_year=$[yyyy-1] bizdate_month=$[mm-1] bizdate_day=$[dd-1]
②
配置RestAPI请求参数。其中province、city参数来自循环节点。
date=${bizdate_year}-${bizdate_month}-${bizdate_day}&province=${dag.foreach.current[0]}&city=${dag.foreach.current[1]}
③
配置MaxCompute分区参数,其中province参数来自循环节点。
province=${dag.foreach.current[0]}
④
配置MaxCompute分区参数,其中city参数来自循环节点。
city=${dag.foreach.current[1]}
⑤
配置MaxCompute分区参数,其中ds参数来自调度参数。
ds=${bizdate}
⑥
根据接口中的数据定义,填写RestAPI中的字段,注意字段名为大小写敏感。添加好字段后,可以使用同名映射或者手动连线的方式,建立列映射。
配置完成后提交发布for-each节点。
测试运行
在成功提交发布赋值节点和for-each节点后,在运维中心的周期任务中,对赋值节点进行补数据操作。具体操作,请参见执行补数据并查看补数据实例。
选择补数据的业务日期,选中运行的节点。
任务运行后,在运行日志中查看调度参数渲染是否符合预期。
如图中所示,调度参数渲染正确,数据将被写入MaxCompute表的
province=shanghai,city=shanghai,ds=20231215
分区中。
数据检查
您可以在数据开发的临时查询功能中查看数据是否正确同步到了MaxCompute,临时查询的示例如下:
示例中ods_xiaobo_rest5为上述准备工作中创建的MaxCompute分区表。
SELECT weather
,mintemperature
,maxtemperature
,unit
,province
,city
,ds
FROM ods_xiaobo_rest5
WHERE ds != 1
ORDER BY ds,province,city;
运行完成后,查看数据是否正确同步到MaxCompute中。