DataWorks数据集成支持使用OpenSearch Writer将数据写至OpenSearch,本文为您介绍DataWorks的OpenSearch数据离线写入能力。
支持的版本
V2版本请参见请求结构。
V3版本使用二方包,依赖pom为:com.aliyun.opensearch aliyun-sdk-opensearch 2.1.3。
如果您需要使用OpenSearchWriter插件,请务必使用JDK 1.6-32及以上版本,您可以使用java -version
查看Java版本号。
支持的阿里云OpenSearch商业版本:行业算法版、LLM智能问答版、高性能检索版、向量检索版、召回引擎版。
使用限制
OpenSearch Writer支持使用Serverless资源组(推荐)和独享数据集成资源组,不支持使用自定义资源组。
OpenSearch的列是无序的,因此OpenSearch Writer写入时,需严格按照指定的列的顺序写入。如果指定的列比OpenSearch的列少,则其余列使用默认值或null。
例如,您需要导入的字段列表有b、c两个字段,但OpenSearch表中的字段有a、b、c三列,在列配置中可以写为"column":["c","b"],表示会把Reader的第一列和第二列导入OpenSearch的c字段和b字段,而OpenSearch表中新插入的a字段会被置为默认值或null。
当前仅支持使用脚本模式将离线数据写入OpenSearch。
支持的字段类型
OpenSearch Writer支持大部分OpenSearch类型,请注意检查您的数据类型。OpenSearch Writer针对OpenSearch类型的转换列表,如下所示。
类型分类 | OpenSearch数据类型 |
整数类 | INT |
浮点类 | DOUBLE和FLOAT |
字符串类 | TEXT、LITERAL和SHORT_TEXT |
日期时间类 | INT |
布尔类 | LITERAL |
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
常见问题
列配置错误的处理
为保证写入数据的可靠性,避免多余列数据丢失造成数据质量故障。对于写入多余的列,OpenSearch Writer将报错。例如OpenSearch表字段为a、b、c,如果OpenSearch Writer写入的字段多于3列,OpenSearch Writer将报错。
表配置注意事项
OpenSearch Writer一次只能写入一个表。
任务重跑和Failover
重跑后会自动根据ID覆盖。因此,插入OpenSearch的列中,必须有一个ID,该ID是OpenSearch中一行记录的唯一标识。唯一标识一样的数据,会被覆盖掉。
附录:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
行业算法版、LLM智能问答版、高性能检索版Writer脚本Demo
{
"type": "job",
"version": "1.0",
"configuration": {
"reader": {},
"writer": {
"plugin": "opensearch",
"parameter": {
"accessId": "*********",
"accessKey": "********",
"host": "http://yyyy.aliyuncs.com",
"indexName": "datax_xxx",
"table": "datax_yyy",
"column": [
"appkey",
"id",
"title",
"gmt_create",
"pic_default"
],
"batchSize": 500,
"writeMode": add,
"version":"v2",
"ignoreWriteError": false
}
}
}
}
行业算法版、LLM智能问答版、高性能检索版Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
accessId | 访问密钥中的AccessKey ID。 | 是 | 无 |
accessKey | 访问密钥中的AccessKey Secret,相当于登录密码。 | 是 | 无 |
host | OpenSearch连接的服务地址,您可以在应用详情页面进行查看。通常生产的服务地址为:http://opensearch-cn-internal.aliyuncs.com/ ,测试的服务地址为:http://opensearch-cn-corp.aliyuncs.com/ 。 | 是 | 无 |
indexName | OpenSearch项目的名称。 | 是 | 无 |
table | 写入数据的表名,不能填写多张表,因为DataX不支持同时导入多张表。 | 是 | 无 |
column | 需要导入的字段列表。当导入全部字段时,可以配置为"column":["*"] 。当需要插入部分OpenSearch列时,填写需要插入的列,例如:"column":["id","name"] 。 OpenSearch支持列筛选、列换序,例如:表有a、b和c三个字段,只需同步c,b两个字段,则可以配置为["c","b"] 。导入过程中,字段a自动补空,设置为null。 | 是 | 无 |
batchSize | 单次写入的数据条数。OpenSearch写入为批量写入,通常OpenSearch的优势在于查询,写入的每秒处理事务数(TPS)不高,请根据账号申请的资源进行设置。 通常OpenSearch的单条数据小于1 MB,单次写入小于2 MB。 | 如果是分区表,该选项必填。如果是非分区表,该选项不可填写。 | 300 |
writeMode | OpenSearch Writer通过配置"writeMode":"add/update",保证写入的幂等性: | 是 | 无 |
ignoreWriteError | 忽略写错误。 配置示例:"ignoreWriteError":true 。OpenSearch为批量写入,是否忽略当前批次的写入失败。如果忽略,则继续执行其它的写操作。如果不忽略,则直接结束当前任务,并返回错误。建议使用默认值。 | 否 | false |
version | OpenSearch的版本信息,例如"version":"v3" 。由于V2版本对于push操作的限制较多,建议使用V3版本。 | 否 | v2 |
向量检索版、召回引擎版Writer脚本Demo
{
"stepType": "opensearch",
"parameter": {
"indexName": "",
"column": [
{
"name": "col3double",
"type": "DOUBLE"
},
{
"name": "col2vector",
"type": "MULTI_FLOAT"
}
],
"datasource": "zm_test_vector_01",
"batchSize": "500",
"table": "demotable"
},
"name": "Writer",
"category": "writer"
}
向量检索版、召回引擎版Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
table | 写入数据的表名,不能填写多张表,因为DataX不支持同时导入多张表。 | 是 | 无 |
column | 需要导入的字段列表。当导入全部字段时,可以配置为"column":["*"] 。当需要插入部分OpenSearch列时,填写需要插入的列,例如:"column":["id","name"] 。 OpenSearch支持列筛选、列换序,例如:表有a、b和c三个字段,只需同步c,b两个字段,则可以配置为["c","b"] 。导入过程中,字段a自动补空,设置为null。 | 是 | 无 |
batchSize | 单次写入的数据条数。OpenSearch写入为批量写入,通常OpenSearch的优势在于查询,写入的每秒处理事务数(TPS)不高,请根据账号申请的资源进行设置。 通常OpenSearch的单条数据小于1 MB,单次写入小于2 MB。 | 如果是分区表,该选项必填。如果是非分区表,该选项不可填写。 | 300 |