RestAPI Reader best practices


The Data Integration RestAPI Reader plugin enables you to read data from RESTful APIs. By configuring an HTTP request URL, you can retrieve data using various methods, including fetching data within a specific time range, retrieving paginated data, or looping through request parameters. The plugin converts the retrieved data into data types supported by Data Integration and passes it to a downstream writer plugin. This topic describes common use cases for RESTful API data sources.

Background information

The following items summarize what the DataWorks Data Integration RestAPI Reader supports when it reads data and returns results.

Response format

Only JSON responses are supported.

Readable data types

You can read the INT, BOOLEAN, DATE, DOUBLE, FLOAT, LONG, and STRING data types.

Request methods

The RestAPI Reader supports GET and POST request methods.

Authentication methods

The RestAPI Reader supports no authentication, or one of the following authentication methods: Basic Auth, Token Auth, and Aliyun API Signature. You can select an authentication method that is supported by your data source and configure the required authentication parameters.

  • Basic Auth: Basic authentication.

    If the data source API supports authentication with a username and password, select this method. Then, configure the username and password. During data integration, the credentials are sent to the RESTful endpoint through the Basic Auth protocol for authentication.

  • Token Auth: Token-based authentication.

    If the data source API supports token-based authentication, select this method. Then, configure a fixed token value. During data integration, the token is passed in the request header for authentication. For example: {"Authorization":"Bearer TokenXXXXXX"}.

    Note

    To use a custom encryption method, you can use the Token authentication method and provide the encrypted authentication information as the AuthToken.
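To make the two header-based methods more concrete, the following Python sketch builds the Authorization header that each one would typically produce. The helper names and sample credentials are hypothetical. The Basic form follows the standard HTTP convention of Base64-encoding username:password, and the token form mirrors the example above. This is an illustration only, not the plugin's implementation.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build a standard HTTP Basic Authorization header (hypothetical helper)."""
    raw = f"{username}:{password}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}


def token_auth_header(token: str) -> dict:
    """Build a header that carries a fixed token value (hypothetical helper)."""
    return {"Authorization": token}


# Sample values for illustration only.
print(basic_auth_header("user", "pass"))
print(token_auth_header("Bearer TokenXXXXXX"))
```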

Practice 1: Read API data by time range

API definition

This practice reads data from a RESTful API and writes it to a MaxCompute partitioned table. The example uses a sample GET API that returns data for a specified time range. The API details are as follows.

Note

The API in this practice is for demonstration purposes only. You can adapt the configuration for your API.

  • Sample request:

    http://TestAPIAddress:Port/rest/test2?startTime=<StartTime>&endTime=<EndTime>

    The startTime and endTime request parameters specify the time range for data retrieval.

  • Sample response:

    {
        "status": "success",
        "totalNum": 187,
        "data": [
            {
                "axis": "series1",
                "value": 9191352,
                "createTime": "2023-01-04 00:07:20"
            },
            {
                "axis": "series1",
                "value": 6645322,
                "createTime": "2023-01-04 00:14:47"
            },
            {
                "axis": "series1",
                "value": 2078369,
                "createTime": "2023-01-04 00:22:13"
            },
            {
                "axis": "series1",
                "value": 7325410,
                "createTime": "2023-01-04 00:29:30"
            },
            {
                "axis": "series1",
                "value": 7448456,
                "createTime": "2023-01-04 00:37:04"
            },
            {
                "axis": "series1",
                "value": 5808077,
                "createTime": "2023-01-04 00:44:30"
            },
            {
                "axis": "series1",
                "value": 5625821,
                "createTime": "2023-01-04 00:52:06"
            }
        ]
    }

    The data field contains the results. The retrieved data has three fields: axis, value, and createTime.

  • To test the API, use a tool like Postman to send a GET request and pass the startTime and endTime parameters, for example, from 2023-01-04 00:00:00 to 2023-01-04 23:59:59. The API returns a 200 OK response with totalNum as 187, and the data array contains data within the specified time range.
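The way the records sit under the data key can be sketched in Python as follows. The snippet parses a shortened copy of the sample response and pulls out one tuple per record. The function name extract_rows is hypothetical, and the sketch assumes the JSON path is a single top-level key, as it is in this example.

```python
import json

# Shortened copy of the sample response above (two of the seven records).
SAMPLE_RESPONSE = """
{
    "status": "success",
    "totalNum": 187,
    "data": [
        {"axis": "series1", "value": 9191352, "createTime": "2023-01-04 00:07:20"},
        {"axis": "series1", "value": 6645322, "createTime": "2023-01-04 00:14:47"}
    ]
}
"""


def extract_rows(response_text: str, json_path: str, columns: list) -> list:
    """Return one tuple per record found under json_path (a top-level key here)."""
    payload = json.loads(response_text)
    records = payload[json_path]
    return [tuple(record[name] for name in columns) for record in records]


rows = extract_rows(SAMPLE_RESPONSE, "data", ["axis", "value", "createTime"])
for row in rows:
    print(row)
```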

Create a MaxCompute partitioned table

This practice synchronizes data from the API to a MaxCompute partitioned table. First, you must create the destination table to store the data.

Note

An overwrite write strategy for a partitioned table allows you to overwrite partitions. This makes the data synchronization task rerunnable without causing data duplication. Partitioned tables also simplify subsequent data analysis.
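The rerun behavior described in this note can be illustrated with a toy model. In the Python sketch below, a dictionary keyed by partition value stands in for the partitioned table. The function names are hypothetical and nothing here talks to MaxCompute. It only shows why overwriting a partition keeps reruns free of duplicates while appending does not.

```python
from collections import defaultdict

# Toy table: partition value -> list of rows.
table = defaultdict(list)


def write_append(partition: str, rows: list) -> None:
    """Append rows to the partition; a rerun adds the same rows again."""
    table[partition].extend(rows)


def write_overwrite(partition: str, rows: list) -> None:
    """Replace the partition's contents; a rerun leaves the same rows."""
    table[partition] = list(rows)


batch = [("series1", 9191352, "2023-01-04 00:07:20")]

write_overwrite("20230104", batch)
write_overwrite("20230104", batch)  # rerun of the same task instance
overwrite_count = len(table["20230104"])

write_append("20230105", batch)
write_append("20230105", batch)  # rerun duplicates the rows
append_count = len(table["20230105"])

print(overwrite_count, append_count)
```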

The following DDL statement creates the table.

CREATE TABLE IF NOT EXISTS ods_xiaobo_rest2
(
  `axis`  STRING
  ,`value` BIGINT
  ,`createTime` STRING
)
PARTITIONED BY
(
  ds  STRING
)
LIFECYCLE 3650;

If you are using the Standard Edition of DataWorks and deploy the partitioned table to the production environment, you can then view this table in Data Map.

Configure the batch synchronization task

  1. Add a RestAPI data source.

    In your DataWorks workspace, add a RestAPI data source. For more information, see Configure a RestAPI data source. In the Add RestAPI Data Source dialog box, enter the Data Source Name and Data Source Description, select the Applicable Environment (development or production), and configure the Default Request Headers (default is {}). Key configurations are as follows:

    • url: Enter the URL of the RESTful API.

    • Authentication Method: Select an authentication method that your data source API supports and configure the required parameters.

    • Resource group connectivity: RestAPI data sources support only an exclusive resource group for Data Integration. You must select one and test its connectivity to the data source.

  2. Create and configure a batch synchronization node.

    In DataStudio, create a batch synchronization node. For more information, see Configure a batch synchronization node in wizard mode. Key configurations are as follows:

    • Key configurations for Data Source:

      • Data Source: Select the RestAPI data source that you created in the previous step.

      • Request Method: This practice uses the GET method. Select GET.

      • Return Data Structure: The API in this practice returns a JSON array. Select Array Data.

      • The JSON path for data storage: The API returns data in the data field, so set this parameter to data.

      • Request Param: Use request parameters with scheduling parameters to synchronize the previous day's data.

        • Set the request parameters to startTime=${extract_day} ${start_time}&endTime=${extract_day} ${end_time}.

        • In the scheduling configuration that follows, add three scheduling parameters: extract_day=${yyyy-mm-dd}, start_time=00:00:00, and end_time=23:59:59.

        For example, if the task runs on January 5, 2023, the value of extract_day is 2023-01-04. The request parameters are concatenated as: startTime=2023-01-04 00:00:00&endTime=2023-01-04 23:59:59.

    • Key configurations for Data Destination:

      • Data Source and Table: Select the MaxCompute partitioned table that you created.

      • Partition Information: Configure the partition by using scheduling parameters.

        • Set the partition information to ${bizdate}.

        • In the scheduling configuration that follows, add one scheduling parameter: bizdate=$bizdate.

        For example, if the task runs on January 5, 2023, the partition value is 20230104.

    • Field Mapping: Based on the data schema defined in the API, enter the fields from the RestAPI. Note that field names are case-sensitive. After you add the fields, you can map the columns by using The same name mapping or by mapping the fields manually.
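The parameter substitution in this configuration can be sketched in Python. The snippet below assumes, as in the example above, that both extract_day and bizdate resolve to the day before the run date. The function names are hypothetical, and the real substitution is performed by the DataWorks scheduler, not by code like this.

```python
from datetime import date, timedelta


def resolve_parameters(run_date: date) -> dict:
    """Mimic the example: both date parameters resolve to the day before the run date."""
    business_date = run_date - timedelta(days=1)
    return {
        "extract_day": business_date.strftime("%Y-%m-%d"),  # ${yyyy-mm-dd}
        "start_time": "00:00:00",
        "end_time": "23:59:59",
        "bizdate": business_date.strftime("%Y%m%d"),  # $bizdate
    }


def build_request_params(params: dict) -> str:
    """Expand startTime=${extract_day} ${start_time}&endTime=${extract_day} ${end_time}."""
    return (
        f"startTime={params['extract_day']} {params['start_time']}"
        f"&endTime={params['extract_day']} {params['end_time']}"
    )


params = resolve_parameters(date(2023, 1, 5))
request_params = build_request_params(params)
print(request_params)
print("partition ds =", params["bizdate"])
```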

Test the run

This tutorial uses scheduling parameters. Therefore, after you configure the batch synchronization task, you can test it by clicking Run with Parameters at the top of the batch synchronization node page and entering test values for the scheduling parameters as prompted. This button is displayed in the toolbar as a run icon with a parameter tag to the right of the Run button. After the test run is complete, you can view the operation log at the bottom of the page to check whether the scheduling parameter values are as expected.

Verify the data

You can run an ad hoc query in DataStudio to check whether data was synchronized to MaxCompute correctly. The following code provides a sample statement:

select * from ods_xiaobo_rest2 where ds='20230104' order by createtime;

In this statement, ods_xiaobo_rest2 is the MaxCompute partitioned table that you created, and 20230104 is the partition value used for the test run.

After the query finishes, check the results at the bottom of the page. The table should display the axis, value, and createTime fields, as well as the value for the partition ds. If this data matches the JSON data from the API, the synchronization was successful.

Deploy the task and backfill data

After you test the task and verify the data, deploy the batch synchronization task to the production environment: in the toolbar at the top of the node editing page, click the Submit icon. For more information, see Deployment process for tasks in a standard workspace. After the task is deployed, the periodic task appears in Operation Center, where you can backfill historical data. For more information about the backfill data feature, see Manage backfill instances.

Practice 2: Read paginated API data

Scenario: API definition

This practice reads data from a RESTful API and writes it to a MaxCompute partitioned table. The example uses a self-built test GET API that returns paginated data. The API details are as follows.

Note

The API in this practice is for demonstration only. You can adapt the configuration based on the API you are using.

  • Sample request:

    http://TestAPIAddress:Port/rest/test1?pageSize=5&pageNum=1

    pageSize and pageNum are request parameters that specify the page size and page number, respectively.

  • Sample response:

    {
        "status": "success",
        "totalNum": 304,
        "data": [
            {
                "id": 6,
                "name": "Test User 6"
            },
            {
                "id": 7,
                "name": "Test User 7"
            },
            {
                "id": 8,
                "name": "Test User 8"
            },
            {
                "id": 9,
                "name": "Test User 9"
            },
            {
                "id": 10,
                "name": "Test User 10"
            }
        ]
    }

    data is the key for the returned data array. The retrieved data contains two fields: id and name.

  • Example call in an API testing tool: In an API testing tool such as Postman, send a GET request with the parameters pageSize=5 and pageNum=2. The API returns a 200 OK response. The value of totalNum is 304, and the data array contains the user data for the second page, starting with "id": 11 and "name": "Test User 11".

Prerequisites: Create a MaxCompute partitioned table

Because this practice synchronizes data from the API to a MaxCompute partitioned table, you must first create the destination table.

Note

Using an overwrite command on a partitioned table lets you rewrite specific partitions. This makes the data synchronization task rerunnable, preventing data duplication. Partitioned tables also facilitate further data analysis.

The following DDL statement creates the table.

CREATE TABLE IF NOT EXISTS ods_xiaobo_rest1
(
  `id` BIGINT
  ,`name` STRING
)
PARTITIONED BY
(
  ds  STRING
)
LIFECYCLE 3650;

If you use the Standard Edition of DataWorks and deploy the partitioned table to the production environment, you can view this table in Data Map.

Configure the synchronization task

  1. Add a RestAPI data source.

    In your DataWorks workspace, add a RestAPI data source. For more information, see Configure a RestAPI data source. In the Add RestAPI Data Source dialog box, enter the Data Source Name and Data Source Description, select the Applicable Environment (development or production), and configure the Default Request Headers (default is {}). Key configurations are as follows:

    • url: Enter the URL of the RESTful API.

    • Authentication Method: Select an authentication method that your data source API supports and configure the required parameters.

    • Resource group connectivity: RestAPI data sources support only an exclusive resource group for Data Integration. You must select one and test its connectivity to the data source.

  2. Create a batch synchronization node and configure the synchronization task.

    In DataStudio of DataWorks, create a batch synchronization node. For details, see Configure a task in the codeless UI. Key settings:

    • Key settings for Data Source:

      • Data Source: Select the RestAPI data source that you created in the previous step.

      • Request Method: This example uses a GET API. Select GET.

      • Return Data Structure: The API in this example returns a JSON array. Select Array Data.

      • The JSON path for data storage: In this example, the data is located under the data key in the response. Set this parameter to data.

      • Request Param: The page size is fixed. Set this parameter to pageSize=50. Avoid setting a large page size, as it can place a high load on both the REST API server and the synchronization task.

      • The number of requests: In this example, select Multiple Requests.

        The pagination parameter for this API is pageNum. After you select Multiple Requests, configure the following parameters:

        • Parameter used for multiple requests: Set to pageNum.

        • StartIndex: Set to 1.

        • Step: Set to 1.

        • EndIndex: Set to 100.

    • Key configurations for Data Destination:

      • Data Source and Table: Select the MaxCompute partitioned table that you created.

      • Partition Information: Configure the partition by using scheduling parameters.

        • Set the partition information to ${bizdate}.

        • In the scheduling configuration that follows, add one scheduling parameter: bizdate=$bizdate.

        For example, if the task runs on January 5, 2023, the partition value is 20230104.

    • Field Mapping: Based on the data schema defined in the API, enter the fields from the RestAPI. Note that field names are case-sensitive. After you add the fields, you can map the columns by using The same name mapping or by mapping the fields manually.
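The Multiple Requests settings above amount to a simple loop over pageNum. The following Python sketch lists the request parameters such a loop would generate and works out how many pages the sample data actually needs. The function names are hypothetical, and treating EndIndex as inclusive is an assumption made for illustration.

```python
import math


def page_requests(page_size: int, start_index: int, step: int, end_index: int) -> list:
    """List the request parameters per page, treating end_index as inclusive (assumption)."""
    return [
        f"pageSize={page_size}&pageNum={page_num}"
        for page_num in range(start_index, end_index + 1, step)
    ]


def pages_needed(total_num: int, page_size: int) -> int:
    """Number of pages required to cover total_num records."""
    return math.ceil(total_num / page_size)


requests_to_send = page_requests(page_size=50, start_index=1, step=1, end_index=100)
print(len(requests_to_send), requests_to_send[0], requests_to_send[-1])
print(pages_needed(total_num=304, page_size=50))
```

With totalNum at 304 and pageSize=50, seven pages cover the sample data, so an EndIndex of 100 leaves headroom for growth.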

Run a test

This tutorial uses scheduling parameters. Therefore, after you configure the batch synchronization task, you can test it by clicking Run with Parameters at the top of the batch synchronization node page and entering test values for the scheduling parameters as prompted. This button is displayed in the toolbar as a run icon with a parameter tag to the right of the Run button. After the test run is complete, you can view the operation log at the bottom of the page to check whether the scheduling parameter values are as expected.

Verify the data

You can run an ad hoc query in DataStudio to check whether the data has been correctly synchronized to MaxCompute. The following is a sample query:

select * from ods_xiaobo_rest1 where ds='20230104' order by id;

In this statement, ods_xiaobo_rest1 is your MaxCompute partitioned table, and 20230104 is the partition value from the test run.

After the query runs, check the results to verify that the data was correctly synchronized to MaxCompute. The results table should display values for the id and name fields, as well as the value for the partition key ds. Confirm that this data matches the JSON data returned by the API, which indicates a successful synchronization.

Practice 3: Read data from a POST API

API definition

This practice demonstrates how to read data from a RESTful API and write it to a MaxCompute partitioned table. The practice uses a custom-built test POST API. The API details are as follows.

Note

The API in this practice is for demonstration only and illustrates the workflow. You can adapt the configuration for your own API.

  • Sample request:

    http://TestAPIAddress:Port/rest/test3

    The request body is in JSON format.

    {
      "userId":16,
      "startTime":"2023-01-04 00:00:00",
      "endTime":"2023-01-04 23:59:59"
    }
  • Sample response:

    {
        "status": "success",
        "totalNum": 289,
        "data": [
            {
                "user": {
                    "id": 16,
                    "name": "User 16"
                },
                "axis": "series1",
                "value": 8231053,
                "createTime": "2023-01-04 00:04:57"
            },
            {
                "user": {
                    "id": 16,
                    "name": "User 16"
                },
                "axis": "series1",
                "value": 6519928,
                "createTime": "2023-01-04 00:09:51"
            },
            {
                "user": {
                    "id": 16,
                    "name": "User 16"
                },
                "axis": "series1",
                "value": 2915920,
                "createTime": "2023-01-04 00:14:36"
            },
            {
                "user": {
                    "id": 16,
                    "name": "User 16"
                },
                "axis": "series1",
                "value": 7971851,
                "createTime": "2023-01-04 00:19:51"
            },
            {
                "user": {
                    "id": 16,
                    "name": "User 16"
                },
                "axis": "series1",
                "value": 6598996,
                "createTime": "2023-01-04 00:24:30"
            }
        ]
    }

    In the response, data is the key that contains the returned data. The data contains five fields: user.id, user.name, axis, value, and createTime.

  • Sample call using an API testing tool: In an API testing tool such as Postman, send a POST request. For the request body, select the raw format and set the type to JSON. Pass a request body that contains userId, startTime, and endTime. The API returns a 200 OK response. The response body shows that totalNum is 289 and the data array contains records for the specified user and time range.
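As a rough equivalent of the Postman call, the following Python sketch assembles the JSON request body and a JSON Content-Type header for the sample POST API. It does not send anything over the network. The function name build_post_request is hypothetical.

```python
import json


def build_post_request(user_id: int, extract_day: str) -> tuple:
    """Return (headers, body bytes) for the sample POST API."""
    headers = {"Content-Type": "application/json"}
    body = {
        "userId": user_id,
        "startTime": f"{extract_day} 00:00:00",
        "endTime": f"{extract_day} 23:59:59",
    }
    return headers, json.dumps(body).encode("utf-8")


headers, body = build_post_request(16, "2023-01-04")
print(headers)
print(body.decode("utf-8"))
```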

Create a MaxCompute partitioned table

This practice synchronizes data from the API to a MaxCompute partitioned table. First, create a partitioned table to store the data.

Note

Using a partitioned table in overwrite mode allows you to rerun the batch synchronization task without creating duplicate data. Partitioned tables also simplify data analysis.

The DDL statement is as follows.

CREATE TABLE IF NOT EXISTS ods_xiaobo_rest3
(
  `user_id` BIGINT
  ,`name` STRING
  ,`axis`  STRING
  ,`value` BIGINT
  ,`create_time` STRING
)
PARTITIONED BY
(
  ds  STRING
)
LIFECYCLE 3650;

If you are using the Standard Edition of DataWorks and deploy the partitioned table to the production environment, you can later find this table in Data Map.

Configure the batch synchronization task

  1. Add a RestAPI data source.

    In your DataWorks workspace, add a RestAPI data source. For more information, see Configure a RestAPI data source. In the Add RestAPI Data Source dialog box, enter the Data Source Name and Data Source Description, select the Applicable Environment (development or production), and configure the Default Request Headers (default is {}). Key configurations are as follows:

    • url: Enter the URL of the RESTful API.

    • Authentication Method: Select an authentication method that your data source API supports and configure the required parameters.

    • Resource group connectivity: RestAPI data sources support only an exclusive resource group for Data Integration. You must select one and test its connectivity to the data source.

  2. Create and configure a batch synchronization node.

    In DataStudio of DataWorks, create a batch synchronization node. For details, see Configure by using the codeless UI. Key settings are as follows.

    • Key settings for Data Source:

      • Data Source: Select the RestAPI data source that you created in the previous step.

      • Request Method: This practice uses a POST API. Select POST.

      • Return Data Structure: The API in this practice returns a JSON array. Select Array Data.

      • The JSON path for data storage: Set this to data, which is the key that holds the data array in the API response.

      • Header: Set this parameter to {"Content-Type":"application/json"} to specify that the request body is in JSON format.

      • Request Param: Combine request parameters with scheduling parameters to synchronize the previous day's data on a daily basis.

        • Configure the request parameters as follows:

          {
              "userId":16,
              "startTime":"${extract_day} 00:00:00",
              "endTime":"${extract_day} 23:59:59"
          }
        • In the scheduling configurations, add a new scheduling parameter: extract_day=${yyyy-mm-dd}.

    • Key configurations for Data Destination:

      • Data Source and Table: Select the MaxCompute partitioned table that you created.

      • Partition Information: Configure the partition by using scheduling parameters.

        • Set the partition information to ${bizdate}.

        • In the scheduling configuration that follows, add one scheduling parameter: bizdate=$bizdate.

        For example, if the task runs on January 5, 2023, the partition value is 20230104.

    • Field Mapping: Based on the data schema defined in the API, enter the fields from the RestAPI. For a nested field, separate the levels of its path with periods, for example user.id and user.name. Note that field names are case-sensitive. After you add the fields, map the columns by using The same name mapping or by mapping the fields manually.
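Period-separated field paths can be read as a walk through the nested JSON. The Python sketch below resolves paths such as user.id against one record from the sample response and assembles a row for ods_xiaobo_rest3. The helper get_by_path and the specific source-to-column mapping are assumptions for illustration. The actual mapping is whatever you configure in Field Mapping.

```python
from functools import reduce

# One record from the sample response above.
record = {
    "user": {"id": 16, "name": "User 16"},
    "axis": "series1",
    "value": 8231053,
    "createTime": "2023-01-04 00:04:57",
}

# Assumed mapping: source field path -> destination column of ods_xiaobo_rest3.
FIELD_MAPPING = {
    "user.id": "user_id",
    "user.name": "name",
    "axis": "axis",
    "value": "value",
    "createTime": "create_time",
}


def get_by_path(data: dict, path: str):
    """Walk a period-separated path such as 'user.id' through nested dicts."""
    return reduce(lambda node, key: node[key], path.split("."), data)


row = {column: get_by_path(record, path) for path, column in FIELD_MAPPING.items()}
print(row)
```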

Test run

This tutorial uses scheduling parameters. Therefore, after you configure the batch synchronization task, you can test it by clicking Run with Parameters at the top of the batch synchronization node page and entering test values for the scheduling parameters as prompted. This button is displayed in the toolbar as a run icon with a parameter tag to the right of the Run button. After the test run is complete, you can view the operation log at the bottom of the page to check whether the scheduling parameter values are as expected.

Data verification

You can run an ad hoc query in DataStudio to verify that the data has been correctly synchronized to MaxCompute. The following is a sample statement for the ad hoc query.

select * from ods_xiaobo_rest3 where ds='20230105' order by create_time;

In this statement, ods_xiaobo_rest3 is the MaxCompute partitioned table created previously, and 20230105 is the partition value for the test run.

After the query finishes, check the query result at the bottom of the page to verify that the data has been correctly synchronized to MaxCompute. The results table should display values for fields such as user_id, name, axis, value, and create_time, as well as the value for the partition ds. Confirm that these values match the data returned by the API and that the nested fields were mapped and synchronized correctly.

Practice 4: Iterate parameters to read from a REST API

Use case: API definition

This practice demonstrates how to build a workflow that iteratively reads data from a RESTful API and writes it to a MaxCompute partitioned table. This practice uses a self-built test GET API that returns temperature data based on the date, province, and city input parameters.

Note

The API in this practice is for demonstration purposes only. You can adjust the configurations based on the API you use.

  • Sample request:

    http://TestAPIAddress:Port/rest/test5?date=2023-01-04&province=zhejiang&city=hangzhou
  • Sample response:

    {
      "province": "P1",
      "city": "hz",
      "date": "2023-01-04",
      "minTemperature": "-14",
      "maxTemperature": "-7",
      "unit": "°C",
      "weather": "cool"
    }
  • Sample API call in a testing tool: In an API testing tool such as Postman, send a GET request and pass the parameters date=2023-01-04, province=p1, and city=hz. The API returns a JSON response that includes the minTemperature (-14), maxTemperature (-7), unit (°C), and weather (cool) information.

Prerequisites: Create parameter and MaxCompute partitioned tables

In this practice, you synchronize data from an API to a MaxCompute partitioned table. First, create a parameter table to store the province and city values for iteration. Then, create a destination partitioned table to store the synchronized data.

Note

Using a partitioned table in overwrite mode enables partition overwrites. This makes the data synchronization task rerunnable without creating duplicate data. Partitioned tables also simplify data analysis.

The following are the DDL statements:

Parameter table

CREATE TABLE IF NOT EXISTS `citys`
(
  `province` STRING ,
  `city` STRING
);

insert into citys
select 'shanghai','shanghai'
union all select 'zhejiang','hangzhou'
union all select 'sichuan','chengdu';

MaxCompute partitioned table

CREATE TABLE IF NOT EXISTS ods_xiaobo_rest5
(
    `minTemperature` STRING ,
    `maxTemperature` STRING ,
    `unit` STRING ,
    `weather` STRING 
)
PARTITIONED BY 
(
    `province` STRING ,
    `city` STRING ,
    `ds`  STRING
)
LIFECYCLE 3650;

If you use the Standard Edition of DataWorks and deploy the created tables to the production environment, you can view the tables in Data Map.

Configure the synchronization task

  1. Add a RestAPI data source.

    In your DataWorks workspace, add a RestAPI data source. For more information, see Configure a RestAPI data source. In the Add RestAPI Data Source dialog box, specify the Data Source Name and Data Source Description, select the Environment (Development or Production), and configure the Default Request Headers (the default is {}). The key configurations are as follows:

    • url: Enter the URL of the RESTful API.

    • Authentication Method: Select an authentication method supported by the API and configure the required parameters.

    • Resource group connectivity: Select a resource group and test its connectivity.

  2. In DataStudio, create an assignment node named setval_citys. For more information, see Assignment node.

    The key configurations are as follows:

    • Assignment Language: ODPS SQL

    • Assignment Code:

      SELECT  province
              ,city
      FROM    citys;

    • Rerun Property: Set this parameter to You can run again after successful or failed operation.

    After you configure the assignment node, deploy it.

  3. In DataStudio, create a for-each node. For more information, see for-each node. The key configurations are as follows:

    • Rerun Property: Set this parameter to You can run again after successful or failed operation.

    • Ancestor Node: Select the node created in the previous step, the setval_citys node.

    • Node Context Parameters: Select the source of the input parameters.

    • Batch Synchronization Node: Configure the batch synchronization node within the for-each node. For more information, see the next step.

  4. Create a batch synchronization node and configure the synchronization task. For more information, see Configure a task in wizard mode.

    The key configurations are as follows:

    • Configure scheduling parameters as follows:

      bizdate=$[yyyymmdd-1]
      bizdate_year=$[yyyy-1]
      bizdate_month=$[mm-1]
      bizdate_day=$[dd-1]

    • Configure the RestAPI request parameters. The province and city parameters are inherited from the for-each node.

      date=${bizdate_year}-${bizdate_month}-${bizdate_day}&province=${dag.foreach.current[0]}&city=${dag.foreach.current[1]}

    • Configure the MaxCompute partition parameter for province. This value is passed from the for-each node.

      province=${dag.foreach.current[0]}

    • Configure the MaxCompute partition parameter for city. This value is passed from the for-each node.

      city=${dag.foreach.current[1]}

    • Configure the MaxCompute partition parameter for ds. The value is inherited from the scheduling parameters.

      ds=${bizdate}

    • Based on the API response schema, enter the source fields from the RestAPI. Note that field names are case-sensitive. After you add the fields, click The same name mapping or map the fields manually.

    After you configure the node, deploy the for-each node.
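The interplay of the assignment node, the for-each node, and the synchronization node can be sketched as a plain loop. In the Python snippet below, each row of the citys table plays the role of ${dag.foreach.current}, and the request and partition templates are expanded once per row. The function name build_iteration is hypothetical, and the sketch assumes that bizdate_year, bizdate_month, and bizdate_day resolve to the year, month, and day of the same business date as bizdate.

```python
# Rows returned by the assignment node's query on the citys table.
assignment_output = [
    ("shanghai", "shanghai"),
    ("zhejiang", "hangzhou"),
    ("sichuan", "chengdu"),
]


def build_iteration(current: tuple, bizdate: str) -> dict:
    """Expand the request and partition templates for one loop iteration.

    current stands in for ${dag.foreach.current}; bizdate is a yyyymmdd string.
    """
    year, month, day = bizdate[:4], bizdate[4:6], bizdate[6:8]
    return {
        "request_params": f"date={year}-{month}-{day}&province={current[0]}&city={current[1]}",
        "partition": f"province={current[0]},city={current[1]},ds={bizdate}",
    }


iterations = [build_iteration(row, "20231215") for row in assignment_output]
for item in iterations:
    print(item["request_params"], "->", item["partition"])
```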

Run a test

  1. After deploying the assignment and for-each nodes, go to Operation Center, find the periodic task for the assignment node, and run a backfill. For more information, see Manage backfill instances.

  2. Based on your business requirements, select the business date for the backfill task and its descendant nodes.

  3. After the task runs, view the details of the backfill instance to confirm that the execution parameters and run logs are correct.

    In this example, data is written to the province=shanghai,city=shanghai,ds=20231215 partition of the MaxCompute table.

Verify the data

Run an ad hoc query in DataStudio to verify that the data was synchronized correctly to MaxCompute. In the following example query, ods_xiaobo_rest5 is the MaxCompute partitioned table that you created in the Prerequisites section.

SELECT  weather
        ,mintemperature
        ,maxtemperature
        ,unit
        ,province
        ,city
        ,ds
FROM    ods_xiaobo_rest5
WHERE   ds != 1
ORDER BY ds,province,city;

When the query finishes, check the results to verify that the data has been synchronized to MaxCompute.

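| Weather | Min temperature | Max temperature | Unit | Province | City     | ds       |
|---------|-----------------|-----------------|------|----------|----------|----------|
| COOL    | 3               | 9               | °C   | shanghai | shanghai | 20231215 |
| HAZY    | -2              | 6               | °C   | sichuan  | chengdu  | 20231215 |
| FOGGY   | 19              | 28              | °C   | zhejiang | hangzhou | 20231215 |
| SNOWY   | -16             | -5              | °C   | shanghai | shanghai | 20231216 |
| SNOWY   | -16             | -8              | °C   | sichuan  | chengdu  | 20231216 |
| SUNNY   | 15              | 25              | °C   | zhejiang | hangzhou | 20231216 |
| COOL    | -10             | 2               | °C   | shanghai | shanghai | 20231217 |
| HAZY    | 15              | 24              | °C   | sichuan  | chengdu  | 20231217 |
| FOGGY   | 4               | 11              | °C   | zhejiang | hangzhou | 20231217 |
| HOT     | -14             | -7              | °C   | shanghai | shanghai | 20231218 |
| FOGGY   | -2              | 4               | °C   | sichuan  | chengdu  | 20231218 |
| RAINY   | 9               | 19              | °C   | zhejiang | hangzhou | 20231218 |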