云原生多模数据库Lindorm计算引擎提供Lindorm CDC(Change Data Capture)数据源,您可以通过该数据源使用Spark SQL查询Lindorm实例其他引擎变更的数据。

前提条件

  • Lindorm实例已开通LTS(Lindorm Tunnel Service)服务,具体请参见购买并登录LTS
  • Lindorm宽表已创建数据订阅通道,创建方法请参见通过Pull模式创建数据订阅通道
    说明 创建数据订阅通道时需要注意以下几点:
    • 不要勾选在投递的数据内容中为列名省略family前缀
    • 数据序列化格式中请选择json
    • 一个主题名仅对应一个Lindorm表名
  • 为HBase表设置LINDORM_HBASE_CATALOG属性,设置方法请参见访问宽表数据
    说明 LINDORM_HBASE_CATALOG属性表示SparkSQL Schema与HBase表Schema的对应关系,Lindorm CDC数据源根据该属性的内容提取HBase表Schema。

使用限制

  • 仅支持HBase表(表示通过HBase客户端写入Lindorm宽表引擎的表)。
  • 实时数据订阅功能的数据消费格式仅支持json格式文件。

作业提交方式

您可以通过以下两种方法编写并提交Lindorm CDC数据源的Spark作业。
说明 读写Lindorm CDC数据源的语法说明请参见Lindorm CDC数据源配置介绍与语法

Lindorm CDC数据源配置介绍与语法

Lindorm CDC数据源的库表结构介绍

  • Lindorm计算引擎提供的Lindorm CDC数据源名称为lindorm_cdc。
  • Lindorm CDC数据源不支持管理Namespace,仅支持管理表。表名称为创建数据订阅通道的主题名

Lindorm CDC数据源输出的Schema介绍

Lindorm CDC数据源根据LINDORM_HBASE_CATALOG属性提取HBase表的Schema作为Lindorm CDC数据源的Schema。Lindorm CDC数据源从Kafka中读取数据,每一条操作记录都会被保存。Lindorm CDC数据源的Schema支持以下meta字段:
字段名类型说明字段配置值
_cdc_timestamp_kafkalong该操作记录写入Kafka的时间戳。单位为毫秒。无需配置,Schema中包含默认配置值。
_cdc_operation_typestring该操作记录的变更类型。
  • C:新增数据操作。
  • U:更新数据操作。
  • D:删除数据操作。
无需配置,Schema中包含默认配置值。
_cdc_timestamp_lindormlong该操作记录被Lindorm引擎处理的时间戳。单位为毫秒。spark.sql.catalog.lindorm_cdc.lindormTsEnabled
_cdc_timestamp_ltslong该操作记录被LTS处理的时间戳。单位为毫秒。spark.sql.catalog.lindorm_cdc.ltsTsEnabled

Lindorm CDC数据源的配置项介绍

Lindorm CDC数据源的配置项如下表所示:
配置项是否必填说明示例值
spark.sql.catalog.lindorm_cdc.username
  • 必填:提交JAR作业或者Python作业。
  • 非必填(系统自动添加):提交SQL作业。
连接Lindorm宽表引擎的用户名。默认用户名root。
spark.sql.catalog.lindorm_cdc.password
  • 必填:提交JAR作业或者Python作业。
  • 非必填(系统自动添加):提交SQL作业。
连接Lindorm宽表引擎的密码。默认密码root。
spark.sql.catalog.lindorm_cdc.lindormTsEnabled非必填Lindorm处理该操作记录的时间戳。默认值为false。设置为true时Lindorm CDC数据源的Schema中会增加_cdc_timestamp_lindorm字段。true
spark.sql.catalog.lindorm_cdc.ltsTsEnabled非必填LTS处理该操作记录的时间戳。默认值为false。设置为true时Lindorm CDC数据源的Schema中会增加_cdc_timestamp_lts字段。true

Lindorm CDC数据源的语法介绍

Lindorm CDC数据源支持的语法如下表所示:
语法描述示例
USE table_name使用某个表。USE test
SHOW TABLES查看所有的表。SHOW TABLES
DESCRIBE table_name查看某个表的详细信息。DESC test或者DESCRIBE test
SELECT关于SELECT语法请参见Spark SQL
说明 SELECT语法使用中需要注意以下两点:
  • 必须使用_cdc_timestamp_kafka > $startTimestamp and _cdc_timestamp_kafka < $endTimestamp指定Kafka中的数据读取范围。
  • 对于_cdc_operation_type=D的字段,仅显示rowKey对应的字段,其他字段为空。