本文为您介绍如何使用Print连接器。
背景信息
Print是用于调试的连接器,允许将接收到的数据输出到标准输出流或标准错误流中。如果您想观察Flink作业的中间结果,或者观察最终输出结果,可以添加Print结果表,在TaskManager的日志中观察打印出的结果信息。
Print可用于辅助确认输出到其他结果表中的消息是否符合预期。
Print连接器支持的信息如下。
类别  | 详情  | 
支持类型  | 结果表,数据摄入目标端  | 
运行模式  | 批模式和流模式  | 
数据格式  | 暂不适用  | 
特有监控指标  | 暂无  | 
API种类  | SQL,数据摄入YAML作业  | 
是否支持更新或删除结果表数据  | 是  | 
前提条件
因为Print结果表数据输出为Info日志,所以如果您需要查看Print结果表的结果数据,则需要将日志级别调至Info,否则无法查到结果数据。
Taskmanager.out日志展示数据限制为2000条。如果您有排查脏数据或特定数据等需求,建议在Where条件中指定业务场景相关条件后,进行Print操作,以避免因为数据条数限制导致无法排查。
SQL
语法结构
CREATE TABLE print_table (
  a INT,
  b varchar
) WITH (
  'connector'='print',
  'logger'='true'
);您也可以基于现有的表模式使用LIKE子句来创建,代码示例如下。
CREATE TABLE print_table WITH ('connector' = 'print')
LIKE table_source (EXCLUDING ALL)WITH参数
参数  | 说明  | 数据类型  | 是否必填  | 默认值  | 备注  | 
connector  | 表类型。  | String  | 是  | 无  | 固定值为print。  | 
logger  | 控制台是否显示数据结果。  | Boolean  | 否  | false  | 取值如下: 
  | 
print-identifier  | 数据结果标识。  | String  | 否  | 无  | 在日志中通过数据结果标识检索信息。  | 
sink.parallelism  | 结果表并行度。  | Int  | 否  | 上游并行度  | 无。  | 
数据摄入
数据摄入YAML作业可以使用values连接器打印数据到out文件或日志中。
语法结构
source:
  type: xxx
sink:
  type: values
  name: Values Sink
  print.enabled: trueWITH参数
参数  | 说明  | 数据类型  | 是否必填  | 默认值  | 备注  | 
type  | 目标端类型。  | STRING  | 是  | 无  | 固定值为values。  | 
name  | 目标端名称。  | STRING  | 否  | 无  | 无。  | 
print.enabled  | 是否作为print使用。  | BOOLEAN  | 是  | 无  | 固定值为true。  | 
materialized.in.memory  | 是否将Binlog事件持久化到内存。  | BOOLEAN  | 否  | false  | 无。  | 
sink.print.standard-error  | 是否替换为输出到标准错误流(System.error)  | BOOLEAN  | 否  | false  | 默认输出到标准输出流(System.out)。  | 
sink.print.logger  | 控制台是否显示数据结果。  | BOOLEAN  | 否  | false  | 无。  | 
sink.print.limit  | 最多打印的条数。  | LONG  | 否  | 2000  | 无。  | 
error.on.schema.change  | Schema变更发生时是否报错。  | BOOLEAN  | 否  | false  | 无。  | 
使用示例
结果表
CREATE TEMPORARY TABLE table_source( name VARCHAR, score BIGINT ) WITH ( ... ); CREATE TEMPORARY TABLE print_sink( name VARCHAR, score BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * from table_source;数据摄入目标端
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true