在将Kafka数据导入到表格存储的过程中可能产生错误,如果您不希望导致Sink Task立即失败,您可以配置错误处理策略。
可能产生的错误类型如下:
- Kafka Connect Error
此类错误发生在Sink Task执行数据导入前,例如使用Converter进行反序列化或者使用Kafka Transformations对消息记录进行轻量级修改时产生错误,您可以配置由Kafka提供的错误处理选项。
如果您想要跳过此类错误,请在connector配置文件中配置属性
errors.tolerance=all
。更多信息,请参见Kafka Connect Configs。 - Tablestore Sink Task Error
此类错误发生在Sink Task执行数据导入时,例如解析消息记录或者写入Tablestore时产生错误,您可以配置由Tablestore Sink Connector提供的错误处理选项。
如果您想要跳过此类错误,请在connector配置文件中配置属性errors.tolerance=all
。更多信息,请参见配置说明。同时,您还可以选择错误报告的方式,如果需要将产生错误的消息记录保存到Tablestore中独立的一张数据表中,请进行如下配置:runtime.error.tolerance=all runtime.error.mode=tablestore runtime.error.table.name=error
在distributed模式下,您也可以通过REST API来管理connector和task。如果发生错误导致connector或者task停止,您可以选择手动重启connector或者task。- 检查connector和task状态。
- 查看connector状态。
curl http://localhost:8083/connectors/{name}/status
- 查看task状态。
curl http://localhost:8083/connectors/{name}/tasks/{taskid}/status
其中
http://localhost:8083/connectors
为Kafka REST服务的地址,name必须与配置文件中的name(连接器名称)相同,taskid包含在connector状态信息中。您还可以通过执行以下命令获取taskid。curl http://localhost:8083/connectors/{name}/tasks
- 查看connector状态。
- 手动重启connector或者task。
- 重启connector。
curl -X POST http://localhost:8083/connectors/{name}/restart
- 重启task。
curl -X POST http://localhost:8083/connectors/{name}/tasks/{taskId}/restart
- 重启connector。
- 检查connector和task状态。