DataWorks数据集成支持使用MetaQ Reader读取消息队列Message Queue(简称MQ)的数据,本文为您介绍DataWorks的MetaQ数据读取能力。
支持的版本
MetaQ Reader通过消息队列服务的Java SDK订阅MetaQ中的实时消息数据,使用的Java SDK版本如下所示。
<dependency>
<groupId>com.taobao.metaq.final</groupId>
<artifactId>metaq-client</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-sdk</artifactId>
<version>1.3.1</version>
</dependency>
使用限制
- 当前仅支持使用脚本模式,通过MetaQ Reader插件读取MQ数据。
- MetaQ Reader仅支持使用独享数据集成资源组。
支持的字段类型
支持的字段类型如下。
MetaQ Reader针对MetaQ类型的转换列表,如下所示。
字段类型 | 离线读(MetaQ Reader) |
---|---|
STRING | 支持 |
数据集成数据类型 | 消息队列数据类型 |
---|---|
STRING | STRING |
数据同步任务开发
- 操作流程请参见通过脚本模式配置离线同步任务。
- 脚本模式配置的全量参数和脚本Demo请参见下文的附录:MetaQ 脚本Demo与参数说明。
附录:MetaQ 脚本Demo与参数说明
附录:离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
MetaQ Reader脚本Demo
{
"job": {
"content": [
{
"reader": {
"name": "metaqreader",
"parameter": {
"accessId": "<yourAccessKeyId>",
"accessKey": "<yourAccessKeySecret>",
"consumerId": "Test01",
"topicName": "test",
"subExpression": "*",
"onsChannel": "ALIYUN",
"domainName": "***.aliyun.com",
"contentType": "singlestringcolumn",
"beginOffset": "lastRead",
"nullCurrentOffset": "begin",
"fieldDelimiter": ",",
"column": [
"col0"
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
MetaQ Reader脚本参数
参数 | 描述 | 是否必选 |
---|---|---|
accessId | 访问消息队列的访问密钥,用于标识用户。 | 是 |
accessKey | 访问消息队列的访问密钥,用来验证用户的密钥。 | 是 |
consumerId | Consumer是消息的消费者,也称为消息订阅者,负责接收并消费消息。 consumerId是一类Consumer的标识,该类Consumer通常接收并消费一类消息,且消费逻辑一致。 | 是 |
topicName | 消息主题,一级消息类型,通过topic对消息进行分类。 | 是 |
subExpression | 消息子主题。 | 是 |
onsChannel | 用于进行消息队列鉴权。 | 是 |
unitName | 接收消息的目标单元。常用单元如下:
| 否 |
instanceName | Consumer的实例名称。 | 否 |
domainName | 消息队列的接入点。 | 是 |
contentType | 消息的类型,支持singlestringcolumn(消息为STRING类型)、text(消息为文本类型)和json(消息为JSON类型)。 | 是 |
beginOffset | 任务开始读取的Offset,支持begin(从最开始),lastRead(上次读取的Offset)。 | 否 |
nullCurrentOffset | 上次Offset为空时,开始读取的地方。支持begin(从最开始),current(当前offset)。 | 是 |
fieldDelimiter | 分隔符模式下消息字符串的列分隔符,例如逗号等。支持控制字符,例如\u0001。 | 是 |
column | 读取的字段列表。 | 是 |
beginDateTime | 数据消费的开始时间位点,为时间范围(左闭右开)的左边界。 beginDateTime是yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。 | 否 说明 beginDateTime和endDateTime配合使用。 |
endDateTime | 数据消费的结束时间位点,为时间范围(左闭右开)的右边界。 endDateTime是yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。 |