全部产品
云市场

API参考

更新时间:2019-07-31 13:32:29

一. 接口规范

1.公共请求Header

名字 描述
x-datahub-client-version API版本信息
x-datahub-security-token Security Token,如果存在
Date 标准GMT时间,格式: EEE, dd MMM yyyy HH:mm:ss z
Authorization 签名信息
Content-Type 传输数据序列化协议

2. 公共返回Header

名字 描述
Content-Type 传输数据序列化协议
Content-Length 传输数据长度
x-datahub-request-id 全局唯一请求ID

3. 错误码

名字 描述 备注
InvalidParameter 参数错误
InvalidCursor Cursor无效
NoSuchXXX 资源不存在
XXXAlreadyExist 资源已存在
Unauthorized 认证失败 AccessKey信息错误,用户时间戳不对等
NoPermission 鉴权失败 RAM权限错误
OperationDenied 禁止操作 操作禁止,比如删除有topic存在的project
LimitExceeded 流控受限 服务端QPS、流量等限制
InvalidShardOperation Shard分裂或合并,变成Sealed
MalformedRecord 数据格式不正确
OffsetReseted 点位重置
OffsetSessionChanged SubId被其他用户open占用
SubscriptionOffline 订阅下线
InternalServerError 系统内部错误
其他 其他非可重试异常,后续可能会被回收

4. 统一错误返回格式

名字 描述
ErrorCode 错误码
ErrorMessage 详细错误描述信息

错误响应示例:

  1. HTTP/1.1 403
  2. x-datahub-request-id: 2018050817492199d6650a00000039
  3. Content-Type: application/json
  4. Content-Length: xxx
  5. {
  6. "ErrorCode": "Unauthorized",
  7. "ErrorMessage": "Authroize failed"
  8. }

5. 限制描述

名字 描述
ProjectName 长度:[3, 32],仅包含字母、数字和’_’, 以字母开头,不区分大小写
TopicName 长度:[3, 128],仅包含字母、数字和’_’, 以字母开头,不区分大小写

二、Authorization字段计算的方法

  1. Authorization = "DATAHUB " + AccessId + ":" + Signature
  2. Signature = base64(hmac-sha1(AccessKey,
  3. HTTPMethod + "\n"
  4. + Content-Type + "\n"
  5. + Date + "\n"
  6. + CanonicalizedDataHubHeaders + "\n"
  7. + CanonicalizedResource))
  • AccessKey 表示签名所需的密钥。
  • HTTPMethod 表示HTTP 请求的Method,主要有PUT、GET、POST、HEAD、DELETE等。
  • \n 表示换行符。
  • Content-Type 表示请求内容的类型,一般固定为“application/json”。
  • Date 表示此次操作的时间,且必须为GMT格式,如“Sun, 22 Nov 2015 08:16:38 GMT”。
  • CanonicalizedDataHubHeaders 表示以 x-datahub- 为前缀的HTTP Header的字典序排列。
  • CanonicalizedResource 表示用户想要访问的DataHub资源的Url,若包含param,必须按字典序。

CanonicalizedDataHubHeaders构造方法

所有以 x-datahub- 为前缀的HTTP Header被称为 CanonicalizedDataHubHeaders。构造方法如下:

  1. 将所有以 x-datahub- 为前缀的HTTP请求头的名字转换成小写 。如X-DATAHUB-Client-Version:1.1需要转换成x-datahub-client-version:1.1
  2. 如果请求是以STS获得的AccessKeyId和AccessKeySecret发送时,还需要将获得的security-token值以x-datahub-security-token:token的形式加入到签名字符串中。
  3. 将上一步得到的所有HTTP请求头按照名字的字典序进行升序排列。
  4. 删除请求头和内容之间分隔符两端出现的任何空格。如x-datahub-client-versionn : 1.1转换成:x-datahub-client-version:1.1
  5. 将每一个头和内容用 \n 分隔符分隔拼成最后的CanonicalizedDataHubHeaders

CanonicalizedResource构造方法

用户发送请求中想访问的DataHub目标资源被称为CanonicalizedResource。构造方法如下:

  1. CanonicalizedResource置成空字符串 “”;
  2. 放入要访问的DataHub资源,如某个topic: /projects/test_project/topics/test_topic
  3. 如果请求的资源包含额外的url参数,按照字典序,从小到大排列并以 & 为分隔符生成参数字符串。在CanonicalizedResource字符串尾添加 ?和参数字符串。此时的CanonicalizedResource如:/projects/test_project/topics/test_topic/connectors/sink_odps?donetime

