本文介绍通过阿里云Logstash,迁移阿里云RDS MySQL数据到阿里云Elasticsearch(简称ES)中。阿里云Logstash支持开源Logstash所支持的所有插件。

前提条件

  • 在阿里云RDS MySQL服务中,准备测试数据,并将阿里云Logstash的IP地址(可在基本信息页面获取)加入RDS MySQL白名单中。

    白名单设置详情请参见设置白名单

  • 创建Logstash实例,并上传与RDS MySQL版本兼容的SQL JDBC驱动。

    上传SQL JDBC驱动方式请参见扩展文件配置

  • 确保网络实现连通,即RDS MySQL、阿里云Logstash、阿里云ES处于同一VPC。您也可以使用公网环境的服务,前提是需要通过配置NAT网关实现与公网的连通,详情请参见NAT公网数据传输配置
  • 创建目标阿里云ES实例
  • 开启目标ES实例的自动创建索引功能。

    详情请参见操作步骤

操作步骤

  1. 登录阿里云Logstash控制台
  2. 在实例列表页面单击实例名称/ID,或者单击操作栏下的实例管理
  3. 单击左侧导航栏的管道管理
  4. 管道管理页面,查看管道管理方式是否为配置文件管理(默认)。
    管道管理方式
    • 是,继续执行下一步。
    • 否,单击管道管理方式右侧的修改,将管道管理方式切换为配置文件管理,再执行下一步。
      警告 管道管理方式更改,会导致原先配置的所有管道失效,正在执行的数据任务将受到影响。请先删除原有管理方式下的所有管道任务,再进行切换。
  5. 管道列表中,单击创建管道
  6. 创建管道任务页面,进行Config配置。
    本文使用的Config配置如下:
    input {
      jdbc {
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_driver_library => "/ssd/1/share/ls-cn-4590000003/logstash/current/config/custom/mysql-connector-java-6.0.2.jar"
        jdbc_connection_string => "jdbc:mysql://rm-bp****00000000.mysql.rds.aliyuncs.com:3306/my_library?useUnicode=true&characterEncoding=utf-8&useSSL=false"
        jdbc_user => "xxxxx"
        jdbc_password => "xxxx"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        statement => "select * from student where id >= :sql_last_value"
        schedule => "* * * * *"
        tracking_column_type => "numeric"
        use_column_value => true
        tracking_column => "id"
      }
    }
    filter {
    }
    output {
     elasticsearch {
        hosts => "es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200"
        index => "dd_circle_scene_1030"
        user => "elastic"
        password => "xxxxxxx"
      }
    }
    表 1. input参数说明
    参数 描述
    jdbc_driver_class JDBC Class配置。
    jdbc_driver_library 指定JDBC连接MySQL驱动文件。
    jdbc_connection_string 配置数据库连接的域名、端口及数据库。
    jdbc_user 数据库用户名。
    jdbc_password 数据库密码。
    jdbc_paging_enabled 是否启用分页,默认false。
    jdbc_page_size 分页大小。
    statement 指定SQL语句。
    schedule 指定定时操作,"* * * * *"表示每分定时同步数据。
    use_column_value 是否需要记录某个column的值。
    tracking_column_type 跟踪列的类型,默认是numeric。
    tracking_column 指定跟踪列,该列必须是递增的,一般是MySQL主键。
    注意 以上配置按照测试数据配置,在实际业务中,您需要按照业务需求进行合理配置。input插件支持的其他配置选项请参见官方Logstash Jdbc input plugin文档。

    Config配置详情请参见Logstash配置文件说明

  7. 单击下一步,配置管道参数。
    管道参数配置
    参数 说明
    管道ID 必选,自定义输入。
    管道工作线程 并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时,请考虑增大线程数,更好地使用CPU处理能力。默认值:实例的CPU核数。
    管道批大小 单个工作线程在尝试执行Filter和Output前,可以从Input收集的最大事件数目。较大的批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量,来增大JVM堆大小,从而有效使用改值。默认值:125。
    管道批延迟 创建管道事件批时,将过小的批分派给管道工作线程之前,要等候每个事件的时长,单位为毫秒。默认值:50ms。
    队列类型 用于事件缓冲的内部排队模型。可选值:
    • memory:默认值。基于内存的传统队列。
    • persisted:基于磁盘的ACKed队列(持久队列)。
    队列最大字节数 请确保该值小于您的磁盘总容量。默认值:1024MB。
    队列检查点写入数 启用持久性队列时,在强制执行检查点之前已写入事件的最大数目。设置为0,表示无限制。默认值:1024。
    警告 配置完成后,需要进行保存和部署才能生效。保存和部署操作会触发实例变更,请在不影响业务的前提下,继续执行以下步骤。
  8. 单击保存/保存并部署
    • 保存:只将管道信息保存在Logstash里,不会触发配置,但是会触发实例变更。保存后,系统会返回管道列表页面,可单击管道列表操作栏下的立即部署,触发配置。
    • 保存并部署:保存并且部署后,才会真正触发配置,并且也会触发实例变更。
    保存并部署成功后,系统提示创建成功,并返回管道列表页面。等待实例变更完成后,即可完成管道任务的创建。此时管道的状态显示为运行中
  9. 结果验证。
    1. 登录Elasticsearch实例的Kibana控制台
    2. 单击左侧导航栏的Dev Tools
    3. Dev Tools页面的Console中,执行以下命令。
      GET /dd_circle_scene_1030/_search
      运行成功后,结果如下。运行结果