本文为您展示DataHub的 C++ SDK的Topic操作。
Topic说明
Topic是 DataHub 订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据,目前支持Tuple与Blob两种类型:
Blob类型Topic支持写入一块二进制数据作为一个Record。
Tuple类型的Topic支持类似于数据库的记录的数据,每条记录包含多个列,需要指定Record Schema,因为网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。目前支持以下几种数据类型:
类型
含义
值域
BIGINT
8字节有符号整型
-9223372036854775807 ~ 9223372036854775807
DOUBLE
8字节双精度浮点数
-1.0 _10^308 ~ 1.0 _10^308
BOOLEAN
布尔类型
可取以下任意一组:
True/False
true/false
0/1
TIMESTAMP
时间戳类型
表示到微秒的时间戳。
STRING
字符串,只支持UTF-8编码
单个STRING列最长允许2MB。
TINYINT
单字节整型
-128 ~127
SMALLINT
双字节整型
-32768 ~ 32767
INTEGER
4字节整型
-2147483648 ~ 2147483647
FLOAT
4字节单精度浮点数
-3.40292347_10^38 ~ 3.40292347_10^38
创建Topic
创建Tuple Topic
参数说明
参数名 | 参数类型 | 参数说明 |
fieldName | string | field名称。 |
fieldComment | string | field说明。 |
代码示例
void CreateTupleTopic(){
RecordSchema schema;
std::string fieldName1 = "a";
std::string fieldName2 = "b";
std::string fieldName3 = "c";
std::string fieldComment1 = "field1 comment";
std::string fieldComment2 = "field2 comment";
std::string fieldComment3 = "field3 comment";
schema.AddField(Field(fieldName1/*Fieldname*/, BIGINT, true, fieldComment1));
schema.AddField(Field(fieldName2/*Fieldname*/, DOUBLE, true, fieldComment2));
schema.AddField(Field(fieldName3/*Fieldname*/, STRING, true, fieldComment3));
/* Create Topic */
int shardCount = 3;
int lifeCycle = 7;
RecordType type = TUPLE;
std::string projectName = "";
std::string topicName = "";
try
{
client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, schema, comment);
}
catch (const DatahubException& e)
{
std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
创建Blob Topic
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | string | 项目名称。 |
topicName | string | Topic名称。 |
代码示例
void CreateBlobTopic()
{
/* Create Topic */
int shardCount = 3;
int lifeCycle = 7;
RecordType type = BLOB;
std::string projectName = "";
std::string topicName = "";
try
{
client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, comment);
}
catch (const DatahubException& e)
{
std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
}
删除Topic
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | string | 项目名称。 |
topicName | string | Topic名称。 |
代码示例
void DeleteTopic()
{
std::string projectName = "";
std::string topicName = "";
try
{
client.DeleteTopic(projectName, topicName);
}
catch(const DatahubException& e)
{
std::cerr << "Delete topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
获取Topic列表
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | string | 项目名称。 |
代码示例
void ListTopic(){
std::string projectName = "";
try
{
const ListTopicResult& listTopicResult = client.ListTopic(projectName);
std::cout<<listTopicResult.GetTopicNames().size()<<std::endl;
}
catch(const DatahubException& e)
{
std::cerr << "Get topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
更新Topic
参数说明
参数名 | 参数类型 | 参数说明 |
updateComment | string | 更新说明。 |
updateLifecycle | int | 更新生命周期。 |
projectName | string | 项目名称。 |
topicName | string | Topic名称。 |
代码示例
void UpdateTopic(){
const std::string updateComment = "test1";
int updateLifecycle = 7;
std::string projectName = "";
std::string topicName = "";
try
{
client.UpdateTopic(projectName, topicName, updateLifecycle, updateProjectComment);
}
catch(const DatahubException& e)
{
std::cerr << "Update topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
查询Topic
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | string | 项目名称。 |
topicName | string | Topic名称。 |
代码示例
void GetTopic(){
std::string projectName = "";
std::string topicName = "";
try
{
const GetTopicResult& getTopicResult = client.GetTopic(projectName, topicName);
cout<<getTopicResult.GetComment()<<endl;
}
catch(const DatahubException& e)
{
std::cerr << "Get topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}