CREATE PIPE

CREATE PIPE语法用于在Lindorm消息引擎中创建Mirror Link链路,方便快速构建数据入库链路,将开源Kafka中的数据导入至消息引擎的流表中。

引擎与版本

  • CREATE PIPE语法仅适用于消息引擎。

  • 消息引擎3.8.0及以上版本支持CREATE PIPE语法。

重要

要求Lindorm SQL为2.7.0以上版本。如何查看Lindorm SQL的版本,请参见SQL版本说明

语法

create_pipe_statement    ::=  CREATE PIPE pipe_identifier AS 
                              COPY [ INTO stream_identifier ] 
                              FROM source_topic_identifier@'server_literal'
                              [ WITH pipe_options ];
pipe_options             ::=  '(' option_definition (',' option_definition )*  ')'
option_definition        ::=  option_identifer '=' string_literal

使用说明

Mirror Link链路名(pipe_identifier)

链路名不能以半角句号(.)或中划线(-)开头。

流表名(stream_identifier)

关于流表名的设置,您需要注意以下内容:

  • 可包含数字、大写英文字符、小写英文字符、半角句号(.)、中划线(-)和下划线(_)。

  • 表名不能以半角句号(.)或中划线(-)开头。

  • 表名的长度为1~255字符。

    说明

    如果在创建Mirror Link链路时没有显式指定流表名,则会自动在Lindorm消息引擎上创建一个与源主题名同名的流表。如果显式指定了流表名,但该流表不存在,则系统会在Mirror Link链路创建完成后,自动创建一个该名称的流表。

源主题名(source_topic_identifier)

源Kafka集群中要进行数据同步的主题名。

说明

由于Kafka中的主题名可能会包含在Lindorm SQL标识符层面不被接受的字符。因此建议在引用源主题名时通过反引号(`)进行引用。关于Lindorm SQL的标识符和反引号的用法,请参见标识符

源集群Broker地址

您需要以字符串常量的形式指定源Kafka集群的Broker地址,格式为host:port,...

链路属性(pipe_options)

您可以通过WITH关键字在创建Mirror Link链路时指定以下属性:

选项

类型

描述

sink.partitions

INT

同步后的分区数量,默认与源Topic的分区数量相同。

checkpoint.interval

LONG

Checkpoint间隔时间,单位为毫秒(ms)。默认值为60000

parallelism

INT

设置Source和Sink的默认并行度,默认值为1

scan.startup.mode

STRING

指定读取Kafka数据的起始位置。取值如下:

  • earliest-offset:从Kafka最早偏移量(offset)开始读取消息。

  • latest-offset(默认值):从Kafka最新偏移量(offset)开始读取消息。

  • timestamp:从scan.startup.timestamp-millis指定的时间戳开始读取消息。

scan.startup.timestamp-millis

LONG

scan.startup.modetimestamp时,从指定的时间戳开始消费。

sink.delivery-guarantee

STRING

Sink目标表的语义模式。取值如下:

  • none:不保证任何语义,数据可能会丢失或重复。

  • at-least-once(默认值):保证数据不丢失,但可能会重复。

  • exactly-once:使用Kafka事务保证数据不会丢失和重复。

sink.parallelism

INT

Sink目标表的并发度。会覆盖parallelism参数的值。

说明

链路属性的属性名通常都包含半角句号(.),因此在指定这些属性名时需要使用反引号(`)进行引用。关于Lindorm SQL的标识符和反引号的用法,请参见标识符

示例

  • 示例一:创建名为pip1的Mirror Link链路,并指定源表从指定时间戳开始导入数据。

    CREATE PIPE pip1 AS
      COPY FROM sourceTopic@'broker-1:90992'
    WITH (
      `scan.startup.mode` = 'timestamp',
      `scan.startup.timestamp-millis` = '1708910643000'
    );
  • 示例二:创建名为pip2的Mirror Link链路,并指定源表从最早偏移量(earliest-offset)开始以exactly-once语义导入数据,目标流表的分区数设置为10个。

    CREATE PIPE pip2 AS
      COPY FROM sourceTopic@'broker-1:90992'
    WITH (
      `scan.startup.mode` = 'earliest-offset',
      `sink.partitions` = '10',
      `sink.delivery-guarantee` = 'exactly-once'
    );
  • 示例三:创建名为pip3的Mirror Link链路 ,并指定源表从最早偏移量(earliest-offset)开始导入数据,导入的目标流表名为sinkTopic

    CREATE PIPE pip3 AS
      COPY INTO sinkTopic FROM sourceTopic@'broker-1:90992'
    WITH (
      `scan.startup.mode` = 'earliest-offset'
    );