API参考
一. 接口规范
1.公共请求Header
| 名字 | 描述 | 
| x-datahub-client-version | API版本信息 | 
| x-datahub-security-token | Security Token,如果存在 | 
| Date | 标准GMT时间,格式:EEE, dd MMM yyyy HH:mm:ss z | 
| Authorization | 签名信息 | 
| Content-Type | 传输数据序列化协议 | 
2. 公共返回Header
| 名字 | 描述 | 
| Content-Type | 传输数据序列化协议 | 
| Content-Length | 传输数据长度 | 
| x-datahub-request-id | 全局唯一请求ID | 
3. 错误码
| 名字 | 描述 | 备注 | 
| InvalidParameter | 参数错误 | |
| InvalidCursor | Cursor无效 | |
| NoSuchXXX | 资源不存在 | |
| XXXAlreadyExist | 资源已存在 | |
| Unauthorized | 认证失败 | AccessKey信息错误,用户时间戳不对等 | 
| NoPermission | 鉴权失败 | RAM权限错误 | 
| OperationDenied | 禁止操作 | 操作禁止,比如删除有topic存在的project | 
| LimitExceeded | 流控受限 | 服务端QPS、流量等限制 | 
| InvalidShardOperation | Shard分裂或合并,变成Sealed | |
| MalformedRecord | 数据格式不正确 | |
| OffsetReseted | 点位重置 | |
| OffsetSessionChanged | SubId被其他用户open占用 | |
| SubscriptionOffline | 订阅下线 | |
| InternalServerError | 系统内部错误 | |
| 其他 | 其他非可重试异常,后续可能会被回收 | 
4. 统一错误返回格式
| 名字 | 描述 | 
| ErrorCode | 错误码 | 
| ErrorMessage | 详细错误描述信息 | 
错误响应示例:
    HTTP/1.1 403
    x-datahub-request-id: 2018050817492199d6650a00000039
    Content-Type: application/json
    Content-Length: xxx
    {
        "ErrorCode": "Unauthorized",
        "ErrorMessage": "Authroize failed"
    }5. 限制描述
| 名字 | 描述 | 
| ProjectName | 长度:[3, 32],仅包含字母、数字和'_', 以字母开头,不区分大小写 | 
| TopicName | 长度:[3, 128],仅包含字母、数字和'_', 以字母开头,不区分大小写 | 
二、Authorization字段计算的方法
Authorization = "DATAHUB " + AccessId + ":" + Signature
Signature = base64(hmac-sha1(AccessKey,
            HTTPMethod + "\n"
            + Content-Type + "\n"
            + Date + "\n"
            + CanonicalizedDataHubHeaders + "\n"
            + CanonicalizedResource))- AccessKey表示签名所需的密钥。
