本文为您介绍如何使用Flume同步EMR DataFlow集群的数据至EMR DataLake集群的Hive。

前提条件

  • 已创建DataLake集群,并且选择了Flume服务,详情请参见创建集群
  • 已创建DataFlow集群,并且选择了Kafka服务,详情请参见创建集群

操作步骤

  1. 通过SSH方式连接DataLake集群,详情请参见登录集群
  2. 创建Hive表。
    Flume使用事务操作将数据写入Hive,需要在创建Hive表(flume_test)时设置transactional属性。
    create table flume_test (id int, content string)
    clustered by (id) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');

    Hive的基础操作,请参见Hive基础操作

  3. 配置Flume。
    1. 进入Flume的配置页面。
      1. 登录EMR on ECS控制台
      2. 在顶部菜单栏处,根据实际情况选择地域和资源组
      3. 集群管理页面,单击目标集群操作列的集群服务
      4. 集群服务页面,单击FLUME服务区域的配置
    2. 单击flume-conf.properties页签。
      本文示例采用的是全局配置方式,如果您想按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置
    3. flume-conf.properties的参数值中,添加以下内容。
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      default-agent.sources.source1.channels = c1
      default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      default-agent.sources.source1.kafka.topics = flume-test
      default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
      
      # Describe the sink
      default-agent.sinks.k1.type = hive
      default-agent.sinks.k1.hive.metastore = thrift://xxxx:9083
      default-agent.sinks.k1.hive.database = default
      default-agent.sinks.k1.hive.table = flume_test
      default-agent.sinks.k1.serializer = DELIMITED
      default-agent.sinks.k1.serializer.delimiter = ","
      default-agent.sinks.k1.serializer.serdeSeparator = ','
      default-agent.sinks.k1.serializer.fieldnames =id,content
      
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 100
      default-agent.channels.c1.transactionCapacity = 100
      
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      参数 描述
      default-agent.sources.source1.kafka.bootstrap.servers Kafka集群Broker的Host和端口号。
      default-agent.channels.c1.capacity 通道中存储的最大事件数。请根据实际环境修改该参数值。
      default-agent.channels.c1.transactionCapacity 每个事务通道将从源接收或提供给接收器的最大事件数。请根据实际环境修改该参数值。
      default-agent.sinks.k1.hive.metastore Hive metastore的URI,格式为thrift://emr-header-1.cluster-xxx:9083。其中emr-header-1.cluster-xxx您可以通过hostname获取。
    4. 保存配置。
      1. 单击下方的保存
      2. 在弹出的对话框中,输入执行原因,单击确定
  4. 启动服务。
    1. 在FLUME服务页面,选择更多操作 > 重启
    2. 在弹出的对话框中,输入执行原因,单击确定
    3. 确认对话框中,单击确定
  5. 测试数据同步情况。
    1. 通过SSH方式连接DataFlow集群,详情请参见登录集群
    2. 创建名称为flume-test的Topic。
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
    3. 生成测试数据。
      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      例如输入abc并回车。

    4. 通过SSH方式连接DataLake集群,在客户端配置Hive参数并查询表中的数据。
      set hive.support.concurrency=true;
      set hive.exec.dynamic.partition.mode=nonstrict;
      set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
      配置好后查询flume_test表中的数据。
      select * from flume_test;
      返回信息如下:
      OK
      1    a