本文为您介绍使用Hologres过程中关于Blink和Flink的常见问题。
基本概念
- Hologres性能
- 写入性能
- 列存表: InsertOrIgnore > InsertOrReplace > InsertOrUpdate
- 行存表: InsertOrReplcae = InsertOrUpdate > InsertOrIgnore
参数 说明 InsertOrIgnore 结果表有主键,实时写入时如果主键重复,丢弃后到的数据。 InsertOrReplace 结果表有主键,实时写入时如果主键重复,按照主键更新,如果写入的一行数据不包含所有列,缺失的列的数据补Null。 InsertOrUpdate 结果表有主键,实时写入时如果主键重复,按照主键更新,如果写入的一行数据不包含所有列,缺失的列不更新。 - 点查性能
行存 = 行列混存 > 列存。
- 写入性能
- Blink、Flink(VVP)、开源Flink支持情况
产品形态 数据存储类型 描述 源表 结果表 维表 Binlog Hologres Catalog Flink全托管 支持行存储及列存储。 支持行存储及列存储。 建议使用行存储。 支持 支持 无 Blink独享 支持行存储及列存储。 支持行存储及列存储。 建议使用行存储。 Hologres V0.8版本只支持行存储,V0.9及以上版本支持行存储及列存储。建议使用行存储。 不支持 已开始逐步下线,推荐使用阿里云Flink全托管。
开源Flink1.10 支持行存储及列存储。 支持行存储及列存储。 无 不支持 不支持 无 开源Flink1.11及以上 支持行存储及列存储。 支持行存储及列存储。 建议使用行存储。 不支持 不支持 从开源Flink1.11版本开始,Hologres代码已开源。详细内容请参见GitHub。 - Blink、Flink 映射Hologres的SQL示例如下。
Blink、VVP、Flink SQL,都是在Flink侧声明一张表,然后根据参数映射至Hologres的一张具体的物理表,所以不支持映射至外部表。create table holo_source( 'hg_binlog_lsn' BIGINT HEADER, 'hg_binlog_event_type' BIGINT HEADER, 'hg_binlog_timestamp_us' BIGINT HEADER, A int, B int, C timestamp ) with ( type = 'hologres', 'endpoint' = 'xxx.hologres.aliyuncs.com:80', --Hologres实例的Endpoint。 'userName' = '', --当前阿里云账号的AccessKey ID。 'password' = '', --当前阿里云账号的AccessKey Secret。 'dbName' = 'binlog', --Hologres实例的数据库名称。 'tableName' ='test' --Hologres实例的表名称。 'binlog' = 'true', );
实时写入慢问题排查流程
- 确认写入相关配置需要确认以下配置信息。
- 目标表的存储格式,包括行存表、列存表和行列共存表。
- Insert模式,包括InsertOrIgnore、InsertOrUpdate和InsertOrReplace。
- 目标表的Table Group及Shard Count。
- 查看监控指标的实时写入延迟如果平均写入延迟偏高,在百毫秒甚至秒级别,通常便是后端达到了写入瓶颈,这时候可能会存在如下问题。
- 使用了列存表的InsertOrUpdate,即局部更新,且流量较高,这种情况下会导致实例的CPU负载和写入延迟偏高。
解决方法:建议更换表的类型,使用行存表,如果您的实例是V1.1及以上版本可以选择行列混存表。
- 云监控查看实例的CPU负载,如果CPU水位接近100%,但没有列存表的局部更新,那么通常情况下是由于高QPS的查询,或者本身写入量较高导致的。
解决方法:扩容Hologres实例。
- 确认是否有不断的
Insert into select from
命令,触发了该表的BulkLoad写入,当前BulkLoad写入会阻塞实时写入。解决方法:将BulkLoad写入转换成实时写入,或者错峰执行。
- 使用了列存表的InsertOrUpdate,即局部更新,且流量较高,这种情况下会导致实例的CPU负载和写入延迟偏高。
- 确认是否有数据倾斜使用如下SQL命令查看是否有数据倾斜。
解决方法:修改Distribution Key,使数据分布更加均衡。SELECT hg_shard_id, count(1) FROM t1 GROUP BY hg_shard_id ORDER BY hg_shard_id;
- 确认后端是否有压力如果以上步骤排查完没有问题,写入性能突然下降,一般情况是后端集群压力比较大,存在瓶颈。请联系技术支持人员确认情况,详情请参见如何获取更多的在线支持?。
- 查看Blink/Flink侧的反压情况上述步骤排查完后,发现Hologres侧没有明显的异常,通常情况下是客户端慢了,也就是Blink/Flink侧本身就慢了,这时候确认是否是Sink节点反压了。如果作业只有一个节点,就无法看出是否反压了,这时候可以将Sink节点单独拆开再观察。具体请联系Flink技术支持。
写入数据有问题排查流程
这种情况通常是由于数据乱序引起的,比如相同主键的数据分布在不同的Flink Task上,写入的时候无法保证顺序。需要确认Flink SQL的逻辑,最后写出到Hologres的时候,是否按照Hologres表的主键进行Shuffle了。
维表查询问题排查流程
- 维表Join和双流Join对于读Hologres的场景,需要首先确认用户是否使用对了维表Join,是否错将双流Join当成维表Join来使用了。以下是Hologres作为维表的使用示例,如果少了
proctime AS PROCTIME()
和hologres_dim FOR SYSTEM_TIME AS
两处关键字,则会变成双流Join。CREATE TEMPORARY TABLE datagen_source ( a INT, b BIGINT, c STRING, proctime AS PROCTIME() ) with ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE hologres_dim ( a INT, b VARCHAR, c VARCHAR ) with ( 'connector' = 'hologres', ... ); CREATE TEMPORARY TABLE blackhole_sink ( a INT, b STRING ) with ( 'connector' = 'blackhole' ); insert into blackhole_sink select T.a,H.b FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
- 维表查询
- 确认维表存储格式
确认维表的存储格式是行存表、列存表还是行列共存。
- 维表查询延迟高维表的使用,最常见的问题就是Flink/Blink侧用户反馈Join节点有反压,导致整个作业的吞吐上不去。
- 确认Flink维表Join的模式当前Hologres Flink Connector的维表Join功能支持同步和异步模式两种,异步模式性能要优于同步模式,具体需要看Flink SQL进行区分,以下是一个开启异步维表查询功能的SQL示例。
CREATE TABLE hologres_dim( id INT, len INT, content VARCHAR ) with ( 'connector'='hologres', 'dbname'='<yourDbname>', --Hologres的数据库名称。 'tablename'='<yourTablename>', --Hologres用于接收数据的表名称。 'username'='<yourUsername>', --当前阿里云账号的AccessKey ID。 'password'='<yourPassword>', --当前阿里云账号的AccessKey Secret。 'endpoint'='<yourEndpoint>' --当前Hologres实例VPC网络的Endpoint。 'async' = 'true'--异步模式 );
- 确认后端查询延迟查看监控指标的实时写入延迟:
- 确认是否是列存表在做维表,列存表的维表在高QPS场景下开销很高。
- 如果是行存表,且延迟高,通常情况下是实例整体负载较高导致的,需要进行扩容。
- 确认Flink维表Join的模式
- 确认Join的Key是否是Hologres表的主键
自VVR 4.x (Flink 1.13) 版本开始,Hologres Connector基于Holo Client实现了Hologres表的非主键查询,这种情况通常性能会比较差、实例负载也比较高,尤其是建表没有特别优化过的情况。这时候需要引导优化表结构,最常见的就是将Join的key设置成Distribution Key,这样就能实现Shard Pruning。
- 查看Blink侧的反压情况
如果上述步骤排查完成,发现Hologres侧没有明显的异常,通常情况下是客户端慢了,也就是Blink侧本身就慢了,这时候可以确认是否是Sink节点反压了。如果作业只有一个节点,就无法看出是否反压了,这时候可以将Sink节点单独拆开再观察。同样可以排查是否是Join节点导致的反压。具体请联系Flink技术支持排查。
- 确认维表存储格式
连接数使用说明
表类型 | 默认连接数(Flink作业的每个并发) |
---|---|
Binlog源表 | 0 |
批量源表 | 1 |
维表 | 3(可以通过connectionSize 参数调整) |
结果表 | 3(可以通过connectionSize 参数调整) |
- 连接数计算方法
- 默认情况
默认情况下,作业使用的最大连接数可以通过如下公式计算:
最大连接数 = ( 批量源表数 * 1 + 维表数 * connectionSize + 结果表数 * connectionSize )* 作业并发
。例如某作业有一张全增量源表、两张维表和三张结果表,都使用默认的
connectionSize
参数值,作业并发设置为5
,则最终使用的连接数为:(1 * 1 + 2 * 3 + 3 * 3) * 5 = 80
。 - 连接复用
实时计算1.13-vvr-4.1.12及以上版本支持连接复用。一个作业的同一个并发内,相同
connectionPoolName
的维表和结果表会使用同一个连接池。默认情况示例中,如果两张维表和三张结果表都配置了相同的connectionPoolName
,并适当调大connectionSize
为5
,则最终使用的连接数为(1 * 1 + 5) * 5 = 30
。说明 连接复用模式适用大多数场景,但部分场景比如维表数量较多、没有启用异步也没有开启缓存时,会非常频繁的进行同步的点查,此时多表连接复用可能导致查询变慢,这种情况可以只为结果表配置连接复用。 - 其他使用连接的场景
- 作业启动过程中,需要建立连接用于表元数据的验证等工作,可能会暂时使用3至6个连接,作业正常运行后会释放。
- Flink全托管支持Hologres Catalog、CTAS以及CDAS等功能,使用这些功能也会占用连接数。默认情况下,一个使用Catalog的作业,会多占用三个连接,用于建表等DDL操作。
- 默认情况
- 连接数使用诊断当作业的表数量较多、作业并发较高时,会占用大量的连接数,甚至出现将Hologres总连接数占满的情况,通过以下方式对当前连接数的使用进行了解和诊断。
- 使用如下命令在HoloWeb中通过
pg_stat_activity
表查看当前的活跃Query,详情请参见查询pg_stat_activity视图信息。其中application_name
字段中值为ververica-connector-hologres
的Query代表来自实时计算Flink的读写连接。SELECT application_name, COUNT (1) AS COUNT FROM pg_stat_activity WHERE backend_type = 'client backend' AND application_name != 'hologres' AND usename != 'holo_admin' GROUP BY application_name;
- 一些时候作业并发设置的过大, 在Hologres管理控制台监控告警页表现如下:作业刚启动的时候连接数很高,运行一段时间之后连接数下降。这是因为很多连接处于空闲状态被关闭了,此现象表明作业实际上不需要如此大的并发或连接数,应该合理规划任务连接数、降低并发度或
connectionSize
参数值,或者使用连接复用模式。
- 使用如下命令在HoloWeb中通过
常见报错
ERPC TIMEOUT
或者ERPC CONNECTION CLOSED
。- 报错现象:出现
com.alibaba.blink.store.core.rpc.RpcException: request xx UpsertRecordBatchRequest failed on final try 4, maxAttempts=4, errorCode=3, msg=ERPC_ERROR_TIMEOUT
报错。 - 可能原因:写入时压力过大写入失败或者集群比较繁忙,可以观察Hologres实例的CPU负载是否打满。
CONNECTION CLOSED
可能是负载过大导致后端节点挂掉了,出现OOM(Out Of Memory)或者Coredump。 - 解决方法:请先重试写入,如果不能恢复请找Hologres技术支持人员排查原因。
- 报错现象:出现
- 报错:
BackPresure Exceed Reject Limit
。- 可能原因:通常是Hologres后端写入压力过大,导致Memtable来不及刷盘导致写入失败。
- 解决方法:如偶发失败可忽略该问题,或者Sink加上参数rpcRetries = '100' 来调大写入重试次数。如果一直报该错误,请联系Hologres技术支持人员确认后端实例状态。
- 报错:
The requested table name xxx mismatches the version of the table xxx from server/org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.Caused by: java.net.SocketTimeoutException: Read timed out
。- 可能原因:通常是用户做了Alter Table导致Blink写入所带表的Schema版本号低于Server端版本号导致的,并且超过了客户端的重试次数。
- 解决方法:如偶发报错可忽略该问题。如果一直报该错误,请联系Hologres技术支持人员。
- 报错:
Failed to query table meta for table
。- 可能原因:一种可能是用户读写了一张Hologres的外部表,Hologres Connector不支持读写外部表。如果不是,可能是Hologres实例 Meta出现了问题。
- 解决方法:请联系Hologres技术支持人员。
- 报错:
Cloud authentication failed for access id
。- 可能原因:该报错通常是用户配置的AccesKey信息不对,或者用户没有添加账号至Hologres实例。
- 解决方法:
- 请检查当前账户的AccessKey ID和AccessKey Secret填写是否正确,一般是AccessKey Secret错误或者有空格。
- 检查不出原因可以用当前AccesKey连接HoloWeb(使用账号密码方式登录),在测试联通性时看报错是什么,还是一样的报错说明AccesKey有问题,如果报错为
FATAL:role“ALIYUN$xxxx“does not exist
说明账号没有实例的权限,需要管理员给该账号授予权限。
- Hologres维表Join不到数据。
- 可能原因:Hologres维表使用了分区表,Hologres维表暂不支持分区表。
- 解决方法:请将分区表转为普通表。
- 报错:
Modify record by primary key is not on this table
。- 可能原因:实时写入的时候选择了更新模式,但是Hologres的结果表没有主键。
- 解决方法:请设置主键。
- 报错:
shard columns count is no match
。- 可能原因:写入Hologres的时候,没有写入完整的Distribution Key的列(默认是主键)。
- 解决方法:请写入完整的Distribution Key列。
- 报错:
Full row is required, but the column xxx is missing
。- 可能原因:Hologres老版本的报错信息,通常是用户没有写某列数据,而那一列是不能为空的。
- 解决方法:请为不能为空的列赋值。
- VVP用户读写Hologres导致JDBC连接数暴涨。
- 可能原因:VVP Hologres Connector读写Hologres(除了Binlog),采用JDBC模式,最大占用
读写Hologres表数量*并发度 * connectionSize(VVP表的参数,默认为3)
个连接。 - 解决方法:合理规划任务连接数,降低并发度或者connectionSize。如无法调低并发度或connectionSize,可以为表设置参数useRpcMode = 'true' 切回至Rpc模式。
- 可能原因:VVP Hologres Connector读写Hologres(除了Binlog),采用JDBC模式,最大占用
- Blink/VVP用户读写Hologres报错显示无法连接Hologres。
- 可能原因:Blink/VVP集群默认访问公网很慢或者无法访问。
- 解决方法:需要保证和Hologres实例在相同Region,且使用VPC的Endpoint。
- 报错:
Hologres rpc mode dimension table does not support one to many join
。- 可能原因:Blink和VVP的RPC模式维表必须是行存表,且Join的字段必须是主键,报错的原因往往是以上两个条件不满足
- 解决方法:建议使用JDBC模式,且维表使用行存表或者行列共存表。
- DatahubClientException
- 报错现象:出现
Caused by: com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:503, requestId:null, errorCode:null, errorMessage:{"ErrorCode":"ServiceUnavailable","ErrorMessage":"Queue Full"}]
报错。 - 可能原因:大量消费Binlog作业由于某种原因同时重启导致线程池被占满。
- 解决方法:分批进行消费Binlog作业。
- 报错现象:出现
- Error occurs when reading data from datahub
- 报错现象:出现
Error occurs when reading data from datahub, msg: [httpStatus:500, requestId:xxx, errorCode:InternalServerError, errorMessage:Get binlog timeout.]
报错。 - 可能原因:Binlog每条数据太大,乘上攒批之后,每个RPC请求的大小超过最大限制。
- 解决方法:在每行数据字段较多且有很长的字符串等字段时,可以减小攒批配置。
- 报错现象:出现
- 报错:
Caused by: java.lang.IllegalArgumentException: Column: created_time type does not match: flink row type: TIMESTAMP(6) WITH LOCAL TIME ZONE, hologres type: timestamp
。- 可能原因:在Flink中字段使用了TIMESTAMP(6)类型,当前不支持映射至Hologres。
- 解决方法:修改字段类型为TIMESTAMP。
- 报错:
Caused by: org.postgresql.util.PSQLException: FATAL: Rejected by ip white list. db = xxx, usr=xxx, ip=xx.xx.xx.xx
。- 可能原因:在Hologres中设置了IP白名单,但是白名单中未包含Flink访问Hologres的IP地址,所以Flink访问Hologres时被阻止。
- 解决方法:在Hologres的IP白名单中增加Flink的IP,详情请参见IP白名单。