计算签名头规则

  • 签名的字符串必须为 UTF-8 格式。含有中文字符的签名字符串必须先进行 UTF-8 编码,再与AccessKey计算最终签名。
  • 签名的方法用RFC 2104中定义的HMAC-SHA1方法,其中Key为 AccessKey
  • x-datahub- 开头的header在签名验证前需要符合以下规范:
    • header的名字需要变成小写。
    • header按字典序自小到大排序。
    • 分割header name和value的冒号前后不能有空格。
    • 每个Header之后都有一个换行符“\n”,如果没有Header,CanonicalizedDataHubHeaders就设置为空。

签名示例

假如AccessKeyId是”44CF9590006BF252F707”,AccessKeySecret是”OtxrzxIsfpFjA7SwPzILwy8Bw21TLhquhboDYROV”

请求 签名字符串计算公式 签名字符串
POST /projects/test_project/topics/test_topic HTTP/1.1 Host: http://dh-cn-hangzhou.aliyuncs.com User-Agent: customer x-datahub-client-version: 1.1 Content-Type: application/json Date: Thu, 10 Jan 2019 07:28:29 GMT Signature = base64(hmac-sha1(AccessKey,HTTPMethod + “\n” + Content-Type + “\n” + Date + “\n” + CanonicalizedDataHubHeaders+ CanonicalizedResource)) POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic

python计算签名方法如下:

  1. import base64
  2. import hmac
  3. import sha
  4. h = hmac.new("OtxrzxIsfpFjA7SwPzILwy8Bw21TLhquhboDYROV",
  5. "POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic", sha)
  6. Signature = base64.b64encode(h.digest())
  7. print("Signature: %s" % Signature)

计算得到的Signature为2ZOa0YVc6PwOrqaOYzpNGb/3peU=,所以Authorization为DATAHUB 44CF9590006BF252F707:2ZOa0YVc6PwOrqaOYzpNGb/3peU=

所以请求的头部如下:

  1. POST /projects/connector_test/topics/event_time_test HTTP/1.1
  2. Authorization: DATAHUB 44CF9590006BF252F707:2ZOa0YVc6PwOrqaOYzpNGb/3peU=
  3. Content-Type: application/json
  4. Date: Thu, 10 Jan 2019 07:28:29 GMT
  5. Host: http://dh-cn-hangzhou.aliyuncs.com
  6. User-Agent: customer
  7. x-datahub-client-version: 1.1

三、接口定义

创建Project

请求

  • 请求语法

    1. POST /projects/<ProjectName> HTTP/1.1
  • 请求元素

名称 类型 描述
Comment String 描述信息,限制1024字节

响应

  • 响应语法

    1. HTTP/1.1 201 Created

示例

  • 请求示例

    1. POST /projects/<ProjectName> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: application/json
    6. Content-Length: xxx
    7. {
    8. "Comment": "test project"
    9. }
  • 响应示例

    1. HTTP/1.1 201 Created
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Length: 0

查询Project

请求

  • 请求语法

    1. GET /projects/<ProjectName> HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
CreateTime long 创建时间,单位:秒
LastModifyTime long 更新时间,单位:秒
Comment String 描述信息

示例

  • 请求示例

    1. GET /projects/<ProjectName> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Content-Length: xxx
    5. {
    6. "Comment": "test project",
    7. "CreateTime": 1525763481,
    8. "LastModifyTime": 1525763481
    9. }

查询Project列表

请求

  • 请求语法

    1. GET /projects HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
ProjectNames List Project名称列表

示例

  • 请求示例

    1. GET /projects HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "ProjectNames": [
    7. "project1",
    8. "projcet2"
    9. ]
    10. }

更新Project

请求

  • 请求语法

    1. PUT /projects/<ProjectName> HTTP/1.1
  • 请求元素

名称 类型 描述
Comment String 描述信息

响应

  • 响应语法

    1. HTTP/1.1 200 OK

