全部产品

C++ SDK

C++ SDK

安装

安装要求说明

Datahub C++ SDK目前必须使用GCC 4.9.2编译,使用前请检查编译环境是否适合,确认检查通过后再安装使用。

SDK下载

初始化


用户可以使用阿里云认证账号访问DataHub,并需要提供云账号AccessId和AccessKey,同时需要提供访问DataHub的服务地址。

以下代码用于使用DataHub域名新建DataHubClient:

  1. /* Configuration */
  2. Account account;
  3. account.id = "";
  4. account.key = "=";
  5. std::string projectName = "test_project";
  6. std::string topicName = "test_cpp";
  7. std::string comment = "test";
  8. std::string endpoint = "";
  9. Configuration conf(account, endpoint);
  10. /* Datahub Client */
  11. DatahubClient client(conf);

Project操作


项目(Project)是DataHub数据的基本组织单元,下面包含多个Topic。值得注意的是,DataHub的项目空间与MaxCompute的项目空间是相互独立的。用户在MaxCompute中创建的项目不能复用于DataHub,需要独立创建。

创建project


CreateProjectResult cCeateProject(string projectName, string comment);

创建Project需要提供Project的名字和描述,Project的名字长度限制为[3,32],必须以英文字母开头,仅允许英文字母、数字及“_”,大小写不敏感。

  • 参数
    • projectName project name
    • comment project comment
  • 示例
  1. void CreateProject()
  2. {
  3. try
  4. {
  5. client.CreateProject(projectName, comment);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

删除Project


DeleteProjectResult DeleteProject(string projectName);
删除Project时必须保证Project中已经没有Topic。

  • 参数
    • projectName project name
  • 示例
  1. void DeleteProject()
  2. {
  3. try
  4. {
  5. client.DeleteProject(projectName);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

更新Project


UpdateProjectResult UpdateProject(string projectName, string comment);
更新project信息,目前只支持更新comment。

  • 参数
    • projectName project name
    • comment project comment
  • 示例
    1. void UpdateProject()
    2. {
    3. try
    4. {
    5. client.UpdateProject(projectName,comment);
    6. }
    7. catch(DatahubException e)
    8. {
    9. cerr << e << endl;
    10. }
    11. }

列出Project


ListProjectResult ListProject();listProject
返回的结果是ListProjectResult对象,其中包含成员projectNames,是一个包含project名字的list。

  • 示例
    1. void ListProject()
    2. {
    3. try
    4. {
    5. client.ListProject();
    6. std::cout<<listProjectResult.GetProjectNames().size()<<std::endl;
    7. }
    8. catch(DatahubException e)
    9. {
    10. cerr << e << endl;
    11. }
    12. }

查询Project


GetProjectResult GetProject(string projectName);

  • 参数
    • projectName project name
  • 示例
    1. void GetProject()
    2. {
    3. try
    4. {
    5. GetProjectResult projectResult = client.GetProject(projectName);
    6. std::cout<<projectResult.GetProject()<<std::endl;
    7. std::cout<<projectResult.GetComment()<<std::endl;
    8. std::cout<<projectResult.GetCreator()<<std::endl;
    9. std::cout<<projectResult.GetCreateTime()<<std::endl;
    10. std::cout<<projectResult.GetLastModifyTime()<<std::endl;
    11. }
    12. catch(DatahubException e)
    13. {
    14. cerr << e << endl;
    15. }
    16. }

Topic操作


Topic是 DataHub 订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据。目前支持Tuple与Blob两种类型:

  1. Blob类型Topic支持写入一块二进制数据作为一个Record
  2. Tuple类型的Topic支持类似于数据库的记录的数据,每条记录包含多个列,需要指定Record Schema,因为网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。目前支持以下几种数据类型:
类型 含义 值域
BIGINT 8字节有符号整型 -9223372036854775807 ~ 9223372036854775807
DOUBLE 8字节双精度浮点数 -1.0 10^308 ~ 1.0 10^308
BOOLEAN 布尔类型 True/False或true/false或0/1
TIMESTAMP 时间戳类型 表示到微秒的时间戳类型
STRING 字符串,只支持UTF-8编码 单个STRING列最长允许1MB

创建Tuple Topic


CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, string comment);

  • 参数

    • projectName The name of the project in which you create.
    • topicName The name of the topic.
    • shardCount The initial shard count of the topic.
    • lifeCycle The expire time of the data (Unit: DAY). The data written before that time is not accessible.
    • recordType The type of the record you want to write. Now support TUPLE and BLOB.
    • recordSchema The records schema of this topic.
    • comment The comment of the topic.
  • 示例
  1. void CreateTupleTopic(){
  2. try
  3. {
  4. client.CreateTopic(projectName, topicName, shardCount,lifeCycle,RecordType.TUPLE, schema,comment);
  5. }
  6. catch(DatahubException e)
  7. {
  8. cerr << e << endl;
  9. }
  10. }

创建Blob Topic


CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, string comment);

  • 参数
    • projectName The name of the project in which you create.
    • topicName The name of the topic.
    • shardCount The initial shard count of the topic.
    • lifeCycle The expire time of the data (Unit: DAY). The data written before that time is not accessible.
    • recordType The type of the record you want to write. Now support TUPLE and BLOB.
    • comment The comment of the topic.
  • 示例
  1. void CreateBlobTopic()
  2. {
  3. try
  4. {
  5. client.CreateTopic(projectName,topicName,shardCount,lifeCycle,RecordType.BLOB,comment);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

删除Topic


DeleteTopicResult DeleteTopic(string projectName, string topicName);
删除topic之前要保证topic中没有subscription和connector

  • 参数
    • projectName The name of the project in which you delete.
    • topicName The name of the topic.
  • 示例
  1. void DeleteTopic()
  2. {
  3. try
  4. {
  5. ListTopicResult listTopicResult = client.ListTopic(projectName);
  6. cout<<listTopicResult.GetTopicNames().size()<<std::endl;
  7. }
  8. catch(DatahubException e)
  9. {
  10. cerr << e << endl;
  11. }
  12. }

列出Topic


ListTopicResult ListTopic(string projectName);

  • 参数
    • projectName The name of the project in which you list.
  • 示例
  1. void ListTopic(){
  2. try
  3. {
  4. ListTopicResult listTopicResult = client.ListTopic(projectName);
  5. cout<<listTopicResult.GetTopicNames().size()<<std::endl;
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

更新Topic


UpdateTopicResult UpdateTopic(string projectName, string topicName,int updateLifecycle, string comment);

  • 参数
    • projectName The name of the project in which you list.
    • topicName The name of the topic.
    • comment The comment to modify.
    • updateLifecycle The lifeCycle of topic
  • 示例:
  1. void GetTopic(){
  2. try
  3. {
  4. client.UpdateTopic(projectName, topicName, updateLifecycle, updateProjectComment);
  5. }
  6. catch(DatahubException e)
  7. {
  8. cerr << e << endl;
  9. }
  10. }

查询Topic


GetTopicResult GetTopic(string projectName, tring topicName);

  • 参数
    • projectName The name of the project in which you get.
    • topicName The name of the topic.
  • 示例
  1. void GetTopic(){
  2. try
  3. {
  4. GetTopicResult getTopicResult = client.GetTopic(projectName, topicName);
  5. cout<<getTopicResult.GetComment()<<endl;
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

Shard操作


Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态: Opening - 启动中,Active - 启动完成可服务。每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。

列出Shard

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
  • 示例
  1. void ListShard()
  2. {
  3. try
  4. {
  5. ListShardResult listShardResult = client.ListShard(projectName, topicName);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

分裂Shard


SplitShardResult splitShard(String projectName, String topicName, String shardId);
SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey);
指定一个Topic中的一个状态为ACTIVE的Shard进行分裂,生成两个Shard,新Shard状态为ACTIVE,原Shard状态会变为CLOSED。可以采用默认splitKey进行分裂,也可以指定splitKey进行分裂。

  • 参数
    • projectName The name of the project
    • topicName The name of the topic.
    • shardId The shard which to split.
    • splitKey The split key which is used to split shard.
  • 示例
    1. void SplitShard()
    2. {
    3. try
    4. {
    5. SplitShardResult splitShardResult = Client.SplitShard(projectName, topicName, shardId);
    6. for (ShardEntry entry : splitShardResult.getNewShards())
    7. {
    8. std::cout << "shardId is: " << entry << std::endl;
    9. }
    10. }
    11. catch(DatahubException e)
    12. {
    13. cerr << e << endl;
    14. }
    15. }

合并Shard


合并一个Topic中两个处于ACTIVE状态的Shard,要求两个Shard的位置必须相邻。每个Shard相邻的两个Shard可以参考listShard的结果。

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The shard which to split.
    • splitKey The split key which is used to split shard.
  • 示例

    1. void MergeShard()
    2. {
    3. try
    4. {
    5. MergeShardResult MergeShardResult = Client.MergeShard(projectName, topicName, shardId, adjacentShardId);
    6. }
    7. catch(DatahubException e)
    8. {
    9. cerr << e << endl;
    10. }
    11. }

读写数据


状态为CLOSED和ACTIVE的shard都可以读取数据,不过只有状态为ACTIVE的shard可以写数据。

读数据

获取cursor


读取Topic下的数据,需要指定对应的shard,同时需要指定数据读取的游标位置Cursor。Cursor的获取方式 有四种,分别是OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。

  • OLDEST : 表示获取的cursor指向当前有效数据中时间最久远的record。
  • LATEST : 表示获取的cursor指向当前最新的record。
  • SEQUENCE : 表示获取的cursor指向该序列的record。
  • SYSTEM_TIME : 表示获取的cursor指向该大于等于该时间戳的第一条record。


GetCursorResult GetCursor(String projectName, String topicName, String shardId, CursorType type);

GetCursorResult GetCursor(String projectName, String topicName, String shardId, int64_t timestamp);

  • 参数

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • CursorType Which type used to get cursor.

      示例

      1. void GetCursor()
      2. {
      3. try
      4. {
      5. GetCursorResult r1 = client.GetCursor(projectName, topicName, "0", OLDEST);
      6. std::string cursor = r1.GetCursor();
      7. }
      8. catch(DatahubException e)
      9. {
      10. cerr << e << endl;
      11. }
      12. }


      读取数据接口

      GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);

  • 参数

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • cursor The start cursor used to read data.
    • limit Max record size to read.
  • 示例

    • 读取Tuple topic数据

      1. void GetRecord()
      2. {
      3. try
      4. {
      5. GetRecordResult r2 = client.GetRecord(projectName, topicName, "0", cursor, 1000);
      6. int count = r2.GetRecordCount();
      7. std::cout << "read record count is: " << count << std::endl;
      8. for (int i = 0; i < count; ++i)
      9. {
      10. const RecordResult& recordResult = r2.GetRecord(i);
      11. for (int j = 0; j < schema.GetFieldCount(); ++j)
      12. {
      13. const Field& field = schema.GetField(j);
      14. const FieldType fieldType = field.GetFieldType();
      15. switch(fieldType)
      16. {
      17. case BIGINT:
      18. std::cout << recordResult.GetBigint(j) << std::endl;
      19. break;
      20. case DOUBLE:
      21. printf("%.15lf\n", recordResult.GetDouble(j));
      22. break;
      23. case STRING:
      24. std::cout << recordResult.GetString(j) << std::endl;
      25. break;
      26. default:
      27. break;
      28. }
      29. }
      30. }
      31. }
      32. catch(DatahubException e)
      33. {
      34. cerr << e << endl;
      35. }
      36. }

      写数据

      PutRecordsResult putRecords(String projectName, String topicName, vector records,string compress);
      PutRecordsResult putRecordByShard(String projectName, String topicName, String shardId,vector records);

  • 参数

    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • records Records list to written.
  • 示例
  1. void PutRecord()
  2. {
  3. try{
  4. std::vector<RecordEntry> records;
  5. for (int32_t i = 0; i < 100; ++i)
  6. {
  7. RecordEntry record(schema.GetFieldCount());
  8. record.SetShardId("0");
  9. for (int i = 0; i < schema.GetFieldCount(); ++i)
  10. {
  11. const Field& field = schema.GetField(i);
  12. const FieldType fieldType = field.GetFieldType();
  13. switch(fieldType)
  14. {
  15. case BIGINT:
  16. record.SetBigint(i, 1);
  17. break;
  18. case DOUBLE:
  19. record.SetDouble(i, 117.120799999999);
  20. break;
  21. case STRING:
  22. record.SetString(i, "345");
  23. break;
  24. default:
  25. break;
  26. }
  27. }
  28. records.push_back(record);
  29. }
  30. PutRecordResult put_ret0 = client.PutRecord(projectName, topicName, records, "lz4");
  31. std::cout<<put_ret0.GetFailedRecordCount()<<std::endl;
  32. }
  33. catch(DatahubException e)
  34. {
  35. cerr << e << endl;
  36. }
  37. }

Meter操作


GetMeterInfoResult GetMeteringInfo(String projectName, String topicName, String day);
GetMeterInfoResult GetMeteringInfo(String projectName, String topicName, String shardId);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • day the topic of date
    • shardId The id of the shard.


示例

  1. void GetMeter()
  2. {
  3. try
  4. {
  5. GetMeteringInfoResult GetMeteringInfo = Client.GetMeteringInfo(projectName, topicName, shardId);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

Subscribtion操作


订阅服务提供了服务端保存用户消费点位的功能,只需要通过简单配置和处理,就可以实现高可用的点位存储服务。

创建 Subscription


CreateSubscriptionResult CreateSubscription(String projectName, String topicName, String comment);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • comment The comment of the subscription.
  • 示例
  1. void CreateSubscription()
  2. {
  3. try
  4. {
  5. CreateSubscriptionResult createSubscriptionResult = client.CreateSubscription(projectName, topicName, subscriptionComment);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

删除 Subscription


DeleteSubscriptionResult DeleteSubscription(string projectName, string topicName, string subId);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
  • 示例
  1. void DeleteSubscription()
  2. {
  3. try
  4. {
  5. client.DeleteSubscription(projectName, topicName, subId);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

更新 Subscription


UpdateSubscriptionResult UpdateSubscription(string projectName, string topicName, string subId, String comment);
更新已存在的Subscription,目前只支持更新comment。

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • comment The comment you want to update.
  • 示例
  1. void UpdateSubscription()
  2. {
  3. try
  4. {
  5. client.UpdateSubscription(projectName, topicName, subId, SubscriptionComment);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

列出 Subscription


ListSubscriptionResult ListSubscription(string projectName, string topicName, int pageNum, int pageSize);
_
listSubscription的参数pageNum和pageSize取指定范围的subscription信息,如pageNum =1, pageSize =10,获取1-10个subscription; pageNum =2, pageSize =5则获取6-10的subscription。

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • pageNum The page number used to list subscriptions.
    • pageSize The page size used to list subscriptions.
  • 示例代码
  1. void ListSubscription()
  2. {
  3. try
  4. {
  5. ListSubscriptionResult subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20, subId);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

查询 Subscription

GetSubscriptionResult GetSubscription(string projectName, string topicName, string subId);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
  1. void GetSubscription()
  2. {
  3. try
  4. {
  5. GetSubscriptionResult getSubscriptionResult = client.GetSubscription(projectName, topicName, subId);
  6. std::cout << getSubscriptionResult.GetComment() << std::endl;
  7. }
  8. catch(DatahubException e)
  9. {
  10. cerr << e << endl;
  11. }
  12. }

更新 Subscription 状态


UpdateSubscriptionStateResult UpdateSubscriptionState(string projectName, string topicName, string subId, SubscriptionState state);
_
Subscription有两种状态,OFFLINE和ONLINE,分别表示离线和在线。

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • state The state you want to change.
  • 示例
  1. void UpdateSubscriptionState()
  2. {
  3. try
  4. {
  5. client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
  6. std::cout << getSubscriptionResult.GetComment() << std::endl;
  7. }
  8. catch(DatahubException e)
  9. {
  10. cerr << e << endl;
  11. }
  12. }

offset 操作


一个subscription创建后,初始状态是未消费的,要使用subscription服务提供的点位存储功能,需要进行一些offset操作。

初始化offset


OpenSubscriptionOffsetSessionResult InitSubscriptionOffsetSession(string projectName, string topicName, string subId, StringVec shardIds);


openSubscriptionSession只需要初始化一次,再次调用会重新生成一个消费sessionId,之前的session会失效,无法commit点位。

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • shardIds The id list of the shards.

示例

  1. void InitSubscriptionOffsetSession()
  2. {
  3. try
  4. {
  5. OpenSubscriptionOffsetSessionResult offsetSessionResult =
  6. client.InitSubscriptionOffsetSession(projectName, topicName, createSubscriptionResult.GetSubId(), shardIds);
  7. std::cout << offsetSessionResult.GetOffsets().size() << std::endl;
  8. }
  9. catch(DatahubException e)
  10. {
  11. cerr << e << endl;
  12. }
  13. }

获取 offset


GetSubscriptionOffsetResult GetSubscriptionOffset(string projectName, string topicName, string subId, StringVec shardIds);
_
getSubscriptionOffset返回结果是GetSubscriptionOffsetResult对象

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • shardIds The id list of the shards.
  • 示例
  1. //获取点位
  2. void GetSubscriptionOffset()
  3. {
  4. try
  5. {
  6. GetSubscriptionOffsetResult getSubscriptionOffsetResult =client.GetSubscriptionOffset(projectName, topicName,subId, shardIds);
  7. std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;
  8. std::cout << getSubscriptionResult.GetComment() << std::endl;
  9. }
  10. catch(DatahubException e)
  11. {
  12. cerr << e << endl;
  13. }
  14. }

重置 offset


ResetSubscriptionOffsetResult ResetSubscriptionOffset(string projectName, string topicName, string subId, Map offsets);

重置点位可以将消费点位设置到某个时间点,如果在这个时间点有多个record,那么点位会设置到该时间点的第一条record的位置。重置点位在修改点位信息的同时更新versionId,运行中的任务在使用旧的versionId来提交点位时会收到SubscriptionOffsetResetException,通过getSubscriptionOffset接口可以拿到新的versionId。

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • subId The id of the subscription.
    • offsets The offset map of shards.

示例:

  1. void ResetSubscriptionOffset()
  2. {
  3. try
  4. {
  5. client.ResetSubscriptionOffset(projectName, topicName, SubId, resetSubscriptionOffsets);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }


Connector 操作


DataHub Connector是把DataHub服务中的流式数据同步到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute(ODPS)、Oss、RDS&Mysql、TableStore、Oss、ElasticSearch、函数计算中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配置好同步功能,便可以在其他云产品中使用这份数据。

创建 Connector


CreateConnectorResult CreateConnector(string projectName, string topicName, ConnectorType connectorType, List columnFields, SinkConfig config);
CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, int64_t sinkStartTime, StringVec columnFields, SinkConfig config);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • ConnectorType The type of connector which you want create.
    • columnFields Which fields you want synchronize.
    • sinkStartTime Start time to sink from datahub. Unit: Ms
    • config Detail config of specified connector type.
  • ODPS示例
  1. void CreateConnector()
  2. {
  3. SinkOdpsConfig config = new SinkOdpsConfig() {{
  4. setEndpoint(odps_endpoint);
  5. setProject(odps_project);
  6. setTable(odps_table);
  7. setAccessId(odps_accessId);
  8. setAccessKey(odps_accessKey);
  9. setPartitionMode(PartitionMode.SYSTEM_TIME);
  10. setTimeRange(60);
  11. }};
  12. //设置分区格式
  13. SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
  14. addConfig("ds", "%Y%m%d");
  15. addConfig("hh", "%H");
  16. addConfig("mm", "%M");
  17. }};
  18. config.setPartitionConfig(partitionConfig);
  19. try
  20. {
  21. CreateConnectorResult connectorResult = client.CreateConnector(projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);
  22. std::cout<<"Odps ConnectorId:" + connectorResult.GetConnectorId()<<std::endl;
  23. }
  24. catch(DatahubException e)
  25. {
  26. cerr << e << endl;
  27. }
  28. }

删除 Connector


DeleteConnectorResult DeleteConnector(string projectName, string topicName, string connectorId);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorId the connectorId of topic
  • 示例
  1. void DeleteConnector()
  2. {
  3. try
  4. {
  5. client.DeleteConnector(projectName, topicName, connectorId);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

查询 Connector


GetConnectorResult getConnectorResult (projectName,topicName,ConnectorType.SINK_ODPS);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • ConnectorType The type of connector which you want get.
  • 示例
  1. void GetConnector()
  2. {
  3. try
  4. {
  5. GetConnectorResult GetConnectorResult = client.GetConnector(projectName, topicName, connectorId);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

更新 Connector


UpdateConnectorResult UpdateConnector(string projectName, string topicName, string connectorId,SinkConfig config);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorId The connectorId of topic
    • config Detail config of specified connector type.
  • 示例
  1. void UpdateConnector()
  2. {
  3. SinkOdpsConfig updateConfig;
  4. updateConfig.SetEndpoint("ODPS_ENDPOINT");
  5. updateConfig.SetProject("ODPS_PROJECT");
  6. updateConfig.SetTable("ODPS_TABLE");
  7. updateConfig.SetAccessId("ODPS_ACCESSID");
  8. updateConfig.SetAccessKey("ODPS_ACCESSKEY");
  9. updateConfig.SetPartitionMode(sdk::PartitionMode::EVENT_TIME);
  10. updateConfig.SetTimeRange(30);
  11. updateConfig.SetPartitionConfig(updatePartitionConfig);
  12. try
  13. {
  14. GetConnectorResult updateConnectorResult = client.GetConnector(projectName, topicName, connectorId);
  15. }
  16. catch(DatahubException e)
  17. {
  18. cerr << e << endl;
  19. }
  20. }

更新 Connector state


UpdateConnectorStateResult UpdateConnectorState(string projectName, string topicName, string connectorId, ConnectorState connectorState);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorId The connectorId of topic
    • connectorState The state of the connector. Support: ConnectorState.PAUSED, ConnectorState.RUNNING.
  • 示例
  1. void UpdateConnectorState()
  2. {
  3. try
  4. {
  5. client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_STOPPED);
  6. }
  7. catch(DatahubException e)
  8. {
  9. cerr << e << endl;
  10. }
  11. }

更新 Connector offset


UpdateConnectorOffsetResult UpdateConnectorOffset(string projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • ConnectorType The type of connector.
    • shardId The id of the shard. If shardId is null, then update all shards offset.
    • offset The connector offset.
  • 示例
  1. void UpdateConnectorOffset()
  2. {
  3. ConnectorOffset offset = new ConnectorOffset() {{
  4. setSequence(10);
  5. setTimestamp(1000);
  6. }};
  7. try
  8. {
  9. //更新Connector点位需要先停止Connector
  10. Client.UpdateConnectorState(projectName, topicName, connectorId, ConnectorState.STOPPED);
  11. Client.UpdateConnectorOffset(projectName, topicName, connectorId, connectorOffset1);
  12. Client.UpdateConnectorState(projectName, topicName, connectorId, ConnectorState.RUNNING);
  13. }
  14. catch(DatahubException e){
  15. cerr << e << endl;
  16. }
  17. }

列出 Connector

ListConnectorResult ListConnector(string projectName, string topicName);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
  • 示例
  1. void ListConnector() {
  2. try {
  3. const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
  4. std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;
  5. std::string connectorId = listConnectorResult.GetConnectorIds()[0];
  6. } catch(std::exception& e){
  7. cerr << e << endl;
  8. }
  9. }

查询 Connector Shard 状态

ConnectorShardStatusEntry GetConnectorShardStatusByShard(string projectName, string topicName, string connectorId,string shardId);


ConnectorShardStatusEntry GetConnectorShardStatus(string projectName, string topicName, string connectorId);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • connectorId The connectorId of topic
  • 示例
  1. void GetConnectorShardStatusByShard() {
  2. try {
  3. const GetConnectorShardStatusByShardResult& shardStatusByShardResult = client.GetConnectorShardStatusByShard(projectName, topicName, connectorId, "0");
  4. sdk::ConnectorShardStatusEntry statuUpdateSubscriptionsEntry = shardStatusByShardResult.GetStatusEntry();
  5. std::cout << (statusEntry.GetState()>= sdk::ConnectorShardState::CONTEXT_HANG && statusEntry.GetState() <= sdk::ConnectorShardState::CONTEXT_FINISHED) << std::endl;
  6. } catch(std::exception& e){
  7. cerr << e << endl;
  8. }
  9. }

重启 Connector

ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId);


ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId, string shardId);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • shardId The id of the shard.
    • connectorId The connectorId of topic
  • 示例
  1. void ReloadConnector() {
  2. try {
  3. client.ReloadConnector(projectName, topicName, connectorId);
  4. } catch(std::exception& e){
  5. cerr << e << endl;
  6. }
  7. }

查询 Connector 完成时间

GetConnectorDoneTimeResult GetConnectorDoneTime(string projectName, string topicName, string connectorId);

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • connectorId The connectorId of topic
  • 示例
  1. void GetDoneTime() {
  2. try {
  3. const GetConnectorDoneTimeResult& doneTimeResult = client.GetConnectorDoneTime(projectName, topicName, connectorId);
  4. std::cout << doneTimeResult.GetDoneTime() << std::endl;
  5. } catch(std::exception& e){
  6. cerr << e << endl;
  7. }
  8. }

添加新的Field

AppendConnectorFieldResult AppendConnectorField(string projectName, string topicName, string connectorId, String fieldName);


可以给Connector添加新的field,但是需要MaxCompute表中存在和datahub中对应的列。

  • 参数
    • projectName The name of the project.
    • topicName The name of the topic.
    • fieldName The field to append. Field value must allow null.
    • connectorId The connectorId of topic
  • 示例
  1. void AppendConnectorField() {
  2. try {
  3. client.AppendConnectorField(projectName, topicName, connectorId, fieldName2);
  4. } catch(std::exception& e){
  5. cerr << e << endl;
  6. }
  7. }