通过阿里云Logstash将MaxCompute数据同步至Elasticsearch

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

当您需要将MaxCompute离线表中的数据同步到阿里云Elasticsearch中时,可以通过阿里云Logstash的logstash-input-maxcompute插件和管道配置功能实现。本文介绍对应的配置方法。

前提条件

您已完成以下操作:

  • 开通阿里云MaxCompute产品,并完成创建项目、创建表和导入数据的任务。

    具体操作步骤请参见MaxCompute官方文档的准备工作快速入门章节。

  • 创建阿里云Logstash实例,并安装logstash-input-maxcompute插件。

    具体操作步骤请参见步骤二:创建阿里云Logstash实例安装或卸载插件

  • 创建目标阿里云Elasticsearch实例,并开启实例的自动创建索引功能。

    具体操作步骤请参见创建阿里云Elasticsearch实例快速访问与配置。本文以6.7.0版本为例。

  • 请确保网络能够互通。即MaxCompute、阿里云Logstash、阿里云Elasticsearch处于同一专有网络VPC(Virtual Private Cloud)下。

    说明

    您也可以使用公网环境的服务,前提是需要通过配置NAT网关实现与公网的连通,详情请参见配置NAT公网数据传输

配置Logstash管道

  1. 进入阿里云Elasticsearch控制台的Logstash页面
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理

  4. 单击创建管道

  5. 创建管道任务页面,输入管道ID,并进行Config配置。

    本文使用的Config配置如下。

    input {
        maxcompute {
            access_id => "LTAIezFX********"
            access_key => "SCm8xcF3bwdRbGY7AdU1sH********"
            endpoint => "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api"
            project_name => "mXXX"
            table_name => "sale_detail"
            partition => "sale_date='201911', region='hangzhou'"
            thread_num => 1
            dirty_data_file => "/ssd/1/<Logstash实例ID>/logstash/data/XXXXX.txt"
        }
    }
    
    output {
      elasticsearch {
        hosts => ["http://es-cn-4591f5ja9000j****.elasticsearch.aliyuncs.com:9200"]
        index => "odps_index"
        user => "elastic"
        password => "Adm******"
      }
    }
    表 1. input参数说明

    参数

    类型

    是否必选

    说明

    access_id

    string

    您阿里云账号的AccessKey ID。

    access_key

    string

    您阿里云账号的Access Key Secret。

    endpoint

    string

    MaxCompute各地域Endpoint,请参见各地域Endpoint对照表(阿里云VPC网络连接方式)

    project_name

    string

    MaxCompute的项目名称。

    table_name

    string

    MaxCompute的表名称。

    partition

    string

    分区字段。分区表按照字段来定义,例如:sale_date='201911'region='hangzhou'

    thread_num

    number

    线程数,默认为1。

    dirty_data_file

    string

    指定文件路径,用于记录处理失败的日志。

    说明

    文件路径请指定为/ssd/1/<Logstash实例ID>/logstash/data/

    重要
    • 以上配置仅作为测试使用,在实际业务中,请按照业务需求进行合理配置。Input插件支持的其他配置选项请参见Logstash Jdbc input plugin

    • logstash-input-maxcompute插件会全量同步MaxCompute中的数据到阿里云Elasticsearch中。

    Config配置详情请参见Logstash配置文件说明

  6. 单击下一步,配置管道参数。

    管道参数配置

    参数

    说明

    管道工作线程

    并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时,请考虑增大线程数,更好地使用CPU处理能力。默认值:实例的CPU核数。

    管道批大小

    单个工作线程在尝试执行Filter和Output前,可以从Input收集的最大事件数目。较大的管道批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量,来增大JVM堆大小,从而有效使用该值。默认值:125。

    管道批延迟

    创建管道事件批时,将过小的批分派给管道工作线程之前,要等候每个事件的时长,单位为毫秒。默认值:50ms。

    队列类型

    用于事件缓冲的内部排队模型。可选值:

    • MEMORY:默认值。基于内存的传统队列。

    • PERSISTED:基于磁盘的ACKed队列(持久队列)。

    队列最大字节数

    请确保该值小于您的磁盘总容量。默认值:1024 MB。

    队列检查点写入数

    启用持久性队列时,在强制执行检查点之前已写入事件的最大数目。设置为0,表示无限制。默认值:1024。

    警告

    配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。

  7. 单击保存或者保存并部署

    • 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。

    • 保存并部署:保存并且部署后,会触发实例重启,使配置生效。

验证结果

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台。

    具体步骤请参见登录Kibana控制台

  2. 在左侧导航栏,单击Dev Tools(开发工具)。

  3. Console中,执行以下命令,查看同步成功的索引数据。

    GET /odps_index/_search

    运行成功后,结果如下。 运行结果

    说明

    如果运行失败,可在日志查询页面查看相关日志进行排查修复,详情请参见查询日志

  4. 切换到Monitoring(监控)页面,单击Indices(索引)。

  5. Indices页面,查看同步成功的索引,以及写入文档的数量。

    查看写入文档的数量

常见问题

Logstash数据写入问题排查方案