使用ES API访问搜索索引 - SQL表

云原生多模数据库 Lindorm搜索引擎(兼容Elasticsearch API版本)支持通过开源ES接口访问搜索索引,便于已采用ES技术栈的业务平滑迁移。本文以宽表引擎的SQL表场景为例,演示通过ES API访问搜索索引的最佳实践。

前提条件

  • 开通搜索索引服务,要求开通Lindorm搜索引擎的ElasticSearch兼容版本。

  • 推荐使用ES 7.10版本的客户端访问;若有其他版本客户端的访问需求,可联系Lindorm技术支持(钉钉号:s0s3eg3)咨询。

数据准备

准备SQL表并写入样例数据。

宽表引擎创建SQL

CREATE DATABASE searchindex_db;
USE searchindex_db;

CREATE TABLE search_table (
  user_id BIGINT,
  name VARCHAR,
  age SMALLINT,
  gender VARCHAR,
  address VARCHAR,
  email VARCHAR,
  city VARCHAR,
  PRIMARY KEY (user_id, name)
);

写入样例数据

UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (1, '张先生', 18, 'M', '北京市朝阳区', 'a***@example.net', '北京');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (6, '李先生', 32, 'M', '杭州市余杭区', 'a***@example.net', '杭州');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (20, '王先生', 28, 'M', '杭州市滨江区', 'a***@example.net', '杭州');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (28, '陈女士', 36, 'F', '深圳市南山区', 'a***@example.net', '深圳');

创建搜索索引

云原生多模数据库 LindormSQL表应通过SQL方式创建搜索索引,本文仅提供简单示例,更详细的创建方式可参考管理搜索索引

新建索引

CREATE INDEX idx USING SEARCH ON search_table (
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (numShards=4);

开启索引source

默认情况下,通过SQL表创建的搜索索引不在搜索引擎存储原始数据,以节约存储空间。如需通过ES API直接获取原始数据,请在创建索引时指定SOURCE_SETTINGS属性。

重要

若需指定SOURCE_SETTINGS,要求宽表引擎版本不低于2.7.4.1。

CREATE INDEX idx USING SEARCH ON search_table (
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (
  numShards=4,
  SOURCE_SETTINGS='
    {
      "enabled": true
    }
  '
);

获取ElasticSearch索引名

通过SQL方式创建搜索索引时,ElasticSearch中对应的索引名称为宽表namespace名.宽表Table名.搜索索引名。以上示例对应的ElasticSearch索引名为searchindex_db.search_table.idx登录UI界面并执行下述查询,可以查到对应的索引信息。

GET searchindex_db.search_table.idx

宽表与ElasticSearch索引字段的映射关系

默认情况下,SQL表和ElasticSearch索引的字段名是一一对应的关系。比如上述示例中,最终的ElasticSearch索引mappings结构如下:

{
  "mappings": {
    "dynamic": "true",
    "dynamic_templates": [],
    "properties": {
      "_searchindex_id": {
        "type": "keyword",
        "index": false
      },
      "address": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "age": {
        "type": "integer"
      },
      "city": {
        "type": "keyword"
      },
      "delete_version_l": {
        "type": "long"
      },
      "email": {
        "type": "keyword"
      },
      "gender": {
        "type": "keyword"
      },
      "update_version_l": {
        "type": "long"
      }
    }
  }
}

SQL表中的age列对应索引中的age字段,gender列对应索引中的gender字段,以此类推。索引中存在部分自动添加的内置字段(如update_version_l),这些字段用于内部的数据同步等用途,不影响正常使用,无需关注。

宽表与ElasticSearch索引字段的默认类型映射关系

宽表和搜索的类型映射关系,默认如下:

宽表类型

ElasticSearch类型

BOOLEAN/HBOOLEAN

bool

BYTE/SHORT/HSHORT/INT/HINTEGER/UNSIGNED_BYTE/UNSIGNED_SHORT/UNSIGNED_INTEGER

integer

LONG/HLONG/UNSIGNED_LONG/TIMESTAMP

long

FLOAT/HFLOAT/UNSIGNED_FLOAT

float

DOUBLE/HDOUBLE/UNSIGNED_DOUBLE

double

STRING/HSTRING

keyword

CHAR/BINARY/VARBINARY

keyword

JSON

object

宽表与ElasticSearch索引的主键映射关系

推荐使用方式

SQL表而言,索引_id列映射回宽表主键的逻辑相对复杂,若无节约存储空间需求,可直接开启索引source,无需从宽表回查。

若需兼顾节约空间和使用便捷的需求,推荐将宽表主键列加入索引列列表中,指定仅存储主键列的原始数据,可直接从ES查询结果获取宽表的主键列。例如以下SQL将宽表的主键列user_id, name都加入索引列中,source可以指定对user_id, name两个列开启,即可通过ES API获取宽表的主键列。

CREATE INDEX idx USING SEARCH ON search_table (
  user_id,
  name,
  age,
  gender,
  address(type=text, analyzer=ik),
  email,
  city
) WITH (
  numShards=4,
  SOURCE_SETTINGS='
    {
      "includes": ["user_id", "name"]
    }
  '
);

上述创建索引语句,通过ES API查询:

GET /searchindex_db.search_table.idx/_search

返回如下:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 4,
    "successful": 4,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 4,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000006e69d8ee58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 6,
          "name": "李先生"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000014e78e8be58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 20,
          "name": "王先生"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "8000000000000001e5bca0e58588e7949f00",
        "_score": 1,
        "_source": {
          "user_id": 1,
          "name": "张先生"
        }
      },
      {
        "_index": "searchindex_db.search_table.idx",
        "_id": "800000000000001ce99988e5a5b3e5a3ab00",
        "_score": 1,
        "_source": {
          "user_id": 28,
          "name": "陈女士"
        }
      }
    ]
  }
}

