本文为您介绍如何使用Flume同步EMR Kafka集群的数据至EMR DataServing集群的HBase。
前提条件
操作步骤
- 创建HBase表。
- 通过SSH方式连接DataServing集群,详情请参见登录集群。
- 执行以下命令,连接HBase。
hbase shell
- 创建名为flume_test的HBase表,表中包含一个列簇column。
create 'flume_test','column'
- 配置Flume。
- 进入Flume的配置页面。
- 登录EMR on ECS控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 在集群管理页面,单击目标集群操作列的集群服务。
- 在集群服务页面,单击FLUME服务区域的配置。
- 单击flume-conf.properties页签。本文示例采用的是全局配置方式,如果您想按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置。
- 在flume-conf.properties的参数值中,添加以下内容。说明 代码示例中的
default-agent
,请与FLUME服务配置页面的agent_name参数的参数值保持一致。default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource default-agent.sources.source1.channels = c1 default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...> default-agent.sources.source1.kafka.topics = flume-test default-agent.sources.source1.kafka.consumer.group.id = flume-test-group default-agent.sinks.k1.type = hbase default-agent.sinks.k1.table = flume_test default-agent.sinks.k1.columnFamily = column # Use a channel which buffers events in memory default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 100 default-agent.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1
参数 描述 default-agent.sources.source1.kafka.bootstrap.servers Kafka集群Broker的Host和端口号。 default-agent.sinks.k1.table HBase表名。 default-agent.sinks.k1.columnFamily 列簇名。 default-agent.channels.c1.capacity 通道中存储的最大事件数。请根据实际环境修改该参数值。 default-agent.channels.c1.transactionCapacity 每个事务通道将从源接收或提供给接收器的最大事件数。请根据实际环境修改该参数值。
- 进入Flume的配置页面。
- 启动服务。
- 在FLUME服务页面,选择更多操作 > 重启。
- 在弹出的对话框中,输入执行原因,单击确定。
- 在确认对话框中,单击确定。
- 测试数据写入情况。登录DataFlow集群,使用
kafka-console-producer.sh
命令生成数据后,在DataServing集群连接HBase即可查看数据。