C++ SDK

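Installation

Installation requirements

The DataHub C++ SDK currently must be compiled with GCC 4.9.2. Before using it, check that your build environment is suitable, and install the SDK only after the check passes.

SDK download

Initialization

You can access DataHub with an Alibaba Cloud authenticated account. You need to provide the AccessId and AccessKey of the cloud account, as well as the service endpoint used to access DataHub. The following code creates a DatahubClient using an endpoint from the domain list: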

    /* Configuration */
    Account account;
    account.id = "";
    account.key = "";

    std::string projectName = "test_project";
    std::string topicName = "test_cpp";
    std::string comment = "test";

    std::string endpoint = "";
    Configuration conf(account, endpoint);

    /* Datahub Client */
    DatahubClient client(conf);

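Project operations

A project is the basic organizational unit of DataHub data and contains multiple topics. Note that DataHub projects are independent of MaxCompute projects: a project created in MaxCompute cannot be reused in DataHub and must be created separately.

Create a Project

CreateProjectResult CreateProject(string projectName, string comment);

Creating a project requires a name and a description. The name must be 3 to 32 characters long, start with an English letter, and contain only English letters, digits, and underscores (_). It is case-insensitive.

  • Parameters

    • projectName The name of the project.

    • comment The comment of the project.

  • Example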

void CreateProject()
{
    try
    {
        client.CreateProject(projectName, comment);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }
}
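
The naming rule for projects can be checked locally before calling CreateProject. The following is a minimal sketch, assuming a helper named IsValidProjectName; the helper is hypothetical and not part of the DataHub SDK, and the service remains the final authority on which names it accepts.

```cpp
#include <cctype>
#include <string>

// Hypothetical helper, not part of the DataHub SDK: checks the documented
// project naming rule before a name is sent to the service.
bool IsValidProjectName(const std::string& name)
{
    // The length must be in the range [3, 32].
    if (name.size() < 3 || name.size() > 32)
    {
        return false;
    }
    // The first character must be an English letter.
    if (!std::isalpha(static_cast<unsigned char>(name[0])))
    {
        return false;
    }
    // Every character must be an English letter, a digit, or '_'.
    for (char c : name)
    {
        if (!std::isalnum(static_cast<unsigned char>(c)) && c != '_')
        {
            return false;
        }
    }
    return true;
}
```

For example, IsValidProjectName("test_project") returns true, while "ab" (too short), "1project" (starts with a digit), and "bad-name" (contains a hyphen) are rejected.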

Delete a Project

DeleteProjectResult DeleteProject(string projectName);

A project can be deleted only when it no longer contains any topics.

  • Parameters

    • projectName The name of the project.

  • Example

void DeleteProject()
{
    try
    {
        client.DeleteProject(projectName);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }
}

Update a Project

UpdateProjectResult UpdateProject(string projectName, string comment);

Updates the project information. Currently only the comment can be updated.

  • Parameters

    • projectName The name of the project.

    • comment The new comment of the project.

  • Example

    void UpdateProject()
    {
      try
      {
          client.UpdateProject(projectName,comment);
      }
      catch(DatahubException e)
      {
         cerr << e << endl;
      }
    }

List Projects

ListProjectResult ListProject();

ListProject returns a ListProjectResult object. Its projectNames member is a list of project names.

  • Example

    void ListProject()
    {
      try
      {
          ListProjectResult listProjectResult = client.ListProject();
          std::cout<<listProjectResult.GetProjectNames().size()<<std::endl;
      }
      catch(DatahubException e)
      {
         cerr << e << endl;
      }
    }

Get a Project

GetProjectResult GetProject(string projectName);

  • Parameters

    • projectName The name of the project.

  • Example

    void GetProject()
    {
      try
      {
         GetProjectResult projectResult = client.GetProject(projectName);
         std::cout<<projectResult.GetProject()<<std::endl;
         std::cout<<projectResult.GetComment()<<std::endl;
         std::cout<<projectResult.GetCreator()<<std::endl;
         std::cout<<projectResult.GetCreateTime()<<std::endl;
         std::cout<<projectResult.GetLastModifyTime()<<std::endl;    
      }
      catch(DatahubException e)
      {
         cerr << e << endl;
      }
    }

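Topic operations

A topic is the smallest unit of subscription and publication in DataHub. You can use a topic to represent one kind of streaming data. Two types are currently supported, Tuple and Blob:

  1. A Blob topic accepts a block of binary data as a single record.

  2. A Tuple topic holds database-like records, each with multiple columns. A record schema is required: data is sent over the network as strings, and the schema is needed to convert it to the corresponding types. The following data types are currently supported: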

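Type        Meaning                                   Value range
BIGINT      8-byte signed integer                     -9223372036854775807 ~ 9223372036854775807
DOUBLE      8-byte double-precision floating point    -1.0 × 10^308 ~ 1.0 × 10^308
BOOLEAN     Boolean type                              True/False, true/false, or 0/1
TIMESTAMP   Timestamp type                            Timestamp with microsecond precision
STRING      String, UTF-8 encoding only               A single STRING column can hold at most 1 MB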
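
Because field values travel as strings and are converted according to the schema, it can help to see what such a conversion looks like. The following is a minimal sketch for the BOOLEAN and BIGINT rows of the type list; ParseBooleanField and ParseBigintField are hypothetical helpers written for this illustration and are not SDK functions.

```cpp
#include <cstddef>
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string>

// Hypothetical helper: accepts the BOOLEAN literals True/False, true/false, 0/1.
bool ParseBooleanField(const std::string& text)
{
    if (text == "True" || text == "true" || text == "1")
    {
        return true;
    }
    if (text == "False" || text == "false" || text == "0")
    {
        return false;
    }
    throw std::invalid_argument("not a BOOLEAN literal: " + text);
}

// Hypothetical helper: parses a BIGINT and enforces the documented range
// -9223372036854775807 ~ 9223372036854775807.
int64_t ParseBigintField(const std::string& text)
{
    std::size_t pos = 0;
    // std::stoll throws std::invalid_argument or std::out_of_range on bad input.
    const long long value = std::stoll(text, &pos);
    if (pos != text.size())
    {
        throw std::invalid_argument("trailing characters in BIGINT: " + text);
    }
    // The documented lower bound excludes the minimum 64-bit value.
    if (value == std::numeric_limits<long long>::min())
    {
        throw std::out_of_range("below the documented BIGINT range: " + text);
    }
    return static_cast<int64_t>(value);
}
```

Rejecting unknown literals with an exception, instead of silently returning false or 0, keeps malformed records visible to the caller.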

Create a Tuple Topic

CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, string comment);

  • Parameters

    • projectName The name of the project in which to create the topic.

    • topicName The name of the topic.

    • shardCount The initial shard count of the topic.

    • lifeCycle The retention period of the data, in days. Data written earlier than that is no longer accessible.

    • recordType The type of the records to write. TUPLE and BLOB are currently supported.

    • recordSchema The record schema of the topic.

    • comment The comment of the topic.

  • Example

void CreateTupleTopic(){
    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, RecordType::TUPLE, schema, comment);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }
}

Create a Blob Topic

CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, string comment);

  • Parameters

    • projectName The name of the project in which to create the topic.

    • topicName The name of the topic.

    • shardCount The initial shard count of the topic.

    • lifeCycle The retention period of the data, in days. Data written earlier than that is no longer accessible.

    • recordType The type of the records to write. TUPLE and BLOB are currently supported.

    • comment The comment of the topic.

  • Example

void CreateBlobTopic()
{
    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, RecordType::BLOB, comment);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

Delete a Topic

DeleteTopicResult DeleteTopic(string projectName, string topicName);

Before deleting a topic, make sure it has no subscriptions or connectors.

  • Parameters

    • projectName The name of the project that contains the topic.

    • topicName The name of the topic.

  • Example

void DeleteTopic()
{
    try
    {
        client.DeleteTopic(projectName, topicName);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

List Topics

ListTopicResult ListTopic(string projectName);

  • Parameters

    • projectName The name of the project whose topics are listed.

  • Example

void ListTopic(){
    try
    {
         ListTopicResult listTopicResult = client.ListTopic(projectName);
         cout<<listTopicResult.GetTopicNames().size()<<std::endl;        
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

Update a Topic

UpdateTopicResult UpdateTopic(string projectName, string topicName, int updateLifecycle, string comment);

  • Parameters

    • projectName The name of the project that contains the topic.

    • topicName The name of the topic.

    • updateLifecycle The new lifecycle of the topic.

    • comment The new comment of the topic.

  • Example

void UpdateTopic(){
    try
    {
        client.UpdateTopic(projectName, topicName, updateLifecycle, updateProjectComment);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

Get a Topic

GetTopicResult GetTopic(string projectName, string topicName);

  • Parameters

    • projectName The name of the project that contains the topic.

    • topicName The name of the topic.

  • Example

void GetTopic(){
    try
    {
        GetTopicResult getTopicResult = client.GetTopic(projectName, topicName);
        cout<<getTopicResult.GetComment()<<endl;
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

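Shard operations

A shard is a concurrent channel for data transfer in a topic. Each shard has its own ID and can be in one of several states: Opening (starting up) and Active (started and ready to serve). Each enabled shard consumes server-side resources, so request only as many shards as you need.

List Shards

ListShardResult ListShard(string projectName, string topicName);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

  • Example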

void ListShard()
{
    try
    {
        ListShardResult listShardResult = client.ListShard(projectName, topicName);

    } 
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}

Split a Shard

SplitShardResult SplitShard(string projectName, string topicName, string shardId);

SplitShardResult SplitShard(string projectName, string topicName, string shardId, string splitKey);

Splits one shard of a topic that is in the ACTIVE state into two shards. The new shards are ACTIVE, and the original shard becomes CLOSED. You can split with the default splitKey or specify a splitKey.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The ID of the shard to split.

    • splitKey The split key used to split the shard.

  • Example

    void SplitShard()
    {
      try
      {
          SplitShardResult splitShardResult = client.SplitShard(projectName, topicName, shardId);
          for (ShardEntry entry : splitShardResult.getNewShards()) 
          {
              std::cout << "shardId is: " << entry << std::endl;
          }
      }
      catch(DatahubException e)
      {
          cerr << e << endl;
      }
    }

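Merge Shards

MergeShardResult MergeShard(string projectName, string topicName, string shardId, string adjacentShardId);

Merges two shards of a topic that are both in the ACTIVE state. The two shards must be adjacent. To find the neighbors of a shard, refer to the result of ListShard.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The ID of the shard to merge.

    • adjacentShardId The ID of the adjacent shard to merge with.

  • Example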

    void MergeShard()
    {
      try
      {
          MergeShardResult mergeShardResult = client.MergeShard(projectName, topicName, shardId, adjacentShardId);
    
      }
      catch(DatahubException e)
      {
          cerr << e << endl;
      }
    }

Read and write data

Data can be read from shards in either the CLOSED or the ACTIVE state, but data can be written only to shards in the ACTIVE state.
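
The read and write rule for shard states can be expressed as two small predicates. This is a sketch only: the ShardState enum and ShardInfo struct below are hypothetical stand-ins defined for this illustration, and the SDK's own shard types may look different.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; not SDK types.
enum class ShardState { OPENING, ACTIVE, CLOSED };

struct ShardInfo
{
    std::string id;
    ShardState state;
};

// CLOSED and ACTIVE shards can be read.
bool CanRead(ShardState state)
{
    return state == ShardState::ACTIVE || state == ShardState::CLOSED;
}

// Only ACTIVE shards can be written.
bool CanWrite(ShardState state)
{
    return state == ShardState::ACTIVE;
}

// Collects the IDs of the shards that currently accept writes.
std::vector<std::string> WritableShardIds(const std::vector<ShardInfo>& shards)
{
    std::vector<std::string> ids;
    for (const ShardInfo& shard : shards)
    {
        if (CanWrite(shard.state))
        {
            ids.push_back(shard.id);
        }
    }
    return ids;
}
```

A writer could apply a filter like WritableShardIds to a shard listing so that it never targets a shard that was closed by a split or merge.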

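Read data

Get a cursor

To read data from a topic, specify the shard and the cursor position from which to start reading. A cursor can be obtained in four ways: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.

  • OLDEST : The cursor points to the oldest record among the data that is still valid.

  • LATEST : The cursor points to the latest record.

  • SEQUENCE : The cursor points to the record with the given sequence number.

  • SYSTEM_TIME : The cursor points to the first record whose timestamp is greater than or equal to the given timestamp.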
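
To make the four cursor types concrete, the sketch below resolves each of them against an in-memory list of records. StoredRecord, CursorKind, and ResolveCursor are hypothetical names invented for this illustration; the real service resolves cursors on the server side, and the sketch assumes records are ordered by both sequence and write time.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical in-memory stand-ins; these are not SDK types.
struct StoredRecord
{
    int64_t sequence;    // position of the record inside the shard
    int64_t systemTime;  // time at which the record was written
};

enum class CursorKind { OLDEST, LATEST, SEQUENCE, SYSTEM_TIME };

// Returns the index of the record the cursor points to, or records.size()
// when no record matches. records must be non-decreasing in both sequence
// and systemTime. param is used only for SEQUENCE and SYSTEM_TIME.
std::size_t ResolveCursor(const std::vector<StoredRecord>& records,
                          CursorKind kind, int64_t param = 0)
{
    if (records.empty())
    {
        return 0;  // equal to records.size(): nothing to point at
    }
    switch (kind)
    {
        case CursorKind::OLDEST:
            return 0;
        case CursorKind::LATEST:
            return records.size() - 1;
        case CursorKind::SEQUENCE:
        {
            // Exact match on the sequence number.
            auto it = std::find_if(records.begin(), records.end(),
                [param](const StoredRecord& r) { return r.sequence == param; });
            return static_cast<std::size_t>(it - records.begin());
        }
        case CursorKind::SYSTEM_TIME:
        {
            // First record whose timestamp is >= the requested time.
            auto it = std::lower_bound(records.begin(), records.end(), param,
                [](const StoredRecord& r, int64_t t) { return r.systemTime < t; });
            return static_cast<std::size_t>(it - records.begin());
        }
    }
    return records.size();
}
```

With several records sharing one timestamp, SYSTEM_TIME lands on the first of them, which is the same behavior the offset reset operation relies on.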

GetCursorResult GetCursor(string projectName, string topicName, string shardId, CursorType type);

GetCursorResult GetCursor(string projectName, string topicName, string shardId, int64_t timestamp);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The ID of the shard.

    • CursorType The cursor type used to get the cursor.

  • Example

      void GetCursor()
      {
      try
      {
         GetCursorResult r1 = client.GetCursor(projectName, topicName, "0", OLDEST);
         std::string cursor = r1.GetCursor();
      } 
      catch(DatahubException e)
      {
         cerr << e << endl;
      }
      }

Read-data interface

GetRecordResult GetRecord(string projectName, string topicName, string shardId, string cursor, int limit);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The ID of the shard.

    • cursor The cursor from which to start reading.

    • limit The maximum number of records to read.

  • Example

    • Read data from a Tuple topic

      void GetRecord()
      {
      try
      {
         GetRecordResult r2 = client.GetRecord(projectName, topicName, "0", cursor, 1000);
         int count = r2.GetRecordCount();
         std::cout << "read record count is: " << count << std::endl;
         for (int i = 0; i < count; ++i)
         {
             const RecordResult& recordResult = r2.GetRecord(i);
             for (int j = 0; j < schema.GetFieldCount(); ++j)
             {
                 const Field& field = schema.GetField(j);
                 const FieldType fieldType = field.GetFieldType();
                 switch(fieldType)
                 {
                     case BIGINT:
                         std::cout << recordResult.GetBigint(j) << std::endl;
                         break;
                     case DOUBLE:
                         printf("%.15lf\n", recordResult.GetDouble(j));
                         break;
                     case STRING:
                         std::cout << recordResult.GetString(j) << std::endl;
                         break;
                     default:
                     break;
                 }
             }
         }
      
      } 
      catch(DatahubException e)
      {
           cerr << e << endl;
      }
      }

Write data

PutRecordResult PutRecord(string projectName, string topicName, vector<RecordEntry> records, string compress);

PutRecordResult PutRecordByShard(string projectName, string topicName, string shardId, vector<RecordEntry> records);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The ID of the shard.

    • records The list of records to write.

    • compress The compression method, for example "lz4".

  • Example

void PutRecord()
{
    try{
        std::vector<RecordEntry> records;
        for (int32_t i = 0; i < 100; ++i)
        {
            RecordEntry record(schema.GetFieldCount());
            record.SetShardId("0");
            for (int j = 0; j < schema.GetFieldCount(); ++j)
            {
                const Field& field = schema.GetField(j);
                const FieldType fieldType = field.GetFieldType();
                switch(fieldType)
                {
                    case BIGINT:
                        record.SetBigint(j, 1);
                        break;
                    case DOUBLE:
                        record.SetDouble(j, 117.120799999999);
                        break;
                    case STRING:
                        record.SetString(j, "345");
                        break;
                    default:
                        break;
                }
            }
            records.push_back(record);
        }
        PutRecordResult put_ret0 = client.PutRecord(projectName, topicName, records, "lz4");
        std::cout<<put_ret0.GetFailedRecordCount()<<std::endl;
    }
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}

Meter operations

GetMeterInfoResult GetMeteringInfo(string projectName, string topicName, string day);

GetMeterInfoResult GetMeteringInfo(string projectName, string topicName, string shardId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • day The date to query.

    • shardId The ID of the shard.

  • Example

void GetMeter()
{
    try
    {
        GetMeteringInfoResult meteringInfoResult = client.GetMeteringInfo(projectName, topicName, shardId);

    }
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}

Subscription operations

The subscription service stores consumer offsets on the server side. With a small amount of configuration and handling, it gives you a highly available offset storage service.

Create a Subscription

CreateSubscriptionResult CreateSubscription(string projectName, string topicName, string comment);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • comment The comment of the subscription.

  • Example

void CreateSubscription()
{
  try
  {
     CreateSubscriptionResult createSubscriptionResult = client.CreateSubscription(projectName, topicName, subscriptionComment);

  } 
  catch(DatahubException e)
  {
     cerr << e << endl;
  }
}

Delete a Subscription

DeleteSubscriptionResult DeleteSubscription(string projectName, string topicName, string subId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The ID of the subscription.

  • Example

void DeleteSubscription() 
{
  try 
  {
      client.DeleteSubscription(projectName, topicName, subId);

  } 
  catch(DatahubException e)
  {
     cerr << e << endl;
  }
}

Update a Subscription

UpdateSubscriptionResult UpdateSubscription(string projectName, string topicName, string subId, string comment);

Updates an existing subscription. Currently only the comment can be updated.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The ID of the subscription.

    • comment The new comment of the subscription.

  • Example

void UpdateSubscription()
{
    try 
    {
        client.UpdateSubscription(projectName, topicName, subId, subscriptionComment);
    }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

List Subscriptions

ListSubscriptionResult ListSubscription(string projectName, string topicName, int pageNum, int pageSize);

The pageNum and pageSize parameters select a range of subscriptions. For example, pageNum = 1 with pageSize = 10 returns subscriptions 1 to 10, and pageNum = 2 with pageSize = 5 returns subscriptions 6 to 10.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • pageNum The page number used to list subscriptions.

    • pageSize The page size used to list subscriptions.

  • Example

 void ListSubscription() 
 {
    try 
    {
        ListSubscriptionResult subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20);

    }  
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
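
The mapping from pageNum and pageSize to a range of subscriptions is simple arithmetic. The sketch below reproduces it on an in-memory list; PageRange and SlicePage are hypothetical helpers for illustration, not SDK functions, and they assume that pageNum starts at 1.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: returns the 1-based, inclusive [first, last]
// positions covered by a page. pageNum is assumed to start at 1.
std::pair<int, int> PageRange(int pageNum, int pageSize)
{
    const int first = (pageNum - 1) * pageSize + 1;
    const int last = pageNum * pageSize;
    return std::make_pair(first, last);
}

// Hypothetical helper: applies PageRange to an in-memory list and returns
// the items on that page. The last page may contain fewer than pageSize items.
std::vector<std::string> SlicePage(const std::vector<std::string>& all,
                                   int pageNum, int pageSize)
{
    std::vector<std::string> page;
    if (pageNum < 1 || pageSize < 1)
    {
        return page;
    }
    const std::pair<int, int> range = PageRange(pageNum, pageSize);
    const int total = static_cast<int>(all.size());
    for (int pos = range.first; pos <= range.second && pos <= total; ++pos)
    {
        page.push_back(all[pos - 1]);  // positions are 1-based
    }
    return page;
}
```

PageRange(1, 10) yields positions 1 to 10 and PageRange(2, 5) yields positions 6 to 10. A page that comes back with fewer than pageSize entries is a natural signal to stop paging.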

Get a Subscription

GetSubscriptionResult GetSubscription(string projectName, string topicName, string subId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The ID of the subscription.

  • Example

void GetSubscription() 
{
    try 
    {
        GetSubscriptionResult  getSubscriptionResult = client.GetSubscription(projectName, topicName, subId);
        std::cout << getSubscriptionResult.GetComment() << std::endl;
    }  
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Update the Subscription state

UpdateSubscriptionStateResult UpdateSubscriptionState(string projectName, string topicName, string subId, SubscriptionState state);

A subscription has two states, OFFLINE and ONLINE.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The ID of the subscription.

    • state The state to switch to.

  • Example

 void UpdateSubscriptionState()
 {
    try 
    {
        client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

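Offset operations

A newly created subscription starts in an unconsumed state. To use the offset storage provided by the subscription service, you need to perform some offset operations.

Initialize the offset

OpenSubscriptionOffsetSessionResult InitSubscriptionOffsetSession(string projectName, string topicName, string subId, StringVec shardIds);

The offset session only needs to be initialized once. Calling this method again generates a new consumer sessionId; the previous session becomes invalid and can no longer commit offsets.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The ID of the subscription.

    • shardIds The list of shard IDs.

  • Example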

 void InitSubscriptionOffsetSession()
 {
    try 
    {
        OpenSubscriptionOffsetSessionResult offsetSessionResult =
        client.InitSubscriptionOffsetSession(projectName, topicName, createSubscriptionResult.GetSubId(), shardIds);
        std::cout << offsetSessionResult.GetOffsets().size() << std::endl;

    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
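
The effect of initializing the offset session more than once can be illustrated with a small in-memory model. This is a sketch under stated assumptions: SubscriptionModel and its methods are hypothetical and exist only for this illustration, and the real session handling happens on the DataHub server.

```cpp
#include <cstdint>

// Hypothetical in-memory model of one subscription; not SDK code.
class SubscriptionModel
{
public:
    SubscriptionModel() : currentSessionId_(0), committedSequence_(-1) {}

    // Opening a session issues a new sessionId and thereby invalidates
    // the sessionId handed out by any earlier call.
    int64_t OpenSession()
    {
        return ++currentSessionId_;
    }

    // A commit succeeds only when it carries the most recent sessionId.
    bool Commit(int64_t sessionId, int64_t sequence)
    {
        if (currentSessionId_ == 0 || sessionId != currentSessionId_)
        {
            return false;  // no session yet, or a stale session
        }
        committedSequence_ = sequence;
        return true;
    }

    // Last successfully committed sequence, or -1 if nothing was committed.
    int64_t CommittedSequence() const
    {
        return committedSequence_;
    }

private:
    int64_t currentSessionId_;
    int64_t committedSequence_;
};
```

In this model a consumer that opened the first session loses the ability to commit as soon as a second session is opened, which is why the session should be initialized once per consumer and reused.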

Get the offset

GetSubscriptionOffsetResult GetSubscriptionOffset(string projectName, string topicName, string subId, StringVec shardIds);

GetSubscriptionOffset returns a GetSubscriptionOffsetResult object.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The ID of the subscription.

    • shardIds The list of shard IDs.

  • Example

// Get the offsets
void GetSubscriptionOffset() 
{
    try 
    {
        GetSubscriptionOffsetResult  getSubscriptionOffsetResult =client.GetSubscriptionOffset(projectName, topicName,subId, shardIds);
        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;
    }  
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Reset the offset

ResetSubscriptionOffsetResult ResetSubscriptionOffset(string projectName, string topicName, string subId, Map offsets);

Resetting the offset moves the consumer offset to a given point in time. If several records exist at that time, the offset is set to the first of them. A reset changes the offset information and also updates the versionId. A running task that commits offsets with the old versionId receives a SubscriptionOffsetResetException; the new versionId can be obtained through GetSubscriptionOffset.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The ID of the subscription.

    • offsets The offset map of the shards.

  • Example

void ResetSubscriptionOffset() 
{
    try 
    {
        client.ResetSubscriptionOffset(projectName, topicName, subId, resetSubscriptionOffsets);
    }  
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}
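
The interaction between a reset and a running consumer can be sketched with an in-memory model of the versionId check. Everything here is hypothetical and for illustration only: OffsetStore, OffsetResetError (a stand-in for the SubscriptionOffsetResetException mentioned in the description), and TryCommit are not SDK names.

```cpp
#include <cstdint>
#include <stdexcept>

// Hypothetical stand-in for SubscriptionOffsetResetException.
struct OffsetResetError : std::runtime_error
{
    OffsetResetError() : std::runtime_error("offset was reset; versionId is stale") {}
};

// Hypothetical in-memory model of the stored offset of one shard.
class OffsetStore
{
public:
    OffsetStore() : versionId_(0), sequence_(-1) {}

    int64_t VersionId() const { return versionId_; }
    int64_t Sequence() const { return sequence_; }

    // A reset changes the offset and also bumps the versionId.
    void Reset(int64_t sequence)
    {
        sequence_ = sequence;
        ++versionId_;
    }

    // A commit carrying an old versionId is rejected.
    void Commit(int64_t versionId, int64_t sequence)
    {
        if (versionId != versionId_)
        {
            throw OffsetResetError();
        }
        sequence_ = sequence;
    }

private:
    int64_t versionId_;
    int64_t sequence_;
};

// Tries to commit. On a reset, refreshes knownVersion (the equivalent of
// fetching the offset again) and reports failure so that the caller can
// resume from the reset position.
bool TryCommit(OffsetStore& store, int64_t& knownVersion, int64_t sequence)
{
    try
    {
        store.Commit(knownVersion, sequence);
        return true;
    }
    catch (const OffsetResetError&)
    {
        knownVersion = store.VersionId();
        return false;
    }
}
```

After a failed commit the consumer in this model holds the new versionId and can continue from store.Sequence(), the position chosen by the reset, instead of from its own stale position.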

Connector operations

A DataHub connector synchronizes streaming data from DataHub to other cloud products. Data in a topic can currently be synchronized in real time or near real time to MaxCompute (ODPS), OSS, RDS/MySQL, TableStore, ElasticSearch, and Function Compute. You write the data to DataHub once, configure synchronization in DataHub, and can then use the data in the other cloud products.

Create a Connector

CreateConnectorResult CreateConnector(string projectName, string topicName, ConnectorType connectorType, StringVec columnFields, SinkConfig config);

CreateConnectorResult CreateConnector(string projectName, string topicName, ConnectorType connectorType, int64_t sinkStartTime, StringVec columnFields, SinkConfig config);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • ConnectorType The type of connector to create.

    • columnFields The fields to synchronize.

    • sinkStartTime The start time from which to synchronize data from DataHub. Unit: ms.

    • config The detailed configuration of the specified connector type.

  • ODPS example

void  CreateConnector() 
{
    SinkOdpsConfig config;
    config.SetEndpoint(odps_endpoint);
    config.SetProject(odps_project);
    config.SetTable(odps_table);
    config.SetAccessId(odps_accessId);
    config.SetAccessKey(odps_accessKey);
    config.SetPartitionMode(sdk::PartitionMode::SYSTEM_TIME);
    config.SetTimeRange(60);
    // Set the partition format. partitionConfig is assumed to be prepared
    // beforehand and to map ds -> "%Y%m%d", hh -> "%H", mm -> "%M".
    config.SetPartitionConfig(partitionConfig);
    try 
    {
        CreateConnectorResult  connectorResult = client.CreateConnector(projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);
        std::cout << "Odps ConnectorId:" << connectorResult.GetConnectorId() << std::endl;
    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Delete a Connector

DeleteConnectorResult DeleteConnector(string projectName, string topicName, string connectorId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorId The ID of the connector.

  • Example

void DeleteConnector() 
{
    try 
    {
         client.DeleteConnector(projectName, topicName, connectorId);
      } 
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}

Get a Connector

GetConnectorResult GetConnector(string projectName, string topicName, string connectorId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorId The ID of the connector to get.

  • Example

void GetConnector() 
{
    try 
    {
        GetConnectorResult getConnectorResult = client.GetConnector(projectName, topicName, connectorId);
    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Update a Connector

UpdateConnectorResult UpdateConnector(string projectName, string topicName, string connectorId, SinkConfig config);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorId The ID of the connector.

    • config The detailed configuration of the specified connector type.

  • Example

 void UpdateConnector() 
 {
    SinkOdpsConfig updateConfig;
    updateConfig.SetEndpoint("ODPS_ENDPOINT");
    updateConfig.SetProject("ODPS_PROJECT");
    updateConfig.SetTable("ODPS_TABLE");
    updateConfig.SetAccessId("ODPS_ACCESSID");
    updateConfig.SetAccessKey("ODPS_ACCESSKEY");
    updateConfig.SetPartitionMode(sdk::PartitionMode::EVENT_TIME);
    updateConfig.SetTimeRange(30);
    updateConfig.SetPartitionConfig(updatePartitionConfig);
    try 
    {
        UpdateConnectorResult updateConnectorResult = client.UpdateConnector(projectName, topicName, connectorId, updateConfig);
    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Update the Connector state

UpdateConnectorStateResult UpdateConnectorState(string projectName, string topicName, string connectorId, ConnectorState connectorState);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorId The ID of the connector.

    • connectorState The state of the connector. Support: ConnectorState.PAUSED, ConnectorState.RUNNING.

  • Example

 void UpdateConnectorState() 
 {
    try 
    {
        client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_STOPPED);
    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
  }

Update the Connector offset

UpdateConnectorOffsetResult UpdateConnectorOffset(string projectName, string topicName, ConnectorType connectorType, string shardId, ConnectorOffset offset);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • ConnectorType The type of connector.

    • shardId The ID of the shard. If shardId is null, the offsets of all shards are updated.

    • offset The connector offset.

  • Example

void UpdateConnectorOffset() 
{
    // Setter names are assumed to follow the Set* convention of the config classes.
    ConnectorOffset offset;
    offset.SetSequence(10);
    offset.SetTimestamp(1000);
    try 
    {
        // The connector must be stopped before its offset is updated.
        // CONNECTOR_RUNNING is assumed by analogy with CONNECTOR_STOPPED.
        client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_STOPPED);
        client.UpdateConnectorOffset(projectName, topicName, connectorId, offset);
        client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_RUNNING);
    } 
    catch(DatahubException e){
          cerr << e << endl;
    }
}

List Connectors

Description

ListConnectorResult ListConnector(string projectName, string topicName);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

  • Example

void ListConnector() {
  try {
       const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
        std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;

        std::string connectorId = listConnectorResult.GetConnectorIds()[0];
  } catch(const std::exception& e){
          cerr << e.what() << endl;
    }
 }

Get the Connector Shard status

Description

ConnectorShardStatusEntry GetConnectorShardStatusByShard(string projectName, string topicName, string connectorId, string shardId);

ConnectorShardStatusEntry GetConnectorShardStatus(string projectName, string topicName, string connectorId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The ID of the shard.

    • connectorId The ID of the connector.

  • Example

 void GetConnectorShardStatusByShard() {
    try {
         const GetConnectorShardStatusByShardResult& shardStatusByShardResult = client.GetConnectorShardStatusByShard(projectName, topicName, connectorId, "0");

    sdk::ConnectorShardStatusEntry statusEntry = shardStatusByShardResult.GetStatusEntry();

    std::cout << (statusEntry.GetState()>= sdk::ConnectorShardState::CONTEXT_HANG && statusEntry.GetState() <= sdk::ConnectorShardState::CONTEXT_FINISHED) << std::endl;
  }  catch(const std::exception& e){
          cerr << e.what() << endl;
    }
}

Restart a Connector

Description

ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId);

ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId, string shardId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The ID of the shard.

    • connectorId The ID of the connector.

  • Example

 void ReloadConnector() {
    try {
    client.ReloadConnector(projectName, topicName, connectorId);

  }  catch(const std::exception& e){
          cerr << e.what() << endl;
    }
}

Get the Connector done time

Description

GetConnectorDoneTimeResult GetConnectorDoneTime(string projectName, string topicName, string connectorId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorId The ID of the connector.

  • Example

 void GetDoneTime() {
    try {
        const GetConnectorDoneTimeResult& doneTimeResult = client.GetConnectorDoneTime(projectName, topicName, connectorId);
        std::cout << doneTimeResult.GetDoneTime() << std::endl;

  }  catch(const std::exception& e){
          cerr << e.what() << endl;
    }

}

Append a new Field

Description

AppendConnectorFieldResult AppendConnectorField(string projectName, string topicName, string connectorId, string fieldName);

You can append a new field to a connector, but the MaxCompute table must already contain a column that corresponds to the DataHub field.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • fieldName The field to append. The field value must allow null.

    • connectorId The ID of the connector.

  • Example

 void AppendConnectorField() {
    try {
        client.AppendConnectorField(projectName, topicName, connectorId, fieldName2);

  } catch(const std::exception& e){
          cerr << e.what() << endl;
    }

}