RestAPI(HTTP形式)数据源

RestAPI数据源为您提供读取和写入RestAPI双向通道的功能,本文为您介绍DataWorks的RestAPI数据同步的能力支持情况。

使用限制

  • 目前该数据源仅支持独享数据集成资源组

  • 目前不支持设置超时参数,当前DataWorks内置的请求超时时间是60s, 如果您的API查询返回时间超过60s将导致任务失败。

支持的字段类型

类型分类

数据集成column配置类型

整数类

LONG,INT

字符串类

STRING

浮点类

DOUBLE,FLOAT

布尔类

BOOLEAN

日期时间类

DATE

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

常见问题

  1. 只能指定好请求数据的翻页次数吗?

    答:是的

  2. 是否支持自动翻页,例如当请求参数后面没数据时便停止翻页。

    答:不支持, 否则无法进行split切分。

  3. 如果需要指定翻页次数,但指定翻页次数比实际页数多,导致后面数据为空,系统会如何处理?

    答:当后面页数数据为空时,相当于SQL查到空数据,系统将会继续查询下一条数据。

  4. 只支持解析一层JSON据吗?

    答:是的,不会进行深入解析。

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"restapi",
            "parameter":{
                "url":"http://127.0.0.1:5000/get_array5",
                "dataMode":"oneData",
                "responseType":"json",
                "column":[
                    {
                        "type":"long",
                        "name":"a.b"  //从a.b路径中查找数据
                    },
                    {
                        "type":"string",  //从a.c路径中查找数据
                        "name":"a.c"
                    }
                ],
                "dirtyData":"null",
                "method":"get",
                "defaultHeader":{
                    "X-Custom-Header":"test header"
                },
                "customHeader":{
                    "X-Custom-Header2":"test header2"
                },
                "parameters":"abc=1&def=1"
            },
            "name":"restapireader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{

            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":""
        },
        "speed":{
            "throttle":true,  //当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1,  //作业并发数。 
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

脚本模式配置说明如下:

Restapi插件发出http(s)请求后,会获得请求响应body(body是一个json),dataPath用来配置从body中提取数据的json path路径。举2个示例如下:


以接口返回数据body如下举例,其中业务数据在DATA内,且接口一次返回了多行数据(DATA是一个数组):
{
    "HEADER": {
        "BUSID": "bid1",
        "RECID": "uuid",
        "SENDER": "dc",
        "RECEIVER": "pre",
        "DTSEND": "202201250000"
    },
    "DATA": [
        {
            "SERNR": "sernr1"
        },
        {
            "SERNR": "sernr2"
        }
    ]
}

如果需要将DATA中的多行数据抽取为多条同步记录,则需要将 column 配置为 "column": [ "SERNR" ],dataMode 配置为 "dataMode": "multiData",dataPath 配置为 "dataPath": "DATA"


以接口返回数据body如下举例,其中业务数据在content.DATA内,且接口一次返回了1行数据(DATA是一个对象):
{
    "HEADER": {
        "BUSID": "bid1",
        "RECID": "uuid",
        "SENDER": "dc",
        "RECEIVER": "pre",
        "DTSEND": "202201250000"
    },
    "content": {
        "DATA": {
            "SERNR": "sernr2"
        }
    }
}

如果需要将content.DATA中的一行数据抽取为一条同步记录,则需要将 column 配置为 "column": [ "SERNR" ],dataMode 配置为 "dataMode": "oneData",dataPath 配置为 "dataPath": "content.DATA"
                

Reader脚本参数

说明

以下的参数包含在添加数据源和配置数据集成任务节点的过程中。

当前插件暂不支持使用调度参数。

参数

描述

是否必选

默认值

url

RESTful接口地址。

dataMode

RESTful请求返回的结果JSON数据的格式。

  • oneData:从返回的JSON中取其1条数据。

  • multiData:从返回的JSON中取一个JSON数组,传递多条数据给writer。

responseType

返回结果的数据格式,目前仅支持JSON格式。

JSON

column

读取字段列表,type指定源数据的类型,name指定当前column数据获取的JSON路径。您可以指定column字段信息,配置如下。

"column":[{"type":"long","name":"a.b" //从a.b路径中查找数据},{"type":"string","name":"a.c"//从a.c路径中查找数据}]

对于您指定的column信息,type和name必须填写。

dataPath

从返回结果中查询单个JSON对象或者JSON数组的路径。

method

请求方法,支持get或post两种方式。

customHeader

传递给RESTful接口的header信息。

parameters

传递给RESTful接口的参数信息。

  • get方法填入abc=1&def=1

  • post方法填入JSON类型参数。

dirtyData

当从指定的column json路径中找不到数据时的处理方式。

  • dirty:当一条数据解析时遇到column找不时这条数据置为脏数据。

  • null:当一条数据解析时遇到column找不到时,这个column设置为null。

dirty

requestTimes

从RESTful地址中请求数据的次数。

  • single:只进行一次请求。

  • multiple:进行多次请求。

single

requestParam

若requestTimes设为multiple时,需要指定循环的参数,例如pageNumber,插件会根据设置的startIndex、endIndex、step三个参数循环传递pageNumber参数给RESTful接口,进行多次请求。

startIndex

循环请求的起点,起点包含在循环请求之内。

endIndex

循环请求的终点,终点包含在循环请求之内。

step

循环请求的步长。

authType

验证方法。包括:

  • Basic Auth:基础验证。

    如果数据源API支持用户名和密码的方式进行验证,您可选择此种验证方式,并在选择完成后配置用于验证的用户名和密码,后续数据集成过程中对接数据源时,通过Basic Auth协议传递给RESTful地址,完成验证。

  • Token Auth:Token验证。

    如果数据源API支持Token的方式进行验证,您可选择此种验证方式,并在选择完成后配置用于验证的固定Token值,后续数据集成过程中对接数据源时,通过传入header中进行验证,例如:{"Authorization":"Bearer TokenXXXXXX"}。

  • Aliyun API Signature:阿里云API签名验证。

    如果数据源为阿里云产品,且此阿里云产品的API支持通过AccessKey和AccessSecret的方式进行验证,您可选择此种种验证方式,并在选择完成后配置用于验证的AccessKey和AccessSecret。

authUsername/authPassword

Basic Auth验证的用户名和密码。

authToken

Token Auth验证的token。

accessKey/accessSecret

Aliyun API签名验证的账户信息。

Writer脚本Demo

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"stream",
            "parameter":{

            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"restapi",
            "parameter":{
                "url":"http://127.0.0.1:5000/writer1",
                "dataMode":"oneData",
                "responseType":"json",
                "column":[
                    {
                        "type":"long", //放置column数据到路径a.b
                        "name":"a.b"
                    },
                    {
                        "type":"string", //放置column数据到路径a.c
                        "name":"a.c"
                    }
                ],
                "method":"post",
                "defaultHeader":{
                    "X-Custom-Header":"test header"
                },
                "customHeader":{
                    "X-Custom-Header2":"test header2"
                },
                "parameters":"abc=1&def=1",
                "batchSize":256
            },
            "name":"restapiwriter",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Writer脚本参数

参数

描述

是否必须

默认值

url

RESTful接口地址。

dataMode

RESTful请求传递的JSON数据的格式。

  • oneData:一次请求只传递一条数据,有几条数据就进行几次请求。

  • multiData:一次请求传递一批数据,根据reader端切分的task数确定请求次数。

column

生成JSON数据对应的字段路径列表,type指定源数据的类型,name指定当前column数据放置的JSON路径。您可以指定column字段信息,配置如下。

"column":[{"type":"long","name":"a.b" //放置column数据到路径a.b},{"type":"string","name":"a.c"//放置column数据到路径a.c}]

说明

对于您指定的column信息,type和name必须填写。

dataPath

数据结果放置的JSON对象的路径。

method

请求方法,支持post和put。

customHeader

传递给RESTful接口的header信息。

authType

验证方法。

  • Basic Auth:基础验证。

    如果数据源API支持用户名和密码的方式进行验证,您可选择此种验证方式,并在选择完成后配置用于验证的用户名和密码,后续数据集成过程中对接数据源时,通过Basic Auth协议传递给RESTful地址,完成验证。

  • Token Auth:Token验证。

    如果数据源API支持Token的方式进行验证,您可选择此种验证方式,并在选择完成后配置用于验证的固定Token值,后续数据集成过程中对接数据源时,通过传入header中进行验证,例如:{"Authorization":"Bearer TokenXXXXXX"}。

  • Aliyun API Signature:阿里云API签名验证。

    如果数据源为阿里云产品,且此阿里云产品的API支持通过AccessKey和AccessSecret的方式进行验证,您可选择此种种验证方式,并在选择完成后配置用于验证的AccessKey和AccessSecret。

authUsername/authPassword

Basic Auth验证的用户名密码。

authToken

Token Auth验证的token。

accessKey/accessSecret

Aliyun API签名验证的账户信息。

batchSize

dataMode为multiData时,一次请求最大的数据条数。

512