本文介绍如何使用Python SDK通过接入点接入消息队列Kafka版并收发消息。

环境准备

安装Confluent-kafka

执行以下命令安装Confluent-kafka。
pip install confluent-kafka

准备配置

  1. 可选:下载SSL根证书。如果是SSL接入点,需下载该证书。
  2. 在解压的Demo工程中,找到kafka-confluent-python-demo文件夹,将此文件夹上传在Linux系统。
  3. 登录Linux系统,进入文件目录,修改配置文件kafka.properties。
    kafka_setting = {
        'sasl_plain_username': 'XXX',   #如果是默认接入点实例,请删除该配置。
        'sasl_plain_password': 'XXX',   #如果是默认接入点实例,请删除该配置。
        'bootstrap_servers': ["XXX", "XXX", "XXX"],
        'topic_name': 'XXX',
        'consumer_id': 'XXX'
    }
    参数 描述
    sasl_plain_username SASL用户名。
    说明
    • 如果实例未开启ACL,您可以在消息队列Kafka版控制台实例详情页面的配置信息区域获取默认的用户名密码
    • 如果实例已开启ACL,请确保要使用的SASL用户已被授予向消息队列Kafka版实例收发消息的权限。具体操作,请参见SASL用户授权
    sasl_plain_password SASL用户名密码。
    bootstrap_servers SSL接入点。您可在消息队列Kafka版控制台实例详情页面的接入点信息区域获取。
    topic_name Topic名称。您可在消息队列Kafka版控制台Topic 管理页面获取。
    consumer_id Group名称。您可在消息队列Kafka版控制台Group 管理页面获取。

发送消息

执行以下命令发送消息。

python aliyun_kafka_producer.py
消息程序aliyun_kafka_producer.py示例代码如下:
  • 默认接入点
    #!/usr/bin/env python
    # encoding: utf-8
    
    import socket
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import setting
    
    conf = setting.kafka_setting
    
    print conf
    
    
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            api_version = (0,10),
                            retries=5)
    
    partitions = producer.partitions_for(conf['topic_name'])
    print 'Topic下分区: %s' % partitions
    
    try:
        future = producer.send(conf['topic_name'], 'hello aliyun-kafka!')
        future.get()
        print 'send message succeed.'
    except KafkaError, e:
        print 'send message failed.'
        print e
  • SSL接入点
    #!/usr/bin/env python
    # encoding: utf-8
    
    import ssl
    import socket
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import setting
    
    conf = setting.kafka_setting
    
    print conf
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    # context.check_hostname = True
    context.load_verify_locations("ca-cert")
    
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            api_version = (0,10),
                            retries=5,
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    partitions = producer.partitions_for(conf['topic_name'])
    print 'Topic下分区: %s' % partitions
    
    try:
        future = producer.send(conf['topic_name'], 'hello aliyun-kafka!')
        future.get()
        print 'send message succeed.'
    except KafkaError, e:
        print 'send message failed.'
        print e

订阅消息

执行以下命令订阅消息。

python aliyun_kafka_consumer.py
消息程序aliyun_kafka_consumer.py示例代码如下:
  • 默认接入点
    #!/usr/bin/env python
    # encoding: utf-8
    
    import socket
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import setting
    
    conf = setting.kafka_setting
    
    consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            api_version = (0,10,2), 
                            session_timeout_ms=25000,
                            max_poll_records=100,
                            fetch_max_bytes=1 * 1024 * 1024)
    
    print 'consumer start to consuming...'
    consumer.subscribe((conf['topic_name'], ))
    for message in consumer:
        print message.topic, message.offset, message.key, message.value, message.value, message.partition
  • SSL接入点
    #!/usr/bin/env python
    # encoding: utf-8
    
    import ssl
    import socket
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import setting
    
    conf = setting.kafka_setting
    
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    # context.check_hostname = True
    context.load_verify_locations("ca-cert")
    
    consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            api_version = (0,10),
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    print 'consumer start to consuming...'
    consumer.subscribe((conf['topic_name'], ))
    for message in consumer:
        print message.topic, message.offset, message.key, message.value, message.value, message.partition