本文为您展示DataHub的 Java SDK的Topic操作。
Topic说明
Topic是 DataHub 订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据,目前支持Tuple与Blob两种类型:
Blob类型Topic支持写入一块二进制数据作为一个Record。
Tuple类型的Topic支持类似于数据库的记录的数据,每条记录包含多个列,需要指定Record Schema,因为网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。
仅支持以下数据类型:
类型
含义
值域
BIGINT
8字节有符号整型
-9223372036854775807 ~ 9223372036854775807
DOUBLE
8字节双精度浮点数
-1.0 _10^308 ~ 1.0 _10^308
BOOLEAN
布尔类型
可取以下任意一组:
True/False
true/false
0/1
TIMESTAMP
时间戳类型
表示到微秒的时间戳。
STRING
字符串,只支持UTF-8编码
单个STRING列最长允许2MB。
TINYINT
单字节整型
-128 ~127
SMALLINT
双字节整型
-32768 ~ 32767
INTEGER
4字节整型
-2147483648 ~ 2147483647
FLOAT
4字节单精度浮点数
-3.40292347_10^38 ~ 3.40292347_10^38
说明DataHub 中的
TINYINT
、SMALLINT
、INTEGER
、FLOAT
类型从java sdk 2.16.1-public开始支持。
创建Topic
创建Tuple Topic
创建Blob Topic
删除Topic
删除Topic之前需保证Topic中没有subscription和connector,否则会异常:NoPermissionException。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
异常描述
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
NoPermissionException |
| 没有权限,通常是RAM配置不正确,或没有正确授权子账号。 |
代码示例
public static void deleteTopic(String projectName, String topicName) {
try {
datahubClient.deleteTopic(projectName, topicName);
System.out.println("delete topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
列出Topic
以列表的形式列出配置项目下的所有Topic。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
代码示例
public static void listTopic(String projectName ) {
try {
ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
if (listTopicResult.getTopicNames().size() > 0) {
for (String tName : listTopicResult.getTopicNames()) {
System.out.println(tName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新Topic
更新Topic信息,可更新Topic的描述以及Topic的生命周期。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
comment | int | Topic描述 |
lifeCycle | String | Topic生命周期 |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
代码示例
public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
try {
comment = "new topic comment";
lifeCycle = 1;
datahubClient.updateTopic(projectName, Constant.topicName,lifeCycle, comment);
System.out.println("update topic successful");
//查看更新后结果
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询Topic
根据项目名称和Topic名称来查询Topic的相关属性。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
代码示例
public static void getTopic(String projectName, String topicName) {
try {
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getShardCount() + "\t"
+ getTopicResult.getLifeCycle() + "\t"
+ getTopicResult.getRecordType() + "\t"
+ getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}