This topic describes how to use DLA Spark to access ApsaraDB for ClickHouse.
Procedure
- Prepare the following test data, name the file ck.csv, and upload it to OSS (an optional Spark sketch for generating the file follows the sample data).
name,age
fox,18
tiger,20
alice,36
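If you prefer to generate the test file with a Spark job instead of creating it by hand, the following is a minimal sketch. The bucket and path are placeholders, and it assumes the job runs with the OSS connector enabled (spark.dla.connectors set to oss). Note that Spark writes a directory containing a CSV part file rather than a single ck.csv, so point the read path in the later example at that directory.
package com.aliyun.spark

import org.apache.spark.sql.SparkSession

// Minimal sketch: build the same three rows in memory and write them to OSS
// as one CSV part file with a header row. The OSS path is a placeholder.
object WriteTestCsvToOSS {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._
    val df = Seq(("fox", 18), ("tiger", 20), ("alice", 36)).toDF("name", "age")
    df.coalesce(1)                      // keep the output to a single part file
      .write
      .option("header", "true")
      .mode("overwrite")
      .csv("oss://<your-bucket>/<path/to>/ck")
    spark.stop()
  }
}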
- Prepare the following code that reads from and writes to ClickHouse. The code reads the CSV file from OSS, writes the data into a newly created ClickHouse table, and then reads the data back from the ClickHouse table and prints it to the console.
- Reference POM file.
<dependencies>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.5</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
- Reference code snippet. For the complete code, see here.
package com.aliyun.spark

import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkWriteToCK {
  val ckProperties = new Properties()
  val url = "jdbc:clickhouse://<your ClickHouse VPC endpoint>:8123/default"
  ckProperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  ckProperties.put("user", "<user name>")
  ckProperties.put("password", "<password>")
  ckProperties.put("batchsize", "100000")
  ckProperties.put("socket_timeout", "300000")
  ckProperties.put("numPartitions", "8")
  ckProperties.put("rewriteBatchedStatements", "true")

  // Create the ClickHouse table through JDBC.
  def createCKTable(table: String): Unit = {
    Class.forName(ckProperties.getProperty("driver"))
    var conn: Connection = null
    try {
      conn = DriverManager.getConnection(url, ckProperties.getProperty("user"), ckProperties.getProperty("password"))
      val stmt = conn.createStatement()
      val sql =
        s"""
           |create table if not exists default.${table} on cluster default(
           |  `name` String,
           |  `age` Int32)
           |ENGINE = MergeTree() ORDER BY `name` SETTINGS index_granularity = 8192;
           |""".stripMargin
      stmt.execute(sql)
    } finally {
      if (conn != null)
        conn.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val table = "ck_test"
    // Create the ClickHouse table through JDBC.
    createCKTable(table)
    val spark = SparkSession.builder().getOrCreate()
    // Write the CSV data from OSS into ClickHouse.
    val csvDF = spark.read.option("header", "true").csv("oss://<path/to>/ck.csv").toDF("name", "age")
    csvDF.printSchema()
    csvDF.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, ckProperties)
    // Read the data back from the ClickHouse table.
    val ckDF = spark.read.jdbc(url, table, ckProperties)
    ckDF.show()
  }
}
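The connection options in ckProperties (batchsize, numPartitions, rewriteBatchedStatements) control how the JDBC writes are batched; JDBCOptions.JDBC_BATCH_INSERT_SIZE in the write call sets the same batchsize option. For reads, the minimal sketch below (reusing spark, url, and ckProperties from the code above; the subquery alias t is an assumption) shows how to push a filter down to ClickHouse by passing a subquery instead of a table name, so only matching rows are transferred:
// Sketch: can be appended to main() above. Passing "(subquery) alias" as the
// table lets ClickHouse evaluate the WHERE clause before the data reaches Spark.
val adultsDF = spark.read.jdbc(
  url,
  "(select `name`, `age` from default.ck_test where `age` >= 20) as t",
  ckProperties)
adultsDF.show()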
- Compile and package the code above, then upload the JAR file to OSS.
- Log on to the DLA console, select the virtual cluster created in the prerequisites, configure the following job parameters, and then click Execute.

{
  "file": "oss://<path/to/your/jar>",
  "name": "SparkWriteToCK",
  "className": "com.aliyun.spark.SparkWriteToCK",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.instances": 5,
    "spark.executor.resourceSpec": "medium",
    "spark.dla.job.log.oss.uri": "oss://<directory for storing your SparkUI logs/>",
    "spark.dla.connectors": "oss",
    "spark.dla.eni.enable": "true",
    "spark.dla.eni.security.group.id": "<ID of the security group selected in the prerequisites>",
    "spark.dla.eni.vswitch.id": "<ID of the vSwitch selected in the prerequisites>"
  }
}
- After the job runs, you can view the job logs and the Spark UI in the job panel at the bottom of the page.