文档

API参考

更新时间:

API参考

一. 接口规范

1.公共请求Header

名字

描述

x-datahub-client-version

API版本信息

x-datahub-security-token

Security Token,如果存在

Date

标准GMT时间,格式:EEE, dd MMM yyyy HH:mm:ss z

Authorization

签名信息

Content-Type

传输数据序列化协议

2. 公共返回Header

名字

描述

Content-Type

传输数据序列化协议

Content-Length

传输数据长度

x-datahub-request-id

全局唯一请求ID

3. 错误码

名字

描述

备注

InvalidParameter

参数错误

InvalidCursor

Cursor无效

NoSuchXXX

资源不存在

XXXAlreadyExist

资源已存在

Unauthorized

认证失败

AccessKey信息错误,用户时间戳不对等

NoPermission

鉴权失败

RAM权限错误

OperationDenied

禁止操作

操作禁止,比如删除有topic存在的project

LimitExceeded

流控受限

服务端QPS、流量等限制

InvalidShardOperation

Shard分裂或合并,变成Sealed

MalformedRecord

数据格式不正确

OffsetReseted

点位重置

OffsetSessionChanged

SubId被其他用户open占用

SubscriptionOffline

订阅下线

InternalServerError

系统内部错误

其他

其他非可重试异常,后续可能会被回收

4. 统一错误返回格式

名字

描述

ErrorCode

错误码

ErrorMessage

详细错误描述信息

错误响应示例:

    HTTP/1.1 403
    x-datahub-request-id: 2018050817492199d6650a00000039
    Content-Type: application/json
    Content-Length: xxx

    {
        "ErrorCode": "Unauthorized",
        "ErrorMessage": "Authroize failed"
    }

5. 限制描述

名字

描述

ProjectName

长度:[3, 32],仅包含字母、数字和'_', 以字母开头,不区分大小写

TopicName

长度:[3, 128],仅包含字母、数字和'_', 以字母开头,不区分大小写

二、Authorization字段计算的方法

Authorization = "DATAHUB " + AccessId + ":" + Signature
Signature = base64(hmac-sha1(AccessKey,
            HTTPMethod + "\n"
            + Content-Type + "\n"
            + Date + "\n"
            + CanonicalizedDataHubHeaders + "\n"
            + CanonicalizedResource))

  • AccessKey 表示签名所需的密钥。

  • HTTPMethod 表示HTTP 请求的Method,主要有PUT、GET、POST、HEAD、DELETE等。

  • \n 表示换行符。

  • Content-Type 表示请求内容的类型,一般固定为“application/json”。

  • Date 表示此次操作的时间,且必须为GMT格式,如“Sun, 22 Nov 2015 08:16:38 GMT”。

  • CanonicalizedDataHubHeaders 表示以 x-datahub- 为前缀的HTTP Header的字典序排列。

  • CanonicalizedResource 表示用户想要访问的DataHub资源的Url,若包含param,必须按字典序。

CanonicalizedDataHubHeaders构造方法

所有以 x-datahub- 为前缀的HTTP Header被称为 CanonicalizedDataHubHeaders。构造方法如下:

  1. 将所有以 x-datahub- 为前缀的HTTP请求头的名字转换成小写 。如X-DATAHUB-Client-Version:1.1需要转换成x-datahub-client-version:1.1

  2. 如果请求是以STS获得的AccessKeyId和AccessKeySecret发送时,还需要将获得的security-token值以x-datahub-security-token:token的形式加入到签名字符串中。

  3. 将上一步得到的所有HTTP请求头按照名字的字典序进行升序排列。

  4. 删除请求头和内容之间分隔符两端出现的任何空格。如x-datahub-client-versionn : 1.1转换成:x-datahub-client-version:1.1

  5. 将每一个头和内容用 \n 分隔符分隔拼成最后的CanonicalizedDataHubHeaders

CanonicalizedResource构造方法

用户发送请求中想访问的DataHub目标资源被称为CanonicalizedResource。构造方法如下:

  1. CanonicalizedResource置成空字符串 “”;

  2. 放入要访问的DataHub资源,比如某个topic: /projects/<ProjectName>/topics/<TopicName>

  3. 如果请求的资源包含额外的URL参数,按照字典序,从小到大排列并以 & 为分隔符生成参数字符串。在CanonicalizedResource字符串尾添加 ?和参数字符串。此时的CanonicalizedResource如:/projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps?donetime

计算签名头规则

  • 签名的字符串必须为 UTF-8 格式。含有中文字符的签名字符串必须先进行 UTF-8 编码,再与AccessKey计算最终签名。

  • 签名的方法用RFC 2104中定义的HMAC-SHA1方法,其中Key为 AccessKey

  • x-datahub-开头的header在签名验证前需要符合以下规范:

  • header的名字需要变成小写。

  • header按字典序自小到大排序。

  • 分割header name和value的冒号前后不能有空格。

  • 每个Header之后都有一个换行符“\n”,如果没有Header,CanonicalizedDataHubHeaders就设置为空。

签名示例

请求

签名字符串计算公式

签名字符串

POST /projects/test_project/topics/test_topic HTTP/1.1 Host: https://dh-cn-hangzhou.aliyuncs.com

User-Agent: customer x-datahub-client-version: 1.1 Content-Type: application/json Date: Thu, 10 Jan 2019 07:28:29 GMT

Signature = base64(hmac-sha1(AccessKey,HTTPMethod + “\n” + Content-Type + “\n” + Date + “\n” + CanonicalizedDataHubHeaders+ CanonicalizedResource))

POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic

python计算签名方法如下:

import base64
import hmac
import sha
h = hmac.new("****your accessKey*****",
"POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic", sha)
Signature = base64.b64encode(h.digest())
print("Signature: %s" % Signature)

请求头部示例:

Authorization值格式:DATAHUB AccessId:Signature

POST /projects/test_project/topics/test_topic HTTP/1.1
Authorization: DATAHUB AccessId:Signature
Content-Type: application/json
Date: Thu, 10 Jan 2019 07:28:29 GMT
Host: http://dh-cn-hangzhou.aliyuncs.com
User-Agent: customer
x-datahub-client-version: 1.1

Java 8 签名参考样例

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public abstract class Authorization {
    private static final String DEFAULT_ENCODING = "UTF-8";
    private static final String DEFAULT_HASH = "HmacSHA1";
    private static final String X_DATAHUB_PREFIX = "x-datahub";
    private static final String HEADER_CONTENT_TYPE = "Content-Type";
    private static final String HEADER_DATE = "Date";

    static public String getAkAuthorization(Request request) {
        String canonicalURL = request.getUrlPath();
        String canonicalQueryString = request.getQueryStrings();
        String canonicalHeaderString = getCanonicalHeaders(
                getSortedHeadersToSign(request.getHeaders()));

        String canonicalRequest = request.getMethod().toUpperCase() + "\n" +
                canonicalHeaderString + "\n" + canonicalURL;
        if (canonicalQueryString != null && !canonicalQueryString.isEmpty()) {
            canonicalRequest += "?" + canonicalQueryString;
        }

        String signature = HMAC1Sign(request.getAccessKey(), canonicalRequest);

        return "DATAHUB " + request.getAccessId() + ":" + signature;
    }

    static private String HMAC1Sign(String accessKey, String canonicalRequest) {
        try {
            SecretKeySpec signingKey = new SecretKeySpec(accessKey.getBytes(), DEFAULT_HASH);
            Mac mac = Mac.getInstance(DEFAULT_HASH);
            mac.init(signingKey);
            return Base64.getEncoder().encodeToString(
                    mac.doFinal(canonicalRequest.getBytes(DEFAULT_ENCODING))
            ).trim();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    static private String getCanonicalHeaders(Map<String, String> headers) {
        StringBuilder sb = new StringBuilder();
        Iterator<Map.Entry<String, String>> pairs = headers.entrySet().iterator();
        while (pairs.hasNext()) {
            Map.Entry<String, String> pair = pairs.next();
            if (pair.getKey().startsWith(X_DATAHUB_PREFIX)) {
                sb.append(pair.getKey());
                sb.append(":");
                sb.append(pair.getValue());
            } else {
                sb.append(pair.getValue());
            }

            if (pairs.hasNext()) {
                sb.append("\n");
            }
        }
        return sb.toString();
    }

    static private SortedMap<String, String> getSortedHeadersToSign(Map<String, String> headers) {
        SortedMap<String, String> sortedHeaders = new TreeMap<>();

        for (Map.Entry<String, String> entry : headers.entrySet()) {
            String lowerKey = entry.getKey().toLowerCase();
            if (lowerKey.equalsIgnoreCase(HEADER_CONTENT_TYPE) ||
                    lowerKey.equalsIgnoreCase(HEADER_DATE) ||
                    lowerKey.startsWith(X_DATAHUB_PREFIX)) {
                if (!entry.getValue().isEmpty()) {
                    sortedHeaders.put(lowerKey, entry.getValue());
                }
            }
        }

        if (!sortedHeaders.containsKey(HEADER_CONTENT_TYPE.toLowerCase())) {
            sortedHeaders.put(HEADER_CONTENT_TYPE.toLowerCase(), "");
        }

        return sortedHeaders;
    }

    public static class Request {
        private String accessId;
        private String accessKey;
        private String urlPath;
        private String method;
        private Map<String, String> headers;
        private String queryStrings;

        public String getAccessId() {
            return accessId;
        }

        public Request setAccessId(String accessId) {
            this.accessId = accessId;
            return this;
        }

        public String getAccessKey() {
            return accessKey;
        }

        public Request setAccessKey(String accessKey) {
            this.accessKey = accessKey;
            return this;
        }

        public String getUrlPath() {
            return urlPath;
        }

        public Request setUrlPath(String urlPath) {
            this.urlPath = urlPath;
            return this;
        }

        public String getMethod() {
            return method;
        }

        public Request setMethod(String method) {
            this.method = method;
            return this;
        }

        public Map<String, String> getHeaders() {
            return headers;
        }

        public Request setHeaders(Map<String, String> headers) {
            this.headers = headers;
            return this;
        }

        public String getQueryStrings() {
            return queryStrings;
        }

        public Request setQueryStrings(String queryStrings) {
            this.queryStrings = queryStrings;
            return this;
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("Date", "Thu, 10 Jan 2019 07:28:29 GMT");
        headers.put("x-datahub-client-version", "1.1");
        headers.put("Content-type", "application/json");

        String accessId = "testKeyID";
        String accessKey = "testKeySecret";
        String method = "POST";
        String path = "/projects/test_project/topics/test_topic";
        String canonicalQueryString = ""; //字典序 a=x&b=y

        Authorization.Request authRequest = new Authorization.Request()
                .setAccessId(accessId)
                .setAccessKey(accessKey)
                .setMethod(method.toUpperCase())
                .setUrlPath(path)
                .setHeaders(headers)
                .setQueryStrings(canonicalQueryString);
        System.out.println(Authorization.getAkAuthorization(authRequest));
    }
}

三、接口定义

创建Project

请求

请求语法

  POST /projects/<ProjectName> HTTP/1.1

请求元素

名称

类型

描述

Comment

String

描述信息,限制1024字节

响应

响应语法

  HTTP/1.1 201 Created

示例

请求示例

  POST /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx

  {
      "Comment": "test project"
  }

响应示例

  HTTP/1.1 201 Created
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0

查询Project

请求

请求语法

  GET /projects/<ProjectName> HTTP/1.1

响应

响应语法

  HTTP/1.1 200 OK

响应元素

名称

类型

描述

CreateTime

long

创建时间,单位:秒

LastModifyTime

long

更新时间,单位:秒

Comment

String

描述信息

示例

请求示例

  GET /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Content-Length: xxx

  {
      "Comment": "test project",
      "CreateTime": 1525763481,
      "LastModifyTime": 1525763481
  }

查询Project列表

请求

请求语法

  GET /projects HTTP/1.1

响应

响应语法

  HTTP/1.1 200 OK

响应元素

名称

类型

描述

ProjectNames

List

Project名称列表

示例

请求示例

  GET /projects HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "ProjectNames": [
          "project1",
          "projcet2"
      ]
  }

更新Project

请求

请求语法

  PUT /projects/<ProjectName> HTTP/1.1

请求元素

名称

类型

描述

Comment

String

描述信息

响应

响应语法

  HTTP/1.1 200 OK

示例

请求示例

  PUT /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx

  {
      "Comment": "update comment"
  }

响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0

删除Project

请求

请求语法

  DELETE /projects/<ProjectName> HTTP/1.1

响应

响应语法

  HTTP/1.1 200 OK

示例

请求示例

  DELETE /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0

创建Topic

请求

请求语法

  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

请求元素

名称

类型

描述

Action

String

操作类型

ShardCount

int

初始shard数目

Lifecycle

int

数据存储生命周期

RecordType

String

BLOB(非结构化数据)/TUPLE(结构化数据)

RecordSchema

String

创建TUPLE类型topic时需指定schema, BLOB类型时,不传该参数

Comment

String

描述信息

ExpandMode

String

开启扩展模式传值extend,其他情况不需要该参数。

响应

响应语法

  HTTP/1.1 201 Created

示例

请求示例

  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx

  {
      "Action": "create",
      "ShardCount": 1,
      "Lifecycle": 1,
      "RecordType": "TUPLE",
      "RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}}",
      "Comment": "create topic",
      "ExpandMode": "extend"
  }

响应示例

  HTTP/1.1 201 Created
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Content-Length: 0

查询Topic

请求

请求语法

  GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

响应

响应语法

  HTTP/1.1 200 OK

响应元素

名称

类型

描述

ShardCount

int

初始shard数目

Lifecycle

int

数据存储生命周期

RecordType

String

BLOB(非结构化数据)/TUPLE(结构化数据)

RecordSchema

String

创建TUPLE类型topic时需指定schema, BLOB类型时,不传该参数

Comment

String

描述信息

CreateTime

long

创建时间

LastModifyTime

long

更新时间

示例

请求示例

  GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

响应示例

  HTTP/1.1 201 Created
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Content-Length: xxx

  {
      "ShardCount": 1,
      "Lifecycle": 1,
      "RecordType": "TUPLE",
      "RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}",
      "Comment": "create topic",
      "CreateTime": 1525763481,
      "LastModifyTime": 1525763481
  }

查询Topic列表

请求

请求语法

  GET /projects/<ProjectName>/topics HTTP/1.1

响应

响应语法

  HTTP/1.1 200 OK

响应元素

名称

类型

描述

TopicNames

List

Project名称列表

示例

请求示例

  GET /projects/<ProjectNames>/topics HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "TopicNames": [
          "topic1",
          "topic2"
      ]
  }

更新Topic

请求

请求语法

  PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

请求元素

名称

类型

描述

Comment

String

描述信息

响应

响应语法

  HTTP/1.1 200 OK

示例

请求示例

  PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Comment": "update comment"
  }

