本文介绍如何在E-MapReduce中使用Apache Druid Kafka Indexing Service实时消费Kafka数据。
前提条件
已创建E-MapReduce的Druid集群和Kafka集群,详情请参见创建集群。
背景信息
Kafka Indexing Service是Apache Druid推出的使用Apache Druid的Indexing Service服务实时消费Kafka数据的插件。该插件会在Overlord中启动一个Supervisor,Supervisor启动后会在Middlemanager中启动indexing task,这些task会连接到Kafka集群消费topic数据,并完成索引创建。您只需要准备一个数据消费格式文件,通过REST API手动启动Supervisor。
配置Druid集群与Kafka集群交互
E-MapReduce Druid集群与Kafka集群交互的配置方式与Hadoop集群类似,均需要设置连通性和Hosts。
对于非安全Kafka集群,请按照以下步骤操作:
确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
将Kafka集群的Hosts写入到E-MapReduce Druid集群每一个节点的Hosts列表中。
重要Kafka集群的hostname应采用长名形式,例如emr-header-1.cluster-xxxxxxxx。
对于安全Kafka集群,您需要执行下列操作(前两步与非安全Kafka集群相同):
确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
将Kafka集群的hosts写入到E-MapReduce Druid集群每一个节点的hosts列表中。
重要Kafka集群的hostname应采用长名形式,例如emr-header-1.cluster-xxxxxxxx。
设置两个集群间的Kerberos跨域互信(详情请参见跨域互信),推荐做双向互信。
准备一个客户端安全配置文件,文件内容格式如下。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/ecm/druid-conf/druid.keytab" principal="druid@EMR.1234.COM"; };
文件准备好后,将该配置文件同步到E-MapReduce Druid集群的所有节点上,放置于某一个目录下面(例如/tmp/kafka/kafka_client_jaas.conf)。
在E-MapReduce Druid配置页面的overlord.jvm中新增如下选项。
-Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
在E-MapReduce Druid配置页面的middleManager.runtime中配置
druid.indexer.runner.javaOpts=-Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
和其他JVM启动参数。重启Druid服务。
使用Kafka Indexing Service实时消费Kafka数据
在Kafka集群(或Gateway)上执行以下命令创建一个名称为metrics的topic。
-- 如果开启了Kafka高安全。 export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181 --partitions 1 --replication-factor 1 --topic metrics
实际创建topic时,您需要根据您的环境配置来替换上述命令中的各个参数。其中,--zookeeper参数中路径的获取方式是:登录阿里云 E-MapReduce 控制台> 进入Kafka集群的Kafka服务的配置页面,查看zookeeper.connect配置项的值。如果您的Kafka集群是自建集群,则您需要根据集群的实际配置来替换--zookeeper参数。
定义数据源的数据格式描述文件(名称命名为metrics-kafka.json),并放置在当前目录下(或放置在其他您指定的目录上)。
{ "type": "kafka", "dataSchema": { "dataSource": "metrics-kafka", "parser": { "type": "string", "parseSpec": { "timestampSpec": { "column": "time", "format": "auto" }, "dimensionsSpec": { "dimensions": ["url", "user"] }, "format": "json" } }, "granularitySpec": { "type": "uniform", "segmentGranularity": "hour", "queryGranularity": "none" }, "metricsSpec": [{ "type": "count", "name": "views" }, { "name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs" } ] }, "ioConfig": { "topic": "metrics", "consumerProperties": { "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092(您 Kafka 集群的 bootstrap.servers)", "group.id": "kafka-indexing-service", "security.protocol": "SASL_PLAINTEXT", "sasl.mechanism": "GSSAPI" }, "taskCount": 1, "replicas": 1, "taskDuration": "PT1H" }, "tuningConfig": { "type": "kafka", "maxRowsInMemory": "100000" } }
说明ioConfig.consumerProperties.security.protocol和ioConfig.consumerProperties.sasl.mechanism为安全相关选项(非安全Kafka集群不需要)。
执行如下命令添加Kafka Supervisor。
curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor
其中
--negotiate
、-u
、-b
和-c
是针对安全E-MapReduce Druid集群的选项。在Kafka集群上开启一个Console Producer。
# 如果开启了Kafka高安全: export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/kafka-producer.conf kafka-console-producer.sh --producer.config /tmp/kafka-producer.conf --broker-list emr-header-1:9092,emr-header-2:9092,emr-header-3:9092 --topic metrics
其中,--producer.config /tmp/kafka-producer.conf是针对安全Kafka集群的选项。
在Kafka-console-producer.sh的命令提示符下输入数据。
{"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32} {"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11} {"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
时间戳可用如下Python命令生成。
python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
准备名为metrics-search.json的查询文件。
{ "queryType" : "search", "dataSource" : "metrics-kafka", "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"], "granularity" : "all", "searchDimensions": [ "url", "user" ], "query": { "type": "insensitive_contains", "value": "bob" } }
在E-MapReduce Druid集群的Master节点上执行如下命令。
curl --negotiate -u:Druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-search.json http://emr-header-1.cluster-1234:18082/druid/v2/?pretty
其中
--negotiate
、-u
、-b
和-c
是针对安全 E-MapReduce Druid集群的选项。返回结果示例如下。
[ { "timestamp" : "2018-03-06T09:00:00.000Z", "result" : [ { "dimension" : "user", "value" : "bob", "count" : 2 } ] } ]