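Last updated: 2021-01-11 20:12
The DataHub C++ SDK currently must be compiled with GCC 4.9.2. Before installing the SDK, check that your build environment meets this requirement.
You can access DataHub with an authenticated Alibaba Cloud account. You need to provide the account's AccessId and AccessKey, as well as the DataHub service endpoint.
The following code creates a DatahubClient from a DataHub endpoint: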
/* Configuration */
Account account;
account.id = "";
account.key = "=";
std::string projectName = "test_project";
std::string topicName = "test_cpp";
std::string comment = "test";
std::string endpoint = "";
Configuration conf(account, endpoint);
/* Datahub Client */
DatahubClient client(conf);
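A project is the basic organizational unit of DataHub data and contains multiple topics. Note that DataHub projects are independent of MaxCompute projects: a project created in MaxCompute cannot be reused in DataHub and must be created separately.
CreateProjectResult CreateProject(string projectName, string comment);
To create a project, provide its name and a description. The project name must be 3 to 32 characters long, start with an English letter, and contain only English letters, digits, and underscores (_). Names are case-insensitive.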
void CreateProject()
{
try
{
client.CreateProject(projectName, comment);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
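The naming rules above can be checked on the client side before calling CreateProject. The following is a minimal sketch, not part of the SDK: IsValidProjectName and SameProjectName are hypothetical helper names, and the sketch assumes that "English letters" means the ASCII ranges a-z and A-Z.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helpers (not DataHub SDK functions) that mirror the
// project naming rules described in the text.

static bool IsAsciiLetter(char c)
{
    return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
}

static bool IsAsciiDigit(char c)
{
    return c >= '0' && c <= '9';
}

static char ToLowerAscii(char c)
{
    return (c >= 'A' && c <= 'Z') ? static_cast<char>(c - 'A' + 'a') : c;
}

// True if name is 3 to 32 characters long, starts with an English letter,
// and contains only English letters, digits, and '_'.
bool IsValidProjectName(const std::string& name)
{
    if (name.size() < 3 || name.size() > 32)
    {
        return false;
    }
    if (!IsAsciiLetter(name[0]))
    {
        return false;
    }
    for (std::size_t i = 1; i < name.size(); ++i)
    {
        const char c = name[i];
        if (!IsAsciiLetter(c) && !IsAsciiDigit(c) && c != '_')
        {
            return false;
        }
    }
    return true;
}

// Names are case-insensitive, so two names are treated as the same
// project when they match after lowercasing ASCII letters.
bool SameProjectName(const std::string& a, const std::string& b)
{
    if (a.size() != b.size())
    {
        return false;
    }
    for (std::size_t i = 0; i < a.size(); ++i)
    {
        if (ToLowerAscii(a[i]) != ToLowerAscii(b[i]))
        {
            return false;
        }
    }
    return true;
}
```

Validating locally only avoids an obviously doomed request; the server remains the authority on whether a name is accepted.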
DeleteProjectResult DeleteProject(string projectName);
A project can be deleted only when it no longer contains any topics.
void DeleteProject()
{
try
{
client.DeleteProject(projectName);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
UpdateProjectResult UpdateProject(string projectName, string comment);
Updates project information. Currently only the comment can be updated.
void UpdateProject()
{
try
{
client.UpdateProject(projectName,comment);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
ListProjectResult ListProject();
The call returns a ListProjectResult object whose projectNames member is a list of project names.
void ListProject()
{
try
{
ListProjectResult listProjectResult = client.ListProject();
std::cout<<listProjectResult.GetProjectNames().size()<<std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
GetProjectResult GetProject(string projectName);
void GetProject()
{
try
{
GetProjectResult projectResult = client.GetProject(projectName);
std::cout<<projectResult.GetProject()<<std::endl;
std::cout<<projectResult.GetComment()<<std::endl;
std::cout<<projectResult.GetCreator()<<std::endl;
std::cout<<projectResult.GetCreateTime()<<std::endl;
std::cout<<projectResult.GetLastModifyTime()<<std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
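A topic is the smallest unit of subscription and publication in DataHub. You can use a topic to represent one kind of streaming data. Two topic types are currently supported, Tuple and Blob. The table below lists the supported data types:
Type | Description | Value range |
---|---|---|
BIGINT | 8-byte signed integer | -9223372036854775807 to 9223372036854775807 |
DOUBLE | 8-byte double-precision floating-point number | -1.0 × 10^308 to 1.0 × 10^308 |
BOOLEAN | Boolean | True/False, true/false, or 0/1 |
TIMESTAMP | Timestamp | Timestamp with microsecond precision |
STRING | String; only UTF-8 encoding is supported | A single STRING column can hold at most 1 MB |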
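The value ranges in the table can be enforced before a record is written. The sketch below is illustrative only: IsValidBigint, IsValidString, and ParseBoolean are hypothetical helpers, not SDK functions, and it assumes that 1 MB means 1024 × 1024 bytes.

```cpp
#include <cstddef>
#include <cstdint>
#include <limits>
#include <string>

// Assumption: "1 MB" in the table is taken as 1024 * 1024 bytes.
const std::size_t kMaxStringBytes = 1024 * 1024;

// The table gives a symmetric BIGINT range, so the most negative
// 64-bit value (-9223372036854775808) falls outside it.
bool IsValidBigint(std::int64_t value)
{
    return value != std::numeric_limits<std::int64_t>::min();
}

// A single STRING column may hold at most kMaxStringBytes bytes.
bool IsValidString(const std::string& value)
{
    return value.size() <= kMaxStringBytes;
}

// Accepts exactly the BOOLEAN literals listed in the table.
// Returns false and leaves *out untouched for any other text.
bool ParseBoolean(const std::string& text, bool* out)
{
    if (text == "True" || text == "true" || text == "1")
    {
        *out = true;
        return true;
    }
    if (text == "False" || text == "false" || text == "0")
    {
        *out = false;
        return true;
    }
    return false;
}
```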
CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, string comment);
Parameters
void CreateTupleTopic(){
try
{
client.CreateTopic(projectName, topicName, shardCount, lifeCycle, RecordType::TUPLE, schema, comment);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, string comment);
void CreateBlobTopic()
{
try
{
client.CreateTopic(projectName, topicName, shardCount, lifeCycle, RecordType::BLOB, comment);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
DeleteTopicResult DeleteTopic(string projectName, string topicName);
Before deleting a topic, make sure it has no subscriptions or connectors.
void DeleteTopic()
{
try
{
client.DeleteTopic(projectName, topicName);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
ListTopicResult ListTopic(string projectName);
void ListTopic(){
try
{
ListTopicResult listTopicResult = client.ListTopic(projectName);
cout<<listTopicResult.GetTopicNames().size()<<std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
UpdateTopicResult UpdateTopic(string projectName, string topicName,int updateLifecycle, string comment);
void UpdateTopic(){
try
{
client.UpdateTopic(projectName, topicName, updateLifecycle, updateProjectComment);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
GetTopicResult GetTopic(string projectName, string topicName);
void GetTopic(){
try
{
GetTopicResult getTopicResult = client.GetTopic(projectName, topicName);
cout<<getTopicResult.GetComment()<<endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
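A shard is a concurrent channel for transferring data in a topic, and each shard has an ID. A shard can be in one of several states, including Opening (starting up) and Active (started and ready to serve). Each enabled shard occupies server-side resources, so request only as many shards as you need.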
void ListShard()
{
try
{
ListShardResult listShardResult = client.ListShard(projectName, topicName);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
SplitShardResult SplitShard(string projectName, string topicName, string shardId);
SplitShardResult SplitShard(string projectName, string topicName, string shardId, string splitKey);
Splits an ACTIVE shard in a topic into two new shards. The new shards are ACTIVE, and the original shard becomes CLOSED. You can split with the default splitKey or specify a splitKey.
void SplitShard()
{
try
{
SplitShardResult splitShardResult = client.SplitShard(projectName, topicName, shardId);
for (ShardEntry entry : splitShardResult.getNewShards())
{
std::cout << "shardId is: " << entry << std::endl;
}
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Merges two ACTIVE shards in a topic. The two shards must be adjacent. To find the neighbors of a shard, refer to the result of ListShard.
Example
void MergeShard()
{
try
{
MergeShardResult mergeShardResult = client.MergeShard(projectName, topicName, shardId, adjacentShardId);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Data can be read from shards in both the CLOSED and ACTIVE states, but only ACTIVE shards accept writes.
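The shard rules described so far (split, adjacency for merge, and which states allow reads and writes) can be modelled in memory. This is a toy sketch, not SDK code. The Shard struct, its numeric key range, and the way new shard IDs are chosen are all assumptions made for illustration; the real service manages keys and IDs itself.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

enum class ShardState { Opening, Active, Closed };

// Hypothetical model of a shard that owns the key range [beginKey, endKey).
struct Shard
{
    std::string id;
    ShardState state;
    std::uint64_t beginKey;  // inclusive
    std::uint64_t endKey;    // exclusive
};

// CLOSED and ACTIVE shards can be read; only ACTIVE shards can be written.
bool CanRead(const Shard& s)
{
    return s.state == ShardState::Active || s.state == ShardState::Closed;
}

bool CanWrite(const Shard& s)
{
    return s.state == ShardState::Active;
}

// Two shards qualify for a merge only if both are ACTIVE and adjacent,
// modelled here as key ranges that touch.
bool CanMerge(const Shard& a, const Shard& b)
{
    return a.state == ShardState::Active && b.state == ShardState::Active &&
           (a.endKey == b.beginKey || b.endKey == a.beginKey);
}

// Splits shards[index] at splitKey. On success the parent becomes CLOSED
// and two ACTIVE shards covering its range are appended.
bool SplitShard(std::vector<Shard>& shards, std::size_t index, std::uint64_t splitKey)
{
    if (index >= shards.size())
    {
        return false;
    }
    const Shard parent = shards[index];  // copy: push_back below may reallocate
    if (parent.state != ShardState::Active)
    {
        return false;
    }
    if (splitKey <= parent.beginKey || splitKey >= parent.endKey)
    {
        return false;
    }
    Shard left = { std::to_string(shards.size()), ShardState::Active, parent.beginKey, splitKey };
    Shard right = { std::to_string(shards.size() + 1), ShardState::Active, splitKey, parent.endKey };
    shards[index].state = ShardState::Closed;
    shards.push_back(left);
    shards.push_back(right);
    return true;
}
```

In this model the closed parent stays in the list, which reflects the point that a CLOSED shard remains readable after a split.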
To read data from a topic, specify the shard and the cursor position to start reading from. A cursor can be obtained in four ways: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.
GetCursorResult GetCursor(string projectName, string topicName, string shardId, CursorType type);
GetCursorResult GetCursor(string projectName, string topicName, string shardId, int64_t timestamp);
Parameters
CursorType: the type of cursor to obtain.
void GetCursor()
{
try
{
GetCursorResult r1 = client.GetCursor(projectName, topicName, "0", OLDEST);
std::string cursor = r1.GetCursor();
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
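To make the four cursor types concrete, the sketch below resolves each of them against an in-memory list of records. It is a simplified model and not SDK code: the Record struct, the ResolveCursor function, and the exact semantics chosen here are assumptions (OLDEST is the first retained record, LATEST the last one, SEQUENCE an exact sequence match, SYSTEM_TIME the first record at or after the given time).

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical record metadata; records are assumed sorted by systemTime.
struct Record
{
    std::int64_t sequence;
    std::int64_t systemTime;
};

enum class CursorType { Oldest, Latest, Sequence, SystemTime };

// Comparator for lower_bound: orders a record against a timestamp.
static bool RecordBeforeTime(const Record& record, std::int64_t time)
{
    return record.systemTime < time;
}

// Returns the index of the record the cursor points at, or records.size()
// when no record matches. param is the sequence number for Sequence and
// the timestamp for SystemTime; it is ignored for Oldest and Latest.
std::size_t ResolveCursor(const std::vector<Record>& records, CursorType type, std::int64_t param)
{
    switch (type)
    {
    case CursorType::Oldest:
        return 0;
    case CursorType::Latest:
        return records.empty() ? 0 : records.size() - 1;
    case CursorType::Sequence:
        for (std::size_t i = 0; i < records.size(); ++i)
        {
            if (records[i].sequence == param)
            {
                return i;
            }
        }
        return records.size();
    case CursorType::SystemTime:
        return static_cast<std::size_t>(
            std::lower_bound(records.begin(), records.end(), param, RecordBeforeTime) -
            records.begin());
    }
    return records.size();
}
```

When several records share a timestamp, the lower-bound search lands on the first of them.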
Interface for reading data
GetRecordResult GetRecord(string projectName, string topicName, string shardId, string cursor, int limit);
Parameters
Example
Reading data from a Tuple topic
void GetRecord()
{
try
{
GetRecordResult r2 = client.GetRecord(projectName, topicName, "0", cursor, 1000);
int count = r2.GetRecordCount();
std::cout << "read record count is: " << count << std::endl;
for (int i = 0; i < count; ++i)
{
const RecordResult& recordResult = r2.GetRecord(i);
for (int j = 0; j < schema.GetFieldCount(); ++j)
{
const Field& field = schema.GetField(j);
const FieldType fieldType = field.GetFieldType();
switch(fieldType)
{
case BIGINT:
std::cout << recordResult.GetBigint(j) << std::endl;
break;
case DOUBLE:
printf("%.15lf\n", recordResult.GetDouble(j));
break;
case STRING:
std::cout << recordResult.GetString(j) << std::endl;
break;
default:
break;
}
}
}
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
PutRecordResult PutRecord(string projectName, string topicName, vector<RecordEntry> records, string compress);
PutRecordResult PutRecordByShard(string projectName, string topicName, string shardId, vector<RecordEntry> records);
Parameters
void PutRecord()
{
try{
std::vector<RecordEntry> records;
for (int32_t i = 0; i < 100; ++i)
{
RecordEntry record(schema.GetFieldCount());
record.SetShardId("0");
for (int j = 0; j < schema.GetFieldCount(); ++j)
{
const Field& field = schema.GetField(j);
const FieldType fieldType = field.GetFieldType();
switch(fieldType)
{
case BIGINT:
record.SetBigint(j, 1);
break;
case DOUBLE:
record.SetDouble(j, 117.120799999999);
break;
case STRING:
record.SetString(j, "345");
break;
default:
break;
}
}
records.push_back(record);
}
PutRecordResult put_ret0 = client.PutRecord(projectName, topicName, records, "lz4");
std::cout<<put_ret0.GetFailedRecordCount()<<std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
GetMeterInfoResult GetMeteringInfo(string projectName, string topicName, string day);
GetMeterInfoResult GetMeteringInfo(string projectName, string topicName, string shardId);
Example
void GetMeter()
{
try
{
GetMeteringInfoResult getMeteringInfoResult = client.GetMeteringInfo(projectName, topicName, shardId);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
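The subscription service stores consumer offsets on the server side. With a little configuration and handling, it gives you a highly available offset store.
CreateSubscriptionResult CreateSubscription(string projectName, string topicName, string comment);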
void CreateSubscription()
{
try
{
CreateSubscriptionResult createSubscriptionResult = client.CreateSubscription(projectName, topicName, subscriptionComment);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
DeleteSubscriptionResult DeleteSubscription(string projectName, string topicName, string subId);
void DeleteSubscription()
{
try
{
client.DeleteSubscription(projectName, topicName, subId);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
UpdateSubscriptionResult UpdateSubscription(string projectName, string topicName, string subId, string comment);
Updates an existing subscription. Currently only the comment can be updated.
void UpdateSubscription()
{
try
{
client.UpdateSubscription(projectName, topicName, subId, subscriptionComment);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
ListSubscriptionResult ListSubscription(string projectName, string topicName, int pageNum, int pageSize);
The pageNum and pageSize parameters of ListSubscription select a range of subscriptions. For example, pageNum = 1 and pageSize = 10 returns subscriptions 1 to 10, and pageNum = 2 and pageSize = 5 returns subscriptions 6 to 10.
void ListSubscription()
{
try
{
ListSubscriptionResult subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
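The relationship between pageNum, pageSize, and the subscriptions returned follows simple arithmetic. The sketch below reproduces the two cases from the text; PageRange is a hypothetical helper, not an SDK function, and it assumes both arguments are at least 1.

```cpp
#include <utility>

// Returns the 1-based, inclusive positions (first, last) of the items
// covered by a page. Assumes pageNum >= 1 and pageSize >= 1.
std::pair<int, int> PageRange(int pageNum, int pageSize)
{
    const int first = (pageNum - 1) * pageSize + 1;
    const int last = pageNum * pageSize;
    return std::make_pair(first, last);
}
```

For instance, PageRange(3, 20) covers positions 41 to 60. The service may return fewer items on the last page.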
GetSubscriptionResult GetSubscription(string projectName, string topicName, string subId);
void GetSubscription()
{
try
{
GetSubscriptionResult getSubscriptionResult = client.GetSubscription(projectName, topicName, subId);
std::cout << getSubscriptionResult.GetComment() << std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
UpdateSubscriptionStateResult UpdateSubscriptionState(string projectName, string topicName, string subId, SubscriptionState state);
A subscription has two states, OFFLINE and ONLINE.
void UpdateSubscriptionState()
{
try
{
client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
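A newly created subscription starts out unconsumed. To use the offset storage provided by the subscription service, you need to perform some offset operations.
OpenSubscriptionOffsetSessionResult InitSubscriptionOffsetSession(string projectName, string topicName, string subId, StringVec shardIds);
The offset session only needs to be initialized once. Calling InitSubscriptionOffsetSession again generates a new consumer sessionId; the previous session becomes invalid and can no longer commit offsets.
Example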
void InitSubscriptionOffsetSession()
{
try
{
OpenSubscriptionOffsetSessionResult offsetSessionResult =
client.InitSubscriptionOffsetSession(projectName, topicName, createSubscriptionResult.GetSubId(), shardIds);
std::cout << offsetSessionResult.GetOffsets().size() << std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
GetSubscriptionOffsetResult GetSubscriptionOffset(string projectName, string topicName, string subId, StringVec shardIds);
GetSubscriptionOffset returns a GetSubscriptionOffsetResult object.
// Get the offsets
void GetSubscriptionOffset()
{
try
{
GetSubscriptionOffsetResult getSubscriptionOffsetResult =client.GetSubscriptionOffset(projectName, topicName,subId, shardIds);
std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
ResetSubscriptionOffsetResult ResetSubscriptionOffset(string projectName, string topicName, string subId, Map offsets);
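Resetting the offset moves the consumer offset to a point in time. If several records share that time, the offset is set to the first of them. A reset updates the versionId along with the offset information. A running task that commits offsets with the old versionId receives a SubscriptionOffsetResetException; the new versionId can be obtained through GetSubscriptionOffset.
Example: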
void ResetSubscriptionOffset()
{
try
{
client.ResetSubscriptionOffset(projectName, topicName, subId, resetSubscriptionOffsets);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
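The interplay of sessionId and versionId described above can be illustrated with a small in-memory model. This is a sketch, not the service's implementation. OffsetStore, OffsetResetError, and SessionExpiredError are hypothetical names, and the model assumes that a commit is rejected when it carries a stale versionId (after a reset) or a sessionId that a newer session has replaced.

```cpp
#include <cstdint>
#include <stdexcept>

// Raised when a commit uses a versionId from before a reset.
struct OffsetResetError : std::runtime_error
{
    OffsetResetError() : std::runtime_error("offset was reset; versionId is stale") {}
};

// Raised when a commit uses a sessionId replaced by a newer session.
struct SessionExpiredError : std::runtime_error
{
    SessionExpiredError() : std::runtime_error("session was replaced by a newer one") {}
};

// Hypothetical server-side offset record for one shard of a subscription.
class OffsetStore
{
public:
    OffsetStore() : versionId_(0), sessionId_(0), timestamp_(0) {}

    // Opening a session invalidates every previously issued sessionId.
    std::int64_t OpenSession() { return ++sessionId_; }

    // A reset moves the offset and bumps the versionId.
    void Reset(std::int64_t timestamp)
    {
        timestamp_ = timestamp;
        ++versionId_;
    }

    // A commit succeeds only with the current versionId and sessionId.
    void Commit(std::int64_t sessionId, std::int64_t versionId, std::int64_t timestamp)
    {
        if (versionId != versionId_)
        {
            throw OffsetResetError();
        }
        if (sessionId != sessionId_)
        {
            throw SessionExpiredError();
        }
        timestamp_ = timestamp;
    }

    std::int64_t VersionId() const { return versionId_; }
    std::int64_t Timestamp() const { return timestamp_; }

private:
    std::int64_t versionId_;
    std::int64_t sessionId_;
    std::int64_t timestamp_;
};
```

A consumer that catches the reset error would fetch the current versionId again and continue from the reset position rather than retrying with the old one.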
DataHub Connector synchronizes streaming data in DataHub to other cloud products. It currently supports real-time or near-real-time synchronization of topic data to MaxCompute (ODPS), OSS, RDS & MySQL, TableStore, ElasticSearch, and Function Compute. You write the data to DataHub once, configure synchronization in DataHub, and can then use the data in the other cloud products.
CreateConnectorResult CreateConnector(string projectName, string topicName, ConnectorType connectorType, StringVec columnFields, SinkConfig config);
CreateConnectorResult CreateConnector(string projectName, string topicName, ConnectorType connectorType, int64_t sinkStartTime, StringVec columnFields, SinkConfig config);
void CreateConnector()
{
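SinkOdpsConfig config;
config.SetEndpoint(odps_endpoint);
config.SetProject(odps_project);
config.SetTable(odps_table);
config.SetAccessId(odps_accessId);
config.SetAccessKey(odps_accessKey);
config.SetPartitionMode(sdk::PartitionMode::SYSTEM_TIME);
config.SetTimeRange(60);
// Set the partition format.
// NOTE: the C++ type of the partition config is not shown in this document;
// a vector of (column, format) pairs is assumed here.
std::vector<std::pair<std::string, std::string> > partitionConfig;
partitionConfig.push_back(std::make_pair(std::string("ds"), std::string("%Y%m%d")));
partitionConfig.push_back(std::make_pair(std::string("hh"), std::string("%H")));
partitionConfig.push_back(std::make_pair(std::string("mm"), std::string("%M")));
config.SetPartitionConfig(partitionConfig);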
try
{
CreateConnectorResult connectorResult = client.CreateConnector(projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);
std::cout<<"Odps ConnectorId:" + connectorResult.GetConnectorId()<<std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
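The partition formats in the example ("%Y%m%d", "%H", "%M") look like strftime conversion specifiers. Assuming they follow that convention, the sketch below shows what partition values a given timestamp would produce. FormatPartitionValue is a hypothetical helper, not an SDK function, and UTC is used only to keep the output deterministic; this document does not state which time zone the service applies.

```cpp
#include <cstddef>
#include <ctime>
#include <string>

// Formats a Unix timestamp (seconds) with a strftime-style pattern.
// Returns an empty string if the timestamp cannot be converted.
std::string FormatPartitionValue(std::time_t timestamp, const std::string& format)
{
    const std::tm* utc = std::gmtime(&timestamp);
    if (utc == nullptr)
    {
        return std::string();
    }
    char buffer[64];
    const std::size_t written = std::strftime(buffer, sizeof(buffer), format.c_str(), utc);
    return std::string(buffer, written);
}
```

With the timestamp 1610395920 (2021-01-11 20:12:00 UTC), the three formats give "20210111", "20", and "12", which would map to the ds, hh, and mm partition columns.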
DeleteConnectorResult DeleteConnector(string projectName, string topicName, string connectorId);
void DeleteConnector()
{
try
{
client.DeleteConnector(projectName, topicName, connectorId);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
GetConnectorResult GetConnector(string projectName, string topicName, string connectorId);
void GetConnector()
{
try
{
GetConnectorResult getConnectorResult = client.GetConnector(projectName, topicName, connectorId);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
UpdateConnectorResult UpdateConnector(string projectName, string topicName, string connectorId,SinkConfig config);
void UpdateConnector()
{
SinkOdpsConfig updateConfig;
updateConfig.SetEndpoint("ODPS_ENDPOINT");
updateConfig.SetProject("ODPS_PROJECT");
updateConfig.SetTable("ODPS_TABLE");
updateConfig.SetAccessId("ODPS_ACCESSID");
updateConfig.SetAccessKey("ODPS_ACCESSKEY");
updateConfig.SetPartitionMode(sdk::PartitionMode::EVENT_TIME);
updateConfig.SetTimeRange(30);
updateConfig.SetPartitionConfig(updatePartitionConfig);
try
{
client.UpdateConnector(projectName, topicName, connectorId, updateConfig);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
UpdateConnectorStateResult UpdateConnectorState(string projectName, string topicName, string connectorId, ConnectorState connectorState);
void UpdateConnectorState()
{
try
{
client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_STOPPED);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
UpdateConnectorOffsetResult UpdateConnectorOffset(string projectName, string topicName, ConnectorType connectorType, string shardId, ConnectorOffset offset);
void UpdateConnectorOffset()
{
// NOTE: the setter names and CONNECTOR_RUNNING are assumed by analogy with
// the other SDK calls in this document.
ConnectorOffset offset;
offset.SetSequence(10);
offset.SetTimestamp(1000);
try
{
// A connector must be stopped before its offset can be updated
client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_STOPPED);
client.UpdateConnectorOffset(projectName, topicName, connectorId, offset);
client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_RUNNING);
}
catch(DatahubException e){
cerr << e << endl;
}
}
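Because the offset update sits between a stop and a restart, an exception in the middle could leave the connector stopped. One way to guard against that is a scope object whose destructor restarts the connector. The sketch below demonstrates the pattern against a stand-in type. FakeConnector, PausedScope, and UpdateOffset are hypothetical and only mimic the rule that the offset can change while the connector is stopped.

```cpp
#include <cstdint>

// Stand-in for a connector; not an SDK type.
struct FakeConnector
{
    FakeConnector() : running(true), sequence(0) {}
    bool running;
    std::int64_t sequence;
};

// Stops the connector on construction and restarts it on destruction,
// so the restart also happens when the enclosed code throws.
class PausedScope
{
public:
    explicit PausedScope(FakeConnector& connector) : connector_(connector)
    {
        connector_.running = false;
    }
    ~PausedScope() { connector_.running = true; }

private:
    PausedScope(const PausedScope&);             // non-copyable
    PausedScope& operator=(const PausedScope&);  // non-assignable
    FakeConnector& connector_;
};

// Mimics the documented rule: the offset can be updated only while
// the connector is stopped.
bool UpdateOffset(FakeConnector& connector, std::int64_t sequence)
{
    if (connector.running)
    {
        return false;
    }
    connector.sequence = sequence;
    return true;
}
```

With the real client, the constructor and destructor would wrap the two UpdateConnectorState calls. Since that call can throw, a production version would need to handle failures inside the destructor.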
ListConnectorResult ListConnector(string projectName, string topicName);
void ListConnector() {
try {
const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;
std::string connectorId = listConnectorResult.GetConnectorIds()[0];
} catch(std::exception& e){
cerr << e.what() << endl;
}
}
ConnectorShardStatusEntry GetConnectorShardStatusByShard(string projectName, string topicName, string connectorId,string shardId);
ConnectorShardStatusEntry GetConnectorShardStatus(string projectName, string topicName, string connectorId);
void GetConnectorShardStatusByShard() {
try {
const GetConnectorShardStatusByShardResult& shardStatusByShardResult = client.GetConnectorShardStatusByShard(projectName, topicName, connectorId, "0");
sdk::ConnectorShardStatusEntry statusEntry = shardStatusByShardResult.GetStatusEntry();
std::cout << (statusEntry.GetState()>= sdk::ConnectorShardState::CONTEXT_HANG && statusEntry.GetState() <= sdk::ConnectorShardState::CONTEXT_FINISHED) << std::endl;
} catch(std::exception& e){
cerr << e.what() << endl;
}
}
ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId);
ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId, string shardId);
void ReloadConnector() {
try {
client.ReloadConnector(projectName, topicName, connectorId);
} catch(std::exception& e){
cerr << e.what() << endl;
}
}
GetConnectorDoneTimeResult GetConnectorDoneTime(string projectName, string topicName, string connectorId);
void GetDoneTime() {
try {
const GetConnectorDoneTimeResult& doneTimeResult = client.GetConnectorDoneTime(projectName, topicName, connectorId);
std::cout << doneTimeResult.GetDoneTime() << std::endl;
} catch(std::exception& e){
cerr << e.what() << endl;
}
}
AppendConnectorFieldResult AppendConnectorField(string projectName, string topicName, string connectorId, string fieldName);
You can add a new field to a connector, but the MaxCompute table must already contain a column corresponding to the one in DataHub.
void AppendConnectorField() {
try {
client.AppendConnectorField(projectName, topicName, connectorId, fieldName2);
} catch(std::exception& e){
cerr << e.what() << endl;
}
}