索引_id和宽表主键映射关系

如果您不使用上述推荐方式,请参考本章节完成_id列与宽表主键的转换操作。

宽表主键与ElasticSearch索引的_id列通过索引同步建立一一映射:每行宽表数据对应唯一索引文档。若创建索引时设置source=false(不存储原始数据),则需根据主键回查宽表获取原始数据。以下代码示例展示如何将ElasticSearch索引的_id列映射为宽表主键列。

说明

以下代码示例仅考虑宽表已支持的基础数据类型

package org.example;

import com.alibaba.lindorm.client.core.utils.DataTypeUtils;
import com.alibaba.lindorm.client.schema.DataType;
import com.alibaba.lindorm.client.schema.SortOrder;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.util.Bytes;

import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

public class SearchDocIdToLindormColumnValues {

    public static void main(String[] args) throws Exception {
        List<String> primaryKeyTypes = new ArrayList<>();
        List<SortOrder> primaryKeySortOrders = new ArrayList<>();

        // this is "_id" value in ElasticSearch index
        String id = "00818001800000018000000000000001bf8ccccebff199999999999bc10b02007465737400800001992c36f418800001992c36f418800001992c36f41800000000e6b58be8af95e5ad97e7aca6e4b8b2";
        // get primary key types and sort orders
        getPrimaryKeyTypesAndSortOrders(primaryKeyTypes, primaryKeySortOrders);

        // primary keys in desc type
        // String id = "017e7ffe7ffffffe7ffffffffffffffe40733331400e6666666666643febf4feff8b9a8c8bff7ffffe66d3c90be77ffffe66d3c90be77ffffe66d3c90be7ffffffffe6b58be8af95e5ad97e7aca6e4b8b2";
        // getPrimaryKeyTypesAndSortOrdersDesc(primaryKeyTypes, primaryKeySortOrders);

        // get rowKey encoded type from search index
        RowKeyEncodedType rowKeyEncodedType = RowKeyEncodedType.getFormatterType(primaryKeyTypes, primaryKeySortOrders);

        byte[] rowKey;
        if (RowKeyEncodedType.HEX.equals(rowKeyEncodedType)) {
            rowKey = Hex.decodeHex(id.toCharArray());
        } else {
            rowKey = Bytes.toBytes(id);
        }

        // parse rowKey in Lindorm to several primary key values
        List<Object> values = parseRowKeyToValues(rowKey, primaryKeyTypes, primaryKeySortOrders);
        for (Object value : values) {
            System.out.println(value);
        }
    }

