Access Lindorm Column Store


This topic describes how to access data in Lindorm Column Store.

Background information

Lindorm Column Store is a column-oriented distributed storage service designed for massive semi-structured and structured data. It is suitable for large-scale storage scenarios, such as the Internet of Vehicles (IoV), Internet of Things (IoT), orders, and logs. Its core features include the following:
  • Analytics

    The Lindorm compute engine can access data in the column store to perform interactive analysis and offline computing on massive datasets. The column store provides rich indexing capabilities and data distribution features, which speed up locating and organizing data during computation. You can use SQL statements to create, read, update, and delete massive amounts of data by primary key.

  • High throughput

    The throughput capacity of the column store engine scales horizontally, and it can read and write terabytes of data per minute. This makes it suitable for high-throughput scenarios, such as fast data import for IoV, access to model training datasets, and large-scale report analysis.

  • Low cost

    Lindorm Column Store uses technologies such as high-ratio compression algorithms for column formats, high-density low-cost media, hot and cold data separation, multiple encoding methods for compression, and Cold Archive. This significantly reduces storage costs compared to self-built systems. It meets low-cost storage requirements for archiving and retaining massive amounts of data.

  • High availability

    Lindorm Column Store uses technologies such as erasure coding to ensure the high availability of distributed datasets. It also ensures that there is no single point of failure for data access.

Notes

The feature for accessing data in Lindorm Column Store is currently in invitational preview. If you want to use this feature, contact Lindorm technical support (DingTalk ID: s0s3eg3).

Prerequisites

Features

DDL

Namespaces

Create a namespace (database)

USE lindorm_columnar;
CREATE NAMESPACE mydb;

Delete a namespace (database)

USE lindorm_columnar;
DROP NAMESPACE mydb;
Warning The DROP NAMESPACE operation deletes the specified namespace and all tables in it. Before you run DROP NAMESPACE, back up the data in the namespace to prevent data loss.

Tables

Create a table

USE lindorm_columnar;
CREATE TABLE mydb.mytable (
  id INT,
  name STRING,
  score INT,
  city STRING)
PARTITIONED BY (city, bucket(128,id))
TBLPROPERTIES(
  'lce.primaryKey' = 'id,city');

Parameter description

Data fields

Column-oriented tables support the following field types: BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING, and BINARY. You can define data fields of any of these types when you create a table.

Primary key

When you create a table, you must set the lce.primaryKey parameter in TBLPROPERTIES to specify the primary key fields for the table.

The primary key must follow these rules:
  • Separate multiple primary key fields with commas.
  • The primary key of a column-oriented table must be unique.
  • If data with the same primary key is written multiple times, the new data overwrites the old data.
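The overwrite rule can be pictured as a map keyed by the primary key tuple: a later write whose key already exists replaces the earlier row, and a write with a new key adds a row. The following Python sketch is only a conceptual model of that rule for the mydb.mytable schema (primary key `id,city`); the `upsert` helper and the in-memory dict are hypothetical and are not a Lindorm API.

```python
# Conceptual model of primary key overwrite semantics (hypothetical, not a Lindorm API).
# Rows of mydb.mytable are (id, name, score, city); the primary key is (id, city).
COLUMNS = ("id", "name", "score", "city")
PRIMARY_KEY = ("id", "city")

def upsert(table, row):
    """Store the row under its primary key; a later write with the same key replaces the earlier row."""
    record = dict(zip(COLUMNS, row))
    key = tuple(record[col] for col in PRIMARY_KEY)
    table[key] = record

table = {}
upsert(table, (0, "zhang3", 99, "beijing"))
upsert(table, (0, "zhang3", 100, "beijing"))   # same key (0, 'beijing'): overwrites the first row
upsert(table, (0, "zhang3", 80, "shanghai"))   # different key (0, 'shanghai'): adds a new row

print(len(table))                        # 2
print(table[(0, "beijing")]["score"])    # 100
```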

Data partitioning method

When you create a table, you can use PARTITIONED BY ([regular partition expression,] bucket(bucketNum, bucketCol1[, bucketCol2, ...])) to specify the data partitioning method.

  • Bucket partition expression
    • bucketNum is the number of buckets (shards). It directly affects the concurrency of data writes and scans.
      Note Each bucket partition has its own partition number (bucket_index). `bucketNum` determines the number of bucket partitions under each regular partition.
      • The bucket partition number is calculated by hashing the bucket partition fields and taking the result modulo `bucketNum`. For example, for the sample table mydb.mytable, bucket_index = hash(id) % 128.
      • Data with different `bucket_index` values is stored in physically separate units. Before you create a table, estimate the total data volume and set `bucketNum` so that each bucket partition holds between 50 MB and 512 MB of data.
    • `bucketCol1[, bucketCol2, ...]` specifies the bucket partition fields.
      Important

      Bucket partition fields must follow these rules:

      • The bucket partition fields must be from the primary key fields.
      • To avoid data skew, choose bucket partition fields that have many distinct, evenly distributed values.

    Examples

    Specify only bucket partitions when you create a table.

    • Example 1:
      USE lindorm_columnar;
      CREATE TABLE mydb.mytable (
        id0 INT, id1 STRING, name STRING, score DOUBLE)
      PARTITIONED BY (bucket(1024,id1))
      TBLPROPERTIES(
        'lce.primaryKey' = 'id0, id1');
    • Example 2:
      USE lindorm_columnar;
      CREATE TABLE mydb.mytable (
        id INT, timestamp LONG, name STRING, score DOUBLE)
      PARTITIONED BY (bucket(512,timestamp))
      TBLPROPERTIES(
        'lce.primaryKey' = 'id,timestamp');
  • Regular partition expression
    Data for each distinct value of the regular partition expression is stored in a physically separate location, so queries that filter on the regular partition fields can skip (prune) the partitions that do not match.
    Important
    • Regular partition fields must be primary key fields.
    • Choose regular partition fields with a relatively small number of distinct values. Common regular partition fields include date, city, and gender. If the expression has too many distinct values, as timestamps do, the resulting large number of partitions puts excessive pressure on the column store metadata.

    Examples

    Specify both regular and bucket partitions when you create a table.

    • Example 1:
      USE lindorm_columnar;
      CREATE TABLE mydb.mytable (
        id INT, year STRING, month STRING, day STRING, name STRING, score DOUBLE)
      PARTITIONED BY (year, month, day, bucket(1024,id))
      TBLPROPERTIES(
        'lce.primaryKey' = 'id, year, month, day');
    • Example 2:
      USE lindorm_columnar;
      CREATE TABLE mydb.mytable (
        id INT, date STRING, city STRING, name STRING, score DOUBLE)
      PARTITIONED BY (date, city, bucket(1024,id))
      TBLPROPERTIES(
        'lce.primaryKey' = 'id,date,city');
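The bucket rules above can be sketched in Python: bucket_index = hash(bucket fields) % bucketNum, plus the arithmetic for choosing a bucketNum that keeps each bucket between 50 MB and 512 MB. This is a model under stated assumptions, not Lindorm's implementation: `zlib.crc32` stands in for the engine's unspecified hash function, data is assumed to be spread evenly across buckets, and because bucketNum applies under each regular partition, the size passed to the hypothetical `bucket_num_range` helper is the data volume of one regular partition (or of the whole table when there is no regular partition).

```python
import math
import zlib

MIN_BUCKET_MB = 50    # recommended lower bound for one bucket partition
MAX_BUCKET_MB = 512   # recommended upper bound for one bucket partition

def bucket_index(bucket_values, bucket_num):
    """Model of bucket_index = hash(bucket fields) % bucketNum.
    zlib.crc32 is a stand-in; the hash that Lindorm actually uses is not specified here."""
    encoded = "\x1f".join(str(v) for v in bucket_values).encode("utf-8")
    return zlib.crc32(encoded) % bucket_num

def bucket_num_range(partition_size_mb):
    """Smallest and largest bucketNum that keep each bucket within 50 MB to 512 MB,
    assuming the partition's data is spread evenly across its buckets."""
    lowest = max(1, math.ceil(partition_size_mb / MAX_BUCKET_MB))
    highest = max(lowest, int(partition_size_mb // MIN_BUCKET_MB))
    return lowest, highest

# mydb.mytable uses bucket(128, id): every id maps to one of 128 buckets.
print(bucket_index((42,), 128))     # some value from 0 to 127

# A low-cardinality bucket field such as city can fill at most 3 of 128 buckets (data skew).
cities = ["beijing", "shanghai", "hangzhou"]
print(len({bucket_index((c,), 128) for c in cities}) <= 3)   # True

# About 100 GB under one regular partition: bucketNum between 200 and 2048.
print(bucket_num_range(100 * 1024))  # (200, 2048)
```

The city example shows why the bucket partition fields need many distinct values: no matter how large bucketNum is, three distinct values can occupy at most three buckets.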

View tables in the current namespace

USE lindorm_columnar;
USE mydb;
SHOW TABLES;

View an existing table

You can execute the following SQL statements to view the table schema.
USE lindorm_columnar;
SHOW CREATE TABLE mydb.mytable;
DESC mydb.mytable;

Delete a specified table

USE lindorm_columnar;
DROP TABLE mydb.mytable;

Delete the content of a table but keep its structure

USE lindorm_columnar;
TRUNCATE TABLE mydb.mytable;

DML

Tables

Insert data into a table

Example 1:
USE lindorm_columnar;
INSERT INTO mydb.mytable VALUES (0, 'zhang3', 99, 'beijing');
Example 2:
USE lindorm_columnar;
INSERT INTO mydb.mytable SELECT id, name, score, city FROM another_table;

Query data in a table

Example 1:
USE lindorm_columnar;
SELECT * from mydb.mytable where id=0;
Example 2:
USE lindorm_columnar;
SELECT count(1), sum(score) from mydb.mytable where city = 'beijing';

Partitions

Delete a partition

USE lindorm_columnar;
ALTER TABLE mydb.mytable DROP PARTITION (city = 'beijing', _bucketIndex = '*');
Important When you delete a partition, you must specify all regular partition fields and set _bucketIndex = '*'. This deletes all bucket partitions under the specified regular partition.

Partition compaction

After you write data to a column store partition for a period of time, you can execute the COMPACT command. This command compacts the partition data, reduces data redundancy, and improves query performance.

Example 1:
USE lindorm_columnar;
COMPACT mydb.mytable PARTITION (city = 'beijing') FORCE MAJOR;
Example 2:
USE lindorm_columnar;
COMPACT mydb.mytable FORCE MAJOR;
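To show why compaction reduces redundancy, the following Python sketch uses a simplified, LSM-style model: each write batch is a separate file, the same primary key may appear in several files, and a major compaction merges them into one run that keeps only the newest row per key and is sorted by primary key. The file layout and the `major_compact` helper are assumptions for illustration; this topic does not describe Lindorm's actual compaction algorithm.

```python
# Simplified model of major compaction (hypothetical, not Lindorm's implementation).
# Each file is a list of (primary_key, row) entries; files are ordered oldest first.

def major_compact(files):
    """Merge files into one run: keep only the newest row per primary key, sorted by key."""
    latest = {}
    for run in files:               # oldest to newest, so later writes win
        for key, row in run:
            latest[key] = row
    return sorted(latest.items())   # one sorted run without duplicate keys

files = [
    [((3, "beijing"), "v1"), ((1, "beijing"), "v1")],
    [((3, "beijing"), "v2")],                         # rewrite of key (3, 'beijing')
    [((2, "beijing"), "v1"), ((1, "beijing"), "v2")],  # rewrite of key (1, 'beijing')
]
compacted = major_compact(files)
print(sum(len(run) for run in files), "->", len(compacted))   # 5 -> 3
print(compacted)
# [((1, 'beijing'), 'v2'), ((2, 'beijing'), 'v1'), ((3, 'beijing'), 'v2')]
```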

Best practices

You can use the following methods to accelerate data queries or computations.

Query data by primary key

If a table contains a massive dataset, you can specify filter conditions on the primary key to accelerate queries. The narrower the primary key range that the filter specifies, the greater the speedup.

Assume that the table schema is as follows:

USE lindorm_columnar;
CREATE TABLE orders
(
  o_orderkey       int,
  o_custkey        int,
  o_orderstatus    string,
  o_totalprice     double,
  o_orderdate      string,
  o_orderpriority  string,
  o_clerk          string,
  o_shippriority   int,
  o_comment        string
)
PARTITIONED BY (bucket(1024,o_orderkey,o_custkey))
TBLPROPERTIES(
  'lce.primaryKey' = 'o_orderkey,o_custkey'
);
Example 1:
USE lindorm_columnar;
SELECT * FROM orders WHERE o_orderkey=18394 AND o_custkey=81772;
Example 2:
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_orderkey>100000 AND o_orderkey<200000;
Example 3:
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_orderkey>100000 AND o_custkey<99;
Example 4:
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_orderkey>100000;
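The benefit of a narrow primary key range can be illustrated with a simplified model in which rows in a compacted file are sorted by the primary key (o_orderkey, o_custkey): two binary searches locate the matching run, so the work grows with the size of the range rather than the size of the file. The sorted-file layout and the `count_orderkey_range` helper are assumptions for illustration; this topic does not describe the engine's file format or indexes.

```python
import bisect

# Simplified model: one compacted file whose rows are sorted by the primary key
# (o_orderkey, o_custkey). The layout is an assumption for illustration.
rows = [(orderkey, orderkey % 100) for orderkey in range(0, 300000, 3)]  # 100,000 rows

def count_orderkey_range(sorted_rows, low, high):
    """Count rows with low < o_orderkey < high using two binary searches
    on the leading primary key column instead of a full scan."""
    start = bisect.bisect_right(sorted_rows, (low, float("inf")))   # first row with o_orderkey > low
    end = bisect.bisect_left(sorted_rows, (high, float("-inf")))    # first row with o_orderkey >= high
    return end - start

# Similar to Example 2: o_orderkey > 100000 AND o_orderkey < 200000.
print(count_orderkey_range(rows, 100000, 200000))   # 33333
# A narrower key range touches far fewer rows.
print(count_orderkey_range(rows, 100000, 100030))   # 10
```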

Add a partition filter

In the Lindorm Column Store engine, different partitions are physically isolated from each other. Therefore, you can add partition filter conditions to accelerate data queries.

Assume that the table schema is as follows:

USE lindorm_columnar;
CREATE TABLE orders
(
  o_orderkey       int,
  o_custkey        int,
  o_orderstatus    string,
  o_totalprice     double,
  o_orderdate      string,
  o_orderpriority  string,
  o_clerk          string,
  o_shippriority   int,
  o_comment        string
)
PARTITIONED BY (o_orderdate, bucket(1024,o_orderkey,o_custkey))
TBLPROPERTIES(
  'lce.primaryKey' = 'o_orderdate,o_orderkey,o_custkey'
);
Example 1:
USE lindorm_columnar;
SELECT o_orderdate, count(*) FROM orders WHERE o_orderdate='2022-01-01' GROUP BY o_orderdate;
Example 2:
USE lindorm_columnar;
SELECT o_orderdate, count(*) FROM orders WHERE o_orderdate>='2022-01-01' AND o_orderdate<='2022-01-07' GROUP BY o_orderdate;
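Because each o_orderdate value is stored in its own physical partition, a filter on o_orderdate lets the engine read only the matching partitions. The following Python sketch models that pruning step for Example 2; the partition list, row counts, and the `partitions_to_scan` helper are hypothetical example data, not output from Lindorm. ISO dates in YYYY-MM-DD form compare correctly as strings, which is why the range check works on the string column.

```python
# Simplified model of partition pruning: one physical partition per o_orderdate value.
# Partition values and row counts are hypothetical example data.
partitions = {
    "2021-12-31": 120,
    "2022-01-01": 150,
    "2022-01-04": 130,
    "2022-01-07": 110,
    "2022-01-08": 140,
}

def partitions_to_scan(partition_values, low, high):
    """Return the partitions whose value lies in [low, high]; all others are skipped.
    YYYY-MM-DD strings sort in date order, so string comparison is sufficient."""
    return sorted(v for v in partition_values if low <= v <= high)

selected = partitions_to_scan(partitions, "2022-01-01", "2022-01-07")
print(selected)   # ['2022-01-01', '2022-01-04', '2022-01-07']
print(sum(partitions[v] for v in selected), "of", sum(partitions.values()), "rows read")   # 390 of 650 rows read
```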

Query by non-primary key condition

When you create a table, you can set the lce.compact.major.splitKey parameter to specify secondary sort keys for partition compaction. During compaction, data is sorted by these columns in addition to the primary key, which accelerates queries that filter on non-primary key columns.

Assume that the table schema is as follows:

USE lindorm_columnar;
CREATE TABLE orders
(
  o_orderkey       int,
  o_custkey        int,
  o_orderstatus    string,
  o_totalprice     double,
  o_orderdate      string,
  o_orderpriority  string,
  o_clerk          string,
  o_shippriority   int,
  o_comment        string
)
PARTITIONED BY (bucket(1024,o_orderkey,o_custkey))
TBLPROPERTIES(
  'lce.primaryKey' = 'o_orderkey,o_custkey',
  'lce.compact.major.splitKey' = 'o_shippriority,o_totalprice'
);
Execute the following statement to compact the table:
COMPACT orders FORCE MAJOR;

You can use the following SQL statements to query data in the compacted table.

Example 1:
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_shippriority=0;
Example 2:
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_shippriority=0 AND o_totalprice>999.9;
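The effect of lce.compact.major.splitKey can be pictured with a simplified model: if compacted rows are ordered by (o_shippriority, o_totalprice), the rows that match o_shippriority=0, optionally with o_totalprice>999.9, form one contiguous run that binary search can find without a full scan. The ordering assumption, the sample data, and the `matching_rows` helper are illustrative only and do not describe the engine's internal layout.

```python
import bisect

# Simplified model: after COMPACT ... FORCE MAJOR on a table with
# 'lce.compact.major.splitKey' = 'o_shippriority,o_totalprice', assume rows in a compacted
# file are ordered by (o_shippriority, o_totalprice). Illustrative data only.
rows = sorted([
    (0, 120.0), (1, 50.5), (0, 999.9), (0, 1500.0),
    (2, 10.0), (0, 2000.25), (1, 3000.0), (0, 75.0),
])

def matching_rows(sorted_rows, priority, price_above=None):
    """Rows with o_shippriority == priority (and o_totalprice > price_above, if given),
    located by binary search because they form one contiguous run."""
    if price_above is None:
        start = bisect.bisect_left(sorted_rows, (priority, float("-inf")))
    else:
        # bisect_right skips rows equal to (priority, price_above): a strict ">" filter.
        start = bisect.bisect_right(sorted_rows, (priority, price_above))
    end = bisect.bisect_right(sorted_rows, (priority, float("inf")))
    return sorted_rows[start:end]

print(len(matching_rows(rows, 0)))      # 5, as in Example 1
print(matching_rows(rows, 0, 999.9))    # [(0, 1500.0), (0, 2000.25)], as in Example 2
```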

Join tables

The bucket partitioning method of a column-oriented table determines how its data is distributed. For two massive tables that are joined with each other, you can define the same bucket partitioning method on the join keys to accelerate the join.

Assume that the table schemas are as follows:

USE lindorm_columnar;
-- Schema of the orders0 table:
CREATE TABLE orders0
(
  o_orderkey       int,
  o_custkey        int,
  o_orderstatus    string,
  o_totalprice     double,
  o_orderdate      string,
  o_orderpriority  string
)
PARTITIONED BY (o_orderdate, bucket(1024,o_orderkey))
TBLPROPERTIES(
  'lce.primaryKey' = 'o_orderdate,o_orderkey'
);

-- Schema of the orders1 table:
CREATE TABLE orders1
(
  o_orderkey       int,
  o_custkey        int,
  o_orderdate      string,
  o_clerk          string,
  o_shippriority   int,
  o_comment        string
)
PARTITIONED BY (o_orderdate, bucket(1024,o_orderkey))
TBLPROPERTIES(
  'lce.primaryKey' = 'o_orderdate,o_orderkey'
);

In the `orders0` and `orders1` tables, the data for each day (`o_orderdate`) uses the same bucket partitioning method: bucket(1024,o_orderkey). Rows with the same `o_orderkey` value therefore have the same bucket_index in both tables, so joining the two tables on the bucket partition field (`o_orderkey`) for a single day is accelerated.

The following is an example of a join query:

USE lindorm_columnar;
SELECT * FROM orders0 a
JOIN
 orders1 b
ON a.o_orderkey=b.o_orderkey AND a.o_orderdate=b.o_orderdate
WHERE a.o_orderdate='2022-01-01';
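Why matching bucket definitions help can be shown with a Python model of a bucketed join: because both tables compute bucket_index from o_orderkey with the same bucketNum, a row of orders0 can only match rows in the bucket of orders1 that has the same index, so the join runs bucket by bucket. `zlib.crc32` stands in for the engine's unspecified hash, and the rows and helper names are hypothetical; the sketch checks its result against a naive join.

```python
import zlib
from collections import defaultdict

BUCKET_NUM = 1024  # both tables use bucket(1024, o_orderkey)

def bucket_index(orderkey):
    """Stand-in for hash(o_orderkey) % 1024; crc32 replaces Lindorm's unspecified hash."""
    return zlib.crc32(str(orderkey).encode("utf-8")) % BUCKET_NUM

def split_into_buckets(rows):
    """Group rows (o_orderkey first) by bucket index."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_index(row[0])].append(row)
    return buckets

def bucketed_join(left_rows, right_rows):
    """Join on o_orderkey bucket by bucket: a match can only be in the bucket with the same index."""
    left, right = split_into_buckets(left_rows), split_into_buckets(right_rows)
    result = []
    for index, left_bucket in left.items():
        right_by_key = defaultdict(list)
        for row in right.get(index, []):
            right_by_key[row[0]].append(row)
        for l_row in left_bucket:
            for r_row in right_by_key[l_row[0]]:
                result.append(l_row + r_row[1:])
    return result

# Hypothetical rows for o_orderdate = '2022-01-01': (o_orderkey, payload)
orders0 = [(k, f"status-{k}") for k in range(0, 2000, 2)]
orders1 = [(k, f"clerk-{k}") for k in range(0, 2000, 3)]
joined = bucketed_join(orders0, orders1)
naive = [a + b[1:] for a in orders0 for b in orders1 if a[0] == b[0]]
print(sorted(joined) == sorted(naive), len(joined))   # True 334
```

In this model each bucket pair can be processed independently, which is what makes co-bucketed tables attractive for parallel joins.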

Accelerate queries

Compacting a table or a specific partition of a table improves the ordering and compactness of its data, which improves data scan performance.

Assume that the table schema is as follows:
USE lindorm_columnar;
CREATE TABLE hello_tbl
(id INT, city STRING, name STRING, score INT)
PARTITIONED BY (city, bucket(4, id))
TBLPROPERTIES('lce.primaryKey' = 'id,city');
Example 1: Compact the entire `hello_tbl` table.
COMPACT hello_tbl FORCE MAJOR;
Example 2: Compact a specified partition.
COMPACT hello_tbl PARTITION (city='beijing') FORCE MAJOR;

After the compaction is complete, you can set the following table properties to further accelerate subsequent queries:

Example 1:
ALTER TABLE hello_tbl SET TBLPROPERTIES ('lce.scan.majorCompactFilesOnly' = true);
Parameter description
lce.scan.majorCompactFilesOnly: Specifies the data query range. The data type is BOOLEAN. Valid values:
  • true: Queries only the compacted data and ignores incrementally written data that has not been compacted.
  • false: The default value. Queries all data.
Example 2:
ALTER TABLE hello_tbl SET TBLPROPERTIES ('lce.scan.enableVectorizedReader' = true);
Parameter description
lce.scan.enableVectorizedReader: Specifies whether to use vectorization to accelerate data queries. The data type is BOOLEAN. Valid values:
  • true: Uses vectorization to accelerate data queries. This parameter takes effect only when lce.scan.majorCompactFilesOnly is set to `true`.
  • false: The default value. Does not use vectorization to accelerate data queries.
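The interaction of the two properties can be summarized in a small Python model: with lce.scan.majorCompactFilesOnly set to true, rows written after the last major compaction are not visible to queries, and lce.scan.enableVectorizedReader only takes effect when majorCompactFilesOnly is also true. The `scan_plan` helper and its parameter names are hypothetical and only restate the rules in the parameter descriptions; the model also assumes that incremental rows use new primary keys.

```python
# Simplified model of the two scan properties (hypothetical helper, not a Lindorm API).
# Assumes incremental rows carry new primary keys, so no overwrite handling is modeled.
def scan_plan(compacted_rows, incremental_rows, major_compact_files_only=False,
              enable_vectorized_reader=False):
    """Return (rows visible to the query, whether the vectorized reader is used)."""
    if major_compact_files_only:
        visible = list(compacted_rows)   # uncompacted incremental writes are ignored
    else:
        visible = list(compacted_rows) + list(incremental_rows)
    # The vectorized reader only takes effect when majorCompactFilesOnly is true.
    vectorized = enable_vectorized_reader and major_compact_files_only
    return visible, vectorized

compacted = [(1, "beijing"), (2, "beijing")]
incremental = [(3, "beijing")]   # written after the last COMPACT ... FORCE MAJOR

print(scan_plan(compacted, incremental))               # ([(1, 'beijing'), (2, 'beijing'), (3, 'beijing')], False)
print(scan_plan(compacted, incremental, True, True))   # ([(1, 'beijing'), (2, 'beijing')], True)
print(scan_plan(compacted, incremental, False, True))  # ([(1, 'beijing'), (2, 'beijing'), (3, 'beijing')], False)
```

The trade-off is freshness for speed: enabling both properties gives the fastest scans but hides data written since the last major compaction until the next one runs.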