本文汇总了StarRocks数据导入的常见问题。
通用问题
Stream Load
Routine Load
Broker Load
INSERT INTO
MySQL实时同步至StarRocks
Flink Connector
DataX Writer
Spark Load
报错“close index channel failed“或“too many tablet versions”,该如何处理?
报错原因
该问题主要是数据导入频率太快,数据没能及时合并(Compaction),从而导致版本数超过支持的最大未合并版本数。
解决方案
默认支持的最大未合并版本数为1000。您可以通过以下方法解决上述报错:
增大单次导入的数据量,降低导入频率。
在BE的配置文件be.conf中修改以下配置,通过调整合并策略实现加快合并的目的。
cumulative_compaction_num_threads_per_disk = 4 base_compaction_num_threads_per_disk = 2 cumulative_compaction_check_interval_seconds = 2
报错“Label Already Exists”,该如何处理?
问题描述
StarRocks集群中同一个数据库内已经有一个相同Label的导入作业导入成功或者正在执行。
报错原因
由于Stream Load是采用HTTP协议提交导入作业的请求,通常各个语言的HTTP客户端均会自带请求重试逻辑。StarRocks集群在接收到第一个请求后,已经开始操作Stream Load,但是由于没有及时向客户端返回结果,会出现客户端再次重试发送相同请求的情况。这时候,StarRocks集群由于已经在操作第一个请求,所以第二个请求会返回
Label Already Exists
的状态提示。解决方案
需要检查不同导入方式之间是否有标签冲突、或是有重复提交的导入作业。排查方法如下:
使用标签搜索主FE的日志,查看是否存在同一个标签出现了两次的情况。如果有,则说明客户端重复提交了该请求。
说明StarRocks集群中导入作业的标签不区分导入方式。因此,可能存在不同的导入作业使用了相同标签的问题。
运行
SHOW LOAD WHERE LABEL = "xxx"
语句,查看是否已经存在具有相同标签且处于FINISHED状态的导入作业。其中xxx
为待检查的标签字符串。
建议根据当前请求导入的数据量,计算出大致的导入耗时,并根据导入超时时间来适当地调大客户端的请求超时时间,从而避免客户端多次提交该请求。
报错“ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel”,该如何处理?
运行SHOW LOAD
语句。在语句返回的信息中找到URL,查看错误数据。常见的错误类型如下:
convert csv string to INT failed.
待导入数据文件中某列的字符串在转换为对应类型的数据时出错。例如,将abc转换为数字时失败。
the length of input is too long than schema.
待导入数据文件中某列的长度不正确。例如,定长字符串超过建表时设置的长度,或INT类型的字段超过4个字节。
actual column number is less than schema column number.
待导入数据文件中某一行按照指定的分隔符切分后,列数小于指定的列数。可能原因是分隔符不正确。
actual column number is more than schema column number.
待导入数据文件中某一行按照指定的分隔符切分后,列数大于指定的列数。
the frac part length longer than schema scale.
待导入数据文件中某DECIMAL类型的列的小数部分超过指定的长度。
the int part length longer than schema precision.
待导入数据文件中某DECIMAL类型的列的整数部分超过指定的长度。
there is no corresponding partition for this key.
导入文件某行的分区列的值不在分区范围内。
报错“ERROR 1064 (HY000): Failed to find enough host in all backends. need: 3”,该如何处理?
您可以在建表属性中添加"replication_num" = "1"
信息。
导入数据时发现BE服务日志中出现Too many open files问题,该如何处理?
您可以根据以下步骤处理:
调整系统句柄数。
调小base_compaction_num_threads_per_disk和cumulative_compaction_num_threads_per_disk(默认都是1)的参数值。修改配置项的具体操作,请参见修改配置项。
如果问题还未解决,建议扩容或者降低导入频率。
报错“increase config load_process_max_memory_limit_percent”,该如何处理?
导入数据的时候出现“increase config load_process_max_memory_limit_percent”错误时,建议您在StarRocks实例的实例配置页签,查看并调大load_process_max_memory_limit_bytes和load_process_max_memory_limit_percent的参数值。
数据导入过程中,发生远程过程调用(Remote Procedure Call,简称RPC)超时问题,该如何处理?
检查BE配置文件中write_buffer_size参数的设置。该参数用于控制BE上内存块的大小阈值,默认阈值为100 MB。如果阈值过大,可能会导致远程过程调用超时,此时需要配合BE配置文件中的tablet_writer_rpc_timeout_sec参数来适当地调整write_buffer_size参数的取值。BE配置项的更多信息,请参见参数配置。
报错“Value count does not match column count”,该如何处理?
问题描述
导入作业失败,通过查看错误详情URL发现返回“Value count does not match column count”错误,提示解析源数据得到的列数与目标表的列数不匹配。
报错原因
发生该错误的原因是导入命令或导入语句中指定的列分隔符与源数据中的列分隔符不一致。例如,上面示例中,源数据为CSV格式,包括三列,列分隔符为逗号(,),但是导入命令或导入语句中却指定制表符(\t)作为列分隔符,最终导致源数据的三列数据解析成了一列数据。
解决方案
修改导入命令或导入语句中的列分隔符为逗号(,),然后再次尝试执行导入。
如何选择导入方式?
导入方式的选择可以参见导入概述。
影响导入性能的因素都有哪些?
通常影响导入性能的因素有下面几个:
机器内存
当tablet比较多的时候,对于内存消耗比较大。建议单个tablet大小按照如何分桶?中介绍的进行评估。
磁盘IO能力和网络带宽
正常50 Mbit/s~100 Mbit/s是没有问题的。
导入批次和频率
Stream Load批次大小建议10 MB~100 MB。
Broker Load还好,因为Broker Load针对的场景都是批次大小比较大的情况。
导入频率不要太高,SATA盘1s不超过一个任务。
Stream Load是否支持识别文本文件中首行的列名?或者是否支持指定不读取第一行?
Stream Load不支持识别文本中首行的列名,首行对Stream Load来说只是普通数据。当前也不支持指定不读取首行,如果需要导入的文本文件的首行为列名,可以使用如下四种方式处理:
在导出工具中修改设置,重新导出不带列名的文本文件。
使用
sed -i '1d' filename
命令删除文本文件的首行。在Stream Load执行语句中,使用
-H "where: 列名 != '列名称'"
过滤掉首行。当前系统会先转换,然后再过滤,因此如果首行字符串转其他数据类型失败的话,会返回
null
。所以,该方式要求StarRocks表中的列不能设置为NOT NULL
。在Stream Load执行语句中加入
-H "max_filter_ratio:0.01"
,可以给导入作业设置一个1%或者更小、容错超过1行的容错率,从而将首行的错误忽视掉。您也可以根据实际数据量设置一个更小的容错率,但是要保证容错行数在1行以上。设置容错率后,返回结果的ErrorURL
依旧会提示有错误,但导入作业整体会成功。容错率不宜设置过大,避免漏掉其他数据问题。
当前业务的分区键对应的数据不是标准的DATE和INT类型,使用Stream Load导入数据到StarRocks时,需要转换数据吗?
StarRocks支持在导入过程中进行数据转换。
例如,待导入数据文件TEST为CSV格式,并且包含NO、DATE、VERSION、PRICE四列,但是其中DATE列是不规范的202106.00格式。如果在StarRocks中需使用的分区列为DATE,则需要先在StarRocks中创建一张包含NO、VERSION、PRICE、DATE四列的表。您需要指定DATE列的数据类型为DATE、DATETIME或INT。然后,在Stream Load执行语句中,通过指定如下设置来实现列之间的转换。
-H "columns: NO,DATE_1, VERSION, PRICE, DATE=LEFT(DATE_1,6)"
DATE_1
可以简单地看成先占位进行取数,然后通过LEFT()
函数进行转换,赋值给StarRocks表中的DATE
列。需要注意的是,必须先列出CSV文件中所有列的临时名称,然后再使用函数进行转换。支持列转换的函数为标量函数,包括非聚合函数和窗口函数。
报错“body exceed max size: 10737418240, limit: 10737418240”,该如何处理?
报错原因
源数据文件大小超过10 GB,超出了Stream Load所能支持的文件大小的上限。
解决方案
通过
seq -w 0 n
拆分数据文件。通过
curl -XPOST http://be_host:http_port/api/update_config?streaming_load_max_mb=<file_size>
调整BE配置项中streaming_load_max_mb的取值来扩大文件大小上限。BE配置项的更多信息,请参见参数配置。
如何提高导入性能?
方式一:增加实际任务并行度
该方式可能会消耗更多的CPU资源,并且导致导入版本过多。
将一个导入作业拆分成尽可能多的导入任务并行执行。实际任务并行度由以下公式决定,上限为BE节点的数量或者消费分区的数量。
min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)
各参数含义如下:
alive_be_number:存活BE数量。
partition_number:消费分区数量。
desired_concurrent_number:Routine Load导入作业时为单个导入作业设置较高的期望任务并行度,默认值为3。
如果还未创建导入作业,则需要在执行
CREATE ROUTINE LOAD
时,设置该参数。如果已经创建导入作业,则需要在执行
ALTER ROUTINE LOAD
时,修改该参数。
max_routine_load_task_concurrent_num:Routine Load导入作业的默认最大任务并行度 ,默认值为5。该参数为FE动态参数,更多说明和设置方式,请参见参数配置。
因此当消费分区和BE节点数量较多,并且大于desired_concurrent_number
和max_routine_load_task_concurrent_num
参数时,如果您需要增加实际任务并行度,则可以提高desired_concurrent_number
和max_routine_load_task_concurrent_num
参数。
例如,消费分区数量为7,存活BE数量为5,max_routine_load_task_concurrent_num为默认值5。此时如果需要增加实际任务并发度至上限,则需要将desired_concurrent_number设置为5(默认值为3),则计算实际任务并行度min(5,7,5,5)
为5。
方式二:增大单个导入任务消费分区的数据量
该方式会造成数据导入的延迟变大。
单个Routine Load导入任务消费消息的上限由参数max_routine_load_batch_size,或者参数routine_load_task_consume_second决定。当导入任务消费数据并达到上述要求后,则完成消费。上述两个参数为FE配置项,更多说明和设置方式,请参见参数配置。
您可以通过查看be/log/be.INFO中的日志,分析单个导入任务消费数据量的上限由上述哪个参数决定,并且通过提高该参数,增大单个导入任务消费的数据量。
I0325 20:27:50.410579 15259 data_consumer_group.cpp:131] consumer group done: 41448fb1a0ca59ad-30e34dabfa7e47a0. consume time(ms)=3261, received rows=179190, received bytes=9855450, eos: 1, left_time: -261, left_bytes: 514432550, blocking get time(us): 3065086, blocking put time(us): 24855
正常情况下,该日志的left_bytes >=0,表示在routine_load_task_consume_second时间内一次读取的数据量还未超过max_routine_load_batch_size,说明调度的一批导入任务都可以消费完Kafka的数据,不存在消费延迟,则此时您可以通过提高routine_load_task_consume_second的参数值,增大单个导入任务消费分区的数据量 。
如果left_bytes < 0,则表示未在routine_load_task_consume_second规定时间,一次读取的数据量已经达到max_routine_load_batch_size,每次Kafka的数据都会把调度的一批导入任务填满,因此极有可能Kafka还有剩余数据没有消费完,存在消费积压,则可以提高max_routine_load_batch_size的参数值。
执行SHOW ROUTINE LOAD命令后,导入作业状态变为PAUSED或CANCELLED,该如何处理?
您可以根据报错提示排查错误并修改:
报错提示:导入作业变成PAUSED状态,并且ReasonOfStateChanged报错
Broker: Offset out of range
。原因分析:导入作业的消费位点在Kafka分区中不存在。
解决方式:您可以通过执行命令
SHOW ROUTINE LOAD
,在Progress参数中查看导入作业的最新消费位点,并且在Kafka分区中查看是否存在该位点的消息。如果不存在,则可能有如下两个原因:创建导入作业时指定的消费位点为将来的时间点。
Kafka分区中该位点的消息还未被导入作业消费,就已经被清理。建议您根据导入作业的导入速度设置合理的Kafka日志清理策略和参数,例如log.retention.hours,log.retention.bytes等。
报错提示:导入作业变成PAUSED状态。
原因分析:可能是导入任务错误行数超过阈值max_error_number。
解决方式:您可以根据
ReasonOfStateChanged
,ErrorLogUrls
报错进行排查。如果是数据源的数据格式问题,则需要检查数据源数据格式,并进行修复。修复后您可以使用
RESUME ROUTINE LOAD
,恢复PAUSED状态的导入作业。如果是数据源的数据格式无法被StarRocks解析,则需要调整错误行数阈值max_error_number。
执行命令
SHOW ROUTINE LOAD
,查看错误行数阈值max_error_number。执行命令
ALTER ROUTINE LOAD
,适当提高错误行数阈值max_error_number。执行命令
RESUME ROUTINE LOAD
,恢复PAUSED状态的导入作业。
报错提示:导入作业变为CANCELLED状态。
原因分析:可能是执行导入任务时遇到异常。例如,表被删除。
解决方式:您可以根据
ReasonOfStateChanged
或ErrorLogUrls
报错进行排查和修复。但是修复后,您无法恢复CANCELLED状态的导入作业。
使用Routine Load消费Kafka写入StarRocks是否能保证一致性语义?
Routine Load能够保证一致性(Exactly-once)语义。
一个导入任务是一个单独的事务,如果该事务在执行过程中出现错误,则事务会中止,FE不会更新该导入任务相关分区的消费进度。FE在下一次调度任务执行队列中的导入任务时,从上次保存的分区消费位点发起消费请求,保证Exactly once语义。
报错“Broker: Offset out of range”,该如何处理?
通过命令SHOW ROUTINE LOAD
查看最新的offset,使用Kafka客户端查看该offset有没有数据。可能原因是:
导入时指定了未来的offset。
还没来得及导入,Kafka已经将该offset的数据清理。需要根据StarRocks的导入速度设置合理的log清理参数log.retention.hours、log.retention.bytes等。
Broker Load是否支持再次执行已经执行成功、且处于FINISHED状态的导入作业?
Broker Load不支持再次执行已经成功执行且处于FINISHED状态的导入作业。而且,为了保证导入作业的不丢不重,每个执行成功的导入作业的标签(Label)均不可复用。可以使用SHOW LOAD
命令查看历史的导入记录,找到想要再次执行的导入作业,复制作业信息,并修改作业标签后,重新创建一个导入作业并执行。
Broker Load导入时出现内容乱码,该如何处理?
通过错误URL查看内容乱码。示例如下所示。
Reason: column count mismatch, expect=6 real=1. src line: [$交通]; zcI~跟团+v]; count mismatch, expect=6 real=2. src line: [租e�rD��食休闲娱乐
出现内容乱码是因为导入作业请求中的FORMAT AS
参数指定错误。修改FORMAT AS
参数取值,改为待导入数据文件对应的文件类型,然后重试导入作业。
Broker Load导入HDFS数据时,数据的导入日期字段出现异常,比正确的日期时间多加了8小时,该如何处理?
报错原因
StarRocks在建表时设置的timezone为中国时区,创建Broker Load导入作业时设置的timezone也是中国时区,而服务器设置的是UTC时区。因此,日期字段在导入时,比正确的日期时间多加了8小时。
解决方案
建表时去掉timezone参数。
Broker Load导入ORC格式的数据时,报错“ErrorMsg: type:ETL_RUN_FAIL; msg:Cannot cast '<slot 6>' from VARCHAR to ARRAY<VARCHAR(30)>”,该如何处理?
报错原因
待导入数据文件和Starrocks表两侧的列名不一致,执行
SET
子句的时候系统内部会有一个类型推断,但是在调用cast函数执行数据类型转换的时候失败了。解决方案
确保两侧的列名一致,这样就不需要
SET
子句,也就不会调用cast函数执行数据类型转换,即可导入成功。
为什么Broker Load导入作业没报错,但是却查询不到数据?
Broker Load是一种异步的导入方式,创建导入作业的语句没报错,不代表导入作业成功了。您可以通过SHOW LOAD
语句来查看导入作业的结果状态和errmsg
信息,然后修改导入作业的参数配置后,再重试导入作业。
报错“failed to send batch或TabletWriter add batch with unknown id”,该如何处理?
该错误由数据写入超时而引起。需要修改系统变量query_timeout和BE配置项streaming_load_rpc_max_alive_time_sec的参数值。BE配置项的更多信息,请参见参数配置。
报错“LOAD-RUN-FAIL; msg:OrcScannerAdapter::init_include_columns. col name = xxx not found”,该如何处理?
如果导入的是Parquet或ORC格式的数据,则需要检查文件头的列名是否与StarRocks表中的列名一致。
例如,以下示例表示映射Parquet或ORC文件中以tmp_c1和tmp_c2为列名的列到StarRocks表中的name
和id
列。如果没有使用SET子句,则以column_list参数中指定的列作为映射。
(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
)
如果导入的是Apache Hive版本直接生成的ORC文件,并且ORC文件中的表头是 (_col0, _col1, _col2, ...)
,则可能导致"Invalid Column Name"错误。此时需要使用SET子句设置列转换规则。
导入作业长时间没有结束等问题应该如何处理?
在FE上的日志文件fe.log中,根据导入作业的标签来搜索导入作业的ID。然后,在BE上的日志文件be.INFO中,根据导入作业的ID来搜索上下文日志,进而查看具体原因。
如何配置以访问高可用 (HA) 模式下的Apache HDFS集群?
通过配置NameNode HA,可以在NameNode切换时,自动识别到新的NameNode。配置以下参数用于访问以HA模式部署的HDFS集群。
参数 | 描述 |
dfs.nameservices | 指定HDFS服务的名称,您可以自定义。 例如,设置dfs.nameservices为my_ha。 |
dfs.ha.namenodes.xxx | 自定义NameNode的名称,多个名称时以逗号(,)分隔。其中xxx为dfs.nameservices中自定义的名称。 例如,设置dfs.ha.namenodes.my_ha为my_nn。 |
dfs.namenode.rpc-address.xxx.nn | 指定NameNode的RPC地址信息。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名称。 例如,设置dfs.namenode.rpc-address.my_ha.my_nn参数值的格式为host:port。 |
dfs.client.failover.proxy.provider | 指定Client连接NameNode的Provider,默认值为org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。 |
HA模式可以与简单认证、Kerberos认证两种认证方式组合,进行集群访问。例如,通过简单认证方式访问HA HDFS。
(
"username"="user",
"password"="passwd",
"dfs.nameservices" = "my-ha",
"dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2",
"dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
"dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
HDFS集群的配置可以写入hdfs-site.xml文件中,您使用Broker进程读取HDFS集群的信息时,只需要填写集群的文件路径名和认证信息即可。
如何配置Hadoop ViewFS Federation?
需要将ViewFs相关的配置文件core-site.xml和hdfs-site.xml拷贝到broker/conf目录中。
如果有自定义的文件系统,还需要将文件系统相关的.jar文件拷贝到broker/lib目录中。
访问Kerberos认证的集群时,报错“Can't get Kerberos realm”,该如何处理?
首先检查是否所有的Broker所在的机器都配置了/etc/krb5.conf文件。
如果配置了仍然报错,则需要在Broker的启动脚本中
JAVA_OPTS
变量的最后,加上-Djava.security.krb5.conf:/etc/krb5.conf
。
使用INSERT INTO语句导入数据时,SQL每插入一条数据大约耗时50~100ms,能否优化执行效率?
因为INSERT INTO导入方式为批量写入,所以单条写入和批量写入的耗时相同。因此OLAP场景下不建议使用INSERT INTO语句单条写入数据。
使用INSERT INTO SELECT语句导入数据时,系统报错“index channel has intoleralbe failure”,该如何解决?
该报错是因为流式导入RPC超时导致。您可以通过在配置文件中调节RPC超时相关参数解决。
您需要在BE配置文件be.conf中修改以下两个系统配置项,并重启集群使修改生效。
streaming_load_rpc_max_alive_time_sec:流式导入RPC的超时时间,默认为1200,单位为秒。
tablet_writer_rpc_timeout_sec:TabletWriter的超时时长,默认为600,单位为秒。
使用INSERT INTO SELECT语句导入大量数据时会执行失败,报错“execute timeout”,该如何解决?
该报错是因为Query超时导致。您可以通过调节Session变量query_timeout
解决。该参数默认为600,单位为秒。
参数设置示例如下。
set query_timeout =xx;
执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该如何解决?
报错原因
在StarRocks-migrate-tools(简称SMT)配置文件config_prod.conf中设置了多组规则
[table-rule.1]
、[table-rule.2]
等,但是缺失必要的配置信息。解决方案
检查是否给每组规则
[table-rule.1]
、[table-rule.2]
等配置了database,table和Flink Connector信息。
Flink如何自动重启失败的Task?
Flink通过Checkpointing机制和重启策略,自动重启失败的Task。
例如,您需要启用Checkpointing机制,并且使用默认的重启策略,即固定延时重启策略,则可以在配置文件flink-conf.yaml进行如下配置。
# unit: ms
execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
各参数含义如下:
execution.checkpointing.interval
: Checkpoint的基本时间间隔,单位为ms。如果需要启用Checkpointing机制,则该参数值须大于0。state.backend
:启动Checkpointing机制后,状态会随着CheckPoint而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在CheckPoint时如何持久化以及持久化在哪里均取决于选择的State Backend。状态更多介绍,请参见State Backends。state.checkpoints.dir
:Checkpoint数据存储目录。
如何手动停止Flink job,并且恢复Flink job至停止前的状态?
您可以在停止Flink job时手动触发Savepoint(Savepoint是依据Checkpointing机制所创建的流作业执行状态的一致镜像),后续可以从指定Savepoint中恢复Flink job。
使用Savepoint停止作业。自动触发Flink job ID的Savepoint,并停止该job。此外,您可以指定一个目标文件系统目录来存储Savepoint 。
如果需要恢复Flink job至停止前的状态,则需要在重新提交Flink job时指定Savepoint。命令示例如下。
bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
各参数含义如下:
jobId
:您可以通过Flink WebUI查看Flink job ID,或者在命令行执行flink list -running
进行查看。targetDirectory
:您也可以在Flink配置文件flink-conf.yml中state.savepoints.dir
配置Savepoint的默认目录。 触发Savepoint时,使用此目录来存储Savepoint,无需指定目录。state.savepoints.dir: [file://或hdfs://]/home/user/savepoints_dir
如果需要恢复Flink job至停止前的状态,则需要在重新提交Flink job时指定Savepoint。
./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-starrocks-xxxx.jar -f flink-create.all.sql
使用事务接口的exactly-once时,导入失败,该如何解决?
问题描述:报错信息如下。
com.starrocks.data.load.stream.exception.StreamLoadFailException: { "TxnId": 3382****, "Label": "502c2770-cd48-423d-b6b7-9d8f9a59****", "Status": "Fail", "Message": "timeout by txn manager",--具体报错信息 "NumberTotalRows": 1637, "NumberLoadedRows": 1637, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 4284214, "LoadTimeMs": 120294, "BeginTxnTimeMs": 0, "StreamLoadPlanTimeMs": 7, "ReadDataTimeMs": 9, "WriteDataTimeMs": 120278, "CommitAndPublishTimeMs": 0 }
原因分析:
sink.properties.timeout
小于Flink checkpoint interval,导致事务超时。解决方式:需要调整该参数值大于Flink checkpoint interval。
flink-connector-jdbc_2.11 Sink到StarRocks时间落后8小时,该如何处理?
问题描述:Flink中localTimestamp函数生成的时间,在Flink中时间正常,Sink到StarRocks后发现时间落后8小时。已确认Flink所在服务器与StarRocks所在服务器时区均为Asia/ShangHai东八区。Flink版本为1.12,驱动为flink-connector-jdbc_2.11。
解决方式:可以在Flink sink表中配置时区参数
'server-time-zone' = 'Asia/Shanghai'
,同时在url
参数里添加&serverTimezone=Asia/Shanghai
。示例如下。CREATE TABLE sk ( sid int, local_dtm TIMESTAMP, curr_dtm TIMESTAMP ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.**.**:9030/sys_device?characterEncoding=utf-8&serverTimezone=Asia/Shanghai', 'table-name' = 'sink', 'driver' = 'com.mysql.jdbc.Driver', 'username' = 'sr', 'password' = 'sr123', 'server-time-zone' = 'Asia/Shanghai' );
为什么没有查询时BE内存处于打满状态,且CPU也是打满状态?
因为BE会定期收集统计信息,不会长期占用CPU,10 GB以内的内存使用完不会释放,BE会自己管理,您可以通过tc_use_memory_min参数限制大小。
tc_use_memory_min,TCmalloc最小保留内存。默认值10737418240,只有超过该值,StarRocks才将空闲内存返还给操作系统。您可以在EMR控制台StarRocks服务配置页签的be.conf中配置。BE配置项的更多信息,请参见参数配置。
为什么BE申请的内存不会释放给操作系统?
因为数据库从操作系统获得的大块的内存分配,在分配的时候会多预留,释放时候会延后,为了重复利用,大块内存的分配的代价比较大。建议测试环境验证时,对内存使用进行监控,在较长的周期内观察内存是否能够完成释放。
为什么无法解析Flink Connector依赖?
原因分析:Flink Connector依赖需要通过阿里云镜像地址来获取。/etc/maven/settings.xml的mirror部分未配置全部通过阿里云镜像获取。
解决方式:修改阿里云公共仓库地址为
https://maven.aliyun.com/repository/public
。
Flink-connector-StarRocks中sink.buffer-flush.interval-ms参数是否生效?
问题描述:
sink.buffer-flush.interval-ms
参数设置为15s,但是checkpoint interval=5mins
,设置的sink.buffer-flush.interval-ms
参数还生效吗?+----------------------+--------------------------------------------------------------+ | Option | Required | Default | Type | Description | +-------------------------------------------------------------------------------------+ | sink.buffer-flush. | NO | 300000 | String | the flushing time interval, | | interval-ms | | | | range: [1000ms, 3600000ms] | +----------------------+--------------------------------------------------------------+
解决方式:以下三个参数哪个先达到阈值,即触发flush,和
checkpoint interval
设置的值没关系,checkpoint interval
对于Exactly once语义才有效,At Least Once语义用的是sink.buffer-flush.interval-ms
。sink.buffer-flush.max-rows sink.buffer-flush.max-bytes sink.buffer-flush.interval-ms
使用DataX导入支持更新数据吗?
当前版本中,主键模型已支持通过DataX更新数据。您需要在JSON配置文件的reader部分添加_op
字段配置该功能。
使用DataX同步数据时,如何处理命名中使用的关键字,防止报错?
您需要使用反引号(``)包围相应的字段。
报错“When running with master 'yarn' either HADOOP-CONF-DIR or YARN-CONF-DIR must be set in the environment”,该如何解决?
使用Spark Load时没有在Spark客户端的spark-env.sh中配置HADOOP-CONF-DIR
环境变量。
提交Spark job时用到spark-submit命令,报错“Cannot run program "xxx/bin/spark-submit": error=2, No such file or directory”,该如何解决?
使用Spark Load时spark_home_default_dir
配置项没有指定或者指定了错误的Spark客户端根目录。
报错“File xxx/jars/spark-2x.zip does not exist”,该如何解决?
使用Spark Load时spark-resource-path
配置项没有指向打包好的ZIP文件,检查指向文件路径和文件名词是否一致。
报错“yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn”,该如何解决?
使用Spark Load时yarn-client-path
配置项没有指定YARN的可执行文件。
磁盘使用率超80%会进入安全模式,从而出现数据写入报错吗?
问题描述:当集群中BE节点的磁盘使用率超过80%时,系统会自动进入安全模式,从而阻止新的数据写入操作。此情况常见于进行大量删除或更新操作后,由于trash目录占用的空间过大而引发。
原因分析:
执行了大量的
DELETE
、DROP
或INSERT OVERWRITE
操作,导致产生了垃圾数据。而INSERT INTO
操作不会直接导致trash目录膨胀。BE节点上的某个或某些磁盘使用率超过了预设的最大值(默认为80%)。
解决方式:
在实例配置页面,搜索并修改以下两个参数。
说明如果没有找到以下参数,请联系售后支持以获取帮助,他们可以帮助您开启相应的设置权限。
safe_mode_check_disk_ratio
:调整安全模式检查阈值。将其从默认的0.8调整至0.9,即允许更高的磁盘使用率而不触发安全模式。trash_file_expire_time_sec
:调整垃圾文件清理周期。将其从默认值调整为21600秒(等于6小时)。这样可以更频繁地清理不再需要的数据碎片。
若上述调整后情况仍未改善,考虑继续提高
safe_mode_check_disk_ratio
的值或是扩大存储容量。评估是否可以通过优化查询语句减少不必要的删除和重写动作,从而从根本上解决问题。
为什么通过Spark Connector读取数据时出现被取消报错?
问题描述:在使用Spark Connector读取StarRocks数据时,出现了取消操作的错误。该错误通常是由于查询时间过长,导致过期版本的数据被删除。
原因分析:当查询的执行时间超过了系统设定的过期版本数据保留时间,涉及到的版本数据可能会被自动清理。因此,您在查询过程中可能会因为缺少必要的数据而导致任务被取消。
解决方式:建议在实例配置页面,调整参数
lake_autovacuum_grace_period_minutes
。该参数的默认值为30分钟,可以根据实际查询需求适当增大。例如,您可以将其设置为60分钟或更长,以保证在长时间查询时所需的数据不会被意外删除。
Flink作业写入StarRocks时报错 "Caused by: Could not resolve host for client socket"
请尝试在Flink作业中配置 sink.version=V1
。如果未设置此参数,系统将默认使用V2版本,而V2当前存在一些不稳定性和适配性问题。使用V1版本将能够提供更为稳定的性能。有关更多信息,请参见开源StarRocks相关文档:Apache Flink。
在进行数据导入时,若主键索引占用的内存超过限制,应如何处理?
您可以考虑以下优化方案:
创建分区表:为主键模型的表设置分区,这样在写入数据时,可以避免全表的主键索引同时加载到内存中,从而减小内存的压力。
开启持久化主键索引:这将使主键索引在磁盘上持久化,减少内存消耗。执行以下SQL命令来开启持久化主键索引。
ALTER TABLE xxx SET ("enable_persistent_index"="true");
使用JindoSdk读取OSS外表数据时报错,该如何解决?
问题描述:在使用JindoSdk从OSS读取StarRocks外表数据时遇到报错。
原因分析:这通常是因为JindoSdk的
pread
实现与预期不符,导致读取数据时出现问题。解决方式:针对该问题,可以通过在实例配置页面,调整配置文件
jindosdk.cfg
中的配置来优化内存缓冲区,减少错误的发生几率。fs.oss.memory.buffer.size.watermark
:调大内存缓冲区的水位线。建议将该值调整为0.6
(默认值为0.3
)。fs.oss.memory.buffer.size.max.mb
:增大最大内存缓冲区。建议将该值设为6144
(即6 GB,默认值为6144 MB)。
为什么在导入数据时磁盘空间使用率会异常上升?
问题描述:在使用StarRocks进行数据导入的过程中,发现磁盘空间使用率(尤其是标记为"other"的分区)出现异常增长。这种情况可能会影响系统的正常运行和性能。
可能原因:这种现象通常是由于导入过程中产生的临时文件或废弃文件未被及时清理所导致的,其中最常见的原因是回收站(trash)机制中的文件累积过多。
解决方式:建议调整
trash_file_expire_time_sec
参数来加速对回收站中不再需要文件的清理速度。此参数定义了文件在被彻底删除前将在回收站中保留的时间长度(以秒为单位)。减小该值可以更快地释放磁盘空间。
使用Spark Connector导入任务时事务处理慢,该如何解决?
问题描述:在使用阿里云EMR全托管StarRocks集群配合Spark Connector执行数据导入任务时,如果遇到事务处理速度缓慢的情况,这通常是因为待处理的事务堆积到了系统默认设置的上限。这种情况会导致后续的数据导入操作效率大幅下降。
解决方式:可以通过将
lake_enable_batch_publish_version
的参数值设置为true来优化性能。
导入数据时报错“transmit chunk rpc failed”,该如何解决?
问题描述:导入数据时遇到如下错误信息。
transmit chunk rpc failed [dest_instance_id=5a2489c6-f0d8-11ee-abf6-061aec******] [dest=172.17.**.**:8060] detail: brpc failed, error=Host is down, error_text=[E112]Not connected to 172.17.**.**:8060 yet, server_id=904 [R1][E112]Not connected to 172.17.**.**:8060 yet, server_id=904 [R2][E112]Not connected to 172.17.**.**:8060 yet, server_id=904 [R3][E112]Not connected to 172.17.**.**:8060 yet, server_id=904
可能原因:此错误通常表明RPC通信过程中出现了拥塞现象。
解决方式:您可以根据以下两种方法来解决。
调整未发送数据阈值 当前情况可能处于OVER CROWDED状态,表示RPC源端有大量未发送的数据,超出了默认阈值。默认的
brpc_socket_max_unwritten_bytes
为 1GB,若未发送数据超过此值,则可能发生该错误。建议适当提高该值,以避免OVER CROWDED
错误。增加RPC消息包大小 另一种情况是RPC的包大小超出了
brpc_max_body_size
的限制,默认值为2 GB。如果查询中含有超大字符串或位图类型数据,可能会导致此问题。建议通过增大BE参数brpc_max_body_size
来解决此问题。
执行Stream Load时报错“Cancelled because of runtime state is cancelled”,该如何解决?
问题描述:在不设置
sink.buffer-flush.interval-ms
参数的情况下执行Stream Load时时,报错“Cancelled because of runtime state is cancelled”。可能原因:该问题通常出现在使用StarRocks进行数据加载的过程中。
sink.buffer-flush.interval-ms
参数定义了数据缓冲区刷新的时间间隔,即多久将累积的数据一次性写入目标存储中。如果不显式地设置此参数,系统将采用其默认值。然而,默认值可能相对较大,这会导致数据在内存中积累时间过长而没有被及时处理或刷新到目的地,进而可能导致运行时状态取消(如由于超时或其他资源限制),从而引发上述错误信息。解决方式:建议您根据实际应用场景的需求,在代码中明确指定
sink.buffer-flush.interval-ms
的值来覆盖默认设置。这样可以根据具体情况调整缓冲区刷新频率,以确保更高效、稳定的数据传输过程。