更新时间:2021-01-04 23:39
本文介绍如何在公网环境下使用Node.js SDK接入消息队列Kafka版的SSL接入点并使用PLAIN机制收发消息。
cd /etc/yum.repos.d/
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
yum install librdkafka-devel
export CPPFLAGS=-I/usr/local/opt/openssl/include
export LDFLAGS=-L/usr/local/opt/openssl/lib
npm install --unsafe-perm node-rdkafka
// Connection settings loaded by producer.js and consumer.js via require('./setting').
// Replace every 'XXX' placeholder with the values of your own instance.
const settings = {
  sasl_plain_username: 'XXX',
  sasl_plain_password: 'XXX',
  bootstrap_servers: ['XXX'],
  topic_name: 'XXX',
  consumer_id: 'XXX',
};

module.exports = settings;
参数 | 描述 |
---|---|
sasl_plain_username | 用户名。 |
sasl_plain_password | 密码。 |
bootstrap_servers | SSL接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。 |
topic_name | Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。 |
consumer_id | Consumer Group名称。您可在消息队列Kafka版控制台的Consumer Group管理页面获取。 |
const Kafka = require('node-rdkafka');
const config = require('./setting');
// Print the librdkafka feature list and version of the installed binding.
console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

// Client options: SASL/PLAIN authentication over SSL, with the endpoint and
// credentials taken from ./setting.
const producerOptions = {
  /*'debug': 'all', */
  'api.version.request': 'true',
  'bootstrap.servers': config['bootstrap_servers'],
  'dr_cb': true,
  'dr_msg_cb': true,
  'security.protocol': 'sasl_ssl',
  'ssl.ca.location': './ca-cert.pem',
  'sasl.mechanisms': 'PLAIN',
  'sasl.username': config['sasl_plain_username'],
  'sasl.password': config['sasl_plain_password'],
};

const producer = new Kafka.Producer(producerOptions);

// Connection state flag; set to true by the 'ready' handler below.
let connected = false;

// Flip the flag once the client reports that it is ready.
producer.on('ready', () => {
  connected = true;
  console.log("connect ok");
});

// Poll every 100 ms, then start connecting.
producer.setPollInterval(100);
producer.connect();
/**
 * Send one fixed test message ('Hello Ali Kafka') to the configured topic.
 *
 * Skips the send (with a warning) while the producer is not connected.
 * Errors thrown synchronously by the client are logged, not rethrown, so
 * the periodic timer that calls this function keeps running.
 */
function produce() {
  if (!connected) {
    console.warn('Producer is not connected yet; skipping this message');
    return;
  }
  try {
    // node-rdkafka signature: produce(topic, partition, message, key, timestamp).
    producer.produce(
      config['topic_name'],
      // Partition: null lets the client pick the partition.
      null,
      // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
      Buffer.from('Hello Ali Kafka'),
      // Key: none.
      null,
      // Timestamp of the message.
      Date.now()
    );
  } catch (err) {
    console.error('A problem occurred when sending our message');
    console.error(err);
  }
}
// Reconnect whenever the client reports that it has been disconnected.
producer.on("disconnected", function() {
  connected = false;
  producer.connect();
});

// Forward client log events to the console.
producer.on('event.log', function(event) {
  console.log("event.log", event);
});

producer.on("error", function(error) {
  console.log("error:" + error);
});

// Report the outcome of each message; a non-empty err means delivery failed,
// so it must not be reported as a success.
producer.on('delivery-report', function(err, report) {
  if (err) {
    console.error("delivery-report: producer failed:" + err);
    return;
  }
  console.log("delivery-report: producer ok");
});

// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
  console.error('event.error:' + err);
});

// Send one test message per second.
setInterval(produce, 1000);
node producer.js
const Kafka = require('node-rdkafka');
const config = require('./setting');
// Print the librdkafka feature list and version of the installed binding.
console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
// Log the settings for troubleshooting, but never print the SASL password.
console.log(Object.assign({}, config, { 'sasl_plain_password': '******' }));

// Consumer options: SASL/PLAIN authentication over SSL, with the endpoint,
// credentials and consumer group taken from ./setting.
const consumer = new Kafka.KafkaConsumer({
  /*'debug': 'all',*/
  'api.version.request': 'true',
  'bootstrap.servers': config['bootstrap_servers'],
  'security.protocol': 'sasl_ssl',
  'ssl.ca.location': './ca-cert.pem',
  'sasl.mechanisms': 'PLAIN',
  // All message/fetch size limits are capped at 32000 bytes
  // (presumably to keep fetches small over the public network; confirm).
  'message.max.bytes': 32000,
  'fetch.max.bytes': 32000,
  'fetch.message.max.bytes': 32000,
  'max.partition.fetch.bytes': 32000,
  'sasl.username': config['sasl_plain_username'],
  'sasl.password': config['sasl_plain_password'],
  'group.id': config['consumer_id']
});
consumer.connect();

// Once connected, subscribe to the configured topic and start consuming.
consumer.on('ready', function() {
  console.log("connect ok");
  consumer.subscribe([config['topic_name']]);
  consumer.consume();
});

// Print every received message.
consumer.on('data', function(data) {
  console.log(data);
});

// Forward client log events to the console.
consumer.on('event.log', function(event) {
  console.log("event.log", event);
});

consumer.on('error', function(error) {
  console.log("error:" + error);
});

// Any errors we encounter, including connection errors. Mirrors the handler
// in the producer sample so client-level errors are not silently dropped.
consumer.on('event.error', function(err) {
  console.error('event.error:' + err);
});

consumer.on('event', function(event) {
  console.log("event:" + event);
});
node consumer.js
在文档使用中是否遇到以下问题
更多建议
匿名提交