本文介绍如何在VPC环境下使用Python SDK接入消息队列Kafka版的默认接入点并收发消息。

前提条件

安装Python依赖库

执行以下命令安装Python依赖库。
pip install kafka-python

准备配置

创建Kafka配置文件setting.py
kafka_setting = {
    'bootstrap_servers': ["XXX", "XXX", "XXX"],
    'topic_name': 'XXX',
    'consumer_id': 'XXX'
}
参数 描述
bootstrap_servers 默认接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。
topic_name Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。
consumer_id Consumer Group名称。您可在消息队列Kafka版控制台的Consumer Group管理页面获取。

发送消息

  1. 创建发送消息程序aliyun_kafka_producer.py
    #!/usr/bin/env python
    # encoding: utf-8
    
    import socket
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import setting
    
    conf = setting.kafka_setting
    
    print conf
    
    
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            api_version = (0,10),
                            retries=5)
    
    partitions = producer.partitions_for(conf['topic_name'])
    print 'Topic下分区: %s' % partitions
    
    try:
        future = producer.send(conf['topic_name'], 'hello aliyun-kafka!')
        future.get()
        print 'send message succeed.'
    except KafkaError, e:
        print 'send message failed.'
        print e
  2. 执行以下命令发送消息。
    python aliyun_kafka_producer.py

订阅消息

  1. 创建订阅消息程序aliyun_kafka_consumer.py
    #!/usr/bin/env python
    # encoding: utf-8
    
    import socket
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import setting
    
    conf = setting.kafka_setting
    
    consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            api_version = (0,10,2), 
                            session_timeout_ms=25000,
                            max_poll_records=100,
                            fetch_max_bytes=1 * 1024 * 1024)
    
    print 'consumer start to consuming...'
    consumer.subscribe((conf['topic_name'], ))
    for message in consumer:
        print message.topic, message.offset, message.key, message.value, message.value, message.partition
  2. 执行以下命令消费消息。
    python aliyun_kafka_consumer.py