从SLS同步数据

本文为您介绍如何将阿里云日志服务SLS的数据同步至云数据库ClickHouse

前提条件

数据同步链路

云数据库 ClickHouse 企业版同步SLS数据,依赖其内置的Kafka表引擎和物化视图(Materialized View)机制,实现实时数据消费和存储。具体数据链路如下:

  • SLS Projection:需要同步的SLS源数据。

  • 云数据库ClickHouseKafka外表:Kafka表引擎的表。日志服务SLS兼容Kafka协议,云数据库ClickHouse从指定Kafka主题拉取源数据。此表特点如下:

    • Kafka外表默认不能直接查询。

    • Kafka外表只是用来消费Kafka数据,没有真正存储数据,需要通过物化视图将数据加工并插入到目标表中存储。

  • 物化视图:通过Kafka外表读取源数据并执行插入操作,将数据写到云数据库ClickHouse本地表。

  • 本地表:存储同步的数据。

操作步骤

本示例为将日志服务SLS数据同步至云数据库 ClickHouse 企业版集群default库中的sls_dest_table表中。

步骤一:连接云数据库ClickHouse

通过DMS连接ClickHouse

步骤二:创建Kafka外表

语法

CREATE TABLE sls_to_kafka(
  name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
  name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
  ...
)
ENGINE = Kafka()
SETTINGS 
kafka_broker_list = 'Project.Endpoint:Port', # Project为SLS的Project名称;Endponit为访问域名,您可前往SLS控制台,在目标Project的项目概览页面查看;端口号,阿里云内网为10011,公网为10012
kafka_topic_list = 'Logstore-Name', # 为SLS中创建的Logstore名称
kafka_group_name = 'kafka-test', # 自定义
kafka_format = 'JSONEachRow', # 云数据库ClickHouse支持的消息体格式
kafka_sasl_username = 'SLS-Project-Name', # SLS Project 名称
kafka_sasl_password = 'AccessKeyID#AccessKeySecret', # 拼接用户的AK与SK
kafka_security_protocol = 'SASL_SSL', # 必须使用SASL_SSL
kafka_sasl_mechanism = 'PLAIN';  # 必须使用PLAIN

更多参数说明,请参见创建Kafka外表

示例

CREATE TABLE default.sls_src_table
(
    id Int32,
    name String
)
ENGINE = Kafka
SETTINGS 
kafka_broker_list = 'test-log-***-cn-beijing.cn-beijing-intranet.log.aliyuncs.com:10011',
kafka_topic_list = 'test-log-store', 
kafka_group_name = 'kafka-group', 
kafka_format = 'JSONEachRow',
kafka_sasl_username = 'test-log-***-cn-beijing',
kafka_sasl_password = 'AccessKeyID#AccessKeySecret',
kafka_security_protocol = 'sasl_ssl',
kafka_sasl_mechanism = 'PLAIN';

步骤三:创建目标存储表

建表语法,请参见CREATE TABLE

示例

CREATE TABLE default.sls_dest_table
(
    id Int32,
    name String
)
ENGINE = SharedMergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192

步骤四:创建物化视图

建表语法,请参见CREATE MATERIALIZED VIEW

示例

-- sls_log_mv为视图名称
-- sls_dest_table为用于存储Kafka数据的目标表,此处为本地表。
-- sls_src_table为kafka外表。
CREATE MATERIALIZED VIEW default.sls_log_mv TO default.sls_dest_table
(
    id Int32,
    name String
)
AS SELECT
    id,
    name
FROM default.sls_src_table