- HTTPMethod表示HTTP 请求的Method,主要有PUT、GET、POST、HEAD、DELETE等。
- \n表示换行符。
- Content-Type表示请求内容的类型,一般固定为“application/json”。
- Date表示此次操作的时间,且必须为GMT格式,如“Sun, 22 Nov 2015 08:16:38 GMT”。
- CanonicalizedDataHubHeaders表示以- x-datahub-为前缀的HTTP Header的字典序排列。
- CanonicalizedResource表示用户想要访问的DataHub资源的Url,若包含param,必须按字典序。
CanonicalizedDataHubHeaders构造方法
所有以 x-datahub- 为前缀的HTTP Header被称为 CanonicalizedDataHubHeaders。构造方法如下:
- 将所有以 - x-datahub-为前缀的HTTP请求头的名字转换成小写 。如- X-DATAHUB-Client-Version:1.1需要转换成- x-datahub-client-version:1.1。
- 如果请求是以STS获得的AccessKeyId和AccessKeySecret发送时,还需要将获得的security-token值以 - x-datahub-security-token:token的形式加入到签名字符串中。
- 将上一步得到的所有HTTP请求头按照名字的字典序进行升序排列。 
- 删除请求头和内容之间分隔符两端出现的任何空格。如 - x-datahub-client-versionn : 1.1转换成:- x-datahub-client-version:1.1。
- 将每一个头和内容用 - \n分隔符分隔拼成最后的- CanonicalizedDataHubHeaders。
CanonicalizedResource构造方法
用户发送请求中想访问的DataHub目标资源被称为CanonicalizedResource。构造方法如下:
- 将 - CanonicalizedResource置成空字符串 “”;
- 放入要访问的DataHub资源,比如某个topic: - /projects/<ProjectName>/topics/<TopicName>
- 如果请求的资源包含额外的URL参数,按照字典序,从小到大排列并以 & 为分隔符生成参数字符串。在CanonicalizedResource字符串尾添加 ?和参数字符串。此时的CanonicalizedResource如: - /projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps?donetime
计算签名头规则
- 签名的字符串必须为 UTF-8 格式。含有中文字符的签名字符串必须先进行 UTF-8 编码,再与 - AccessKey计算最终签名。
- 签名的方法用RFC 2104中定义的HMAC-SHA1方法,其中Key为 - AccessKey。
- 以 - x-datahub-开头的header在签名验证前需要符合以下规范:
- header的名字需要变成小写。 
- header按字典序自小到大排序。 
- 分割header name和value的冒号前后不能有空格。 
- 每个Header之后都有一个换行符“\n”,如果没有Header,CanonicalizedDataHubHeaders就设置为空。 
签名示例:
| 请求 | 签名字符串计算公式 | 签名字符串 | 
| POST /projects/test_project/topics/test_topic HTTP/1.1 Host: https://dh-cn-hangzhou.aliyuncs.com User-Agent: customer x-datahub-client-version: 1.1 Content-Type: application/json Date: Thu, 10 Jan 2019 07:28:29 GMT | Signature = base64(hmac-sha1(AccessKey,HTTPMethod + “\n” + Content-Type + “\n” + Date + “\n” + CanonicalizedDataHubHeaders+ CanonicalizedResource)) | POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic | 
python计算签名方法如下:
import base64
import hmac
import sha
h = hmac.new("****your accessKey*****",
"POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic", sha)
Signature = base64.b64encode(h.digest())
print("Signature: %s" % Signature)请求头部示例:
Authorization值格式:DATAHUB AccessId:Signature
POST /projects/test_project/topics/test_topic HTTP/1.1
Authorization: DATAHUB AccessId:Signature
Content-Type: application/json
Date: Thu, 10 Jan 2019 07:28:29 GMT
Host: http://dh-cn-hangzhou.aliyuncs.com
User-Agent: customer
x-datahub-client-version: 1.1Java 8 签名参考样例
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
public abstract class Authorization {
    private static final String DEFAULT_ENCODING = "UTF-8";
    private static final String DEFAULT_HASH = "HmacSHA1";
    private static final String X_DATAHUB_PREFIX = "x-datahub";
    private static final String HEADER_CONTENT_TYPE = "Content-Type";
    private static final String HEADER_DATE = "Date";
    static public String getAkAuthorization(Request request) {
        String canonicalURL = request.getUrlPath();
        String canonicalQueryString = request.getQueryStrings();
        String canonicalHeaderString = getCanonicalHeaders(
                getSortedHeadersToSign(request.getHeaders()));
        String canonicalRequest = request.getMethod().toUpperCase() + "\n" +
                canonicalHeaderString + "\n" + canonicalURL;
        if (canonicalQueryString != null && !canonicalQueryString.isEmpty()) {
            canonicalRequest += "?" + canonicalQueryString;
        }
        String signature = HMAC1Sign(request.getAccessKey(), canonicalRequest);
        return "DATAHUB " + request.getAccessId() + ":" + signature;
    }
    static private String HMAC1Sign(String accessKey, String canonicalRequest) {
        try {
            SecretKeySpec signingKey = new SecretKeySpec(accessKey.getBytes(), DEFAULT_HASH);
            Mac mac = Mac.getInstance(DEFAULT_HASH);
            mac.init(signingKey);
            return Base64.getEncoder().encodeToString(
                    mac.doFinal(canonicalRequest.getBytes(DEFAULT_ENCODING))
            ).trim();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
    static private String getCanonicalHeaders(Map<String, String> headers) {
        StringBuilder sb = new StringBuilder();
        Iterator<Map.Entry<String, String>> pairs = headers.entrySet().iterator();
        while (pairs.hasNext()) {
            Map.Entry<String, String> pair = pairs.next();
            if (pair.getKey().startsWith(X_DATAHUB_PREFIX)) {
                sb.append(pair.getKey());
                sb.append(":");
                sb.append(pair.getValue());
            } else {
                sb.append(pair.getValue());
            }
            if (pairs.hasNext()) {
                sb.append("\n");
            }
        }
        return sb.toString();
    }
    static private SortedMap<String, String> getSortedHeadersToSign(Map<String, String> headers) {
        SortedMap<String, String> sortedHeaders = new TreeMap<>();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            String lowerKey = entry.getKey().toLowerCase();
            if (lowerKey.equalsIgnoreCase(HEADER_CONTENT_TYPE) ||
                    lowerKey.equalsIgnoreCase(HEADER_DATE) ||
                    lowerKey.startsWith(X_DATAHUB_PREFIX)) {
                if (!entry.getValue().isEmpty()) {
                    sortedHeaders.put(lowerKey, entry.getValue());
                }
            }
        }
        if (!sortedHeaders.containsKey(HEADER_CONTENT_TYPE.toLowerCase())) {
            sortedHeaders.put(HEADER_CONTENT_TYPE.toLowerCase(), "");
        }
        return sortedHeaders;
    }
    public static class Request {
        private String accessId;
        private String accessKey;
        private String urlPath;
        private String method;
        private Map<String, String> headers;
        private String queryStrings;
        public String getAccessId() {
            return accessId;
        }
        public Request setAccessId(String accessId) {
            this.accessId = accessId;
            return this;
        }
        public String getAccessKey() {
            return accessKey;
        }
        public Request setAccessKey(String accessKey) {
            this.accessKey = accessKey;
            return this;
        }
        public String getUrlPath() {
            return urlPath;
        }
        public Request setUrlPath(String urlPath) {
            this.urlPath = urlPath;
            return this;
        }
        public String getMethod() {
            return method;
        }
        public Request setMethod(String method) {
            this.method = method;
            return this;
        }
        public Map<String, String> getHeaders() {
            return headers;
        }
        public Request setHeaders(Map<String, String> headers) {
            this.headers = headers;
            return this;
        }
        public String getQueryStrings() {
            return queryStrings;
        }
        public Request setQueryStrings(String queryStrings) {
            this.queryStrings = queryStrings;
            return this;
        }
    }
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("Date", "Thu, 10 Jan 2019 07:28:29 GMT");
        headers.put("x-datahub-client-version", "1.1");
        headers.put("Content-type", "application/json");
        String accessId = "testKeyID";
        String accessKey = "testKeySecret";
        String method = "POST";
        String path = "/projects/test_project/topics/test_topic";
        String canonicalQueryString = ""; //字典序 a=x&b=y
        Authorization.Request authRequest = new Authorization.Request()
                .setAccessId(accessId)
                .setAccessKey(accessKey)
                .setMethod(method.toUpperCase())
                .setUrlPath(path)
                .setHeaders(headers)
                .setQueryStrings(canonicalQueryString);
        System.out.println(Authorization.getAkAuthorization(authRequest));
    }
}三、接口定义
创建Project
请求
请求语法
  POST /projects/<ProjectName> HTTP/1.1请求元素
