E-MapReduce 从 EMR-3.11.0 版本开始支持 E-MapReduce Druid 作为 E-MapReduce 的一个集群类型。

将 E-MapReduce Druid 作为一种单独的集群类型(而不是在 Hadoop 集群中增加 Druid 组件)主要基于几个方面的考虑:
  • E-MapReduce Druid 可以完全脱离 Hadoop 使用。
  • 大数据量下 E-MapReduce Druid 对内存要求比较高,尤其是 Broker 节点和 Historical 节点。E-MapReduce Druid 本身不受 YARN 管控 ,在多服务运行时容易发生资源争抢。
  • Hadoop 作为基础设施,其规模可以比较大,而 E-MapReduce Druid 集群可以比较小,两者配合起来工作灵活性更高。

创建 Druid 集群

在创建集群时选择 Druid 集群类型即可。您在创建 E-MapReduce Druid 集群时可以勾选 HDFS 和 YARN 服务,E-MapReduce Druid 集群自带的 HDFS 和 YARN 仅供测试使用,原因如本文档开头所述。对于生产环境,我们强烈建议您采用专门的 Hadoop 集群。

配置集群

  • 配置使用 HDFS 作为 E-MapReduce Druid 的 deep storage

    对于独立的 E-MapReduce Druid 集群,您可能需要将您的索引数据存放在另外一个 Hadoop 集群的 HDFS 之上。为此,您需要首先设置一下两个集群的连通性(见下文与 Hadoop 集群交互一节),再在 E-MapReduce Druid 的配置页面配置如下两个选项并重启服务即可(配置项位于配置页面的 common.runtime)。

    • druid.storage.type: hdfs
    • druid.storage.storageDirectory: (请注意这里 HDFS 目录最好写完整目录,比如 hdfs://emr-header-1.cluster-xxxxxxxx:9000/druid/segments)
    说明 如果 Hadoop 集群为 HA 集群,emr-header-1.cluster-xxxxx:9000 需要改成 emr-cluster,或者把端口 9000 改成 8020,下同。
  • 配置使用 OSS 作为 E-MapReduce Druid 的 deep storage

    E-MapReduce Druid 支持以 OSS 作为 deep storage,借助于 E-MapReduce 的免 AccessKey 能力,E-MapReduce Druid 不用做 AccessKey 配置即可访问 OSS。由于 OSS 的访问能力是借助于 HDFS 的 OSS 功能实现的,因此在配置时,druid.storage.type需要仍然配置为 HDFS。

    • druid.storage.type: hdfs
    • druid.storage.storageDirectory: (如 oss://emr-druid-cn-hangzhou/segments

    由于 OSS 访问借助了 HDFS,因此您需要选择以下两种方案之一:

    • 建集群的时候选择安装 HDFS,系统自动配好(安装好 HDFS 您可以不使用它,关闭它,或者仅作为测试用途)。
    • 在 E-MapReduce Druid 的配置目录/etc/ecm/druid-conf/druid/_common/下新建hdfs-site.xml,内容如下,然后将该文件拷贝至所有节点的相同目录下。
      <?xml version="1.0"?>
        <configuration>
          <property>
            <name>fs.oss.impl</name>
            <value>com.aliyun.fs.oss.nat.NativeOssFileSystem</value>
          </property>
          <property>
            <name>fs.oss.buffer.dirs</name>
            <value>file:///mnt/disk1/data,...</value>
          </property>
          <property>
            <name>fs.oss.impl.disable.cache</name>
            <value>true</value>
          </property>
        </configuration>

      其中fs.oss.buffer.dirs可以设置多个路径。

  • 配置使用 RDS 作为 E-MapReduce Druid 的元数据存储

    默认情况下 E-MapReduce Druid 利用 header-1节点上的本地 MySQL 数据库作为元数据存储。您也可以配置使用阿里云 RDS 作为元数据存储。

    下面以 RDS MySQL 版作为例演示配置。在具体配置之前,请先确保:

    • RDS MySQL 实例已经被创建。
    • 为 E-MapReduce Druid 访问 RDS MySQL 创建了单独的账户(不推荐使用 root),假设账户名为 druid,密码为 druidpw。
    • 为 E-MapReduce Druid 元数据创建单独的 MySQL 数据库,假设数据库名为 druiddb。
    • 确保账户 druid 有权限访问 druiddb。

    在 E-MapReduce 管理控制台,进入 E-MapReduce Druid 集群,单击 Druid 组件,选择配置选项卡,找到 common.runtime 配置文件。单击自定义配置,添加如下三个配置项:

    • druid.metadata.storage.connector.connectURI,值为 jdbc:mysql://rm-xxxxx.mysql.rds.aliyuncs.com:3306/druiddb。
    • druid.metadata.storage.connector.user,值为druid。
    • druid.metadata.storage.connector.password,值为druidpw。

    依次点击右上角的保存 > > 部署配置文件到主机 > 重启所有组件,配置即可生效。

    登录 RDS 控制台查看 druiddb 创建表的情况,如果正常,您将会看到一些 druid 自动创建的表。

  • 组件内存配置

    E-MapReduce Druid 组件内存设置主要包括两方面:堆内存(通过 jvm.config 配置)和 direct 内存(通过 jvm.config 和 runtime.properteis 配置)。在创建集群时,E-MapReduce 会自动生成一套配置,不过在某些情况下您仍然可能需要自己调整内存配置。

    要调整组件内存配置,您可以通过 E-MapReduce 控制台进入到集群组件,在页面上进行操作。

    说明 对于 direct 内存,调整时请确保:
    -XX:MaxDirectMemorySize >= druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)

访问 Druid web 页面

E-MapReduce Druid 自带两个 Web 页面:
  • Overlord:http://emr-header-1.cluster-1234:18090,用于查看 task 运行情况。
  • Coordinator:http://emr-header-1.cluster-1234:18081,用于查看 segments 存储情况,并设置 rule 加载和丢弃 segments。
E-MapReduce 提供三种方式访问 E-MapReduce Druid 的 Web 页面:
  • 在集群管理页面,单击访问链接与端口,找到 Druid overlord 或 Druid coordinator 链接,单击链接进入(推荐方式,EMR-3.20.0 及之后版本)。
  • 通过 SSH 隧道方式建立 SSH 隧道,开启代理浏览器访问。
  • 通过公网 IP+端口访问,如 http://123.123.123.123:18090(不推荐,请通过安全组设置合理控制公网访问集群权限)。

批量索引

  • 与 Hadoop 集群交互

    您在创建 E-MapReduce Druid 集群时如果勾选了 HDFS 和 YARN(自带 Hadoop 集群),那么系统将会自动为您配置好与 HDFS 和 YARN 的交互,您无需做额外操作。下面的介绍是配置独立 E-MapReduce Druid 集群与独立 Hadoop 集群之间交互,这里假设 E-MapReduce Druid 集群 cluster id 为 1234,Hadoop 集群 cluster id 为 5678。另外请仔细阅读并严格按照指导进行操作,如果操作不当,集群可能就不会按照预期工作。

    对于与非安全独立 Hadoop 集群交互,请按照如下操作进行:

    1. 确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
    2. 将Hadoop集群/etc/ecm/hadoop-conf的 core-site.xml、hdfs-site.xml、yarn-site.xml、 mapred-site.xml 放在 E-MapReduce Druid 集群每个节点的 /etc/ecm/duird-conf/druid/_common 目录下(如果创建集群时选了自带Hadoop,在该目录下会有几个软链接指向自带 Hadoop 的配置,请先移除这些软链接)。
    3. 将 Hadoop 集群的 hosts 写入到 E-MapReduce Druid 集群的 hosts 列表中,注意 Hadoop 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx,且最好将 Hadoop 的 hosts 放在本集群 hosts 之后,例如:
      ...
      10.157.201.36   emr-as.cn-hangzhou.aliyuncs.com
      10.157.64.5     eas.cn-hangzhou.emr.aliyuncs.com
      192.168.142.255 emr-worker-1.cluster-1234 emr-worker-1 emr-header-2.cluster-1234 emr-header-2 iZbp1h9g7boqo9x23qbifiZ
      192.168.143.0   emr-worker-2.cluster-1234 emr-worker-2 emr-header-3.cluster-1234 emr-header-3 iZbp1eaa5819tkjx55yr9xZ
      192.168.142.254 emr-header-1.cluster-1234 emr-header-1 iZbp1e3zwuvnmakmsjer2uZ
      --以下为hadoop集群的hosts信息
      192.168.143.6   emr-worker-1.cluster-5678 emr-worker-1 emr-header-2.cluster-5678 emr-header-2 iZbp195rj7zvx8qar4f6b0Z
      192.168.143.7   emr-worker-2.cluster-5678 emr-worker-2 emr-header-3.cluster-5678 emr-header-3 iZbp15vy2rsxoegki4qhdpZ
      192.168.143.5   emr-header-1.cluster-5678 emr-header-1 iZbp10tx4egw3wfnh5oii1Z
    对于安全 Hadoop 集群,请按如下操作进行:
    1. 确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
    2. 将 Hadoop 集群 /etc/ecm/hadoop-conf 的 core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml 放在 E-MapReduce Druid 集群每个节点的 /etc/ecm/duird-conf/druid/_common 目录下(如果创建集群时选了自带 Hadoop,在该目录下会有几个软链接指向自带 Hadoop 的配置,请先移除这些软链接),并修改 core-site.xml 中 hadoop.security.authentication.use.hasfalse(这是一条客户端配置,目的是让用户能够使用 AccessKey 认证,如果用 Kerberos 认证方式,需 disable 该配置)。
    3. 将 Hadoop 集群的 hosts 写入到 E-MapReduce Druid 集群每个节点的 hosts 列表中,注意 Hadoop 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx,且最好将 Hadoop 的 hosts放在本集群hosts之后。
    4. 设置两个集群间的 Kerberos 跨域互信(详情参考跨域互信)。
    5. 在 Hadoop 集群的所有节点下都创建一个本地 druid 账户(useradd -m -g hadoop druid),或者设置 druid.auth.authenticator.kerberos.authToLocal(具体预发规则参考这里)创建 Kerberos 账户到本地账户的映射规则。推荐第一种做法,操作简便不易出错。
      说明 默认在安全 Hadoop 集群中,所有 Hadoop 命令必须运行在一个本地的账户中,该本地账户需要与 principal 的 name 部分同名。YARN 也支持将一个 principal 映射至本地一个账户,即上文第二种做法。
    6. 重启 Druid 服务。
  • 使用 Hadoop 对批量数据创建索引
    E-MapReduce Druid 自带了一个名为 wikiticker 的例子,位于${DRUID_HOME}/quickstart/tutorial下面(${DRUID_HOME}默认为 /usr/lib/druid-current)。wikiticker 文件(wikiticker-2015-09-12-sampled.json.gz)的每一行是一条记录,每条记录是个 json 对象。其格式如下所示:
    ```json
    {
        "time": "2015-09-12T00:46:58.771Z",
        "channel": "#en.wikipedia",
        "cityName": null,
        "comment": "added project",
        "countryIsoCode": null,
        "countryName": null,
        "isAnonymous": false,
        "isMinor": false,
        "isNew": false,
        "isRobot": false,
        "isUnpatrolled": false,
        "metroCode": null,
        "namespace": "Talk",
        "page": "Talk:Oswald Tilghman",
        "regionIsoCode": null,
        "regionName": null,
        "user": "GELongstreet",
        "delta": 36,
        "added": 36,
        "deleted": 0
    }
    ```

    使用 Hadoop 对批量数据创建索引,请按照如下步骤进行操作:

    1. 将该压缩文件解压,并放置于 HDFS 的一个目录下(如 hdfs://emr-header-1.cluster-5678:9000/druid)。 在 Hadoop 集群上执行如下命令。
      ### 如果是在独立Hadoop集群上进行操作,需要做好两个集群互信之后需要拷贝一个 druid.keytab到Hadoop集群再kinit。
       kinit -kt /etc/ecm/druid-conf/druid.keytab druid
       ###
       hdfs dfs -mkdir hdfs://emr-header-1.cluster-5678:9000/druid
       hdfs dfs -put ${DRUID_HOME}/quickstart/tutorial/wikiticker-2015-09-16-sampled.json hdfs://emr-header-1.cluster-5678:9000/druid
      说明
      • 对于安全集群执行 HDFS 命令前先修改 /etc/ecm/hadoop-conf/core-site.xmlhadoop.security.authentication.use.hasfalse
      • 请确保已经在 Hadoop 集群每个节点上创建名为 druid 的 Linux 账户。
    2. 准备一个数据索引任务文件 ${DRUID_HOME}/quickstart/tutorial/wikiticker-index.json,如下所示:
      {
           "type" : "index_hadoop",
           "spec" : {
               "ioConfig" : {
                   "type" : "hadoop",
                   "inputSpec" : {
                       "type" : "static",
                       "paths" : "hdfs://emr-header-1.cluster-5678:9000/druid/wikiticker-2015-09-16-sampled.json"
                   }
               },
               "dataSchema" : {
                   "dataSource" : "wikiticker",
                   "granularitySpec" : {
                       "type" : "uniform",
                       "segmentGranularity" : "day",
                       "queryGranularity" : "none",
                       "intervals" : ["2015-09-12/2015-09-13"]
                   },
                   "parser" : {
                       "type" : "hadoopyString",
                       "parseSpec" : {
                           "format" : "json",
                           "dimensionsSpec" : {
                               "dimensions" : [
                                   "channel",
                                   "cityName",
                                   "comment",
                                   "countryIsoCode",
                                   "countryName",
                                   "isAnonymous",
                                   "isMinor",
                                   "isNew",
                                   "isRobot",
                                   "isUnpatrolled",
                                   "metroCode",
                                   "namespace",
                                   "page",
                                   "regionIsoCode",
                                   "regionName",
                                   "user"
                               ]
                           },
                           "timestampSpec" : {
                               "format" : "auto",
                               "column" : "time"
                           }
                       }
                   },
                   "metricsSpec" : [
                       {
                           "name" : "count",
                           "type" : "count"
                       },
                       {
                           "name" : "added",
                           "type" : "longSum",
                           "fieldName" : "added"
                       },
                       {
                           "name" : "deleted",
                           "type" : "longSum",
                           "fieldName" : "deleted"
                       },
                       {
                           "name" : "delta",
                           "type" : "longSum",
                           "fieldName" : "delta"
                       },
                       {
                           "name" : "user_unique",
                           "type" : "hyperUnique",
                           "fieldName" : "user"
                       }
                   ]
               },
               "tuningConfig" : {
                   "type" : "hadoop",
                   "partitionsSpec" : {
                       "type" : "hashed",
                       "targetPartitionSize" : 5000000
                   },
                   "jobProperties" : {
                       "mapreduce.job.classloader": "true"
                   }
               }
           },
           "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.2"]
       }
      说明
      • spec.ioConfig.type 设置为 hadoop
      • spec.ioConfig.inputSpec.paths 为输入文件路径。
      • tuningConfig.typehadoop
      • tuningConfig.jobProperties 设置了 mapreduce job 的 classloader。
      • hadoopDependencyCoordinates 制定了 hadoop client 的版本。
    3. 在 E-MapReduce Druid 集群上运行批量索引命令。
      cd ${DRUID_HOME}
       curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/tutorial/wikiticker-index.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/task

      其中 - -negotiate-u-b-c 等选项是针对安全E-MapReduce Druid集群的。Overlord的端口默认为18090。

    4. 查看作业运行情况。

      在浏览器访问http://emr-header-1.cluster-1234:18090/console.html查看作业运行情况。

    5. 根据 Druid 语法查询数据。
      Druid 有自己的查询语法。您需要准备一个描述您如何查询的 json 格式的查询文件,如下所示为对 wikiticker 数据的一个 top N 查询(${DRUID_HOME}/quickstart/tutorial/wikiticker-top-pages.json):
      {
           "queryType" : "topN",
           "dataSource" : "wikiticker",
           "intervals" : ["2015-09-12/2015-09-13"],
           "granularity" : "all",
           "dimension" : "page",
           "metric" : "edits",
           "threshold" : 25,
           "aggregations" : [
               {
                   "type" : "longSum",
                   "name" : "edits",
                   "fieldName" : "count"
               }
           ]
       }
      在命令行界面运行下面的命令即可看到查询结果:
      cd ${DRUID_HOME}
       curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/tutorial/wikiticker-top-pages.json 'http://emr-header-1.cluster-1234:18082/druid/v2/?pretty'

      其中 - -negotiate-u-b-c 等选项是针对安全 E-MapReduce Druid 集群的。如果一切正常,您将能看到具体地查询结果。

实时索引

对于数据从 Kafka 集群实时到 E-MapReduce Druid 集群进行索引,我们推荐使用 Kafka Indexing Service 扩展,提供了高可靠保证,支持 exactly-once 语义。详见 Kafka Indexing Service 中“使用 Druid Kafka Indexing Service 实时消费Kafka数据”一节。

如果您的数据实时打到了阿里云日志服务(SLS),并想用 E-MapReduce Druid 实时索引这部分数据,我们提供了 SLS Indexing Service 扩展。使用 SLS Indexing Service 避免了您额外建立并维护 Kafka 集群的开销。SLS Indexing Service 的作用与 Kafka Indexing Service 相同,也提供高可靠保证和 Exactly-Once语义。在这里,您完全可以把 SLS 当成一个 Kafka 来使用。详见 SLS-Indexing-Service 一节。

对于其他方式,如使用 Flink,Storm,Spark Streaming 等等,我们推荐使用 Tranquility 客户端向 E-MapReduce Druid 集群推送数据。详见 Tranquility 一节。

Kafka Indexing Service 和 SLS Indexing Service 是类似的,都使用拉的方式从数据源拉取数据到 E-MapReduce Druid 集群,并提供高可靠保证和 exactly-once 语义;而 Tranquility 则使用推的方式,把数据推送至 E-MapReduce Druid 进行索引。Tranquility 并不提供 Exactly-once 语义,因此如果有这方面的要求,需要使用者自己解决。

索引失败问题分析思路

当发现索引失败时,一般遵循如下排错思路:
  • 对于批量索引
    1. 如果 curl 直接返回错误,或者不返回,检查一下输入文件格式。或者 curl 加上 -v 参数,观察 REST API 的返回情况。
    2. 在 Overlord 页面观察作业执行情况,如果失败,查看页面上的 logs。
    3. 在很多情况下并没有生成 logs,如果是 Hadoop 作业,打开 YARN 页面查看是否有索引作业生成,并查看作业执行 log。
    4. 如果上述情况都没有定位到错误,需要登录到 E-MapReduce Druid 集群,查看 Overlord 的执行日志(位于/mnt/disk1/log/druid/overlord—emr-header-1.cluster-xxxx.log),如果是 HA 集群,查看您提交作业的那个Overlord。
    5. 如果作业已经被提交到 Middlemanager,但是从 Middlemanager 返回了失败,则需要从 Overlord 中查看作业提交到了那个worker,并登录到相应的 worker,查看 Middlemanager 的日志(位于/mnt/disk1/log/druid/middleManager-emr-header-1.cluster-xxxx.log)。
  • 对于 Kafka Indexing Service 和 SLS Indexing Service
    1. 首先查看 Overlord 的 Web 页面:http://emr-header-1:18090, 查看 Supervisor 的运行状态,检查 payload 是否合法。
    2. 查看失败 task 的 log。
    3. 如果不能从 task log 定位出失败原因,则需要从 Overlord log 入手,排查问题,其排查思路与批量索引一节中最后两步类似。
  • 对于 Tranquility 实时索引 @

    查看 Tranquility log,查看消息是否被接收到了或者是否被丢弃(drop)掉了。

    其余的排查步骤同批量索引的 步骤 2~ 步骤 5。

    错误多数情况为集群配置问题和作业问题。集群配置问题包括:内存参数是否合理、跨集群联通性是否正确、安全集群访问是否通过、principal 是否正确等等,作业问题包括作业描述文件格式正确,输入数据是否能够正常被解析,以及一些其他的作业相关的配置(如 ioConfig 等)。