添加Python依赖库
执行以下命令安装依赖库。
pip install confluent-kafka==1.9.2
重要 建议您安装confluent-kafka 1.9.2及以下版本的依赖库,否则使用公网发送消息会报SSL_HANDSHAKE错误。
 准备配置
下载Demo工程,根据实际接入点修改相应配置,然后将Demo工程上传至Linux服务器。
在解压的Demo工程中,找到kafka-confluent-python-demo文件夹,根据实际的接入点修改配置文件setting.py。
默认接入点
在vpc目录中,修改配置文件setting.py。
kafka_setting = {
    'bootstrap_servers': 'XXX:xxx,XXX:xxx',
    'topic_name': 'XXX',
    'group_name': 'XXX'
}
SSL接入点
在vpc-ssl目录中,修改配置文件setting.py。
kafka_setting = {
    'sasl_plain_username': 'XXX',
    'sasl_plain_password': 'XXX',
    'ca_location': '/XXX/mix-4096-ca-cert',
    'bootstrap_servers': 'XXX:xxx,XXX:xxx',
    'topic_name': 'XXX',
    'group_name': 'XXX'
}
参数  | 描述  | 
sasl_plain_username  | SASL用户名。 
 说明 - 如果实例未开启ACL,您可以在云消息队列 Kafka 版控制台的实例详情页面的配置信息区域获取默认的用户名和密码。
 - 如果实例已开启ACL,请确保要使用的SASL用户已被授予向云消息队列 Kafka 版实例收发消息的权限。具体操作,请参见SASL用户授权。
 
   | 
sasl_plain_password  | SASL用户名密码。  | 
ca_location  | SSL根证书的路径。用本地路径替换示例中的XXX。例如:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert。  | 
bootstrap_servers  | SSL接入点。您可在云消息队列 Kafka 版控制台的实例详情页面的接入点信息区域获取。  | 
topic_name  | Topic名称。您可在云消息队列 Kafka 版控制台的Topic 管理页面获取。  | 
group_name  | Group名称。您可在云消息队列 Kafka 版控制台的Group 管理页面获取。  | 
将kafka-confluent-python-demo文件夹上传到Linux服务器的/home路径下。
发送消息
根据实际的接入点,按照以下方式发送消息。
默认接入点
执行以下命令,进入到/home/kafka-confluent-python-demo/vpc路径。
cd /home/kafka-confluent-python-demo/vpc
执行以下命令,发送消息。
python kafka_producer.py
消息程序kafka_producer.py示例代码如下:
kafka_producer.py
from confluent_kafka import Producer
import setting
conf = setting.kafka_setting
# 初始化一个Producer对象。
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
# 异步发送消息。
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)
# 在程序结束时,调用flush。
p.flush()
SSL接入点
执行以下命令,进入到/home/kafka-confluent-python-demo/vpc-ssl路径。
cd /home/kafka-confluent-python-demo/vpc-ssl
执行以下命令,发送消息。
python kafka_producer.py
消息程序kafka_producer.py示例代码如下:
kafka_producer.py
from confluent_kafka import Producer
import setting
conf = setting.kafka_setting
p = Producer({'bootstrap.servers':conf['bootstrap_servers'],
   'ssl.endpoint.identification.algorithm': 'none',
   'sasl.mechanisms':'PLAIN',
   'ssl.ca.location':conf['ca_location'],
   'security.protocol':'SASL_SSL',
   'sasl.username':conf['sasl_plain_username'],
   'sasl.password':conf['sasl_plain_password']})
def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)
p.flush()
订阅消息
根据实际的接入点,按照以下方式订阅消息。
默认接入点
执行以下命令,进入到/home/kafka-confluent-python-demo/vpc路径。
cd /home/kafka-confluent-python-demo/vpc
执行以下命令,订阅消息。
python kafka_consumer.py
消息程序kafka_consumer.py示例代码如下:
kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import setting
conf = setting.kafka_setting
c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest'
})
c.subscribe([conf['topic_name']])
while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print("Consumer error: {}".format(msg.error()))
            continue
    print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
SSL接入点
执行以下命令,进入到/home/kafka-confluent-python-demo/vpc-ssl路径。
cd /home/kafka-confluent-python-demo/vpc-ssl
执行以下命令,订阅消息。
python kafka_consumer.py
消息程序kafka_consumer.py示例代码如下:
kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import setting
conf = setting.kafka_setting
c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'ssl.endpoint.identification.algorithm': 'none',
    'sasl.mechanisms':'PLAIN',
    'ssl.ca.location':conf['ca_location'],
    'security.protocol':'SASL_SSL',
    'sasl.username':conf['sasl_plain_username'],
    'sasl.password':conf['sasl_plain_password'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest',
    'fetch.message.max.bytes':'1024*512'
})
c.subscribe([conf['topic_name']])
while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
       if msg.error().code() == KafkaError._PARTITION_EOF:
          continue
       else:
           print("Consumer error: {}".format(msg.error()))
           continue
    print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()