本文为您介绍Elasticsearch Writer支持的数据类型、写入方式、字段映射和数据源等参数及配置示例。
使用限制
DataWorks平台目前仅支持配置阿里云Elasticsearch5.x、6.x、7.x版本数据源,不支持配置自建Elasticsearch数据源。
背景信息
Elasticsearch在公共资源组上支持Elasticsearch5.x版本,在独享数据集成资源组上支持Elasticsearch5.x、6.x和7.x版本。独享数据集成资源组的详情请参见新增和使用独享数据集成资源组。
Relational DB(实例)-> Databases(数据库)-> Tables(表)-> Rows(一行数据)-> Columns(一行数据的一列)
Elasticsearch -> Index -> Types -> Documents -> Fields
Elasticsearch中可以有多个索引或数据库,每个索引可以包括多个类型或表,每个类型可以包括多个文档或行,每个文档可以包括多个字段或列。Elasticsearch Writer插件使用Elasticsearch的Rest API接口,批量把从Reader读入的数据写入Elasticsearch中。
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
endpoint | Elasticsearch的连接地址,通常格式为http://example.com:9999 。
|
否 | 无 |
accessId | Elasticsearch的username,用于与Elasticsearch建立连接时的鉴权。
说明 AccessID和AccessKey为必填项,如果不填写会产生报错。如果您使用的是自建Elasticsearch,不设置basic验证,则无需账号密码,此处AccessId和AccessKey填写随机值即可。
|
否 | 无 |
accessKey | Elasticsearch的password。 | 否 | 无 |
index | Elasticsearch中的index名。 | 否 | 无 |
indexType | Elasticsearch中index的type名。 | 否 | Elasticsearch |
cleanup | 是否删除所配索引中已有数据,清理数据的方法为删除并重建对应索引,默认值为false,表示保留已有索引中的数据。 | 否 | false |
batchSize | 每次批量数据的条数。 | 否 | 1,000 |
trySize | 失败后重试的次数。 | 否 | 30 |
timeout | 客户端超时时间。 | 否 | 600,000 |
discovery | 启用节点发现将轮询并定期更新客户机中的服务器列表。 | 否 | false |
compression | HTTP请求,开启压缩。 | 否 | true |
multiThread | HTTP请求,是否有多线程。 | 否 | true |
ignoreWriteError | 忽略写入错误,不重试,继续写入。 | 否 | false |
ignoreParseError | 忽略解析数据格式错误,继续写入。 | 否 | true |
alias | Elasticsearch的别名类似于数据库的视图机制,为索引my_index创建一个别名my_index_alias,对my_index_alias的操作与my_index的操作一致。
配置alias表示在数据导入完成后,为指定的索引创建别名。 |
否 | 无 |
aliasMode | 数据导入完成后增加别名的模式,包括append(增加模式)和exclusive(只留这一个):
后续会转换别名为实际的索引名称,别名可以用来进行索引迁移和多个索引的查询统一,并可以用来实现视图的功能。 |
否 | append |
splitter | 如果待插入目标端数据列类型是array数组类型,则使用指定分隔符(-,-),将源头数据进行拆分写出。
例如,源头列是数组 |
否 | -,- |
settings | 创建index时的settings,与Elasticsearch官方一致。 | 否 | 无 |
column | column用来配置文档的多个字段Filed信息,具体每个字段项可以配置name(名称)、type(类型)等基础配置,以及Analyzer、Format和Array等扩展配置。
Elasticsearch所支持的字段类型如下所示。
列类型的说明如下:
如果您在列Filed中配置了array属性,且值为true时,则表示数组列。Elasticsearch Writer会使用splitter配置的分隔符(一个任务仅支持配置一种切分分隔符),将对应源端数据进行拆分,转换为字符串数组形式最终写出至目的端,示例如下。
|
是 | 无 |
dynamic | 如果为true,则使用Elasticsearch的自动mappings,而非使用数据集成的mappings。
Elasticsearch 7.x版本的默认type为_doc。使用Elasticsearch的自动mappings时,请配置_doc和esVersion为7。 您需要转换为脚本模式,添加一个版本参数: |
否 | false |
actionType | 表示Elasticsearch在数据写出时的action类型,目前数据集成支持index和update两种actionType,默认值为index:
|
否 | index |
脚本开发介绍
通过脚本模式开发的详情请参见通过脚本模式配置任务。
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
},
"stepType": "stream"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"endpoint": "http://example.com:9999",
"accessId": "xxxx",
"accessKey": "yyyy",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0
}
},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "col_ip",
"type": "ip"
},
{
"name": "col_double",
"type": "double"
},
{
"name": "col_long",
"type": "long"
},
{
"name": "col_integer",
"type": "integer"
},
{
"name": "col_keyword",
"type": "keyword"
},
{
"name": "col_text",
"type": "text",
"analyzer": "ik_max_word"
},
{
"name": "col_geo_point",
"type": "geo_point"
},
{
"name": "col_date",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"name": "col_nested1",
"type": "nested"
},
{
"name": "col_nested2",
"type": "nested"
},
{
"name": "col_object1",
"type": "object"
},
{
"name": "col_object2",
"type": "object"
},
{
"name": "col_integer_array",
"type": "integer",
"array": true
},
{
"name": "col_geo_shape",
"type": "geo_shape",
"tree": "quadtree",
"precision": "10m"
}
]
},
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}