本文为您介绍MetaQ Reader支持的数据类型、字段映射和数据源等参数及配置示例。

消息队列(Message Queue,简称MQ)是阿里巴巴集团自主研发的专业消息中间件。消息队列基于高可用分布式集群技术,为您提供消息发布订阅、消息轨迹查询、定时(延时)消息、资源统计和监控报警等消息云服务。消息队列为分布式应用系统提供异步解耦的功能,同时具备海量消息堆积、高吞吐等互联网应用所需要的特性,是阿里巴巴集团双11使用的核心产品。

MetaQ Reader使用消息队列的Java SDK消费消息队列中的实时数据,将数据转换为数据集成传输协议传递给Writer。

实现原理

MetaQ Reader通过消息队列服务的Java SDK订阅MetaQ中的实时消息数据,使用的Java SDK版本如下所示。
<dependency>
            <groupId>com.taobao.metaq.final</groupId>
            <artifactId>metaq-client</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-sdk</artifactId>
            <version>1.3.1</version>
        </dependency>

类型转换列表

MetaQ Reader针对MetaQ类型的转换列表,如下所示。
数据集成数据类型 消息队列数据类型
STRING STRING

参数说明

参数 描述 是否必选
accessId 访问消息队列的访问密钥,用于标识用户。
accessKey 访问消息队列的访问密钥,用来验证用户的密钥。
consumerId Consumer是消息的消费者,也称为消息订阅者,负责接收并消费消息。

consumerId是一类Consumer的标识,该类Consumer通常接收并消费一类消息,且消费逻辑一致。

topicName 消息主题,一级消息类型,通过topic对消息进行分类。
subExpression 消息子主题。
onsChannel 用于进行消息队列鉴权。
domainNam 消息队列的接入点。
contentType 消息的类型,支持singlestringcolumn(消息为STRING类型)、text(消息为文本类型)和json(消息为JSON类型)。
beginOffset 任务开始读取的Offset,支持begin(从一开始)和lastRead(上次读取的offset)
nullCurrentOffset 上次Offset为空时,开始读取的位置,支持begin(从一开始)和current(当前Offset)。
fieldDelimiter 分隔符模式下消息字符串的列分隔符,例如逗号等。支持控制字符,例如\u0001
column 读取的字段列表。

功能说明

配置一个从消息队列读取数据的示例,详情请参见上述参数说明。使用脚本开发的详情请参见通过脚本模式配置任务
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "metaqreader",
                    "parameter": {
                        "accessId": "xxxxxxxxxxx",
                        "accessKey": "xxxxxxxxxxxxxxxx",
                        "consumerId": "Test01",
                        "topicName": "test",
                        "subExpression": "*",
                        "onsChannel": "ALIYUN",
                        "domainName": "xxx.aliyun.com",
                        "contentType": "singlestringcolumn",
                        "beginOffset": "lastRead",
                        "nullCurrentOffset": "begin",
                        "fieldDelimiter": ",",
                        "column": [
                            "col0"
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                    }
                }
            }
        ]
    }
}