安装
安装要求说明
DataHub C++ SDK目前必须使用GCC 4.9.2编译,使用前请检查编译环境是否适合,确认检查通过后再安装使用。
SDK下载
初始化
用户可以使用阿里云认证账号访问DataHub,并需要提供云账号AccessId和AccessKey,同时需要提供访问DataHub的服务地址。 以下代码用于使用域名列表新建DataHubClient:
/* Configuration */
// Credentials and endpoint are placeholders: fill them in before running.
// NOTE(review): do not commit a real AccessKey to source control.
Account account;
account.id = "";
account.key = "";
// Project/topic names and the comment text referenced by the samples below.
std::string projectName = "test_project";
std::string topicName = "test_cpp";
std::string comment = "test";
std::string endpoint = "";
Configuration conf(account, endpoint);
/* Datahub Client */
// The sample functions in this document refer to this object as `client`.
DatahubClient client(conf);
Project操作
项目(Project)是DataHub数据的基本组织单元,下面包含多个Topic。值得注意的是,DataHub的项目空间与MaxCompute的项目空间是相互独立的。用户在MaxCompute中创建的项目不能复用于DataHub,需要独立创建。
创建project
void CreateProject()
{
std::string projectName = "";
std::string comment = "";
try
{
client.CreateProject(projectName, comment);
}
catch(const DatahubException& e)
{
std::cerr << "Create project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
删除Project
void DeleteProject()
{
std::string projectName = "";
try
{
client.DeleteProject(projectName);
}
catch(const DatahubException& e)
{
std::cerr << "Delete project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
更新Project
void UpdateProject()
{
std::string projectName = "";
std::string comment = "";
try
{
client.UpdateProject(projectName,comment);
}
catch(const DatahubException& e)
{
std::cerr << "Update project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
获取Project列表
void ListProject()
{
try
{
const ListProjectResult& listProjectResult = client.ListProject();
std::cout<<listProjectResult.GetProjectNames().size()<<std::endl;
}
catch(const DatahubException& e)
{
std::cerr << "List project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
查询Project信息
void GetProject()
{
std::string projectName = "";
try
{
const GetProjectResult& projectResult = client.GetProject(projectName);
std::cout<<projectResult.GetProject()<<std::endl;
std::cout<<projectResult.GetComment()<<std::endl;
std::cout<<projectResult.GetCreator()<<std::endl;
std::cout<<projectResult.GetCreateTime()<<std::endl;
std::cout<<projectResult.GetLastModifyTime()<<std::endl;
}
catch(const DatahubException& e)
{
std::cerr << "get project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Topic操作
Topic是 DataHub 订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据。目前支持Tuple与Blob两种类型:
Blob类型Topic支持写入一块二进制数据作为一个Record
Tuple类型的Topic支持类似于数据库的记录的数据,每条记录包含多个列,需要指定Record Schema,因为网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。目前支持以下几种数据类型:
类型 | 含义 | 值域 |
BIGINT | 8字节有符号整型 | -9223372036854775807 ~ 9223372036854775807 |
DOUBLE | 8字节双精度浮点数 | -1.0 * 10^308 ~ 1.0 * 10^308 |
BOOLEAN | 布尔类型 | True/False或true/false或0/1 |
TIMESTAMP | 时间戳类型 | 表示到微秒的时间戳类型 |
STRING | 字符串,只支持UTF-8编码 | 单个STRING列最长允许2MB |
TINYINT | 单字节整型 | -128 ~127 |
SMALLINT | 双字节整型 | -32768 ~ 32767 |
INTEGER | 4字节整型 | -2147483648 ~ 2147483647 |
FLOAT | 4字节单精度浮点数 | -3.40292347 * 10^38 ~ 3.40292347 * 10^38 |
创建Tuple Topic
void CreateTupleTopic(){
RecordSchema schema;
std::string fieldName1 = "a";
std::string fieldName2 = "b";
std::string fieldName3 = "c";
std::string fieldComment1 = "field1 comment";
std::string fieldComment2 = "field2 comment";
std::string fieldComment3 = "field3 comment";
schema.AddField(Field(fieldName1/*Fieldname*/, BIGINT, true, fieldComment1));
schema.AddField(Field(fieldName2/*Fieldname*/, DOUBLE, true, fieldComment2));
schema.AddField(Field(fieldName3/*Fieldname*/, STRING, true, fieldComment3));
/* Create Topic */
int shardCount = 3;
int lifeCycle = 7;
RecordType type = TUPLE;
std::string projectName = "";
std::string topicName = "";
try
{
client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, schema, comment);
}
catch (const DatahubException& e)
{
std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
创建Blob Topic
// Calls client.CreateTopic with record type BLOB, 3 shards and a life cycle of
// 7. The topic comment is taken from `comment`, which is declared outside this
// function. A DatahubException is reported on std::cerr and not propagated.
void CreateBlobTopic()
{
    /* Create Topic */
    int shardCount = 3;
    int lifeCycle = 7;
    RecordType type = BLOB;
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, comment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
删除Topic
void DeleteTopic()
{
std::string projectName = "";
std::string topicName = "";
try
{
client.DeleteTopic(projectName, topicName);
}
catch(const DatahubException& e)
{
std::cerr << "Delete topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
获取Topic列表
// Calls client.ListTopic for a placeholder project name and prints how many
// topic names were returned.
// A DatahubException is reported on std::cerr and not propagated.
void ListTopic()
{
    std::string projectName = "";
    try
    {
        const ListTopicResult& listTopicResult = client.ListTopic(projectName);
        std::cout << listTopicResult.GetTopicNames().size() << std::endl;
    }
    catch (const DatahubException& e)
    {
        // The message names the list operation so it can be told apart from GetTopic failures.
        std::cerr << "List topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
更新Topic
// Calls client.UpdateTopic with a new life cycle (7) and comment ("test1") for
// placeholder project and topic names.
// A DatahubException is reported on std::cerr and not propagated.
void UpdateTopic()
{
    const std::string updateComment = "test1";
    int updateLifecycle = 7;
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        // Pass the comment declared above; the sample previously named an undeclared variable.
        client.UpdateTopic(projectName, topicName, updateLifecycle, updateComment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Update topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
查询Topic
// Calls client.GetTopic for placeholder project and topic names and prints the
// topic comment.
// A DatahubException is reported on std::cerr and not propagated.
void GetTopic()
{
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        const GetTopicResult& getTopicResult = client.GetTopic(projectName, topicName);
        // Qualified with std:: like the other samples; no using-directive for std is visible here.
        std::cout << getTopicResult.GetComment() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Get topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Shard操作
Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态:Opening - 启动中,Active - 启动完成可服务。每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。
列出Shard列表
void ListShard()
{
try
{
const ListShardResult& lsr = client.ListShard(projectName, topicName);
std::cout<<lsr.GetShards().size()<<std::endl;
std::vector<ShardEntry> shards = lsr.GetShards();
std::vector<std::string> shardIds;
for (size_t i = 0; i < shards.size(); ++i)
{
ShardEntry shardEntry = shards[i];
shardIds.push_back(shardEntry.GetShardId());
}
}
catch(const DatahubException& e)
{
std::cerr << "List shard fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
分裂Shard
void SplitShard()
{
std::string projectName = "";
std::string topicName = "";
try
{
const SplitShardResult& ssr = client.SplitShard(projectName, topicName, "0", "00000000000000000000000000AAAAAA");
std::cout<<ssr.GetChildShards().size()<<std::endl;
}
catch(const DatahubException& e)
{
std::cerr << "Split shardId fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
合并Shard
// Calls client.MergeShard on shards "0" and "1" and prints the child shard's
// id, begin hash key and end hash key.
// A DatahubException is reported on std::cerr and not propagated.
void MergeShard()
{
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        const MergeShardResult& msr = client.MergeShard(projectName, topicName, "0", "1");
        std::cout << msr.GetChildShard().GetShardId() << std::endl;
        std::cout << msr.GetChildShard().GetBeginHashKey() << std::endl;
        std::cout << msr.GetChildShard().GetEndHashKey() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Merge shard fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
读写数据
状态为CLOSED和ACTIVE的shard都可以读取数据,不过只有状态为ACTIVE的shard可以写数据。
读数据
获取cursor
读取Topic下的数据,需要指定对应的shard,同时需要指定数据读取的游标位置Cursor。Cursor的获取方式有四种,分别是OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。
OLDEST : 表示获取的cursor指向当前有效数据中时间最久远的record。
LATEST : 表示获取的cursor指向当前最新的record。
SEQUENCE : 表示获取的cursor指向该序列的record。
SYSTEM_TIME : 表示获取的cursor指向大于等于该时间戳的第一条record。
void GetCursor()
{
try
{
const GetCursorResult& r1 = client.GetCursor(projectName, topicName, "0", CURSOR_TYPE_OLDEST);
std::string cursor = r1.GetCursor();
}
catch(const DatahubException& e)
{
std::cerr << "Get cursor fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
读取数据
void sub(DatahubClient& client)
{
std::string projectName = "";
std::string topicName = "";
std::string shardId = "0";
std::string cursor;
RecordSchema schema;
OpenSubscriptionOffsetSessionResult osor = client.InitSubscriptionOffsetSession(projectName, topicName, subId, {shardId});
SubscriptionOffset subscription = osor.GetOffsets().at(shardId);
if (subscription.GetSequence() < 0)
{
cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
}
else
{
int64_t nextSequence = subscription.GetSequence() + 1;
try
{
cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_SEQUENCE, nextSequence).GetCursor();
}
catch (const DatahubException& e)
{
cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
}
}
int64_t readTotal = 0;
int fetchNum = 10;
while (true)
{
try
{
GetRecordResult grr = client.GetRecord(projectName, topicName, shardId, cursor, fetchNum, subId);
if (grr.GetRecordCount() <= 0)
{
std::cout << "Read null, wait for 1s." << std::endl;
sleep(1);
continue;
}
for (auto recordResult : grr.GetRecords())
{
ProcessRecords(recordResult);
if (++readTotal % 1000 == 0)
{
subscription.SetSequence(recordResult.GetSequence());
subscription.SetTimestamp(recordResult.GetSystemTime());
std::map<std::string, SubscriptionOffset> offsets;
offsets[shardId] = subscription;
try
{
client.UpdateSubscriptionOffset(projectName, topicName, subId, offsets);
}
catch (const DatahubException& e)
{
std::cerr << "Update subscription offset fail. requestId: "<< e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
throw ;
}
}
}
cursor = grr.GetNextCursor();
}
catch (const DatahubException& e)
{
std::cerr << "Get record fail. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
inline void ProcessRecords(const RecordResult& record)
{
if (schema.GetFieldCount() == 0) // BLOB
{
int len = 0;
const char* data = record.GetData(len);
printf("%s\n", data);
}
else // TUPLE
{
for (int j = 0; j < schema.GetFieldCount(); ++j)
{
const Field &field = schema.GetField(j);
const FieldType fieldType = field.GetFieldType();
switch (fieldType)
{
case BIGINT:
printf("%ld\n", record.GetBigint(j));
break;
case DOUBLE:
printf("%.15lf\n", record.GetDouble(j));
break;
case STRING:
printf("%s\n", record.GetString(j).c_str());
break;
default:
break;
}
}
}
}
}
协同消费数据示例:
using namespace aliyun;
using namespace aliyun::datahub;

// Counter initialised to 0; not referenced by the sample code shown here.
std::atomic_int gTotalNum{0};

// Settings filled in from the command line by parse_args().
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
std::string gSubId;
bool gAutoAck = false;
// main() only applies the two values below when they are greater than 0.
int32_t gFetchNum = 0;
int64_t gSessionTimeoutMs = 0;
// Prints the program name, PROG_VERSION and the supported options to stdout.
// prog: the path the program was started with; only the part after the last
//       '/' is printed.
void Usage(const char* prog)
{
    const char* slash = strrchr(prog, '/');
    const char* baseName = (slash == NULL) ? prog : slash + 1;
    printf(
        "%s Ver %s\n"
        "Usage: <programme> [options...]\n"
        " -i --accessid <string> set access id [Mandatory]\n"
        " -k --accesskey <string> set access key [Mandatory]\n"
        " -e --endpoint <string> set endpoint [Mandatory]\n"
        " -p --project <string> set project [Mandatory]\n"
        " -t --topic <string> set topic [Mandatory]\n"
        " -s --subId <string> set subscription id [Mandatory]\n"
        " -a --autoAck <bool> set auto ack offset\n"
        " -f --fetchNum <int> set fetch limit num\n"
        " -S --sessionTimeoutMs <int> set session timeout (ms)\n"
        " -v --version show version\n"
        " -h --help show help message\n",
        baseName, PROG_VERSION);
}
// Parses the consumer sample's command line with getopt_long and stores the
// values in the g* globals. -v and -h print the usage text and exit with
// status 0. Options not handled in the switch are ignored.
// argc, argv: the arguments received by main().
void parse_args(int argc, char* argv[])
{
    int opt_id = 0;
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i' },
        { "accesskey", 1, 0, 'k' },
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "subId", 1, 0, 's' },
        { "autoAck", 1, 0, 'a' },
        { "fetchNum", 1, 0, 'f' },
        { "sessionTimeoutMs", 1, 0, 'S' },
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };
    while (1)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:s:a:f:S:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 's':
            gSubId = optarg;
            break;
        case 'a':
        {
            // The usage text advertises <bool>: accept the words as well as
            // the numeric form, since atoi("true") alone would yield false.
            const std::string value(optarg);
            gAutoAck = (value == "true" || value == "True" || value == "TRUE"
                        || std::atoi(optarg) != 0);
            break;
        }
        case 'f':
            gFetchNum = std::atoi(optarg);
            break;
        case 'S':
            // Parse as 64-bit so large millisecond values are not cut to int.
            gSessionTimeoutMs = std::strtoll(optarg, NULL, 10);
            break;
        case 'v':
        case 'h':
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }
}
// Prints one record to stdout using the record schema from `topicMeta`.
// A schema without fields is treated as BLOB and the raw payload is printed on
// its own line. Otherwise every supported field is printed followed by a
// space, then a newline ends the record.
// topicMeta: supplies the record schema.
// recordPtr: the record to print.
inline void ProcessRecords(const TopicMetaPtr& topicMeta, const RecordResultPtr& recordPtr)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    if (schema.GetFieldCount() == 0) // BLOB
    {
        int len = 0;
        const char* data = recordPtr->GetData(len);
        // Bound the output by `len`: a binary payload may not be NUL-terminated.
        // NOTE(review): assumes GetData reports the payload size through `len`; confirm.
        printf("%.*s\n", len, data != NULL ? data : "");
    }
    else // TUPLE
    {
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field& field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
            case BIGINT:
                // Cast so the format matches even where the 64-bit type is not `long`.
                printf("%lld ", static_cast<long long>(recordPtr->GetBigint(j)));
                break;
            case INTEGER:
                printf("%d ", recordPtr->GetInteger(j));
                break;
            case SMALLINT:
                printf("%hd ", recordPtr->GetSmallint(j));
                break;
            case TINYINT:
                printf("%hhd ", recordPtr->GetTinyint(j));
                break;
            case DOUBLE:
                printf("%.5lf ", recordPtr->GetDouble(j));
                break;
            case FLOAT:
                printf("%.5f ", recordPtr->GetFloat(j));
                break;
            case STRING:
                printf("%s ", recordPtr->GetString(j).c_str());
                break;
            case BOOLEAN:
                printf("%s ", recordPtr->GetBoolean(j) ? "true" : "false");
                break;
            case TIMESTAMP:
                printf("%lld ", static_cast<long long>(recordPtr->GetTimestamp(j)));
                break;
            case DECIMAL:
                printf("%s ", recordPtr->GetDecimal(j).c_str());
                break;
            default:
                break;
            }
        }
        printf("\n");
    }
}
void CollaborativeConsume(const std::string& project, const std::string& topic, const std::string& subId, const ConsumerConfiguration& conf)
{
DatahubConsumer consumer(project, topic, subId, conf);
const TopicMetaPtr& topicMeta = consumer.GetTopicMeta();
uint64_t readRecordNum = 0;
try{
while (true)
{
auto recordPtr = consumer.Read(60000);
if (recordPtr == nullptr)
{
break;
}
// TODO: 处理数据
ProcessRecords(topicMeta, recordPtr);
if (!gAutoAck)
{
recordPtr->GetMessageKey()->Ack(); // 如果auto_ack设置为false,则处理完数据执行Ack;
}
readRecordNum++;
}
}
catch (const std::exception& e)
{
std::cerr << "Read fail: " << e.what() << std::endl;
}
std::cout << "Read " << readRecordNum << " records total" << std::endl;
}
int main(int argc, char* argv[])
{
parse_args(argc, argv);
ConsumerConfiguration consumerConf(gAccount, gEndpoint);
consumerConf.SetLogFilePath("./DatahubCollaborativeConsumer.log");
consumerConf.SetAutoAckOffset(gAutoAck);
if (gFetchNum > 0)
{
consumerConf.SetFetchLimitNum(gFetchNum);
}
if (gSessionTimeoutMs > 0)
{
consumerConf.SetSessionTimeout(gSessionTimeoutMs);
}
CollaborativeConsume(gProjectName, gTopicName, gSubId, consumerConf);
return 0;
}
协同消费为sdk 2.25版本新增功能,之前版本不支持
写数据
void pub(DatahubClient& client)
{
std::vector<RecordEntry> records;
GenerateRecords(records);
try
{
PutRecordResult prr = client.PutRecord(projectName, topicName, records);
if (prr.GetFailedRecordCount() > 0)
{
pubRetry(client, prr.GetFailedRecords(), 3);
}
}
catch (const DatahubException& e)
{
std::cerr << "Put records fail. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
void pubRetry(DatahubClient& client, const std::vector<RecordEntry>& records, int retryTimes)
{
if (retryTimes <= 0)
{
std::cout << "Put record retry fail. records size: " << records.size() << std::endl;
return ;
}
try
{
PutRecordResult prr = client.PutRecord(projectName, topicName, records);
if (prr.GetFailedRecordCount() > 0)
{
pubRetry(client, prr.GetFailedRecords(), retryTimes-1);
}
else
{
std::cout << "Put records retry success." << std::endl;
}
}
catch (const DatahubException& e)
{
std::cerr << "Put records fail when put retry. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
inline void GenerateRecords(RecordEntryVec& records)
{
for (int i = 0; i < 10; i++)
{
if (schema.GetFieldCount() == 0)
{
// BLOB
RecordEntry record = RecordEntry(BLOB);
record.SetData("test_blob_data" + std::to_string(i));
records.push_back(record);
}
else
{
// TUPLE
// RecordEntry record = RecordEntry(5);
// record.SetString(0, "field_1_" + std::to_string(i));
// record.SetBigint(1, 123456789l);
// record.SetDouble(2, 123.456d);
// record.SetDouble(3, 654.32100d);
// record.SetString(4, "field_2_" + std::to_string(i));
// records.push_back(record);
}
}
}
}
异步写入示例:
using namespace aliyun;
using namespace aliyun::datahub;

// Settings filled in from the command line by parse_args().
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
// Number of write rounds and records per round; parse_args() rejects values <= 0.
int gEpochNum{10};
int gRecordNum{10};
// Producer limits; main() only applies the ones that are greater than 0.
int64_t gMaxBufferRecords{0};
int64_t gMaxBufferSize{0};
int64_t gMaxBufferTimeMs{0};
int64_t gMaxRecordPackQueueLimit{0};
// Shard ids given with -H.
StringVec gShardIds;
// Splits `src` on every occurrence of the delimiter `pre` and appends the
// non-empty pieces to `tar`.
// tar: receives the pieces; existing elements are kept.
// src: the text to split.
// pre: the delimiter; may be longer than one character. With an empty
//      delimiter the whole of `src` is appended as one piece (if non-empty).
inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    if (pre.empty())
    {
        // Nothing to split on. Without this guard the search below walks past
        // the end of `src` and substr() throws std::out_of_range.
        if (!src.empty())
        {
            tar.push_back(src);
        }
        return;
    }
    size_t start_pos = 0;
    for (size_t pos = src.find(pre, start_pos); pos != std::string::npos; pos = src.find(pre, start_pos))
    {
        if (pos > start_pos)
        {
            tar.push_back(src.substr(start_pos, pos - start_pos));
        }
        // Skip the whole delimiter, not just its first character.
        start_pos = pos + pre.size();
    }
    if (start_pos < src.size())
    {
        tar.push_back(src.substr(start_pos));
    }
}
// Prints the program name, PROG_VERSION and the supported options to stdout.
// prog: the path the program was started with; only the part after the last
//       '/' is printed.
void Usage(const char* prog)
{
    const char *start = strrchr(prog, '/');
    start = (start != NULL) ? start + 1 : prog;
    // The -Q long name matches the "maxRecordPackQueueLimit" entry that
    // parse_args registers with getopt_long.
    fprintf(stdout,
            "%s Ver %s\n"
            "Usage: <programme> [options...]\n"
            " -i --accessid <string> set access id [Mandatory]\n"
            " -k --accesskey <string> set access key [Mandatory]\n"
            " -e --endpoint <string> set endpoint [Mandatory]\n"
            " -p --project <string> set project [Mandatory]\n"
            " -t --topic <string> set topic [Mandatory]\n"
            " -E --epochNum <int> set epoch num for write [Mandatory]\n"
            " -N --recordNum <int> set record num for each epoch [Mandatory]\n"
            " -R --maxBufferRecords <int> set max buffer record count\n"
            " -S --maxBufferSize <int> set max buffer size\n"
            " -M --maxBufferTimeMs <int> set max buffer time (ms)\n"
            " -Q --maxRecordPackQueueLimit <int> set max record pack queue limit\n"
            " -H --shardIds <string> set shards (split by ',')\n"
            " -v --version show version\n"
            " -h --help show help message\n", start, PROG_VERSION);
}
// Parses the async-producer sample's command line with getopt_long and stores
// the values in the g* globals. -v and -h print the usage text and exit with
// status 0. Exits with status 1 when the epoch or record count is not positive.
// argc, argv: the arguments received by main().
void parse_args(int argc, char* argv[])
{
    int opt_id = 0;
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i' },
        { "accesskey", 1, 0, 'k' },
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "epochNum", 1, 0, 'E' },
        { "recordNum", 1, 0, 'N' },
        { "maxBufferRecords", 1, 0, 'R' },
        // Spelled as in the usage text (the sample previously registered "mMaxBufferSize").
        { "maxBufferSize", 1, 0, 'S' },
        { "maxBufferTimeMs", 1, 0, 'M' },
        { "maxRecordPackQueueLimit", 1, 0, 'Q' },
        { "shardIds", 1, 0, 'H' },
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };
    while (1)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:E:N:R:S:M:Q:H:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 'E':
            gEpochNum = std::atoi(optarg);
            break;
        case 'N':
            gRecordNum = std::atoi(optarg);
            break;
        // The four limits are 64-bit; parse them as such so large values are
        // not cut to int.
        case 'R':
            gMaxBufferRecords = std::strtoll(optarg, NULL, 10);
            break;
        case 'S':
            gMaxBufferSize = std::strtoll(optarg, NULL, 10);
            break;
        case 'M':
            gMaxBufferTimeMs = std::strtoll(optarg, NULL, 10);
            break;
        case 'Q':
            gMaxRecordPackQueueLimit = std::strtoll(optarg, NULL, 10);
            break;
        case 'H':
            StringSplit(gShardIds, optarg, ",");
            break;
        case 'v':
        case 'h':
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }
    if (gEpochNum <= 0 || gRecordNum <= 0)
    {
        std::cerr << "Invalid parameter!" << std::endl;
        exit(1);
    }
}
// Fills every field of `record` with a fixed sample value chosen by the
// field's type in `schema`. Fields of types not listed are left untouched.
// schema: describes the fields of the record.
// record: the record to fill; the field index is the schema position.
inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType& type = field.GetFieldType();
        switch (type)
        {
        case BIGINT:
            record.SetBigint(i, 1234L);
            break;
        case DOUBLE:
            record.SetDouble(i, 1.234);
            break;
        case BOOLEAN:
            record.SetBoolean(i, true);
            break;
        case TIMESTAMP:
            record.SetTimestamp(i, 1234L);
            break;
        case STRING:
            record.SetString(i, "1234");
            break;
        case DECIMAL:
            record.SetDecimal(i, "1234");
            break;
        case INTEGER:
            record.SetInteger(i, static_cast<int32_t>(1234));
            break;
        case FLOAT:
            record.SetFloat(i, 1.234f);
            break;
        case TINYINT:
            // 1234 does not fit in int8_t (-128..127); use a value that does.
            record.SetTinyint(i, static_cast<int8_t>(123));
            break;
        case SMALLINT:
            record.SetSmallint(i, static_cast<int16_t>(1234));
            break;
        default:
            break;
        }
    }
}
inline void GenerateRecords(TopicMetaPtr topicMeta, RecordEntryVec& records)
{
const RecordSchema& schema = topicMeta->GetRecordSchema();
for (int i = 0; i < gRecordNum; i++)
{
if (topicMeta->GetRecordType() == "TUPLE")
{
// TUPLE
records.emplace_back(schema.GetFieldCount());
GenerateTupleRecord(schema, records.back());
}
else
{
// BLOB
records.emplace_back(BLOB);
records.back().SetData("test_blob_data" + std::to_string(i));
}
}
}
void AsyncProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
DatahubProducer producer(project, topic, conf, shardIds);
const TopicMetaPtr& topicMeta = producer.GetTopicMeta();
std::vector<WriteResultFuturePtr> resultFutureVecs;
std::map<std::string, uint64_t> writeRecordNum;
for (const std::string& shardId : shardIds)
{
writeRecordNum[shardId] = 0;
}
try
{
// TODO: 生成数据
RecordEntryVec records;
GenerateRecords(topicMeta, records);
for (int i = 0; i < gEpochNum; i++)
{
auto result = producer.WriteAsync(records);
resultFutureVecs.push_back(result);
}
}
catch(const std::exception& e)
{
std::cerr << "WriteAsync fail: " << e.what() << std::endl;
}
producer.Flush();
for (auto it = resultFutureVecs.begin(); it != resultFutureVecs.end(); it++)
{
(*it)->wait();
try
{
WriteResultPtr result = (*it)->get();
writeRecordNum[result->GetShardId()] += gRecordNum;
}
catch (const std::exception& e)
{
std::cerr << "Write records fail: " << e.what() << std::endl;
}
}
for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
{
std::cout << "Write " << it->second << " records to shard " << it->first << std::endl;
}
}
int main(int argc, char *argv[])
{
parse_args(argc, argv);
ProducerConfiguration producerConf(gAccount, gEndpoint);
producerConf.SetEnableProtobuf(true);
producerConf.SetLogFilePath("./DatahubAsyncProducer.log");
if (gMaxBufferRecords > 0)
{
producerConf.SetMaxAsyncBufferRecords(gMaxBufferRecords);
}
if (gMaxBufferSize > 0)
{
producerConf.SetMaxAsyncBufferSize(gMaxBufferSize);
}
if (gMaxBufferTimeMs > 0)
{
producerConf.SetMaxAsyncBufferTimeMs(gMaxBufferTimeMs);
}
if (gMaxRecordPackQueueLimit > 0)
{
producerConf.SetMaxRecordPackQueueLimit(gMaxRecordPackQueueLimit);
}
AsyncProduce(gProjectName, gTopicName, producerConf, gShardIds);
return 0;
}
同步写入示例
using namespace aliyun;
using namespace aliyun::datahub;

// Settings filled in from the command line by parse_args().
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
// Number of write rounds and records per round; parse_args() rejects values <= 0.
int gEpochNum{10};
int gRecordNum{10};
// Shard ids given with -H.
StringVec gShardIds;
// Splits `src` on every occurrence of the delimiter `pre` and appends the
// non-empty pieces to `tar`.
// tar: receives the pieces; existing elements are kept.
// src: the text to split.
// pre: the delimiter; may be longer than one character. With an empty
//      delimiter the whole of `src` is appended as one piece (if non-empty).
inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    if (pre.empty())
    {
        // Nothing to split on. Without this guard the search below walks past
        // the end of `src` and substr() throws std::out_of_range.
        if (!src.empty())
        {
            tar.push_back(src);
        }
        return;
    }
    size_t start_pos = 0;
    for (size_t pos = src.find(pre, start_pos); pos != std::string::npos; pos = src.find(pre, start_pos))
    {
        if (pos > start_pos)
        {
            tar.push_back(src.substr(start_pos, pos - start_pos));
        }
        // Skip the whole delimiter, not just its first character.
        start_pos = pos + pre.size();
    }
    if (start_pos < src.size())
    {
        tar.push_back(src.substr(start_pos));
    }
}
// Prints the program name, PROG_VERSION and the supported options to stdout.
// prog: the path the program was started with; only the part after the last
//       '/' is printed.
void Usage(const char* prog)
{
    const char* slash = strrchr(prog, '/');
    const char* baseName = (slash == NULL) ? prog : slash + 1;
    printf(
        "%s Ver %s\n"
        "Usage: <programme> [options...]\n"
        " -i --accessid <string> set access id [Mandatory]\n"
        " -k --accesskey <string> set access key [Mandatory]\n"
        " -e --endpoint <string> set endpoint [Mandatory]\n"
        " -p --project <string> set project [Mandatory]\n"
        " -t --topic <string> set topic [Mandatory]\n"
        " -E --epochNum <int> set epoch num for write [Mandatory]\n"
        " -N --recordNum <int> set record num for each epoch [Mandatory]\n"
        " -H --shardIds <string> set shards (split by ',')\n"
        " -v --version show version\n"
        " -h --help show help message\n",
        baseName, PROG_VERSION);
}
// Parses the sync-producer sample's command line with getopt_long and stores
// the values in the g* globals. -v and -h print the usage text and exit with
// status 0; other unhandled options are ignored. Exits with status 1 when the
// epoch or record count is not positive.
// argc, argv: the arguments received by main().
void parse_args(int argc, char* argv[])
{
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i' },
        { "accesskey", 1, 0, 'k' },
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "epochNum", 1, 0, 'E' },
        { "recordNum", 1, 0, 'N' },
        { "shardIds", 1, 0, 'H' },
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };

    int longIndex;
    for (;;) {
        const int opt = getopt_long(argc, argv, "i:k:e:p:t:E:N:H:vh", long_opts, &longIndex);
        if (opt == -1) {
            break;
        }
        if (opt == 'i') {
            gAccount.id = optarg;
        } else if (opt == 'k') {
            gAccount.key = optarg;
        } else if (opt == 'e') {
            gEndpoint = optarg;
        } else if (opt == 'p') {
            gProjectName = optarg;
        } else if (opt == 't') {
            gTopicName = optarg;
        } else if (opt == 'E') {
            gEpochNum = std::atoi(optarg);
        } else if (opt == 'N') {
            gRecordNum = std::atoi(optarg);
        } else if (opt == 'H') {
            StringSplit(gShardIds, optarg, ",");
        } else if (opt == 'v' || opt == 'h') {
            Usage(argv[0]);
            exit(0);
        }
    }

    const bool countsValid = (gEpochNum > 0) && (gRecordNum > 0);
    if (!countsValid) {
        std::cerr << "Invalid parameter!" << std::endl;
        exit(1);
    }
}
// Fills every field of `record` with a fixed sample value chosen by the
// field's type in `schema`. Fields of types not listed are left untouched.
// schema: describes the fields of the record.
// record: the record to fill; the field index is the schema position.
inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType& type = field.GetFieldType();
        switch (type)
        {
        case BIGINT:
            record.SetBigint(i, 1234L);
            break;
        case DOUBLE:
            record.SetDouble(i, 1.234);
            break;
        case BOOLEAN:
            record.SetBoolean(i, true);
            break;
        case TIMESTAMP:
            record.SetTimestamp(i, 1234L);
            break;
        case STRING:
            record.SetString(i, "1234");
            break;
        case DECIMAL:
            record.SetDecimal(i, "1234");
            break;
        case INTEGER:
            record.SetInteger(i, static_cast<int32_t>(1234));
            break;
        case FLOAT:
            record.SetFloat(i, 1.234f);
            break;
        case TINYINT:
            // 1234 does not fit in int8_t (-128..127); use a value that does.
            record.SetTinyint(i, static_cast<int8_t>(123));
            break;
        case SMALLINT:
            record.SetSmallint(i, static_cast<int16_t>(1234));
            break;
        default:
            break;
        }
    }
}
inline void GenerateRecords(const TopicMetaPtr& topicMeta, RecordEntryVec& records)
{
const RecordSchema& schema = topicMeta->GetRecordSchema();
for (int i = 0; i < gRecordNum; i++)
{
if (topicMeta->GetRecordType() == "TUPLE")
{
// TUPLE
records.emplace_back(schema.GetFieldCount());
GenerateTupleRecord(schema, records.back());
}
else
{
// BLOB
records.emplace_back(BLOB);
records.back().SetData("test_blob_data" + std::to_string(i));
}
}
}
void GeneralProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
DatahubProducer producer(project, topic, conf, shardIds);
const TopicMetaPtr& topicMeta = producer.GetTopicMeta();
std::map<std::string, uint64_t> writeRecordNum;
for (const std::string& shardId : shardIds)
{
writeRecordNum[shardId] = 0;
}
try
{
// TODO: 生成数据
RecordEntryVec records;
GenerateRecords(topicMeta, records);
for (int i = 0; i < gEpochNum; i++)
{
std::string shardId = producer.Write(records);
writeRecordNum[shardId] += records.size();
}
}
catch(const std::exception& e)
{
std::cerr << "Write fail: " << e.what() << std::endl;
}
for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
{
std::cout << "Write " << it->second << " records to shard " << it->first << std::endl;
}
}
int main(int argc, char *argv[])
{
parse_args(argc, argv);
ProducerConfiguration producerConf(gAccount, gEndpoint);
producerConf.SetEnableProtobuf(true);
producerConf.SetLogFilePath("./DatahubGeneralProducer.log");
GeneralProduce(gProjectName, gTopicName, producerConf, gShardIds);
return 0;
}
Meter操作
void GetMeter()
{
try
{
GetMeteringInfoResult GetMeteringInfo = Client.GetMeteringInfo(projectName, topicName, shardId);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
异步写入、同步写入 从sdk 2.25版本起支持
Subscription操作
订阅服务提供了服务端保存用户消费点位的功能,只需要通过简单配置和处理,就可以实现高可用的点位存储服务。
创建 Subscription
void CreateSubscription()
{
std::string projectName = "";
std::string topicName = "";
try
{
const std::string subscriptionComment = "test_subscription";
const CreateSubscriptionResult& createSubscriptionResult = client.CreateSubscription(projectName, topicName, subscriptionComment);
}
catch(const DatahubException& e)
{
std::cerr << "Create sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
删除 Subscription
void DeleteSubscription()
{
std::string projectName = "";
std::string topicName = "";
std::string subId = "";
try
{
client.DeleteSubscription(projectName, topicName, subId);
}
catch(const DatahubException& e)
{
std::cerr << "Delete sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
更新 Subscription
// Calls client.UpdateSubscription to set the comment "test_subscription_1" on
// a subscription identified by placeholder project, topic and subscription id.
// A DatahubException is reported on std::cerr and not propagated.
void UpdateSubscription()
{
    try
    {
        const std::string updateSubscriptionComment = "test_subscription_1";
        const std::string projectName = "";
        const std::string topicName = "";
        const std::string subId = ""; // was used below without being declared
        client.UpdateSubscription(projectName, topicName, subId, updateSubscriptionComment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Update sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
获取Subscription列表
// Calls client.ListSubscription(projectName, topicName, 1, 20, subId) and
// prints the total count reported by the result.
// NOTE(review): 1 and 20 look like page index and page size; confirm against
// the SDK reference.
// A DatahubException is reported on std::cerr and not propagated.
void ListSubscription()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    try
    {
        const ListSubscriptionResult& subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20, subId);
        std::cout << subscriptionResult.GetTotalCount() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "list sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
查询 Subscription
void GetSubscription()
{
try
{
const std::vector<SubscriptionEntry>& subscriptions = subscriptionResult.GetSubscriptions();
std::cout << subscriptions.size() << std::endl;
for (size_t i = 0; i < subscriptions.size(); ++i)
{
SubscriptionEntry entry = subscriptions[i];
std::cout << entry.GetSubId() << std::endl;
}
}
catch(const DatahubException& e)
{
std::cerr << "Get sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
更新 Subscription 状态
// Calls client.UpdateSubscriptionState to set a subscription to
// SubscriptionState::OFFLINE, using placeholder project, topic and
// subscription id. A DatahubException is reported on std::cerr and not
// propagated.
void UpdateSubscriptionState()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    try
    {
        // A stray print of an undeclared `getSubscriptionResult` followed this
        // call in the sample and has been dropped.
        client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Update sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
offset 操作
一个subscription创建后,初始状态是未消费的,要使用subscription服务提供的点位存储功能,需要进行一些offset操作。
初始化offset
void InitSubscriptionOffsetSession()
{
const std::string projectName = "";
const std::string topicName = "";
const std::string subId = "";
std::vector<std::string> shardIds;
try
{
const OpenSubscriptionOffsetSessionResult& offsetSessionResult =
client.InitSubscriptionOffsetSession(projectName, topicName, subId, shardIds);
std::cout << offsetSessionResult.GetOffsets().size() << std::endl;
}
catch(const DatahubException& e)
{
std::cerr << "Init offset fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
获取 offset
// Get the stored offsets.
// Calls client.GetSubscriptionOffset with placeholder project, topic and
// subscription id and an empty shard-id list, then prints the number of
// offsets in the result. A DatahubException is reported on std::cerr and not
// propagated.
void GetSubscriptionOffset()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    std::vector<std::string> shardIds;
    try
    {
        const GetSubscriptionOffsetResult& getSubscriptionOffsetResult =
            client.GetSubscriptionOffset(projectName, topicName, subId, shardIds);
        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Get offset fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
重置 offset
// Resets the offset of every shard that currently has an offset recorded for
// the subscription to one fixed position (timestamp, sequence, batch index).
void ResetSubscriptionOffset()
{
    // Placeholder identifiers; fill these in before running the sample.
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";

    // Shard ids handed to the offset query; left empty in this sample.
    std::vector<std::string> shardIds;

    // Position written to every shard.
    int64_t resetTimestamp = 0;
    int64_t resetSequence = 0;
    uint32_t resetBatchIndex = 0u;

    try
    {
        // The shard ids to reset are taken from the subscription's current
        // offsets. The result object is bound to a named reference so the
        // container returned by GetOffsets() stays valid during the loop.
        // NOTE(review): assumes GetOffsets() is keyed by shard id, as the
        // use of `first` as the map key below requires -- confirm.
        const auto& currentOffsetResult =
            client.GetSubscriptionOffset(projectName, topicName, subId, shardIds);
        const auto& offsets = currentOffsetResult.GetOffsets();

        std::map<std::string, SubscriptionOffset> resetSubscriptionOffsets;
        for (const auto& shardOffset : offsets)
        {
            SubscriptionOffset offset(resetTimestamp, resetSequence, resetBatchIndex);
            resetSubscriptionOffsets.insert(
                std::pair<std::string, SubscriptionOffset>(shardOffset.first, offset));
        }

        client.ResetSubscriptionOffset(projectName, topicName, subId, resetSubscriptionOffsets);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Reset offset fail: " << e.GetRequestId()
                  << ", ErrorCode: " << e.GetErrorCode()
                  << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Connector 操作
DataHub Connector是把DataHub服务中的流式数据同步到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute(ODPS)、OSS、RDS&MySQL、TableStore、ElasticSearch、函数计算中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配置好同步功能,便可以在其他云产品中使用这份数据。
创建 Connector
ODPS示例
// Creates a SINK_ODPS connector on a topic and prints the id of the new
// connector.
void CreateConnector()
{
    // Placeholder identifiers; fill these in before running the sample.
    const std::string projectName = "";
    const std::string topicName = "";
    // Placeholder for the name of the topic field to synchronize.
    const std::string fieldName1 = "";

    try
    {
        // Connection settings of the target ODPS table.
        sdk::SinkOdpsConfig config;
        config.SetEndpoint("ODPS_ENDPOINT");
        config.SetProject("ODPS_PROJECT");
        config.SetTable("ODPS_TABLE");
        config.SetAccessId("ODPS_ACCESSID");
        config.SetAccessKey("ODPS_ACCESSKEY");
        config.SetPartitionMode(sdk::PartitionMode::SYSTEM_TIME);
        // NOTE(review): the unit of the time range is not visible here
        // (presumably minutes) -- confirm against the SDK reference.
        config.SetTimeRange(15);

        // Partition column -> format pattern for that column.
        std::vector<std::pair<std::string, std::string> > partitionConfig;
        partitionConfig.push_back(std::pair<std::string, std::string>("ds", "%Y%m%d"));
        partitionConfig.push_back(std::pair<std::string, std::string>("hh", "%H"));
        partitionConfig.push_back(std::pair<std::string, std::string>("mm", "%M"));
        config.SetPartitionConfig(partitionConfig);

        // Fields handed to the connector.
        std::vector<std::string> columnFields;
        columnFields.push_back(fieldName1);

        const CreateConnectorResult& connectorResult = client.CreateConnector(
            projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);
        // Stream the id instead of concatenating it onto a string literal, so
        // the output does not depend on GetConnectorId() returning std::string.
        std::cout << "Odps ConnectorId:" << connectorResult.GetConnectorId() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Create odpsConnector fail: " << e.GetRequestId()
                  << ", ErrorCode: " << e.GetErrorCode()
                  << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
删除 Connector
void DeleteConnector()
{
const std::string projectName = "";
const std::string topicName = "";
const std::string connectorId = "";
try {
client.DeleteConnector(projectName, topicName, connectorId);
}
catch(const DatahubException& e)
{
std::cerr << "Delete Connector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
查询 Connector
// Fetches a connector, interprets its configuration as an ODPS sink
// configuration and prints the number of partition-config entries.
void GetConnector()
{
    // Placeholder identifiers; fill these in before running the sample.
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string connectorId = "";

    try
    {
        GetConnectorResult getConnectorResult =
            client.GetConnector(projectName, topicName, connectorId);

        // dynamic_cast yields a null pointer when the connector's config is
        // not a SinkOdpsConfig, so check before dereferencing.
        const sdk::SinkOdpsConfig* odpsConfig =
            dynamic_cast<const sdk::SinkOdpsConfig*>(getConnectorResult.GetConfig());
        if (odpsConfig == nullptr)
        {
            std::cerr << "Get Connector fail: config is not a SinkOdpsConfig" << std::endl;
            return;
        }
        std::cout << odpsConfig->GetPartitionConfig().size() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Get Connector fail: " << e.GetRequestId()
                  << ", ErrorCode: " << e.GetErrorCode()
                  << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
获取 Connector 列表
void ListConnector() {
try {
const std::string projectName = "";
const std::string topicName = "";
const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;
} catch(const DatahubException& e)
{
std::cerr << "list connector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}