本文为您介绍如何创建消息队列MQ源表,以及创建过程涉及到的WITH参数和类型映射。

注意
  • 本文仅适用于Blink 1.4.5及以上版本。
  • 如果您需要使用带独立命名空间的MQ,请使用Blink 3.x作业版本。

什么是消息队列MQ

消息队列MQ(Message Queue)是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品。消息列队基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)和监控报警等一套完整的消息云服务。

示例

create table mq_stream(
 x varchar,
 y varchar,
 z varchar
) with (
 type='mq',
 topic='<yourTopicName>',
 endpoint='<yourEndpoint>',
 pullIntervalMs='1000',
 accessId='<yourAccessId>',
 accessKey='<yourAccessSecret>',
 startMessageOffset='1000',
 consumerGroup='<yourConsumerGroup>',
 fieldDelimiter='|'
);
说明 MQ实际上是一个非结构化存储格式的消息中间件,对于数据的Schema不提供强制定义,完全由业务层指定。实时计算Flink版支持CSV和二进制格式的MQ消息。

CSV格式

假设您的1条CSV格式消息记录如下。
1,name,male 
2,name,female
说明 1条MQ消息可以包括0到多条数据记录,记录之间使用\n分隔。
在实时计算Flink版作业中,声明MQ数据源表的DDL如下。
create table mq_stream(
 id varchar,
 name varchar,
 gender varchar
) with (
 type='mq',
 topic='<yourTopicName>',
 endpoint='<ourEndpoint>',
 pullIntervalMs='1000',
 accessId='<yourAccessId>',
 accessKey='<yourAccessSecret>',
 startMessageOffset='1000',
 consumerGroup='<yourConsumerGroup>',
 fieldDelimiter='|'
);

二进制格式

二进制格式测试代码如下。
create table source_table (
  mess varbinary
) with (
  type = 'mq',
  endpoint = '<yourEndpoint>',
  pullIntervalMs='500',
  accessId='<yourAccessId>',
  accessKey='<yourAccessSecret>',
  topic = '<yourTopicName>',
  consumerGroup='<yourConsumerGroup>'
);

create table out_table (
  commodity varchar
)with(
  type='print'
);

INSERT INTO out_table
SELECT 
  cast(mess as varchar)
FROM source_table;
说明 cast(mess as varchar) 需要在Blink 2.0及以上版本使用。如果版本低于2.0,请先完成版本升级,详情请参见管理独享集群Blink版本

WITH参数

参数 注释说明 备注
topic topic名称
endPoint endPoint地址 阿里云消息队列提供内网服务MQ(非公网region)和公网服务MQ(公网region)两种类型,请务必根据您购买的MQ的类型选择对应正确的接入地址(endPoint):
  • 内网服务MQ(阿里云经典网络/VPC)接入地址:
    • 华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华南1(深圳)、中国(香港):onsaddr-internal.aliyun.com:8080
    • 亚太东南1(新加坡):ap-southeastaddr-internal.aliyun.com:8080
    • 中东东部1(迪拜):ons-me-east-1-internal.aliyuncs.com:8080
    • 亚太南部1(孟买):ons-ap-south-1-internal.aliyuncs.com:8080
    • 亚太东南3(吉隆坡):ons-ap-southeast-3-internal.aliyun.com:8080
  • 公网服务MQ接入地址:onsaddr-internet.aliyun.com:80
说明
  • 内网服务无法跨地域访问。例如,您所购买的实时计算Flink版服务的地域为华东1,但是购买的消息队列MQ服务的地域为华东2,则无法访问。
  • 独享集群默认不能访问公网。如果需要访问公网,请配置NAT网关,详情请参见独享集群如何访问公网?
  • 由于阿里云网络安全策略动态变化,实时计算Flink版连接公网服务MQ时可能会出现网络连接问题,推荐您使用内网服务MQ。如果在使用公网服务MQ时出现异常,请您提交工单进行咨询。
accessId AccessKey ID
accessKey AccessKey Secret
consumerGroup 订阅消费Group名称
pullIntervalMs 拉取时间间隔 单位为毫秒。
startTime 消息消费启动的时间点 可选
startMessageOffset 消息开始的偏移量 可选,如果填写,将优先以startMessageoffset的位点开始加载数据。
tag 订阅的标签 可选
lineDelimiter 解析Block时,行分隔符 可选,默认值为\n
fieldDelimiter 字段分隔符 可选,默认值为\u0001。表示在只读模式下以\u0001\u0001在只读模式不可见)作为分隔符,在编辑模式下以^A作为分隔符。
encoding 编码格式 可选,默认值为utf-8
lengthCheck 单行字段条数检查策略
  • NONE(默认值):
    • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
    • 解析出的字段数小于定义字段数时,跳过这行数据。
  • SKIP:解析出的字段数和定义字段数不同时跳过这行数据。
  • EXCEPTION:解析出的字段数和定义字段数不同时提示异常。
  • PAD:按从左到右顺序填充。
    • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
    • 解析出的字段数小于定义字段数时,在行尾用null填充缺少的字段。
columnErrorDebug 是否打开调试开关。 可选,默认值为FALSE。如果设置为TRUE,则打印解析异常的Log。
instanceID 实例ID 根据MQ实例是否有独立命名空间,执行如下操作:
  • 是,必须配置instanceID参数。
  • 否,不能配置instanceID参数。

类型映射

MQ字段类型 实时计算Flink版字段类型
STRING VARCHAR