| 名称 | 类型 | 描述 | 
| Comment | String | 描述信息,限制1024字节 | 
响应
响应语法
  HTTP/1.1 201 Created示例
请求示例
  POST /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  {
      "Comment": "test project"
  }响应示例
  HTTP/1.1 201 Created
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0查询Project
请求
请求语法
  GET /projects/<ProjectName> HTTP/1.1响应
响应语法
  HTTP/1.1 200 OK响应元素
| 名称 | 类型 | 描述 | 
| CreateTime | long | 创建时间,单位:秒 | 
| LastModifyTime | long | 更新时间,单位:秒 | 
| Comment | String | 描述信息 | 
示例
请求示例
  GET /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString响应示例
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Content-Length: xxx
  {
      "Comment": "test project",
      "CreateTime": 1525763481,
      "LastModifyTime": 1525763481
  }查询Project列表
请求
请求语法
  GET /projects HTTP/1.1响应
响应语法
  HTTP/1.1 200 OK响应元素
| 名称 | 类型 | 描述 | 
| ProjectNames | List | Project名称列表 | 
示例
请求示例
  GET /projects HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString响应示例
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
      "ProjectNames": [
          "project1",
          "projcet2"
      ]
  }更新Project
请求
请求语法
  PUT /projects/<ProjectName> HTTP/1.1请求元素