删除Topic

请求

请求语法

  DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

响应

响应语法

  HTTP/1.1 200 OK

示例

请求示例

  DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0

获取Shard列表

请求

请求语法

  GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1

响应

响应语法

  HTTP/1.1 200 OK

响应元素

名称

类型

描述

ShardId

String

Shard Id

State

String

Shard当前状态,包括OPENING,ACTIVE,CLOSED等状态

BeginHashKey

String

起始HashKey

EndHashKey

String

终止HashKey

ParentShardIds

List

Shard分裂或合并之前的父Shard信息

示例

请求示例

  GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "Shards": [
              {
                      "ShardId": "0",
                      "State": "ACTIVE",
                      "BeginHashKey":"00000000000000000000000000000000",
                      "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                      "ParentShardIds:[]
              }
      ]
  }

分裂Shard

请求

请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:split

ShardId

String

需要分裂的Shard Id

SplitKey

String

按照此Key进行Split,SplitKey一般等于 BeginHashKey + (EndHashKey - BeginHashKey) / 2

响应

响应语法

  HTTP/1.1 200 OK

响应元素

名称

类型

描述

NewShards

List

分裂后的Shard列表

示例

请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Action": "split",
          "ShardId": "0",
          "SplitKye": "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
  }

响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "NewShards": [
                  {
                          "ShardId": "1",
                          "BeginHashKey":"00000000000000000000000000000000",
                          "EndHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
                  },
                  {
                          "ShardId":"0",
                          "BeginHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                          "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
                  }
          ]
  }