    private static void getPrimaryKeyTypesAndSortOrders(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        // sample here, you can get this through mysql connection by 'DESCRIBE TABLE' and get real schema from your table
        // OR you can simply write down your primary types in order here
        primaryKeyTypes.add("BOOLEAN");
        primaryKeyTypes.add("TINYINT");
        primaryKeyTypes.add("SMALLINT");
        primaryKeyTypes.add("INT");
        primaryKeyTypes.add("BIGINT");
        primaryKeyTypes.add("FLOAT");
        primaryKeyTypes.add("DOUBLE");
        primaryKeyTypes.add("DECIMAL(10,2)");
        primaryKeyTypes.add("VARCHAR");
        primaryKeyTypes.add("DATE");
        primaryKeyTypes.add("TIME");
        primaryKeyTypes.add("TIMESTAMP");
        primaryKeyTypes.add("VARBINARY");

        for (int i = 0; i < 13; ++i) {
            primaryKeySortOrders.add(SortOrder.ASC);
        }
    }

    private static void getPrimaryKeyTypesAndSortOrdersDesc(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        // sample here, you can get this through mysql connection by 'DESCRIBE TABLE' and get real schema from your table
        // OR you can simply write down your primary types in order here
        primaryKeyTypes.add("BOOLEAN");
        primaryKeyTypes.add("TINYINT");
        primaryKeyTypes.add("SMALLINT");
        primaryKeyTypes.add("INT");
        primaryKeyTypes.add("BIGINT");
        primaryKeyTypes.add("FLOAT");
        primaryKeyTypes.add("DOUBLE");
        primaryKeyTypes.add("DECIMAL(10,2)");
        primaryKeyTypes.add("VARCHAR");
        primaryKeyTypes.add("DATE");
        primaryKeyTypes.add("TIME");
        primaryKeyTypes.add("TIMESTAMP");
        primaryKeyTypes.add("VARBINARY");

        for (int i = 0; i < 12; ++i) {
            primaryKeySortOrders.add(SortOrder.DESC);
        }
        primaryKeySortOrders.add(SortOrder.ASC);
    }

