云原生多模数据库 Lindorm搜索引擎(兼容Elasticsearch API版本)支持通过开源ES接口访问搜索索引,便于已采用ES技术栈的业务平滑迁移。本文以宽表引擎的SQL表场景为例,演示通过ES API访问搜索索引的最佳实践。
前提条件
已开通搜索索引服务,要求开通Lindorm搜索引擎的ElasticSearch兼容版本。
推荐使用ES 7.10版本的客户端访问;若有其他版本客户端的访问需求,可联系Lindorm技术支持(钉钉号:s0s3eg3)咨询。
数据准备
准备SQL表并写入样例数据。
宽表引擎创建SQL表
CREATE DATABASE searchindex_db;
USE searchindex_db;
CREATE TABLE search_table (
user_id BIGINT,
name VARCHAR,
age SMALLINT,
gender VARCHAR,
address VARCHAR,
email VARCHAR,
city VARCHAR,
PRIMARY KEY (user_id, name)
);写入样例数据
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (1, '张先生', 18, 'M', '北京市朝阳区', 'a***@example.net', '北京');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (6, '李先生', 32, 'M', '杭州市余杭区', 'a***@example.net', '杭州');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (20, '王先生', 28, 'M', '杭州市滨江区', 'a***@example.net', '杭州');
UPSERT INTO search_table (user_id,name,age,gender,address,email,city) VALUES (28, '陈女士', 36, 'F', '深圳市南山区', 'a***@example.net', '深圳');创建搜索索引
云原生多模数据库 LindormSQL表应通过SQL方式创建搜索索引,本文仅提供简单示例,更详细的创建方式可参考管理搜索索引。
新建索引
CREATE INDEX idx USING SEARCH ON search_table (
name,
age,
gender,
address(type=text, analyzer=ik),
email,
city
) WITH (numShards=4);开启索引source
默认情况下,通过SQL表创建的搜索索引不在搜索引擎存储原始数据,以节约存储空间。如需通过ES API直接获取原始数据,请在创建索引时指定SOURCE_SETTINGS属性。
若需指定SOURCE_SETTINGS,要求宽表引擎版本不低于2.7.4.1。
CREATE INDEX idx USING SEARCH ON search_table (
name,
age,
gender,
address(type=text, analyzer=ik),
email,
city
) WITH (
numShards=4,
SOURCE_SETTINGS='
{
"enabled": true
}
'
);获取ElasticSearch索引名
通过SQL方式创建搜索索引时,ElasticSearch中对应的索引名称为宽表namespace名.宽表Table名.搜索索引名。以上示例对应的ElasticSearch索引名为searchindex_db.search_table.idx。登录UI界面并执行下述查询,可以查到对应的索引信息。
GET searchindex_db.search_table.idx宽表与ElasticSearch索引字段的映射关系
默认情况下,SQL表和ElasticSearch索引的字段名是一一对应的关系。比如上述示例中,最终的ElasticSearch索引mappings结构如下:
{
"mappings": {
"dynamic": "true",
"dynamic_templates": [],
"properties": {
"_searchindex_id": {
"type": "keyword",
"index": false
},
"address": {
"type": "text",
"analyzer": "ik_max_word"
},
"age": {
"type": "integer"
},
"city": {
"type": "keyword"
},
"delete_version_l": {
"type": "long"
},
"email": {
"type": "keyword"
},
"gender": {
"type": "keyword"
},
"update_version_l": {
"type": "long"
}
}
}
}SQL表中的age列对应索引中的age字段,gender列对应索引中的gender字段,以此类推。索引中存在部分自动添加的内置字段(如update_version_l),这些字段用于内部的数据同步等用途,不影响正常使用,无需关注。
宽表与ElasticSearch索引字段的默认类型映射关系
宽表和搜索的类型映射关系,默认如下:
宽表类型 | ElasticSearch类型 |
BOOLEAN/HBOOLEAN | bool |
BYTE/SHORT/HSHORT/INT/HINTEGER/UNSIGNED_BYTE/UNSIGNED_SHORT/UNSIGNED_INTEGER | integer |
LONG/HLONG/UNSIGNED_LONG/TIMESTAMP | long |
FLOAT/HFLOAT/UNSIGNED_FLOAT | float |
DOUBLE/HDOUBLE/UNSIGNED_DOUBLE | double |
STRING/HSTRING | keyword |
CHAR/BINARY/VARBINARY | keyword |
JSON | object |
宽表与ElasticSearch索引的主键映射关系
推荐使用方式
对SQL表而言,索引_id列映射回宽表主键的逻辑相对复杂,若无节约存储空间需求,可直接开启索引source,无需从宽表回查。
若需兼顾节约空间和使用便捷的需求,推荐将宽表主键列加入索引列列表中,指定仅存储主键列的原始数据,可直接从ES查询结果获取宽表的主键列。例如以下SQL将宽表的主键列user_id, name都加入索引列中,source可以指定对user_id, name两个列开启,即可通过ES API获取宽表的主键列。
CREATE INDEX idx USING SEARCH ON search_table (
user_id,
name,
age,
gender,
address(type=text, analyzer=ik),
email,
city
) WITH (
numShards=4,
SOURCE_SETTINGS='
{
"includes": ["user_id", "name"]
}
'
);上述创建索引语句,通过ES API查询:
GET /searchindex_db.search_table.idx/_search返回如下:
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 4,
"successful": 4,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 4,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "searchindex_db.search_table.idx",
"_id": "8000000000000006e69d8ee58588e7949f00",
"_score": 1,
"_source": {
"user_id": 6,
"name": "李先生"
}
},
{
"_index": "searchindex_db.search_table.idx",
"_id": "8000000000000014e78e8be58588e7949f00",
"_score": 1,
"_source": {
"user_id": 20,
"name": "王先生"
}
},
{
"_index": "searchindex_db.search_table.idx",
"_id": "8000000000000001e5bca0e58588e7949f00",
"_score": 1,
"_source": {
"user_id": 1,
"name": "张先生"
}
},
{
"_index": "searchindex_db.search_table.idx",
"_id": "800000000000001ce99988e5a5b3e5a3ab00",
"_score": 1,
"_source": {
"user_id": 28,
"name": "陈女士"
}
}
]
}
}索引_id和宽表主键映射关系
如果您不使用上述推荐方式,请参考本章节完成_id列与宽表主键的转换操作。
宽表主键与ElasticSearch索引的_id列通过索引同步建立一一映射:每行宽表数据对应唯一索引文档。若创建索引时设置source=false(不存储原始数据),则需根据主键回查宽表获取原始数据。以下代码示例展示如何将ElasticSearch索引的_id列映射为宽表主键列。
以下代码示例仅考虑宽表已支持的基础数据类型。
package org.example;
import com.alibaba.lindorm.client.core.utils.DataTypeUtils;
import com.alibaba.lindorm.client.schema.DataType;
import com.alibaba.lindorm.client.schema.SortOrder;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hbase.util.Bytes;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
public class SearchDocIdToLindormColumnValues {
public static void main(String[] args) throws Exception {
List<String> primaryKeyTypes = new ArrayList<>();
List<SortOrder> primaryKeySortOrders = new ArrayList<>();
// this is "_id" value in ElasticSearch index
String id = "00818001800000018000000000000001bf8ccccebff199999999999bc10b02007465737400800001992c36f418800001992c36f418800001992c36f41800000000e6b58be8af95e5ad97e7aca6e4b8b2";
// get primary key types and sort orders
getPrimaryKeyTypesAndSortOrders(primaryKeyTypes, primaryKeySortOrders);
// primary keys in desc type
// String id = "017e7ffe7ffffffe7ffffffffffffffe40733331400e6666666666643febf4feff8b9a8c8bff7ffffe66d3c90be77ffffe66d3c90be77ffffe66d3c90be7ffffffffe6b58be8af95e5ad97e7aca6e4b8b2";
// getPrimaryKeyTypesAndSortOrdersDesc(primaryKeyTypes, primaryKeySortOrders);
// get rowKey encoded type from search index
RowKeyEncodedType rowKeyEncodedType = RowKeyEncodedType.getFormatterType(primaryKeyTypes, primaryKeySortOrders);
byte[] rowKey;
if (RowKeyEncodedType.HEX.equals(rowKeyEncodedType)) {
rowKey = Hex.decodeHex(id.toCharArray());
} else {
rowKey = Bytes.toBytes(id);
}
// parse rowKey in Lindorm to several primary key values
List<Object> values = parseRowKeyToValues(rowKey, primaryKeyTypes, primaryKeySortOrders);
for (Object value : values) {
System.out.println(value);
}
}
private static void getPrimaryKeyTypesAndSortOrders(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
// sample here, you can get this through mysql connection by 'DESCRIBE TABLE' and get real schema from your table
// OR you can simply write down your primary types in order here
primaryKeyTypes.add("BOOLEAN");
primaryKeyTypes.add("TINYINT");
primaryKeyTypes.add("SMALLINT");
primaryKeyTypes.add("INT");
primaryKeyTypes.add("BIGINT");
primaryKeyTypes.add("FLOAT");
primaryKeyTypes.add("DOUBLE");
primaryKeyTypes.add("DECIMAL(10,2)");
primaryKeyTypes.add("VARCHAR");
primaryKeyTypes.add("DATE");
primaryKeyTypes.add("TIME");
primaryKeyTypes.add("TIMESTAMP");
primaryKeyTypes.add("VARBINARY");
for (int i = 0; i < 13; ++i) {
primaryKeySortOrders.add(SortOrder.ASC);
}
}
private static void getPrimaryKeyTypesAndSortOrdersDesc(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
// sample here, you can get this through mysql connection by 'DESCRIBE TABLE' and get real schema from your table
// OR you can simply write down your primary types in order here
primaryKeyTypes.add("BOOLEAN");
primaryKeyTypes.add("TINYINT");
primaryKeyTypes.add("SMALLINT");
primaryKeyTypes.add("INT");
primaryKeyTypes.add("BIGINT");
primaryKeyTypes.add("FLOAT");
primaryKeyTypes.add("DOUBLE");
primaryKeyTypes.add("DECIMAL(10,2)");
primaryKeyTypes.add("VARCHAR");
primaryKeyTypes.add("DATE");
primaryKeyTypes.add("TIME");
primaryKeyTypes.add("TIMESTAMP");
primaryKeyTypes.add("VARBINARY");
for (int i = 0; i < 12; ++i) {
primaryKeySortOrders.add(SortOrder.DESC);
}
primaryKeySortOrders.add(SortOrder.ASC);
}
private static List<Object> parseRowKeyToValues(byte[] rowKey, List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
List<Object> decoded = new ArrayList<>(primaryKeyTypes.size());
int pos = 0;
for (int i = 0; i < primaryKeyTypes.size(); i++) {
String primaryKeyType = primaryKeyTypes.get(i).toLowerCase();
int start = pos;
if (primaryKeyType.equalsIgnoreCase("varchar")) {
while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
++pos;
}
decoded.add(decodeString(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
++pos;
} else if (primaryKeyType.equalsIgnoreCase("boolean")) {
decoded.add(decodeBoolean(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_BOOLEAN;
} else if (primaryKeyType.equalsIgnoreCase("tinyint")) {
decoded.add(decodeByte(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_BYTE;
} else if (primaryKeyType.equalsIgnoreCase("smallint")) {
decoded.add(decodeShort(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_SHORT;
} else if (primaryKeyType.equalsIgnoreCase("int")) {
decoded.add(decodeInt(rowKey, pos, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_INT;
} else if (primaryKeyType.equalsIgnoreCase("bigint")) {
decoded.add(decodeLong(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG;
} else if (primaryKeyType.equalsIgnoreCase("float")) {
decoded.add(decodeFloat(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_FLOAT;
} else if (primaryKeyType.equalsIgnoreCase("double")) {
decoded.add(decodeDouble(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_DOUBLE;
} else if (primaryKeyType.contains("decimal")) {
while (rowKey[pos] != getSeparatorByte(primaryKeySortOrders.get(i))) {
++pos;
}
decoded.add(decodeDecimal(rowKey, start, pos - start, primaryKeySortOrders.get(i)));
++pos;
} else if (primaryKeyType.contains("varbinary")) {
decoded.add(decodeVarbinary(rowKey, start, rowKey.length - start, primaryKeySortOrders.get(i)));
pos = rowKey.length;
} else if (primaryKeyType.contains("date")) {
decoded.add(decodeDate(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG;
} else if (primaryKeyType.contains("time")) {
decoded.add(decodeTime(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG;
} else if (primaryKeyType.contains("timestamp")) {
decoded.add(decodeTimestamp(rowKey, start, primaryKeySortOrders.get(i)));
pos += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
}
}
return decoded;
}
private static Object decodeString(byte[] bytes, int offset, int length, SortOrder sortOrder) {
if (SortOrder.ASC.equals(sortOrder)) {
return new String(bytes, offset, length, StandardCharsets.UTF_8);
} else {
byte[] reversed = new byte[length];
for (int i = 0; i < length; i++) {
reversed[i] = (byte) (bytes[offset + i] ^ 0xFF);
}
return new String(reversed, StandardCharsets.UTF_8);
}
}
private static Object decodeBoolean(byte[] bytes, int offset, SortOrder sortOrder) {
return ((bytes[offset] == 0) == SortOrder.DESC.equals(sortOrder));
}
private static Object decodeByte(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeByte(bytes, offset, sortOrder);
}
private static Object decodeShort(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeShort(bytes, offset, sortOrder);
}
private static Object decodeInt(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeInt(bytes, offset, sortOrder);
}
private static Object decodeLong(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeLong(bytes, offset, sortOrder);
}
private static Object decodeFloat(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeFloat(bytes, offset, sortOrder);
}
private static Object decodeDouble(byte[] bytes, int offset, SortOrder sortOrder) {
return DataTypeUtils.decodeDouble(bytes, offset, sortOrder);
}
private static Object decodeDecimal(byte[] bytes, int offset, int length, SortOrder sortOrder) {
if (SortOrder.DESC.equals(sortOrder)) {
return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder, DataType.DECIMAL_V2);
}
return DataTypeUtils.decodeDecimal(bytes, offset, length, sortOrder);
}
private static Object decodeVarbinary(byte[] bytes, int offset, int length, SortOrder sortOrder) {
byte[] result = new byte[length];
if (SortOrder.DESC.equals(sortOrder)) {
DataTypeUtils.invert(bytes, offset, result, 0, length);
} else {
System.arraycopy(bytes, offset, result, 0, length);
}
return result;
}
private static Object decodeDate(byte[] bytes, int offset, SortOrder sortOrder) {
Long result = (Long) decodeLong(bytes, offset, sortOrder);
return new Date(result);
}
private static Object decodeTime(byte[] bytes, int offset, SortOrder sortOrder) {
Long result = (Long) decodeLong(bytes, offset, sortOrder);
return new Time(result);
}
private static Object decodeTimestamp(byte[] bytes, int offset, SortOrder sortOrder) {
long ms = DataTypeUtils.decodeLong(bytes, offset, sortOrder);
int nanos = DataTypeUtils.decodeUnsignedInt(bytes, offset + Bytes.SIZEOF_LONG, sortOrder);
Timestamp ts = new Timestamp(ms);
ts.setNanos(ts.getNanos() + nanos);
return ts;
}
private static byte getSeparatorByte(SortOrder sortOrder) {
if (sortOrder == SortOrder.DESC) {
return (byte) (0xFF);
} else {
return (byte) 0;
}
}
public enum RowKeyEncodedType {
STRING,
HEX;
public static RowKeyEncodedType getFormatterType(List<String> primaryKeyTypes, List<SortOrder> primaryKeySortOrders) {
int n = primaryKeyTypes.size();
for (int i = 0; i < n; i++) {
String primaryKeyType = primaryKeyTypes.get(i);
SortOrder sortOrder = primaryKeySortOrders.get(i);
if (!(primaryKeyType.equalsIgnoreCase("varchar") || primaryKeyType.equalsIgnoreCase("hstring"))
|| SortOrder.DESC.equals(sortOrder)) {
return RowKeyEncodedType.HEX;
}
}
return RowKeyEncodedType.STRING;
}
}
}