合并Shard

请求

请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1

请求元素

名称

类型

描述

Action

String

Action为:merge

ShardId

String

需要合并的Shard Id

AdjacentShardId

String

临近的并且满足合并条件的Shard Id

响应

响应语法

  HTTP/1.1 200 OK

响应元素

名称

类型

描述

ShardId

String

合并后的Shard Id

BeginHashKey

String

起始HashKey

EndHashKey

String

终止HashKey

示例

请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Action": "merge",
          "ShardId": "0",
          "AdjacentShardId": "1"
  }

响应示例

  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
        Content-Type: applicaton/json
        Conent-Length: xxx
        
        {
                "ShardId":"2",
                "BeginHashKey":"00000000000000000000000000000000",
                "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
        }

查询数据Cursor

请求

请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1

请求元素

名称

类型

描述

Action

String

Action为:cursor

Type

String

按照何种类型获取Cursor,包括:OLDEST, LATEST, SYSTEM_TIME, SEQUENCE

SystemTime

Int64

Type为SYSTEM_TIME时填写,单位ms

Sequence

Int64

Type为SEQUENCE时填写

响应

响应语法

  HTTP/1.1 200 OK

响应元素

名称

类型

描述

Cursor

String

返回的Cursor信息

RecordTime

Int64

数据写入DataHub时间, 单位ms

Sequence

Int64

数据写入的Sequence,单Shard内唯一

示例

请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Action": "cursor",
          "Type": "SEQUENCE",
          "Sequence": 1
  }

响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "Cursor": "30005af19b3800000000000000000000",
          "RecordTime": 1525783352873,
          "Sequence": 1
  }

写入数据 - 不按shard写入

请求

请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1

请求元素

名称

类型

描述

Action

String

Action为:pub

ShardId

String

ShardId

Attributes

Map<String, String>

用户属性字段

Data

如果为BLOB类型,Data为数据Base64编码后数据;如果为TUPLE数据,为String类型数组

响应

响应语法

  HTTP/1.1 200 OK

响应元素

名称

类型

描述

FailedRecordCount

Int

失败条数

FailedRecords

Array

失败详情

示例

请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "Action": "pub",
          "Records": [
                  {
                          "ShardId": "0",
                          "Attributes": {
                                  "attr1": "value1",
                                  "attr2": "value2"
                          },
                          "Data": ["A","B","3","4"]
                  }
          ]
  }

  // BLOB
  {
          "Action": "pub",
          "Record": [
                  {
                          "ShardId": "0",
                          "Attributes": {
                                  "attr1": "value1",
                                  "attr2": "value2"
                          },
                          "Data": "Base64String"
                  }
          ]
  }

  • 响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "FailedRecordCount": 1,
      "FailedRecords": [
                      {
                              "Index": 0,
                              "ErrorCode": "errorCode",
                              "ErrorMessage": "errormsg"
                      }
      ]
  }

