全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
流计算

创建消息队列源表 (MQ)

更新时间:2017-12-29 15:19:52

消息队列(Message Queue,简称MQ)是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品,基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务。MQ历史超过7年,帮您实现分布式计算场景中所有异步解耦功能,是阿里双11使用的核心产品。MQ由阿里巴巴集团中间件技术部自主研发,是原汁原味的阿里集团中间件技术精华之沉淀,是性价比最高、最可靠的企业级消息中间件产品。

流计算可以将消息队列作为流式数据输入和流式数据输出,如下:

  1. CREATE STREAM TABLE source_test_galaxy (
  2. name STRING,
  3. age STRING,
  4. id STRING
  5. ) WITH (
  6. type='mq',
  7. endpoint='onsaddr.aliyun.com',
  8. topic='topic1',
  9. consumerId='CID_failover1',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. galaxy.semantic.source.input.field.delimiter=','
  13. );

注意:

  • MQ实际上是一个非结构化存储格式,其中对于数据的Schema不提供强制定义,完全由业务层指定。目前流计算支持类CSV格式文本和二进制格式。需要注意的是,二进制格式的消息格式由于当前流计算暂不支持UDF自定义函数,因此公有云用户无法使用二进制的格式。

  • 流计算对接MQ暂时未能支持RAM/STS方式,目前我们正在和MQ团队协商为流计算尽快提供支持。因此流计算暂时不提供界面方式注册MQ,用户必须使用手写SQL方式完成MQ的DDL定义。

CSV类格式

对于CSV类格式,假定用户的一条MQ消息记录格式如下:

  1. 1,name,male
  2. 2,name,female

注意,一条MQ消息可以包括0条到多条数据记录,记录与记录之间使用’\n’分隔。那么用户在流计算必须定义该MQ的DDL是:

  1. CREATE STREAM TABLE source_table (
  2. id STRING,
  3. name STRING,
  4. sex STRING
  5. ) WITH (
  6. type='mq',
  7. endpoint='onsaddr.aliyun.com',
  8. topic='topic1',
  9. consumerId='CID_failover1',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. galaxy.semantic.source.input.field.delimiter=','
  13. );

Binary格式

当前受限于流计算不开放UDF,故Binary类型目前公有云暂不支持用户使用。

对于Binary类型,用户的一个MQ消息为一个流计算源头表中的一条记录,对此流计算DDL声明为:

  1. CREATE STREAM TABLE source_table (
  2. obj BINARY
  3. ) WITH (
  4. type='mq',
  5. endpoint='onsaddr.aliyun.com',
  6. topic='topic1',
  7. consumerId='CID_failover1',
  8. accessId='xxx',
  9. accessKey='xxx',
  10. galaxy.semantic.source.input.field.delimiter=','
  11. );

用户在后续的SQL中,必须使用UDF进行二进制处理,例如:

  1. INSERT INTO result_table
  2. SELECT
  3. UDF(obj)
  4. FROM
  5. source_table;

WITH参数

参数 必填参数 说明
type Y mq
endpoint Y 公共云mq endpoint
topic Y 读取数据的topic
consumerId Y 使用的consumerId
accessId Y accessId
accessKey Y accessKey
tag N 需要读取的tag信息
onsChannel N 默认为”onsChannel”,不需要填
sleepInterval N 失败重试时间 默认5s
domainSubGroup N 默认为:“nsaddr4client-internal”,公共云为:”nsaddr4client-internet”
  • domainSubGroup是MQ的ONSAddr参数,请参看MQ文档。请根据用户实际情况选择具体的参数。

类型映射

对于CSV类格式,建议用户均使用STRING类型对接,下游再通过DML进行类型转换。

MQ字段类型 建议流计算字段类型
String String
本文导读目录