云数据库 SelectDB 版完全兼容Apache Doris,支持通过Flink Doris Connector将MySQL、Oracle、PostgreSQL、SQL Server、Kafka等数据源中的历史数据导入至SelectDB。同时,在Flink开启CDC任务后,数据源的增量数据也会同步至SelectDB。
功能简介
Flink Doris Connector目前仅支持向SelectDB写数据,如果您需要通过Flink Doris Connector直接连接SelectDB的BE节点,实现高效的数据读取。您需联系SelectDB技术支持团队申请开放相关访问权限。
您也可以通过Flink JDBC Connector读取SelectDB数据。
Flink Doris Connector是一个用于Apache Flink和Apache Doris之间的连接器,它允许用户在Flink中读取Doris数据和写入数据到Doris,从而为实时数据处理和分析提供了支持。由于SelectDB完全兼容Apache Doris,所以Flink Doris Connector也是SelectDB流式导入数据的常用方式。
Flink的流处理能力,基于Source、Transform和Sink三个组件。各个组件功能如下:
Source(数据源):
作用:Source是Flink数据流的入口,用于从外部系统读取数据流。这些外部系统可能包括消息队列(如Apache Kafka)、数据库、文件系统等。
示例:使用Kafka作为数据源读取实时消息,或者从文件中读取数据。
Transform(数据转换):
作用:Transform阶段负责对输入的数据流进行处理和转换。这些转换操作可以是过滤、映射、聚合、窗口操作等。
示例:对输入流进行映射操作,将输入的数据结构转换成另一种形式,或对数据进行聚合,计算每分钟的某个指标等。
Sink(数据汇):
作用:Sink是Flink数据流的出口,用于将处理后的数据输出到外部系统。Sink可以将数据写入数据库、文件、消息队列等。
示例:将处理后的结果写入MySQL数据库,或者将数据发送到另一个Kafka主题。
通过Flink Doris Connector将数据导入至SelectDB,各数据的流向如下图所示。
前提条件
数据源、Fink与SelectDB网络互通:
Flink已引入与Flink Doris Connector。
Flink与Flink Doris Connector版本要求如下:
Flink版本
Flink Doris Connector版本
下载地址
阿里云实时计算Flink版:大于或等于1.17
开源版:大于或等于1.15
1.5.2及以上版本,推荐下载最新版本
Flink如何引入Flink Doris Connector,请参见如何引入Flink Doris Connector。
如何引入Flink Doris Connector
根据您的实际情况,引入Flink Doris Connector。
如果您的是通过阿里云实时计算Flink版导入数据至SelectDB,您可以通过自定义连接器来上传、使用和更新
Flink Doris Connector。如何使用自定义连接器,请参见管理自定义连接器。如果您的Flink集群是自建的开源版集群,您需要下载对应
Flink Doris Connector版本的JAR包,并放置在Flink安装目录的lib目录下。JAR包下载地址,请参见JAR包。如果您需要以Maven的方式引入
Flink Doris Connector,需在项目的依赖配置文件中添加以下代码。更多版本,请参见Maven仓库。<!-- flink-doris-connector --> <dependency> <groupId>org.apache.doris</groupId> <artifactId>flink-doris-connector-1.16</artifactId> <version>1.5.2</version> </dependency>
使用示例
示例环境
本示例为使用Flink SQL、Flink CDC和DataStream三种方式将MySQL数据库test的employees表的数据,迁移至SelectDB的数据库test的employees表中。在实际使用中,请根据您的场景修改对应参数。示例环境如下:
Flink 1.16单机环境
Jav
目标库:test
目标表:employees
源数据库:test
源数据表:employees
环境准备
Flink环境准备
Java环境准备。
Flink的运行依赖Java环境,因此需安装Java的开发工具包JDK,并配置
JAVA_HOME环境变量。目标Java版本与Flink版本有关,Flink支持的Java版本详情,请参见Flink支持的Java列表。本示例中,安装的版本为Java 8。具体操作,请参见安装JDK。
下载Flink安装包flink-1.16.3-bin-scala_2.12.tgz。如果此版本已过期,请可以下载其他版本。更多版本,请参见Apache Flink。
wget https://www.apache.si/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz解压安装包。
tar -zxvf flink-1.16.3-bin-scala_2.12.tgz进入Flink安装目录的lib目录,为后续操作引入相关Connector。
引入Flink Doris Connector。
wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar引入Flink MySQL Connector。
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
启动Flink集群。
在Flink安装目录的bin目录下,执行以下指令。
./start-cluster.sh
目标SelectDB库表准备
创建云数据库 SelectDB 版实例,详情请参见创建实例。
链接实例,详情请参见连接实例。
创建测试数据库test。
CREATE DATABASE test;创建测试表employees。
USE test; -- 建表 CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
源表MySQL库表准备
创建MySQL实例,详情请参见快捷创建RDS MySQL实例与配置数据库。
创建测试数据库test。
CREATE DATABASE test;创建测试表employees。
USE test; CREATE TABLE employees ( emp_no INT NOT NULL PRIMARY KEY, birth_date DATE, first_name VARCHAR(20), last_name VARCHAR(20), gender CHAR(2), hire_date DATE );插入数据。
INSERT INTO employees (emp_no, birth_date, first_name, last_name, gender, hire_date) VALUES (1001, '1985-05-15', 'John', 'Doe', 'M', '2010-06-20'), (1002, '1990-08-22', 'Jane', 'Smith', 'F', '2012-03-15'), (1003, '1987-11-02', 'Robert', 'Johnson', 'M', '2015-07-30'), (1004, '1992-01-18', 'Emily', 'Davis', 'F', '2018-01-05'), (1005, '1980-12-09', 'Michael', 'Brown', 'M', '2008-11-21');
通过Flink SQL方式导入数据
启动Flink SQL Client服务。
在Flink安装目录的bin目录下,执行以下指令。
./sql-client.sh在Flink SQL Client上提交Flink任务,具体步骤如下。
创建MySQL源表。
下述语句中,WITH后面的配置项为
MySQL CDC Source的信息,配置项详情,请参见MySQL | Apache Flink CDC。CREATE TABLE employees_source ( emp_no INT, birth_date DATE, first_name STRING, last_name STRING, gender STRING, hire_date DATE, PRIMARY KEY (`emp_no`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '127.0.0.1', 'port' = '3306', 'username' = 'root', 'password' = '****', 'database-name' = 'test', 'table-name' = 'employees' );创建SelectDB结果表。
下述语句中,WITH后面的配置项为SelectDB的信息,配置项详情,请参见Sink配置项。
CREATE TABLE employees_sink ( emp_no INT , birth_date DATE, first_name STRING, last_name STRING, gender STRING, hire_date DATE ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'test.employees', 'username' = 'admin', 'password' = '****' );同步MySql源表数据至SelectDB结果表。
INSERT INTO employees_sink SELECT * FROM employees_source;
查看数据导入结果。
连接SelectDB,执行以下语句,查看数据导入结果。
SELECT * FROM test.employees;
通过Flink CDC方式
阿里云实时计算Flink版不支持JAR作业方式,后面通过CDC 3.0的YAML作业来支持。
以下介绍如何使用Flink CDC将数据库的数据导入至SelectDB。
在Flink安装目录下,通过flink程序执行Flink CDC,语法如下:
<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]参数说明
参数  | 说明  | 
execution.checkpointing.interval  | Flink checkpoint的时间间隔,影响数据同步的频率,推荐10s。  | 
parallelism.default  | 设置Flink任务的并行度,适当增加并行度可提高数据同步速度。  | 
job-name  | Flink作业名称。  | 
database  | 同步到SelectDB的数据库名。  | 
table-prefix  | SelectDB表前缀名,例如  | 
table-suffix  | SelectDB表的后缀名。  | 
including-tables  | 需要同步的表,可以使用"|"分隔多个表,并支持正则表达式。例如  | 
excluding-tables  | 不需要同步的表,配置方法与including-tables相同。  | 
mysql-conf  | MySQL CDC Source配置。详情请参见MySQL CDC Connector,其中  | 
oracle-conf  | Oracle CDC Source配置。详情请参见Oracle CDC Connector,其中  | 
sink-conf  | Doris Sink的所有配置,详情请参见Sink配置项。  | 
table-conf  | SelectDB表的配置项,即创建SelectDB表时properties中包含的内容。  | 
同步时需要在$FLINK_HOME/lib目录下添加对应的Flink CDC依赖,例如flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar。
Flink 1.15以上的版本支持整库同步,Flink Doris Connector各个版本的下载请参见Flink Doris Connector。
Sink配置项
参数  | 默认值  | 是否必填  | 说明  | 
fenodes  | 无  | 是  | 云数据库 SelectDB 版实例的访问地址和HTTP协议端口。 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。 示例:  | 
table.identifier  | 无  | 是  | 数据库与表名。示例:  | 
username  | 无  | 是  | 云数据库 SelectDB 版实例的数据库用户名。  | 
password  | 无  | 是  | 云数据库 SelectDB 版实例对应数据库用户名的密码。  | 
jdbc-url  | 无  | 否  | 云数据库 SelectDB 版实例的JDBC连接信息。 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口。 示例:  | 
auto-redirect  | true  | 否  | 是否重定向Stream Load请求。开启后Stream Load将通过FE写入,不再显示获取BE信息。  | 
doris.request.retries  | 3  | 否  | 向SelectDB发送请求的重试次数。  | 
doris.request.connect.timeout  | 30s  | 否  | 向SelectDB发送请求的连接超时时间。  | 
doris.request.read.timeout  | 30s  | 否  | 向SelectDB发送请求的读取超时时间。  | 
sink.label-prefix  | ""  | 是  | Stream load导入使用的label前缀。2pc场景下要求全局唯一,用来保证Flink的EOS语义。  | 
sink.properties  | 无  | 否  | Stream Load的导入参数,请填写属性配置。 
 更多参数,请参见:Stream Load。  | 
