CREATE PIPE
语法用于在Lindorm消息引擎中创建Mirror Link链路,方便快速构建数据入库链路,将开源Kafka中的数据导入至消息引擎的流表中。
引擎与版本
CREATE PIPE
语法仅适用于消息引擎。消息引擎3.8.0及以上版本支持
CREATE PIPE
语法。
要求Lindorm SQL为2.7.0以上版本。如何查看Lindorm SQL的版本,请参见SQL版本说明。
语法
create_pipe_statement ::= CREATE PIPE pipe_identifier AS
COPY [ INTO stream_identifier ]
FROM source_topic_identifier@'server_literal'
[ WITH pipe_options ];
pipe_options ::= '(' option_definition (',' option_definition )* ')'
option_definition ::= option_identifer '=' string_literal
使用说明
Mirror Link链路名(pipe_identifier)
链路名不能以半角句号(.)或中划线(-)开头。
流表名(stream_identifier)
关于流表名的设置,您需要注意以下内容:
可包含数字、大写英文字符、小写英文字符、半角句号(.)、中划线(-)和下划线(_)。
表名不能以半角句号(.)或中划线(-)开头。
表名的长度为1~255字符。
说明如果在创建Mirror Link链路时没有显式指定流表名,则会自动在Lindorm消息引擎上创建一个与源主题名同名的流表。如果显式指定了流表名,但该流表不存在,则系统会在Mirror Link链路创建完成后,自动创建一个该名称的流表。
源主题名(source_topic_identifier)
源Kafka集群中要进行数据同步的主题名。
由于Kafka中的主题名可能会包含在Lindorm SQL标识符层面不被接受的字符。因此建议在引用源主题名时通过反引号(`)进行引用。关于Lindorm SQL的标识符和反引号的用法,请参见标识符。
源集群Broker地址
您需要以字符串常量的形式指定源Kafka集群的Broker地址,格式为host:port,...
。
链路属性(pipe_options)
您可以通过WITH
关键字在创建Mirror Link链路时指定以下属性:
选项 | 类型 | 描述 |
sink.partitions | INT | 同步后的分区数量,默认与源Topic的分区数量相同。 |
checkpoint.interval | LONG | Checkpoint间隔时间,单位为毫秒(ms)。默认值为 |
parallelism | INT | 设置Source和Sink的默认并行度,默认值为 |
scan.startup.mode | STRING | 指定读取Kafka数据的起始位置。取值如下:
|
scan.startup.timestamp-millis | LONG | scan.startup.mode为 |
sink.delivery-guarantee | STRING | Sink目标表的语义模式。取值如下:
|
sink.parallelism | INT | Sink目标表的并发度。会覆盖parallelism参数的值。 |
链路属性的属性名通常都包含半角句号(.),因此在指定这些属性名时需要使用反引号(`)进行引用。关于Lindorm SQL的标识符和反引号的用法,请参见标识符。
示例
示例一:创建名为
pip1
的Mirror Link链路,并指定源表从指定时间戳开始导入数据。CREATE PIPE pip1 AS COPY FROM sourceTopic@'broker-1:90992' WITH ( `scan.startup.mode` = 'timestamp', `scan.startup.timestamp-millis` = '1708910643000' );
示例二:创建名为
pip2
的Mirror Link链路,并指定源表从最早偏移量(earliest-offset)开始以exactly-once
语义导入数据,目标流表的分区数设置为10个。CREATE PIPE pip2 AS COPY FROM sourceTopic@'broker-1:90992' WITH ( `scan.startup.mode` = 'earliest-offset', `sink.partitions` = '10', `sink.delivery-guarantee` = 'exactly-once' );
示例三:创建名为
pip3
的Mirror Link链路 ,并指定源表从最早偏移量(earliest-offset)开始导入数据,导入的目标流表名为sinkTopic
。CREATE PIPE pip3 AS COPY INTO sinkTopic FROM sourceTopic@'broker-1:90992' WITH ( `scan.startup.mode` = 'earliest-offset' );