本节主要介绍如何利用ClickHouse JDBC connector,使用不同版本的Flink写入数据到ClickHouse中。
背景信息
Flink在1.11.0版本对其JDBC connector进行了一次较大的重构:
- 重构之前(1.10.1及之前版本),包名为flink-jdbc 。
- 重构之后(1.11.0及之后版本),包名为flink-connector-jdbc 。
二者对Flink中以不同方式写入ClickHouse Sink的支持情况如下:
API名称 |
flink-jdbc |
flink-connector-jdbc |
DataStream |
不支持 |
支持 |
Table API (Legacy) |
支持 |
不支持 |
Table API (DDL) |
不支持 |
不支持 |
flink-connector-jdbc完全移除了对Table API (Legacy) 的支持,只能通过DDL的方式调用Table API。但是,Table DDL方式硬编码了其所支持的
JDBC Driver,不支持ClickHouse。
下面,我们依次以Flink 1.10.1 + flink-jdbc 以及Flink 1.11.0 + flink-connector-jdbc 为例,介绍Flink写入ClickHouse的方法。
Flink 1.10.1 + flink-jdbc
Flink 1.10.1及之前版本需要采用flink-jdbc+Table API的方式写入数据到ClickHouse。本节我们使用Maven及Flink 1.10.1版本进行示例。
- 用mvn archetype:generate命令创建项目,生成过程中根据提示输入group-id和artifact-id等。
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.10.1
- 编辑pom.xml中的
<dependencies />
小节添加依赖。//添加Flink Table API相关的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
//添加Flink JDBC以及Clickhouse JDBC Driver相关的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
- 创建数据写入程序文件。
示例程序使用
CsvTableSource
读入 CSV 文件产生Table Source,使用
JDBCAppendTableSink
将数据写入到ClickHouse Sink中。
说明
- 由于ClickHouse单次插入的延迟比较高,我们需要设置
BatchSize
来批量插入数据,提高性能。
- 在JDBCAppendTableSink的实现中,若最后一批数据的数目不足
BatchSize
,则不会插入剩余数据。
package org.myorg.example
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{
TableEnvironment,
TableSchema,
Types,
ValidationException
}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation
object StreamingJob {
def main(args: Array[String]) {
val SourceCsvPath =
"/<your-path-to-test-csv>/source.csv"
val CkJdbcUrl =
"jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
val CkUsername = "<your-username>"
val CkPassword = "<your-password>"
val BatchSize = 500 // 设置您的batch size
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val csvTableSource = CsvTableSource
.builder()
.path(SourceCsvPath)
.ignoreFirstLine()
.fieldDelimiter(",")
.field("name", Types.STRING)
.field("age", Types.LONG)
.field("sex", Types.STRING)
.field("grade", Types.LONG)
.field("rate", Types.FLOAT)
.build()
tEnv.registerTableSource("source", csvTableSource)
val resultTable = tEnv.scan("source").select("name, grade, rate")
val insertIntoCkSql =
"""
| INSERT INTO sink_table (
| name, grade, rate
| ) VALUES (
| ?, ?, ?
| )
""".stripMargin
//将数据写入 ClickHouse Sink
val sink = JDBCAppendTableSink
.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl(CkJdbcUrl)
.setUsername(CkUsername)
.setPassword(CkPassword)
.setQuery(insertIntoCkSql)
.setBatchSize(BatchSize)
.setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
.build()
tEnv.registerTableSink(
"sink",
Array("name", "grade", "rate"),
Array(Types.STRING, Types.LONG, Types.FLOAT),
sink
)
tEnv.insertInto(resultTable, "sink")
env.execute("Flink Table API to ClickHouse Example")
}
}
参数说明:
SourceCsvPath
:源CSV文件路径。
CkJdbcUrl
:目标ClickHouse集群地址。
CkUsername
:目标ClickHouse集群用户名。
CkPassword
:目标ClickHouse集群对应密码。
- 编译运行。
$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
Flink 1.11.0 + flink-connector-jdbc
Flink 1.11.0及之后版本需要采用flink-connector-jdbc+DataStream的方式写入数据到ClickHouse。本节我们使用Maven及Flink
1.11.0版本进行示例。
- 用mvn archetype:generate命令创建项目,生成过程中会提示输入group-id和artifact-id等。
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.11.0
- 编辑pom.xml中的
<dependencies />
小节添加依赖。//添加Flink Table API相关的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
//添加Flink JDBC Connector以及Clickhouse JDBC Driver相关的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
- 创建数据写入程序文件。
示例程序使用
CsvTableSource
读入CSV文件产生Table Source,通过
TableEnvironment.toAppendStream
将Table转换为DataStream。使用
JdbcSink
将数据写入到ClickHouse中。
说明
- 由于ClickHouse单次插入的延迟比较高,我们需要设置
BatchSize
来批量插入数据,提高性能。
- 当前版本的flink-connector-jdbc,使用Scala API调用JdbcSink时会出现lambda函数的序列化问题。我们只能采用手动实现interface的方式来传入相关JDBC
Statement build函数(
class CkSinkBuilder
)。class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
ps.setString(1, v._1)
ps.setLong(2, v._2)
ps.setFloat(3, v._3)
}
}
package org.myorg.example
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{
TableEnvironment,
TableSchema,
Types,
ValidationException
}
import org.apache.flink.connector.jdbc._
import java.sql.PreparedStatement
//手动实现interface的方式来传入相关JDBC Statement build函数
class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
ps.setString(1, v._1)
ps.setLong(2, v._2)
ps.setFloat(3, v._3)
}
}
object StreamingJob {
def main(args: Array[String]) {
val SourceCsvPath =
"/<your-path-to-test-csv>/source.csv"
val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
val CkUsername = "<your-username>"
val CkPassword = "<your-password>"
val BatchSize = 500 // 设置您的 batch size
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val csvTableSource = CsvTableSource
.builder()
.path(SourceCsvPath)
.ignoreFirstLine()
.fieldDelimiter(",")
.field("name", Types.STRING)
.field("age", Types.LONG)
.field("sex", Types.STRING)
.field("grade", Types.LONG)
.field("rate", Types.FLOAT)
.build()
tEnv.registerTableSource("source", csvTableSource)
val resultTable = tEnv.scan("source").select("name, grade, rate")
//将Table转换为DataStream
val resultDataStream =
tEnv.toAppendStream[(String, Long, Float)](resultTable)
val insertIntoCkSql =
"""
| INSERT INTO sink_table (
| name, grade, rate
| ) VALUES (
| ?, ?, ?
| )
""".stripMargin
//将数据写入ClickHouse JDBC Sink
resultDataStream.addSink(
JdbcSink.sink[(String, Long, Float)](
insertIntoCkSql,
new CkSinkBuilder,
new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl(CkJdbcUrl)
.withUsername(CkUsername)
.withPassword(CkPassword)
.build()
)
)
env.execute("Flink DataStream to ClickHouse Example")
}
}
参数说明:
SourceCsvPath
:源CSV文件路径。
CkJdbcUrl
:目标ClickHouse集群地址。
CkUsername
:目标ClickHouse集群用户名。
CkPassword
:目标ClickHouse集群对应密码。
- 编译运行。
$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar