本文主要介绍如何使用DLA Spark访问云ClickHouse。
前提条件
- 您已开通数据湖分析DLA(Data Lake Analytics)服务,详情请参见开通云原生数据湖分析服务并在云原生数据湖分析DLA控制台上创建了Spark虚拟集群。
- 您已开通对象存储OSS(Object Storage Service)服务,详情请参见开通OSS服务。
- 您已经开通云数据库ClickHouse, 详情请参见 开通ClickHouse服务。
- 准备Spark计算节点所需要的交换机id和安全组id,可以选择已有的交换机和安全组,也可以新建交换机和安全组。交换机和安全组需要满足以下条件。
- 交换机需要与您的ClickHouse在同一VPC下。可使用ClickHouse控制台上的交换机ID。
- 安全组需要与您的ClickHouse在同一VPC下。您可以前往ECS控制台-网络与安全-安全组按照专有网络(VPC)ID搜索该VPC下的安全组,任意选择一个安全组ID即可。
- 将第二步选定的交换机网段加入ClickHouse的白名单。
操作步骤
- 准备测试数据,命名为ck.csv上传至OSS。
name,age
fox,18
tiger,20
alice,36
- 准备以下读写ClickHouse的代码,下面的代码是读取OSS的CSV文件,写入到新建的ClickHouse表中,然后从ClickHouse表中读取数据,打印到控制台。
- 参考POM文件
<dependencies>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
</dependencies>
- 参考代码片段。完整代码请参考代码。
package com.aliyun.spark
import java.sql.{Connection, DriverManager}
import java.util.Properties
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{SaveMode, SparkSession}
object SparkWriteToCK {
val ckProperties = new Properties()
val url = "jdbc:clickhouse://<您的clickhouse VPC 地址>:8123/default"
ckProperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
ckProperties.put("user", "<用户名>")
ckProperties.put("password", "<密码>")
ckProperties.put("batchsize","100000")
ckProperties.put("socket_timeout","300000")
ckProperties.put("numPartitions","8")
ckProperties.put("rewriteBatchedStatements","true")
//创建ClickHouse表
def createCKTable(table: String): Unit ={
Class.forName(ckProperties.getProperty("driver"))
var conn : Connection = null
try {
conn = DriverManager.getConnection(url, ckProperties.getProperty("user"), ckProperties.getProperty("password"))
val stmt = conn.createStatement()
val sql =
s"""
|create table if not exists default.${table} on cluster default(
| `name` String,
| `age` Int32)
|ENGINE = MergeTree() ORDER BY `name` SETTINGS index_granularity = 8192;
|""".stripMargin
stmt.executeQuery(sql)
} finally {
if(conn != null)
conn.close()
}
}
def main(args: Array[String]): Unit = {
val table = "ck_test"
//使用jdbc建ClickHouse表
createCKTable(table)
val spark = SparkSession.builder().getOrCreate()
//将csv的数据写入ClickHouse
val csvDF = spark.read.option("header","true").csv("oss://<path/to>/ck.csv").toDF("name", "age")
csvDF.printSchema()
csvDF.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, ckProperties)
//读ck表数据
val ckDF = spark.read.jdbc(url, table, ckProperties)
ckDF.show()
}
}
- 将以上代码打包编译上传至OSS。
- 进入DLA控制台,选择,单击创建作业模板,选择前提条件中创建的虚拟集群。配置作业运行参数后点击执行。
{
"file": "oss://<path/to/your/jar>",
"name": "SparkWriteToCK",
"className": "com.aliyun.spark.SparkWriteToCK",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.instances": 5,
"spark.executor.resourceSpec": "medium",
"spark.dla.job.log.oss.uri": "oss://<指定您存放SparkUI日志的目录/>",
"spark.dla.connectors": "oss",
"spark.dla.eni.enable": "true",
"spark.dla.eni.security.group.id": "<您前提条件中选定的安全组id>",
"spark.dla.eni.vswitch.id": "<您前提条件中选定的交换机id>"
}
}
- 作业运行后,可在作业界面查看作业日志和SparkUI。