Elasticsearch数据源为您提供读取和写入Elasticsearch双向通道的功能,本文为您介绍DataWorks的Elasticsearch数据同步的能力支持情况。
背景信息
Elasticsearch在公共资源组上支持Elasticsearch 5.x版本,在Serverless资源组(推荐)和独享数据集成资源组上支持Elasticsearch 5.x、6.x、7.x和8.x版本。
Serverless资源组的详情请参见新增和使用Serverless资源组。
独享数据集成资源组的详情请参见新增和使用独享数据集成资源组。
Elasticsearch是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。Elasticsearch是一个基于Lucene的搜索和数据分析工具,它提供分布式服务。Elasticsearch核心概念同数据库核心概念的对应关系如下所示。
Relational DB(实例)-> Databases(数据库)-> Tables(表)-> Rows(一行数据)-> Columns(一行数据的一列)
Elasticsearch -> Index -> Types -> Documents -> Fields
Elasticsearch中可以有多个索引或数据库,每个索引可以包括多个类型或表,每个类型可以包括多个文档或行,每个文档可以包括多个字段或列。Elasticsearch Writer插件使用Elasticsearch的Rest API接口,批量把从Reader读入的数据写入Elasticsearch中。
支持的版本
DataWorks平台目前仅支持配置阿里云Elasticsearch 5.x、6.x、7.x和8.x版本数据源,不支持配置自建Elasticsearch数据源。
使用限制
离线读写
Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。
如果您使用的是6.x及以上版本,支持使用Serverless资源组(推荐)和独享数据集成资源组。
不支持同步scaled_float类型的字段。
不支持同步字段中带有关键字
$ref
的索引。
支持的字段类型
类型 | 离线读(Elasticsearch Reader) | 离线写(Elasticsearch Writer) | 实时写 |
binary | 支持 | 支持 | 支持 |
boolean | 支持 | 支持 | 支持 |
keyword | 支持 | 支持 | 支持 |
constant_keyword | 不支持 | 不支持 | 不支持 |
wildcard | 不支持 | 不支持 | 不支持 |
long | 支持 | 支持 | 支持 |
integer | 支持 | 支持 | 支持 |
short | 支持 | 支持 | 支持 |
byte | 支持 | 支持 | 支持 |
double | 支持 | 支持 | 支持 |
float | 支持 | 支持 | 支持 |
half_float | 不支持 | 不支持 | 不支持 |
scaled_float | 不支持 | 不支持 | 不支持 |
unsigned_long | 不支持 | 不支持 | 不支持 |
date | 支持 | 支持 | 支持 |
date_nanos | 不支持 | 不支持 | 不支持 |
alias | 不支持 | 不支持 | 不支持 |
object | 支持 | 支持 | 支持 |
flattened | 不支持 | 不支持 | 不支持 |
nested | 支持 | 支持 | 支持 |
join | 不支持 | 不支持 | 不支持 |
integer_range | 支持 | 支持 | 支持 |
float_range | 支持 | 支持 | 支持 |
long_range | 支持 | 支持 | 支持 |
double_range | 支持 | 支持 | 支持 |
date_range | 支持 | 支持 | 支持 |
ip_range | 不支持 | 支持 | 支持 |
ip | 支持 | 支持 | 支持 |
version | 支持 | 支持 | 支持 |
murmur3 | 不支持 | 不支持 | 不支持 |
aggregate_metric_double | 不支持 | 不支持 | 不支持 |
histogram | 不支持 | 不支持 | 不支持 |
text | 支持 | 支持 | 支持 |
annotated-text | 不支持 | 不支持 | 不支持 |
completion | 支持 | 不支持 | 不支持 |
search_as_you_type | 不支持 | 不支持 | 不支持 |
token_count | 支持 | 不支持 | 不支持 |
dense_vector | 不支持 | 不支持 | 不支持 |
rank_feature | 不支持 | 不支持 | 不支持 |
rank_features | 不支持 | 不支持 | 不支持 |
geo_point | 支持 | 支持 | 支持 |
geo_shape | 支持 | 支持 | 支持 |
point | 不支持 | 不支持 | 不支持 |
shape | 不支持 | 不支持 | 不支持 |
percolator | 不支持 | 不支持 | 不支持 |
string | 支持 | 支持 | 支持 |
工作原理
Elasticsearch Reader的工作原理如下:
通过Elasticsearch的_searchscrollslice(即游标分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。
根据Elasticsearch中的Mapping配置,转换数据类型。
更多详情请参见Elasticsearch官方文档。
Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。
基本配置
实际运行时,请删除下述代码中的注释。
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //错误记录数。
},
"jvmOption":"",
"speed":{
"concurrent":3,//并发数
"throttle":true,//
"mbps":"12",//限流,此处1mbps = 1MB/s。
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //读取列。
"id",
"name"
],
"endpoint":"", //服务地址。
"index":"", //索引。
"password":"", //密码。
"scroll":"", //scroll标志。
"search":"", //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
"type":"default",
"username":"" //用户名。
},
"stepType":"elasticsearch"
},
{
"stepType": "elasticsearch",
"parameter": {
"column": [ //写入列
{
"name": "id",
"type": "integer"
},
{
"name": "name",
"type": "text"
}
],
"index": "test", //写入索引
"indexType": "", //写入索引类型,es7不填
"actionType": "index", //写入方式
"cleanup": false, //是否重建索引
"datasource": "test", //数据源名称
"primaryKeyInfo": { //主键取值方式
"fieldDelimiterOrigin": ",",
"column": [
"id"
],
"type": "specific",
"fieldDelimiter": ","
},
"dynamic": false, //动态映射
"batchSize": 1024 //批量写文档数
},
"name": "Writer",
"category": "writer"
}
],
"type":"job",
"version":"2.0" //版本号。
}
高级功能
支持全量拉取
支持将Elasticsearch中一个文档的所有内容拉取为一个字段。配置详情请参见场景一:全量拉取。
支持提取半结构化到结构化数据
分类
描述
相关文档
产生背景
Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。
—
实现原理
将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。
—
解决方案
JSON有嵌套的情况,通过path路径来解决。
属性
属性.子属性
属性[0].子属性
附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。
属性[*].子属性
数组归并,一个字符串数组内容,归并为一个属性,并进行去重。
属性[]
多属性合一,将多个属性合并为一个属性。
属性1,属性2
多属性选择处理。
属性1|属性2
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录一:脚本Demo与参数说明。
单表实时写同步任务配置指导
操作流程请参见DataStudio侧实时同步任务配置。
整库离线写、单表/整库全增量实时写同步任务配置指导
操作流程请参见数据集成侧同步任务配置。
附录一:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
Reader脚本参数
Writer脚本Demo
Writer脚本参数
附录二:ElasticSearch写入的格式期望是数组类型
支持以下两种方式将源端数据按照数组类型写入ElasticSearch。
按JSON格式解析源端数据
例如:源端数据为
"[1,2,3,4,5]"
,配置json_array=true对其进行解析,同步将以数组格式写入ElasticSearch。"parameter" : { { "name":"docs_1", "type":"keyword", "json_array":true } }
按分隔符解析源端数据
例如:源端数据为
"1,2,3,4,5"
, 配置分隔符splitter=","对其进行解析,同步将以数组格式写入ElasticSearch。说明一个任务仅支持配置一种分隔符,splitter全局唯一,不支持多array字段配置为不同的分隔符。例如源端字段列col1="1,2,3,4,5" , col2="6-7-8-9-10", splitter无法针对每列单独配置使用。
"parameter" : { "column": [ { "name": "docs_2", "array": true, "type": "long" } ], "splitter":","//注意:splitter配置与column配置同级。 }
附录三:场景示例
场景一:全量拉取
场景二:嵌套或对象字段属性同步
场景三:数组属性拆分为多行
场景四:数组属性去重归并
场景五:多属性合一同步
场景六:多属性选择同步
相关文档
数据集成支持其他更多数据源接入,更多信息,请参见支持的数据源及同步方案。