读取数据

请求

  • 请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:sub

Cursor

String

从Cursor位置开始读取

Limit

Int

读取的条数

响应

  • 响应语法

  HTTP/1.1 200 OK

  • 响应元素

名称

类型

描述

NextCursor

String

下一条数据的Cursor信息

SystemTime

Int64

record写入DataHub的时间,单位ms

Cursor

String

record对应的Cursor信息

Sequence

Int64

record写入DataHub的Sequence

Attributes

Map

用户的属性字段

Data

用户的数据字段

示例

  • 请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "sub",
          "Cursor": "30005af19b3800000000000000000000",
          "Limit": 1
  }

  • 响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "NextCursor": "30005af19b3800000000000000090001",
      "Records": [
                      {
                              "Cursor": "30005af19b3800000000000000000000",
                              "SystemTime": 1525783352873,
                              "Sequence": 1,
                              "Attributes": {
                                      "key1": "value1",
                                      "key2": "value2"
                              },
                              "Data": ["AAA", "100"]
                      }
      ]
  }

新增Field

请求

  • 请求语法

  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:appendfield

FieldName

String

Field名称

FieldType

String

Field类型,包括STRING, BIGINT等

响应

  • 响应语法

  HTTP/1.1 200 OK

示例

  • 请求示例

  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "appendfield",
          "FieldName": "field1",
          "FieldType": "BIGINT"
  }

创建Connector

请求

  • 请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

  • 请求元素

名称

类型

描述

Type

String

Connector类型,如SINK_ODPS等

ColumnFields

Array

需同步的Field列表

Config

Map

Connector相关配置

响应

  • 响应语法

  HTTP/1.1 201 Created

示例

  • 请求示例,以SINK_ODPS为例

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Type": "SINK_ODPS",
          "ColumnFields": ["field1", "field2"],
          "Config": {
                  "Project": "odpsProject",
                  "Topic": "odpsTopic",
                  "OdpsEndpoint": "xxx",
                  "TunnelEndpoint": "xxx",
                  "AccessId": "xxx",
                  "AccessKey": "xxx",
                  "PartitionMode": "SYSTEM_TIME",
                  "TimeRange": 60,
                  "PartitionConfig": {
                          "pt": "%Y%m%d",
                          "ct": "%H%M"
                  }
          }
  }

查询Connector

请求

  • 请求语法

  GET /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

响应

  • 响应语法

  HTTP/1.1 200 OK

查询Connector列表

请求

  • 请求语法

  GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1

响应

  • 响应语法

  HTTP/1.1 200 OK

示例

  • 请求示例

  GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

  • 响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "Connectors": [
                  "sink_odps", "sink_ads"
          ]
  }

删除Connector

请求

  • 请求语法

  DELETE /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

响应

  • 响应语法

  HTTP/1.1 200 OK

Reload Connector

请求

  • 请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:reload

ShardId

String

不设置则Reload整个connector

响应

  • 响应语法

  HTTP/1.1 200 OK

示例

  • 请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "reload"
  }

获取Connector Shard状态信息

请求

  • 请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:status

ShardId

String

获取此ShardId的状态信息

响应

  • 响应语法

  HTTP/1.1 200 OK

  • 响应元素

名称

类型

描述

CurrentSequence

Int64

当前处理点位信息

State

Enum

当前Shard的运行状态

LastErrorMessage

String

当State不为CONTEXT_EXECUTING时,返回错误信息

DiscardCount

Int64

从connector运行到现在丢弃的数据总条数

示例

  • 请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "status",
          "ShardId": "0"
  }

  • 响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "State": "CONTEXT_EXECUTING",
          "CurrentSequence": 10,
          "DiscardCount": 0,
          "LastErrorMessage": ""
  }

Append Connector Field

请求

  • 请求语法

POST /projects//topics//connectors/ HTTP/1.1
  • 请求元素

