MetaQ数据源

DataWorks数据集成支持使用MetaQ Reader读取消息队列Message Queue(简称MQ)的数据,本文为您介绍DataWorks的MetaQ数据读取能力。

支持的版本

MetaQ Reader通过消息队列服务的Java SDK订阅MetaQ中的实时消息数据,使用的Java SDK版本如下所示。

<dependency>
            <groupId>com.taobao.metaq.final</groupId>
            <artifactId>metaq-client</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-sdk</artifactId>
            <version>1.3.1</version>
        </dependency>

使用限制

支持的字段类型

支持的字段类型如下。

字段类型

离线读(MetaQ Reader)

STRING

支持

MetaQ Reader针对MetaQ类型的转换列表,如下所示。

数据集成数据类型

消息队列数据类型

STRING

STRING

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

附录:MetaQ 脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "metaqreader",
                    "parameter": {
                        "accessId": "<yourAccessKeyId>",
                        "accessKey": "<yourAccessKeySecret>",
                        "consumerId": "Test01",
                        "topicName": "test",
                        "subExpression": "*",
                        "onsChannel": "ALIYUN",
                        "domainName": "***.aliyun.com",
                        "contentType": "singlestringcolumn",
                        "beginOffset": "lastRead",
                        "nullCurrentOffset": "begin",
                        "fieldDelimiter": ",",
                        "column": [
                            "col0"
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                    }
                }
            }
        ]
    }
}

Reader脚本参数

参数

描述

是否必选

accessId

访问消息队列的访问密钥,用于标识用户。

accessKey

访问消息队列的访问密钥,用来验证用户的密钥。

consumerId

Consumer是消息的消费者,也称为消息订阅者,负责接收并消费消息。

consumerId是一类Consumer的标识,该类Consumer通常接收并消费一类消息,且消费逻辑一致。

topicName

消息主题,一级消息类型,通过topic对消息进行分类。

subExpression

消息子主题。

onsChannel

用于进行消息队列鉴权。

unitName

接收消息的目标单元。常用单元如下:

  • sh:中心

  • unsz:深圳单元

  • us:美国

  • en-us:欧洲

  • rg-ru:俄罗斯

  • zbyk:张北优酷

  • unzbyun:张北云

  • unshyun:上海云

  • lazada-sg:新加坡lazada

  • lazada-my:马来西亚lazada

  • lazada-vn:越南lazada

  • lazada-ph:菲律宾lazada

  • lazada-th:泰国lazada

  • lazada-id:印尼lazada

instanceName

Consumer的实例名称。

domainName

消息队列的接入点。

contentType

消息的类型,支持singlestringcolumn(消息为STRING类型)、text(消息为文本类型)和json(消息为JSON类型)。

beginOffset

任务开始读取的Offset,支持begin(从最开始),lastRead(上次读取的Offset)。

nullCurrentOffset

上次Offset为空时,开始读取的地方。支持begin(从最开始),current(当前offset)。

fieldDelimiter

分隔符模式下消息字符串的列分隔符,例如逗号等。支持控制字符,例如\u0001

column

读取的字段列表。

beginDateTime

数据消费的开始时间位点,为时间范围(左闭右开)的左边界。

beginDateTimeyyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。

说明

beginDateTimeendDateTime配合使用。

endDateTime

数据消费的结束时间位点,为时间范围(左闭右开)的右边界。

endDateTimeyyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。