示例

  • 请求示例

    1. PUT /projects/<ProjectName> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: application/json
    6. Content-Length: xxx
    7. {
    8. "Comment": "update comment"
    9. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Length: 0

删除Project

请求

  • 请求语法

    1. DELETE /projects/<ProjectName> HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK

示例

  • 请求示例

    1. DELETE /projects/<ProjectName> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Length: 0

创建Topic

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  • 请求元素
名称 类型 描述
Action String 操作类型
ShardCount int 初始shard数目
Lifecycle int 数据存储生命周期
RecordType String BLOB(非结构化数据)/TUPLE(结构化数据)
RecordSchema String 创建TUPLE类型topic时需指定schema, BLOB类型时,不传该参数
Comment String 描述信息

响应

  • 响应语法

    1. HTTP/1.1 201 Created

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: application/json
    6. Content-Length: xxx
    7. {
    8. "Action": "create",
    9. "ShardCount": 1,
    10. "Lifecycle": 1,
    11. "RecordType": "TUPLE",
    12. "RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}",
    13. "Comment": "create topic"
    14. }
  • 响应示例

    1. HTTP/1.1 201 Created
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Content-Length: 0

查询Topic

请求

  • 请求语法

    1. GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
ShardCount int 初始shard数目
Lifecycle int 数据存储生命周期
RecordType String BLOB(非结构化数据)/TUPLE(结构化数据)
RecordSchema String 创建TUPLE类型topic时需指定schema, BLOB类型时,不传该参数
Comment String 描述信息
CreateTime long 创建时间
LastModifyTime long 更新时间

示例

  • 请求示例

    1. GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
  • 响应示例

    1. HTTP/1.1 201 Created
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Content-Length: xxx
    5. {
    6. "ShardCount": 1,
    7. "Lifecycle": 1,
    8. "RecordType": "TUPLE",
    9. "RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}",
    10. "Comment": "create topic",
    11. "CreateTime": 1525763481,
    12. "LastModifyTime": 1525763481
    13. }

查询Topic列表

请求

  • 请求语法

    1. GET /projects/<ProjectName>/topics HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
TopicNames List Project名称列表

示例

  • 请求示例

    1. GET /projects/<ProjectNames>/topics HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "TopicNames": [
    7. "topic1",
    8. "topic2"
    9. ]
    10. }

更新Topic

请求

  • 请求语法

    1. PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  • 请求元素

名称 类型 描述
Comment String 描述信息

响应

  • 响应语法

    1. HTTP/1.1 200 OK

示例

  • 请求示例

    1. PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: application/json
    6. Content-Length: xxx
    7. {
    8. "Comment": "update comment"
    9. }

删除Topic

请求

  • 请求语法

    1. DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK

示例

  • 请求示例

    1. DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Length: 0

获取Shard列表

请求

  • 请求语法

    1. GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
ShardId String Shard Id
State String Shard当前状态,包括OPENING,ACTIVE,CLOSED等状态
BeginHashKey String 起始HashKey
EndHashKey String 终止HashKey
ParentShardIds List Shard分裂或合并之前的父Shard信息

示例

  • 请求示例

    1. GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "Shards": [
    7. {
    8. "ShardId": "0",
    9. "State": "ACTIVE",
    10. "BeginHashKey":"00000000000000000000000000000000",
    11. "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
    12. "ParentShardIds:[]
    13. }
    14. ]
    15. }

分裂Shard

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  • 请求元素
名称 类型 描述
Action String Action为: split
ShardId String 需要分裂的Shard Id
SplitKey String 按照此Key进行Split,SplitKey一般等于 BeginHashKey + (EndHashKey - BeginHashKey) / 2

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
NewShards List 分裂后的Shard列表

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: application/json
    6. Content-Length: xxx
    7. {
    8. "Action": "split",
    9. "ShardId": "0",
    10. "SplitKye": "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
    11. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "NewShards": [
    7. {
    8. "ShardId": "1",
    9. "BeginHashKey":"00000000000000000000000000000000",
    10. "EndHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
    11. },
    12. {
    13. "ShardId":"0",
    14. "BeginHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
    15. "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
    16. }
    17. ]
    18. }

合并Shard

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  • 请求元素
名称 类型 描述
Action String Action为: merge
ShardId String 需要合并的Shard Id
AdjacentShardId String 临近的并且满足合并条件的Shard Id

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
ShardId String 合并后的Shard Id
BeginHashKey String 起始HashKey
EndHashKey String 终止HashKey

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: application/json
    6. Content-Length: xxx
    7. {
    8. "Action": "merge",
    9. "ShardId": "0",
    10. "AdjacentShardId": "1"
    11. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "ShardId":"2",
    7. "BeginHashKey":"00000000000000000000000000000000",
    8. "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
    9. }

查询数据Cursor

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
  • 请求元素

名称 类型 描述
Action String Action为: cursor
Type String 按照何种类型获取Cursor,包括: OLDEST, LATEST, SYSTEM_TIME, SEQUENCE
SystemTime Int64 Type为SYSTEM_TIME时填写,单位Ms
Sequence Int64 Type为SEQUENCE时填写

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
Cursor String 返回的Cursor信息
RecordTime Int64 数据写入DataHub时间, 单位Ms
Sequence Int64 数据写入的Sequence,单Shard内唯一

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: application/json
    6. Content-Length: xxx
    7. {
    8. "Action": "cursor",
    9. "Type": "SEQUENCE",
    10. "Sequence": 1
    11. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "Cursor": "30005af19b3800000000000000000000",
    7. "RecordTime": 1525783352873,
    8. "Sequence": 1
    9. }

