本文为您介绍如何将阿里云日志服务SLS的数据同步至云数据库ClickHouse。
前提条件
云数据库ClickHouse:
日志服务SLS:
数据同步链路
云数据库 ClickHouse 企业版同步SLS数据,依赖其内置的Kafka表引擎和物化视图(Materialized View)机制,实现实时数据消费和存储。具体数据链路如下:
SLS Projection:需要同步的SLS源数据。
云数据库ClickHouse的Kafka外表:Kafka表引擎的表。日志服务SLS兼容Kafka协议,云数据库ClickHouse从指定Kafka主题拉取源数据。此表特点如下:
Kafka外表默认不能直接查询。
Kafka外表只是用来消费Kafka数据,没有真正存储数据,需要通过物化视图将数据加工并插入到目标表中存储。
物化视图:通过Kafka外表读取源数据并执行插入操作,将数据写到云数据库ClickHouse本地表。
本地表:存储同步的数据。
操作步骤
本示例为将日志服务SLS数据同步至云数据库 ClickHouse 企业版集群default库中的sls_dest_table表中。
步骤一:连接云数据库ClickHouse
步骤二:创建Kafka外表
语法
CREATE TABLE sls_to_kafka(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'Project.Endpoint:Port', # Project为SLS的Project名称;Endponit为访问域名,您可前往SLS控制台,在目标Project的项目概览页面查看;端口号,阿里云内网为10011,公网为10012
kafka_topic_list = 'Logstore-Name', # 为SLS中创建的Logstore名称
kafka_group_name = 'kafka-test', # 自定义
kafka_format = 'JSONEachRow', # 云数据库ClickHouse支持的消息体格式
kafka_sasl_username = 'SLS-Project-Name', # SLS Project 名称
kafka_sasl_password = 'AccessKeyID#AccessKeySecret', # 拼接用户的AK与SK
kafka_security_protocol = 'SASL_SSL', # 必须使用SASL_SSL
kafka_sasl_mechanism = 'PLAIN'; # 必须使用PLAIN更多参数说明,请参见创建Kafka外表。
示例
CREATE TABLE default.sls_src_table
(
id Int32,
name String
)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'test-log-***-cn-beijing.cn-beijing-intranet.log.aliyuncs.com:10011',
kafka_topic_list = 'test-log-store',
kafka_group_name = 'kafka-group',
kafka_format = 'JSONEachRow',
kafka_sasl_username = 'test-log-***-cn-beijing',
kafka_sasl_password = 'AccessKeyID#AccessKeySecret',
kafka_security_protocol = 'sasl_ssl',
kafka_sasl_mechanism = 'PLAIN';步骤三:创建目标存储表
建表语法,请参见CREATE TABLE。
示例
CREATE TABLE default.sls_dest_table
(
id Int32,
name String
)
ENGINE = SharedMergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192步骤四:创建物化视图
建表语法,请参见CREATE MATERIALIZED VIEW。
示例
-- sls_log_mv为视图名称
-- sls_dest_table为用于存储Kafka数据的目标表,此处为本地表。
-- sls_src_table为kafka外表。
CREATE MATERIALIZED VIEW default.sls_log_mv TO default.sls_dest_table
(
id Int32,
name String
)
AS SELECT
id,
name
FROM default.sls_src_table该文章对您有帮助吗?