sink.buffer-size  | 1048576  | 否  | 写数据缓存buffer大小,单位字节。不建议修改,默认配置即可,默认1 MB。  | 
sink.buffer-count  | 3  | 否  | 写数据缓存buffer个数。不建议修改,默认配置即可。  | 
sink.max-retries  | 3  | 否  | 提交(Commit)阶段失败后的最大重试次数,默认3次。  | 
sink.use-cache  | false  | 否  | 异常时,是否使用内存缓存进行恢复,开启后缓存中会保留Checkpoint期间的数据。  | 
sink.enable-delete  | true  | 否  | 是否同步删除事件。只支持Unique模型。  | 
sink.enable-2pc  | true  | 否  | 是否开启两阶段提交(2pc),默认为true,保证EOS语义。  | 
sink.enable.batch-mode  | false  | 否  | 是否使用攒批模式写入SelectDB,开启后写入时机不依赖Checkpoint,通过sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes和sink.buffer-flush.interval参数来控制写入时机。 同时开启后将不保证EOS语义,可借助Unique模型做到幂等。  | 
sink.flush.queue-size  | 2  | 否  | 批处理模式下,缓存的队列大小。  | 
sink.buffer-flush.max-rows  | 50000  | 否  | 批处理模式下,单个批次最多写入的数据行数。  | 
sink.buffer-flush.max-bytes  | 10MB  | 否  | 批处理模式下,单个批次最多写入的字节数。  | 
sink.buffer-flush.interval  | 10s  | 否  | 批处理模式下,异步刷新缓存的间隔。最小1s。  | 
sink.ignore.update-before  | true  | 否  | 是否忽略update-before事件,默认忽略。  | 
同步示例
MySQL同步示例
<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****Oracle同步示例
<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****PostgreSQL同步示例
<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****SQL Server同步示例
<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1\
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****通过DataStream方式
在Maven项目中,引入相关依赖。
相关Maven依赖
Java核心代码。
下述代码中,MySql源表配置与SelectDB结果表配置的参数,与上述通过Flink SQL方式导入数据中的配置一一对应,详情请参见MySQL | Apache Flink CDC与Sink配置项。
package org.example; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer; import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.HashMap; import java.util.Map; import java.util.Properties; public class Main { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(10000); Map<String, Object> customConverterConfigs = new HashMap<>(); customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs); // MySql源表配置 MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("rm-xxx.mysql.rds.aliyuncs***") .port(3306) .startupOptions(StartupOptions.initial()) .databaseList("db_test") .tableList("db_test.employees") .username("root") .password("test_123") .debeziumProperties(DateToStringConverter.DEFAULT_PROPS) .deserializer(schema) .serverTimeZone("Asia/Shanghai") .build(); // SelectDB 结果表配置 DorisSink.Builder<String> sinkBuilder = DorisSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyunc****:8080") .setTableIdentifier("db_test.employees") .setUsername("admin") .setPassword("test_123"); DorisOptions dorisOptions = dorisBuilder.build(); // 配置Stream Load相关参数 sink.properties Properties properties = new Properties(); properties.setProperty("format", "json"); properties.setProperty("read_json_by_line", "true"); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder.setStreamLoadProp(properties); sinkBuilder.setDorisExecutionOptions(executionBuilder.build()) .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) //serialize according to string .setDorisOptions(dorisOptions); DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source"); dataStreamSource.sinkTo(sinkBuilder.build()); env.execute("MySQL to SelectDB"); } }
使用进阶
使用Flink SQL更新部分列数据
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);
CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial_columns' = 'true' -- 开启部分列更新
);
INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;
使用Flink SQL根据指定列删除数据
在数据源为CDC的场景中,Doris Sink会根据RowKind来区分事件的类型,对隐藏列__DORIS_DELETE_SIGN__进行赋值以达到删除的目的。在数据源为Kafka消息的场景中,Doris Sink无法直接使用RowKind来区分操作类型,需要依赖消息中的特定字段来标记操作类型,比如{"op_type":"delete",data:{...}},针对这类数据,希望将op_type=delete的数据删除掉。此时需要根据业务逻辑判断,显式地传入隐藏列的值。下面以Flink SQL方式为例,介绍如何根据Kafka数据中的特定字段删除SelectDB中的数据。
-- 比如数据: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);
CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- false表示不从RowKind获取事件类型
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- 显示指定stream load的导入列
);
INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;常见问题
Q:如何写入Bitmap类型?
A:示例如下所示:
CREATE TABLE bitmap_sink ( dt INT, page STRING, user_id INT ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'test.bitmap_test', 'username' = 'admin', 'password' = '****', 'sink.label-prefix' = 'selectdb_label', 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)' );Q:如何解决报错:
errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650]。A:Exactly-Once场景下,Flink Job重启时必须从最新的Checkpoint或Savepoint启动,否则会报如上错误。不要求Exactly-Once时,也可通过关闭两阶段提交(2PC)
sink.enable-2pc=false或更换不同的sink.label-prefix解决。Q:如何解决报错:
errCode = 2, detailMessage = transaction[19650]not found。A:此报错发生在提交(Commit)阶段,Checkpoint里面记录的事务ID,在SelectDB侧已经过期,此时再次提交(Commit)就会出现上述错误。 此时无法从Checkpoint启动,后续可通过修改SelectDB的参数
streaming_label_keep_max_second配置来延长过期时间,默认为12小时。Q:如何解决报错:
errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100。A:此报错是因为同一个库并发导入超过了100,可以通过调整SelectDB的参数
max_running_txn_num_per_db来解决,详情请参见max_running_txn_num_per_db。同时,一个任务频繁修改label重启,也可能会导致这个错误。两阶段提交(2PC)场景下(Duplicate/Aggregate模型),每个任务的label需要唯一,并且从Checkpoint重启时,Flink任务才会主动中止(abort)之前启动的但未完成(即已precommit但未Commit的)事务(txn)。如果频繁修改label重启,会导致大量precommit成功的事务(txn)无法被中止(abort),占用事务。在Unique模型下也可关闭两阶段提交(2PC),通过设计Sink来实现幂等写入。
Q:Flink写入Unique模型时,如何保证一批数据的有序性?
A:可以添加sequence列配置来保证,更多详情,请参见SEQUENCE。
Q:为什么Flink任务没报错,但是无法同步数据?
A:Connector 1.1.0版本以前,是攒批写入的,写入均是由数据驱动,需要判断上游是否有数据写入。1.1.0之后,依赖Checkpoint,必须开启Checkpoint才能写入。
Q:如何解决报错:
tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235。A:此报错通常发生在Connector 1.1.0版本之前,是由于写入频率过快,导致版本过多。可以通过设置
sink.buffer-flush.max-bytes和sink.buffer-flush.interval参数来降低Stream Load的频率。Q:Flink导入时有脏数据,如何跳过?
A:Flink在数据导入时,如果有脏数据,比如字段格式、长度等问题,会导致Stream Load报错,此时Flink会不断地重试。如果需要跳过,可以通过禁用Stream Load的严格模式(
strict_mode=false,max_filter_ratio=1)或者在Sink算子之前对数据做过滤。Q:源表和SelectDB表应如何对应?
A:使用Flink Doris Connector导入数据时,要注意两个方面,一是源表的列和类型要跟Flink SQL中的列和类型对应;二是Flink SQL中的列和类型要跟SelectDB表的列和类型对应。
Q:如何解决报错:
TApplicationException: get_next failed: out of sequence response: expected 4 but got 3。A:此报错是由于Thrift框架存在并发bug导致的,建议您使用尽可能新的Connector以及与之兼容的Flink版本。
Q:如何解决报错:
DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX。A:你可以在TaskManager中搜索日志
abort transaction response,根据HTTP返回码确定是客户端(Client)的问题还是服务器(Server)的问题。