本文为您展示DataHub的 C++ SDK的读写数据操作。
读数据
获取cursor
读取Topic下的数据时，需要指定对应的Shard，同时需要指定读取数据的起始游标（Cursor）。Cursor的获取方式有以下四种：
OLDEST：表示获取的Cursor指向当前有效数据中时间最久远的record。
LATEST：表示获取的Cursor指向当前最新的record。
SEQUENCE：表示获取的Cursor指向指定序列号（sequence）对应的record。
SYSTEM_TIME：表示获取的Cursor指向系统时间大于等于指定时间戳的第一条record。
代码示例
void GetCursor()
{
try
{
const GetCursorResult& r1 = client.GetCursor(projectName, topicName, "0", CURSOR_TYPE_OLDEST);
std::string cursor = r1.GetCursor();
}
catch(const DatahubException& e)
{
std::cerr << "Get cursor fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
读取数据
参数说明
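| 参数名 | 参数类型 | 参数说明 |
| --- | --- | --- |
| projectName | string | 项目名称。 |
| topicName | string | Topic名称。 |
| shardId | string | 要读取的Shard ID，示例中为 "0"。 |
| cursor | string | 读取的起始游标，通过GetCursor获取；每次读取后更新为返回结果中的下一个游标（GetNextCursor）。 |
| subId | string | 订阅ID，用于获取和提交该Shard的订阅点位。 |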
示例代码
void sub(DatahubClient& client)
{
std::string projectName = "";
std::string topicName = "";
std::string shardId = "0";
std::string cursor;
RecordSchema schema;
OpenSubscriptionOffsetSessionResult osor = client.InitSubscriptionOffsetSession(projectName, topicName, subId, {shardId});
SubscriptionOffset subscription = osor.GetOffsets().at(shardId);
if (subscription.GetSequence() < 0)
{
cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
}
else
{
int64_t nextSequence = subscription.GetSequence() + 1;
try
{
cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_SEQUENCE, nextSequence).GetCursor();
}
catch (const DatahubException& e)
{
cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
}
}
int64_t readTotal = 0;
int fetchNum = 10;
while (true)
{
try
{
GetRecordResult grr = client.GetRecord(projectName, topicName, shardId, cursor, fetchNum, subId);
if (grr.GetRecordCount() <= 0)
{
std::cout << "Read null, wait for 1s." << std::endl;
sleep(1);
continue;
}
for (auto recordResult : grr.GetRecords())
{
ProcessRecords(recordResult);
if (++readTotal % 1000 == 0)
{
subscription.SetSequence(recordResult.GetSequence());
subscription.SetTimestamp(recordResult.GetSystemTime());
std::map<std::string, SubscriptionOffset> offsets;
offsets[shardId] = subscription;
try
{
client.UpdateSubscriptionOffset(projectName, topicName, subId, offsets);
}
catch (const DatahubException& e)
{
std::cerr << "Update subscription offset fail. requestId: "<< e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
throw ;
}
}
}
cursor = grr.GetNextCursor();
}
catch (const DatahubException& e)
{
std::cerr << "Get record fail. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
inline void ProcessRecords(const RecordResult& record)
{
if (schema.GetFieldCount() == 0) // BLOB
{
int len = 0;
const char* data = record.GetData(len);
printf("%s\n", data);
}
else // TUPLE
{
for (int j = 0; j < schema.GetFieldCount(); ++j)
{
const Field &field = schema.GetField(j);
const FieldType fieldType = field.GetFieldType();
switch (fieldType)
{
case BIGINT:
printf("%ld\n", record.GetBigint(j));
break;
case DOUBLE:
printf("%.15lf\n", record.GetDouble(j));
break;
case STRING:
printf("%s\n", record.GetString(j).c_str());
break;
default:
break;
}
}
}
}
}
协同消费数据示例
参数说明
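| 参数名 | 参数类型 | 参数说明 |
| --- | --- | --- |
| gAccount | Account | 访问凭证，包含access id（-i）和access key（-k）。 |
| gEndpoint | string | DataHub服务的Endpoint（-e）。 |
| gProjectName | string | 项目名称（-p）。 |
| gTopicName | string | Topic名称（-t）。 |
| gSubId | string | 订阅ID（-s）。 |
| gAutoAck | bool | 是否自动提交点位（-a）；为false时，需要在处理完数据后调用Ack。 |
| gFetchNum | int32_t | 单次拉取的record数量上限（-f）；不大于0时不修改该配置。 |
| gSessionTimeoutMs | int64_t | 会话超时时间，单位毫秒（-S）；不大于0时不修改该配置。 |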
代码示例
using namespace aliyun;
using namespace aliyun::datahub;

// Settings for the collaborative-consumer example; populated by parse_args().
std::atomic_int gTotalNum{0};
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
std::string gSubId;
bool gAutoAck = false;          // -a; when false, records are acked manually after processing
int32_t gFetchNum = 0;          // -f; main() only applies it when > 0
int64_t gSessionTimeoutMs = 0;  // -S; main() only applies it when > 0
// Prints the command-line help of the collaborative consumer to stdout.
// `prog` is argv[0]; only its base name (text after the last '/') is shown.
void Usage(const char* prog)
{
    const char* lastSlash = strrchr(prog, '/');
    const char* progName = (lastSlash == NULL) ? prog : lastSlash + 1;

    fprintf(stdout, "%s Ver %s\n", progName, PROG_VERSION);
    fputs("Usage: <programme> [options...]\n"
          " -i --accessid <string> set access id [Mandatory]\n"
          " -k --accesskey <string> set access key [Mandatory]\n"
          " -e --endpoint <string> set endpoint [Mandatory]\n"
          " -p --project <string> set project [Mandatory]\n"
          " -t --topic <string> set topic [Mandatory]\n"
          " -s --subId <string> set subscription id [Mandatory]\n"
          " -a --autoAck <bool> set auto ack offset\n"
          " -f --fetchNum <int> set fetch limit num\n"
          " -S --sessionTimeoutMs <int> set session timeout (ms)\n"
          " -v --version show version\n"
          " -h --help show help message\n", stdout);
}
// Interprets the value of a boolean command-line option.
// "true"/"True"/"TRUE"/"yes"/"on" -> true, "false"/"False"/"FALSE"/"no"/"off" -> false;
// anything else falls back to its integer value (non-zero -> true), so the
// numeric "-a 1" / "-a 0" form keeps working.
static bool ParseBoolOption(const char* value)
{
    const std::string text(value);
    if (text == "true" || text == "True" || text == "TRUE" || text == "yes" || text == "on")
    {
        return true;
    }
    if (text == "false" || text == "False" || text == "FALSE" || text == "no" || text == "off")
    {
        return false;
    }
    return std::atoi(value) != 0;
}

// Parses the collaborative-consumer command line into the g* globals.
// -v / -h print the help text and exit(0); unknown options are ignored here
// (getopt_long itself reports them on stderr).
void parse_args(int argc, char* argv[])
{
    int opt_id = 0;
    static struct option long_opts[] =
    {
        { "accessid", required_argument, nullptr, 'i' },
        { "accesskey", required_argument, nullptr, 'k' },
        { "endpoint", required_argument, nullptr, 'e' },
        { "project", required_argument, nullptr, 'p' },
        { "topic", required_argument, nullptr, 't' },
        { "subId", required_argument, nullptr, 's' },
        { "autoAck", required_argument, nullptr, 'a' },
        { "fetchNum", required_argument, nullptr, 'f' },
        { "sessionTimeoutMs", required_argument, nullptr, 'S' },
        { "version", no_argument, nullptr, 'v' },
        { "help", no_argument, nullptr, 'h' },
        { nullptr, 0, nullptr, 0 }
    };
    while (true)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:s:a:f:S:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 's':
            gSubId = optarg;
            break;
        case 'a':
            // atoi("true") is 0, so "-a true" used to silently disable auto ack.
            gAutoAck = ParseBoolOption(optarg);
            break;
        case 'f':
            gFetchNum = std::atoi(optarg);
            break;
        case 'S':
            // The timeout is 64-bit; atoi would truncate it to int.
            gSessionTimeoutMs = static_cast<int64_t>(std::strtoll(optarg, nullptr, 10));
            break;
        case 'v':
        case 'h':
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }
}
// Prints one consumed record to stdout using the topic's schema.
// A schema without fields is treated as a BLOB topic and the raw payload is printed;
// otherwise every field is printed followed by a space, then a newline.
// Field types not listed below are skipped.
inline void ProcessRecords(const TopicMetaPtr& topicMeta, const RecordResultPtr& recordPtr)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    if (schema.GetFieldCount() == 0) // BLOB
    {
        int len = 0;
        const char* data = recordPtr->GetData(len);
        // The payload comes with an explicit length and is not guaranteed to be
        // NUL-terminated, so print exactly `len` bytes (nothing when there is no data).
        const int printable = (data != nullptr && len > 0) ? len : 0;
        printf("%.*s\n", printable, printable > 0 ? data : "");
    }
    else // TUPLE
    {
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field& field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
            case BIGINT:
                // int64_t is not `long` on every platform; cast for a portable specifier.
                printf("%lld ", static_cast<long long>(recordPtr->GetBigint(j)));
                break;
            case INTEGER:
                printf("%d ", recordPtr->GetInteger(j));
                break;
            case SMALLINT:
                printf("%hd ", recordPtr->GetSmallint(j));
                break;
            case TINYINT:
                printf("%hhd ", recordPtr->GetTinyint(j));
                break;
            case DOUBLE:
                printf("%.5lf ", recordPtr->GetDouble(j));
                break;
            case FLOAT:
                printf("%.5f ", recordPtr->GetFloat(j));
                break;
            case STRING:
                printf("%s ", recordPtr->GetString(j).c_str());
                break;
            case BOOLEAN:
                printf("%s ", recordPtr->GetBoolean(j) ? "true" : "false");
                break;
            case TIMESTAMP:
                printf("%lld ", static_cast<long long>(recordPtr->GetTimestamp(j)));
                break;
            case DECIMAL:
                printf("%s ", recordPtr->GetDecimal(j).c_str());
                break;
            default:
                break;
            }
        }
        printf("\n");
    }
}
void CollaborativeConsume(const std::string& project, const std::string& topic, const std::string& subId, const ConsumerConfiguration& conf)
{
DatahubConsumer consumer(project, topic, subId, conf);
const TopicMetaPtr& topicMeta = consumer.GetTopicMeta();
uint64_t readRecordNum = 0;
try{
while (true)
{
auto recordPtr = consumer.Read(60000);
if (recordPtr == nullptr)
{
break;
}
// TODO: 处理数据
ProcessRecords(topicMeta, recordPtr);
if (!gAutoAck)
{
recordPtr->GetMessageKey()->Ack(); // 如果auto_ack设置为false,则处理完数据执行Ack;
}
readRecordNum++;
}
}
catch (const std::exception& e)
{
std::cerr << "Read fail: " << e.what() << std::endl;
}
std::cout << "Read " << readRecordNum << " records total" << std::endl;
}
// Entry point of the collaborative-consumer example: parse options, build the
// consumer configuration, then consume.
int main(int argc, char* argv[])
{
    parse_args(argc, argv);

    ConsumerConfiguration consumerConf(gAccount, gEndpoint);
    consumerConf.SetLogFilePath("./DatahubCollaborativeConsumer.log");
    consumerConf.SetAutoAckOffset(gAutoAck);

    // Only override the fetch limit / session timeout when a positive value was given.
    if (gFetchNum > 0) { consumerConf.SetFetchLimitNum(gFetchNum); }
    if (gSessionTimeoutMs > 0) { consumerConf.SetSessionTimeout(gSessionTimeoutMs); }

    CollaborativeConsume(gProjectName, gTopicName, gSubId, consumerConf);
    return 0;
}
写数据
参数说明
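| 参数名 | 参数类型 | 参数说明 |
| --- | --- | --- |
| projectName | string | 项目名称。 |
| topicName | string | Topic名称。 |
| records | std::vector&lt;RecordEntry&gt; | 待写入的record列表；写入失败的record会通过pubRetry重试，最多重试3次。 |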
代码示例
void pub(DatahubClient& client)
{
std::vector<RecordEntry> records;
GenerateRecords(records);
try
{
PutRecordResult prr = client.PutRecord(projectName, topicName, records);
if (prr.GetFailedRecordCount() > 0)
{
pubRetry(client, prr.GetFailedRecords(), 3);
}
}
catch (const DatahubException& e)
{
std::cerr << "Put records fail. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
void pubRetry(DatahubClient& client, const std::vector<RecordEntry>& records, int retryTimes)
{
if (retryTimes <= 0)
{
std::cout << "Put record retry fail. records size: " << records.size() << std::endl;
return ;
}
try
{
PutRecordResult prr = client.PutRecord(projectName, topicName, records);
if (prr.GetFailedRecordCount() > 0)
{
pubRetry(client, prr.GetFailedRecords(), retryTimes-1);
}
else
{
std::cout << "Put records retry success." << std::endl;
}
}
catch (const DatahubException& e)
{
std::cerr << "Put records fail when put retry. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
inline void GenerateRecords(RecordEntryVec& records)
{
for (int i = 0; i < 10; i++)
{
if (schema.GetFieldCount() == 0)
{
// BLOB
RecordEntry record = RecordEntry(BLOB);
record.SetData("test_blob_data" + std::to_string(i));
records.push_back(record);
}
else
{
// TUPLE
// RecordEntry record = RecordEntry(5);
// record.SetString(0, "field_1_" + std::to_string(i));
// record.SetBigint(1, 123456789l);
// record.SetDouble(2, 123.456d);
// record.SetDouble(3, 654.32100d);
// record.SetString(4, "field_2_" + std::to_string(i));
// records.push_back(record);
}
}
}
}
异步写入示例
参数说明
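| 参数名 | 参数类型 | 参数说明 |
| --- | --- | --- |
| gAccount | Account | 访问凭证，包含access id（-i）和access key（-k）。 |
| gEndpoint | string | DataHub服务的Endpoint（-e）。 |
| gProjectName | string | 项目名称（-p）。 |
| gTopicName | string | Topic名称（-t）。 |
| gEpochNum | int | 写入轮数，每轮写入一批record（-E），默认为10。 |
| gRecordNum | int | 每轮写入的record数量（-N），默认为10。 |
| gMaxBufferRecords | int64_t | 异步写入缓冲的最大record数（-R）；不大于0时不修改该配置。 |
| gMaxBufferSize | int64_t | 异步写入缓冲的最大大小（-S）；不大于0时不修改该配置。 |
| gMaxBufferTimeMs | int64_t | 异步写入缓冲的最长时间，单位毫秒（-M）；不大于0时不修改该配置。 |
| gMaxRecordPackQueueLimit | int64_t | record pack队列的最大长度（-Q）；不大于0时不修改该配置。 |
| gShardIds | StringVec | 写入的Shard ID列表，命令行中以逗号分隔（-H）。 |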
代码示例
using namespace aliyun;
using namespace aliyun::datahub;

// Settings for the asynchronous-producer example; populated by parse_args().
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
int gEpochNum = 10;                       // -E: number of write rounds
int gRecordNum = 10;                      // -N: records generated per round
int64_t gMaxBufferRecords = 0;            // -R: main() only applies it when > 0
int64_t gMaxBufferSize = 0;               // -S: main() only applies it when > 0
int64_t gMaxBufferTimeMs = 0;             // -M: main() only applies it when > 0
int64_t gMaxRecordPackQueueLimit = 0;     // -Q: main() only applies it when > 0
StringVec gShardIds;                      // -H: comma-separated shard list
// Splits `src` on every occurrence of the delimiter `pre` and appends the non-empty
// pieces to `tar` (existing contents of `tar` are kept).
// An empty delimiter yields `src` itself as a single piece (when non-empty).
inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    if (pre.empty())
    {
        // find("") matches everywhere; splitting on it is meaningless (and the old
        // loop ended in an out-of-range substr).
        if (!src.empty())
        {
            tar.push_back(src);
        }
        return;
    }
    size_t start_pos = 0;
    size_t pos = src.find(pre, start_pos);
    while (pos != std::string::npos)
    {
        if (pos > start_pos)
        {
            tar.push_back(src.substr(start_pos, pos - start_pos));
        }
        // Skip the whole delimiter, not just one character, so multi-character
        // separators do not leak their tail into the next piece.
        start_pos = pos + pre.size();
        pos = src.find(pre, start_pos);
    }
    if (start_pos < src.size())
    {
        tar.push_back(src.substr(start_pos));
    }
}
// Prints the command-line help of the asynchronous producer to stdout.
// `prog` is argv[0]; only its base name (text after the last '/') is shown.
// The long option names listed here must match the option table in parse_args().
void Usage(const char* prog)
{
    const char *start = strrchr(prog, '/');
    start = (start != NULL) ? start + 1 : prog;
    fprintf(stdout,
            "%s Ver %s\n"
            "Usage: <programme> [options...]\n"
            " -i --accessid <string> set access id [Mandatory]\n"
            " -k --accesskey <string> set access key [Mandatory]\n"
            " -e --endpoint <string> set endpoint [Mandatory]\n"
            " -p --project <string> set project [Mandatory]\n"
            " -t --topic <string> set topic [Mandatory]\n"
            " -E --epochNum <int> set epoch num for write [Mandatory]\n"
            " -N --recordNum <int> set record num for each epoch [Mandatory]\n"
            " -R --maxBufferRecords <int> set max buffer record count\n"
            " -S --maxBufferSize <int> set max buffer size\n"
            " -M --maxBufferTimeMs <int> set max buffer time (ms)\n"
            " -Q --maxRecordPackQueueLimit <int> set max record pack queue limit\n"
            " -H --shardIds <string> set shards (split by ',')\n"
            " -v --version show version\n"
            " -h --help show help message\n", start, PROG_VERSION);
}
// Parses the asynchronous-producer command line into the g* globals.
// -v / -h print the help text and exit(0). Exits with status 1 when the epoch
// count or the per-epoch record count is not positive.
void parse_args(int argc, char* argv[])
{
    int opt_id = 0;
    static struct option long_opts[] =
    {
        { "accessid", required_argument, nullptr, 'i' },
        { "accesskey", required_argument, nullptr, 'k' },
        { "endpoint", required_argument, nullptr, 'e' },
        { "project", required_argument, nullptr, 'p' },
        { "topic", required_argument, nullptr, 't' },
        { "epochNum", required_argument, nullptr, 'E' },
        { "recordNum", required_argument, nullptr, 'N' },
        { "maxBufferRecords", required_argument, nullptr, 'R' },
        // "maxBufferSize" is the documented spelling; the old misspelling is kept as
        // an alias so existing command lines keep working.
        { "maxBufferSize", required_argument, nullptr, 'S' },
        { "mMaxBufferSize", required_argument, nullptr, 'S' },
        { "maxBufferTimeMs", required_argument, nullptr, 'M' },
        { "maxRecordPackQueueLimit", required_argument, nullptr, 'Q' },
        // Alias matching the spelling used by older help text.
        { "maxRecordPackLimit", required_argument, nullptr, 'Q' },
        { "shardIds", required_argument, nullptr, 'H' },
        { "version", no_argument, nullptr, 'v' },
        { "help", no_argument, nullptr, 'h' },
        { nullptr, 0, nullptr, 0 }
    };
    while (true)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:E:N:R:S:M:Q:H:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 'E':
            gEpochNum = std::atoi(optarg);
            break;
        case 'N':
            gRecordNum = std::atoi(optarg);
            break;
        // The buffer/queue limits are 64-bit; atoi would truncate them to int.
        case 'R':
            gMaxBufferRecords = static_cast<int64_t>(std::strtoll(optarg, nullptr, 10));
            break;
        case 'S':
            gMaxBufferSize = static_cast<int64_t>(std::strtoll(optarg, nullptr, 10));
            break;
        case 'M':
            gMaxBufferTimeMs = static_cast<int64_t>(std::strtoll(optarg, nullptr, 10));
            break;
        case 'Q':
            gMaxRecordPackQueueLimit = static_cast<int64_t>(std::strtoll(optarg, nullptr, 10));
            break;
        case 'H':
            StringSplit(gShardIds, optarg, ",");
            break;
        case 'v':
        case 'h':
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }
    if (gEpochNum <= 0 || gRecordNum <= 0)
    {
        std::cerr << "Invalid parameter!" << std::endl;
        exit(1);
    }
}
// Fills every field of a TUPLE `record` with a fixed sample value matching the
// field's type in `schema`. Field types not listed below are left unset.
inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType type = field.GetFieldType();
        switch (type)
        {
        case BIGINT:
            record.SetBigint(i, 1234l);
            break;
        case DOUBLE:
            record.SetDouble(i, 1.234);
            break;
        case BOOLEAN:
            record.SetBoolean(i, true);
            break;
        case TIMESTAMP:
            record.SetTimestamp(i, 1234l);
            break;
        case STRING:
            record.SetString(i, "1234");
            break;
        case DECIMAL:
            record.SetDecimal(i, "1234");
            break;
        case INTEGER:
            record.SetInteger(i, static_cast<int32_t>(1234));
            break;
        case FLOAT:
            record.SetFloat(i, 1.234f);
            break;
        case TINYINT:
            // 1234 does not fit in int8_t (the old cast wrapped it to -46); use an in-range sample.
            record.SetTinyint(i, static_cast<int8_t>(12));
            break;
        case SMALLINT:
            record.SetSmallint(i, static_cast<int16_t>(1234));
            break;
        default:
            break;
        }
    }
}
inline void GenerateRecords(TopicMetaPtr topicMeta, RecordEntryVec& records)
{
const RecordSchema& schema = topicMeta->GetRecordSchema();
for (int i = 0; i < gRecordNum; i++)
{
if (topicMeta->GetRecordType() == "TUPLE")
{
// TUPLE
records.emplace_back(schema.GetFieldCount());
GenerateTupleRecord(schema, records.back());
}
else
{
// BLOB
records.emplace_back(BLOB);
records.back().SetData("test_blob_data" + std::to_string(i));
}
}
}
void AsyncProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
DatahubProducer producer(project, topic, conf, shardIds);
const TopicMetaPtr& topicMeta = producer.GetTopicMeta();
std::vector<WriteResultFuturePtr> resultFutureVecs;
std::map<std::string, uint64_t> writeRecordNum;
for (const std::string& shardId : shardIds)
{
writeRecordNum[shardId] = 0;
}
try
{
// TODO: 生成数据
RecordEntryVec records;
GenerateRecords(topicMeta, records);
for (int i = 0; i < gEpochNum; i++)
{
auto result = producer.WriteAsync(records);
resultFutureVecs.push_back(result);
}
}
catch(const std::exception& e)
{
std::cerr << "WriteAsync fail: " << e.what() << std::endl;
}
producer.Flush();
for (auto it = resultFutureVecs.begin(); it != resultFutureVecs.end(); it++)
{
(*it)->wait();
try
{
WriteResultPtr result = (*it)->get();
writeRecordNum[result->GetShardId()] += gRecordNum;
}
catch (const std::exception& e)
{
std::cerr << "Write records fail: " << e.what() << std::endl;
}
}
for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
{
std::cout << "Write " << it->second << " records to shard " << it->first << std::endl;
}
}
// Entry point of the asynchronous-producer example: parse options, build the
// producer configuration, then produce.
int main(int argc, char *argv[])
{
    parse_args(argc, argv);

    ProducerConfiguration producerConf(gAccount, gEndpoint);
    producerConf.SetEnableProtobuf(true);
    producerConf.SetLogFilePath("./DatahubAsyncProducer.log");

    // Buffer / queue limits are only overridden when a positive value was given.
    if (gMaxBufferRecords > 0) { producerConf.SetMaxAsyncBufferRecords(gMaxBufferRecords); }
    if (gMaxBufferSize > 0) { producerConf.SetMaxAsyncBufferSize(gMaxBufferSize); }
    if (gMaxBufferTimeMs > 0) { producerConf.SetMaxAsyncBufferTimeMs(gMaxBufferTimeMs); }
    if (gMaxRecordPackQueueLimit > 0) { producerConf.SetMaxRecordPackQueueLimit(gMaxRecordPackQueueLimit); }

    AsyncProduce(gProjectName, gTopicName, producerConf, gShardIds);
    return 0;
}
同步写入示例
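参数说明
| 参数名 | 参数类型 | 参数说明 |
| --- | --- | --- |
| gAccount | Account | 访问凭证，包含access id（-i）和access key（-k）。 |
| gEndpoint | string | DataHub服务的Endpoint（-e）。 |
| gProjectName | string | 项目名称（-p）。 |
| gTopicName | string | Topic名称（-t）。 |
| gEpochNum | int | 写入轮数，每轮写入一批record（-E），默认为10。 |
| gRecordNum | int | 每轮写入的record数量（-N），默认为10。 |
| gShardIds | StringVec | 写入的Shard ID列表，命令行中以逗号分隔（-H）。 |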
代码示例
using namespace aliyun;
using namespace aliyun::datahub;

// Settings for the synchronous-producer example; populated by parse_args().
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
int gEpochNum = 10;   // -E: number of write rounds
int gRecordNum = 10;  // -N: records generated per round
StringVec gShardIds;  // -H: comma-separated shard list
// Splits `src` on every occurrence of the delimiter `pre` and appends the non-empty
// pieces to `tar` (existing contents of `tar` are kept).
// An empty delimiter yields `src` itself as a single piece (when non-empty).
inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    if (pre.empty())
    {
        // find("") matches everywhere; splitting on it is meaningless (and the old
        // loop ended in an out-of-range substr).
        if (!src.empty())
        {
            tar.push_back(src);
        }
        return;
    }
    size_t start_pos = 0;
    size_t pos = src.find(pre, start_pos);
    while (pos != std::string::npos)
    {
        if (pos > start_pos)
        {
            tar.push_back(src.substr(start_pos, pos - start_pos));
        }
        // Skip the whole delimiter, not just one character, so multi-character
        // separators do not leak their tail into the next piece.
        start_pos = pos + pre.size();
        pos = src.find(pre, start_pos);
    }
    if (start_pos < src.size())
    {
        tar.push_back(src.substr(start_pos));
    }
}
// Prints the command-line help of the synchronous producer to stdout.
// `prog` is argv[0]; only its base name (text after the last '/') is shown.
void Usage(const char* prog)
{
    const char* lastSlash = strrchr(prog, '/');
    const char* progName = (lastSlash == NULL) ? prog : lastSlash + 1;

    fprintf(stdout, "%s Ver %s\n", progName, PROG_VERSION);
    fputs("Usage: <programme> [options...]\n"
          " -i --accessid <string> set access id [Mandatory]\n"
          " -k --accesskey <string> set access key [Mandatory]\n"
          " -e --endpoint <string> set endpoint [Mandatory]\n"
          " -p --project <string> set project [Mandatory]\n"
          " -t --topic <string> set topic [Mandatory]\n"
          " -E --epochNum <int> set epoch num for write [Mandatory]\n"
          " -N --recordNum <int> set record num for each epoch [Mandatory]\n"
          " -H --shardIds <string> set shards (split by ',')\n"
          " -v --version show version\n"
          " -h --help show help message\n", stdout);
}
// Parses the synchronous-producer command line into the g* globals.
// -v / -h print the help text and exit(0). Exits with status 1 when the epoch
// count or the per-epoch record count is not positive.
void parse_args(int argc, char* argv[])
{
    // Every long option maps onto the matching short option in `shortOpts`.
    static struct option long_opts[] =
    {
        { "accessid",  required_argument, NULL, 'i' },
        { "accesskey", required_argument, NULL, 'k' },
        { "endpoint",  required_argument, NULL, 'e' },
        { "project",   required_argument, NULL, 'p' },
        { "topic",     required_argument, NULL, 't' },
        { "epochNum",  required_argument, NULL, 'E' },
        { "recordNum", required_argument, NULL, 'N' },
        { "shardIds",  required_argument, NULL, 'H' },
        { "version",   no_argument,       NULL, 'v' },
        { "help",      no_argument,       NULL, 'h' },
        { NULL, 0, NULL, 0 }
    };
    const char* shortOpts = "i:k:e:p:t:E:N:H:vh";
    int optIndex = 0;
    int opt;
    while ((opt = getopt_long(argc, argv, shortOpts, long_opts, &optIndex)) != -1)
    {
        switch (opt)
        {
        case 'i': gAccount.id = optarg; break;
        case 'k': gAccount.key = optarg; break;
        case 'e': gEndpoint = optarg; break;
        case 'p': gProjectName = optarg; break;
        case 't': gTopicName = optarg; break;
        case 'E': gEpochNum = std::atoi(optarg); break;
        case 'N': gRecordNum = std::atoi(optarg); break;
        case 'H': StringSplit(gShardIds, optarg, ","); break;
        case 'v':
        case 'h':
            Usage(argv[0]);
            exit(0);
        default:
            break;
        }
    }
    // Both counts must be positive for the producer loop to do any work.
    if (!(gEpochNum > 0 && gRecordNum > 0))
    {
        std::cerr << "Invalid parameter!" << std::endl;
        exit(1);
    }
}
// Fills every field of a TUPLE `record` with a fixed sample value matching the
// field's type in `schema`. Field types not listed below are left unset.
inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType type = field.GetFieldType();
        switch (type)
        {
        case BIGINT:
            record.SetBigint(i, 1234l);
            break;
        case DOUBLE:
            record.SetDouble(i, 1.234);
            break;
        case BOOLEAN:
            record.SetBoolean(i, true);
            break;
        case TIMESTAMP:
            record.SetTimestamp(i, 1234l);
            break;
        case STRING:
            record.SetString(i, "1234");
            break;
        case DECIMAL:
            record.SetDecimal(i, "1234");
            break;
        case INTEGER:
            record.SetInteger(i, static_cast<int32_t>(1234));
            break;
        case FLOAT:
            record.SetFloat(i, 1.234f);
            break;
        case TINYINT:
            // 1234 does not fit in int8_t (the old cast wrapped it to -46); use an in-range sample.
            record.SetTinyint(i, static_cast<int8_t>(12));
            break;
        case SMALLINT:
            record.SetSmallint(i, static_cast<int16_t>(1234));
            break;
        default:
            break;
        }
    }
}
// Appends gRecordNum sample records matching the topic's record type to `records`.
// TUPLE topics get one entry sized to the schema and filled by GenerateTupleRecord();
// every other record type gets a BLOB entry with payload "test_blob_data<i>".
inline void GenerateRecords(const TopicMetaPtr& topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    for (int idx = 0; idx < gRecordNum; ++idx)
    {
        if (topicMeta->GetRecordType() != "TUPLE")
        {
            // BLOB: the payload is a numbered test string.
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(idx));
            continue;
        }
        // TUPLE: one entry sized to the schema, filled field by field.
        records.emplace_back(schema.GetFieldCount());
        GenerateTupleRecord(schema, records.back());
    }
}
// Writes the same generated batch gEpochNum times through the synchronous producer
// and prints how many records were written per shard. A std::exception stops the
// remaining writes; the counts gathered so far are still printed.
void GeneralProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
    DatahubProducer producer(project, topic, conf, shardIds);
    const TopicMetaPtr& topicMeta = producer.GetTopicMeta();

    // Every requested shard starts at zero so it is reported even if nothing lands on it.
    std::map<std::string, uint64_t> writeRecordNum;
    for (const std::string& id : shardIds)
    {
        writeRecordNum[id] = 0;
    }

    try
    {
        // TODO: generate the data to write
        RecordEntryVec records;
        GenerateRecords(topicMeta, records);
        for (int epoch = 0; epoch < gEpochNum; ++epoch)
        {
            // Write() returns the ID of the shard the batch was written to.
            const std::string target = producer.Write(records);
            writeRecordNum[target] += records.size();
        }
    }
    catch (const std::exception& ex)
    {
        std::cerr << "Write fail: " << ex.what() << std::endl;
    }

    for (const auto& entry : writeRecordNum)
    {
        std::cout << "Write " << entry.second << " records to shard " << entry.first << std::endl;
    }
}
// Entry point of the synchronous-producer example: parse options, build the
// producer configuration, then produce.
int main(int argc, char *argv[])
{
    parse_args(argc, argv);

    // Protobuf transport plus a dedicated log file for this example.
    ProducerConfiguration producerConf(gAccount, gEndpoint);
    producerConf.SetEnableProtobuf(true);
    producerConf.SetLogFilePath("./DatahubGeneralProducer.log");

    GeneralProduce(gProjectName, gTopicName, producerConf, gShardIds);
    return 0;
}