This topic describes how to use the ClickHouse connector to write data to ClickHouse.
Background information
ClickHouse is a column-oriented database management system for Online Analytical Processing (OLAP). For more information, see What Is ClickHouse?.
The following table describes the capabilities of the ClickHouse connector.
| Category | Details |
| --- | --- |
| Supported type | Result table only |
| Running mode | Batch and streaming modes |
| Data format | Not applicable |
| Connector-specific metrics | Note: For more information about these metrics, see Monitoring metrics. |
| API type | SQL |
| Support for updating or deleting data in a result table | The connector supports updates and deletions if you specify a primary key in the Flink result table's DDL and set the |
Features
-
Write data directly to the local tables of a ClickHouse distributed table.
-
Achieve exactly-once semantics when writing to ClickHouse on Alibaba Cloud E-MapReduce (EMR).
Prerequisites
-
Create a ClickHouse table. For more information, see Create a New table.
-
Configure a whitelist.
-
If you use ApsaraDB for ClickHouse, see Configure a whitelist.
-
If you use ClickHouse on Alibaba Cloud E-MapReduce (EMR), see Manage security groups.
-
If you use a self-managed ClickHouse cluster on an ECS instance, see Security group overview.
-
For all other setups, configure the whitelist on the machine where ClickHouse is deployed to allow access from the Realtime Compute for Apache Flink deployment.
Note: To view the vSwitch network segment for Realtime Compute for Apache Flink, see How do I configure a whitelist?.
Limitations
-
The sink.parallelism parameter is not supported.
-
By default, the ClickHouse sink provides at-least-once semantics.
-
The ClickHouse connector is supported only in Ververica Runtime (VVR) 3.0.2 and later.
-
The ignoreDelete option is supported only in VVR 3.0.3, VVR 4.0.7, and later versions.
-
The ClickHouse Nested data type is supported only in VVR 4.0.10 and later.
-
Direct writes to the local tables of a distributed table are supported only in VVR 4.0.11 and later.
-
Exactly-once semantics for writing to ClickHouse on EMR are available only in VVR 4.0.11 and later. However, due to product capability changes, exactly-once semantics are unavailable for ClickHouse on EMR v3.45.1 and versions later than v5.11.1.
-
The balance write mode, which evenly distributes data across local table nodes, is available only in VVR 8.0.7 and later.
-
Writing to a ClickHouse local table is supported only for ApsaraDB for ClickHouse Community-compatible Edition.
-
When you register a ClickHouse catalog, the default database name cannot contain a hyphen (-). Otherwise, the JDBC URL validation fails.
Syntax
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000',
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);
WITH parameters
| Parameter | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the result table. | String | Yes | N/A | Set the value to clickhouse. |
| url | The JDBC URL of your ClickHouse cluster. | String | Yes | N/A | The URL format is Note: If you want to write data to a ClickHouse distributed table, |
| userName | The username for accessing ClickHouse. | String | Yes | N/A | N/A |
| password | The password for accessing ClickHouse. | String | Yes | N/A | N/A |
| tableName | The name of the ClickHouse table. | String | Yes | N/A | N/A |
| maxRetryTimes | The maximum number of retries after a failed attempt to insert data into the result table. | Int | No | 3 | N/A |
| batchSize | The number of records to write in a single batch. | Int | No | 100 | If the number of data entries in the cache reaches the value of the batchSize parameter or the wait time exceeds flushIntervalMs, the system automatically writes the data from the cache to the ClickHouse table. |
| flushIntervalMs | The time interval, in milliseconds, to flush the buffer. | Long | No | 1000 | The unit is milliseconds. |
| ignoreDelete | Specifies whether to ignore delete messages. | Boolean | No | true | Valid values: Note: If you set |
| shardWrite | For a ClickHouse distributed table, specifies whether to write data directly to the underlying local tables. | Boolean | No | false | Valid values: |
| inferLocalTable | When writing to a ClickHouse distributed table, specifies whether to automatically discover the underlying local tables and write directly to them. | Boolean | No | false | Valid values: Note: This parameter is ignored when writing to a non-distributed table. |
| writeMode | The strategy for writing data to local tables of a distributed table. | Enum | No | default | Valid values: Note: If you set |
| shardingKey | The key used to partition data across local table nodes. | String | No | N/A | When writeMode is set to 'partition', the shardingKey parameter is required. It can contain multiple fields separated by commas (,). |
| exactlyOnce | Specifies whether to enable the exactly-once semantic. | Boolean | No | false | Valid values: Note |
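As a hedged illustration of the update and delete support mentioned in the capability table, the following sketch declares a primary key in the Flink DDL and turns off ignoreDelete so that delete messages are not dropped. The table name clickhouse_upsert_sink is hypothetical, and setting ignoreDelete to false is an assumption derived from that parameter's description, not a confirmed requirement. Verify the behavior against your VVR version before relying on it.

```sql
-- Hypothetical result table; replace the placeholders with your own values.
CREATE TEMPORARY TABLE clickhouse_upsert_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT,
  PRIMARY KEY (id) NOT ENFORCED  -- primary key declared in the Flink DDL
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'ignoreDelete' = 'false'       -- assumption: process delete messages instead of ignoring them
);
```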
Data type mapping
| Flink type | ClickHouse type |
| --- | --- |
| BOOLEAN | UInt8 / Boolean. Note: ClickHouse v21.12 and later support the Boolean type. For earlier versions, the Flink BOOLEAN type maps to the ClickHouse UInt8 type. |
| TINYINT | Int8 |
| SMALLINT | Int16 |
| INTEGER | Int32 |
| BIGINT | Int64 |
| BIGINT | UInt32 |
| FLOAT | Float32 |
| DOUBLE | Float64 |
| CHAR | FixedString |
| VARCHAR | String |
| BINARY | FixedString |
| VARBINARY | String |
| DATE | Date |
| TIMESTAMP(0) | DateTime |
| TIMESTAMP(x) | DateTime64(x) |
| DECIMAL | DECIMAL |
| ARRAY | ARRAY |
| ARRAY | Nested |
The ClickHouse connector does not support the Flink TIME, MAP, MULTISET, and ROW types.
To use the ClickHouse Nested data type, you must map it to an ARRAY type in Flink. For example:
-- ClickHouse
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
...
);
Map the types as follows:
-- Flink
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<BIGINT>,
`Goals.OrderID` ARRAY<STRING>
);
In VVR versions earlier than 6.0.6, writing DateTime64 data with the official ClickHouse JDBC driver caused a loss of precision by truncating values to the second. As a result, only TIMESTAMP data with second-level precision, such as TIMESTAMP(0), could be written successfully. VVR 6.0.6 and later resolve this issue, allowing for accurate DateTime64 data writes.
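To make the timestamp mapping concrete, here is a minimal sketch that pairs a ClickHouse DateTime64(3) column with a Flink TIMESTAMP(3) column. The table name events, the column names, and the MergeTree settings are illustrative assumptions. Per the note above, sub-second precision is expected to be preserved only in VVR 6.0.6 and later.

```sql
-- ClickHouse (hypothetical table used only for illustration)
CREATE TABLE events (
  id Int32,
  event_time DateTime64(3)
) ENGINE = MergeTree()
ORDER BY id;

-- Flink: TIMESTAMP(3) maps to DateTime64(3)
CREATE TEMPORARY TABLE clickhouse_events (
  id INT,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'events'
);
```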
Examples
-
Example 1: Write to a single-node table.
CREATE TEMPORARY TABLE clickhouse_source (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;
-
Example 2: Write to a distributed table.
Assume a distributed table named distributed_table_test is created from three local tables named local_table_test, which are located on nodes 192.XX.XX.1, 192.XX.XX.2, and 192.XX.XX.3.
-
If you want Flink to write data directly to the local tables and partition the data by a key, use the following DDL:
CREATE TEMPORARY TABLE clickhouse_source (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'local_table_test',
  'shardWrite' = 'true',
  'writeMode' = 'partition',
  'shardingKey' = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;
-
If you want Flink to automatically discover the local table nodes instead of specifying them manually in the URL, use the following DDL:
CREATE TEMPORARY TABLE clickhouse_source (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- The JDBC URL of a node in the cluster.
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'distributed_table_test', -- The name of the distributed table.
  'shardWrite' = 'true',
  'inferLocalTable' = 'true', -- Set inferLocalTable to true.
  'writeMode' = 'partition',
  'shardingKey' = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;
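If you want to spread rows evenly across the local table nodes instead of partitioning them by a key, a DDL along the following lines may work. This is a sketch under stated assumptions: the writeMode value 'balance' is taken from the Limitations section, which states that the balance write mode requires VVR 8.0.7 or later, and the node addresses reuse the example cluster described above.

```sql
CREATE TEMPORARY TABLE clickhouse_output (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'local_table_test',
  'shardWrite' = 'true',
  'writeMode' = 'balance'  -- assumption: value name as given in the Limitations section
);
```

No shardingKey is set in this sketch because the parameter table requires it only when writeMode is 'partition'.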