本文为您介绍Elasticsearch Writer支持的数据类型、写入方式、字段映射和数据源等参数及配置示例。

支持的Elasticsearch版本

DataWorks平台目前仅支持配置阿里云Elasticsearch5.x、6.x、7.x版本数据源,不支持配置自建Elasticsearch数据源。
重要 如果您使用的是6.x及以上版本,仅支持使用独享数据集成资源组

支持的字段类型

类型离线读(Elasticsearch Reader)离线写(Elasticsearch Writer)实时写
binary支持 支持 支持
boolean支持 支持 支持
keyword支持 支持 支持
constant_keyword 不支持不支持不支持
wildcard不支持不支持不支持
long支持 支持 支持
integer支持 支持 支持
short支持 支持 支持
byte支持 支持 支持
double支持 支持 支持
float支持 支持 支持
half_float不支持不支持不支持
scaled_float不支持不支持不支持
unsigned_long不支持不支持不支持
date支持 支持 支持
date_nanos不支持不支持不支持
alias不支持不支持不支持
object支持 支持 支持
flattened不支持不支持不支持
nested支持 支持 支持
join不支持不支持不支持
integer_range支持 支持 支持
float_range支持 支持 支持
long_range支持 支持 支持
double_range支持 支持 支持
date_range支持 支持 支持
ip_range不支持支持 支持
ip支持 支持 支持
version支持 支持 支持
murmur3不支持不支持不支持
aggregate_metric_double不支持不支持不支持
histogram不支持不支持不支持
text支持 支持 支持
annotated-text不支持不支持不支持
completion支持 不支持不支持
search_as_you_type不支持不支持不支持
token_count支持 不支持不支持
dense_vector不支持不支持不支持
rank_feature不支持不支持不支持
rank_features不支持不支持不支持
geo_point支持 支持 支持
geo_shape支持 支持 支持
point不支持不支持不支持
shape不支持不支持不支持
percolator不支持不支持不支持
string支持 支持 支持

背景信息

Elasticsearch在公共资源组上支持Elasticsearch5.x版本,在独享数据集成资源组上支持Elasticsearch5.x、6.x和7.x版本。独享数据集成资源组的详情请参见新增和使用独享数据集成资源组

Elasticsearch是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。Elasticsearch是一个基于Lucene的搜索和数据分析工具,它提供分布式服务。Elasticsearch核心概念同数据库核心概念的对应关系如下所示。
Relational DB(实例)-> Databases(数据库)-> Tables(表)-> Rows(一行数据)-> Columns(一行数据的一列)
Elasticsearch        -> Index              -> Types       -> Documents       -> Fields

Elasticsearch中可以有多个索引或数据库,每个索引可以包括多个类型或表,每个类型可以包括多个文档或行,每个文档可以包括多个字段或列。Elasticsearch Writer插件使用Elasticsearch的Rest API接口,批量把从Reader读入的数据写入Elasticsearch中。

参数说明

参数描述是否必选默认值
datasource选择需要同步的Elasticsearch数据源,若还未在DataWorks创建该数据源,请先创建,详情请参见配置Elasticsearch数据源
indexElasticsearch中的index名。
indexTypeElasticsearch中index的type名。Elasticsearch
cleanup定义当前任务在索引index已存在的情况是否要删除数据。
  • 是(true):导入数据前删除原来的索引并重建同名索引,此操作会删除该索引下的数据。
  • 否(false):导入数据前保留索引中已存在的数据。
false
batchSize定义同步任务一次性插入ElasticSearch的Document条数。1,000
trySize定义往ElasticSearch写入数据失败后的重试次数。30
timeout客户端超时时间。600,000
discovery任务是否启动节点发现功能。
  • true:与集群中随机一个节点进行连接。启用节点发现将轮询并定期更新客户机中的服务器列表。
  • false:与Elasticsearch集群进行连接。
false
compressionHTTP请求,开启压缩。true
multiThreadHTTP请求,是否有多线程。true
ignoreWriteError忽略写入错误,不重试,继续写入。false
ignoreParseError忽略解析数据格式错误,继续写入。true
aliasElasticsearch的别名类似于数据库的视图机制,为索引my_index创建一个别名my_index_alias,对my_index_alias的操作与my_index的操作一致。

配置alias表示在数据导入完成后,为指定的索引创建别名。

aliasMode数据导入完成后增加别名的模式,包括append(增加模式)和exclusive(只留这一个):
  • aliasModeappend时,表示追加当前索引至别名alias映射中(一个别名对应多个索引)。
  • aliasModeexclusive时,表示首先删除别名alias,再添加当前索引至别名alias映射中(一个别名对应一个索引)。

后续会转换别名为实际的索引名称,别名可以用来进行索引迁移和多个索引的查询统一,并可以用来实现视图的功能。

