首页 开源大数据平台E-MapReduce 最佳实践 数据迁移和同步 使用E-MapReduce进行MySQL Binlog日志准实时传输

使用E-MapReduce进行MySQL Binlog日志准实时传输

本文介绍如何利用阿里云SLS插件功能和E-MapReduce集群进行MySQL Binlog的准实时传输。

前提条件

  • 已在E-MapReduce上创建Hadoop集群,详情请参见创建集群

  • 已创建MySQL类型的数据库(例如RDS或DRDS)。MySQL必须开启Binlog,且Binlog必须为ROW模式。

    本文以RDS为例介绍,详情请参见创建RDS MySQL实例

    说明

    RDS默认已开启Binlog功能。

操作步骤

  1. 连接MySQL实例并添加用户权限。

    1. 使用命令方式连接MySQL实例,详情请参见通过客户端、命令行连接RDS

    2. 执行以下命令添加用户权限。

      CREATE USER canal IDENTIFIED BY 'canal';
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
      FLUSH PRIVILEGES;
  2. 为SLS服务添加对应的配置文件,详情请参见采集MySQL Binlog

    说明

    本文创建的Project名称为canaltest,Logstore名称为canal。

    在SLS控制台查看日志数据是否上传成功,如果未上传成功请根据SLS的采集日志排查。

  3. 编译JAR包并上传至OSS。

    1. 在本地打开Git复制示例代码。

      git clone https://github.com/aliyun/aliyun-emapreduce-demo.git
    2. 修改示例代码。

      示例代码中已经有LoghubSample类,该类主要用于从SLS采集数据并打印。以下示例为修改后的代码。

      package com.aliyun.emr.example
      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
      import org.apache.spark.streaming.{Milliseconds, StreamingContext}
      object LoghubSample {
      def main(args: Array[String]): Unit = {
      if (args.length < 7) {
       System.err.println(
         """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
           |            
           |           
         """.stripMargin)
       System.exit(1)
      }
      val loghubProject = args(0)
      val logStore = args(1)
      val loghubGroupName = args(2)
      val endpoint = args(3)
      val accessKeyId = args(4)
      val accessKeySecret = args(5)
      val batchInterval = Milliseconds(args(6).toInt * 1000)
      val conf = new SparkConf().setAppName("Mysql Sync")
      //    conf.setMaster("local[4]");
      val ssc = new StreamingContext(conf, batchInterval)
      val loghubStream = LoghubUtils.createStream(
       ssc,
       loghubProject,
       logStore,
       loghubGroupName,
       endpoint,
       1,
       accessKeyId,
       accessKeySecret,
       StorageLevel.MEMORY_AND_DISK)
      loghubStream.foreachRDD(rdd =>
         rdd.saveAsTextFile("/mysqlbinlog")
      )
      ssc.start()
      ssc.awaitTermination()
      }
      }

      示例代码主要修改loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile("/mysqlbinlog") )loghubStream.foreachRDD(rdd => rdd.saveAsTextFile("/mysqlbinlog")),以便于在E-MapReduce中运行时,保存Spark Streaming中流出来的数据至EMR的HDFS。

    3. 您可以在本地完成代码调试后,通过如下命令打包。

      mvn clean install
    4. 上传JAR包至OSS。

      在OSS上创建存储空间和上传文件,详情请参见控制台创建存储空间控制台上传文件

      说明

      本示例在OSS上创建的Bucket为EMR-test,上传examples-1.1-shaded.jarEMR-test/jar目录。

  4. 创建Spark作业。

    1. 进入作业编辑页面。

      1. 登录EMR on ECS控制台

      2. 在顶部菜单栏处,根据实际情况选择地域和资源组

      3. 单击左侧导航栏的旧版数据开发

      4. 项目列表区域,单击待编辑项目所在行的作业编辑

    2. 作业编辑区域,在需要操作的文件夹上,右键选择新建作业

    3. 输入作业名称作业描述,在作业类型下拉列表中选择Spark作业类型。

    4. 单击确定

    5. 作业内容中,填写提交该作业需要提供的命令行参数。

      --master yarn --deploy-mode client --driver-memory 4g --executor-memory 2g --executor-cores 2 --class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming <SLS_endpoint> <SLS_access_id> <SLS_secret_key> 1

      参数

      说明

      SLS_endpoint

      SLS的EndPoint。

      SLS_access_id

      您阿里云账号的AccessKey ID。

      SLS_secret_key

      您阿里云账号的AccessKey Secret。

      说明

      本示例代码中最后的1表示代码示例中的batchInterval,即Spark作业batch的大小。

    6. 单击保存

  5. 运行作业。

    1. 在作业编辑页面,单击上方的运行

    2. 在弹出的运行作业对话框中,从执行集群列表中,选择已创建的Hadoop集群。

    3. 单击确定

  6. 查看mysqlbinlog文件。

    1. 通过SSH方式连接集群,详情请参见登录集群

    2. 执行如下命令查看mysqlbinlog目录下的文件。

      hadoop fs -ls /mysqlbinlog

      您还可以通过执行命令hadoop fs -cat /mysqlbinlog/part-00000查看文件内容。

阿里云首页 开源大数据平台 E-MapReduce 相关技术圈