本文介绍如何利用阿里云的 SLS 插件功能和 E-MapReduce 集群进行 MySQL binlog 的准实时传输。

基本架构

RDS -> SLS -> Spark Streaming -> Spark HDFS

上述链路主要包含3个过程:
  1. 如何把 RDS 的 binlog 收集到 SLS。
  2. 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。
  3. 如何把链路 2 中读取和处理过的日志,保存到 Spark HDFS中。

环境准备

  1. 安装一个 MySQL 类型的数据库(使用 MySQL 协议,例如 RDS、DRDS 等),开启 log-bin 功能,且配置 binlog 类型为 ROW 模式(RDS默认开启)。
  2. 开通 SLS 服务。

操作步骤

  1. 检查 MySQL 数据库环境。
    1. 查看是否开启 log-bin 功能。
      mysql> show variables like "log_bin";
      +---------------+-------+
      | Variable_name | Value |
      +---------------+-------+
      | log_bin       | ON    |
      +---------------+-------+
      1 row in set (0.02 sec)
    2. 查看 binlog 类型。
      mysql> show variables like "binlog_format";
      +---------------+-------+
      | Variable_name | Value |
      +---------------+-------+
      | binlog_format | ROW   |
      +---------------+-------+
      1 row in set (0.03 sec)
  2. 添加用户权限。(也可以直接通过RDS控制台添加)
    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
  3. 为 SLS 服务添加对应的配置文件,并检查数据是否正常采集。
    1. 在 SLS 控制台添加对应的 project 和 logstore,例如:创建一个名称为 canaltest 的 project,然后创建一个名称为 canal 的 logstore。
    2. 对 SLS 进行配置:在 /etc/ilogtail 目录下创建文件user_local_config.json,具体配置如下:
      {
      "metrics": {
         "##1.0##canaltest$plugin-local": {
             "aliuid": "****",
             "enable": true,
             "category": "canal",
             "defaultEndpoint": "*******",
             "project_name": "canaltest",
             "region": "cn-hangzhou",
             "version": 2
             "log_type": "plugin",
             "plugin": {
                 "inputs": [
                     {
                         "type": "service_canal",
                         "detail": {
                             "Host": "*****",
                             "Password": "****",
                             "ServerID": ****,
                             "User" : "***",
                             "DataBases": [
                                 "yourdb"
                             ],
                             "IgnoreTables": [
                                 "\\S+_inner"
                             ],
                              "TextToString" : true
                         }
                     }
                 ],
                 "flushers": [
                     {
                         "type": "flusher_sls",
                         "detail": {}
                     }
                 ]
             }
         }
      }
      }

      其中 detail 中的 Host 和 Password 等信息为 MySQL 数据库信息,User 信息为之前授权过的用户名。aliUid、defaultEndpoint、project_name、category 请根据自己的实际情况填写对应的用户和 SLS 信息。

    3. 等待约 2 分钟,通过 SLS 控制台查看日志数据是否上传成功,具体如图所示。
      查看日志数据

      如果日志数据没有采集成功,请根据SLS的提示,查看SLS的采集日志进行排查。

  4. 准备代码,将代码编译成 jar 包,然后上传到 OSS。
    1. 将 EMR 的示例代码通过 git 复制下来,然后进行修改,具体命令为:git clone https://github.com/aliyun/aliyun-emapreduce-demo.git。示例代码中已经有 LoghubSample 类,该类主要用于从 SLS 采集数据并打印。以下是修改后的代码,供参考:
      package com.aliyun.emr.example
      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
      import org.apache.spark.streaming.{Milliseconds, StreamingContext}
      object LoghubSample {
      def main(args: Array[String]): Unit = {
      if (args.length < 7) {
       System.err.println(
         """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
           |            
           |           
         """.stripMargin)
       System.exit(1)
      }
      val loghubProject = args(0)
      val logStore = args(1)
      val loghubGroupName = args(2)
      val endpoint = args(3)
      val accessKeyId = args(4)
      val accessKeySecret = args(5)
      val batchInterval = Milliseconds(args(6).toInt * 1000)
      val conf = new SparkConf().setAppName("Mysql Sync")
      //    conf.setMaster("local[4]");
      val ssc = new StreamingContext(conf, batchInterval)
      val loghubStream = LoghubUtils.createStream(
       ssc,
       loghubProject,
       logStore,
       loghubGroupName,
       endpoint,
       1,
       accessKeyId,
       accessKeySecret,
       StorageLevel.MEMORY_AND_DISK)
      loghubStream.foreachRDD(rdd =>
         rdd.saveAsTextFile("/mysqlbinlog")
      )
      ssc.start()
      ssc.awaitTermination()
      }
      }
      其中的主要改动是:loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile("/mysqlbinlog") )。这样在 EMR 集群中运行时,就会把Spark Streaming 中流出来的数据,保存到 EMR 的 HDFS 中。
      说明
      • 由于如果要在本地运行,请在本地环境提前搭建 Hadoop 集群。
      • 由于 EMR 的 Spark SDK 做了升级,其示例代码比较旧,不能直接在参数中传递 OSS 的 AccessKeyId、AccessKeySecret, 而是需要通过 SparkConf 进行设置,如下所示。
        trait RunLocally {
        val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
        conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
        conf.set("spark.hadoop.mapreduce.job.run-local", "true")
        conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
        conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
        conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
        conf.set("spark.hadoop.job.runlocal", "true")
        conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
        conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
        val sc = new SparkContext(conf)
        def getAppName: String
        }
      • 在本地调试时,需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile("/mysqlbinlog") ) 中的 /mysqlbinlog 修改成本地 HDFS的地址。
    2. 代码编译。
      在本地调试完成后,我们可以通过如下命令进行打包编译:
      mvn clean install
    3. 上传 jar 包。
      请先在 OSS 上建立 bucket 为 qiaozhou-EMR/jar的目录,然后通过OSS 控制台或 OSS 的 SDK 将 /target/shaded目录下的 examples-1.1-shaded.jar上传到 OSS 的这个目录下。上传后的 jar 包地址为 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar,这个地址在后面会用上,如下图所示:

  5. 搭建 EMR 集群,创建任务并运行执行计划。
    1. 通过 EMR 控制台创建一个 EMR 集群,大约需要 10 分钟左右,请耐心等待。
    2. 创建一个类型为 Spark 的作业。
      请根据您具体的配置将 SLS_endpoint $SLS_access_id $SLS_secret_key 替换成真实值。请注意参数的顺序,否则可能会报错。
      --master yarn --deploy-mode client --driver-memory 4g --executor-memory 2g --executor-cores 2 --class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1
    3. 创建执行计划,将作业和 EMR 集群绑定后,开始运行。
    4. 查询 Master 节点的IP,如图所示:

      通过 SSH 登录后,执行以下命令:
      hadoop fs -ls /
      可以看到 mysqlbinlog 开头的目录,再通过以下命令查看 mysqlbinlog 文件:
      hadoop fs -ls /mysqlbinlog


      还可以通过 hadoop fs -cat /mysqlbinlog/part-00000 命令查看文件内容。
  6. 错误排查。
    如果没有看到正常的结果,可以通过 EMR 的运行记录,来进行问题排查,如图所示: