本文介绍如何在E-MapReduce的Hadoop集群运行Spark作业统计单词个数并将结果写入MySql。Spark Streaming 作业写入MySql的方法与此类似。
Spark接入MySQL
示例代码如下。
val input = getSparkContext.textFile(inputPath, numPartitions)
input.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
.mapPartitions(e => {
var conn: Connection = null
var ps: PreparedStatement = null
val sql = s"insert into $tbName(word, count) values (?, ?)"
try {
conn = DriverManager.getConnection(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", dbUser, dbPwd)
ps = conn.prepareStatement(sql)
e.foreach(pair => {
ps.setString(1, pair._1)
ps.setLong(2, pair._2)
ps.executeUpdate()
})
ps.close()
conn.close()
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()
}
}
Iterator.empty
}).count()
附录
完整示例代码请参见Spark写入RDS。
在文档使用中是否遇到以下问题
更多建议
匿名提交