    private static List<Object> parseRowKeyToValues(byte[] rowKey, List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
        List<Object> decoded = new ArrayList<>(primaryKeyTypes.size());
        int pos = 0;
        for (int i = 0; i < primaryKeyTypes.size(); i++) {
            String primaryKeyType = primaryKeyTypes.get(i).toLowerCase();
            int start = pos;
            if (primaryKeyType.equalsIgnoreCase("varchar")) {
                while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
                    ++pos;
                }
                decoded.add(decodeString(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
                ++pos;
            } else if (primaryKeyType.equalsIgnoreCase("boolean")) {
                decoded.add(decodeBoolean(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_BOOLEAN;
            } else if (primaryKeyType.equalsIgnoreCase("tinyint")) {
                decoded.add(decodeByte(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_BYTE;
            } else if (primaryKeyType.equalsIgnoreCase("smallint")) {
                decoded.add(decodeShort(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_SHORT;
            } else if (primaryKeyType.equalsIgnoreCase("int")) {
                decoded.add(decodeInt(rowKey, pos, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_INT;
            } else if (primaryKeyType.equalsIgnoreCase("bigint")) {
                decoded.add(decodeLong(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.equalsIgnoreCase("float")) {
                decoded.add(decodeFloat(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_FLOAT;
            } else if (primaryKeyType.equalsIgnoreCase("double")) {
                decoded.add(decodeDouble(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_DOUBLE;
            } else if (primaryKeyType.contains("decimal")) {
                while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
                    ++pos;
                }
                decoded.add(decodeDecimal(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
                ++pos;
            } else if (primaryKeyType.contains("varbinary")) {
                decoded.add(decodeVarbinary(rowKey, start, rowKey.length - start, primaryKeySortOrders.get(i)));
                pos = rowKey.length;
            } else if (primaryKeyType.contains("date")) {
                decoded.add(decodeDate(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.contains("time")) {
                decoded.add(decodeTime(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG;
            } else if (primaryKeyType.contains("timestamp")) {
                decoded.add(decodeTimestamp(rowKey, start, primaryKeySortOrders.get(i)));
                pos += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
            }
        }

        return decoded;
    }

    private static Object decodeString(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        if (SortOrder.ASC.equals(sortOrder)) {
            return new String(bytes, offset, length, StandardCharsets.UTF_8);
        } else {
            byte[] reversed = new byte[length];
            for (int i = 0; i < length; i++) {
                reversed[i] = (byte) (bytes[offset + i] ^ 0xFF);
            }
            return new String(reversed, StandardCharsets.UTF_8);
        }
    }

    private static Object decodeBoolean(byte[] bytes, int offset, SortOrder sortOrder) {
        return ((bytes[offset] == 0) == SortOrder.DESC.equals(sortOrder));
    }

    private static Object decodeByte(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeByte(bytes, offset, sortOrder);
    }

    private static Object decodeShort(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeShort(bytes, offset, sortOrder);
    }

    private static Object decodeInt(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeInt(bytes, offset, sortOrder);
    }

    private static Object decodeLong(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeLong(bytes, offset, sortOrder);
    }

    private static Object decodeFloat(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeFloat(bytes, offset, sortOrder);
    }

    private static Object decodeDouble(byte[] bytes, int offset, SortOrder sortOrder) {
        return DataTypeUtils.decodeDouble(bytes, offset, sortOrder);
    }

    private static Object decodeDecimal(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        if (SortOrder.DESC.equals(sortOrder)) {
            return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder, DataType.DECIMAL_V2);
        }
        return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder);
    }

    private static Object decodeVarbinary(byte[] bytes, int offset, int length, SortOrder sortOrder) {
        byte[] result = new byte[length];
        if (SortOrder.DESC.equals(sortOrder)) {
            DataTypeUtils.invert(bytes, offset, result, 0, length);
        } else {
            System.arraycopy(bytes, offset, result, 0, length);
        }
        return result;
    }

    private static Object decodeDate(byte[] bytes, int offset, SortOrder sortOrder) {
        Long result = (Long) decodeLong(bytes, offset, sortOrder);
        return new Date(result);
    }

    private static Object decodeTime(byte[] bytes, int offset, SortOrder sortOrder) {
        Long result = (Long) decodeLong(bytes, offset, sortOrder);
        return new Time(result);
    }

    private static Object decodeTimestamp(byte[] bytes, int offset, SortOrder sortOrder) {
        long ms = DataTypeUtils.decodeLong(bytes, offset, sortOrder);
        int nanos = DataTypeUtils.decodeUnsignedInt(bytes, offset + Bytes.SIZEOF_LONG, sortOrder);
        Timestamp ts = new Timestamp(ms);
        ts.setNanos(ts.getNanos() + nanos);
        return ts;
    }

    private static byte getSeparatorByte(SortOrder sortOrder) {
        if (sortOrder == SortOrder.DESC) {
            return (byte) (0xFF);
        } else {
            return (byte) 0;
        }
    }

    public enum RowKeyEncodedType {
        STRING,
        HEX;

        public static RowKeyEncodedType getFormatterType(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
            int n = primaryKeyTypes.size();
            for (int i = 0; i < n; i++) {
                String primaryKeyType = primaryKeyTypes.get(i);
                SortOrder sortOrder = primaryKeySortOrders.get(i);
                if (!(primaryKeyType.equalsIgnoreCase("varchar") || primaryKeyType.equalsIgnoreCase("hstring"))
                    || SortOrder.DESC.equals(sortOrder)) {
                    return RowKeyEncodedType.HEX;
                }
            }
            return RowKeyEncodedType.STRING;
        }
    }
}