The C++ SDK supports Producer and Consumer as of version 2.25. Read and write examples follow.
Producer Example
Parameter Overview
All Producer parameters are set through ProducerConfiguration:
ProducerConfiguration producerConf(gAccount, gEndpoint);
producerConf.SetEnableProtobuf(true);
//producerConf.SetMaxAsyncBufferRecords()
The parameters and their types are described in the table below:

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| mMaxAsyncBufferRecords | int64_t | No | INT_MAX | Maximum number of records to batch when sending asynchronously. Batching is usually bounded by size, so the default is effectively unlimited. |
| mMaxAsyncBufferSize | int64_t | No | 4 * 1024 * 1024 | Maximum batch size (bytes) when sending asynchronously. |
| mMaxAsyncBufferTimeMs | int64_t | No | 10000 | Maximum buffering time (ms) when sending asynchronously. |
| mMaxRecordPackQueueLimit | int64_t | No | 16 | Maximum number of completed batches that may be in flight at once; beyond this limit the write API blocks, mainly to prevent OOM. |
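As an illustration, a configuration that tunes all of these batching parameters might look like the following. SetMaxAsyncBufferRecords appears in the snippet above; the other setter names are assumed to follow the same Set&lt;Name&gt; pattern and should be verified against the SDK headers.

```cpp
// A minimal sketch, assuming each mXxx parameter has a matching SetXxx setter
// (SetMaxAsyncBufferRecords is shown above; the remaining setters are assumptions).
ProducerConfiguration producerConf(gAccount, gEndpoint);
producerConf.SetEnableProtobuf(true);
producerConf.SetMaxAsyncBufferRecords(10000);        // flush after 10000 buffered records...
producerConf.SetMaxAsyncBufferSize(4 * 1024 * 1024); // ...or after 4 MB of buffered data...
producerConf.SetMaxAsyncBufferTimeMs(10000);         // ...or after 10 s at the latest
producerConf.SetMaxRecordPackQueueLimit(16);         // block writes beyond 16 in-flight batches
```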
Asynchronous Write (Recommended)
The advantage of asynchronous writes is that you do not have to batch records yourself: the SDK batches for you, and the batching behavior can be tuned with the parameters described above.
#include <iostream>
#include <map>
#include <string>
#include <vector>
// DataHub SDK headers omitted; include them per your installation.

void AsyncProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
    DatahubProducer producer(project, topic, conf, shardIds);
    const TopicMetaPtr& topicMeta = producer.GetTopicMeta();
    std::vector<WriteResultFuturePtr> resultFutureVecs;
    std::map<std::string, uint64_t> writeRecordNum;
    for (const std::string& shardId : shardIds)
    {
        writeRecordNum[shardId] = 0;
    }
    try
    {
        // TODO: generate the records to write
        RecordEntryVec records;
        GenerateRecords(topicMeta, records);
        for (int i = 0; i < gEpochNum; i++)
        {
            // WriteAsync returns immediately with a future; the SDK batches
            // and sends in the background according to the buffer settings
            auto result = producer.WriteAsync(records);
            resultFutureVecs.push_back(result);
        }
    }
    catch (const std::exception& e)
    {
        std::cerr << "WriteAsync failed: " << e.what() << std::endl;
    }
    // Force any buffered records to be sent before collecting results
    producer.Flush();
    for (auto& future : resultFutureVecs)
    {
        future->wait();
        try
        {
            WriteResultPtr result = future->get();
            writeRecordNum[result->GetShardId()] += gRecordNum;
        }
        catch (const std::exception& e)
        {
            std::cerr << "Write records failed: " << e.what() << std::endl;
        }
    }
    for (const auto& entry : writeRecordNum)
    {
        std::cout << "Wrote " << entry.second << " records to shard " << entry.first << std::endl;
    }
}
inline void GenerateRecords(const TopicMetaPtr& topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    for (int i = 0; i < gRecordNum; i++)
    {
        if (topicMeta->GetRecordType() == "TUPLE")
        {
            // TUPLE record: allocate fields per the topic schema, then fill them
            records.emplace_back(schema.GetFieldCount());
            GenerateTupleRecord(schema, records.back());
        }
        else
        {
            // BLOB record: raw bytes, no schema
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(i));
        }
    }
}
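The write examples call GenerateTupleRecord, which is left to the user. A minimal sketch is shown below, assuming the SDK exposes SetXxx(index, value) setters on RecordEntry that mirror the GetXxx(index) getters used in ProcessRecords further down; verify the exact names and signatures against the SDK headers.

```cpp
// A minimal sketch of GenerateTupleRecord. The SetXxx(index, value) setters are
// assumed to mirror the GetXxx(index) getters used in ProcessRecords below;
// check the SDK headers for the actual signatures.
inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); ++i)
    {
        switch (schema.GetField(i).GetFieldType())
        {
            case BIGINT:  record.SetBigint(i, 123456789L);    break;
            case STRING:  record.SetString(i, "test_string"); break;
            case DOUBLE:  record.SetDouble(i, 3.14159);       break;
            case BOOLEAN: record.SetBoolean(i, true);         break;
            default:      break; // fill the remaining types as needed
        }
    }
}
```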
Synchronous Write
If you want to control batching yourself, use synchronous writes instead.
void GeneralProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
    DatahubProducer producer(project, topic, conf, shardIds);
    const TopicMetaPtr& topicMeta = producer.GetTopicMeta();
    std::map<std::string, uint64_t> writeRecordNum;
    for (const std::string& shardId : shardIds)
    {
        writeRecordNum[shardId] = 0;
    }
    try
    {
        // TODO: generate the records to write
        RecordEntryVec records;
        GenerateRecords(topicMeta, records);
        for (int i = 0; i < gEpochNum; i++)
        {
            // Write blocks until the batch is sent and returns the target shard id
            std::string shardId = producer.Write(records);
            writeRecordNum[shardId] += records.size();
        }
    }
    catch (const std::exception& e)
    {
        std::cerr << "Write failed: " << e.what() << std::endl;
    }
    for (const auto& entry : writeRecordNum)
    {
        std::cout << "Wrote " << entry.second << " records to shard " << entry.first << std::endl;
    }
}
GenerateRecords is the same helper shown in the asynchronous example above.
Consumer
For an introduction to the Consumer, see:
Parameter Overview
| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| mAutoAckOffset | bool | Yes | - | Whether to ack records automatically: true means a record is acked as soon as it is read; false means you must call recordPtr->GetMessageKey()->Ack() manually after processing, otherwise the consumption offset does not advance. |
| mFetchLimitNum | int32_t | No | - | Maximum number of records read per request. |
| mSessionTimeout | int64_t | No | - | Maximum consumer session time. The consumer must keep sending heartbeats to the server to stay alive; if no heartbeat arrives within this window, the server treats the consumer as having left the consumer group and reassigns its shards to other consumers. |
| mRecordFetchQueueLimit | uint32_t | No | - | Number of records cached locally; when the cache runs low, the SDK fetches more from the server. Setting this too large may cause OOM. |
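By analogy with ProducerConfiguration, a consumer configuration sketch might look like the following. The constructor arguments and setter names are assumptions based on the Set&lt;Name&gt; pattern above and should be verified against the SDK headers.

```cpp
// A minimal sketch, assuming ConsumerConfiguration mirrors ProducerConfiguration;
// the constructor arguments and setter names below are assumptions.
ConsumerConfiguration consumerConf(gAccount, gEndpoint);
consumerConf.SetAutoAckOffset(false);        // ack manually after processing
consumerConf.SetFetchLimitNum(1000);         // up to 1000 records per fetch request
consumerConf.SetSessionTimeout(60000);       // heartbeat session timeout
consumerConf.SetRecordFetchQueueLimit(1000); // keep the local cache bounded
```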
Example Code
void CollaborativeConsume(const std::string& project, const std::string& topic, const std::string& subId, const ConsumerConfiguration& conf)
{
    DatahubConsumer consumer(project, topic, subId, conf);
    const TopicMetaPtr& topicMeta = consumer.GetTopicMeta();
    uint64_t readRecordNum = 0;
    try
    {
        while (true)
        {
            // Read one record, waiting up to 60000 ms
            auto recordPtr = consumer.Read(60000);
            if (recordPtr == nullptr)
            {
                break;
            }
            // TODO: process the record
            ProcessRecords(topicMeta, recordPtr);
            if (!gAutoAck)
            {
                // With auto ack disabled, ack manually after processing,
                // otherwise the consumption offset will not advance
                recordPtr->GetMessageKey()->Ack();
            }
            readRecordNum++;
        }
    }
    catch (const std::exception& e)
    {
        std::cerr << "Read failed: " << e.what() << std::endl;
    }
    std::cout << "Read " << readRecordNum << " records total" << std::endl;
}
inline void ProcessRecords(const TopicMetaPtr& topicMeta, const RecordResultPtr& recordPtr)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    if (schema.GetFieldCount() == 0) // BLOB: raw bytes, no schema
    {
        int len = 0;
        const char* data = recordPtr->GetData(len);
        printf("%.*s\n", len, data); // the data may not be NUL-terminated, so bound it by len
    }
    else // TUPLE: print each field according to its schema type
    {
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field& field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
                case BIGINT:
                    printf("%ld ", recordPtr->GetBigint(j));
                    break;
                case INTEGER:
                    printf("%d ", recordPtr->GetInteger(j));
                    break;
                case SMALLINT:
                    printf("%hd ", recordPtr->GetSmallint(j));
                    break;
                case TINYINT:
                    printf("%hhd ", recordPtr->GetTinyint(j));
                    break;
                case DOUBLE:
                    printf("%.5lf ", recordPtr->GetDouble(j));
                    break;
                case FLOAT:
                    printf("%.5f ", recordPtr->GetFloat(j));
                    break;
                case STRING:
                    printf("%s ", recordPtr->GetString(j).c_str());
                    break;
                case BOOLEAN:
                    printf("%s ", recordPtr->GetBoolean(j) ? "true" : "false");
                    break;
                case TIMESTAMP:
                    printf("%ld ", recordPtr->GetTimestamp(j));
                    break;
                case DECIMAL:
                    printf("%s ", recordPtr->GetDecimal(j).c_str());
                    break;
                default:
                    break;
            }
        }
        printf("\n");
    }
}
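Putting it together, a driver might look like the sketch below. The project, topic, and subscription IDs are placeholders, and the g* globals come from the snippets above; as noted earlier, the ConsumerConfiguration constructor arguments are assumed.

```cpp
// Hypothetical driver wiring the examples together; names are placeholders.
int main()
{
    ProducerConfiguration producerConf(gAccount, gEndpoint);
    producerConf.SetEnableProtobuf(true);
    StringVec shardIds = {"0", "1"};
    AsyncProduce("test_project", "test_topic", producerConf, shardIds);

    ConsumerConfiguration consumerConf(gAccount, gEndpoint); // constructor args assumed
    CollaborativeConsume("test_project", "test_topic", "test_sub_id", consumerConf);
    return 0;
}
```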