append
settings创建index时的settings,与Elasticsearch官方一致。
columncolumn用来配置文档的多个字段Filed信息,具体每个字段项可以配置name(名称)、type(类型)等基础配置,以及AnalyzerFormatArray等扩展配置。
Elasticsearch所支持的字段类型如下所示。
- id  //type id对应Elasticsearch中的_id,可以理解为唯一主键。写入时,相同id的数据会被覆盖,且不会被索引。
- string
- text
- keyword
- long
- integer
- short
- byte
- double
- float
- date
- boolean
- binary
- integer_range
- float_range
- long_range
- double_range
- date_range
- geo_point
- geo_shape
- ip
- token_count
- array
- object
- nested
列类型的说明如下:
  • 列类型为text类型时,可以配置analyzer(分词器)、normsindex_options等参数,示例如下。
    {
        "name": "col_text",
        "type": "text",
        "analyzer": "ik_max_word"
        }
  • 列类型为Date类型时,您可配置如下两种方式解析源端数据,配置方式请保持一致。
    • 方式一:根据reader端读取字段的内容直接写入es data字段:
      • 配置origin:true必填,让读取字段的内容直接写入es data
      • 配置"format",表示在通过es writer创建mapping时,该字段需要设置format属性。示例如下:
          {
             "parameter":{
               "column":[{
                   "name": "col_date",
                   "type": "date",
                   "format": "yyyy-MM-dd HH:mm:ss",
                   "origin": true
                }]
           }
        }
    • 方式二:(时区转换)如果需要数据集成帮助您进行时区转换,可添加Timezone参数。示例如下:
      配置的"format"表示数据集成在做时区转换时,解析的时间格式如下:
        {
           "parameter" :{
             "column": [{
                "name": "col_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss",
               "Timezone": "UTC"
             }]
         }
      }
  • 列类型为地理形状geo_shape时,可以配置tree(geohash或quadtree)、precision(精度)属性,示例如下。
    {
        "name": "col_geo_shape",
        "type": "geo_shape",
        "tree": "quadtree",
        "precision": "10m"
        }

如果需要在column中配置除了type以外的属性值,您可以使用other_params参数,该参数配置在column中,在update mappings时,用于描述column中除了type以外的Elasticsearch属性信息。

 {
   "name": "guid",
   "other_params":
    {
       "doc_values": false
      },
    "type": "text"
  }

如果您希望源端数据写入为Elasticsearch时按照数组类型写入,您可按照json格式或指定分隔符的方式来解析源端数据。配置详情请参见附录:ElasticSearch写入的格式期望是数组类型

dynamic定义当在文档中发现未存在的字段时,同步任务是否通过Elasticsearch动态映射机制为字段添加映射。
  • true:保留Elasticsearch的自动mappings映射。
  • false:默认值,不填写默认为false,根据同步任务配置的column生成并更新Elasticsearch的mappings映射。

Elasticsearch 7.x版本的默认type_doc。使用Elasticsearch的自动mappings时,请配置_docesVersion为7。

您需要转换为脚本模式,添加一个版本参数:"esVersion": "7"

false
actionType表示Elasticsearch在数据写出时的action类型,目前数据集成支持indexupdate两种actionType,默认值为index
  • index:底层使用了Elasticsearch SDK的Index.Builder构造批量请求。Elasticsearch index插入时,需要首先判断插入的文档数据中是否指定ID:
    • 如果没有指定ID,Elasticsearch会默认生成一个唯一ID。该情况下会直接添加文档至Elasticsearch中。
    • 如果已指定ID,会进行更新(替换整个文档),且不支持针对特定Field进行修改。
      说明 此处的更新并非Elasticsearch中的更新(替换部分指定列替换)。
  • update:根据用户指定的ID进行文档更新,如果ID值在索引中不存在则插入文档,存在则更新指定的column字段内容(其他文档字段内容不变)。每次update完成都会获取整个文档信息,从而实现针对特定字段进行修改。这里update不支持条件筛选,仅根据指定ID值进行更新操作。由于每次更新都需要获取一遍原始文档,因此对性能同步能会有较大影响。
    说明 设置action类型为update时,您需要设置主键primaryKeyInfo
index
primaryKeyInfo定义当前写入ElasticSearch的主键取值方式。
  • 业务主键(pk):_id 的值指定为某一个字段。
    "parameter": {
        "primaryKeyInfo": {
            "type": "pk",
            "column": ["id"]}
    }
  • 联合主键(specific):_id 的值指定为某几个字段的值拼接,分隔符为您设置的主键分隔符fieldDelimiter
    说明 其中字段名为eswriter的待写入字段,向导模式拉取主键列配置时只包含Elasticsearch索引中已存在的字段。
    "parameter": {
        "primaryKeyInfo": {
            "type": "specific",
            "fieldDelimiter": ",",
            "column": ["col1","col2"]}
    }
  • 无主键(nopk):_id在写入ElasticSearch时系统自动生成。
    "primaryKeyInfo": {
        "type": "nopk"
    }
specific
esPartitionColumn定义写入ElasticSearch时是否开启分区,用于修改ElasticSearch中的routing的参数。
  • 开启分区:把指定列的value通过分隔符空串连接指定为routing的值,在写入时,插入或更新指定shard中的doc,开启分区的情况下您需要指定分区列。
    {    "esPartitionColumn": [
            {
                "name":"col1",
                "comment":"xx",
                "type": "STRING"
                }
            ],
        }
  • 不开启分区:不填写该参数,默认使用_id作为routing起到将文档均匀分布到多个分片上防止数据倾斜的作用。
