ClickHouse connector

This topic describes how to use the ClickHouse connector to write data to ClickHouse.

Background information

ClickHouse is a column-oriented database management system for online analytical processing (OLAP). For more information, see What Is ClickHouse?

The following table describes the capabilities of the ClickHouse connector.

| Category | Details |
| --- | --- |
| Supported type | Result table only |
| Running mode | Batch and streaming modes |
| Data format | Not applicable |
| Connector-specific metrics | numRecordsOut, numRecordsOutPerSecond, currentSendTime. Note: For more information about these metrics, see Monitoring metrics. |
| API type | SQL |
| Support for updating or deleting data in a result table | The connector supports updates and deletions if you specify a primary key in the Flink result table's DDL and set the ignoreDelete parameter to false. However, this setting significantly degrades performance. |
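
As a minimal sketch of the update and delete support described above, the following DDL declares a primary key and sets ignoreDelete to false. The table name clickhouse_upsert_sink and the column names are placeholders chosen for illustration, and the ClickHouse table is assumed to already exist with matching columns.

```sql
-- Sketch only: table and column names are illustrative placeholders.
CREATE TEMPORARY TABLE clickhouse_upsert_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT,
  PRIMARY KEY (id) NOT ENFORCED   -- A primary key is required for updates and deletions.
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'ignoreDelete' = 'false'        -- Process delete messages instead of ignoring them.
);
```

Because this setting degrades performance and cannot be combined with the partition write mode, it is probably best reserved for jobs whose upstream changelog must be mirrored in ClickHouse.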

Features

  • Write data directly to the local tables of a ClickHouse distributed table.

  • Achieve exactly-once semantics when writing to ClickHouse on Alibaba Cloud E-MapReduce (EMR).

Prerequisites

  • Create a ClickHouse table. For more information, see Create a New table.

  • Configure a whitelist.

    • If you use ApsaraDB for ClickHouse, see Configure a whitelist.

    • If you use ClickHouse on Alibaba Cloud E-MapReduce (EMR), see Manage security groups.

    • If you use a self-managed ClickHouse cluster on an ECS instance, see Security group overview.

    • For all other setups, configure the whitelist on the machine where ClickHouse is deployed to allow access from the Realtime Compute for Apache Flink deployment.

    Note

    To view the vSwitch network segment for Realtime Compute for Apache Flink, see How do I configure a whitelist?.
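
For reference, the following ClickHouse statements sketch one way to create a local table whose columns match the examples in this topic, plus an optional distributed table on top of it. The cluster name my_cluster, the MergeTree engine, the ORDER BY key, and the rand() sharding expression are assumptions made for illustration; follow the linked topic for the options that apply to your edition.

```sql
-- Run these statements in ClickHouse, not in Flink.
-- my_cluster is a hypothetical cluster name; replace it with your own.
CREATE TABLE default.local_table_test ON CLUSTER my_cluster (
  id   Int32,
  name String,
  age  Int64,
  rate Float32
) ENGINE = MergeTree()
ORDER BY id;

-- Optional: a distributed table that routes rows to the local tables.
CREATE TABLE default.distributed_table_test ON CLUSTER my_cluster
AS default.local_table_test
ENGINE = Distributed(my_cluster, default, local_table_test, rand());
```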

Limitations

  • The sink.parallelism parameter is not supported.

  • By default, the ClickHouse sink provides at-least-once semantics.

  • The ClickHouse connector is supported only in Ververica Runtime (VVR) 3.0.2 and later.

  • The ignoreDelete option is supported only in VVR 3.0.3, VVR 4.0.7, and later versions.

  • The ClickHouse Nested data type is supported only in VVR 4.0.10 and later.

  • Direct writes to the local tables of a distributed table are supported only in VVR 4.0.11 and later.

  • Exactly-once semantics for writing to ClickHouse on EMR are available only in VVR 4.0.11 and later. However, due to product capability changes, exactly-once semantics are unavailable for ClickHouse on EMR v3.45.1 and for versions later than v5.11.1.

  • The balance write mode, which evenly distributes data across local table nodes, is available only in VVR 8.0.7 and later.

  • Writing to a ClickHouse local table is supported only for ApsaraDB for ClickHouse Community-compatible Edition.

  • When you register a ClickHouse catalog, the default database name cannot contain a hyphen (-). Otherwise, the JDBC URL validation fails.

Syntax

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000',
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

WITH parameters

| Parameter | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the result table. | String | Yes | N/A | Set the value to clickhouse. |
| url | The JDBC URL of your ClickHouse cluster. | String | Yes | N/A | The URL format is jdbc:clickhouse://<yourNetworkAddress>:<port>/<yourDatabaseName>. If you do not specify a database name, the default database is used. When you write directly to a local table, you can obtain the node IP addresses by executing select * from system.clusters in ClickHouse. Note: To write data to a ClickHouse distributed table, url must be the JDBC URL of a node in the cluster that hosts the distributed table. |
| userName | The username for accessing ClickHouse. | String | Yes | N/A | N/A |
| password | The password for accessing ClickHouse. | String | Yes | N/A | N/A |
| tableName | The name of the ClickHouse table. | String | Yes | N/A | N/A |
| maxRetryTimes | The maximum number of retries after a failed attempt to insert data into the result table. | Int | No | 3 | N/A |
| batchSize | The number of records to write in a single batch. | Int | No | 100 | When the number of records in the cache reaches batchSize or the wait time exceeds flushIntervalMs, the system automatically writes the cached data to the ClickHouse table. |
| flushIntervalMs | The interval at which the buffer is flushed. | Long | No | 1000 | Unit: milliseconds. |
| ignoreDelete | Specifies whether to ignore delete messages. | Boolean | No | true | Valid values: true (default): Ignores delete messages. false: Does not ignore delete messages. If this option is set to false and a primary key is defined in the DDL, the connector uses an ALTER statement to delete data in ClickHouse. Note: If you set ignoreDelete to false, you cannot use the partition write mode. |
| shardWrite | For a ClickHouse distributed table, specifies whether to write data directly to the underlying local tables. | Boolean | No | false | Valid values: false (default): The connector writes data to the distributed table, which then routes the data to the corresponding local tables. tableName must be the name of the distributed table. true: The connector writes data directly to the local tables, bypassing the distributed table. This is recommended to improve write throughput. To specify the local table nodes manually in url, set tableName to the name of the local table, for example: 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002/default', 'tableName' = 'local_table'. To let Flink discover the local table nodes automatically, set inferLocalTable to true. In this case, tableName must be the name of the distributed table and url must be the JDBC URL of a node in the cluster, for example: 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', 'tableName' = 'distribute_table'. |
| inferLocalTable | When writing to a ClickHouse distributed table, specifies whether to automatically discover the underlying local tables and write directly to them. | Boolean | No | false | Valid values: false (default): If you are writing to a distributed table and specify only one node in the url parameter, the connector does not attempt to discover the local tables. It writes to the distributed table, which then routes the data to the local tables. true: Flink attempts to discover the local tables and writes directly to them. This requires shardWrite to be set to true, tableName to be the name of the distributed table, and url to be the JDBC URL of a node in the cluster. Note: This parameter is ignored when writing to a non-distributed table. |
| writeMode | The strategy for writing data to the local tables of a distributed table. | Enum | No | default | Valid values: default (default): Always writes to the local table on the first node specified in the URL. partition: Distributes data based on a sharding key, ensuring that records with the same key are written to the same local table node. random: Writes data to a randomly selected local table node. balance: Uses a round-robin strategy to evenly distribute data across all local table nodes. Note: If you set writeMode to partition, you must set ignoreDelete to true. |
| shardingKey | The key used to partition data across local table nodes. | String | No | N/A | Required when writeMode is set to partition. It can contain multiple fields separated by commas (,). |
| exactlyOnce | Specifies whether to enable exactly-once semantics. | Boolean | No | false | Valid values: true: Enables exactly-once semantics. false (default): Disables exactly-once semantics. Note: Exactly-once semantics are supported only for writing to ClickHouse on EMR, so set this parameter to true only if your sink is a ClickHouse cluster on EMR. Exactly-once semantics are not supported for writing to ClickHouse local tables with the partition strategy. Therefore, if exactlyOnce is set to true, writeMode cannot be set to partition. |
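
To make the interaction between these parameters more concrete, here is a hedged sketch of a sink that enables exactly-once semantics and tunes batching. It assumes the target is a ClickHouse cluster on EMR in a version where exactly-once is still available; the table name clickhouse_exactly_once_sink and the batch values are illustrative only.

```sql
-- Sketch only: assumes a ClickHouse on EMR cluster that supports exactly-once writes.
CREATE TEMPORARY TABLE clickhouse_exactly_once_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  -- Flush when 8000 records are cached or after 1000 ms, whichever comes first.
  'batchSize' = '8000',
  'flushIntervalMs' = '1000',
  -- writeMode is left unset because partition cannot be combined with exactlyOnce.
  'exactlyOnce' = 'true'
);
```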

Data type mapping

| Flink type | ClickHouse type |
| --- | --- |
| BOOLEAN | UInt8 / Boolean. Note: ClickHouse v21.12 and later supports the Boolean type. For earlier versions, the Flink BOOLEAN type maps to the ClickHouse UInt8 type. |
| TINYINT | Int8 |
| SMALLINT | Int16 |
| INTEGER | Int32 |
| BIGINT | Int64 |
| BIGINT | UInt32 |
| FLOAT | Float32 |
| DOUBLE | Float64 |
| CHAR | FixedString |
| VARCHAR | String |
| BINARY | FixedString |
| VARBINARY | String |
| DATE | Date |
| TIMESTAMP(0) | DateTime |
| TIMESTAMP(x) | DateTime64(x) |
| DECIMAL | DECIMAL |
| ARRAY | ARRAY |
| ARRAY | Nested |

Note

The ClickHouse connector does not support the Flink TIME, MAP, MULTISET, and ROW types.

To use the ClickHouse Nested data type, you must map it to an ARRAY type in Flink. For example:

-- ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);

Map the types as follows:

-- Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<BIGINT>,
  `Goals.OrderID` ARRAY<STRING>
);
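
To illustrate how rows reach a Nested column, the sketch below adds connector options to a Flink table for visits and writes one row. The sink name visits_sink and the literal values are made up for illustration, and BIGINT is used for Goals.ID in line with the UInt32 mapping in the table above. ClickHouse expects all arrays that belong to one Nested column to have the same length within a row.

```sql
-- Sketch only: visits_sink and the inserted values are illustrative.
CREATE TEMPORARY TABLE visits_sink (
  StartDate DATE,
  `Goals.ID` ARRAY<BIGINT>,        -- Maps to Goals.ID UInt32 in ClickHouse.
  `Goals.OrderID` ARRAY<STRING>    -- Maps to Goals.OrderID String in ClickHouse.
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'visits'
);

-- Both arrays have two elements, so they describe two Goals entries.
INSERT INTO visits_sink
VALUES (
  DATE '2024-01-01',
  ARRAY[CAST(1 AS BIGINT), CAST(2 AS BIGINT)],
  ARRAY['order-a', 'order-b']
);
```
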
Note

In VVR versions earlier than 6.0.6, writing DateTime64 data with the official ClickHouse JDBC driver caused a loss of precision by truncating values to the second. As a result, only TIMESTAMP data with second-level precision, such as TIMESTAMP(0), could be written successfully. VVR 6.0.6 and later resolve this issue, allowing for accurate DateTime64 data writes.
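
As a rough illustration of the TIMESTAMP(x) to DateTime64(x) mapping, the following sketch pairs a ClickHouse table that stores millisecond timestamps with a matching Flink sink. The table names events_test and events_sink, the MergeTree engine, and the ORDER BY key are assumptions for the example, and VVR 6.0.6 or later is assumed so that sub-second precision is preserved.

```sql
-- ClickHouse side (run in ClickHouse). events_test is a hypothetical table.
CREATE TABLE default.events_test (
  id         Int64,
  event_time DateTime64(3)
) ENGINE = MergeTree()
ORDER BY id;

-- Flink side. TIMESTAMP(3) maps to DateTime64(3).
CREATE TEMPORARY TABLE events_sink (
  id BIGINT,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'events_test'
);
```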

Examples

  • Example 1: Write to a single-node table.

    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;

  • Example 2: Write to a distributed table.

    Assume a distributed table named distributed_table_test is created from three local tables named local_table_test, which are located on nodes 192.XX.XX.1, 192.XX.XX.2, and 192.XX.XX.3.

    • If you want Flink to write data directly to the local tables and partition the data by a key, use the following DDL:

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'local_table_test',
        'shardWrite' = 'true',
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;

    • If you want Flink to automatically discover the local table nodes instead of specifying them manually in the URL, use the following DDL:

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- The JDBC URL of a node in the cluster.
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'distributed_table_test', -- The name of the distributed table.
        'shardWrite' = 'true',
        'inferLocalTable' = 'true', -- Set inferLocalTable to true.
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;
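
The balance write mode has no dedicated example in this topic, so the following is a hedged sketch for the same three-node setup. It assumes VVR 8.0.7 or later and reuses the clickhouse_source table defined in the examples above; the sink name clickhouse_balance_output is illustrative. The first statement is a ClickHouse query, run in ClickHouse rather than Flink, for looking up the node addresses to list in url.

```sql
-- Run in ClickHouse to list the nodes of each cluster.
SELECT cluster, shard_num, replica_num, host_name, host_address
FROM system.clusters;

-- Flink DDL: write directly to the local tables in round-robin fashion.
CREATE TEMPORARY TABLE clickhouse_balance_output (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'local_table_test',  -- The name of the local table.
  'shardWrite' = 'true',
  'writeMode' = 'balance'            -- No shardingKey is needed for this mode.
);

INSERT INTO clickhouse_balance_output
SELECT
  id,
  name,
  age,
  rate
FROM clickhouse_source;
```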

FAQ