| 名称 | 类型 | 描述 | 
| Comment | String | 描述信息 | 
响应
响应语法
  HTTP/1.1 200 OK示例
请求示例
  PUT /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  {
      "Comment": "update comment"
  }响应示例
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0删除Project
请求
请求语法
  DELETE /projects/<ProjectName> HTTP/1.1响应
响应语法
  HTTP/1.1 200 OK示例
请求示例
  DELETE /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString响应示例
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0创建Topic
请求
请求语法
  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1请求元素
| 名称 | 类型 | 描述 | 
| Action | String | 操作类型 | 
| ShardCount | int | 初始shard数目 | 
| Lifecycle | int | 数据存储生命周期 | 
| RecordType | String | BLOB(非结构化数据)/TUPLE(结构化数据) | 
| RecordSchema | String | 创建TUPLE类型topic时需指定schema, BLOB类型时,不传该参数 | 
| Comment | String | 描述信息 | 
| ExpandMode | String | 开启扩展模式传值extend,其他情况不需要该参数。 | 
响应
响应语法
  HTTP/1.1 201 Created示例
请求示例
  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  {
      "Action": "create",
      "ShardCount": 1,
      "Lifecycle": 1,
      "RecordType": "TUPLE",
      "RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}}",
      "Comment": "create topic",
      "ExpandMode": "extend"
  }响应示例
  HTTP/1.1 201 Created
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Content-Length: 0查询Topic
请求
请求语法
  GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1响应
响应语法
  HTTP/1.1 200 OK响应元素
| 名称 | 类型 | 描述 | 
| ShardCount | int | 初始shard数目 | 
| Lifecycle | int | 数据存储生命周期 | 
| RecordType | String | BLOB(非结构化数据)/TUPLE(结构化数据) | 
| RecordSchema | String | 创建TUPLE类型topic时需指定schema, BLOB类型时,不传该参数 | 
| Comment | String | 描述信息 | 
| CreateTime | long | 创建时间 | 
| LastModifyTime | long | 更新时间 | 
示例
请求示例
  GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString响应示例
  HTTP/1.1 201 Created
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Content-Length: xxx
  {
      "ShardCount": 1,
      "Lifecycle": 1,
      "RecordType": "TUPLE",
      "RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}",
      "Comment": "create topic",
      "CreateTime": 1525763481,
      "LastModifyTime": 1525763481
  }查询Topic列表
请求
请求语法
  GET /projects/<ProjectName>/topics HTTP/1.1响应
响应语法
  HTTP/1.1 200 OK响应元素
| 名称 | 类型 | 描述 | 
| TopicNames | List | Project名称列表 | 
示例
请求示例
  GET /projects/<ProjectNames>/topics HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString响应示例
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
      "TopicNames": [
          "topic1",
          "topic2"
      ]
  }更新Topic
请求
请求语法
  PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1请求元素
| 名称 | 类型 | 描述 | 
| Comment | String | 描述信息 | 
响应
响应语法
  HTTP/1.1 200 OK示例
请求示例
  PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Comment": "update comment"
  }删除Topic
请求
请求语法
  DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1响应
响应语法
  HTTP/1.1 200 OK示例
请求示例
  DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString响应示例
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0获取Shard列表
请求
请求语法
  GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1响应
响应语法
  HTTP/1.1 200 OK响应元素
| 名称 | 类型 | 描述 | 
| ShardId | String | Shard Id | 
| State | String | Shard当前状态,包括OPENING,ACTIVE,CLOSED等状态 | 
| BeginHashKey | String | 起始HashKey | 
| EndHashKey | String | 终止HashKey | 
| ParentShardIds | List | Shard分裂或合并之前的父Shard信息 | 
示例
请求示例
  GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString响应示例
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
      "Shards": [
              {
                      "ShardId": "0",
                      "State": "ACTIVE",
                      "BeginHashKey":"00000000000000000000000000000000",
                      "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                      "ParentShardIds:[]
              }
      ]
  }分裂Shard
