本文介绍TCP协议下C++ SDK v3.x.x的版本信息,包括使用限制、版本的基本信息、环境要求、编译说明以及和历史版本相比各功能特性的变更内容。
使用限制
C++ SDK v3.x.x版本仅支持有命名空间的实例,若您使用的实例无命名空间,请勿将客户端版本升级到C++ SDK v3.x.x。
5.x版本实例默认都有命名空间,4.x版本实例可在云消息队列 RocketMQ 版控制台实例详情页面的基础信息区域查看是否有命名空间。
版本信息
发布时间 | 版本号 | 下载链接 |
2021-10-18 | v3.x.x |
环境要求
ONS-Client-CPP是基于Apache RocketMQ 5.0协议原生实现的开源客户端开发软件包。Apache RocketMQ 5.0新的通信协议基于gRPC(HTTP 2.0/Protobuf)实现,因此v3.0.0版本的C++ SDK也需要依赖于grpc/grpc实现,满足以下依赖和工具链的要求。
依赖
依赖 | 版本 |
grpc/grpc | 1.39.0 |
fmt | 8.0.1 |
spdlog | 1.9.2 |
filesystem | 1.5.0 |
asio | 1.18.2 |
cpp_httplib | 0.9.4 |
protobuf | 3.17.2 |
工具链
操作系统 | 工具链版本 |
Linux、macOS | GCC 4.9或以上版本、Clang 3.4或以上版本 |
Windows 7或以上版本 | Visual Studio 2015或以上版本 |
C++ Standard
SDK使用了C++ 11语法,因此需要启用C++ 11或以上标准。
编译说明
开源代码编译说明
参考Bazel安装指南安装Bazel工具。
说明使用Bazel 4.x版本,需要安装Python 3.x.x。
下载开源代码并解压。可通过以下两种方式下载:
通过GitHub克隆源代码:执行
git clone https://github.com/aliyun-mq/ons-client-cpp.git
命令。本地下载:下载链接请参见版本信息。
在项目文件夹内执行以下命令,Bazel将自动下载所有第三方依赖。
bazel -c opt //dist/...
输出示例如下:
INFO: From Action dist/libons_library.pic.a: starting to run shell INFO: Elapsed time: 39.480s, Critical Path: 38.89s INFO: 2044 processes: 1796 remote cache hit, 241 internal, 7 processwrapper-sandbox. INFO: Build completed successfully, 2044 total actions
编译完成后,合并好的静态库在bazel-bin/dist/ons-dist.tar.gz文件内。
root@a36849cf2f24:~/ons-client-cpp# ls -lah bazel-bin/dist/ons-dist.tar.gz -r-xr-xr-x 1 root root 15M Oct 14 08:03 bazel-bin/dist/ons-dist.tar.gz
CentOS 7编译说明
CentOS 7.x默认安装GCC 4.8.5,不满足工具链的要求。因此您需要安装devtoolset-4,devtoolset-4提供的工具链版本为GCC 5.3.1。
wget https://copr.fedorainfracloud.org/coprs/vbatts/bazel/repo/epel-7/vbatts-bazel-epel-7.repo
cp vbatts-bazel-epel-7.repo /etc/yum.repos.d/
yum install devtoolset-4-gcc devtoolset-4-gcc-c++ bazel4 python3 git -y
scl enable devtoolset-4 bash
unlink /usr/bin/python && ln -s /usr/bin/python3 /usr/bin/python
git clone git@github.com:aliyun-mq/ons-client-cpp.git
cd ons-client-cpp && bazel build //dist/...
功能变更
顺序消息
顺序消息的最大重试次数MaxReconsumeTimes参数的默认值从Integer.MAX变更为16次。超过最大重试次数消息还未被消费成功将直接被投递至死信队列。您可以通过自定义MaxReconsumeTimes参数值修改顺序消息的最大重试次数。
广播消费
广播消费模式下,支持使用offsetStore
接口的方式定制消费者启动时的消费位点。若未设置,默认和历史版本一致直接从最新消费位点开始消费。
示例代码如下:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include "ons/MessageModel.h"
#include "ons/ONSFactory.h"
#include "rocketmq/Logger.h"
using namespace std;
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(const Message& message, ConsumeContext& context) noexcept override {
std::lock_guard<std::mutex> lk(console_mtx);
auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID()
<< ", Body-size: " << message.getBody().size()
<< ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
<< "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
<< "ms" << std::endl;
return Action::CommitMessage;
}
};
int main(int argc, char* argv[]) {
auto& logger = rocketmq::getLogger();
logger.setLevel(rocketmq::Level::Debug);
logger.init();
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factory_property;
//从OffsetStore读取消费位点的功能仅支持广播消费模式。
factory_property.setMessageModel(ONS_NAMESPACE::MessageModel::BROADCASTING);
factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");
PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);
const char* topic = "cpp_sdk_standard";
const char* tag = "*";
// register your own listener here to handle the messages received.
auto* messageListener = new ExampleMessageListener();
consumer->subscribe(topic, tag);
consumer->registerMessageListener(messageListener);
// Start this consumer
consumer->start();
// Keep main thread running until process finished.
std::this_thread::sleep_for(std::chrono::minutes(15));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
Push消费
如果设置的消费线程数不在合法区间[1,1000]内,系统会在创建消费者时抛出异常,而不是在启动消费者时抛出异常。
新增消费速度限流功能。为了避免消息洪峰可能对消费端应用产生冲击,您可通过该功能限制消息的消费速度,保护消费端应用。
说明顺序消息的消息重试不受限流控制。
消费限流示例代码如下:
#include <chrono> #include <iostream> #include <mutex> #include <thread> #include "ons/MessageModel.h" #include "ons/ONSFactory.h" #include "rocketmq/Logger.h" using namespace std; using namespace ons; std::mutex console_mtx; class ExampleMessageListener : public MessageListener { public: Action consume(const Message& message, ConsumeContext& context) noexcept override { std::lock_guard<std::mutex> lk(console_mtx); auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp(); auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp(); std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID() << ", Body-size: " << message.getBody().size() << ", Tag: " << message.getTag() << ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count() << "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count() << "ms" << std::endl; return Action::CommitMessage; } }; int main(int argc, char* argv[]) { auto& logger = rocketmq::getLogger(); logger.setLevel(rocketmq::Level::Debug); logger.init(); const char* topic = "cpp_sdk_standard"; const char* tag = "*"; std::cout << "=======Before consuming messages=======" << std::endl; ONSFactoryProperty factory_property; factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard"); // Client-side throttling. factory_property.throttle(topic, 16); PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property); // register your own listener here to handle the messages received. auto* messageListener = new ExampleMessageListener(); consumer->subscribe(topic, tag); consumer->registerMessageListener(messageListener); // Start this consumer. consumer->start(); // Keep main thread running until process finished. std::this_thread::sleep_for(std::chrono::minutes(15)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }
消息轨迹
参数 | 说明 |
AccessKey | 您的阿里云账号或RAM用户的AccessKey ID,用于标识用户。当您通过SDK或API调用云消息队列 RocketMQ 版资源时,需要使用AccessKey ID进行身份验证。 |
到达Server | 消息到达消息队列RocketMQ版服务端的时间。 |
预设DeliverAt | 定时消息的预计投递时间。 |
实际AvailableAt | 定时消息定时结束的时间。即消息可被消费者消费的开始时间。 |
Available Time | 消息可被消费者消费的开始时间。 |
提交/回滚时间 | 事务消息提交或回滚的时间。 |
到达消费端 | 消息到达消费者客户端的时间。 |
等待处理耗时 | 消息到达消费者客户端,等待线程池分配线程和分配处理资源的耗时。 |
API接口变更
日志默认路径由~/logs/rocketmqlogs/ons.log变更为~/logs/rocketmqlogs/ons.log。
枚举Action从全局命名空间移动到ons命名空间下。
头文件都存放到/ons路径下。
Message#getStartDeliverTime参数返回值,从int64_t修改为std::chrono::system_clock::timepoint或std::chrono::milliseconds。
删除了函数的throws声明,该声明在C++ 11标准下已不支持。
Producer
类提供noexcept
接口,用于禁用异常场景使用。枚举类型都转变为
namespace enum
,即enum class Type
。
SDK常见问题
在同一个进程内,新旧版本的SDK是否可以共存?会不会有符号冲突?
新版本SDK的预编译静态库,符号在默认的命名空间ons下,会与历史版本的符号冲突。若要实现同进程两个版本兼容,您可以自行从源码编译,只要保证在编译过程中,定义
ONS_NAMESPACE
这个宏为非ons的值即可。Bazel提供多种方式定义宏,可以通过.bazelrc、cc_library规则中的defines属性定义或
cc_library#copts
属性定义。调试代码时,怎么编译一个带符号表的静态库?
我已经使用了Protobuf依赖,跟你们要求的依赖版本不一致,怎么解决依赖冲突?
ONS-Client-CPP对第三方采用源码依赖的形式,您只需要调整ONS-Client-CPP项目的依赖版本与您自身的依赖一致即可。
ONS-Client-CPP依赖了RocketMQ-Client-CPP,您需要fork apache/rocketmq-client-cpp仓库,将依赖描述文件ons-client-cpp/bazel/deps.bzl中的依赖指向地址修改为fork的地址。
本地使用了HTTP代理,并声明了
http_proxy
、grpc_proxy
等环境变量,发现发送消息有很多超时,是什么原因?SDK基于gRPC实现,支持http_proxy、https_proxy和grpc_proxy等方式配置代理。如果不需要proxy,可以配置no_grpc_proxy或no_proxy环境变量。忽略代理站点,请参见gRPC环境变量列表说明。
我们项目使用的是C++ 98、C++ 03标准,能支持么?
不能。由于gRPC和Protobuf是目前项目的核心协议和依赖,本项目也依据这两个依赖的标准,不再支持C++ 98和C++ 03标准。