使用Flink处理数据并写入ClickHouse,除了可以使用Flink原生的JDBC connector以外,本节介绍了使用flink-connector-clickhouse方式将Flink中的数据写入ClickHouse。这种方式支持Flink SQL,可以使用Flink Table DDL语句定义ClickHouse表,然后通过INSERT语句把数据写入ClickHouse。

前提条件

阿里云全托管方式

您可以使用阿里云全托管的方式进行作业开发,详细操作步骤请参见作业开发,Clickhouse connector的定义参考:管理自定义Connectors

Table API方式

使用Flink的Table API方式详细操作步骤如下:

  1. mvn archetype:generate命令创建项目,生成过程中会提示输入 group-idartifact-id 等。
    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.12.0
  2. 编辑pom.xml中的<dependencies />小节添加依赖。
        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>flink-connector-clickhouse</artifactId>
          <version>1.12.0</version>
        </dependency>
    说明
    • 如果maven依赖无法下载,可以手动下载安装,下载地址:flink-connector
    • 安装命令:
      mvn install:install-file -DgroupId=com.aliyun -DartifactId=flink-connector-clickhouse -Dversion=1.12.0 -Dpackaging=jar -Dfile=flink-connector-clickhouse-1.12.0.jar
  3. 创建数据写入程序文件。

    因为flink-connector-clickhouse支持Flink SQL,您可以使用Flink Table DDL语句定义ClickHouse Sink表。

    CREATE TABLE sink_table (
        name VARCHAR,
        grade BIGINT,
        rate FLOAT,
        more FLOAT,
        PRIMARY KEY (name, grade) NOT ENFORCED /* 如果指定 pk,进入 upsert 模式 */
    ) WITH (
        'connector' = 'clickhouse',
        'url' = 'clickhouse://<host>:<port>',
        'username' = '<username>',
        'password' = '<password>',
        'database-name' = 'default',        /* ClickHouse 数据库名,默认为 default */
        'table-name' = 'd_sink_table',      /* ClickHouse 数据表名 */
        'sink.batch-size' = '1000',         /* batch 大小 */
        'sink.flush-interval' = '1000',     /* flush 时间间隔 */
        'sink.max-retries' = '3',           /* 最大重试次数 */
        'sink.partition-strategy' = 'hash', /* hash | random | balanced */
        'sink.partition-key' = 'name',      /* hash 策略下的分区键 */
        'sink.ignore-delete' = 'true'       /* 忽略 DELETE 并视 UPDATE 为 INSERT */
    )

    通过INSERT语句把数据写入ClickHouse。

    INSERT INTO sink_table SELECT name, grade, rate FROM source

    数据写入程序文件完整示例如下:

    package org.myorg.quickstart
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<your-path-to-test-csv>/source.csv"
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        env.getConfig.disableClosureCleaner
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", DataTypes.STRING())
          .field("age", DataTypes.BIGINT())
          .field("sex", DataTypes.STRING())
          .field("grade", DataTypes.BIGINT())
          .field("rate", DataTypes.FLOAT())
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val create_sql =
          s"""
          | CREATE TABLE sink_table (
          |    name VARCHAR,
          |    grade BIGINT,
          |    rate FLOAT,
          |    PRIMARY KEY (name) NOT ENFORCED
          |) WITH (
          |    'connector' = 'clickhouse',
          |    'url' = 'clickhouse://<host>:<port>',
          |    'table-name' = 'd_sink_table',
          |    'sink.batch-size' = '1000',
          |    'sink.partition-strategy' = 'hash',
          |    'sink.partition-key' = 'name'
          |)
          |""".stripMargin
    
        tEnv.executeSql(create_sql);
    
        tEnv.executeSql(
          "INSERT INTO sink_table SELECT name, grade, rate FROM source"
        )
      }
    }
  4. 编译运行。
    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

connector参数说明

参数 描述
database-name ClickHouse 数据库名,默认为 default
table-name ClickHouse 数据表名
sink.batch-size batch大小需要根据您的实际业务场景进行决定,或进行测试。在我们的测试中,一般 8000左右的 batch size 可以达到较为理想的性能。
sink.flush-interval flush时间间隔,默认1s。
sink.max-retries 最大重试次数
sink.partition-strategy 分区策略
  • balanced:round-robin 轮转。
  • random:随机分区。
  • hash:hash 分区,通过 sink.partition-key 指定分区键。
sink.partition-key hash策略下的分区键
sink.ignore-delete 默认值为TRUE,表示忽略 DELETE 并视 UPDATE 为 INSERT。ClickHouse 对 UPDATE 和 DELETE 的支持并不完善,例如不支持同步更新等。若显式设置 sink.ignore-delete 为 false,则会尝试使用 ALTER TABLE UPDATE/DELETE 来更新数据,此时性能会有显著下降。

Flink与ClickHouse的数据类型映射

Flink的数据类型 ClickHouse的数据类型
BOOLEAN UInt8
TINYINT Int8
SMALLINT Int16
INTEGER Int32
INTERVAL_YEAR_MONTH Int32
BIGINT Int64
INTERVAL_DAY_TIME Int64
FLOAT Float32
DOUBLE Float64
CHAR String
VARCHAR String
BINARY FixedString
VARBINARY FixedString
DATE Date
TIME_WITHOUT_TIME_ZONE DateTime
TIMESTAMP_WITH_TIME_ZONE DateTime
TIMESTAMP_WITHOUT_TIME_ZONE DateTime
DECIMAL Decimal

FAQ

1.如何开启多线程写入?
通过 Flink 的 SetParallelism进行配置。
2.无法找到flink-connector-clickhouse的依赖?
请参考上面示例,手动下载,然后本地安装依赖。
3.Method not found 及类型不能转换错误
这大部分是Java依赖导致的,排查报错类的依赖是否有多个。建议直接使用阿里云全托管flink,写SQL更便捷。
4.flink-connector-clickhouse 是否支持回撤更新消息?
不支持。
5.本地运行flink-connector-clickhouse 没有数据输出?
首先检查下是否有数据流入,可以增加一个printSink来看下是否有数据流入,其次检查是否是批量发送的数量(sink.batch-size)和定时刷新的(sink.flush-interval)阈值有没有满足。