请求
请求语法
  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:split | 
| ShardId | String | 需要分裂的Shard Id | 
| SplitKey | String | 按照此Key进行Split,SplitKey一般等于 BeginHashKey + (EndHashKey - BeginHashKey) / 2 | 
响应
响应语法
  HTTP/1.1 200 OK响应元素
| 名称 | 类型 | 描述 | 
| NewShards | List | 分裂后的Shard列表 | 
示例
请求示例
  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Action": "split",
          "ShardId": "0",
          "SplitKye": "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
  }响应示例
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
          "NewShards": [
                  {
                          "ShardId": "1",
                          "BeginHashKey":"00000000000000000000000000000000",
                          "EndHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
                  },
                  {
                          "ShardId":"0",
                          "BeginHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                          "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
                  }
          ]
  }合并Shard
请求
请求语法
  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1请求元素
| 名称 | 类型 | 描述 | 
| Action | String | Action为:merge | 
| ShardId | String | 需要合并的Shard Id | 
| AdjacentShardId | String | 临近的并且满足合并条件的Shard Id | 
响应
响应语法
  HTTP/1.1 200 OK响应元素
| 名称 | 类型 | 描述 | 
| ShardId | String | 合并后的Shard Id | 
| BeginHashKey | String | 起始HashKey | 
| EndHashKey | String | 终止HashKey | 
示例
请求示例
  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Action": "merge",
          "ShardId": "0",
          "AdjacentShardId": "1"
  }响应示例
  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
        Content-Type: applicaton/json
        Conent-Length: xxx
        
        {
                "ShardId":"2",
                "BeginHashKey":"00000000000000000000000000000000",
                "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
        }查询数据Cursor
请求
请求语法
  POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1请求元素
| 名称 | 类型 | 描述 | 
| Action | String | Action为:cursor | 
| Type | String | 按照何种类型获取Cursor,包括:OLDEST, LATEST, SYSTEM_TIME, SEQUENCE | 
| SystemTime | Int64 | Type为SYSTEM_TIME时填写,单位ms | 
| Sequence | Int64 | Type为SEQUENCE时填写 | 
响应
响应语法
  HTTP/1.1 200 OK响应元素
| 名称 | 类型 | 描述 | 
| Cursor | String | 返回的Cursor信息 | 
| RecordTime | Int64 | 数据写入DataHub时间, 单位ms | 
| Sequence | Int64 | 数据写入的Sequence,单Shard内唯一 | 
示例
请求示例
  POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Action": "cursor",
          "Type": "SEQUENCE",
          "Sequence": 1
  }响应示例
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
          "Cursor": "30005af19b3800000000000000000000",
          "RecordTime": 1525783352873,
          "Sequence": 1
  }写入数据 - 不按shard写入
请求
请求语法
  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1请求元素
| 名称 | 类型 | 描述 | 
| Action | String | Action为:pub | 
| ShardId | String | ShardId | 
| Attributes | Map<String, String> | 用户属性字段 | 
| Data | 如果为BLOB类型,Data为数据Base64编码后数据;如果为TUPLE数据,为String类型数组 | 
响应
响应语法
  HTTP/1.1 200 OK响应元素
| 名称 | 类型 | 描述 | 
| FailedRecordCount | Int | 失败条数 | 
| FailedRecords | Array | 失败详情 | 
示例
请求示例
  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
          "Action": "pub",
          "Records": [
                  {
                          "ShardId": "0",
                          "Attributes": {
                                  "attr1": "value1",
                                  "attr2": "value2"
                          },
                          "Data": ["A","B","3","4"]
                  }
          ]
  }  // BLOB
  {
          "Action": "pub",
          "Record": [
                  {
                          "ShardId": "0",
                          "Attributes": {
                                  "attr1": "value1",
                                  "attr2": "value2"
                          },
                          "Data": "Base64String"
                  }
          ]
  }- 响应示例 
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
      "FailedRecordCount": 1,
      "FailedRecords": [
                      {
                              "Index": 0,
                              "ErrorCode": "errorCode",
                              "ErrorMessage": "errormsg"
                      }
      ]
  }读取数据
