C++ High-Level SDK

C++ SDK 2.25版本支持Producer、Consumer,以下为读写示例

Producer示例

参数介绍

Producer 的参数都通过 ProducerConfiguration 来设置

 ProducerConfiguration producerConf(gAccount, gEndpoint);
 producerConf.SetEnableProtobuf(true);
 //producerConf.SetMaxAsyncBufferRecords()

具体参数及类型说明如下表所示:

| 参数名称 | 类型 | 是否必须 | 默认值 | 描述 |
| --- | --- | --- | --- | --- |
| mMaxAsyncBufferRecords | int64_t | - | INT_MAX | 异步发送时,最大攒批的数据条数,一般通过 size 控制,所以这里默认值为 INT_MAX |
| mMaxAsyncBufferSize | int64_t | - | 4 * 1024 * 1024 | 异步发送时,最大攒批 size |
| mMaxAsyncBufferTimeMs | int64_t | - | 10000 | 异步发送时,最长缓存时间 |
| mMaxRecordPackQueueLimit | int64_t | - | 16 | 异步发送时,攒批完成正在发送的请求数,超过会阻塞发送接口,主要是防止 OOM |

异步写入(推荐)

异步写入的好处是不需要用户攒批,并且攒批的方式可以通过参数进行设置,可以参考上面参数介绍进行调优。

void AsyncProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
    DatahubProducer producer(project, topic, conf, shardIds);
    const TopicMetaPtr& topicMeta = producer.GetTopicMeta();

    std::vector<WriteResultFuturePtr> resultFutureVecs;
    std::map<std::string, uint64_t> writeRecordNum;
    for (const std::string& shardId : shardIds)
    {
        writeRecordNum[shardId] = 0;
    }

    try
    {
        // TODO: 生成数据
        RecordEntryVec records;
        GenerateRecords(topicMeta, records);
        for (int i = 0; i < gEpochNum; i++)
        {
            auto result = producer.WriteAsync(records);
            resultFutureVecs.push_back(result);
        }
    }
    catch(const std::exception& e)
    {
        std::cerr << "WriteAsync fail: " << e.what() << std::endl;
    }

    producer.Flush();

    for (auto it = resultFutureVecs.begin(); it != resultFutureVecs.end(); it++)
    {
        (*it)->wait();
        try
        {
            WriteResultPtr result = (*it)->get();
            writeRecordNum[result->GetShardId()] += gRecordNum;
        }
        catch (const std::exception& e)
        {
            std::cerr << "Write records fail: " << e.what() << std::endl;
        }
    }

    for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
    {
        std::cout << "Write " << it->second << " records to shard " << it->first << std::endl;
    }
}
inline void GenerateRecords(TopicMetaPtr topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    for (int i = 0; i < gRecordNum; i++)
    {
        if (topicMeta->GetRecordType() == "TUPLE")
        {
            // TUPLE
            records.emplace_back(schema.GetFieldCount());
            GenerateTupleRecord(schema, records.back());
        }
        else
        {
            // BLOB
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(i));
        }
    }
}

同步写入

如果想自己控制攒批的方式,那可以采用同步写入的方式。

// Synchronous write example: the caller controls batching explicitly and each
// Write() call blocks until the batch has been accepted by a shard.
// Prints a per-shard count of successfully written records when done.
void GeneralProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
    DatahubProducer producer(project, topic, conf, shardIds);
    const TopicMetaPtr& topicMeta = producer.GetTopicMeta();

    // Per-shard success counter, pre-seeded so every shard prints a line.
    std::map<std::string, uint64_t> writeRecordNum;
    for (const std::string& shard : shardIds)
    {
        writeRecordNum[shard] = 0;
    }

    try
    {
        // TODO: generate the payload records
        RecordEntryVec records;
        GenerateRecords(topicMeta, records);
        int epoch = 0;
        while (epoch < gEpochNum)
        {
            // Write() returns the id of the shard the batch landed on.
            std::string shardId = producer.Write(records);
            writeRecordNum[shardId] += records.size();
            ++epoch;
        }
    }
    catch (const std::exception& e)
    {
        std::cerr << "Write fail: " << e.what() << std::endl;
    }

    for (const auto& entry : writeRecordNum)
    {
        std::cout << "Write " << entry.second << " records to shard " << entry.first << std::endl;
    }
}
// Build gRecordNum records shaped to the topic: schema-driven TUPLE entries,
// or BLOB entries carrying a small test payload.
inline void GenerateRecords(const TopicMetaPtr& topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    for (int idx = 0; idx < gRecordNum; ++idx)
    {
        if (topicMeta->GetRecordType() != "TUPLE")
        {
            // BLOB: raw payload
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(idx));
            continue;
        }
        // TUPLE: one field slot per schema column
        records.emplace_back(schema.GetFieldCount());
        GenerateTupleRecord(schema, records.back());
    }
}

