本文主要介绍如何使用DLA Spark访问云ClickHouse。

前提条件

  • 您已开通数据湖分析DLA(Data Lake Analytics)服务,详情请参见开通云原生数据湖分析服务并在云原生数据湖分析DLA控制台上创建了Spark虚拟集群创建虚拟集群
  • 您已开通对象存储OSS(Object Storage Service)服务,详情请参见开通OSS服务
  • 您已经开通云数据库ClickHouse, 详情请参见 开通ClickHouse服务
  • 准备Spark计算节点所需要的交换机id安全组id,可以选择已有的交换机和安全组,也可以新建交换机安全组。交换机和安全组需要满足以下条件。
    • 交换机需要与您的ClickHouse在同一VPC下。可使用ClickHouse控制台上的交换机ID。
    • 安全组需要与您的ClickHouse在同一VPC下。您可以前往ECS控制台-网络与安全-安全组按照专有网络(VPC)ID搜索该VPC下的安全组,任意选择一个安全组ID即可。
    • 将第二步选定的交换机网段加入ClickHouse的白名单。

操作步骤

  1. 准备测试数据,命名为ck.csv上传至OSS。
    name,age
    fox,18
    tiger,20
    alice,36
  2. 准备以下读写ClickHouse的代码,下面的代码是读取OSS的CSV文件,写入到新建的ClickHouse表中,然后从ClickHouse表中读取数据,打印到控制台。
    • 参考POM文件
          <dependencies>
              <dependency>
                  <groupId>ru.yandex.clickhouse</groupId>
                  <artifactId>clickhouse-jdbc</artifactId>
                  <version>0.2.4</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.11</artifactId>
                  <version>2.4.5</version>
                  <scope>provided</scope>
              </dependency>
           </dependencies>
    • 参考代码片段。完整代码请参考代码
      package com.aliyun.spark
      
      import java.sql.{Connection, DriverManager}
      import java.util.Properties
      
      import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      object SparkWriteToCK {
        val ckProperties = new Properties()
        val url = "jdbc:clickhouse://<您的clickhouse VPC 地址>:8123/default"
        ckProperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
        ckProperties.put("user", "<用户名>")
        ckProperties.put("password", "<密码>")
        ckProperties.put("batchsize","100000")
        ckProperties.put("socket_timeout","300000")
        ckProperties.put("numPartitions","8")
        ckProperties.put("rewriteBatchedStatements","true")
      
        //创建ClickHouse表
        def createCKTable(table: String): Unit ={
          Class.forName(ckProperties.getProperty("driver"))
          var conn : Connection = null
          try {
            conn = DriverManager.getConnection(url, ckProperties.getProperty("user"), ckProperties.getProperty("password"))
            val stmt = conn.createStatement()
            val sql =
              s"""
                 |create table if not exists default.${table}  on cluster default(
                 |    `name` String,
                 |    `age`  Int32)
                 |ENGINE = MergeTree() ORDER BY `name` SETTINGS index_granularity = 8192;
                 |""".stripMargin
            stmt.executeQuery(sql)
          } finally {
            if(conn != null)
              conn.close()
          }
        }
        
        def main(args: Array[String]): Unit = {
          val table = "ck_test"
          //使用jdbc建ClickHouse表
          createCKTable(table)
          val spark = SparkSession.builder().getOrCreate()
          //将csv的数据写入ClickHouse
          val csvDF = spark.read.option("header","true").csv("oss://<path/to>/ck.csv").toDF("name", "age")
          csvDF.printSchema()
          csvDF.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, ckProperties)
          //读ck表数据
          val ckDF = spark.read.jdbc(url, table, ckProperties)
          ckDF.show()
        }
      
      }
  3. 将以上代码打包编译上传至OSS。
  4. 进入DLA控制台,选择Serverless Spark > 作业管理,单击创建作业模板,选择前提条件中创建的虚拟集群。配置作业运行参数后点击执行
    {
        "file": "oss://<path/to/your/jar>",
        "name": "SparkWriteToCK",
        "className": "com.aliyun.spark.SparkWriteToCK",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://<指定您存放SparkUI日志的目录/>",
            "spark.dla.connectors": "oss",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.security.group.id": "<您前提条件中选定的安全组id>",
            "spark.dla.eni.vswitch.id": "<您前提条件中选定的交换机id>"
        }
    }
    说明 更多配置说明,请参考Spark配置指南
  5. 作业运行后,可在作业界面查看作业日志和SparkUI