请求
- 请求语法 
  POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:sub | 
| Cursor | String | 从Cursor位置开始读取 | 
| Limit | Int | 读取的条数 | 
响应
- 响应语法 
  HTTP/1.1 200 OK- 响应元素 
| 名称 | 类型 | 描述 | 
| NextCursor | String | 下一条数据的Cursor信息 | 
| SystemTime | Int64 | record写入DataHub的时间,单位ms | 
| Cursor | String | record对应的Cursor信息 | 
| Sequence | Int64 | record写入DataHub的Sequence | 
| Attributes | Map | 用户的属性字段 | 
| Data | 用户的数据字段 | 
示例
- 请求示例 
  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "sub",
          "Cursor": "30005af19b3800000000000000000000",
          "Limit": 1
  }- 响应示例 
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
      "NextCursor": "30005af19b3800000000000000090001",
      "Records": [
                      {
                              "Cursor": "30005af19b3800000000000000000000",
                              "SystemTime": 1525783352873,
                              "Sequence": 1,
                              "Attributes": {
                                      "key1": "value1",
                                      "key2": "value2"
                              },
                              "Data": ["AAA", "100"]
                      }
      ]
  }新增Field
请求
- 请求语法 
  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:appendfield | 
| FieldName | String | Field名称 | 
| FieldType | String | Field类型,包括STRING, BIGINT等 | 
响应
- 响应语法 
  HTTP/1.1 200 OK示例
- 请求示例 
  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "appendfield",
          "FieldName": "field1",
          "FieldType": "BIGINT"
  }创建Connector
请求
- 请求语法 
  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Type | String | Connector类型,如SINK_ODPS等 | 
| ColumnFields | Array | 需同步的Field列表 | 
| Config | Map | Connector相关配置 | 
响应
- 响应语法 
  HTTP/1.1 201 Created示例
- 请求示例,以SINK_ODPS为例 
  POST /projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Type": "SINK_ODPS",
          "ColumnFields": ["field1", "field2"],
          "Config": {
                  "Project": "odpsProject",
                  "Topic": "odpsTopic",
                  "OdpsEndpoint": "xxx",
                  "TunnelEndpoint": "xxx",
                  "AccessId": "xxx",
                  "AccessKey": "xxx",
                  "PartitionMode": "SYSTEM_TIME",
                  "TimeRange": 60,
                  "PartitionConfig": {
                          "pt": "%Y%m%d",
                          "ct": "%H%M"
                  }
          }
  }查询Connector
请求
- 请求语法 
  GET /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1响应
- 响应语法 
  HTTP/1.1 200 OK查询Connector列表
请求
- 请求语法 
  GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1响应
- 响应语法 
  HTTP/1.1 200 OK示例
- 请求示例 
  GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString- 响应示例 
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
          "Connectors": [
                  "sink_odps", "sink_ads"
          ]
  }删除Connector
请求
- 请求语法 
  DELETE /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1响应
- 响应语法 
  HTTP/1.1 200 OKReload Connector
请求
- 请求语法 
  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:reload | 
| ShardId | String | 不设置则Reload整个connector | 
响应
- 响应语法 
  HTTP/1.1 200 OK示例
- 请求示例 
  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "reload"
  }获取Connector Shard状态信息
请求
- 请求语法 
  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:status | 
| ShardId | String | 获取此ShardId的状态信息 | 
响应
- 响应语法 
  HTTP/1.1 200 OK- 响应元素 
| 名称 | 类型 | 描述 | 
| CurrentSequence | Int64 | 当前处理点位信息 | 
| State | Enum | 当前Shard的运行状态 | 
| LastErrorMessage | String | 当State不为CONTEXT_EXECUTING时,返回错误信息 | 
| DiscardCount | Int64 | 从connector运行到现在丢弃的数据总条数 | 
示例
- 请求示例 
  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "status",
          "ShardId": "0"
  }- 响应示例 
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  {
          "State": "CONTEXT_EXECUTING",
          "CurrentSequence": 10,
          "DiscardCount": 0,
          "LastErrorMessage": ""
  }Append Connector Field
请求
- 请求语法 
POST /projects//topics//connectors/ HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:appendfield | 
| FieldName | String | Field名称 | 
响应
- 响应语法 
  HTTP/1.1 200 OK- 请求示例 
  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "appendfiled",
          "FieldName": "field1"
  }创建订阅
