全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
数据集成
    基于Apache Flume实现日志数据导入MaxCompute Datahub

基于Apache Flume实现日志数据导入MaxCompute Datahub

更新时间:2017-06-07 13:26:11

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps


简介

Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。本文将介绍如何使用Apache Flume的Datahub Sink插件将日志数据实时上传到Datahub。

环境要求

  • JDK (1.7及以上,推荐1.7)
  • Flume-NG 1.x
  • Apache Maven 3.x

插件部署

下载插件压缩包

  1. $ wget https://github.com/aliyun/aliyun-odps-flume-plugin/releases/download/1.1.0/flume-datahub-sink-1.1.0.tar.gz

解压插件压缩包

  1. $ tar zxvf flume-datahub-sink-1.1.0.tar.gz
  2. $ ls flume-datahub-sink
  3. lib libext

部署Datahub Sink插件

将解压后的插件文件夹flume-datahub-sink移动到Apache Flume安装目录下

  1. $ mkdir {YOUR_FLUME_DIRECTORY}/plugins.d
  2. $ mv flume-datahub-sink {YOUR_FLUME_DIRECTORY}/plugins.d/

移动后,核验Datahub Sink插件是否已经在相应目录:

  1. $ ls { YOUR_APACHE_FLUME_DIR }/plugins.d
  2. flume-datahub-sink

配置示例

Flume的原理、架构,以及核心组件的介绍请参考 Flume-ng的原理和使用。本文将构建一个使用Datahub Sink的Flume实例,对日志文件中的结构化数据进行解析,并上传到Datahub Topic中。

需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):

  1. # test_basic.log
  2. some,log,line1
  3. some,log,line2
  4. ...

下面将创建Datahub Topic,并把每行日志的第一列和第二列作为一条记录写入Topic中。

创建Datahub Topic

使用Datahub WebConsole创建好Topic,schema为(string c1, string c2),下面假设建好的Topic名为test_topic。

Flume配置文件

在Flume安装目录的conf/文件夹下创建名为datahub_basic.conf的文件,并输入内容如下:

  1. # A single-node Flume configuration for Datahub
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = exec
  8. a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log
  9. # Describe the sink
  10. a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
  11. a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
  12. a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
  13. a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_END_POINT}
  14. a1.sinks.k1.datahub.project = test_project
  15. a1.sinks.k1.datahub.topic = test_topic
  16. a1.sinks.k1.batchSize = 1
  17. a1.sinks.k1.serializer = DELIMITED
  18. a1.sinks.k1.serializer.delimiter = ,
  19. a1.sinks.k1.serializer.fieldnames = c1,c2,
  20. a1.sinks.k1.serializer.charset = UTF-8
  21. a1.sinks.k1.shard.number = 1
  22. a1.sinks.k1.shard.maxTimeOut = 60
  23. # Use a channel which buffers events in memory
  24. a1.channels.c1.type = memory
  25. a1.channels.c1.capacity = 1000
  26. a1.channels.c1.transactionCapacity = 1000
  27. # Bind the source and sink to the channel
  28. a1.sources.r1.channels = c1
  29. a1.sinks.k1.channel = c1

这里serializer配置指定了以逗号分隔的形式将输入源解析成三个字段,并忽略第三个字段。

启动Flume

配置完成后,启动Flume并指定agent的名称和配置文件路径,添加-Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台。

  1. $ cd {YOUR_FLUME_DIRECTORY}
  2. $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

写入成功,显示日志如下:

  1. ...
  2. Write success. Event count: 2
  3. ...

数据使用

日志数据通过Flume上传到Datahub后,可以使用StreamCompute流计算来进行实时分析,例如对于一些Web网站的日志,可以实时统计各个页面的PV/UV等。另外,导入Datahub的数据也可以配置Connector将数据归档至MaxCompute中,方便后续的离线分析。

本文导读目录