C++ SDK
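Installation
Installation requirements
The DataHub C++ SDK currently must be compiled with GCC 4.9.2. Check that your build environment meets this requirement before installing and using the SDK.
SDK download
Initialization
You can access DataHub with an Alibaba Cloud account. You need to provide the account's AccessId and AccessKey, as well as the service endpoint used to access DataHub. The following code creates a DatahubClient from the service endpoint: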
/* Configuration */
Account account;
account.id = "";
account.key = "=";
std::string projectName = "test_project";
std::string topicName = "test_cpp";
std::string comment = "test";
std::string endpoint = "";
Configuration conf(account, endpoint);
/* Datahub Client */
DatahubClient client(conf);
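Project operations
A project is the basic organizational unit of DataHub data and contains multiple topics. Note that DataHub projects are independent of MaxCompute projects: a project created in MaxCompute cannot be reused in DataHub and must be created separately.
Create Project
CreateProjectResult CreateProject(string projectName, string comment);
Creating a project requires a name and a description. The name must be 3 to 32 characters long, must start with an English letter, may contain only English letters, digits, and "_", and is case-insensitive.
Parameters
projectName project name
comment project comment
Example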
void CreateProject()
{
try
{
client.CreateProject(projectName, comment);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
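The naming rule stated for CreateProject can be checked on the client before a request is sent. The following is a minimal sketch under that assumption; IsValidProjectName and IsAsciiLetter are hypothetical helpers written for illustration and are not part of the DataHub SDK.

```cpp
#include <string>

// Hypothetical helper, not part of the DataHub SDK.
static bool IsAsciiLetter(char c)
{
    return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
}

// Hypothetical helper, not part of the DataHub SDK: checks a project name
// against the documented rules (length 3 to 32, starts with an English
// letter, only English letters, digits, and '_').
bool IsValidProjectName(const std::string& name)
{
    if (name.size() < 3 || name.size() > 32)
    {
        return false;
    }
    if (!IsAsciiLetter(name[0]))
    {
        return false;
    }
    for (char c : name)
    {
        const bool isDigit = (c >= '0' && c <= '9');
        if (!IsAsciiLetter(c) && !isDigit && c != '_')
        {
            return false;
        }
    }
    return true;
}
```

Because names are case-insensitive, a caller that keeps its own list of projects would probably also want to compare names in a single case; the sketch does not cover that.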
Delete Project
DeleteProjectResult DeleteProject(string projectName);
A project can be deleted only when it no longer contains any topics.
Parameters
projectName project name
Example
void DeleteProject()
{
try
{
client.DeleteProject(projectName);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Update Project
UpdateProjectResult UpdateProject(string projectName, string comment);
Updates the project information. Currently only the comment can be updated.
Parameters
projectName project name
comment project comment
Example
void UpdateProject()
{
    try
    {
        client.UpdateProject(projectName, comment);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
List Projects
ListProjectResult ListProject();
ListProject returns a ListProjectResult object. Its projectNames member is a list of project names.
Example
void ListProject()
{
    try
    {
        // Keep the result so that the project names can be read from it.
        ListProjectResult listProjectResult = client.ListProject();
        std::cout << listProjectResult.GetProjectNames().size() << std::endl;
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Get Project
GetProjectResult GetProject(string projectName);
Parameters
projectName project name
Example
void GetProject()
{
    try
    {
        GetProjectResult projectResult = client.GetProject(projectName);
        std::cout << projectResult.GetProject() << std::endl;
        std::cout << projectResult.GetComment() << std::endl;
        std::cout << projectResult.GetCreator() << std::endl;
        std::cout << projectResult.GetCreateTime() << std::endl;
        std::cout << projectResult.GetLastModifyTime() << std::endl;
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
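Topic operations
A topic is the smallest unit of subscription and publication in DataHub. You can use a topic to represent one kind of streaming data. Two types are currently supported, Tuple and Blob:
A Blob topic accepts a block of binary data as a single record.
A Tuple topic holds records similar to database rows, each with multiple columns. A record schema must be specified because data is sent over the network as strings and the schema is needed to convert it to the corresponding types. The following data types are currently supported:
Type | Meaning | Value range
BIGINT | 8-byte signed integer | -9223372036854775807 ~ 9223372036854775807
DOUBLE | 8-byte double-precision floating point | -1.0 × 10^308 ~ 1.0 × 10^308
BOOLEAN | Boolean | True/False, true/false, or 0/1
TIMESTAMP | Timestamp | Timestamp with microsecond precision
STRING | String, UTF-8 encoding only | A single STRING column can hold at most 1 MB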
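Since field values travel as strings and are converted according to the schema, the accepted BOOLEAN spellings listed above can be illustrated with a small parser. This is only a sketch of the conversion idea; ParseBoolean is a hypothetical helper, not an SDK function, and the SDK's own conversion may behave differently.

```cpp
#include <string>

// Hypothetical helper, not part of the DataHub SDK: converts the textual form
// of a BOOLEAN field into a bool. Returns false when the text is not one of
// the spellings listed in the type table (True/False, true/false, 0/1).
bool ParseBoolean(const std::string& text, bool* value)
{
    if (text == "True" || text == "true" || text == "1")
    {
        *value = true;
        return true;
    }
    if (text == "False" || text == "false" || text == "0")
    {
        *value = false;
        return true;
    }
    return false;
}
```

Returning a success flag instead of throwing keeps the sketch usable in a loop over many fields, where a malformed value is reported rather than treated as fatal.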
Create Tuple Topic
CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, string comment);
Parameters
projectName The name of the project in which to create the topic.
topicName The name of the topic.
shardCount The initial shard count of the topic.
lifeCycle The expiration time of the data (unit: day). Data written before that time is no longer accessible.
recordType The type of record to write. TUPLE and BLOB are currently supported.
recordSchema The record schema of this topic.
comment The comment of the topic.
Example
void CreateTupleTopic()
{
    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, RecordType::TUPLE, schema, comment);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Create Blob Topic
CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, string comment);
Parameters
projectName The name of the project in which to create the topic.
topicName The name of the topic.
shardCount The initial shard count of the topic.
lifeCycle The expiration time of the data (unit: day). Data written before that time is no longer accessible.
recordType The type of record to write. TUPLE and BLOB are currently supported.
comment The comment of the topic.
Example
void CreateBlobTopic()
{
    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, RecordType::BLOB, comment);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Delete Topic
DeleteTopicResult DeleteTopic(string projectName, string topicName);
Before deleting a topic, make sure it has no subscriptions or connectors.
Parameters
projectName The name of the project that contains the topic.
topicName The name of the topic.
Example
void DeleteTopic()
{
    try
    {
        client.DeleteTopic(projectName, topicName);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
List Topics
ListTopicResult ListTopic(string projectName);
Parameters
projectName The name of the project whose topics are listed.
Example
void ListTopic(){
try
{
ListTopicResult listTopicResult = client.ListTopic(projectName);
cout<<listTopicResult.GetTopicNames().size()<<std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Update Topic
UpdateTopicResult UpdateTopic(string projectName, string topicName, int updateLifecycle, string comment);
Parameters
projectName The name of the project that contains the topic.
topicName The name of the topic.
updateLifecycle The new lifecycle of the topic.
comment The new comment.
Example
void UpdateTopic()
{
    try
    {
        client.UpdateTopic(projectName, topicName, updateLifecycle, comment);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Get Topic
GetTopicResult GetTopic(string projectName, string topicName);
Parameters
projectName The name of the project that contains the topic.
topicName The name of the topic.
Example
void GetTopic(){
try
{
GetTopicResult getTopicResult = client.GetTopic(projectName, topicName);
cout<<getTopicResult.GetComment()<<endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
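Shard operations
A shard is a concurrent channel for data transfer within a topic, and each shard has an ID. A shard can be in one of several states, including Opening (starting up) and Active (started and ready to serve). Each enabled shard consumes server-side resources, so request only as many shards as you need.
List Shards
Parameters
projectName The name of the project.
topicName The name of the topic.
Example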
void ListShard()
{
try
{
ListShardResult listShardResult = client.ListShard(projectName, topicName);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Split Shard
SplitShardResult SplitShard(string projectName, string topicName, string shardId);
SplitShardResult SplitShard(string projectName, string topicName, string shardId, string splitKey);
Splits one ACTIVE shard of a topic into two new shards. The new shards are in the ACTIVE state and the original shard becomes CLOSED. You can split with the default splitKey or specify a splitKey.
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The shard to split.
splitKey The split key used to split the shard.
Example
void SplitShard()
{
    try
    {
        SplitShardResult splitShardResult = client.SplitShard(projectName, topicName, shardId);
        for (const ShardEntry& entry : splitShardResult.getNewShards())
        {
            std::cout << "shardId is: " << entry << std::endl;
        }
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Merge Shard
Merges two ACTIVE shards of a topic. The two shards must be adjacent. The neighbors of each shard can be found in the ListShard result.
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The shard to merge.
adjacentShardId The adjacent shard to merge with.
Example
void MergeShard()
{
    try
    {
        MergeShardResult mergeShardResult = client.MergeShard(projectName, topicName, shardId, adjacentShardId);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Reading and writing data
Data can be read from shards in either the CLOSED or the ACTIVE state, but only ACTIVE shards can be written to.
Reading data
Get a cursor
To read data from a topic, specify the shard and the cursor position from which to read. A cursor can be obtained in four ways: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.
OLDEST: the cursor points to the oldest record among the currently valid data.
LATEST: the cursor points to the latest record.
SEQUENCE: the cursor points to the record with the given sequence.
SYSTEM_TIME: the cursor points to the first record whose timestamp is greater than or equal to the given timestamp.
GetCursorResult GetCursor(string projectName, string topicName, string shardId, CursorType type);
GetCursorResult GetCursor(string projectName, string topicName, string shardId, int64_t timestamp);
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The ID of the shard.
CursorType The cursor type used to get the cursor.
Example
void GetCursor()
{
    try
    {
        GetCursorResult r1 = client.GetCursor(projectName, topicName, "0", OLDEST);
        std::string cursor = r1.GetCursor();
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
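The SYSTEM_TIME rule (first record whose timestamp is greater than or equal to the requested one) is a lower-bound search. The sketch below models it on a plain vector of timestamps sorted in ascending order. It is a toy model for illustration only: FindSystemTimeCursor is a hypothetical name, and real cursors are opaque strings returned by the service, not indexes.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy model, not SDK code. Given record timestamps sorted in ascending order,
// returns the index of the first record whose timestamp is >= the requested
// timestamp, or timestamps.size() when no such record exists.
std::size_t FindSystemTimeCursor(const std::vector<int64_t>& timestamps, int64_t timestamp)
{
    std::vector<int64_t>::const_iterator it =
        std::lower_bound(timestamps.begin(), timestamps.end(), timestamp);
    return static_cast<std::size_t>(it - timestamps.begin());
}
```

In the same model, OLDEST corresponds to index 0 and LATEST to the last index of a non-empty vector.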
Read records
GetRecordResult GetRecord(string projectName, string topicName, string shardId, string cursor, int limit);
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The ID of the shard.
cursor The start cursor from which to read data.
limit The maximum number of records to read.
Example
Read data from a Tuple topic
void GetRecord()
{
    try
    {
        GetRecordResult r2 = client.GetRecord(projectName, topicName, "0", cursor, 1000);
        int count = r2.GetRecordCount();
        std::cout << "read record count is: " << count << std::endl;
        for (int i = 0; i < count; ++i)
        {
            const RecordResult& recordResult = r2.GetRecord(i);
            // Read each column with the getter that matches its schema type.
            for (int j = 0; j < schema.GetFieldCount(); ++j)
            {
                const Field& field = schema.GetField(j);
                const FieldType fieldType = field.GetFieldType();
                switch(fieldType)
                {
                    case BIGINT:
                        std::cout << recordResult.GetBigint(j) << std::endl;
                        break;
                    case DOUBLE:
                        printf("%.15lf\n", recordResult.GetDouble(j));
                        break;
                    case STRING:
                        std::cout << recordResult.GetString(j) << std::endl;
                        break;
                    default:
                        break;
                }
            }
        }
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Writing data
PutRecordResult PutRecord(string projectName, string topicName, vector<RecordEntry> records, string compress);
PutRecordsResult putRecordByShard(string projectName, string topicName, string shardId, vector<RecordEntry> records);
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The ID of the shard.
records The list of records to write.
compress The compression method; the example below passes "lz4".
Example
void PutRecord()
{
try{
std::vector<RecordEntry> records;
for (int32_t i = 0; i < 100; ++i)
{
RecordEntry record(schema.GetFieldCount());
record.SetShardId("0");
for (int i = 0; i < schema.GetFieldCount(); ++i)
{
const Field& field = schema.GetField(i);
const FieldType fieldType = field.GetFieldType();
switch(fieldType)
{
case BIGINT:
record.SetBigint(i, 1);
break;
case DOUBLE:
record.SetDouble(i, 117.120799999999);
break;
case STRING:
record.SetString(i, "345");
break;
default:
break;
}
}
records.push_back(record);
}
PutRecordResult put_ret0 = client.PutRecord(projectName, topicName, records, "lz4");
std::cout<<put_ret0.GetFailedRecordCount()<<std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Meter operations
GetMeterInfoResult GetMeteringInfo(string projectName, string topicName, string day);
GetMeterInfoResult GetMeteringInfo(string projectName, string topicName, string shardId);
Parameters
projectName The name of the project.
topicName The name of the topic.
day The date for which to get metering information.
shardId The ID of the shard.
Example
void GetMeter()
{
    try
    {
        GetMeteringInfoResult meteringInfoResult = client.GetMeteringInfo(projectName, topicName, shardId);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Subscription operations
The subscription service stores consumer offsets on the server side. With simple configuration and handling, it provides a highly available offset storage service.
Create Subscription
CreateSubscriptionResult CreateSubscription(string projectName, string topicName, string comment);
Parameters
projectName The name of the project.
topicName The name of the topic.
comment The comment of the subscription.
Example
void CreateSubscription()
{
try
{
CreateSubscriptionResult createSubscriptionResult = client.CreateSubscription(projectName, topicName, subscriptionComment);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Delete Subscription
DeleteSubscriptionResult DeleteSubscription(string projectName, string topicName, string subId);
Parameters
projectName The name of the project.
topicName The name of the topic.
subId The ID of the subscription.
Example
void DeleteSubscription()
{
try
{
client.DeleteSubscription(projectName, topicName, subId);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Update Subscription
UpdateSubscriptionResult UpdateSubscription(string projectName, string topicName, string subId, string comment);
Updates an existing subscription. Currently only the comment can be updated.
Parameters
projectName The name of the project.
topicName The name of the topic.
subId The ID of the subscription.
comment The new comment.
Example
void UpdateSubscription()
{
    try
    {
        client.UpdateSubscription(projectName, topicName, subId, subscriptionComment);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
List Subscriptions
ListSubscriptionResult ListSubscription(string projectName, string topicName, int pageNum, int pageSize);
The pageNum and pageSize parameters select a range of subscriptions. For example, pageNum = 1 and pageSize = 10 returns subscriptions 1 to 10; pageNum = 2 and pageSize = 5 returns subscriptions 6 to 10.
Parameters
projectName The name of the project.
topicName The name of the topic.
pageNum The page number used to list subscriptions.
pageSize The page size used to list subscriptions.
Example
void ListSubscription()
{
    try
    {
        ListSubscriptionResult subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
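The paging arithmetic described for pageNum and pageSize can be written out explicitly. The following sketch computes the 1-based positions covered by a page; PageRange is a hypothetical helper for illustration and is not part of the SDK.

```cpp
#include <utility>

// Hypothetical helper, not part of the DataHub SDK: returns the 1-based
// positions (first, last) of the items covered by page pageNum when each
// page holds pageSize items.
std::pair<int, int> PageRange(int pageNum, int pageSize)
{
    const int first = (pageNum - 1) * pageSize + 1;
    const int last = pageNum * pageSize;
    return std::make_pair(first, last);
}
```

With these definitions, PageRange(1, 10) covers positions 1 to 10 and PageRange(2, 5) covers positions 6 to 10, matching the two cases in the description.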
Get Subscription
GetSubscriptionResult GetSubscription(string projectName, string topicName, string subId);
Parameters
projectName The name of the project.
topicName The name of the topic.
subId The ID of the subscription.
Example
void GetSubscription()
{
try
{
GetSubscriptionResult getSubscriptionResult = client.GetSubscription(projectName, topicName, subId);
std::cout << getSubscriptionResult.GetComment() << std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Update Subscription state
UpdateSubscriptionStateResult UpdateSubscriptionState(string projectName, string topicName, string subId, SubscriptionState state);
A subscription has two states, OFFLINE and ONLINE.
Parameters
projectName The name of the project.
topicName The name of the topic.
subId The ID of the subscription.
state The state to change to.
Example
void UpdateSubscriptionState()
{
    try
    {
        client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
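Offset operations
A newly created subscription starts in an unconsumed state. To use the offset storage provided by the subscription service, some offset operations are required.
Initialize offset
OpenSubscriptionOffsetSessionResult InitSubscriptionOffsetSession(string projectName, string topicName, string subId, StringVec shardIds);
The offset session only needs to be initialized once. Calling the method again generates a new consumer sessionId and invalidates the previous session, which can then no longer commit offsets.
Parameters
projectName The name of the project.
topicName The name of the topic.
subId The ID of the subscription.
shardIds The list of shard IDs.
Example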
void InitSubscriptionOffsetSession()
{
try
{
OpenSubscriptionOffsetSessionResult offsetSessionResult =
client.InitSubscriptionOffsetSession(projectName, topicName, createSubscriptionResult.GetSubId(), shardIds);
std::cout << offsetSessionResult.GetOffsets().size() << std::endl;
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Get offset
GetSubscriptionOffsetResult GetSubscriptionOffset(string projectName, string topicName, string subId, StringVec shardIds);
GetSubscriptionOffset returns a GetSubscriptionOffsetResult object.
Parameters
projectName The name of the project.
topicName The name of the topic.
subId The ID of the subscription.
shardIds The list of shard IDs.
Example
// Get offsets
void GetSubscriptionOffset()
{
    try
    {
        GetSubscriptionOffsetResult getSubscriptionOffsetResult = client.GetSubscriptionOffset(projectName, topicName, subId, shardIds);
        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Reset offset
ResetSubscriptionOffsetResult ResetSubscriptionOffset(string projectName, string topicName, string subId, Map offsets);
Resetting the offset moves the consumer offset to a point in time. If several records share that time, the offset is set to the first of them. Resetting also updates the versionId: a running task that commits offsets with the old versionId receives SubscriptionOffsetResetException. The new versionId can be obtained through GetSubscriptionOffset.
Parameters
projectName The name of the project.
topicName The name of the topic.
subId The ID of the subscription.
offsets The offset map of shards.
Example
void ResetSubscriptionOffset()
{
    try
    {
        client.ResetSubscriptionOffset(projectName, topicName, subId, resetSubscriptionOffsets);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
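The interaction between a reset and a later commit can be pictured with a small in-memory model. This is a toy sketch of the versionId rule described for the reset operation, not SDK code: OffsetStore, its members, and the use of std::runtime_error in place of SubscriptionOffsetResetException are all assumptions made for illustration.

```cpp
#include <cstdint>
#include <stdexcept>

// Toy model, not SDK code: one shard's stored offset plus its versionId.
struct OffsetStore
{
    int64_t versionId = 0;
    int64_t sequence = -1;

    // A reset moves the offset and bumps the versionId.
    void Reset(int64_t newSequence)
    {
        sequence = newSequence;
        ++versionId;
    }

    // A commit is accepted only when the caller holds the current versionId.
    // std::runtime_error stands in for SubscriptionOffsetResetException.
    void Commit(int64_t callerVersionId, int64_t newSequence)
    {
        if (callerVersionId != versionId)
        {
            throw std::runtime_error("offset was reset; fetch the new versionId");
        }
        sequence = newSequence;
    }
};
```

A consumer in this model would catch the error, read the current versionId again (the counterpart of calling GetSubscriptionOffset), and resume committing from the reset position.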
Connector operations
DataHub Connector synchronizes streaming data from DataHub to other cloud products. Topic data can currently be synchronized in real time or near real time to MaxCompute (ODPS), OSS, RDS & MySQL, TableStore, ElasticSearch, and Function Compute. You write data to DataHub once, configure synchronization in DataHub, and can then use the data in the other products.
Create Connector
CreateConnectorResult CreateConnector(string projectName, string topicName, ConnectorType connectorType, StringVec columnFields, SinkConfig config);
CreateConnectorResult CreateConnector(string projectName, string topicName, ConnectorType connectorType, int64_t sinkStartTime, StringVec columnFields, SinkConfig config);
Parameters
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector to create.
columnFields The fields to synchronize.
sinkStartTime The start time from which to sink data from DataHub. Unit: ms
config The detailed configuration for the specified connector type.
ODPS example
void CreateConnector()
{
    SinkOdpsConfig config;
    config.SetEndpoint(odps_endpoint);
    config.SetProject(odps_project);
    config.SetTable(odps_table);
    config.SetAccessId(odps_accessId);
    config.SetAccessKey(odps_accessKey);
    config.SetPartitionMode(sdk::PartitionMode::SYSTEM_TIME);
    config.SetTimeRange(60);
    // Partition format: ds = "%Y%m%d", hh = "%H", mm = "%M".
    // partitionConfig is assumed to have been built with these entries beforehand.
    config.SetPartitionConfig(partitionConfig);
    try
    {
        CreateConnectorResult connectorResult = client.CreateConnector(projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);
        std::cout << "Odps ConnectorId:" << connectorResult.GetConnectorId() << std::endl;
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
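The partition formats in the ODPS example ("%Y%m%d", "%H", "%M") use strftime-style conversion specifiers. The sketch below shows what values such formats produce for a given time, using the standard library only. FormatPartition is a hypothetical helper, and formatting in UTC is an assumption; the time zone the service actually applies is not stated here.

```cpp
#include <ctime>
#include <string>

// Hypothetical helper, not part of the DataHub SDK: formats a Unix time
// (seconds) with a strftime-style pattern. UTC is assumed for illustration.
std::string FormatPartition(std::time_t seconds, const char* format)
{
    char buffer[32];
    const std::tm utc = *std::gmtime(&seconds);
    const std::size_t written = std::strftime(buffer, sizeof(buffer), format, &utc);
    return std::string(buffer, written);
}
```

For a record at Unix time 3661 (01:01:01 on 1 January 1970 UTC), the three formats yield ds = 19700101, hh = 01, and mm = 01 in this model.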
Delete Connector
DeleteConnectorResult DeleteConnector(string projectName, string topicName, string connectorId);
Parameters
projectName The name of the project.
topicName The name of the topic.
connectorId The ID of the connector.
Example
void DeleteConnector()
{
try
{
client.DeleteConnector(projectName, topicName, connectorId);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Get Connector
GetConnectorResult GetConnector(string projectName, string topicName, string connectorId);
Parameters
projectName The name of the project.
topicName The name of the topic.
connectorId The ID of the connector to query.
Example
void GetConnector()
{
    try
    {
        GetConnectorResult getConnectorResult = client.GetConnector(projectName, topicName, connectorId);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
Update Connector
UpdateConnectorResult UpdateConnector(string projectName, string topicName, string connectorId, SinkConfig config);
Parameters
projectName The name of the project.
topicName The name of the topic.
connectorId The ID of the connector.
config The detailed configuration for the specified connector type.
Example
void UpdateConnector()
{
SinkOdpsConfig updateConfig;
updateConfig.SetEndpoint("ODPS_ENDPOINT");
updateConfig.SetProject("ODPS_PROJECT");
updateConfig.SetTable("ODPS_TABLE");
updateConfig.SetAccessId("ODPS_ACCESSID");
updateConfig.SetAccessKey("ODPS_ACCESSKEY");
updateConfig.SetPartitionMode(sdk::PartitionMode::EVENT_TIME);
updateConfig.SetTimeRange(30);
updateConfig.SetPartitionConfig(updatePartitionConfig);
try
{
client.UpdateConnector(projectName, topicName, connectorId, updateConfig);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Update Connector state
UpdateConnectorStateResult UpdateConnectorState(string projectName, string topicName, string connectorId, ConnectorState connectorState);
Parameters
projectName The name of the project.
topicName The name of the topic.
connectorId The ID of the connector.
connectorState The state of the connector. Support: ConnectorState.PAUSED, ConnectorState.RUNNING.
Example
void UpdateConnectorState()
{
try
{
client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_STOPPED);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Update Connector offset
UpdateConnectorOffsetResult UpdateConnectorOffset(string projectName, string topicName, ConnectorType connectorType, string shardId, ConnectorOffset offset);
Parameters
projectName The name of the project.
topicName The name of the topic.
ConnectorType The type of connector.
shardId The ID of the shard. If shardId is null, the offsets of all shards are updated.
offset The connector offset.
Example
void UpdateConnectorOffset()
{
    // Setter names are assumed by analogy with SinkOdpsConfig.
    ConnectorOffset offset;
    offset.SetSequence(10);
    offset.SetTimestamp(1000);
    try
    {
        // The connector must be stopped before its offset is updated.
        client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_STOPPED);
        client.UpdateConnectorOffset(projectName, topicName, connectorId, offset);
        // CONNECTOR_RUNNING is assumed to be the counterpart of CONNECTOR_STOPPED.
        client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_RUNNING);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
List Connectors
ListConnectorResult ListConnector(string projectName, string topicName);
Parameters
projectName The name of the project.
topicName The name of the topic.
Example
void ListConnector() {
try {
const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;
std::string connectorId = listConnectorResult.GetConnectorIds()[0];
} catch(const std::exception& e){
cerr << e.what() << endl;
}
}
Get Connector shard status
ConnectorShardStatusEntry GetConnectorShardStatusByShard(string projectName, string topicName, string connectorId, string shardId);
ConnectorShardStatusEntry GetConnectorShardStatus(string projectName, string topicName, string connectorId);
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The ID of the shard.
connectorId The ID of the connector.
Example
void GetConnectorShardStatusByShard()
{
    try
    {
        const GetConnectorShardStatusByShardResult& shardStatusByShardResult = client.GetConnectorShardStatusByShard(projectName, topicName, connectorId, "0");
        sdk::ConnectorShardStatusEntry statusEntry = shardStatusByShardResult.GetStatusEntry();
        // Prints 1 when the shard state lies between CONTEXT_HANG and CONTEXT_FINISHED.
        std::cout << (statusEntry.GetState() >= sdk::ConnectorShardState::CONTEXT_HANG && statusEntry.GetState() <= sdk::ConnectorShardState::CONTEXT_FINISHED) << std::endl;
    }
    catch(const std::exception& e)
    {
        cerr << e.what() << endl;
    }
}
Reload Connector
ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId);
ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId, string shardId);
Parameters
projectName The name of the project.
topicName The name of the topic.
shardId The ID of the shard.
connectorId The ID of the connector.
Example
void ReloadConnector() {
try {
client.ReloadConnector(projectName, topicName, connectorId);
} catch(const std::exception& e){
cerr << e.what() << endl;
}
}
Get Connector done time
GetConnectorDoneTimeResult GetConnectorDoneTime(string projectName, string topicName, string connectorId);
Parameters
projectName The name of the project.
topicName The name of the topic.
connectorId The ID of the connector.
Example
void GetDoneTime() {
try {
const GetConnectorDoneTimeResult& doneTimeResult = client.GetConnectorDoneTime(projectName, topicName, connectorId);
std::cout << doneTimeResult.GetDoneTime() << std::endl;
} catch(const std::exception& e){
cerr << e.what() << endl;
}
}
Append a new field
AppendConnectorFieldResult AppendConnectorField(string projectName, string topicName, string connectorId, string fieldName);
A new field can be added to a connector, but the MaxCompute table must already contain a column that corresponds to the DataHub field.
Parameters
projectName The name of the project.
topicName The name of the topic.
fieldName The field to append. The field value must allow null.
connectorId The ID of the connector.
Example
void AppendConnectorField() {
try {
client.AppendConnectorField(projectName, topicName, connectorId, fieldName2);
} catch(const std::exception& e){
cerr << e.what() << endl;
}
}