false
enableWriteNull该参数用于是否支持将来源端的空值字段同步至Elasticsearch。取值如下:
  • true:支持。同步后,Elasticsearch中对应字段的value为空。
  • false:不支持。来源端的空值字段无法同步至Elasticsearch,即在Elasticsearch中不显示该字段。
false

脚本开发介绍

通过脚本模式开发的详情请参见通过脚本模式配置离线同步任务

脚本配置示例如下,具体参数请参见上文的参数说明。
{
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流
        }
    },
    "steps": [
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {

            },
            "stepType": "stream"
        },
        {
            "category": "writer",
            "name": "Writer",
            "parameter": {
                "datasource": "xxx",
                "index": "test-1",
                "type": "default",
                "cleanup": true,
                "settings": {
                        "number_of_shards": 1,
                        "number_of_replicas": 0
                },
                "discovery": false,
                "primaryKeyInfo": {
                     "type": "pk",    
                     "fieldDelimiter": ",",
                     "column": []
                    },
                "batchSize": 1000,
                "dynamic": false,
                "esPartitionColumn": [
                      {
                         "name": "col1",  
                         "comment": "xx", 
                         "type": "STRING" 
                         }
                     ],
                "column": [
                    {
                        "name": "pk",
                        "type": "id"
                    },
                    {
                        "name": "col_ip",
                        "type": "ip"
                    },
                    {
                        "name": "col_array",
                        "type": "long",
                        "array": true,
                    },
                    {
                        "name": "col_double",
                        "type": "double"
                    },
                    {
                        "name": "col_long",
                        "type": "long"
                    },
                    {
                        "name": "col_integer",
                        "type": "integer"
                    {
                        "name": "col_keyword",
                        "type": "keyword"
                    },
                    {
                        "name": "col_text",
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "other_params":
                            {
                                "doc_values": false
                            },
                    },
                    {
                        "name": "col_geo_point",
                        "type": "geo_point"
                    },
                    {
                        "name": "col_date",
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    {
                        "name": "col_nested1",
                        "type": "nested"
                    },
                    {
                        "name": "col_nested2",
                        "type": "nested"
                    },
                    {
                        "name": "col_object1",
                        "type": "object"
                    },
                    {
                        "name": "col_object2",
                        "type": "object"
                    },
                    {
                        "name": "col_integer_array",
                        "type": "integer",
                        "array": true
                    },
                    {
                        "name": "col_geo_shape",
                        "type": "geo_shape",
                        "tree": "quadtree",
                        "precision": "10m"
                    }
                ]
            },
            "stepType": "elasticsearch"
        }
    ],
    "type": "job",
    "version": "2.0"
}
说明 VPC环境的Elasticsearch运行在默认资源组会存在网络不通的情况。您需要使用独享数据集成资源或自定义资源,才能连通VPC进行数据同步。添加两种资源的详情请参见独享数据集成资源组新增自定义资源组

向导模式开发介绍

打开新建的数据同步节点,即可进行同步任务的配置,通用配置详情请参见通过向导模式配置离线同步任务,本文为您介绍向导模式配置数据同步至Elasticsearch。

  1. 选择数据源。配置同步任务的数据来源和数据去向。es writer
    参数描述
    数据源即上述参数说明中的datasource
    索引即上述参数说明中的index
    是否删除原索引即上述参数说明中的cleanup
    写入类型即上述参数说明中的ActionType,支持两种写入类型:插入(index)、更新(update)。
    使用ElasticSearch的自动mappings即上述参数说明中的dynamic
    主键取值方式即上述参数说明中的primaryKeyInfo
    批量插入条数即上述参数说明中的batchSize
    开启分区即上述参数说明中的esPartitionColumn
    启用节点发现即上述参数说明中的discovery
    Settings即上述参数说明中的settings
  2. 字段映射,即上述参数说明中的column。左侧的源头表字段和右侧的目标表字段为一一对应的关系。字段映射
  3. 通道控制通道控制
    参数描述
    任务期望最大并发数数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

附录:ElasticSearch写入的格式期望是数组类型

支持以下两种方式将源端数据按照数组类型写入ElasticSearch。

  • 按json格式解析源端数据

    例如:源端数据为"[1,2,3,4,5]",配置json_array=true对其进行解析,同步将以数组格式写入ElasticSearch。

    "parameter" : {
      {
        "name":"docs_1",
        "type":"keyword",
        "json_array":true
      }
    }
  • 按分隔符解析源端数据

    例如:源端数据为"1,2,3,4,5", 配置分隔符splitter=","对其进行解析,同步将以数组格式写入ElasticSearch。

    说明 一个任务仅支持配置一种分隔符,splitter全局唯一,不支持多array字段配置为不同的分隔符。例如源端字段列col1="1,2,3,4,5" , col2="6-7-8-9-10", splitter无法针对每列单独配置使用。
    "parameter" : {
          "column": [
            {
              "name": "docs_2",
              "array": true,
              "type": "long"
            }
          ],
          "splitter":","//注意:splitter配置与column配置同级。
    }