全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
流计算

创建消息队列(MQ)

更新时间:2017-12-20 14:47:54

什么是消息队列(Message Queue)

消息队列(Message Queue,简称MQ)是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品,基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务。MQ历史超过7年,帮您实现分布式计算场景中所有异步解耦功能,是阿里双11使用的核心产品。MQ由阿里巴巴集团中间件技术部自主研发,是原汁原味的阿里集团中间件技术精华之沉淀,是性价比最高、最可靠的企业级消息中间件产品。

流计算可以将消息队列作为流式数据输入,如下:

示例

  1. create table metaq_stream(
  2. x varchar,
  3. y varchar,
  4. z varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

注意:MQ实际上是一个非结构化存储格式,其中对于数据的Schema不提供强制定义,完全由业务层指定。目前流计算支持类CSV格式文本和二进制格式。

CSV类格式

对于CSV类格式,假定用户的一条MQ消息记录格式如下:

  1. 1,name,male
  2. 2,name,female

注意,一条MQ消息可以包括0条到多条数据记录,记录与记录之间使用’\n’分隔。那么用户在流计算必须定义该MQ的DDL是:

  1. create table metaq_stream(
  2. id varchar,
  3. name varchar,
  4. sex varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

WITH参数

参数 注释说明 备注
topic topic名
endPoint endPoint地址 消息队列ENDPOINT地址
accessId accessId
accessKey accessKey
consumerGroup 订阅消费group名
pullIntervalMs 拉取时间间隔,毫秒
startTime 可选,消息消费启动的时间点
startMessageOffset 可选,消息开始的偏移量 如果填了优先以startMessageoffset的位点开始加载
tag 订阅的标签 可选
lineDelimiter 解析TT block时行分隔符 可选,默认为 “\n”
fieldDelimiter 字段分隔符 可选,默认为”\u0001” 表示 Crtl+A 和 \001,(暂不支持\001写法)
encoding 编码格式 可选,默认为 “utf-8”
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION,PAD,“SKIP” 字段数目不符合时跳过 ,“EXCEPTION”:字段数目不符合时抛出异常,“PAD”:按顺序填充,不存在的置为null
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false

类型映射

MQ字段类型 建议流计算字段类型
STRING VARCHAR
本文导读目录