Consumer

Consumer 介绍请参考:

参数介绍

| 参数名称 | 类型 | 是否必须 | 默认值 | 描述 |
| --- | --- | --- | --- | --- |
| mAutoAckOffset | bool | - | - | 是否自动 ack 数据:true 表示数据 read 到以后自动 ack;false 表示数据 read 到以后,需要手动调用一下 recordPtr->GetMessageKey()->Ack()否则点位不会往前推动 |
| mFetchLimitNum | int32_t | - | - | 单次请求读取的最大数据条数 |
| mSessionTimeout | int64_t | - | - | consumer 会话最大时间,consumer 需要和服务端一直发送心跳来保证活跃,超过这个时间没有发送心跳,会被服务端视为退出 consumer group,这个 consumer 的 shard 会被分配给其他 consumer |
| mRecordFetchQueueLimit | uint32_t | - | - | 本地缓存数据条数,不足会向服务端发起请求,设置过大有可能会导致 OOM。 |

示例代码

// Collaborative consumer example: keeps reading until Read() returns nullptr
// (no data within the 60 s timeout), processing — and, when auto-ack is off,
// explicitly acking — each record. Prints the total record count at the end.
void CollaborativeConsume(const std::string& project, const std::string& topic, const std::string& subId, const ConsumerConfiguration& conf)
{
    DatahubConsumer consumer(project, topic, subId, conf);
    const TopicMetaPtr& topicMeta = consumer.GetTopicMeta();

    uint64_t readRecordNum = 0;
    try
    {
        for (;;)
        {
            auto recordPtr = consumer.Read(60000);   // 60 s timeout per read
            if (!recordPtr)
            {
                break;   // nothing arrived within the timeout window
            }
            // TODO: process the record
            ProcessRecords(topicMeta, recordPtr);
            if (!gAutoAck)
            {
                // With auto-ack disabled the offset only advances after an
                // explicit Ack once the record has been processed.
                recordPtr->GetMessageKey()->Ack();
            }
            ++readRecordNum;
        }
    }
    catch (const std::exception& e)
    {
        std::cerr << "Read fail: " << e.what() << std::endl;
    }
    std::cout << "Read " << readRecordNum << " records total" << std::endl;
}
// Print one record to stdout.
// BLOB topics (schema has no fields) dump the raw payload; TUPLE topics print
// every column, space-separated, dispatching on the declared field type.
inline void ProcessRecords(const TopicMetaPtr& topicMeta, const RecordResultPtr& recordPtr)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    if (schema.GetFieldCount() == 0)        // BLOB
    {
        int len = 0;
        const char* data = recordPtr->GetData(len);
        // Bound the output by the reported length: a BLOB payload is not
        // guaranteed to be NUL-terminated, so a plain "%s" could over-read.
        printf("%.*s\n", len, data);
    }
    else                                    // TUPLE
    {
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field& field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
                case BIGINT:
                    // Cast to long long so the format specifier is correct on
                    // both LP64 and LLP64 platforms ("%ld" is wrong where
                    // long is only 32 bits, e.g. Windows).
                    printf("%lld ", static_cast<long long>(recordPtr->GetBigint(j)));
                    break;
                case INTEGER:
                    printf("%d ", recordPtr->GetInteger(j));
                    break;
                case SMALLINT:
                    printf("%hd ", recordPtr->GetSmallint(j));
                    break;
                case TINYINT:
                    printf("%hhd ", recordPtr->GetTinyint(j));
                    break;
                case DOUBLE:
                    printf("%.5lf ", recordPtr->GetDouble(j));
                    break;
                case FLOAT:
                    printf("%.5f ", recordPtr->GetFloat(j));
                    break;
                case STRING:
                    printf("%s ", recordPtr->GetString(j).c_str());
                    break;
                case BOOLEAN:
                    printf("%s ", recordPtr->GetBoolean(j) ? "true" : "false");
                    break;
                case TIMESTAMP:
                    // Same portability concern as BIGINT above.
                    printf("%lld ", static_cast<long long>(recordPtr->GetTimestamp(j)));
                    break;
                case DECIMAL:
                    printf("%s ", recordPtr->GetDecimal(j).c_str());
                    break;
                default:
                    // Unknown/unhandled field type: skip the column.
                    break;
            }
        }
        printf("\n");
    }
}