This topic describes the C++ SDK v3.x.x, which uses the TCP protocol. It covers limits, version information, environment requirements, compilation instructions, and feature changes from previous versions.
Limits
The C++ SDK v3.x.x supports only instances that have namespaces. If your instance does not have a namespace, do not upgrade the client to C++ SDK v3.x.x.
By default, all ApsaraMQ for RocketMQ 5.x instances contain namespaces. If you use an ApsaraMQ for RocketMQ 4.x instance, you can check whether the instance contains a namespace in the Basic Information section of the Instance Details page in the ApsaraMQ for RocketMQ console.
Version information
Release date | Version number | Download link |
2021-10-18 | v3.x.x |
Environment requirements
ONS-Client-CPP is an open source software development kit (SDK) natively built for the Apache RocketMQ 5.0 protocol. Because the new communication protocol of Apache RocketMQ 5.0 is based on gRPC (HTTP 2.0/Protobuf), the C++ SDK v3.0.0 also depends on grpc/grpc and must meet the following requirements for its dependencies and toolchains.
Dependencies
Dependency | Version |
grpc/grpc | 1.39.0 |
fmt | 8.0.1 |
spdlog | 1.9.2 |
filesystem | 1.5.0 |
asio | 1.18.2 |
cpp_httplib | 0.9.4 |
protobuf | 3.17.2 |
Toolchains
Operating system | Toolchain version |
Linux, macOS | GCC 4.9 or later, Clang 3.4 or later |
Windows 7 or later | Visual Studio 2015 or later |
C++ Standard
The SDK uses C++ 11 syntax. You must enable C++ 11 or a later standard.
Compilation instructions
Compile from open source code
Install the Bazel tool. For more information, see the Bazel installation guide.
NoteTo use Bazel 4.x, you must install Python 3.x.x.
Download and decompress the open source code. You can download the code in one of the following two ways:
Clone the source code from GitHub. Run the
git clone https://github.com/aliyun-mq/ons-client-cpp.gitcommand.Download the code locally. For the download link, see Version information.
Run the following command in the project folder. Bazel automatically downloads all third-party dependencies.
bazel -c opt //dist/...The following output is an example:
INFO: From Action dist/libons_library.pic.a: starting to run shell INFO: Elapsed time: 39.480s, Critical Path: 38.89s INFO: 2044 processes: 1796 remote cache hit, 241 internal, 7 processwrapper-sandbox. INFO: Build completed successfully, 2044 total actionsAfter the compilation is complete, the merged static library is located in the bazel-bin/dist/ons-dist.tar.gz file.
root@a36849cf2f24:~/ons-client-cpp# ls -lah bazel-bin/dist/ons-dist.tar.gz -r-xr-xr-x 1 root root 15M Oct 14 08:03 bazel-bin/dist/ons-dist.tar.gz
Compile on CentOS 7
By default, CentOS 7.x installs GCC 4.8.5, which does not meet the toolchain requirement. You must install devtoolset-4, which provides GCC 5.3.1.
wget https://copr.fedorainfracloud.org/coprs/vbatts/bazel/repo/epel-7/vbatts-bazel-epel-7.repo
cp vbatts-bazel-epel-7.repo /etc/yum.repos.d/
yum install devtoolset-4-gcc devtoolset-4-gcc-c++ bazel4 python3 git -y
scl enable devtoolset-4 bash
unlink /usr/bin/python && ln -s /usr/bin/python3 /usr/bin/python
git clone git@github.com:aliyun-mq/ons-client-cpp.git
cd ons-client-cpp && bazel build //dist/...Feature changes
Ordered messages
The default value of the MaxReconsumeTimes parameter is changed from Integer.MAX to 16. This parameter specifies the maximum number of retries for ordered messages. If a message is not consumed after the maximum number of retries, it is sent to a dead-letter queue. You can change the maximum number of retries for ordered messages by setting a custom value for the MaxReconsumeTimes parameter.
Broadcasting consumption
In broadcasting consumption mode, you can use the offsetStore interface to customize the starting consumer offset. If you do not set this parameter, the consumer starts from the latest consumer offset, which is consistent with previous versions.
The following code is an example:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include "ons/MessageModel.h"
#include "ons/ONSFactory.h"
#include "rocketmq/Logger.h"
using namespace std;
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(const Message& message, ConsumeContext& context) noexcept override {
std::lock_guard<std::mutex> lk(console_mtx);
auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID()
<< ", Body-size: " << message.getBody().size()
<< ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
<< "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
<< "ms" << std::endl;
return Action::CommitMessage;
}
};
int main(int argc, char* argv[]) {
auto& logger = rocketmq::getLogger();
logger.setLevel(rocketmq::Level::Debug);
logger.init();
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factory_property;
// The feature of reading consumer offsets from OffsetStore is supported only in broadcasting consumption mode.
factory_property.setMessageModel(ONS_NAMESPACE::MessageModel::BROADCASTING);
factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");
PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);
const char* topic = "cpp_sdk_standard";
const char* tag = "*";
// register your own listener here to handle the messages received.
auto* messageListener = new ExampleMessageListener();
consumer->subscribe(topic, tag);
consumer->registerMessageListener(messageListener);
// Start this consumer
consumer->start();
// Keep main thread running until process finished.
std::this_thread::sleep_for(std::chrono::minutes(15));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}Push consumption
If the number of consumer threads is not in the valid range of [1, 1000], an exception is thrown when the consumer is created, not when the consumer starts.
A consumption throttling feature is added. To prevent a message flood from overwhelming the consumer application, you can use this feature to limit the message consumption rate.
NoteMessage retries for ordered messages are not controlled by throttling.
The following code is an example of consumption throttling:
#include <chrono> #include <iostream> #include <mutex> #include <thread> #include "ons/MessageModel.h" #include "ons/ONSFactory.h" #include "rocketmq/Logger.h" using namespace std; using namespace ons; std::mutex console_mtx; class ExampleMessageListener : public MessageListener { public: Action consume(const Message& message, ConsumeContext& context) noexcept override { std::lock_guard<std::mutex> lk(console_mtx); auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp(); auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp(); std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID() << ", Body-size: " << message.getBody().size() << ", Tag: " << message.getTag() << ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count() << "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count() << "ms" << std::endl; return Action::CommitMessage; } }; int main(int argc, char* argv[]) { auto& logger = rocketmq::getLogger(); logger.setLevel(rocketmq::Level::Debug); logger.init(); const char* topic = "cpp_sdk_standard"; const char* tag = "*"; std::cout << "=======Before consuming messages=======" << std::endl; ONSFactoryProperty factory_property; factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard"); // Client-side throttling. factory_property.throttle(topic, 16); PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property); // register your own listener here to handle the messages received. auto* messageListener = new ExampleMessageListener(); consumer->subscribe(topic, tag); consumer->registerMessageListener(messageListener); // Start this consumer. consumer->start(); // Keep main thread running until process finished. std::this_thread::sleep_for(std::chrono::minutes(15)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }
Message traces
Parameter | Description |
AccessKey | The AccessKey ID of your Alibaba Cloud account or RAM user. This ID is used for identity verification when you access ApsaraMQ for RocketMQ resources using an SDK or calling an API operation. |
Arrival at server | The time when the message arrived at the ApsaraMQ for RocketMQ server. |
Preset DeliverAt | The expected delivery time of a scheduled message. |
Actual AvailableAt | The time when the timer for a scheduled message ended. This is when the message became available for consumption. |
Available Time | The time when the message became available for consumption. |
Commit/Rollback time | The time when the transactional message was committed or rolled back. |
Arrive at Consumer At | The time when the message arrived at the consumer client. |
Wait Duration before Processing | The time elapsed from when the message arrived at the consumer client to when the thread pool allocated a thread and resources for processing. |
API operation changes
The default log path changes from ~/logs/rocketmqlogs/ons.log to ~/logs/rocketmqlogs/ons.log.
The Action enumeration is moved from the global namespace to the ons namespace.
All header files are stored in the /ons path.
The return value of the Message#getStartDeliverTime method is changed from int64_t to std::chrono::system_clock::timepoint or std::chrono::milliseconds.
The throws declaration for functions is deleted because it is no longer supported in the C++ 11 standard.
The
Producerclass provides anoexceptinterface to disable exceptions.All enumeration types are now
namespace enum, which is equivalent toenum class Type.
SDK FAQ
Can new and old SDK versions coexist in the same process? Will there be symbol conflicts?
The symbols in the precompiled static library of the new SDK are in the default namespace ons, which conflicts with the symbols of previous versions. To make two versions compatible in the same process, you must compile the SDK from the source code. During compilation, make sure to define the
ONS_NAMESPACEmacro with a value other than ons.Bazel provides multiple ways to define macros. You can define a macro using .bazelrc, the defines property in `cc_library` rules, or the
cc_library#coptsproperty.How do I compile a static library with a symbol table for debugging?
Run the following command to compile:
bazel -c dbg //dist/...For more custom compilation options, see the Bazel user guide.
I already use a Protobuf dependency, but its version is different from the required version. How do I resolve this dependency conflict?
ONS-Client-CPP includes third-party libraries as source code dependencies. To resolve conflicts, you can adjust the dependency versions in the ONS-Client-CPP project to match the versions used in your project.
ONS-Client-CPP depends on RocketMQ-Client-CPP. You must fork the apache/rocketmq-client-cpp repository and update the dependency address in the ons-client-cpp/bazel/deps.bzl file to point to your forked repository.
I use a local HTTP proxy and have declared environment variables such as
http_proxyandgrpc_proxy. I find that many messages time out when I send them. Why?The SDK is based on gRPC and supports proxy environment variables such as `http_proxy`, `https_proxy`, and `grpc_proxy`. If the proxy interferes with the SDK's connection, you can set the `no_grpc_proxy` or `no_proxy` environment variable to bypass the proxy for specific endpoints. For more information, see the list of gRPC environment variables.
Our project uses the C++ 98 or C++ 03 standard. Is it supported?
No, it is not. Because the SDK relies on gRPC and Protobuf, it follows their standards and does not support C++ 98 or C++ 03.