本篇主要介绍如何通过Cassandra CQL访问Lindorm宽表引擎。
使用cqlsh访问Lindorm
cqlsh是Cassandra社区提供的Shell客户端,提供windows以及类Unix版本,该工具存在于Cassandra安装包内。用户可以在本地或阿里云ECS上下载Cassandra安装包,通过cqlsh工具可以完全兼容的访问Lindorm。
操作前请确认已完成SDK安装且已获取集群的连接地址。
- 下载和安装cqlsh。
下载最新版本的Cassandra安装包然后解压,即可完成安装。例如:下载3.11.4版本(请先在官网上确认版本号),操作如下:
$ wget http://mirror.bit.edu.cn/apache/cassandra/3.11.4/apache-cassandra-3.11.4-bin.tar.gz $ tar -zxf apache-cassandra-3.11.4-bin.tar.gz $ cd apache-cassandra-3.11.4
- 启动cqlsh。
使用Lindorm控制台查看CQL访问方式的地址和账户密码,其中端口默认是9042,然后使用如下命令连接Lindorm:
bin/cqlsh $host $port -u $username -p $password
如果您需要经常连接到特定节点,您可以将节点的地址和端口信息保存到环境变量$CQLSH_HOST
和$CQLSH_PORT
中。更多关于 cqlsh 命令支持的参数可以使用bin/cqlsh -help
。 - 基本cqlsh命令。
Lindorm现在支持多种cqlsh的访问命令:
Documented shell commands:=========================== CAPTURE COPY DESCRIBE LOGIN DESC EXIT HELP PAGING SHOW CQL help topics:================ CREATE_KEYSPACE TEXT ALTER_KEYSPACE TIME CREATE_ROLE DROP_USER TIMESTAMP ALTER_TABLE CREATE_TABLE GRANT ALTER_USER INSERT UPDATE CREATE_USER INSERT_JSON USE ASCII DATE INT UUID BATCH DELETE JSON BEGIN KEYWORDS BLOB DROP_COLUMNFAMILY LIST_PERMISSIONSBOOLEAN LIST_ROLES COUNTER DROP_INDEX LIST_USERS DROP_KEYSPACE PERMISSIONS CREATE_COLUMNFAMILY REVOKE DROP_ROLE SELECT CREATE_INDEX DROP_TABLE SELECT_JSON
- 通过 cqlsh 创建 keyspace。
Lindorm 兼容Cassandra中的keyspace和关系型数据库中的database概念比较类似,一个 keyspace 可以包含一个或多个 tables 或 column families。当您启动 cqlsh 时没有指定 keyspace,那么命令提示符为 cqlsh>,您可以使用 CREATE KEYSPACE 命令来创建 keyspace,具体如下:
cqlsh> CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
对于Lindorm用户来说CREATE KEYSPACE 概念中的strategy 、replication_factor和 Cassandra中的概念类似,但是replication_factor 默认都是2,上述建完KEYSPACE 后可以通过DESCRIBE KEYSPACE 来查看keyspace的信息。
cqlsh> DESCRIBE KEYSPACE test_keyspace; cqlsh> CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
现在您可以使用 USE 命令来切换到这个 keyspace :
cqlsh> USE test_keyspace; cqlsh:test_keyspace>
- 通过 cqlsh 创建表。
cqlsh> use test_keyspace; cqlsh:test_keyspace> CREATE TABLE test_user (first_name text , last_name text, PRIMARY KEY (first_name)) ;
上述命令表示在 test_keyspace 下面创建了一张名为 test_user 的表。其中包含了 first_name 和 last_name 两个字段,类型都是 text,并且 first_name 是这张表的 PRIMARY KEY。当然,您也可以通过下述命令在 test_keyspace 里面建表。
cqlsh> CREATE TABLE test_keyspace.test_user(first_name text , last_name text, PRIMARY KEY (first_name)) ;
查看该表的schema信息:
cqlsh:test_keyspace> DESCRIBE TABLE test_user; CREATE TABLE test_keyspace.test_user ( first_name text PRIMARY KEY, last_name text ) WITH bloom_filter_fp_chance = 0.01 AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'} AND comment = '' AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'} AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} AND crc_check_chance = 1.0 AND dclocal_read_repair_chance = 0.1 AND default_time_to_live = 0 AND gc_grace_seconds = 864000 AND max_index_interval = 2048 AND memtable_flush_period_in_ms = 0 AND min_index_interval = 128 AND read_repair_chance = 0.0 AND speculative_retry = '99PERCENTILE'; cqlsh:test_keyspace>
DESCRIBE TABLE 命令会将建表语句以格式化的形式显示出来,除了您制定的设置,还包含了许多默认的设置。Lindorm表级别的配置和Cassandra有部分属性是不一样的,对于不一样的属性使用Cassandra CQL 默认设置展示。如下配置不具有Lindorm使用意义:
crc_check_chance、gc_grace_seconds、read_repair_chance、speculative_retry、dclocal_read_repair_chance、crc_check_chance
其他配置长期会和CQL属性完全兼容,现阶段如下属性完全兼容:
compression:支持LZ4/SNAPPY/ZSTD default_time_to_live:支持表级别TTL
- 通过 cqlsh 读写数据。
往表里面插入一些数据:
cqlsh:test_keyspace> INSERT INTO test_user (first_name, last_name) VALUES ('test', 'LINDORM'); cqlsh:test_keyspace> INSERT INTO test_user (first_name, last_name) VALUES ('Zhang', 'San');
上述语句表示往 test_user 表中插入三条数据,其中最后一条数据只指定了 key,last_name 没有值。您可以使用 SELECT COUNT 语句查看数据是否插入成功,但是SELECT COUNT 不建议在海量数据上使用。
cqlsh:test_keyspace> SELECT COUNT(*) FROM test_user; count ------- 2 (1 rows)
通过命令的输出查看已成功插入数据。您还可以使用下述命令查询这条数据:
cqlsh:test_keyspace> SELECT * FROM test_user; first_name | last_name ------------+----------- test | LINDORM Zhang | San (3 rows) cqlsh:test_keyspace> SELECT * FROM test_user WHERE first_name='test'; first_name | last_name ------------+----------- test | LINDORM (1 rows)
- 删除列或行。
使用 DELETE 命令删除一些列。例如,删除 last_name 列:
cqlsh:test_keyspace> DELETE last_name FROM test_user WHERE first_name='test'; cqlsh:test_keyspace> SELECT * FROM test_user WHERE first_name='test'; first_name | last_name ------------+----------- (0 rows)
使用 DELETE 命令删除一整行的数据:
cqlsh:test_keyspace> DELETE FROM test_user WHERE first_name='test'; cqlsh:test_keyspace> SELECT * FROM test_user WHERE first_name='test'; first_name | last_name ------------+----------- (0 rows) cqlsh:test_keyspace>
- 清空或删除表。
如果您需要清空一张表,您可以使用 TRUNCATE 命令或 DROP TABLE 命令,只有super 用户可以具备truncate 和 drop keyspace/table 权限,例如:
cqlsh:test_keyspace> TRUNCATE test_user; cqlsh:test_keyspace> DROP TABLE test_user;
使用Cassandra CQL Java Driver 访问Lindorm
- 创建连接。
String[] contactPoints = new String[]{ "ip"//从控制台获取的连接地址 }; Cluster cluster = Cluster.builder() .addContactPoints(contactPoints) // 填写账户名密码(控制台获取) .withAuthProvider(new PlainTextAuthProvider(username, password)) .build(); cluster.init();//初始化集群连接,会建立控制 Session session = cluster.connect();//初始化连接session,不能每个请求创建一个Session。合理的应该是每个进程预先创建若干个。
- 使用Driver执行操作。
- DDL操作
// 创建keyspace,指定对应strategy, replication factor。 session.execute( "CREATE KEYSPACE IF NOT EXISTS testKeyspace WITH replication " + "= {'class':'SimpleStrategy', 'replication_factor':1};"); // 创建table,给table指定对应的primary key 以及cluster key 和regular key session.execute( "CREATE TABLE IF NOT EXISTS testKeyspace.testTable (" + "id int PRIMARY KEY," + "name text," + "age int," + "address text" + ");"); //清空表 session.execute("TRUNCATE TABLE testKeyspace.testTable;"); //删除表 session.execute("DROP TABLE testKeyspace.testTable ");
- DML操作
// 执行insert 操作 session.execute( "INSERT INTO testKeyspace.testTable (id, name, age, address) " + "VALUES (" + "1," + "'testname'," + "11," + "'hangzhou');"); // 执行select 操作,这里select * 表示获取所有列,也可以指定需要select 的列名获取对应列数据 ResultSet res = session.execute( "SELECT * FROM testKeyspace.testTable ;"); // 如果想要获取每一列对应的数据,可以如下操作 for (Row row : results) { int id = row.getInt("id"); String name = row.getString("name"); int age = row.getInt("age"); String address = row.getString("address"); } // 关闭Session session.close(); // 关闭Cluster cluster.close();
- DDL操作
使用Cassandra CQL 多语言 Driver 访问Lindorm
此处会介绍多语言Driver(Python、C++)通过Cassandra CQL 访问Lindorm的例子,其他访问方式可以参考社区文档。
- Cassandra CQL Python Driver访问Lindorm
安装Datastax Python SDK库
# 指定版本安装(建议安装3.x版本) pip install cassandra-driver==3.19.0 # 安装最新版本 pip install cassandra-driver # https://pypi.org/project/cassandra-driver/#history
编写Python访问代码
#!/usr/bin/env python # -*- coding: UTF-8 -*- import logging import sysfrom cassandra.cluster import Clusterfrom cassandra.auth import PlainTextAuthProvider logging.basicConfig(stream=sys.stdout, level=logging.INFO) cluster = Cluster( # 此处填写数据库连接点地址(公网或者内网的) contact_points=["ip"], # 填写账户名密码(如果忘记可以在 账号管理 处重置 auth_provider=PlainTextAuthProvider("cassandra", "123456")) session = cluster.connect() # 建keyspace session.execute( "CREATE KEYSPACE IF NOT EXISTS testKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};"); # 建table session.execute( "CREATE TABLE IF NOT EXISTS testKeyspace.testTable (id int PRIMARY KEY, name text,age int,address text);"); #执行写 session.execute( "INSERT INTO testKeyspace.testTable (id, name, age, address) VALUES ( 1, 'testname', 11, 'hangzhou');"); #读操作 rows = session.execute( "SELECT * FROM testKeyspace.testTable ;"); # 打印每行信息到控制台 for row in rows: print("# row: {}".format(row)) # 关闭Session session.shutdown() # 关闭 Clustercluster.shutdown()
- Cassandra CQL C++ Driver 访问Lindorm
通过Datastax c++ 链接获取相关平台下的rpm包。
编写访问代码
CassFuture* connect_future = NULL; CassCluster* cluster = cass_cluster_new(); CassSession* session = cass_session_new(); char* hosts = "ip";//控制台获取ip //建立相关连接 cass_cluster_set_contact_points(cluster, hosts); connect_future = cass_session_connect(session, cluster); //DDL操作 if (cass_future_error_code(connect_future) == CASS_OK) { CassFuture* close_future = NULL; const char* query = "SELECT name FROM testKeyspace.testTable "; CassStatement* statement = cass_statement_new(query, 0); CassFuture* result_future = cass_session_execute(session, statement); if (cass_future_error_code(result_future) == CASS_OK) { //获取相关的结果 const CassResult* result = cass_future_get_result(result_future); const CassRow* row = cass_result_first_row(result); if (row) { const CassValue* value = cass_row_get_column_by_name(row, "name"); //打印结果 const char* name; size_t name_length; cass_value_get_string(value, &name, &name_length); printf("release_version: '%.*s'\n", (int)name_length, name); } cass_result_free(result); } else { //异常处理 const char* message; size_t message_length; cass_future_error_message(result_future, &message, &message_length); fprintf(stderr, "Unable to run query: '%.*s'\n", (int)message_length, message); } cass_statement_free(statement); cass_future_free(result_future); //手工释放资源信息 close_future = cass_session_close(session); cass_future_wait(close_future); cass_future_free(close_future); } else { //异常处理 const char* message; size_t message_length; cass_future_error_message(connect_future, &message, &message_length); fprintf(stderr, "Unable to connect: '%.*s'\n", (int)message_length, message); } cass_future_free(connect_future); cass_cluster_free(cluster); cass_session_free(session);