全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
流计算

创建消息队列(MQ)

更新时间:2018-03-14 22:48:49

什么是消息队列(Message Queue)

消息队列(Message Queue,简称MQ)是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品,基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务。MQ历史超过7年,帮您实现分布式计算场景中所有异步解耦功能,是阿里双11使用的核心产品。MQ由阿里巴巴集团中间件技术部自主研发,是原汁原味的阿里集团中间件技术精华之沉淀,是性价比最高、最可靠的企业级消息中间件产品。

流计算可以将消息队列作为流式数据输入,如下:

示例

  1. create table metaq_stream(
  2. x varchar,
  3. y varchar,
  4. z varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

注意:MQ实际上是一个非结构化存储格式,其中对于数据的Schema不提供强制定义,完全由业务层指定。目前流计算支持类CSV格式文本和二进制格式。

CSV类格式

对于CSV类格式,假定用户的一条MQ消息记录格式如下:

  1. 1,name,male
  2. 2,name,female

注意,一条MQ消息可以包括0条到多条数据记录,记录与记录之间使用’\n’分隔。那么用户在流计算必须定义该MQ的DDL是:

  1. create table metaq_stream(
  2. id varchar,
  3. name varchar,
  4. sex varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

WITH参数

参数 注释说明 备注
topic topic名
endPoint endPoint地址 消息队列ENDPOINT地址 ,注意在使用MQ的时候如果选择的是公共云内网接入(阿里云经典网络/VPC):华东1、华东2、华北1、华北2、华南1、香港的区域endpoint的地址是:onsaddr-internal.aliyun.com:8080
accessId accessId
accessKey accessKey
consumerGroup 订阅消费group名
pullIntervalMs 拉取时间间隔,毫秒
startTime 可选,消息消费启动的时间点
startMessageOffset 可选,消息开始的偏移量 如果填了优先以startMessageoffset的位点开始加载
tag 订阅的标签 可选
lineDelimiter 解析TT block时行分隔符 可选,默认为 “\n”
fieldDelimiter 字段分隔符 可选,默认为”\u0001” 表示 Crtl+A 和 \001,(暂不支持\001写法)
encoding 编码格式 可选,默认为 “utf-8”
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION,PAD,“SKIP” 字段数目不符合时跳过 ,“EXCEPTION”:字段数目不符合时抛出异常,“PAD”:按顺序填充,不存在的置为null
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false

类型映射

MQ字段类型 建议流计算字段类型
STRING VARCHAR
本文导读目录