全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
流计算

创建消息队列结果表

更新时间:2017-12-29 15:20:00

消息队列(Message Queue,简称MQ)是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品,基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务。MQ历史超过7年,帮您实现分布式计算场景中所有异步解耦功能,是阿里双11使用的核心产品。MQ由阿里巴巴集团中间件技术部自主研发,是原汁原味的阿里集团中间件技术精华之沉淀,是性价比最高、最可靠的企业级消息中间件产品。

流计算可以将消息队列作为流式数据输入和流式数据输出,如下:

DDL定义

示例代码如下:

  1. create result table overwrite result_max3 (
  2. a STRING,
  3. b STRING,
  4. c STRING,
  5. d STRING
  6. ) with (
  7. type='mq',
  8. endpoint='onsaddr-internet.aliyun.com',
  9. topic='galaxyxxx',
  10. tag='write3',
  11. producerId='PID_galaxyxxx',
  12. accessId='xxx',
  13. accessKey='xxx',
  14. fieldDelimiter=",",
  15. domainSubGroup="nsaddr4client-internet"
  16. );

注意:

  • MQ实际上是一个非结构化存储格式,其中对于数据的Schema不提供强制定义,完全由业务层指定。目前流计算支持类CSV格式文本和二进制格式。需要注意的是,二进制格式的消息格式由于当前流计算暂不支持UDF自定义函数,因此公有云用户无法使用二进制的格式。

  • 流计算对接MQ暂时未能支持RAM/STS方式,目前我们正在和MQ团队协商为流计算尽快提供支持。因此流计算暂时不提供界面方式注册MQ,用户必须使用手写SQL方式完成MQ的DDL定义。

CSV类格式定义

以上述DDL为例:

  1. create result table result_max3 (
  2. a STRING,
  3. b STRING,
  4. c STRING,
  5. d STRING
  6. ) with (
  7. type='mq',
  8. endpoint='onsaddr-internet.aliyun.com',
  9. topic='galaxyxxx',
  10. producerId='PID_galaxyxxx',
  11. accessId='xxx',
  12. accessKey='xxx',
  13. fieldDelimiter=","
  14. );

用户写出到MQ的格式为类CSV格式,字段分隔符为”,” 多条流计算记录将写出到多个MQ的消息,如下:

第一条:

  1. a,b,c,1

第二条:

  1. x,y,z,2

Binary定义

当前受限于流计算不开放UDF,故Binary类型目前公有云暂不支持用户使用。

对于Binary类型,用户的一个MQ消息为一个流计算源头表中的一条记录,对此流计算DDL声明为:

  1. CREATE RESULT TABLE result_table (
  2. obj BINARY
  3. ) WITH (
  4. type='mq',
  5. endpoint='onsaddr-internet.aliyun.com',
  6. topic='galaxyxxx',
  7. producerId='PID_galaxyxxx',
  8. accessId='xxx',
  9. accessKey='xxx',
  10. fieldDelimiter=","
  11. );

用户在写入result_table前,必须拼接为一个业务需要的二进制格式写出到result_table。例如:

  1. INSERT INTO result_table
  2. SELECT
  3. UDF(s.id, s.name)
  4. FROM
  5. source_table s;

其中UDF函数是用户自定义函数,将s表中每条记录的id和name,经过个性化转换后生成一个二进制的对象。流计算直接将二进制写出到MQ。

WITH参数

参数 必填参数 说明
type Y mq
endpoint Y 要写入mq的endpoint
topic Y 要写入的topic
producerId Y 写入的producerId
accessId Y accessId
accessKey Y accessKey
onsChannel N 默认ALIYUN,不需要填
sleepInterval N 失败重试时间,默认5s
tag N 写入topic的某个特定tag下,默认不区分tag
key N 写入message的key
withSchama N boolean类型(true/false),写入的是否是结构化数据,默认为true
fieldDelimiter N 写入数据的列分隔符,区分同一行中不同列,默认为\u0001
domainSubGroup N 默认为”nsaddr4client-internal”,公共云为:”nsaddr4client-internet”

注意:

  • domainSubGroup是MQ的ONSAddr参数,请参看MQ文档。请根据用户实际情况选择具体的参数。

类型映射

对于CSV类格式,建议用户均使用STRING类型对接,下游再通过DML进行类型转换。

MQ字段类型 建议流计算字段类型
String String
本文导读目录