API参考
一. 接口规范
1.公共请求Header
名字 | 描述 |
x-datahub-client-version | API版本信息 |
x-datahub-security-token | Security Token,如果存在 |
Date | 标准GMT时间,格式:EEE, dd MMM yyyy HH:mm:ss z |
Authorization | 签名信息 |
Content-Type | 传输数据序列化协议 |
2. 公共返回Header
名字 | 描述 |
Content-Type | 传输数据序列化协议 |
Content-Length | 传输数据长度 |
x-datahub-request-id | 全局唯一请求ID |
3. 错误码
名字 | 描述 | 备注 |
InvalidParameter | 参数错误 | |
InvalidCursor | Cursor无效 | |
NoSuchXXX | 资源不存在 | |
XXXAlreadyExist | 资源已存在 | |
Unauthorized | 认证失败 | AccessKey信息错误,用户时间戳不对等 |
NoPermission | 鉴权失败 | RAM权限错误 |
OperationDenied | 禁止操作 | 操作禁止,比如删除有topic存在的project |
LimitExceeded | 流控受限 | 服务端QPS、流量等限制 |
InvalidShardOperation | Shard分裂或合并,变成Sealed | |
MalformedRecord | 数据格式不正确 | |
OffsetReseted | 点位重置 | |
OffsetSessionChanged | SubId被其他用户open占用 | |
SubscriptionOffline | 订阅下线 | |
InternalServerError | 系统内部错误 | |
其他 | 其他非可重试异常,后续可能会被回收 |
4. 统一错误返回格式
名字 | 描述 |
ErrorCode | 错误码 |
ErrorMessage | 详细错误描述信息 |
错误响应示例:
HTTP/1.1 403
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: application/json
Content-Length: xxx
{
"ErrorCode": "Unauthorized",
"ErrorMessage": "Authroize failed"
}
5. 限制描述
名字 | 描述 |
ProjectName | 长度:[3, 32],仅包含字母、数字和'_', 以字母开头,不区分大小写 |
TopicName | 长度:[3, 128],仅包含字母、数字和'_', 以字母开头,不区分大小写 |
二、Authorization字段计算的方法
Authorization = "DATAHUB " + AccessId + ":" + Signature
Signature = base64(hmac-sha1(AccessKey,
HTTPMethod + "\n"
+ Content-Type + "\n"
+ Date + "\n"
+ CanonicalizedDataHubHeaders + "\n"
+ CanonicalizedResource))
AccessKey
表示签名所需的密钥。HTTPMethod
表示HTTP 请求的Method,主要有PUT、GET、POST、HEAD、DELETE等。\n
表示换行符。Content-Type
表示请求内容的类型,一般固定为“application/json”。Date
表示此次操作的时间,且必须为GMT格式,如“Sun, 22 Nov 2015 08:16:38 GMT”。CanonicalizedDataHubHeaders
表示以x-datahub-
为前缀的HTTP Header的字典序排列。CanonicalizedResource
表示用户想要访问的DataHub资源的Url,若包含param,必须按字典序。
CanonicalizedDataHubHeaders构造方法
所有以 x-datahub-
为前缀的HTTP Header被称为 CanonicalizedDataHubHeaders
。构造方法如下:
将所有以
x-datahub-
为前缀的HTTP请求头的名字转换成小写 。如X-DATAHUB-Client-Version:1.1
需要转换成x-datahub-client-version:1.1
。如果请求是以STS获得的AccessKeyId和AccessKeySecret发送时,还需要将获得的security-token值以
x-datahub-security-token:token
的形式加入到签名字符串中。将上一步得到的所有HTTP请求头按照名字的字典序进行升序排列。
删除请求头和内容之间分隔符两端出现的任何空格。如
x-datahub-client-versionn : 1.1
转换成:x-datahub-client-version:1.1
。将每一个头和内容用
\n
分隔符分隔拼成最后的CanonicalizedDataHubHeaders
。
CanonicalizedResource构造方法
用户发送请求中想访问的DataHub目标资源被称为CanonicalizedResource。构造方法如下:
将
CanonicalizedResource
置成空字符串 “”;放入要访问的DataHub资源,比如某个topic:
/projects/<ProjectName>/topics/<TopicName>
如果请求的资源包含额外的URL参数,按照字典序,从小到大排列并以 & 为分隔符生成参数字符串。在CanonicalizedResource字符串尾添加 ?和参数字符串。此时的CanonicalizedResource如:
/projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps?donetime
计算签名头规则
签名的字符串必须为 UTF-8 格式。含有中文字符的签名字符串必须先进行 UTF-8 编码,再与
AccessKey
计算最终签名。签名的方法用RFC 2104中定义的HMAC-SHA1方法,其中Key为
AccessKey
。以
x-datahub-
开头的header在签名验证前需要符合以下规范:header的名字需要变成小写。
header按字典序自小到大排序。
分割header name和value的冒号前后不能有空格。
每个Header之后都有一个换行符“\n”,如果没有Header,CanonicalizedDataHubHeaders就设置为空。
签名示例:
请求 | 签名字符串计算公式 | 签名字符串 |
POST /projects/test_project/topics/test_topic HTTP/1.1 Host: https://dh-cn-hangzhou.aliyuncs.com User-Agent: customer x-datahub-client-version: 1.1 Content-Type: application/json Date: Thu, 10 Jan 2019 07:28:29 GMT | Signature = base64(hmac-sha1(AccessKey,HTTPMethod + “\n” + Content-Type + “\n” + Date + “\n” + CanonicalizedDataHubHeaders+ CanonicalizedResource)) | POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic |
python计算签名方法如下:
import base64
import hmac
import sha
h = hmac.new("****your accessKey*****",
"POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic", sha)
Signature = base64.b64encode(h.digest())
print("Signature: %s" % Signature)
请求头部示例:
Authorization值格式:DATAHUB AccessId:Signature
POST /projects/test_project/topics/test_topic HTTP/1.1
Authorization: DATAHUB AccessId:Signature
Content-Type: application/json
Date: Thu, 10 Jan 2019 07:28:29 GMT
Host: http://dh-cn-hangzhou.aliyuncs.com
User-Agent: customer
x-datahub-client-version: 1.1
Java 8 签名参考样例
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
public abstract class Authorization {
private static final String DEFAULT_ENCODING = "UTF-8";
private static final String DEFAULT_HASH = "HmacSHA1";
private static final String X_DATAHUB_PREFIX = "x-datahub";
private static final String HEADER_CONTENT_TYPE = "Content-Type";
private static final String HEADER_DATE = "Date";
static public String getAkAuthorization(Request request) {
String canonicalURL = request.getUrlPath();
String canonicalQueryString = request.getQueryStrings();
String canonicalHeaderString = getCanonicalHeaders(
getSortedHeadersToSign(request.getHeaders()));
String canonicalRequest = request.getMethod().toUpperCase() + "\n" +
canonicalHeaderString + "\n" + canonicalURL;
if (canonicalQueryString != null && !canonicalQueryString.isEmpty()) {
canonicalRequest += "?" + canonicalQueryString;
}
String signature = HMAC1Sign(request.getAccessKey(), canonicalRequest);
return "DATAHUB " + request.getAccessId() + ":" + signature;
}
static private String HMAC1Sign(String accessKey, String canonicalRequest) {
try {
SecretKeySpec signingKey = new SecretKeySpec(accessKey.getBytes(), DEFAULT_HASH);
Mac mac = Mac.getInstance(DEFAULT_HASH);
mac.init(signingKey);
return Base64.getEncoder().encodeToString(
mac.doFinal(canonicalRequest.getBytes(DEFAULT_ENCODING))
).trim();
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
static private String getCanonicalHeaders(Map<String, String> headers) {
StringBuilder sb = new StringBuilder();
Iterator<Map.Entry<String, String>> pairs = headers.entrySet().iterator();
while (pairs.hasNext()) {
Map.Entry<String, String> pair = pairs.next();
if (pair.getKey().startsWith(X_DATAHUB_PREFIX)) {
sb.append(pair.getKey());
sb.append(":");
sb.append(pair.getValue());
} else {
sb.append(pair.getValue());
}
if (pairs.hasNext()) {
sb.append("\n");
}
}
return sb.toString();
}
static private SortedMap<String, String> getSortedHeadersToSign(Map<String, String> headers) {
SortedMap<String, String> sortedHeaders = new TreeMap<>();
for (Map.Entry<String, String> entry : headers.entrySet()) {
String lowerKey = entry.getKey().toLowerCase();
if (lowerKey.equalsIgnoreCase(HEADER_CONTENT_TYPE) ||
lowerKey.equalsIgnoreCase(HEADER_DATE) ||
lowerKey.startsWith(X_DATAHUB_PREFIX)) {
if (!entry.getValue().isEmpty()) {
sortedHeaders.put(lowerKey, entry.getValue());
}
}
}
if (!sortedHeaders.containsKey(HEADER_CONTENT_TYPE.toLowerCase())) {
sortedHeaders.put(HEADER_CONTENT_TYPE.toLowerCase(), "");
}
return sortedHeaders;
}
public static class Request {
private String accessId;
private String accessKey;
private String urlPath;
private String method;
private Map<String, String> headers;
private String queryStrings;
public String getAccessId() {
return accessId;
}
public Request setAccessId(String accessId) {
this.accessId = accessId;
return this;
}
public String getAccessKey() {
return accessKey;
}
public Request setAccessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}
public String getUrlPath() {
return urlPath;
}
public Request setUrlPath(String urlPath) {
this.urlPath = urlPath;
return this;
}
public String getMethod() {
return method;
}
public Request setMethod(String method) {
this.method = method;
return this;
}
public Map<String, String> getHeaders() {
return headers;
}
public Request setHeaders(Map<String, String> headers) {
this.headers = headers;
return this;
}
public String getQueryStrings() {
return queryStrings;
}
public Request setQueryStrings(String queryStrings) {
this.queryStrings = queryStrings;
return this;
}
}
public static void main(String[] args) {
Map<String, String> headers = new HashMap<>();
headers.put("Date", "Thu, 10 Jan 2019 07:28:29 GMT");
headers.put("x-datahub-client-version", "1.1");
headers.put("Content-type", "application/json");
String accessId = "testKeyID";
String accessKey = "testKeySecret";
String method = "POST";
String path = "/projects/test_project/topics/test_topic";
String canonicalQueryString = ""; //字典序 a=x&b=y
Authorization.Request authRequest = new Authorization.Request()
.setAccessId(accessId)
.setAccessKey(accessKey)
.setMethod(method.toUpperCase())
.setUrlPath(path)
.setHeaders(headers)
.setQueryStrings(canonicalQueryString);
System.out.println(Authorization.getAkAuthorization(authRequest));
}
}
三、接口定义
创建Project
请求
请求语法
POST /projects/<ProjectName> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Comment | String | 描述信息,限制1024字节 |
响应
响应语法
HTTP/1.1 201 Created
示例
请求示例
POST /projects/<ProjectName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Comment": "test project"
}
响应示例
HTTP/1.1 201 Created
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Length: 0
查询Project
请求
请求语法
GET /projects/<ProjectName> HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
CreateTime | long | 创建时间,单位:秒 |
LastModifyTime | long | 更新时间,单位:秒 |
Comment | String | 描述信息 |
示例
请求示例
GET /projects/<ProjectName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Content-Length: xxx
{
"Comment": "test project",
"CreateTime": 1525763481,
"LastModifyTime": 1525763481
}
查询Project列表
请求
请求语法
GET /projects HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
ProjectNames | List | Project名称列表 |
示例
请求示例
GET /projects HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"ProjectNames": [
"project1",
"projcet2"
]
}
更新Project
请求
请求语法
PUT /projects/<ProjectName> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Comment | String | 描述信息 |
响应
响应语法
HTTP/1.1 200 OK
示例
请求示例
PUT /projects/<ProjectName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Comment": "update comment"
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Length: 0
删除Project
请求
请求语法
DELETE /projects/<ProjectName> HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
示例
请求示例
DELETE /projects/<ProjectName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Length: 0
创建Topic
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | 操作类型 |
ShardCount | int | 初始shard数目 |
Lifecycle | int | 数据存储生命周期 |
RecordType | String | BLOB(非结构化数据)/TUPLE(结构化数据) |
RecordSchema | String | 创建TUPLE类型topic时需指定schema, BLOB类型时,不传该参数 |
Comment | String | 描述信息 |
ExpandMode | String | 开启扩展模式传值extend,其他情况不需要该参数。 |
响应
响应语法
HTTP/1.1 201 Created
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Action": "create",
"ShardCount": 1,
"Lifecycle": 1,
"RecordType": "TUPLE",
"RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}}",
"Comment": "create topic",
"ExpandMode": "extend"
}
响应示例
HTTP/1.1 201 Created
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Content-Length: 0
查询Topic
请求
请求语法
GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
ShardCount | int | 初始shard数目 |
Lifecycle | int | 数据存储生命周期 |
RecordType | String | BLOB(非结构化数据)/TUPLE(结构化数据) |
RecordSchema | String | 创建TUPLE类型topic时需指定schema, BLOB类型时,不传该参数 |
Comment | String | 描述信息 |
CreateTime | long | 创建时间 |
LastModifyTime | long | 更新时间 |
示例
请求示例
GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
响应示例
HTTP/1.1 201 Created
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Content-Length: xxx
{
"ShardCount": 1,
"Lifecycle": 1,
"RecordType": "TUPLE",
"RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}",
"Comment": "create topic",
"CreateTime": 1525763481,
"LastModifyTime": 1525763481
}
查询Topic列表
请求
请求语法
GET /projects/<ProjectName>/topics HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
TopicNames | List | Project名称列表 |
示例
请求示例
GET /projects/<ProjectNames>/topics HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"TopicNames": [
"topic1",
"topic2"
]
}
更新Topic
请求
请求语法
PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Comment | String | 描述信息 |
响应
响应语法
HTTP/1.1 200 OK
示例
请求示例
PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Comment": "update comment"
}
删除Topic
请求
请求语法
DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
示例
请求示例
DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Length: 0
获取Shard列表
请求
请求语法
GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
ShardId | String | Shard Id |
State | String | Shard当前状态,包括OPENING,ACTIVE,CLOSED等状态 |
BeginHashKey | String | 起始HashKey |
EndHashKey | String | 终止HashKey |
ParentShardIds | List | Shard分裂或合并之前的父Shard信息 |
示例
请求示例
GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Shards": [
{
"ShardId": "0",
"State": "ACTIVE",
"BeginHashKey":"00000000000000000000000000000000",
"EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
"ParentShardIds:[]
}
]
}
分裂Shard
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:split |
ShardId | String | 需要分裂的Shard Id |
SplitKey | String | 按照此Key进行Split,SplitKey一般等于 BeginHashKey + (EndHashKey - BeginHashKey) / 2 |
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
NewShards | List | 分裂后的Shard列表 |
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Action": "split",
"ShardId": "0",
"SplitKye": "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"NewShards": [
{
"ShardId": "1",
"BeginHashKey":"00000000000000000000000000000000",
"EndHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
},
{
"ShardId":"0",
"BeginHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
"EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
}
]
}
合并Shard
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:merge |
ShardId | String | 需要合并的Shard Id |
AdjacentShardId | String | 临近的并且满足合并条件的Shard Id |
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
ShardId | String | 合并后的Shard Id |
BeginHashKey | String | 起始HashKey |
EndHashKey | String | 终止HashKey |
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Action": "merge",
"ShardId": "0",
"AdjacentShardId": "1"
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"ShardId":"2",
"BeginHashKey":"00000000000000000000000000000000",
"EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
}
查询数据Cursor
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:cursor |
Type | String | 按照何种类型获取Cursor,包括:OLDEST, LATEST, SYSTEM_TIME, SEQUENCE |
SystemTime | Int64 | Type为SYSTEM_TIME时填写,单位ms |
Sequence | Int64 | Type为SEQUENCE时填写 |
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
Cursor | String | 返回的Cursor信息 |
RecordTime | Int64 | 数据写入DataHub时间, 单位ms |
Sequence | Int64 | 数据写入的Sequence,单Shard内唯一 |
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Action": "cursor",
"Type": "SEQUENCE",
"Sequence": 1
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Cursor": "30005af19b3800000000000000000000",
"RecordTime": 1525783352873,
"Sequence": 1
}
写入数据 - 不按shard写入
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:pub |
ShardId | String | ShardId |
Attributes | Map<String, String> | 用户属性字段 |
Data | 如果为BLOB类型,Data为数据Base64编码后数据;如果为TUPLE数据,为String类型数组 |
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
FailedRecordCount | Int | 失败条数 |
FailedRecords | Array | 失败详情 |
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "pub",
"Records": [
{
"ShardId": "0",
"Attributes": {
"attr1": "value1",
"attr2": "value2"
},
"Data": ["A","B","3","4"]
}
]
}
// BLOB
{
"Action": "pub",
"Record": [
{
"ShardId": "0",
"Attributes": {
"attr1": "value1",
"attr2": "value2"
},
"Data": "Base64String"
}
]
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"FailedRecordCount": 1,
"FailedRecords": [
{
"Index": 0,
"ErrorCode": "errorCode",
"ErrorMessage": "errormsg"
}
]
}
读取数据
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:sub |
Cursor | String | 从Cursor位置开始读取 |
Limit | Int | 读取的条数 |
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
NextCursor | String | 下一条数据的Cursor信息 |
SystemTime | Int64 | record写入DataHub的时间,单位ms |
Cursor | String | record对应的Cursor信息 |
Sequence | Int64 | record写入DataHub的Sequence |
Attributes | Map | 用户的属性字段 |
Data | 用户的数据字段 |
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "sub",
"Cursor": "30005af19b3800000000000000000000",
"Limit": 1
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"NextCursor": "30005af19b3800000000000000090001",
"Records": [
{
"Cursor": "30005af19b3800000000000000000000",
"SystemTime": 1525783352873,
"Sequence": 1,
"Attributes": {
"key1": "value1",
"key2": "value2"
},
"Data": ["AAA", "100"]
}
]
}
新增Field
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:appendfield |
FieldName | String | Field名称 |
FieldType | String | Field类型,包括STRING, BIGINT等 |
响应
响应语法
HTTP/1.1 200 OK
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "appendfield",
"FieldName": "field1",
"FieldType": "BIGINT"
}
创建Connector
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Type | String | Connector类型,如SINK_ODPS等 |
ColumnFields | Array | 需同步的Field列表 |
Config | Map | Connector相关配置 |
响应
响应语法
HTTP/1.1 201 Created
示例
请求示例,以SINK_ODPS为例
POST /projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Type": "SINK_ODPS",
"ColumnFields": ["field1", "field2"],
"Config": {
"Project": "odpsProject",
"Topic": "odpsTopic",
"OdpsEndpoint": "xxx",
"TunnelEndpoint": "xxx",
"AccessId": "xxx",
"AccessKey": "xxx",
"PartitionMode": "SYSTEM_TIME",
"TimeRange": 60,
"PartitionConfig": {
"pt": "%Y%m%d",
"ct": "%H%M"
}
}
}
查询Connector
请求
请求语法
GET /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
查询Connector列表
请求
请求语法
GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
示例
请求示例
GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Connectors": [
"sink_odps", "sink_ads"
]
}
删除Connector
请求
请求语法
DELETE /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
Reload Connector
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:reload |
ShardId | String | 不设置则Reload整个connector |
响应
响应语法
HTTP/1.1 200 OK
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "reload"
}
获取Connector Shard状态信息
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:status |
ShardId | String | 获取此ShardId的状态信息 |
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
CurrentSequence | Int64 | 当前处理点位信息 |
State | Enum | 当前Shard的运行状态 |
LastErrorMessage | String | 当State不为CONTEXT_EXECUTING时,返回错误信息 |
DiscardCount | Int64 | 从connector运行到现在丢弃的数据总条数 |
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "status",
"ShardId": "0"
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"State": "CONTEXT_EXECUTING",
"CurrentSequence": 10,
"DiscardCount": 0,
"LastErrorMessage": ""
}
Append Connector Field
请求
请求语法
POST /projects//topics//connectors/ HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:appendfield |
FieldName | String | Field名称 |
响应
响应语法
HTTP/1.1 200 OK
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "appendfiled",
"FieldName": "field1"
}
创建订阅
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:create |
Comment | String | 描述信息 |
响应
响应语法
HTTP/1.1 201 Created
响应元素
名称 | 类型 | 描述 |
SubId | String | 订阅Id |
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "create",
"Comment": "xxxx"
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"SubId": "1542078393028fzsZx"
}
查询订阅
请求
请求语法
GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
SubId | String | 订阅Id |
State | Int | 0: online, 1: offline |
Comment | String | 描述信息 |
示例
请求示例
GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"SubId": "xxxx",
"Comment": "xxxx"
"State": 1
}
查询订阅列表
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:list |
PageIndex | Int | 分页Index |
PageSize | Int | 分页Size |
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
TotalCount | Int | Page总数 |
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "list",
"PageIndex": 1,
"PageSize": 10
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Subscriptions": [
{
"Comment": "xxxx",
"State": 1,
"SubId": "1542079169844gH8HM"
},
{
"Comment": "xxxx",
"State": 1,
"SubId": "1542078393028fzsZx"
}
],
"TotalCount": 2
}
删除订阅
请求
请求语法
DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
示例
请求示例
DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
更新订阅状态
请求
请求语法
PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
响应
响应语法
HTTP/1.1 200 OK
示例
请求示例
PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"State": 0
}
open点位session
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:open |
ShardIds | Array | Shard列表 |
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
Timestamp | Int64 | 点位时间戳,单位ms |
Sequence | Int64 | 点位Sequence |
Version | Int64 | Session VersionId |
SessionId | String | SessionId |
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "open",
"ShardIds": ["0"]
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Offsets": {
"0": {
"Timestamp": 1000,
"Sequence": 1,
"Version": 1,
"SessionId": "xxx"
}
}
}
查询点位
请求
请求语法
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:get |
ShardIds | Array | Shard列表 |
响应
响应语法
HTTP/1.1 200 OK
响应元素
名称 | 类型 | 描述 |
Timestamp | Int64 | 点位时间戳,单位ms |
Sequence | Int64 | 点位Sequence |
Version | Int64 | Session VersionId |
SessionId | String | SessionId |
示例
请求示例
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "get",
"ShardIds": ["0"]
}
响应示例
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Offsets": {
"0": {
"Timestamp": 1000,
"Sequence": 1,
"Version": 1,
"SessionId": "xxx"
}
}
}
提交点位
请求
请求语法
PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
请求元素
名称 | 类型 | 描述 |
Action | String | Action为:commit |
Timestamp | Int64 | 点位时间戳,单位ms |
Sequence | Int64 | 点位Sequence |
Version | Int64 | Session VersionId |
SessionId | Int64 | SessionId |
响应
响应语法
HTTP/1.1 200 OK
示例
请求示例
PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "commit",
"Offsets": {
"0": {
"Timestamp": 1000,
"Sequence": 1,
"Version": 1,
"SessionId": 1
}
}
}