存在多种方式可以将日志服务SLS的数据写入至Hologres,本文以Flink、DataWorks数据集成为例,为您介绍如何将SLS数据实时写入至Hologres。
前提条件
开通SLS服务并创建Project和Logstore,详情请参见快速入门。
开通Hologres服务并连接至开发工具,详情请参见实时数仓Hologres使用流程。
如果选择通过Flink将SLS数据写入Hologres,请开通实时计算Flink版服务并创建项目空间,详情请参见开通Flink全托管和创建与管理项目空间。
如果选择通过DataWorks数据集成将SLS数据写入Hologre,请开通DataWorks服务并创建工作空间,详情请参见开通DataWorks服务和创建工作空间。
背景信息
日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务。日志服务一站式提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能,全面提升您在研发、运维、运营、安全等场景的数字化能力。
Hologres致力于高性能、高可靠、低成本、可扩展的实时计算引擎研发,为用户提供海量数据的实时数据仓库解决方案和亚秒级交互式查询服务,广泛应用在实时数据中台建设、精细化分析、自助式分析、营销画像、人群圈选、实时风控等场景。您可以将SLS的数据快速写入Hologres中,进行实时分析实时查询,提升业务对数据的探索能力。
通过Flink将SLS数据写入Hologres
准备SLS数据。
本次准备的SLS数据来源于SLS日志平台的模拟数据,模拟游戏登录和消费日志的数据。如果您有业务数据,请直接使用业务数据。
创建Hologres表。
在Hologres中创建用于接收数据的表。您可以根据业务查询需求,为对应的字段创建索引,以提升查询效率。索引介绍请参见建表概述。本次示例建表DDL如下。
CREATE TABLE sls_flink_holo ( content JSONB , operation TEXT, uid TEXT, topic TEXT , source TEXT , c__timestamp TIMESTAMPTZ, receive_time BIGINT, PRIMARY KEY (uid) );
通过Flink写入数据。
在Flink中将SLS数据写入Hologres可以参考如下文档:
Flink读取SLS数据:日志服务SLS源表。
Flink写入Hologres:实时数仓Hologres结果表。
本次SLS数据通过Flink写入Hologres的SQL作业示例如下,其中JSON类型字段直接写入Hologres JSON类型中,因为Flink中没有JSON类型,使用VARCHAR类型代替。
CREATE TEMPORARY TABLE sls_input ( content STRING, operation STRING, uid STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` BIGINT METADATA VIRTUAL, `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL ) WITH ( 'connector' = 'sls', 'endpoint' = 'sls私域endpoint',--sls私域endpoint 'accessid' = 'access id',--账号access id 'accesskey' = 'access key',--账号access key 'starttime' = '2024-08-30 00:00:00',--消费日志的开始时间 'project' = 'project name',--sls的project名 'logstore' = 'LogStore名称'--LogStore名称。 ); CREATE TEMPORARY TABLE hologres_sink ( content VARCHAR, operation VARCHAR, uid VARCHAR, topic STRING , source STRING , c__timestamp TIMESTAMP , receive_time BIGINT ) WITH ( 'connector' = 'hologres', 'dbname' = 'holo db nema', --Hologres的数据库名称。 'tablename' = 'holo tablene', --Hologres用于接收数据的表名称。 'username' = 'access id', --当前阿里云账号的AccessKey ID。 'password' = 'access key', --当前阿里云账号的AccessKey Secret。 'endpoint' = 'holo vpc endpoint' --当前Hologres实例VPC网络的Endpoint。 ); INSERT INTO hologres_sink SELECT content, operation, uid, `__topic__` , `__source__` , CAST ( FROM_UNIXTIME (`__timestamp__`) AS TIMESTAMP ), CAST (__tag__['__receive_time__'] AS BIGINT) AS receive_time FROM sls_input;
查询数据。
在Hologres中查询通过Flink写入Hologres中的SLS数据,后续您可以根据业务需求进行数据开发。
通过DataWorks数据集成将SLS数据写入Hologres
准备SLS数据。
本次准备的SLS数据来源于SLS日志平台的模拟数据,模拟游戏登录和消费日志的数据。如果您有业务数据,请直接使用业务数据。
创建Hologres表。
在Hologres中创建用于接收数据的表。并根据业务查询需求,为对应的字段创建索引,以提升查询效率。索引介绍请参见建表概述。本次示例建表DDL如下。
说明本示例中设置
uid
为主键,保证数据的唯一性,实际设置可以根据业务需求选择。设置
uid
为Distribution Key,数据写入时相同的uid
可以写入至同一个Shard,提高查询性能。
BEGIN; CREATE TABLE sls_dw_holo ( content JSONB , operation TEXT, uid TEXT, C_Topic TEXT , C_Source TEXT , timestamp BIGINT, PRIMARY KEY (uid) ); CALL set_table_property('sls_dw_holo', 'distribution_key', 'uid'); CALL set_table_property('sls_dw_holo', 'event_time_column', 'timestamp'); COMMIT;
配置数据源。
在进行数据同步之前,使用数据集成功能需要为DataWorks工作空间添加数据源。
SLS数据源使用Loghub数据源,详情请参见配置LogHub(SLS)数据源。
配置Hologres数据源,详情请参见配置Hologres数据源。
实时同步数据。
在数据集成中创建和运行实时同步任务,详情请参见配置单表增量数据实时同步和实时同步任务运维。
本示例创建的实时同步任务,配置输入为Loghub数据源,输出为Hologres数据源,并配置如下字段同步映射关系。
查询数据。
实时同步任务运行开始后,可以在Hologres中查询通过DataWorks数据集成写入Hologres中的SLS数据。