本文为您介绍使用Hologres过程中关于Blink和Flink的常见问题。
基本概念
Hologres性能
写入性能
列存表: InsertOrIgnore > InsertOrReplace > InsertOrUpdate
行存表: InsertOrReplcae = InsertOrUpdate > InsertOrIgnore
参数
说明
InsertOrIgnore
结果表有主键,实时写入时如果主键重复,丢弃后到的数据。
InsertOrReplace
结果表有主键,实时写入时如果主键重复,按照主键更新,如果写入的一行数据不包含所有列,缺失的列的数据补Null。
InsertOrUpdate
结果表有主键,实时写入时如果主键重复,按照主键更新,如果写入的一行数据不包含所有列,缺失的列不更新。
点查性能
行存 = 行列混存 > 列存。
Blink、Flink(VVP)、开源Flink支持情况
产品形态
数据存储类型
描述
源表
结果表
维表
Binlog
Hologres Catalog
Flink全托管
支持行存储及列存储。
支持行存储及列存储。
建议使用行存储。
支持
支持
无
Blink独享
支持行存储及列存储。
支持行存储及列存储。
建议使用行存储。
Hologres V0.8版本只支持行存储,V0.9及以上版本支持行存储及列存储。建议使用行存储。
不支持
已开始逐步下线,推荐使用阿里云Flink全托管。
开源Flink1.10
支持行存储及列存储。
支持行存储及列存储。
无
不支持
不支持
无
开源Flink1.11及以上
支持行存储及列存储。
支持行存储及列存储。
建议使用行存储。
不支持
不支持
从开源Flink1.11版本开始,Hologres代码已开源。详细内容请参见GitHub。
Blink、Flink 映射Hologres的SQL示例如下。
create table holo_source( 'hg_binlog_lsn' BIGINT HEADER, 'hg_binlog_event_type' BIGINT HEADER, 'hg_binlog_timestamp_us' BIGINT HEADER, A int, B int, C timestamp ) with ( type = 'hologres', 'endpoint' = 'xxx.hologres.aliyuncs.com:80', --Hologres实例的Endpoint。 'userName' = '', --当前阿里云账号的AccessKey ID。 'password' = '', --当前阿里云账号的AccessKey Secret。 'dbName' = 'binlog', --Hologres实例的数据库名称。 'tableName' ='test' --Hologres实例的表名称。 'binlog' = 'true', );
Blink、VVP、Flink SQL,都是在Flink侧声明一张表,然后根据参数映射至Hologres的一张具体的物理表,所以不支持映射至外部表。
实时写入慢问题排查流程
确认写入相关配置
需要确认以下配置信息。
目标表的存储格式,包括行存表、列存表和行列共存表。
Insert模式,包括InsertOrIgnore、InsertOrUpdate和InsertOrReplace。
目标表的Table Group及Shard Count。
查看监控指标的实时写入延迟
如果平均写入延迟偏高,在百毫秒甚至秒级别,通常便是后端达到了写入瓶颈,这时候可能会存在如下问题。
使用了列存表的InsertOrUpdate,即局部更新,且流量较高,这种情况下会导致实例的CPU负载和写入延迟偏高。
解决方法:建议更换表的类型,使用行存表,如果您的实例是V1.1及以上版本可以选择行列混存表。
云监控查看实例的CPU负载,如果CPU水位接近100%,但没有列存表的局部更新,那么通常情况下是由于高QPS的查询,或者本身写入量较高导致的。
解决方法:扩容Hologres实例。
确认是否有不断的
Insert into select from
命令,触发了该表的BulkLoad写入,当前BulkLoad写入会阻塞实时写入。解决方法:将BulkLoad写入转换成实时写入,或者错峰执行。
确认是否有数据倾斜
使用如下SQL命令查看是否有数据倾斜。
SELECT hg_shard_id, count(1) FROM t1 GROUP BY hg_shard_id ORDER BY hg_shard_id;
解决方法:修改Distribution Key,使数据分布更加均衡。
确认后端是否有压力
如果以上步骤排查完没有问题,写入性能突然下降,一般情况是后端集群压力比较大,存在瓶颈。请联系技术支持人员确认情况,详情请参见如何获取更多的在线支持?。
查看Blink/Flink侧的反压情况
上述步骤排查完后,发现Hologres侧没有明显的异常,通常情况下是客户端慢了,也就是Blink/Flink侧本身就慢了,这时候确认是否是Sink节点反压了。如果作业只有一个节点,就无法看出是否反压了,这时候可以将Sink节点单独拆开再观察。具体请联系Flink技术支持。
写入数据有问题排查流程
这种情况通常是由于数据乱序引起的,比如相同主键的数据分布在不同的Flink Task上,写入的时候无法保证顺序。需要确认Flink SQL的逻辑,最后写出到Hologres的时候,是否按照Hologres表的主键进行Shuffle了。
维表查询问题排查流程
维表Join和双流Join
对于读Hologres的场景,需要首先确认用户是否使用对了维表Join,是否错将双流Join当成维表Join来使用了。以下是Hologres作为维表的使用示例,如果少了
proctime AS PROCTIME()
和hologres_dim FOR SYSTEM_TIME AS
两处关键字,则会变成双流Join。CREATE TEMPORARY TABLE datagen_source ( a INT, b BIGINT, c STRING, proctime AS PROCTIME() ) with ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE hologres_dim ( a INT, b VARCHAR, c VARCHAR ) with ( 'connector' = 'hologres', ... ); CREATE TEMPORARY TABLE blackhole_sink ( a INT, b STRING ) with ( 'connector' = 'blackhole' ); insert into blackhole_sink select T.a,H.b FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
维表查询
确认维表存储格式
确认维表的存储格式是行存表、列存表还是行列共存。
维表查询延迟高
维表的使用,最常见的问题就是Flink/Blink侧用户反馈Join节点有反压,导致整个作业的吞吐上不去。
确认Flink维表Join的模式
当前Hologres Flink Connector的维表Join功能支持同步和异步模式两种,异步模式性能要优于同步模式,具体需要看Flink SQL进行区分,以下是一个开启异步维表查询功能的SQL示例。
CREATE TABLE hologres_dim( id INT, len INT, content VARCHAR ) with ( 'connector'='hologres', 'dbname'='<yourDbname>', --Hologres的数据库名称。 'tablename'='<yourTablename>', --Hologres用于接收数据的表名称。 'username'='<yourUsername>', --当前阿里云账号的AccessKey ID。 'password'='<yourPassword>', --当前阿里云账号的AccessKey Secret。 'endpoint'='<yourEndpoint>' --当前Hologres实例VPC网络的Endpoint。 'async' = 'true'--异步模式 );
确认后端查询延迟
查看监控指标的实时写入延迟:
确认是否是列存表在做维表,列存表的维表在高QPS场景下开销很高。
如果是行存表,且延迟高,通常情况下是实例整体负载较高导致的,需要进行扩容。
确认Join的Key是否是Hologres表的主键
自VVR 4.x (Flink 1.13) 版本开始,Hologres Connector基于Holo Client实现了Hologres表的非主键查询,这种情况通常性能会比较差、实例负载也比较高,尤其是建表没有特别优化过的情况。这时候需要引导优化表结构,最常见的就是将Join的key设置成Distribution Key,这样就能实现Shard Pruning。
查看Blink侧的反压情况
如果上述步骤排查完成,发现Hologres侧没有明显的异常,通常情况下是客户端慢了,也就是Blink侧本身就慢了,这时候可以确认是否是Sink节点反压了。如果作业只有一个节点,就无法看出是否反压了,这时候可以将Sink节点单独拆开再观察。同样可以排查是否是Join节点导致的反压。具体请联系Flink技术支持排查。
连接数使用说明
Hologres Connector默认采用JDBC相关模式。
现已支持JDBC_FIXED模式,该模式不占用连接数,并且在消费Binlog时也不受Walsender数量上限的限制,详情请参见实时数仓Hologres。
从Flink引擎VVR-8.0.5-Flink-1.17版本开始,默认开启了连接复用
'connectionPoolName' = 'default'
,对大多数作业而言,这并没有影响。如果单个作业表数量较多,可能在升级之后出现性能有所下降。这种情况下,建议为热点表单独配置connectionPoolName
参数以优化性能。JDBC模式会占用一定数量的连接数,不同类型的表默认连接数使用情况如下表。
表类型
默认连接数(Flink作业的每个并发)
Binlog源表
0
批量源表
1
维表
3(可以通过
connectionSize
参数调整)结果表
3(可以通过
connectionSize
参数调整)连接数计算方法
默认情况
默认情况下,作业使用的最大连接数可以通过如下公式计算:
最大连接数 = ( 批量源表数 * 1 + 维表数 * connectionSize + 结果表数 * connectionSize )* 作业并发
。例如某作业有一张全增量源表、两张维表和三张结果表,都使用默认的
connectionSize
参数值,作业并发设置为5
,则最终使用的连接数为:(1 * 1 + 2 * 3 + 3 * 3) * 5 = 80
。连接复用
实时计算1.13-vvr-4.1.12及以上版本支持连接复用。一个作业的同一个并发内,相同
connectionPoolName
的维表和结果表会使用同一个连接池。默认情况示例中,如果两张维表和三张结果表都配置了相同的connectionPoolName
,并适当调大connectionSize
为5
,则最终使用的连接数为(1 * 1 + 5) * 5 = 30
。说明连接复用模式适用大多数场景,但部分场景比如维表数量较多、没有启用异步也没有开启缓存时,会非常频繁的进行同步的点查,此时多表连接复用可能导致查询变慢,这种情况可以只为结果表配置连接复用。
其他使用连接的场景
作业启动过程中,需要建立连接用于表元数据的验证等工作,可能会暂时使用3至6个连接,作业正常运行后会释放。
Flink全托管支持Hologres Catalog、CTAS以及CDAS等功能,使用这些功能也会占用连接数。默认情况下,一个使用Catalog的作业,会多占用三个连接,用于建表等DDL操作。
连接数使用诊断
当作业的表数量较多、作业并发较高时,会占用大量的连接数,甚至出现将Hologres总连接数占满的情况,通过以下方式对当前连接数的使用进行了解和诊断。
使用如下命令在HoloWeb中通过
pg_stat_activity
表查看当前的活跃Query,详情请参见查询pg_stat_activity视图信息。其中application_name
字段中值为ververica-connector-hologres
的Query代表来自实时计算Flink的读写连接。SELECT application_name, COUNT (1) AS COUNT FROM pg_stat_activity WHERE backend_type = 'client backend' AND application_name != 'hologres' GROUP BY application_name;
有时作业并发设置的过高,在Hologres管理控制台实例列表对应实例的监控信息页表现如下:刚启动时连接数很高,运行一段时间之后连接数下降。原因是很多连接处于空闲状态而被关闭,此现象表明作业实际上不需要如此大的并发或连接数,应该合理规划任务连接数、降低并发度或
connectionSize
参数值,或者使用连接复用模式。适当调整Hologres节点的并发度。默认情况下Flink作业的所有算子并发相同,一些场景下那些包含复杂计算逻辑的算子需要配置较高的并发,但这些并发对Hologres结果表来说可能是冗余的,还可能占用大量的连接数,此时可以参考作业资源配置,选择专家模式,为写入算子单独设置合适且较小的并发,从而降低总连接数的使用。
常见报错
报错:ERPC TIMEOUT
或者ERPC CONNECTION CLOSED
报错现象:出现
com.alibaba.blink.store.core.rpc.RpcException: request xx UpsertRecordBatchRequest failed on final try 4, maxAttempts=4, errorCode=3, msg=ERPC_ERROR_TIMEOUT
报错。可能原因:写入时压力过大写入失败或者集群比较繁忙,可以观察Hologres实例的CPU负载是否打满。
CONNECTION CLOSED
可能是负载过大导致后端节点挂掉了,出现OOM(Out Of Memory)或者Coredump。解决方法:请先重试写入,如果不能恢复请找Hologres技术支持人员排查原因。
报错:BackPresure Exceed Reject Limit
可能原因:通常是Hologres后端写入压力过大,导致Memtable来不及刷盘导致写入失败。
解决方法:如偶发失败可忽略该问题,或者Sink加上参数rpcRetries = '100' 来调大写入重试次数。如果一直报该错误,请联系Hologres技术支持人员确认后端实例状态。
报错:The requested table name xxx mismatches the version of the table xxx from server/org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.Caused by: java.net.SocketTimeoutException: Read timed out
可能原因:通常是用户做了Alter Table导致Blink写入所带表的Schema版本号低于Server端版本号导致的,并且超过了客户端的重试次数。
解决方法:如偶发报错可忽略该问题。如果一直报该错误,请联系Hologres技术支持人员。
报错:Failed to query table meta for table
可能原因:一种可能是用户读写了一张Hologres的外部表,Hologres Connector不支持读写外部表。如果不是,可能是Hologres实例 Meta出现了问题。
解决方法:请联系Hologres技术支持人员。
报错:Cloud authentication failed for access id
可能原因:该报错通常是用户配置的AccessKey信息不对,或者用户没有添加账号至Hologres实例。
解决方法:
请检查当前账户的AccessKey ID和AccessKey Secret填写是否正确,一般是AccessKey Secret错误或者有空格。
检查不出原因可以用当前AccessKey连接HoloWeb(使用账号密码方式登录),在测试联通性时看报错是什么,还是一样的报错说明AccessKey有问题,如果报错为
FATAL:role“ALIYUN$xxxx“does not exist
说明账号没有实例的权限,需要管理员给该账号授予权限。
Hologres维表Join不到数据
可能原因:Hologres维表使用了分区表,Hologres维表暂不支持分区表。
解决方法:请将分区表转为普通表。
报错:Modify record by primary key is not on this table
可能原因:实时写入的时候选择了更新模式,但是Hologres的结果表没有主键。
解决方法:请设置主键。
报错:shard columns count is no match
可能原因:写入Hologres的时候,没有写入完整的Distribution Key的列(默认是主键)。
解决方法:请写入完整的Distribution Key列。
报错:Full row is required, but the column xxx is missing
可能原因:Hologres老版本的报错信息,通常是用户没有写某列数据,而那一列是不能为空的。
解决方法:请为不能为空的列赋值。
VVP用户读写Hologres导致JDBC连接数暴涨
可能原因:VVP Hologres Connector读写Hologres(除了Binlog),采用JDBC模式,最大占用
读写Hologres表数量*并发度 * connectionSize(VVP表的参数,默认为3)
个连接。解决方法:合理规划任务连接数,降低并发度或者connectionSize。如无法调低并发度或connectionSize,可以为表设置参数useRpcMode = 'true' 切回至Rpc模式。
Blink/VVP用户读写Hologres报错显示无法连接Hologres
可能原因:Blink/VVP集群默认访问公网很慢或者无法访问。
解决方法:需要保证和Hologres实例在相同Region,且使用VPC的Endpoint。
报错:Hologres rpc mode dimension table does not support one to many join
可能原因:Blink和VVP的RPC模式维表必须是行存表,且Join的字段必须是主键,报错的原因往往是以上两个条件不满足
解决方法:建议使用JDBC模式,且维表使用行存表或者行列共存表。
报错:DatahubClientException
报错现象:出现
Caused by: com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:503, requestId:null, errorCode:null, errorMessage:{"ErrorCode":"ServiceUnavailable","ErrorMessage":"Queue Full"}]
报错。可能原因:大量消费Binlog作业由于某种原因同时重启导致线程池被占满。
解决方法:分批进行消费Binlog作业。
报错:Error occurs when reading data from datahub
报错现象:出现
Error occurs when reading data from datahub, msg: [httpStatus:500, requestId:xxx, errorCode:InternalServerError, errorMessage:Get binlog timeout.]
报错。可能原因:Binlog每条数据太大,乘上攒批之后,每个RPC请求的大小超过最大限制。
解决方法:在每行数据字段较多且有很长的字符串等字段时,可以减小攒批配置。
报错:Caused by: java.lang.IllegalArgumentException: Column: created_time type does not match: flink row type: TIMESTAMP(6) WITH LOCAL TIME ZONE, hologres type: timestamp
可能原因:在Flink中字段使用了TIMESTAMP(6)类型,当前不支持映射至Hologres。
解决方法:修改字段类型为TIMESTAMP。
报错:Caused by: org.postgresql.util.PSQLException: FATAL: Rejected by ip white list. db = xxx, usr=xxx, ip=xx.xx.xx.xx
可能原因:在Hologres中设置了IP白名单,但是白名单中未包含Flink访问Hologres的IP地址,所以Flink访问Hologres时被阻止。
解决方法:在Hologres的IP白名单中增加Flink的IP,详情请参见IP白名单。
报错:Caused by: java.lang.RuntimeException: shaded.hologres.com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:400, requestId:xx, errorCode:TableVersionExpired, errorMessage:The specified table has been modified, please refresh cursor and try again
可能原因:用户对源表进行了DDL操作,Table Version发生变化,导致消费失败。
解决办法:升级Flink版本到4.0.16及以上,会对此情况进行重试。
Binlog作业启动时抛出Shard ID不存在的异常
可能原因:所消费表的Shard数发生了变化,可能是用户对表进行了重命名等操作,作业从checkpoint恢复时使用的旧表的Shard信息。
解决办法:重建表等操作之后,checkpoint中保存的Binlog消费点位信息已经没有意义,请无状态重新启动作业。
报错:ERROR,22021,"invalid byte sequence for encoding ""UTF8"": 0x00"
可能原因:维表点查时,使用的主键(字符串类型)中包含非UTF-8编码的字符,导致SQL执行失败。
解决办法:在上游对脏数据进行处理。
报错:hologres.org.postgresql.util.PSQLException: ERROR: syntax error
可能原因:JDBC模式消费Binlog表时需要指定Slot,发生此报错可能是创建的Slot名称中有不支持的字符(只支持小写字母、数字和下划线)。
解决办法:重新创建Slot,或者使用VVR-6.0.7版本自动创建Slot功能。
报错:create table hologres.hg_replication_progress failed
可能原因:JDBC消费Binlog时可能需要
hg_replication_progress
表(当前数据库中不存在此表)时,需要创建此表,但实例可以创建的Shard数已经达到上限,导致创建失败报错。解决办法:清理无用的数据库。
异常:作业运行时卡住,通过thread dump
等可以看到卡在JDBC Driver加载处,通常是Class.forName
等位置
可能原因:JDK 8在加载JDBC驱动程序会进行一些静态初始化操作,而多线程同时加载时可能会发生竞争条件。
解决办法:可以进行重试,或者使用6.0.7版本的Connector,对此类情况做了处理。
异常:使用JDBC模式消费Binlog时,抛出no table is defined in publication或者The table xxx has no slot named xxx异常
可能原因:删除表并重建同名表时,和表绑定的Publication没有被删除。
解决办法:当发生此异常时,可以在Hologres中执行
select * from pg_publication where pubname not in (select pubname from pg_publication_tables);
语句查询未被一起清理的Publication,并执行drop publication xx;
语句删除残留的publication,之后重新启动作业即可。
报错:作业上线时抛出“permission denied for database”的异常
可能原因:Hologres V1.3和V2.0版本的JDBC模式消费Binlog,需要进行权限配置。
解决办法:建议升级Hologres到V2.1版本,使用VVR-8.0.5版本及以上的connector,仅需要表的只读权限就可以消费Binlog。如果不方便升级,请参考使用限制的赋权操作。
报错:table writer init failed: Fail to fetch table meta from sm
可能原因:对表进行truncate或者rename操作之后进行写入。
解决办法:偶发可以忽略,作业failover之后会自行恢复。Hologres V2.1.1到V2.1.14版本FE节点增加了replay缓存时间,导致同一个DDL后再DML,DDL replay会变慢。类似异常出现概率可能提高,建议升级到V2.1最新版本。
异常:本地使用connector依赖开发Datastream作业过程中,出现类似java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter的异常
可能原因:阿里云实时计算Flink版的商业版连接器JAR包中不提供部分运行类。
解决办法:参考本地运行和调试包含连接器的作业文档调整依赖,可以正常调试开发。
异常:JDBC模式消费Binlog,出现Binlog Convert Failed异常,或者部分shard的数据读取停止在某个时刻。
可能原因:Hologres实例的Gateway收到后端超时的异常信息时,将异常返回给客户端的过程中会存在问题,导致读取数据卡住或数据解析失败报错。
解决办法:一般只有在作业反压时会出现,如果作业存在数据读取卡住的问题,可以选择重启作业并从最近的checkpoint恢复。要彻底解决该问题,需要将Hologres版本升级到2.2.21及以上版本。