本文为您介绍如何将Spark中的数据导入至ClickHouse集群。

前提条件

背景信息

关于Spark的更多介绍,请参见概述

代码示例

代码示例如下。
package com.company.packageName

import java.util.Properties
import java.util.concurrent.ThreadLocalRandom

import scala.annotation.tailrec

import com.google.common.collect.ImmutableMap
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}

case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)

object CKDataImporter extends Logging {

  private var dbName: String = "default"
  private var tableName: String = ""
  private var ckHost: String = ""
  private var ckPort: String = "8123"
  private var user: String = "default"
  private var password: String = ""
  private var local: Boolean = false

  def main(args: Array[String]): Unit = {
    parse(args.toList)
    checkArguments()

    val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"

    logInfo(s"Use jdbc: $jdbcUrl")
    logInfo(s"Use table: $tableName")

    val spark = getSparkSession

    // generate test data
    val rdd = spark.sparkContext.parallelize(1 to 1000).map(i => {
      val rand = ThreadLocalRandom.current()
      val randString = (0 until rand.nextInt(10, 20))
        .map(_ => rand.nextLong())
        .mkString("")

      Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
    })

    val df = spark.createDataFrame(rdd)

    df.write
      .mode(SaveMode.Append)
      .jdbc(jdbcUrl, tableName, getCKJdbcProperties(user, password))
  }

  private def printUsageAndExit(exitCode: Int = 0): Unit = {
    logError("Usage: java -jar /path/to/CKDataImporter.jar [options]")
    logError("  --dbName      设置ClickHouse数据库的名称,默认为default")
    logError("  --tableName   设置ClickHouse库中表的名称")
    logError("  --ckHost      设置ClickHouse地址")
    logError("  --ckPort      设置ClickHouse端口,默认为8123")
    logError("  --user        设置ClickHouse所使用的用户名")
    logError("  --password    设置ClickHouse用户的密码,默认为空")
    logError("  --local       设置此程序使用Spark Local模式运行")
    System.exit(exitCode)
  }

  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--help" | "-h") :: _ =>
      printUsageAndExit()
    case "--dbName" :: value :: tail =>
      dbName = value
      parse(tail)
    case "--tableName" :: value :: tail =>
      tableName = value
      parse(tail)
    case "--ckHost" :: value :: tail =>
      ckHost = value
      parse(tail)
    case "--ckPort" :: value :: tail =>
      ckPort = value
      parse(tail)
    case "--user" :: value :: tail =>
      user = value
      parse(tail)
    case "--password" :: value :: tail =>
      password = value
      parse(tail)
    case "--local" :: tail =>
      local = true
      parse(tail)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }

  private def checkArguments(): Unit = {
    if ("".equals(tableName) || "".equals(ckHost)) {
      printUsageAndExit(2)
    }
  }

  private def getCKJdbcProperties(
      user: String,
      password: String,
      batchSize: String = "1000",
      socketTimeout: String = "300000",
      numPartitions: String = "8",
      rewriteBatchedStatements: String = "true"): Properties = {
    val kvMap = ImmutableMap.builder()
      .put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .put("user", user)
      .put("password", password)
      .put("batchsize", batchSize)
      .put("socket_timeout", socketTimeout)
      .put("numPartitions", numPartitions)
      .put("rewriteBatchedStatements", rewriteBatchedStatements)
      .build()
    val properties = new Properties
    properties.putAll(kvMap)
    properties
  }

  private def getSparkSession: SparkSession = {
    val builder = SparkSession.builder()
    if (local) {
      builder.master("local[*]")
    }
    builder.appName("ClickHouse-Data-Importer")
    builder.getOrCreate()
  }
}

操作流程

  1. 步骤一:创建ClickHouse表
  2. 步骤二:编译并打包
  3. 步骤三:提交作业

步骤一:创建ClickHouse表

  1. 使用SSH方式登录ClickHouse集群,详情请参见登录集群
  2. 执行如下命令,进入ClickHouse客户端。
    clickhouse-client -m
  3. 创建ClickHouse信息。
    1. 执行如下命令,创建数据库clickhouse_database_name
      CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;

      阿里云EMR会为ClickHouse集群自动生成一个名为cluster_emr的集群。数据库名您可以自定义。

    2. 执行如下命令,创建表clickhouse_table_name_local
      CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
        id            UInt32,
        key1            String,
        value1        UInt8,
        key2            Int64,
        value2        Float64
      ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
      ORDER BY id;
      说明 表名您可以自定义,但请确保表名是以_local结尾。layershardreplica是阿里云EMR为ClickHouse集群自动生成的宏定义,可以直接使用。
    3. 执行如下命令,创建与表clickhouse_table_name_local字段定义一致的表clickhouse_table_name_all
      说明 表名您可以自定义,但请确保表名是以_all结尾。
      CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
        id                    UInt32,
        key1                  String,
        value1                UInt8,
        key2                  Int64,
        value2                Float64
      ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());

步骤二:编译并打包

  1. 下载并解压CKDataImporter示例到本地。
  2. 在CMD命令行中,进入到下载文件中pom.xml所在的目录下,执行如下命令打包文件。
    mvn clean package

    根据您pom.xml文件中artifactId的信息,下载文件中的target目录下会出现CKDataImporter-1.0.0.jar的JAR包。

步骤三:提交作业

  1. 使用SSH方式登录Hadoop集群,详情请参见登录集群
  2. 上传打包好的CKDataImporter-1.0.0.jar至Hadoop集群的根目录下。
    说明 本文示例中CKDataImporter-1.0.0.jar是上传至root根目录下,您也可以自定义上传路径。
  3. 执行如下命令提交作业。
    spark-submit --master yarn \
                 --class com.aliyun.emr.CKDataImporter \
                  CKDataImporter-1.0.0.jar \
                 --dbName clickhouse_database_name \
                 --tableName clickhouse_table_name_all \
                 --ckHost ${clickhouse_host};
    参数 说明
    dbName ClickHouse集群数据库的名称,默认为default。本文示例为clickhouse_database_name
    tableName ClickHouse集群数据库中表的名称。本文示例为clickhouse_table_name_all
    ckHost ClickHouse集群的Master节点的内网IP地址或公网IP地址。IP地址获取方式,请参见获取主节点的IP地址

获取主节点的IP地址

  1. 进入详情页面。
    1. 登录阿里云E-MapReduce控制台
    2. 在顶部菜单栏处,根据实际情况选择地域和资源组
    3. 单击上方的集群管理页签。
    4. 集群管理页面,单击相应集群所在行的详情
  2. 集群基础信息页面的主机信息区域,获取主节点的内网或公网IP地址。
    Intranet IP