本文为您介绍如何使用ClickHouse连接器。

背景信息

ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统,详情请参见什么是ClickHouse

ClickHouse连接器支持的信息如下.

类别

详情

支持类型

仅支持结果表

运行模式

批模式和流模式

数据格式

暂不适用

特有监控指标

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

说明

指标含义详情,请参见监控指标说明

API种类

SQL

是否支持更新或删除结果表数据

当Flink结果表的DDL上指定了Primary Key,且参数 ignoreDelete设置为false时,则支持更新或删除结果表数据,但性能会显著下降。

特色功能

  • 对于ClickHouse的分布式表,支持直接写对应的本地表。

  • 对于EMR的ClickHouse,提供Exactly Once的语义。

前提条件

  • 已创建ClickHouse表,详情请参见创建表

  • 已配置白名单。

    • 如果您使用的是阿里云数据库ClickHouse,配置白名单详情请参见设置白名单

    • 如果您使用的是阿里云E-MapReduce的ClickHouse,配置白名单详情请参见管理安全组

    • 如果您使用的是阿里云ECS上自建的ClickHouse,配置白名单详情请参见安全组概述

    • 如果为其他情况,请您自行配置ClickHouse所在机器的白名单让其可被Flink所在机器访问即可。

    说明

    如何查看Flink虚拟交换机的网段,请参见如何设置白名单?

使用限制

  • 暂不支持配置sink.parallelism参数。

  • ClickHouse结果表保证At-Least-Once语义。

  • 仅Flink计算引擎VVR 3.0.2及以上版本支持ClickHouse连接器。

  • 仅Flink计算引擎VVR 3.0.3,VVR 4.0.7及以上版本支持ignoreDelete选项。

  • 仅Flink计算引擎VVR 4.0.10及以上版本支持ClickHouse的Nested类型。

  • 仅Flink计算引擎VVR 4.0.11及以上版本支持直接将数据写入到ClickHouse分布式表对应的本地表。

  • 仅Flink计算引擎VVR 4.0.11及以上版本提供写EMR的ClickHouse的Exactly Once语义。但对EMR-3.45.1和EMR-5.11.1之后版本的ClickHouse,由于EMR ClickHouse产品能力变更,也不再提供Exactly Once语义。

  • 仅Flink计算引擎VVR 8.0.7及以上版本支持使用balance的策略来均匀地将数据写入ClickHouse的本地表。

  • 仅ClickHouse社区兼容版支持写ClickHouse本地表。

语法结构

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000'
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

结果表类型。

String

固定值为clickhouse

url

ClickHouse的JDBC连接地址。

String

URL格式为jdbc:clickhouse://<yourNetworAddress>:<PortId>/<yourDatabaseName>直接写本地表时,节点IP可以在ClickHouse执行 select * from system.clusters获取。如果不写数据库名称,则使用默认的default数据库。

说明

如果您要将数据写入ClickHouse分布式表,则URL为该分布式表所在节点的JDBC URL。

userName

ClickHouse的用户名。

String

无。

password

ClickHouse的密码。

String

无。

tableName

ClickHouse的表名称。

String

无。

maxRetryTimes

向结果表插入数据失败后的最大尝试次数。

Int

3

无。

batchSize

一次批量写入的数据条数。

Int

100

如果缓存中的数据条数达到了batchSize参数值,或者等待时间超过flushIntervalMs后,系统将会自动将缓存中的数据写入ClickHouse表中。

flushIntervalMs

清空缓存的时间间隔。

Long

1000

单位为毫秒。

ignoreDelete

是否忽略Delete消息。

Boolean

true

参数取值如下:

  • true(默认值):忽略。

  • false:不忽略。

    如果为false,并且在DDL中声明了Primary Key,则会使用ClickHouse的ALTER语句来删除数据。

说明

如果设置ignoreDelete=false,则无法支持以partition的方式写ClickHouse分布表的本地表,所以就不能再设置writeMode为partition。

shardWrite

对于ClickHouse分布式表,是否直接写ClickHouse的本地表。

Boolean

false

参数取值如下:

  • false(默认值):先写ClickHouse的分布式表,再由分布式表写入对应的本地表。此时tableName应为分布式表的名称。

  • true:跳过分布式表,直接将数据写到该ClickHouse分布式表对应的本地表。

    如果需要提高写ClickHouse分布式表的吞吐量,则建议将该值设置为true。

    • 如果您需要在URL中手动指定要将数据写到哪些节点的本地表中。此时tableName应该为本地表的名字。代码示例如下:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002/default'
      'tableName' = 'local_table'
    • 如果您不需要手动指定本地表的节点,可以通过同时设置inferLocalTable参数来让Flink自动推测本地表的节点。此时,tableName应该为分布式表的名字,且url为该分布式表所在节点的JDBC URL。代码示例如下:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default' // 分布式表所在节点的JDBC URL
      'tableName' = 'distribute_table'