名称

类型

描述

Action

String

Action为:appendfield

FieldName

String

Field名称

响应

  • 响应语法

  HTTP/1.1 200 OK

  • 请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "appendfiled",
          "FieldName": "field1"
  }

创建订阅

请求

  • 请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:create

Comment

String

描述信息

响应

  • 响应语法

  HTTP/1.1 201 Created

  • 响应元素

名称

类型

描述

SubId

String

订阅Id

示例

  • 请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "create",
          "Comment": "xxxx"
  }

  • 响应示例

  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
         Content-Type: applicaton/json
  Conent-Length: xxx

        {
          "SubId": "1542078393028fzsZx"
  }

查询订阅

请求

  • 请求语法

  GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1

响应

  • 响应语法

  HTTP/1.1 200 OK

  • 响应元素

名称

类型

描述

SubId

String

订阅Id

State

Int

0: online, 1: offline

Comment

String

描述信息

示例

  • 请求示例

  GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT

  • 响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "SubId": "xxxx",
          "Comment": "xxxx"
          "State": 1
  }

查询订阅列表

请求

  • 请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:list

PageIndex

Int

分页Index

PageSize

Int

分页Size

响应

  • 响应语法

  HTTP/1.1 200 OK

  • 响应元素

名称

类型

描述

TotalCount

Int

Page总数

  • 请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "list",
          "PageIndex": 1,
          "PageSize": 10
  }

  • 响应示例

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Subscriptions": [
                  {
                           "Comment": "xxxx",
                            "State": 1,
                            "SubId": "1542079169844gH8HM"
                  },
                  {
                          "Comment": "xxxx",
                          "State": 1,
                          "SubId": "1542078393028fzsZx"
                  }
          ],
          "TotalCount": 2
  }

删除订阅

请求

  • 请求语法

  DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1

示例

  • 请求示例

  DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

更新订阅状态

请求

  • 请求语法

  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1

响应

  • 响应语法

  HTTP/1.1 200 OK

示例

  • 请求示例

  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "State": 0
  }

open点位session

请求

  • 请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:open

ShardIds

Array

Shard列表

响应

  • 响应语法

  HTTP/1.1 200 OK

  • 响应元素

名称

类型

描述

Timestamp

Int64

点位时间戳,单位ms

Sequence

Int64

点位Sequence

Version

Int64

Session VersionId

SessionId

String

SessionId

示例

  • 请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "open",
          "ShardIds": ["0"]
  }

  • 响应示例

  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Offsets": {
                  "0": {
                          "Timestamp": 1000,
                          "Sequence": 1,
                          "Version": 1,
                          "SessionId": "xxx"
                  }
          }
  }

查询点位

请求

  • 请求语法

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:get

ShardIds

Array

Shard列表

响应

  • 响应语法

  HTTP/1.1 200 OK

  • 响应元素

名称

类型

描述

Timestamp

Int64

点位时间戳,单位ms

Sequence

Int64

点位Sequence

Version

Int64

Session VersionId

SessionId

String

SessionId

示例

  • 请求示例

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "get",
          "ShardIds": ["0"]
  }

  • 响应示例

  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Offsets": {
                  "0": {
                          "Timestamp": 1000,
                          "Sequence": 1,
                          "Version": 1,
                          "SessionId": "xxx"
                  }
          }
  }

提交点位

请求

  • 请求语法

  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1

  • 请求元素

名称

类型

描述

Action

String

Action为:commit

Timestamp

Int64

点位时间戳,单位ms

Sequence

Int64

点位Sequence

Version

Int64

Session VersionId

SessionId

Int64

SessionId

响应

  • 响应语法

  HTTP/1.1 200 OK

示例

  • 请求示例

  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "commit",
          "Offsets": {
                  "0": {
                          "Timestamp": 1000,
                          "Sequence": 1,
                          "Version": 1,
                          "SessionId": 1
                  }
          }
  }