使用E-MapReduce进行MySQL Binlog日志准实时传输
本文介绍如何利用阿里云SLS插件功能和E-MapReduce集群进行MySQL Binlog的准实时传输。
前提条件
已在E-MapReduce上创建Hadoop集群,详情请参见创建集群。
已创建MySQL类型的数据库(例如RDS或DRDS)。MySQL必须开启Binlog,且Binlog必须为ROW模式。
本文以RDS为例介绍,详情请参见创建RDS MySQL实例。
说明RDS默认已开启Binlog功能。
操作步骤
连接MySQL实例并添加用户权限。
使用命令方式连接MySQL实例,详情请参见通过客户端、命令行连接RDS。
执行以下命令添加用户权限。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
为SLS服务添加对应的配置文件,详情请参见采集MySQL Binlog。
说明本文创建的Project名称为canaltest,Logstore名称为canal。
在SLS控制台查看日志数据是否上传成功,如果未上传成功请根据SLS的采集日志排查。
编译JAR包并上传至OSS。
在本地打开Git复制示例代码。
git clone https://github.com/aliyun/aliyun-emapreduce-demo.git
修改示例代码。
示例代码中已经有LoghubSample类,该类主要用于从SLS采集数据并打印。以下示例为修改后的代码。
package com.aliyun.emr.example import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.aliyun.logservice.LoghubUtils import org.apache.spark.streaming.{Milliseconds, StreamingContext} object LoghubSample { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar | | """.stripMargin) System.exit(1) } val loghubProject = args(0) val logStore = args(1) val loghubGroupName = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val conf = new SparkConf().setAppName("Mysql Sync") // conf.setMaster("local[4]"); val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, loghubProject, logStore, loghubGroupName, endpoint, 1, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.foreachRDD(rdd => rdd.saveAsTextFile("/mysqlbinlog") ) ssc.start() ssc.awaitTermination() } }
示例代码主要修改
loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile("/mysqlbinlog") )
为loghubStream.foreachRDD(rdd => rdd.saveAsTextFile("/mysqlbinlog"))
,以便于在E-MapReduce中运行时,保存Spark Streaming中流出来的数据至EMR的HDFS。您可以在本地完成代码调试后,通过如下命令打包。
mvn clean install
上传JAR包至OSS。
创建Spark作业。
进入作业编辑页面。
在顶部菜单栏处,根据实际情况选择地域和资源组。
单击左侧导航栏的旧版数据开发。
在项目列表区域,单击待编辑项目所在行的作业编辑。
在作业编辑区域,在需要操作的文件夹上,右键选择新建作业。
输入作业名称、作业描述,在作业类型下拉列表中选择Spark作业类型。
单击确定。
在作业内容中,填写提交该作业需要提供的命令行参数。
--master yarn --deploy-mode client --driver-memory 4g --executor-memory 2g --executor-cores 2 --class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming <SLS_endpoint> <SLS_access_id> <SLS_secret_key> 1
参数
说明
SLS_endpoint
SLS的EndPoint。
SLS_access_id
您阿里云账号的AccessKey ID。
SLS_secret_key
您阿里云账号的AccessKey Secret。
说明本示例代码中最后的1表示代码示例中的batchInterval,即Spark作业batch的大小。
单击保存。
运行作业。
在作业编辑页面,单击上方的运行。
在弹出的运行作业对话框中,从执行集群列表中,选择已创建的Hadoop集群。
单击确定。
查看mysqlbinlog文件。
通过SSH方式连接集群,详情请参见登录集群。
执行如下命令查看mysqlbinlog目录下的文件。
hadoop fs -ls /mysqlbinlog
您还可以通过执行命令
hadoop fs -cat /mysqlbinlog/part-00000
查看文件内容。