写入数据 - 不按shard写入

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  • 请求元素

名称 类型 描述
Action String Action为: pub
ShardId String ShardId
Attributes Map 用户属性字段
Data 如果为BLOB类型,Data为数据Base64编码后数据;如果为TUPLE数据,为String类型数组

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
FailedRecordCount Int 失败条数
FailedRecords Array 失败详情

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "pub",
    9. "Records": [
    10. {
    11. "ShardId": "0",
    12. "Attributes": {
    13. "attr1": "value1",
    14. "attr2": "value2"
    15. },
    16. "Data": ["A","B","3","4"]
    17. }
    18. ]
    19. }
  1. // BLOB
  2. {
  3. "Action": "pub",
  4. "Record": [
  5. {
  6. "ShardId": "0",
  7. "Attributes": {
  8. "attr1": "value1",
  9. "attr2": "value2"
  10. },
  11. "Data": "Base64String"
  12. }
  13. ]
  14. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "FailedRecordCount": 1,
    7. "FailedRecords": [
    8. {
    9. "Index": 0,
    10. "ErrorCode": "errorCode",
    11. "ErrorMessage": "errorMsg"
    12. }
    13. ]
    14. }

读取数据

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
  • 请求元素
名称 类型 描述
Action String Action为: sub
Cursor String 从Cursor位置开始读取
Limit Int 读取的条数

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
NextCursor String 下一条数据的Cursor信息
SystemTime Int64 record写入DataHub的时间,单位Ms
Cursor String record对应的Cursor信息
Sequence Int64 record写入Datahub的Sequence
Attributes Map 用户的属性字段
Data 用户的数据字段

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "sub",
    9. "Cursor": "30005af19b3800000000000000000000",
    10. "Limit": 1
    11. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "NextCursor": "30005af19b3800000000000000090001",
    7. "Records": [
    8. {
    9. "Cursor": "30005af19b3800000000000000000000",
    10. "SystemTime": 1525783352873,
    11. "Sequence": 1,
    12. "Attributes": {
    13. "key1": "value1",
    14. "key2": "value2"
    15. },
    16. "Data": ["AAA", "100"]
    17. }
    18. ]
    19. }

新增Field

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  • 请求元素
名称 类型 描述
Action String Action为: appendfield
FieldName String Field名称
FieldType String Field类型,包括STRING, BIGINT等

响应

  • 响应语法

    1. HTTP/1.1 200 OK

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "appendfield",
    9. "FieldName": "field1",
    10. "FieldType": "BIGINT"
    11. }

创建Connector

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  • 请求元素
名称 类型 描述
Type String Connector类型,如SINK_ODPS等
ColumnFields Array 需同步的Field列表
Config Map Connector相关配置

响应

  • 响应语法

    1. HTTP/1.1 201 Created

示例

  • 请求示例,以SINK_ODPS为例

    1. POST /projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Type": "SINK_ODPS",
    9. "ColumnFields": ["field1", "field2"],
    10. "Config": {
    11. "Project": "odpsProject",
    12. "Topic": "odpsTopic",
    13. "OdpsEndpoint": "xxx",
    14. "TunnelEndpoint": "xxx",
    15. "AccessId": "xxx",
    16. "AccessKey": "xxx",
    17. "PartitionMode": "SYSTEM_TIME",
    18. "TimeRange": 60,
    19. "PartitionConfig": {
    20. "pt": "%Y%m%d",
    21. "ct": "%H%M"
    22. }
    23. }
    24. }

查询Connector

请求

  • 请求语法

    1. GET /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK

查询Connector列表

请求

  • 请求语法

    1. GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK

示例

  • 请求示例

    1. GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "Connectors": [
    7. "sink_odps", "sink_ads"
    8. ]
    9. }

删除Connector

请求

  • 请求语法

    1. DELETE /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK

Reload Connector

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  • 请求元素
名称 类型 描述
Action String Action为: reload
ShardId String 不设置则Reload整个connector

响应

  • 响应语法

    1. HTTP/1.1 200 OK

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "reload"
    9. }

获取Connector Shard状态信息

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  • 请求元素
名称 类型 描述
Action String Action为: status
ShardId String 获取此ShardId的状态信息

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
CurrentSequence Int64 当前处理点位信息
State Enum 当前Shard的运行状态
LastErrorMessage String 当State不为CONTEXT_EXECUTING时,返回错误信息
DiscardCount Int64 从connector运行到现在丢弃的数据总条数

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "status",
    9. "ShardId": "0"
    10. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "State": "CONTEXT_EXECUTING",
    7. "CurrentSequence": 10,
    8. "DiscardCount": 0,
    9. "LastErrorMessage": ""
    10. }

Append Connector Field

请求

  • 请求语法

    POST /projects//topics//connectors/ HTTP/1.1

  • 请求元素

名称 类型 描述
Action String Action为: appendfield
FieldName String Field名称

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "appendfiled",
    9. "FieldName": "field1"
    10. }

创建订阅

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
  • 请求元素
名称 类型 描述
Action String Action为: create
Comment String 描述信息

响应

  • 响应语法

    1. HTTP/1.1 201 Created
  • 响应元素

名称 类型 描述
SubId String 订阅Id

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "create",
    9. "Comment": "xxxx"
    10. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "SubId": "1542078393028fzsZx"
    7. }

查询订阅

请求

  • 请求语法

    1. GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
SubId String 订阅Id
State Int 0: online, 1: offline
Comment String 描述信息

示例

  • 请求示例

    1. GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Authorization: AuthorizationString
    4. Content-Type: applicaton/json
    5. Conent-Length: xxx
    6. {
    7. "SubId": "xxxx",
    8. "Comment": "xxxx"
    9. "State": 1
    10. }

查询订阅列表

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
  • 请求元素
名称 类型 描述
Action String Action为: list
PageIndex Int 分页Index
PageSize Int 分页Size

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
TotalCount Int Page总数
  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "list",
    9. "PageIndex": 1,
    10. "PageSize": 10
    11. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "Subscriptions": [
    7. {
    8. "Comment": "xxxx",
    9. "State": 1,
    10. "SubId": "1542079169844gH8HM"
    11. },
    12. {
    13. "Comment": "xxxx",
    14. "State": 1,
    15. "SubId": "1542078393028fzsZx"
    16. }
    17. ],
    18. "TotalCount": 2
    19. }

删除订阅

请求

  • 请求语法

    1. DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1

示例

  • 请求示例

    1. DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString

更新订阅状态

请求

  • 请求语法

    1. PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1

响应

  • 响应语法

    1. HTTP/1.1 200 OK

示例

  • 请求示例

    1. PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "State": 0
    9. }

open点位session

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  • 请求元素

名称 类型 描述
Action String Action为: open
ShardIds Array Shard列表

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素
名称 类型 描述
Timestamp Int64 点位时间戳,单位Ms
Sequence Int64 点位Sequence
Version Int64 Session VersionId
SessionId Int64 SessionId

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "open",
    9. "ShardIds": ["0"]
    10. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "Offsets": {
    7. "0": {
    8. "Timestamp": 1000,
    9. "Sequence": 1,
    10. "Version": 1,
    11. "SessionId": xxx
    12. }
    13. }
    14. }

查询点位

请求

  • 请求语法

    1. POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  • 请求元素

名称 类型 描述
Action String Action为: get
ShardIds Array Shard列表

响应

  • 响应语法

    1. HTTP/1.1 200 OK
  • 响应元素

名称 类型 描述
Timestamp Int64 点位时间戳,单位Ms
Sequence Int64 点位Sequence
Version Int64 Session VersionId

示例

  • 请求示例

    1. POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "get",
    9. "ShardIds": ["0"]
    10. }
  • 响应示例

    1. HTTP/1.1 200 OK
    2. x-datahub-request-id: 2018050817492199d6650a00000039
    3. Content-Type: applicaton/json
    4. Conent-Length: xxx
    5. {
    6. "Offsets": {
    7. "0": {
    8. "Timestamp": 1000,
    9. "Sequence": 1,
    10. "Version": 1,
    11. }
    12. }
    13. }

提交点位

请求

  • 请求语法

    1. PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  • 请求元素

名称 类型 描述
Action String Action为: commit
Timestamp Int64 点位时间戳,单位Ms
Sequence Int64 点位Sequence
Version Int64 Session VersionId
SessionId Int64 SessionId

响应

  • 响应语法

    1. HTTP/1.1 200 OK

示例

  • 请求示例

    1. PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
    2. x-datahub-client-version: 1.1
    3. Date: Tue, 08 May 2018 09:47:48 GMT
    4. Authorization: AuthorizationString
    5. Content-Type: applicaton/json
    6. Conent-Length: xxx
    7. {
    8. "Action": "commit",
    9. "Offsets": {
    10. "0": {
    11. "Timestamp": 1000,
    12. "Sequence": 1,
    13. "Version": 1,
    14. "SessionId": 1
    15. }
    16. }
    17. }