inferLocalTable

对于写ClickHouse分布式表,是否尝试推测分布式表的本地表信息,然后直接写入本地表中。

Boolean

false

参数取值如下:

  • false(默认值):如果是写ClickHouse分布式表,并且在参数url中只指定了一个节点,则不会尝试推测分布式表对应的本地表信息,而是依然会直接写入分布式表,再由分布式表写入对应的本地表。

  • true:Flink将尝试推测分布式表的本地表信息,并直接写入对应的本地表。此时需要 shardWrite参数也同步设置为 true,tableName设置为分布式表的名字,并且url设置为该分布式表所在节点的JDBC URL。

说明

对于写ClickHouse非分布式表,可直接忽略该参数。

writeMode

对于ClickHouse分布式表,采用何种策略写ClickHouse的本地表。

Enum

default

参数取值如下:

  • default(默认值):表示总是写入到第一个节点的本地表。

  • partition:表示将数据按key写到同一个节点的本地表。

  • random:表示随机写到某个节点的本地表。

  • balance:表示采用round-robin的方式,均匀地将数据写入到本地表节点中。

说明

如果设置了writeMode=partition,请确保配置项ignoreDelete为true。

shardingKey

按何种key将数据写到同一个节点的本地表。

default

writeMode取值为partition时,shardingKey值必填,可包含多个字段,多个字段以英文逗号(,)分隔。

exactlyOnce

是否开启exactlyOnce语义。

Boolean

false

参数取值如下:

  • true:开启。

  • false(默认值):不开启。

说明
  • 目前仅支持写EMR的ClickHouse的Exactly Once语义。所以只有当您写EMR的ClickHouse时,才能将exactlyOnce设置为true。

  • 不支持以partition策略写ClickHouse的本地表的Exactly Once语义。所以如果exactlyOnce设置为true,则writeMode不能设置为partition。

类型映射

Flink字段类型

ClickHouse字段类型

BOOLEAN

UInt8 / Boolean

说明

ClickHouse v21.12及以上版本支持Boolean类型。如果您使用的ClickHouse是v21.12以下版本,Flink的Boolean类型则对应ClickHouse的UInt8类型。

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

BIGINT

UInt32

FLOAT

Float32

DOUBLE

Float64

CHAR

FixedString

VARCHAR

String

BINARY

FixedString

VARBINARY

String

DATE

Date

TIMESTAMP(0)

DateTime

TIMESTAMP(x)

Datetime64(x)

DECIMAL

DECIMAL

ARRAY

ARRAY

Nested

说明

ClickHouse暂不支持Flink的TIME、MAP、MULTISET和ROW类型。

对于ClickHouse的Nested类型,需要将其映射成Flink的ARRAY类型,例如:

-- ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);

需要映射为:

-- Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<LONG>,
  `Goals.OrderID` ARRAY<STRING>
);
说明

ClickHouse的DateTime类型可以精确到秒,Datetime64可以精确到纳秒。对于VVR-6.0.6之前的版本,因为ClickHouse官方提供的JDBC写Datetime64数据类型会出现精度丢失,只能精确到秒的问题,所以通过Flink只能写入秒级别的TIMESTAMP,即TIMESTAMP(0)。VVR-6.0.6及之后的版本修复了这个精度丢失问题,通过Flink可以正常写Datetime64类型的数据。

使用示例

  • 示例1:写ClickHouse单节点表。

    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;
  • 示例2:写ClickHouse分布式表。

    假设您已经有三个本地表,表名为local_table_test,分别在192.XX.XX.1、192.XX.XX.2和192.XX.XX.3节点上。然后基于这三个本地表,创建了一个分布式表distributed_table_test。

    • 此时,如果您希望Flink可以直接写本地表,并且可以按照某个key将相同key的数据写到同一个节点的本地表中,则DDL代码示例如下。

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'local_table_test',
        'shardWrite' = 'true',
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;
    • 此时,如果您不想手动指定本地表的节点,可以让Flink来自动推测本地表节点,DDL代码示例如下:

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- 分布式表所在节点对应的JDBC URL。
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'distributed_table_test', --为分布式表的名字。
        'shardWrite' = 'true',
        'inferLocalTable' = 'true', --需设置inferLocalTable为true。
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;

常见问题