Print

本文为您介绍如何使用Print连接器。

背景信息

Print是用于调试的连接器,允许接收并打印一定数量的输入记录。如果您想观察Flink作业的中间结果,或者观察最终输出结果,可以添加Print结果表,单击运行,在TaskManager的日志中观察打印出的结果信息。

Print可用于辅助确认输出到其他结果表中的消息是否符合预期。

Print连接器支持的信息如下。

类别

详情

支持类型

结果表,数据摄入目标端

运行模式

批模式和流模式

数据格式

暂不适用

特有监控指标

暂无

API种类

SQL,数据摄入YAML作业

是否支持更新或删除结果表数据

前提条件

  • 因为Print结果表数据输出为Info日志,所以如果您需要查看Print结果表的结果数据,则需要将日志级别调至Info,否则无法查到结果数据。

  • Taskmanager.out日志展示数据限制为2000条。如果您有排查脏数据或特定数据等需求,建议在Where条件中指定业务场景相关条件后,进行Print操作,以避免因为数据条数限制导致无法排查。

SQL

语法结构

CREATE TABLE print_table (
  a INT,
  b varchar
) WITH (
  'connector'='print',
  'logger'='true'
);

您也可以基于现有的表模式使用LIKE子句来创建,代码示例如下。

CREATE TABLE print_table WITH ('connector' = 'print')
LIKE table_source (EXCLUDING ALL)

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

表类型。

String

固定值为print。

logger

控制台是否显示数据结果。

Boolean

false

取值如下:

  • false(默认值):不显示。

  • true:显示。

print-identifier

数据结果标识。

String

在日志中通过数据结果标识检索信息。

sink.parallelism

结果表并行度。

Int

上游并行度

无。

数据摄入

数据摄入YAML作业可以使用values连接器打印数据到out文件或日志中。

语法结构

source:
  type: xxx

sink:
  type: values
  name: Values Sink
  print.enabled: true

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

type

目标端类型。

STRING

固定值为values。

name

目标端名称。

STRING

无。

print.enabled

是否作为print使用。

BOOLEAN

固定值为true。

materialized.in.memory

是否将Binlog事件持久化到内存。

BOOLEAN

false

无。

sink.print.standard-error

是否替换为输出到标准错误流(System.error)

BOOLEAN

false

默认输出到标准输出(System.out)。

sink.print.logger

是否在日志打印。

BOOLEAN

false

无。

sink.print.limit

最多打印的条数。

LONG

2000

无。

error.on.schema.change

Schema变更发生时是否报错。

BOOLEAN

false

无。

使用示例

  • 结果表

    CREATE TEMPORARY TABLE table_source(
      name VARCHAR,
      score BIGINT
    ) WITH (
      ...
    );
    
    CREATE TEMPORARY TABLE print_sink(
      name VARCHAR,
      score BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO print_sink SELECT * from table_source;
  • 数据摄入目标端

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true