安装SDK
在Maven项目中添加依赖:
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.25.1</version>
</dependency>
JDK:使用1.8及以上版本。
如果API开启了密钥认证,您还需要在工程中配置相应的Access Key和Secret Key,请参考下文身份验证
身份验证
背景信息
AccessKey(简称AK)是阿里云提供给阿里云用户的访问密钥,用于访问阿里云OpenAPI时的身份验证。AccessKey包括AccessKey ID和AccessKey Secret,需妥善保管。AK如果泄露,会威胁该账号下所有资源的安全。访问阿里云OpenAPI时,如果在代码中硬编码明文AK,容易因代码仓库权限管理不当造成AK泄露。
Alibaba Cloud Credentials是阿里云为阿里云开发者用户提供的身份凭证管理工具。配置了Credentials默认凭据链后,访问阿里云OpenAPI时,您无需在代码中硬编码明文AK,可有效保证您账号下云资源的安全。
前提条件
已获取RAM用户账号的AccessKey ID和AccessKey Secret。相关操作,请参见查看RAM用户的AccessKey信息。
- 重要
阿里云账号(即主账号)的AccessKey泄露会威胁该账号下所有资源的安全。为保证账号安全,强烈建议您为RAM用户创建AccessKey,非必要情况下请勿为阿里云主账号创建AccessKey。
RAM用户的AccessKey Secret只能在创建AccessKey时显示,创建完成后不支持查看。请在创建好AccessKey后,及时并妥善保存AccessKey Secret。
已安装阿里云SDK Credentials工具。
Maven安装方式(推荐使用Credentials最新版本):
<dependency> <groupId>com.aliyun</groupId> <artifactId>credentials-java</artifactId> <version>0.2.11</version> </dependency>
JDK版本为1.7及以上。
配置方案
本文示例的是通过配置环境变量方式,更多方式请访问配置环境变量
使用配置文件的方案时,请确保您系统中不存在环境变量ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
。否则,配置文件将不生效。
阿里云SDK支持通过定义ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
环境变量来创建默认的访问凭证。调用接口时,程序直接访问凭证,读取您的访问密钥(即AccessKey)并自动完成鉴权。
配置方法
配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
。
Linux和macOS系统配置方法
执行以下命令:
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
<access_key_id>
需替换为已准备好的AccessKey ID,<access_key_secret>
替换为AccessKey Secret。Windows系统配置方法
新建环境变量文件,添加环境变量
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
,并写入已准备好的AccessKey ID和AccessKey Secret。重启Windows系统。
代码示例
Client credentialClient = new Client();
String accessKeyId = credentialClient.getAccessKeyId();
String accessKeySecret = credentialClient.getAccessKeySecret();
注意事项
对于SDK版本升级由2.9版本升级上来的用户:必须要注意setTimestampInms接口替换后,显示将值乘以1000。
接口调用正常情况下,SDK只有
putRecords / putRecordsByShard
和getRecords
接口是需要进行频繁调用进行数据读写的,其他的接口比如getTopic
、getCursor
、listShard
等接口一般只有初始化时需要调用。Client初始化在项目工程中,可以有一个或者多个DatahubClient实例,DatahubClient实例可以并发使用。
不同包中的同名类遇到相同类名不同包路径的情况,2.12版本使用的均为com.aliyun.datahub.client包中的类,其他包中的类是为了兼容版本低于2.12的使用方式。例如:
// 2.12版本 com.aliyun.datahub.client.model.RecordSchema // 使用2.12之前版本SDK编写的代码,如果在升级SDK后无需修改代码,则继续使用此类型 com.aliyun.datahub.common.data.RecordSchema
出现错误
Parse body failed, Offset: 0
,尝试将enableBinary参数设置为false
。
SDK实践指南
初始化
用户可以使用阿里云认证账号访问DataHub,并需要提供云账号AccessId和AccessKey,同时需要提供访问DataHub的服务地址。以下代码用于使用域名列表新建DataHubClient:
sdk 2.25.1及以上版本(推荐)
//使用新的Batch传输协议创建DataHubClient实例
DatahubConfig.Protocol protocol = DatahubConfig.Protocol.BATCH;
DatahubClient datahubClient = DatahubClientBuilder.newBuilder().setDatahubConfig(
//Protocol可不设置,不设置默认使用PROTOBUF传输协议
new DatahubConfig(endpoint, new AliyunAccount(accessId, accessKey), protocol)
).setHttpConfig(new HttpConfig().setCompressType(CompressType.ZSTD)).build();
配置描述:
DatahubConfig
名称 | 描述 |
endpoint | DataHub服务地址。 |
account | 阿里云账号信息。 |
enableBinary | 是否采用二进制传输,服务端从2.12版本开始支持,之前版本需设置为false,专有云使用时出现错误Parse body failed, Offset: 0,尝试设置为false。 |
HttpConfig
名称 | 描述 |
readTimeout | Socket读写超时时间,默认10s。 |
connTimeout | TCP连接超时时间,默认10s。 |
maxRetryCount | 请求失败重试,默认1,不建议修改,重试由上层业务层处理。 |
debugRequest | 是否打印请求日志信息,默认false。 |
compressType | 数据传输压缩方式,默认lz4压缩,支持lz4, deflate,ztsd压缩。 |
proxyUri | 代理服务器主机地址。 |
proxyUsername | 代理服务器验证的用户名。 |
proxyPassword | 代理服务器验证的密码。 |
SDK统计信息
SDK支持针对put/get等请求进行QPS等统计,开启方式:
ClientMetrics.startMetrics();
metric统计信息默认打印到日志文件中,需要配置slf4j,其中metric package为:com.aliyun.datahub.client.metrics
。
写入数据到DataHub
以Tuple类型Topic为例。
public void writeTupleTopic(int maxRetry) {
String shardId = "9";
// 生成十条数据
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// 对每条数据设置额外属性
recordEntry.addAttribute("key1", "value11");
TupleRecordData data = new TupleRecordData(this.recordSchema);
data.setField("field1", "Hello World");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
}
int retryNum = 0;
while (retryNum < maxRetry) {
try {
// 服务端从2.12版本开始支持,之前版本请使用putRecords接口
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
PutRecordsResult putRecordsResult = this.datahubClient.putRecords(Constant.projectName,
Constant.topicName, recordEntries);
System.out.println("write tuple data successful");
System.out.println(putRecordsResult.getPutErrorEntries());
break;
} catch (InvalidParameterException e) {
// invalid parameter
e.printStackTrace();
throw e;
} catch (AuthorizationFailureException e) {
// AK error
e.printStackTrace();
throw e;
} catch (ResourceNotFoundException e) {
// project or topic not found
e.printStackTrace();
throw e;
} catch (ShardSealedException e) {
// shard status is CLOSED, read only
e.printStackTrace();
throw e;
} catch (LimitExceededException e) {
// limit exceed, retry
e.printStackTrace();
retryNum++;
} catch (DatahubClientException e) {
// other error
e.printStackTrace();
retryNum++;
}
}
}
创建订阅消费DataHub数据
//点位消费示例,并在消费过程中进行点位的提交
public static void example() {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
String cursor = null;
//sequence < 0说明未消费
if (subscriptionOffset.getSequence() < 0) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// 获取下一条记录的Cursor
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
//按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
long recordCount = 0L;
// 每次读取10条record
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// 无数据,sleep后读取
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
//消费数据
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// 处理数据完成后,设置点位
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
//提交点位
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
// 退出. Offline: 订阅下线; SubscriptionSessionInvalid: 表示订阅被其他客户端同时消费
break;
} catch (SubscriptionOffsetResetException e) {
// 表示点位被重置,重新获取SubscriptionOffset信息,这里以Sequence重置为例
// 如果以Timestamp重置,需要通过CursorType.SYSTEM_TIME获取cursor
subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: 针对不同异常决定是否退出
} catch (Exception e) {
break;
}
}
}
异常类型
Java SDK(>= 2.12)对datahub的异常类型进行了整理,用户try catch机制对异常类型进行捕获并进行相应处理。
其中异常类型中,除DatahubClientException和LimitExceededException之外,其余均属于不可重试错误,而DatahubClientException中包含部分可重试错误,例如server busy,server unavailable等,因此建议遇到DatahubClientException和LimitExceededException时,可以在代码逻辑中添加重试逻辑,但应严格限制重试次数。
以下为使用2.12以上版本的各类异常,包路径为com.aliyun.datahub.client.exception
。
类名 | 错误码 | 描述 |
InvalidParameterException | InvalidParameter, InvalidCursor | 非法参数。 |
ResourceNotFoundException | ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo | 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
ResourceAlreadyExistException | ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist | 资源已存在(创建时如果资源已存在,就会抛出这个异常。 |
SeekOutOfRangeException | SeekOutOfRange | getCursor时,给的sequence不在有效范围内(通常数据已过期),或给的timestamp大于当前时间。 |
AuthorizationFailureException | Unauthorized | Authorization 签名解析异常,检查AK是否填写正确。 |
NoPermissionException | NoPermission, OperationDenied | 没有权限,通常是RAM配置不正确,或没有正确授权子账号。 |
ShardSealedException | InvalidShardOperation | shard 处于CLOSED状态可读不可写,继续往CLOSED的shard 写数据,或读到最后一条数据后继续读取,会抛出该异常。 |
LimitExceededException | LimitExceeded | 接口使用超限,参考限制描述。 |
SubscriptionOfflineException | SubscriptionOffline | 订阅处于下线状态不可用。 |
SubscriptionSessionInvalidException | OffsetSessionChanged, OffsetSessionClosed | 订阅会话异常,使用订阅时会建立一个session,用于提交点位,如果有其他客户端使用该订阅,会得到该异常。 |
SubscriptionOffsetResetException | OffsetReseted | 订阅点位被重置。 |
MalformedRecordException | MalformedRecord,ShardNotReady | 非法的 Record 格式,可能的情况有:schema 不正确、包含非utf-8字符、客户端使用pb而服务端不支持、等等。 |
DatahubClientException | 其他所有,并且是所有异常的基类 | 如排除以上异常情况,通常重试即可,但应限制重试次数。 |
API说明
Project操作
项目(Project)是DataHub数据的基本组织单元,下面包含多个Topic。值得注意的是,DataHub的项目空间与MaxCompute的项目空间是相互独立的。用户在MaxCompute中创建的项目不能复用于DataHub,需要独立创建。
创建 Project
CreateProjectResult createProject(String projectName, String comment);
创建Project需要提供Project的名字和描述,Project的名字长度限制为[3,32],必须以英文字母开头,仅允许英文字母、数字及“_”,大小写不敏感。
参数
projectName project name
comment project comment
Exception
DatahubClientException
示例
public static void createProject(String projectName,String projectComment) {
try {
datahubClient.createProject(projectName, projectComment);
System.out.println("create project successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
删除 Project
DeleteProjectResult deleteProject(String projectName);删除Project时必须保证Project中已经没有Topic。 参数 projectName project name。
Exception
DatahubClientException
NoPermissionException, project内仍有topic时则会抛出的异常
示例
public static void deleteProject(String projectName) {
try {
datahubClient.deleteProject(projectName);
System.out.println("delete project successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新 Project
UpdateProjectResult updateProject(String projectName, String comment);更新project信息,目前只支持更新comment。 参数 projectName project name comment project comment。
Exception
DatahubClientException
示例
public static void updateProject(String projectName,String newComment) {
try {
datahubClient.updateProject(projectName, newComment);
System.out.println("update project successful");
} catch (DatahubClientException e) {
System.out.println("other error");
}
}
列出 Project
ListProjectResult listProject();listProject返回的结果是ListProjectResult对象,其中包含成员projectNames,是一个包含project名字的list。
参数
Exception
DatahubClientException
示例
public static void listProject() {
try {
ListProjectResult listProjectResult = datahubClient.listProject();
if (listProjectResult.getProjectNames().size() > 0) {
for (String pName : listProjectResult.getProjectNames()) {
System.out.println(pName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询 Project
GetProjectResult getProject(String projectName);可以查看当前Project的一些属性信息。
Exception
DatahubClientException
示例
public static void getProject(String projectName) {
try {
GetProjectResult getProjectResult = datahubClient.getProject(projectName );
System.out.println(getProjectResult.getCreateTime() + "\t"
+ getProjectResult.getLastModifyTime() + "\t"
+ getProjectResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Topic 操作
Topic是 DataHub 订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据。
目前支持Tuple与Blob两种类型:
Blob类型Topic支持写入一块二进制数据作为一个Record。
Tuple类型的Topic支持类似于数据库的记录的数据,每条记录包含多个列,需要指定Record Schema,因为网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。
目前支持以下几种数据类型:
类型 | 含义 | 值域 |
BIGINT | 8字节有符号整型 | -9223372036854775807 ~ 9223372036854775807 |
DOUBLE | 8字节双精度浮点数 | -1.0 _10^308 ~ 1.0 _10^308 |
BOOLEAN | 布尔类型 | True/False或true/false或0/1 |
TIMESTAMP | 时间戳类型 | 表示到微秒的时间戳类型 |
STRING | 字符串,只支持UTF-8编码 | 单个STRING列最长允许2MB |
TINYINT | 单字节整型 | -128 ~127 |
SMALLINT | 双字节整型 | -32768 ~ 32767 |
INTEGER | 4字节整型 | -2147483648 ~ 2147483647 |
FLOAT | 4字节单精度浮点数 | -3.40292347_10^38 ~ 3.40292347_10^38 |
DataHub 中的 TINYINT、SMALLINT、INTEGER、FLOAT类型从java sdk 2.16.1-public开始支持。
创建 Tuple Topic
CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String comment);
参数
projectName The name of the project in which you create.
topicName The name of the topic.
shardCount The initial shard count of the topic.
lifeCycle The expire time of the data (Unit: DAY). The data written before that time is not accessible.
recordType The type of the record you want to write. Now support TUPLE and BLOB.
recordSchema The records schema of this topic.
comment The comment of the topic.
Exception
DatahubClientException
示例
public static void createTupleTopic(String projectName, String topicName, int shardCount, int lifeCycle, String topicComment) {
RecordSchema schema = new RecordSchema();
schema.addField(new Field("bigint_field", FieldType.BIGINT));
schema.addField(new Field("double_field", FieldType.DOUBLE));
schema.addField(new Field("boolean_field", FieldType.BOOLEAN));
schema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
schema.addField(new Field("tinyint_field", FieldType.TINYINT));
schema.addField(new Field("smallint_field", FieldType.SMALLINT));
schema.addField(new Field("integer_field", FieldType.INTEGER));
schema.addField(new Field("floar_field", FieldType.FLOAT));
schema.addField(new Field("decimal_field", FieldType.DECIMAL));
schema.addField(new Field("string_field", FieldType.STRING));
try {
datahubClient.createTopic(projectName,topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicComment);
System.out.println("create topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
创建 Blob Topic
CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, String comment);
参数
projectName The name of the project in which you create.
topicName The name of the topic.
shardCount The initial shard count of the topic.
lifeCycle The expire time of the data (Unit: DAY). The data written before that time is not accessible.
recordType The type of the record you want to write. Now support TUPLE and BLOB.
comment The comment of the topic.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
ResourceAlreadyExistException
示例
public static void createBlobTopic(String projectName, String topicName, int shardCount, int lifeCycle, String topicComment) {
try {
datahubClient.createTopic(projectName, blobTopicName, shardCount, lifeCycle, RecordType.BLOB, topicComment);
System.out.println("create topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
删除 Topic
删除topic之前要保证topic中没有subscription和connector,否则会报错NoPermission。
DeleteTopicResult deleteTopic(String projectName, String topicName);
参数
projectName The name of the project in which you delete.
topicName The name of the topic.
Exception
DatahubClientException
NoPermissionException, topic内存在subscription和connector会报此错误
示例
public static void deleteTopic(String projectName, String topicName) {
try {
datahubClient.deleteTopic(projectName, topicName);
System.out.println("delete topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
列出 Topic
ListTopicResult listTopic(String projectName);
参数
projectName The name of the project in which you list.
示例
public static void listTopic(String projectName ) {
try {
ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
if (listTopicResult.getTopicNames().size() > 0) {
for (String tName : listTopicResult.getTopicNames()) {
System.out.println(tName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新 Topic
更新Topic信息,目前支持更新comment和lifeCycle。
UpdateTopicResult updateTopic(String projectName, String topicName, int lifeCycle, String comment);
参数
projectName The name of the project in which you list.
topicName The name of the topic.
comment The comment to modify.
lifeCycle the lifeCycle of the topic
Exception
DatahubClientException
示例
public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
try {
comment = "new topic comment";
lifeCycle = 1;
datahubClient.updateTopic(projectName, Constant.topicName,lifeCycle, comment);
System.out.println("update topic successful");
//查看更新后结果
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询 Topic
GetTopicResult getTopic(String projectName, String topicName);可以获取Topic的相关属性。
参数
projectName The name of the project in which you get.
topicName The name of the topic.
Exception
DatahubClientException
示例
public static void getTopic(String projectName, String topicName) {
try {
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getShardCount() + "\t"
+ getTopicResult.getLifeCycle() + "\t"
+ getTopicResult.getRecordType() + "\t"
+ getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Tuple Topic 新增 Field
新增Field既可以新增一列,也可以一次性插入多列
AppendFieldResult appendField(String projectName, String topicName, Field field);
参数
projectName The name of the project in which you get.
topicName The name of the topic.
fields The fields to append. All field value must allow null.
Exception
DatahubClientException
示例
public static void appendNewField(String projectName,String topicName) {
try {
Field newField = new Field("newField", FieldType.STRING, true,"comment");
datahubClient.appendField(projectName, topicName, newField);
System.out.println("append field successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
AppendFieldResult appendField(String projectName, String topicName, List fields);
参数
projectName The name of the project in which you get.
topicName The name of the topic.
fields The fields to append. All field value must allow null.
Exception
DatahubClientException
示例
public static void appendNewField(String projectName,String topicName) {
try {
List<Field> list = new ArrayList<>();
Field newField1 = new Field("newField1", FieldType.STRING, true,"comment");
list.add(newField1);
datahubClient.appendField(projectName, topicName, list);
System.out.println("append field successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Shard操作
Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态:Opening - 启动中,Active - 启动完成可服务。每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。
列出 Shard
ListShardResult listShard(String projectName, String topicName);
参数
projectName The name of the project.
topicName The name of the topic.
Exception
DatahubClientException
示例
public static void listShard(String projectName, String topicName) {
try {
ListShardResult listShardResult = datahubClient.listShard(projectName, topicName);
if (listShardResult.getShards().size() > 0) {
for (ShardEntry entry : listShardResult.getShards()) {
System.out.println(entry.getShardId() + "\t"
+ entry.getState() + "\t"
+ entry.getLeftShardId() + "\t"
+ entry.getRightShardId());
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
分裂 Shard
指定一个Topic中的一个状态为ACTIVE的Shard进行分裂,生成两个Shard,新Shard状态为ACTIVE,原Shard状态会变为CLOSED。CLOSED状态的shard只可以读,不可以写,可以采用默认splitKey进行分裂,也可以指定splitKey进行分裂。
SplitShardResult splitShard(String projectName, String topicName, String shardId); SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey);
参数
projectName The name of the project.
topicName The name of the topic.
shardId The shard which to split.
splitKey The split key which is used to split shard.
Exception
DatahubClientException
示例
public static void splitShard(String projectName, String topicName, String shardId) {
try {
shardId = "0";
SplitShardResult splitShardResult = datahubClient.splitShard(projectName, topicName, shardId);
for (ShardEntry entry : splitShardResult.getNewShards()) {
System.out.println(entry.getShardId());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
合并 Shard
合并一个Topic中两个处于ACTIVE状态的Shard,要求两个Shard的位置必须相邻。每个Shard相邻的两个Shard可以参考listShard的结果。
MergeShardResult mergeShard(String projectName, String topicName, String shardId, String adjacentShardId);
参数
projectName The name of the project.
topicName The name of the topic.
shardId The shard which will be merged.
adjacentShardId The adjacent shard of the specified shard.
Exception
DatahubClientException
示例
public static void mergeShard() {
try {
String shardId = "7";
//adjacentShardId位置必须和shardId相邻,shard相邻信息可在listShard返回结果中查看
String adjacentShardId = "8";
MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
System.out.println("merge successful");
System.out.println(mergeShardResult.getShardId());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Shard扩展
shard扩展要求扩展的shard数量不得小于原有shard数量
ExtendShardResult extendShard(String projectName, String topicName, int shardCount);
参数
projectName The name of the project.
topicName The name of the topic.
shardCount The num of shards to extend to.
adjacentShardId The adjacent shard of the specified shard.
Exception
DatahubClientException
示例
public static void extendTopic(String projectName, String topicName, int shardCount) { try { ExtendShardResult extendShardResult = datahubClient.extendShard(projectName, topicName, shardCount); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } }
读写数据
状态为CLOSED和ACTIVE的shard都可以读取数据,不过只有状态为ACTIVE的shard可以写数据。
1.读数据
读数据分为两步,首先是获取cursor,接着将获取到的cursor值传入到getRecords方法,需要注意的是,DataHub已经提供了订阅功能,用户可直接关联订阅对数据进行消费,服务端自动保存点位,读数据的主要用途在于抽样查看数据的质量
2.获取cursor
读取Topic下的数据,需要指定对应的shard,同时需要指定数据读取的游标位置Cursor。Cursor的获取方式有四种,分别是OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。
OLDEST : 表示获取的cursor指向当前有效数据中时间最久远的record。
LATEST : 表示获取的cursor指向当前最新的record。
SEQUENCE : 表示获取的cursor指向该序列的record。
SYSTEM_TIME : 表示获取的cursor指向该大于等于该时间戳的第一条record。
应该选择哪种呢?
首先要明白一点,读取的数据一定要在有效期内,也就是生命周期内,否则读取数据会报错
如果场景是需要从头开始读取数据的话,选取OLDEST无疑是最合适的,如果数据都在有效期内,则会从第一条数据开始读取。
如果是做抽样使用,需要查看某个时间点之后数据是否是正常的,则应该选择SYSTEM_TIME 模式,将会从获取到的sequence+1位置开始读取。
使用场景是查看当前最新数据的情况,则应该使用LATEST ,用户可通过LATEST 模式一直读取最新写入的数据,当前也可以读取最新写入数据的前N条,这就需要先获取到sequence,然后sequence-N距离最新写入前N条的数据。
GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type); GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);
参数
projectName The name of the project.
topicName The name of the topic.
shardId The id of the shard.
CursorType Which type used to get cursor.
Exception
DatahubClientException
SeekOutOfRangeException
示例代码
抽样场景,先将Date转换为timestamp,然后获取cursor。
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
//将时间转为时间戳形式
String time = "2019-07-01 10:00:00";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long timestamp = 0L;
try {
Date date = simpleDateFormat.parse(time);
timestamp = date.getTime();//获取时间的时间戳
//System.out.println(timestamp);
}
//获取时间time之后的数据读取位置
String timeCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
} catch (ParseException e) {
System.out.println(e.getErrorOffset());
}
}
从头开始读取数据
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
/* OLDEST用法示例 */
String oldestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
读取最新写入的数据
分为两种情况,第一种是最新写入的最后一条数据。
第二种是最新写入的前N条数据。
需要先获取最新写入数据的sequence,然后再获取cursor。
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
/* LATEST用法示例 */
String latestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getCursor();
/* SEQUENCE用法示例 */
//获取最新数据的sequence
long seq = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getSequence();
//获取最新的十条数据的读取位置
String seqCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
}
catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
读取数据接口:
GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit); GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);
参数
projectName The name of the project.
topicName The name of the topic.
shardId The id of the shard.
schema If you read TUPLE records, you need this parameter.
cursor The start cursor used to read data.
limit Max record size to read.
Exception
DatahubClientException
示例
1). 读取Tuple topic数据
public static void example(String projectName,String topicName) {
//每次最多读取数据量
int recordLimit = 1000;
String shardId = "7";
// 获取cursor, 这里获取有效数据中时间最久远的record游标
// 注:正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取
String cursor = "";
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// 无数据,sleep后读取
Thread.sleep(10000);
continue;
}
for (RecordEntry entry : result.getRecords()) {
TupleRecordData data = (TupleRecordData) entry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
}
// 拿到下一个游标
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// 非法游标或游标已过期,建议重新定位后开始消费
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());;
}
}
}
2). 读取Blob topic数据
public static void example(String projectName,String topicName) {
//每次最多读取数据量
int recordLimit = 1000;
String shardId = "7";
// 获取cursor, 这里获取有效数据中时间最久远的record游标
// 注:正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取
String cursor = "";
try {
cursor = datahubClient.getCursor(projectName, blobTopicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(projectName, blobTopicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// 无数据,sleep后读取
Thread.sleep(10000);
continue;
}
/* 消费数据 */
for (RecordEntry record: result.getRecords()){
BlobRecordData data = (BlobRecordData) record.getRecordData();
System.out.println(new String(data.getData()));
}
// 拿到下一个游标
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// 非法游标或游标已过期,建议重新定位后开始消费
cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}
写数据
服务器2.12之后版本开始支持PutRecordsByShardResult接口,之前版本putRecords接口,使用putRecordsByShard接口时需指定写入的shard,否则会默认写入第一个处于ACTIVE状态的shard。两个方法中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型。DataHub目前支持按照Shard写入 (服务端 >= 2.12版本) 以及混合写入,分别对应putRecordsByShard
和putRecords
两个接口。针对第二个接口,用户需要判断PutRecordsResult
结果以确认数据是否写入成功;而putRecordsByShard
接口则直接通过异常告知用户是否成功。如果服务端支持,建议用户使用putRecordsByShard
接口。
PutRecordsResult putRecords(String projectName, String topicName, List records); PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records);
参数
projectName The name of the project.
topicName The name of the topic.
shardId The id of the shard.
records Records list to written.
Exception
DatahubClientException
1). 写入Tuple topic
// 写入Tuple型数据
public static void tupleExample(String project,String topic,int retryTimes) {
// 获取schema
RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema();
// 生成十条数据
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// 对每条数据设置额外属性,例如ip 机器名等。可以不设置额外属性,不影响数据写入
recordEntry.addAttribute("key1", "value1");
TupleRecordData data = new TupleRecordData(recordSchema);
data.setField("field1", "HelloWorld");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntries.add(recordEntry);
}
try {
PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
int i = result.getFailedRecordCount();
if (i > 0) {
retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
}
} catch (DatahubClientException e) {
System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
}
}
//重试机制
public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
boolean suc = false;
while (retryTimes != 0) {
retryTimes = retryTimes - 1;
PutRecordsResult recordsResult = client.putRecords(project, topic, records);
if (recordsResult.getFailedRecordCount() > 0) {
retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic);
}
suc = true;
break;
}
if (!suc) {
System.out.println("retryFailure");
}
}
```java
<br />
<br />**2). 写入Blob topic**<br />
```java
// 写入blob型数据
public static void blobExample() {
// 生成十条数据
List<RecordEntry> recordEntries = new ArrayList<>();
String shardId = "4";
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// 对每条数据设置额外属性
recordEntry.addAttribute("key1", "value1");
BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
recordEntry.setShardId("0");
}
while (true) {
try {
// 服务端从2.12版本开始支持,之前版本请使用putRecords接口
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
System.out.println("write data successful");
break;
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}
多方式写入
在DataHub 2.12之前版本,DataHub仅支持putRecords
接口,在RecordEntry
类中包含shardId
、partitionKey
和hashKey
三个属性,用户通过指定对应属性的值决定数据写入到哪个Shard中。
2.12及之后版本,建议用户使用putRecordsByShard接口,避免服务端partition造成的性能损耗。
1). 按照ShardID写入推荐方式,使用示例如下:
RecordEntry entry = new RecordEntry();
entry.setShardId("0");
2). 按HashKey写入指定一个128 bit的MD5值。 按照HashKey写入,根据Shard的Shard操作决定数据写入的Shard使用示例:
RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");
2). 按PartitionKey写入指定一个String类型参数作为PartitionKey,系统根据该String的MD5值以及Shard的Shard操作决定写入的Shard使用示例:
RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");
Meter 操作
获取Meter
GetMeterInfoResult getMeterInfo(String projectName, String topicName, String shardId);
参数
projectName The name of the project.
topicName The name of the topic.
shardId The id of the shard.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void getMeter(String projectName,String topicName) {
String shardId = "5";
try {
GetMeterInfoResult getMeterInfoResult = datahubClient.getMeterInfo(projectName, topicName, shardId);
System.out.println("get meter successful");
System.out.println(getMeterInfoResult.getActiveTime() + "\t" + getMeterInfoResult.getStorage());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Subscribtion操作
订阅服务提供了服务端保存用户消费点位的功能,只需要通过简单配置和处理,就可以实现高可用的点位存储服务。
创建 Subscription
CreateSubscriptionResult createSubscription(String projectName, String topicName, String comment);
comment格式为:{“application”:”应用”,”description”:”描述”}
参数
projectName The name of the project.
topicName The name of the topic.
comment The comment of the subscription.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void createSubscription(String projectName,String topicName) {
try {
CreateSubscriptionResult createSubscriptionResult = datahubClient.createSubscription(projectName, topicName, Constant.subscribtionComment);
System.out.println("create subscription successful");
System.out.println(createSubscriptionResult.getSubId());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
删除 Subscription
DeleteSubscriptionResult deleteSubscription(String projectName, String topicName, String subId);
参数
projectName The name of the project.
topicName The name of the topic.
subId The id of the subscription.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void deleteSubscription(String projectName,String topicName,String subId) {
try {
datahubClient.deleteSubscription(projectName, topicName, subId);
System.out.println("delete subscription successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新 Subscription
更新已存在的Subscription,目前只支持更新comment。
UpdateSubscriptionResult updateSubscription(String projectName, String topicName, String subId, String comment);
参数
projectName The name of the project.
topicName The name of the topic.
subId The id of the subscription.
comment The comment you want to update.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void updateSubscription(String projectName, String topicName, String subId, String comment){
try {
datahubClient.updateSubscription(projectName,topicName,subId,comment)
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
列出 Subscription
listSubscription的参数pageNum和pageSize取指定范围的subscription信息,如pageNum =1, pageSize =10,获取1-10个subscription; pageNum =2, pageSize =5则获取6-10的subscription。
ListSubscriptionResult listSubscription(String projectName, String topicName, int pageNum, int pageSize);
参数
projectName The name of the project.
topicName The name of the topic.
pageNum The page number used to list subscriptions.
pageSize The page size used to list subscriptions.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例代码
示例代码
public static void listSubscription(String projectName, String topicName, int pageNum, int pageSize) {
try {
ListSubscriptionResult listSubscriptionResult = datahubClient.listSubscription(projectName, topicName, pageNum, pageSize);
if (listSubscriptionResult.getSubscriptions().size() > 0) {
System.out.println(listSubscriptionResult.getTotalCount());
System.out.println(listSubscriptionResult.getSubscriptions().size());
for (SubscriptionEntry entry : listSubscriptionResult.getSubscriptions()) {
System.out.println(entry.getSubId() + "\t"
+ entry.getState() + "\t"
+ entry.getType() + "\t"
+ entry.getComment());
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询 Subscription
GetSubscriptionResult getSubscription(String projectName, String topicName, String subId);
参数
projectName The name of the project.
topicName The name of the topic.
subId The id of the subscription.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
public static void getSubscription(String projectName, String topicName, String subId) {
try {
GetSubscriptionResult getSubscriptionResult = datahubClient.getSubscription(projectName, topicName, subId);
System.out.println(getSubscriptionResult.getSubId() + "\t"
+ getSubscriptionResult.getState() + "\t"
+ getSubscriptionResult.getType() + "\t"
+ getSubscriptionResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新 Subscription 状态
Subscription有两种状态,OFFLINE和ONLINE,分别表示离线和在线。
UpdateSubscriptionStateResult updateSubscriptionState(String projectName, String topicName, String subId, SubscriptionState state);
参数
projectName The name of the project.
topicName The name of the topic.
subId The id of the subscription.
state The state you want to change.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void updateSubscriptionState(String projectName, String topicName,String subId) {
try {
datahubClient.updateSubscriptionState(projectName, topicName, subId, SubscriptionState.ONLINE);
System.out.println("update subscription state successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
offset 操作
一个subscription创建后,初始状态是未消费的,要使用subscription服务提供的点位存储功能,需要进行一些offset操作。
初始化 offset
openSubscriptionSession只需要初始化一次,再次调用会重新生成一个消费sessionId,之前的session会失效,无法commit点位。
OpenSubscriptionSessionResult openSubscriptionSession(String projectName, String topicName, String subId, List shardIds);
参数
projectName The name of the project.
topicName The name of the topic.
subId The id of the subscription.
shardIds The id list of the shards.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void openSubscriptionSession(String projectName, String topicName) {
shardId = "4";
shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getSessionId() + "\t"
+ subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
获取 offset
GetSubscriptionOffsetResult getSubscriptionOffset(String projectName, String topicName, String subId, List shardIds);
getSubscriptionOffset返回结果是GetSubscriptionOffsetResult对象,与openSubscriptionSession返回结果基本上相同,但是GetSubscriptionOffsetResult中的offset没有sessionId的,是作为只读的方法来使用。
参数
projectName The name of the project.
topicName The name of the topic.
subId The id of the subscription.
shardIds The id list of the shards.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
//获取点位
public static void getSubscriptionOffset(String projectName, String topicName,String subId) {
shardId = "4";
shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
提交 offset
CommitSubscriptionOffsetResult commitSubscriptionOffset(String projectName, String topicName, String subId, Map offsets);
提交点位会验证versionId和sessionId,必须与当前的一致;提交的点位信息没有严格限制,建议按照record中的真实sequence和timestamp来填写。
参数
projectName The name of the project.
topicName The name of the topic.
subId The id of the subscription.
offsets The offset map of shards.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
SubscriptionOffsetResetException
SubscriptionSessionInvalidException
SubscriptionOfflineException
示例
//提交点位
public static void commitSubscriptionOffset(String projectName, String topicName,String subId) {
while (true) {
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
//这里仅仅测试提交,完整过程请参考点位消费样例
subscriptionOffset.setSequence(10);
subscriptionOffset.setTimestamp(100);
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
offsets.put(shardId, subscriptionOffset);
// 提交点位
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}
重置 offset
ResetSubscriptionOffsetResult resetSubscriptionOffset(String projectName, String topicName, String shardId, Map offsets);
重置点位可以将消费点位设置到某个时间点,如果在这个时间点有多个record,那么点位会设置到该时间点的第一条record的位置。重置点位在修改点位信息的同时更新versionId,运行中的任务在使用旧的versionId来提交点位时会收到SubscriptionOffsetResetException,通过getSubscriptionOffset接口可以拿到新的versionId。
参数
projectName The name of the project.
topicName The name of the topic.
subId The id of the subscription.
offsets The offset map of shards.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
//重置点位
public static void resetSubscriptionOffset(String projectName, String topicName) throws ParseException {
List<String> shardIds = Arrays.asList("0");
//选择想要重置点位到的时间,并转换为时间戳
String time = "2019-07-09 10:00:00";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(time);
long timestamp = date.getTime();//获取时间的时间戳
long sequence = client.getCursor(projectName, topicName, subId, CursorType.SYSTEM_TIME, timestamp).getSequence();
SubscriptionOffset offset = new SubscriptionOffset();
offset.setTimestamp(timestamp);
offset.setSequence(sequence);
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
for (String shardId : shardIds) {
offsets.put(shardId, offset);
}
try {
datahubClient.resetSubscriptionOffset(projectName, topicName, subId, offsets);
System.out.println("reset successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
关联订阅消费DataHub数据
同读取DataHub数据类似,使用订阅进行消费的不同是订阅存储了消费的点位,用户可自由选择消费点位
注意事项:
首先调用openSubscriptionSession初始化offset,获取version + session信息,全局只初始化一次,多次调用此方法,会造成原有session失效,无法提交点位
调用getcursor获取订阅的点位进行消费,消费完一条数据后,调用getNextCursor获取下一条数据点位,继续消费
提交点位时,调用commitSubscriptionOffset提交点位,commit操作会检查version和session信息,必须完全一致才能提交成功
//点位消费示例,并在消费过程中进行点位的提交
public static void example(String projectName, String topicName,String subId) {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
String cursor = null;
//sequence < 0说明未消费
if (subscriptionOffset.getSequence() < 0) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// 获取下一条记录的Cursor
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
//按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// 获取生命周期内第一条record的cursor
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
long recordCount = 0L;
// 每次读取10条record
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// 无数据,sleep后读取
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
//消费数据
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// 处理数据完成后,设置点位
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
点位//提交点位点位
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | OffsetSessionChangedException e) {
// 退出. Offline: 订阅下线; SessionChange: 表示订阅被其他客户端同时消费
break;
} catch (OffsetResetedException e) {
// 表示点位被重置,重新获取SubscriptionOffset信息,这里以Sequence重置为例
// 如果以Timestamp重置,需要通过CursorType.SYSTEM_TIME获取cursor
subscriptionOffset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: 针对不同异常决定是否退出
} catch (Exception e) {
break;
}
}
}
Connector 操作
DataHub Connector是把DataHub服务中的流式数据到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute(ODPS)、Oss、RDS&Mysql、TableStore、Oss、ElasticSearch、函数计算中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配置好同步功能,便可以在其他云产品中使用这份数据。
创建 Connector
CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, List columnFields, SinkConfig config); CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, long sinkStartTime, List columnFields, SinkConfig config);
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector which you want create.
columnFields Which fields you want synchronize.
sinkStartTime Start time to sink from datahub. Unit: Ms
config Detail config of specified connector type.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
ODPS示例
public static void createConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
SinkOdpsConfig config = new SinkOdpsConfig() {{
setEndpoint(Constant.odps_endpoint);
setProject(Constant.odps_project);
setTable(Constant.odps_table);
setAccessId(Constant.odps_accessId);
setAccessKey(Constant.odps_accessKey);
setPartitionMode(PartitionMode.SYSTEM_TIME);
setTimeRange(60);
}};
//设置分区格式
SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
addConfig("ds", "%Y%m%d");
addConfig("hh", "%H");
addConfig("mm", "%M");
}};
config.setPartitionConfig(partitionConfig);
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
OSS示例:
public static void createOssConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
SinkOssConfig config = new SinkOssConfig() {{
setAccessId(Constant.oss_accessId);
setAccessKey(Constant.oss_accessKey);
setAuthMode(AuthMode.STS);
setBucket(Constant.oss_bucket);
setEndpoint(Constant.oss_endpoint);
setPrefix(Constant.oss_prefix);
setTimeFormat(Constant.oss_timeFormat);
setTimeRange(60);
}};
try {
//创建Connector
datahubClient.createConnector(projectName,topicName, ConnectorType.SINK_OSS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
TableStore示例:
public static void createOtsConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
final SinkOtsConfig config = new SinkOtsConfig() {{
setAccessId(Constant.ots_accessId);
setAccessKey(Constant.ots_accessKey);
setEndpoint(Constant.ots_endpoint);
setInstance(Constant.ots_instance);
setTable(Constant.ots_table);
setAuthMode(AuthMode.AK);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_OTS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Hologres示例:
public static void createHoloConnector(String projectName,String topicName) {
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkHologresConfig config = new SinkHologresConfig() {{
setAccessId(Constant.accessId);
setAccessKey(Constant.accessKey);
setProjectName(Constant.projectName);
setTopicName(Constant.topicName);
setAuthMode(AuthMode.AK);
setInstanceId(Constant.instanceId);
//设置时间格式
setTimestampUnit(TimestampUnit.MILLISECOND);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_HOLOGRES, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
ElasticSearch 示例:
public static void createEsConnector(String projectName,String topicName){
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkEsConfig config = new SinkEsConfig() {{
setEndpoint(Constant.es_endpoint);
setIdFields(Constant.es_fields);
setIndex(Constant.es_index);
setPassword(Constant.es_password);
setProxyMode(Constant.es_proxyMode);
setTypeFields(Constant.es_typeFields);
setUser(Constant.es_user);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ES, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
函数计算示例:
public static void createFcConnector(String projectName,String topicName){
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkFcConfig config = new SinkFcConfig() {{
setEndpoint(Constant.fc_endpoint);
setAccessId(Constant.fc_accessId);
setAccessKey(Constant.fc_accessKey);
setAuthMode(AuthMode.AK);
setFunction(Constant.fc_function);
setService(Constant.fc_service);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_FC, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Mysql示例:
public static void createMysqlConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkMysqlConfig config = new SinkMysqlConfig() {{
setDatabase( Constant.mysql_database);
setHost(Constant.mysql_host);
setInsertMode(InsertMode.OVERWRITE);
setPassword(Constant.mysql_password);
setPort(Constant.mysql_port);
setTable(Constant.mysql_table);
setUser(Constant.mysql_user);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_MYSQL, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
删除 Connector
DeleteConnectorResult deleteConnector(String projectName, String topicName, ConnectorType connectorType);
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector which you want delete.
columnFields Which fields you want synchronize.
sinkStartTime Start time to sink from datahub. Unit: Ms
config Detail config of specified connector type.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void deleteConnector(String projectName,String topicName) {
try {
datahubClient.deleteConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("delete connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询 Connector
GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName,topicName,ConnectorType.SINK_ODPS);
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector which you want get.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void getConnector(String projectName,String topicName) {
try {
GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
for (String fieldName : getConnectorResult.getColumnFields()) {
System.out.println(fieldName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新 Connector
更新Connector的配置。
UpdateConnectorResult updateConnector(String projectName, String topicName, ConnectorType connectorType, SinkConfig config);
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector which you want update.
config Detail config of specified connector type.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void updateConnector(String projectName,String topicName) {
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS).getConfig();
//修改ak
config.setTimeRange(100);
config.setAccessId(accessId);
config.setAccessKey(accessKey);
//修改timestamp类型
config.setTimestampUnit(ConnectorConfig.TimestampUnit.MICROSECOND);
try {
datahubClient.updateConnector(projectName, topicName, ConnectorType.SINK_ODPS, config);
System.out.println("update connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新Connector Field
UpdateConnectorResult updateConnector(String projectName, String topicName, String connectorId, List columnFields);
参数
projectName The name of the project.
topicName The name of the topic.
connectorId The id of connector which you want update.
columnFields New import fields.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void updateConnector(String projectName,String topicName) {
String connectorId = "";
//columnField代表的是同步到下游的所有字段,并不只是新增的字段
List<String> columnField = new ArrayList<>();
columnField.add("f1");
try {
batchClient.updateConnector(projectName, topicName,connectorId,columnField);
System.out.println("update connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新 Connector state
UpdateConnectorStateResult updateConnectorState(String projectName, String topicName, ConnectorType connectorType, ConnectorState connectorState);
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector.
connectorState The state of the connector. Support: ConnectorState.STOPPED, ConnectorState.RUNNING.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void updateConnectorState(String projectName,String topicName) {
try {
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
System.out.println("update connector state successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新 Connector offset
UpdateConnectorOffsetResult updateConnectorOffset(String projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset);
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector.
shardId The id of the shard. If shardId is null, then update all shards offset.
offset The connector offset.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void updateConnectorOffset(String projectName,String topicName) {
ConnectorOffset offset = new ConnectorOffset() {{
setSequence(10);
setTimestamp(1000);
}};
try {
//更新Connector点位需要先停止Connector
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
datahubClient.updateConnectorOffset(projectName, topicName, ConnectorType.SINK_ODPS, shardId, offset);
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
System.out.println("update connector offset successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
列出 Connector
ListConnectorResult listConnector(String projectName, String topicName);
参数
projectName The name of the project.
topicName The name of the topic.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void listConnector(String projectName,String topicName) {
try {
ListConnectorResult listConnectorResult = datahubClient.listConnector(projectName, topicName);
for (String cName : listConnectorResult.getConnectorNames()) {
System.out.println(cName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询 Connector Shard 状态
GetConnectorShardStatusResult getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType); ConnectorShardStatusEntry getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType, String shardId);
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector.
shardId The id of the shard.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void getConnectorShardStatusByShard(String projectName,String topicName,String shardId) {
try {
ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println(connectorShardStatusEntry.getState() + "\t"
+ connectorShardStatusEntry.getCurrSequence() + "\t"
+ connectorShardStatusEntry.getDiscardCount() + "\t"
+ connectorShardStatusEntry.getUpdateTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void getConnectorShardStatus(String projectName,String topicName) {
try {
GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS);
for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
+ entry.getValue().getCurrSequence() + "\t"
+ entry.getValue().getDiscardCount() + "\t"
+ entry.getValue().getUpdateTime());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
重启 Connector
ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType); ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType, String shardId);
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector.
shardId The id of the shard.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void reloadConnector(String projectName,String topicName ) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("reload connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void reloadConnectorByShard(String projectName,String topicName,String shardId) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println("reload connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询 Connector 完成时间
GetConnectorDoneTimeResult getConnectorDoneTime(String projectName, String topicName, ConnectorType connectorType);
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void getDoneTime(String projectName,String topicName ) {
try {
GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorDoneTimeResult.getDoneTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新VPC白名单
UpdateProjectVpcWhitelistResult updateProjectVpcWhitelist(String projectName, String vpcIds);
参数
projectName The name of the project.
vpcids The vpcIds to modify.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void updateProjectVpcWhitelist(String projectName) {
String vpcid = "12345";
try {
datahubClient.updateProjectVpcWhitelist(projectName, vpcid);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
添加新的Field
AppendConnectorFieldResult appendConnectorField(String projectName, String topicName, ConnectorType connectorType, String fieldName);
可以给Connector添加新的field,但是需要MaxCompute表中存在和datahub中对应的列。
参数
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector.
fieldName The field to append. Field value must allow null.
Exception
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
示例
public static void appendConnectorField(String projectName,String topicName) {
String newField = "newfield";
try {
//要求topic的schema和MaxCompute的table中都存在列newfield,并且表结构完全一致
datahubClient.appendConnectorField(projectName, topicName, ConnectorType.SINK_ODPS, newField);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
example示例
批量操作
请使用console命令工具。