This topic describes how to access data in Lindorm Column Store.
Background information
- Analytics
The Lindorm compute engine can access data in the column store to perform interactive analysis and offline computing on massive datasets. The column store provides rich indexing capabilities and data distribution features. This can effectively accelerate data location and arrangement during computation. You can use SQL statements to create, read, update, and delete massive amounts of primary key data.
- High throughput
The throughput capacity of the column store engine supports horizontal scaling. It provides read and write capabilities for terabytes of data per minute. This is suitable for high-throughput data scenarios, such as fast data import for IoV, access to training datasets for models, and large-scale report analysis.
- Low cost
Lindorm Column Store uses technologies such as high-ratio compression algorithms for column formats, high-density low-cost media, hot and cold data separation, multiple encoding methods for compression, and Cold Archive. This significantly reduces storage costs compared to self-built systems. It meets low-cost storage requirements for archiving and retaining massive amounts of data.
- High availability
Lindorm Column Store uses technologies such as erasure coding to ensure the high availability of distributed datasets. It also ensures that there is no single point of failure for data access.
Notes
The feature for accessing data in Lindorm Column Store is currently in invitational preview. If you want to use this feature, contact Lindorm technical support (DingTalk ID: s0s3eg3).
Prerequisites
- You have read Usage notes.
- Complete the required operations based on your job type:
- For Java Database Connectivity (JDBC) development, see JDBC development practices.
- For JAR job development, see JAR job development practices.
- For Python job development, see Python job development practices.
Features
DDL
Namespaces
Create a namespace (database)
USE lindorm_columnar;
CREATE NAMESPACE mydb;
Delete a namespace (database)
USE lindorm_columnar;
DROP NAMESPACE mydb;
The DROP NAMESPACE operation deletes the specified namespace and all tables in it. Before you run DROP NAMESPACE, make sure that the data in the namespace is backed up to prevent data loss.
Tables
Create a table
USE lindorm_columnar;
CREATE TABLE mydb.mytable (
id INT,
name STRING,
score INT,
city STRING)
PARTITIONED BY (city, bucket(128,id))
TBLPROPERTIES(
'lce.primaryKey' = 'id,city');
Parameter description
Data fields
Column-oriented tables support the following field types: BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING, and BINARY. You can specify any data fields when you create a table.
Primary key
When you create a table, you must set the lce.primaryKey parameter in TBLPROPERTIES to specify the primary key fields for the table.
- Separate multiple primary key fields with commas.
- The primary key of a column-oriented table must be unique.
- If data with the same primary key is written multiple times, the new data overwrites the old data.
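The overwrite rule can be pictured with a small Python model. This is only an illustration of the semantics described above, not Lindorm code: the `upsert` helper and the in-memory `table` dictionary are hypothetical names introduced for the sketch.

```python
from typing import Dict, Tuple

Row = Dict[str, object]


def upsert(table: Dict[Tuple, Row], row: Row, primary_key: Tuple[str, ...]) -> None:
    """Store `row` under its primary-key value, replacing any earlier row with the same key."""
    key = tuple(row[col] for col in primary_key)
    table[key] = row


table: Dict[Tuple, Row] = {}
pk = ("id", "city")  # mirrors 'lce.primaryKey' = 'id,city' in the sample table

upsert(table, {"id": 0, "name": "zhang3", "score": 99, "city": "beijing"}, pk)
# Same primary key (0, 'beijing'): the new row overwrites the old one.
upsert(table, {"id": 0, "name": "zhang3", "score": 60, "city": "beijing"}, pk)
# Different primary key (0, 'shanghai'): stored as a separate row.
upsert(table, {"id": 0, "name": "zhang3", "score": 75, "city": "shanghai"}, pk)

print(len(table))                      # number of distinct primary keys
print(table[(0, "beijing")]["score"])  # score from the latest write for that key
```

In this model, writing the same (id, city) pair twice leaves one row that holds the most recent values.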
Data partitioning method
When you create a table, you can use PARTITIONED BY ([regular partition expression], {bucket(bucketNum, bucketCol(,...))}) to specify the data partitioning method.
- Bucket partition expression
- bucketNum is the number of shards. It directly affects the concurrency of data writes and scans.
Note
- Different bucket partitions have different partition numbers (bucket_index). `bucketNum` determines the number of bucket partitions under a regular partition.
- The bucket partition number is calculated by taking the hash value of the partition fields and then performing a modulo operation with `bucketNum`. For example, for the sample table mydb.mytable, bucket_index=hash(id)%128.
- For each different `bucket_index`, the underlying storage is physically divided. Before you create a table, evaluate the total data volume and set `bucketNum` to a reasonable value. This ensures that the data volume of a single bucket partition is between 50 MB and 512 MB.
- `bucketCol(,...)` specifies the bucket partition fields.
Important
Bucket partition fields must follow these rules:
- The bucket partition fields must be from the primary key fields.
- To avoid data skew, ensure that the bucket partition fields have sufficient discrete features.
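The sizing guidance and the skew rule can be checked with some quick arithmetic before you create a table. The following Python sketch rests on several assumptions: data is taken to spread evenly across buckets, `total_bytes` is the volume expected under one regular partition (or the whole table if there is none), and CRC32 is only a stand-in hash, because the hash function that Lindorm uses is not specified here. The helper names are hypothetical.

```python
import math
import zlib

MB = 1024 * 1024


def bucket_num_range(total_bytes: int, min_mb: int = 50, max_mb: int = 512):
    """Return (lowest, highest) bucketNum that keeps each bucket between
    min_mb and max_mb, assuming data spreads evenly across buckets."""
    lowest = max(1, math.ceil(total_bytes / (max_mb * MB)))
    highest = max(1, total_bytes // (min_mb * MB))
    return lowest, highest


def bucket_index(value, bucket_num: int) -> int:
    """Stand-in for hash(value) % bucketNum. CRC32 is an illustrative hash only."""
    return zlib.crc32(str(value).encode("utf-8")) % bucket_num


# About 100 GB of data: any bucketNum in this range meets the 50 MB to 512 MB target.
print(bucket_num_range(100 * 1024 * MB))

# A high-cardinality field (id) spreads rows over many buckets ...
id_buckets = {bucket_index(i, 128) for i in range(10_000)}
# ... while a field with only three distinct values can reach at most three buckets.
city_buckets = {bucket_index(c, 128) for c in ("beijing", "shanghai", "hangzhou")}
print(len(id_buckets), len(city_buckets))
```

A field with few distinct values leaves most buckets empty and concentrates the data in the rest, which is the skew that the rule above is meant to prevent.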
Examples
Specify only bucket partitions when you create a table.
- Example 1:
USE lindorm_columnar;
CREATE TABLE mydb.mytable (
id0 INT,
id1 STRING,
name STRING,
score DOUBLE)
PARTITIONED BY (bucket(1024,id1))
TBLPROPERTIES(
'lce.primaryKey' = 'id0, id1');
- Example 2:
USE lindorm_columnar;
CREATE TABLE mydb.mytable (
id INT,
timestamp LONG,
name STRING,
score DOUBLE)
PARTITIONED BY (bucket(512,timestamp))
TBLPROPERTIES(
'lce.primaryKey' = 'id,timestamp');
- Regular partition expression
For each distinct value of the regular partition expression, the underlying storage is physically divided. This allows partitions to be pruned during data scans.
Important
- The fields in different regular partition expressions must be from the primary key fields.
- Ensure that the values of the regular partition expression are relatively concentrated. Common regular partition fields include date, city, and gender. If the values of your regular partition expression are too discrete, such as timestamps, it will cause excessive pressure on the column store metadata.
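To see why a highly discrete regular partition field is a problem, count the physical partitions. Each distinct regular partition value holds `bucketNum` bucket partitions, so the total is their product. The Python sketch below uses a hypothetical helper and hypothetical numbers (one year of data, 128 buckets) purely for illustration.

```python
def physical_partitions(distinct_regular_values: int, bucket_num: int) -> int:
    """Each distinct regular partition value holds bucket_num bucket partitions."""
    return distinct_regular_values * bucket_num


# One year of data partitioned by a date column (365 distinct values).
daily = physical_partitions(365, 128)

# The same year partitioned by a second-level timestamp column.
per_second = physical_partitions(365 * 24 * 3600, 128)

print(daily)       # 46720
print(per_second)  # 4036608000
```

The date column yields tens of thousands of partitions, while the timestamp column yields billions, all of which the column store metadata would have to track.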
Examples
Specify both regular and bucket partitions when you create a table.
- Example 1:
USE lindorm_columnar;
CREATE TABLE mydb.mytable (
id INT,
year STRING,
month STRING,
day STRING,
name STRING,
score DOUBLE)
PARTITIONED BY (year, month, day, bucket(1024,id))
TBLPROPERTIES(
'lce.primaryKey' = 'id, year, month, day');
- Example 2:
USE lindorm_columnar;
CREATE TABLE mydb.mytable (
id INT,
date STRING,
city STRING,
name STRING,
score DOUBLE)
PARTITIONED BY (date, city, bucket(1024,id))
TBLPROPERTIES(
'lce.primaryKey' = 'id,date,city');
View tables in the current namespace
USE lindorm_columnar;
USE mydb;
SHOW TABLES;
View an existing table
USE lindorm_columnar;
SHOW CREATE TABLE mydb.mytable;
DESC mydb.mytable;
Delete a specified table
USE lindorm_columnar;
DROP TABLE mydb.mytable;
Delete the content of a table but keep its structure
USE lindorm_columnar;
TRUNCATE TABLE mydb.mytable;
DML
Tables
Insert data into a table
USE lindorm_columnar;
INSERT INTO mydb.mytable VALUES (0, 'zhang3', 99, 'beijing');
USE lindorm_columnar;
INSERT INTO mydb.mytable SELECT id, name, score, city FROM another_table;
Query data in a table
USE lindorm_columnar;
SELECT * FROM mydb.mytable WHERE id=0;
USE lindorm_columnar;
SELECT count(1), sum(score) FROM mydb.mytable WHERE city = 'beijing';
Partitions
Delete a partition
USE lindorm_columnar;
ALTER TABLE mydb.mytable DROP PARTITION (city = 'beijing', _bucketIndex = '*');
Setting _bucketIndex = '*' deletes all bucket partitions under the specified regular partition.
Partition compaction
After you write data to a column store partition for a period of time, you can execute the COMPACT command. This command compacts the partition data, reduces data redundancy, and improves query performance.
USE lindorm_columnar;
COMPACT mydb.mytable PARTITION (city = 'beijing') FORCE MAJOR;
USE lindorm_columnar;
COMPACT mydb.mytable FORCE MAJOR;
Best practices
You can use the following methods to accelerate data queries or computations.
Query data by primary key
If a table contains a massive dataset, you can specify filter conditions on the primary key to accelerate queries. The smaller the data range of the primary key you set, the better the acceleration.
Assume that the table schema is as follows:
USE lindorm_columnar;
CREATE TABLE orders
(
o_orderkey int,
o_custkey int,
o_orderstatus string,
o_totalprice double ,
o_orderdate string ,
o_orderpriority string,
o_clerk string,
o_shippriority int,
o_comment string
)
PARTITIONED BY (bucket(1024,o_orderkey,o_custkey))
TBLPROPERTIES(
'lce.primaryKey' = 'o_orderkey,o_custkey'
);
USE lindorm_columnar;
SELECT * FROM orders WHERE o_orderkey=18394 AND o_custkey=81772;
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_orderkey>100000 AND o_orderkey<200000;
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_orderkey>100000 AND o_custkey<99;
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_orderkey>100000;
Add a partition filter
In the Lindorm Column Store engine, different partitions are physically isolated from each other. Therefore, you can add partition filter conditions to accelerate data queries.
Assume that the table schema is as follows:
USE lindorm_columnar;
CREATE TABLE orders
(
o_orderkey int,
o_custkey int,
o_orderstatus string,
o_totalprice double ,
o_orderdate string ,
o_orderpriority string,
o_clerk string,
o_shippriority int,
o_comment string
)
PARTITIONED BY (o_orderdate, bucket(1024,o_orderkey,o_custkey))
TBLPROPERTIES(
'lce.primaryKey' = 'o_orderdate,o_orderkey,o_custkey'
);
USE lindorm_columnar;
SELECT o_orderdate, count(*) FROM orders WHERE o_orderdate='2022-01-01' GROUP BY o_orderdate;
USE lindorm_columnar;
SELECT o_orderdate, count(*) FROM orders WHERE o_orderdate>='2022-01-01' AND o_orderdate<='2022-01-07' GROUP BY o_orderdate;
Query by non-primary key condition
For the partition compaction process, you can configure the lce.compact.major.splitKey parameter when you create a table to specify secondary sort keys. In addition to the primary key, data is sorted again based on several other columns. This accelerates queries that use non-primary key conditions.
Assume that the table schema is as follows:
USE lindorm_columnar;
CREATE TABLE orders
(
o_orderkey int,
o_custkey int,
o_orderstatus string,
o_totalprice double ,
o_orderdate string ,
o_orderpriority string,
o_clerk string,
o_shippriority int,
o_comment string
)
PARTITIONED BY (bucket(1024,o_orderkey,o_custkey))
TBLPROPERTIES(
'lce.primaryKey' = 'o_orderkey,o_custkey',
'lce.compact.major.splitKey' = 'o_shippriority,o_totalprice'
);
COMPACT orders FORCE MAJOR;
You can use the following SQL statements to query data in the compacted table.
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_shippriority=0;
USE lindorm_columnar;
SELECT count(*) FROM orders WHERE o_shippriority=0 AND o_totalprice>999.9;
Associate data
The bucket partitioning method in a column-oriented table affects the data distribution. When you associate (join) two massive datasets, define the same bucket partitioning method for both tables based on the association conditions. This accelerates the join computation.
Assume that the table schemas are as follows:
USE lindorm_columnar;
-- Schema of the orders0 table:
CREATE TABLE orders0
(
o_orderkey int,
o_custkey int,
o_orderstatus string,
o_totalprice double ,
o_orderdate string ,
o_orderpriority string
)
PARTITIONED BY (o_orderdate, bucket(1024,o_orderkey))
TBLPROPERTIES(
'lce.primaryKey' = 'o_orderdate,o_orderkey'
);
-- Schema of the orders1 table:
CREATE TABLE orders1
(
o_orderkey int,
o_custkey int,
o_orderdate string ,
o_clerk string,
o_shippriority int,
o_comment string
)
PARTITIONED BY (o_orderdate, bucket(1024,o_orderkey))
TBLPROPERTIES(
'lce.primaryKey' = 'o_orderdate,o_orderkey'
);
In the `orders0` and `orders1` tables, the bucket partitioning method for each day (`o_orderdate`) is the same: bucket(1024,o_orderkey). Therefore, associating the data of the two tables for a single day based on the bucket partition field (`o_orderkey`) provides better acceleration.
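One way to see why identical bucketing helps is that rows with the same `o_orderkey` always receive the same bucket number in both tables, so each bucket only has to be matched against its counterpart. The Python sketch below models this with in-memory lists. It is an illustration under assumptions: CRC32 stands in for the unspecified hash function, the rows are made-up sample data for a single day, and nothing here describes how the Lindorm compute engine actually executes the join.

```python
import zlib
from collections import defaultdict

BUCKET_NUM = 1024


def bucket_index(key, bucket_num: int = BUCKET_NUM) -> int:
    """Stand-in for hash(key) % bucketNum. CRC32 is an illustrative hash only."""
    return zlib.crc32(str(key).encode("utf-8")) % bucket_num


def partition_rows(rows, key):
    """Group rows by the bucket number of their bucket partition field."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_index(row[key])].append(row)
    return buckets


# Made-up rows for one day; o_orderkey is unique within each table for that day.
orders0 = [{"o_orderkey": k, "o_totalprice": 10.0 * k} for k in range(1, 6)]
orders1 = [{"o_orderkey": k, "o_clerk": f"clerk{k}"} for k in range(3, 9)]

b0 = partition_rows(orders0, "o_orderkey")
b1 = partition_rows(orders1, "o_orderkey")

joined = []
# Only buckets with the same number can contain matching keys.
for idx in b0.keys() & b1.keys():
    right_by_key = {r["o_orderkey"]: r for r in b1[idx]}
    for left in b0[idx]:
        right = right_by_key.get(left["o_orderkey"])
        if right is not None:
            joined.append({**left, **right})

joined_keys = sorted(r["o_orderkey"] for r in joined)
print(joined_keys)  # [3, 4, 5]
```

If the two tables used different bucket fields or different `bucketNum` values, matching keys could sit in unrelated buckets and this bucket-by-bucket pairing would no longer apply.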
The following is an example of an association query:
USE lindorm_columnar;
SELECT * FROM orders0 a
JOIN
orders1 b
ON a.o_orderkey=b.o_orderkey AND a.o_orderdate=b.o_orderdate
WHERE a.o_orderdate='2022-01-01';
Accelerate queries
Compacting a specified table or a specified partition in a table can improve data order or compactness. This improves data scan performance.
CREATE TABLE hello_tbl
(id int, city string, name string, score int)
partitioned by (city, bucket(4, id))
tblproperties('lce.primaryKey' = 'id,city');
COMPACT hello_tbl FORCE MAJOR;
COMPACT hello_tbl PARTITION (city='beijing') FORCE MAJOR;
After the compaction is complete, you can set the following table parameters to further accelerate subsequent queries:
ALTER TABLE hello_tbl SET TBLPROPERTIES ('lce.scan.majorCompactFilesOnly' = true);
- true: Queries only the compacted data and ignores incrementally written data that has not been compacted.
- false: The default value. Queries all data.
ALTER TABLE hello_tbl SET TBLPROPERTIES ('lce.scan.enableVectorizedReader' = true);
- true: Uses vectorization to accelerate data queries. This parameter takes effect only when lce.scan.majorCompactFilesOnly is set to `true`.
- false: The default value. Does not use vectorization to accelerate data queries.