本文介绍通过阿里云Logstash的logstash-intput-tunnel插件,迁移MaxCompute离线表数据到阿里云Elasticsearch(简称ES)中。

前提条件

操作步骤

  1. 登录阿里云Logstash控制台
  2. 在实例列表页面单击实例名称/ID,或者单击操作栏下的实例管理
  3. 单击左侧导航栏的管道管理
  4. 管道管理页面,查看管道管理方式是否为配置文件管理(默认)。
    管道管理方式
    • 是,继续执行下一步。
    • 否,单击管道管理方式右侧的修改,将管道管理方式切换为配置文件管理,再执行下一步。
      警告 管道管理方式更改,会导致原先配置的所有管道失效,正在执行的数据任务将受到影响。请先删除原有管理方式下的所有管道任务,再进行切换。
  5. 管道列表中,单击创建管道
  6. 创建管道任务页面,进行Config配置。
    本文使用的Config配置如下:
    input {
        tunnel {
            access_id => "LTAIezFX********"
            access_key => "SCm8xcF3bwdRbGY7AdU1sH********"
            endpoint => "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api"
            project_name => "mXXX"
            table_name => "sale_detail"
            partition => "sale_date='201911', region='hangzhou'"
            thread_num => 1
            dirty_data_file => "/ssd/1/share/XXXXX.txt"
        }
    }
    
    output {
      elasticsearch {
        hosts => ["http://es-cn-4591f5ja9000j****.elasticsearch.aliyuncs.com:9200"]
        index => "odps_index"
        user => "elastic"
        password => "Adm******"
      }
    }
    表 1. input参数说明
    参数 类型 是否必选 说明
    access_id string 阿里云账号的AccessKey ID。
    access_key string 阿里云账号的Access Key Secret。
    endpoint string MaxCompute对外服务的访问域名,详情请参见开通MaxCompute服务的Region和服务连接对照表
    project_name string MaxCompute的项目名称。
    table_name string MaxCompute的表名称。
    partition string 分区字段。分区表按照字段来定义,例如:sale_date='201911'region='hangzhou'
    thread_num number 线程数,默认为1。
    dirty_data_file string 指定文件,用于记录处理失败的日志。
    注意
    • 以上配置按照测试数据配置,在实际业务中,您需要按照业务需求进行合理配置。input插件支持的其他配置选项请参见官方Logstash Jdbc input plugin文档。
    • logstash-input-tunnel插件会全量迁移MaxCompute中的所有数据到阿里云ES中。

    Config配置详情请参见Logstash配置文件说明

  7. 单击下一步,配置管道参数。
    管道参数配置
    参数 说明
    管道ID 必选,自定义输入。
    管道工作线程 并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时,请考虑增大线程数,更好地使用CPU处理能力。默认值:实例的CPU核数。
    管道批大小 单个工作线程在尝试执行Filter和Output前,可以从Input收集的最大事件数目。较大的批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量,来增大JVM堆大小,从而有效使用改值。默认值:125。
    管道批延迟 创建管道事件批时,将过小的批分派给管道工作线程之前,要等候每个事件的时长,单位为毫秒。默认值:50ms。
    队列类型 用于事件缓冲的内部排队模型。可选值:
    • memory:默认值。基于内存的传统队列。
    • persisted:基于磁盘的ACKed队列(持久队列)。
    队列最大字节数 请确保该值小于您的磁盘总容量。默认值:1024MB。
    队列检查点写入数 启用持久性队列时,在强制执行检查点之前已写入事件的最大数目。设置为0,表示无限制。默认值:1024。
    警告 配置完成后,需要进行保存和部署才能生效。保存和部署操作会触发实例变更,请在不影响业务的前提下,继续执行以下步骤。
  8. 单击保存/保存并部署
    • 保存:只将管道信息保存在Logstash里,不会触发配置,但是会触发实例变更。保存后,系统会返回管道列表页面,可单击管道列表操作栏下的立即部署,触发配置。
    • 保存并部署:保存并且部署后,才会真正触发配置,并且也会触发实例变更。
    保存并部署成功后,系统提示创建成功,并返回管道列表页面。等待实例变更完成后,即可完成管道任务的创建。此时管道的状态显示为运行中
  9. 结果验证。
    1. 登录Elasticsearch实例的Kibana控制台
    2. 单击左侧导航栏的Dev Tools
    3. Dev Tools页面的Console中,执行以下命令。
      GET /odps_index/_search
      运行成功后,结果如下。运行结果
      说明 如果运行失败,可在日志查询页面查看相关日志,进行排查修复,详情请参见查询日志
    4. 单击切换到Monitoring页面,单击Indices
    5. Indices页面,可查看同步成功的索引,以及写入文档的数量。
      查看写入文档的数量