请求
- 请求语法 
  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:create | 
| Comment | String | 描述信息 | 
响应
- 响应语法 
  HTTP/1.1 201 Created- 响应元素 
| 名称 | 类型 | 描述 | 
| SubId | String | 订阅Id | 
示例
- 请求示例 
  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "create",
          "Comment": "xxxx"
  }- 响应示例 
  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
         Content-Type: applicaton/json
  Conent-Length: xxx
        {
          "SubId": "1542078393028fzsZx"
  }查询订阅
请求
- 请求语法 
  GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1响应
- 响应语法 
  HTTP/1.1 200 OK- 响应元素 
| 名称 | 类型 | 描述 | 
| SubId | String | 订阅Id | 
| State | Int | 0: online, 1: offline | 
| Comment | String | 描述信息 | 
示例
- 请求示例 
  GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT- 响应示例 
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "SubId": "xxxx",
          "Comment": "xxxx"
          "State": 1
  }查询订阅列表
请求
- 请求语法 
  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:list | 
| PageIndex | Int | 分页Index | 
| PageSize | Int | 分页Size | 
响应
- 响应语法 
  HTTP/1.1 200 OK- 响应元素 
| 名称 | 类型 | 描述 | 
| TotalCount | Int | Page总数 | 
- 请求示例 
  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "list",
          "PageIndex": 1,
          "PageSize": 10
  }- 响应示例 
  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Subscriptions": [
                  {
                           "Comment": "xxxx",
                            "State": 1,
                            "SubId": "1542079169844gH8HM"
                  },
                  {
                          "Comment": "xxxx",
                          "State": 1,
                          "SubId": "1542078393028fzsZx"
                  }
          ],
          "TotalCount": 2
  }删除订阅
请求
- 请求语法 
  DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1示例
- 请求示例 
  DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString更新订阅状态
请求
- 请求语法 
  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1响应
- 响应语法 
  HTTP/1.1 200 OK示例
- 请求示例 
  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "State": 0
  }open点位session
请求
- 请求语法 
  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:open | 
| ShardIds | Array | Shard列表 | 
响应
- 响应语法 
  HTTP/1.1 200 OK- 响应元素 
| 名称 | 类型 | 描述 | 
| Timestamp | Int64 | 点位时间戳,单位ms | 
| Sequence | Int64 | 点位Sequence | 
| Version | Int64 | Session VersionId | 
| SessionId | String | SessionId | 
示例
- 请求示例 
  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "open",
          "ShardIds": ["0"]
  }- 响应示例 
  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Offsets": {
                  "0": {
                          "Timestamp": 1000,
                          "Sequence": 1,
                          "Version": 1,
                          "SessionId": "xxx"
                  }
          }
  }查询点位
请求
- 请求语法 
  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:get | 
| ShardIds | Array | Shard列表 | 
响应
- 响应语法 
  HTTP/1.1 200 OK- 响应元素 
| 名称 | 类型 | 描述 | 
| Timestamp | Int64 | 点位时间戳,单位ms | 
| Sequence | Int64 | 点位Sequence | 
| Version | Int64 | Session VersionId | 
| SessionId | String | SessionId | 
示例
- 请求示例 
  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "get",
          "ShardIds": ["0"]
  }- 响应示例 
  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Offsets": {
                  "0": {
                          "Timestamp": 1000,
                          "Sequence": 1,
                          "Version": 1,
                          "SessionId": "xxx"
                  }
          }
  }提交点位
请求
- 请求语法 
  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1- 请求元素 
| 名称 | 类型 | 描述 | 
| Action | String | Action为:commit | 
| Timestamp | Int64 | 点位时间戳,单位ms | 
| Sequence | Int64 | 点位Sequence | 
| Version | Int64 | Session VersionId | 
| SessionId | Int64 | SessionId | 
响应
- 响应语法 
  HTTP/1.1 200 OK示例
- 请求示例 
  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "commit",
          "Offsets": {
                  "0": {
                          "Timestamp": 1000,
                          "Sequence": 1,
                          "Version": 1,
                          "SessionId": 1
                  }
          }
  }