SLS日志服务数据实时同步

存在多种方式可以将日志服务SLS的数据写入至Hologres,本文以Flink、DataWorks数据集成为例,为您介绍如何将SLS数据实时写入至Hologres。

前提条件

背景信息

日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务。日志服务一站式提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能,全面提升您在研发、运维、运营、安全等场景的数字化能力。

Hologres致力于高性能、高可靠、低成本、可扩展的实时计算引擎研发,为用户提供海量数据的实时数据仓库解决方案和亚秒级交互式查询服务,广泛应用在实时数据中台建设、精细化分析、自助式分析、营销画像、人群圈选、实时风控等场景。您可以将SLS的数据快速写入Hologres中,进行实时分析实时查询,提升业务对数据的探索能力。

通过Flink将SLS数据写入Hologres

  1. 准备SLS数据。

    本次准备的SLS数据来源于SLS日志平台的模拟数据,模拟游戏登录和消费日志的数据。如果您有业务数据,请直接使用业务数据。

    1. 登录日志服务控制台

    2. 接入数据区域,单击模拟接入

    3. 模拟接入页签,单击游戏操作日志下的模拟

    4. 选择日志空间页面,选择项目Project日志库Logstore后单击下一步

    5. 模拟接入页面,设置范围频率后单击开始导入

    6. 查询模拟产生的字段和数据如下,详情请参见查询和分析日志

      其中content列为JSON类型。模拟数据

  2. 创建Hologres表。

    在Hologres中创建用于接收数据的表。您可以根据业务查询需求,为对应的字段创建索引,以提升查询效率。索引介绍请参见建表概述。本次示例建表DDL如下。

    CREATE TABLE sls_flink_holo (
        content JSONB ,
        operation TEXT,
        uid TEXT,
        topic  TEXT ,
        source TEXT ,
        c__timestamp TIMESTAMPTZ,
        receive_time BIGINT,
        PRIMARY KEY (uid)
      );
  3. 通过Flink写入数据。

    在Flink中将SLS数据写入Hologres可以参考如下文档:

    1. Flink读取SLS数据:日志服务SLS源表

    2. Flink写入Hologres:实时数仓Hologres结果表

    本次SLS数据通过Flink写入Hologres的SQL作业示例如下,其中JSON类型字段直接写入Hologres JSON类型中,因为Flink中没有JSON类型,使用VARCHAR类型代替。

    说明
    • 在Flink中进行SQL作业开发和运行的详细步骤请参见SQL作业开发作业启动

    • SLS有JSON类型的数据,您可以根据业务需求先在Flink中将JSON数据进行解析再写入,也可以将JSON数据直接写入Hologres。

    CREATE TEMPORARY TABLE sls_input (
        content STRING,
        operation STRING,
        uid STRING,
        `__topic__` STRING METADATA VIRTUAL,
        `__source__` STRING METADATA VIRTUAL,
        `__timestamp__` BIGINT METADATA VIRTUAL,
        `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
      )
    WITH (
        'connector' = 'sls',
        'endpoint' = 'sls私域endpoint',--sls私域endpoint
        'accessid' = 'access id',--账号access id
        'accesskey' = 'access key',--账号access key
        'starttime' = '2024-08-30 00:00:00',--消费日志的开始时间
        'project' = 'project name',--sls的project名
        'logstore' = 'LogStore名称'--LogStore名称。
      );
    
    CREATE TEMPORARY TABLE hologres_sink (
        content VARCHAR,
        operation VARCHAR,
        uid VARCHAR,
        topic  STRING ,
        source STRING ,
        c__timestamp TIMESTAMP ,
        receive_time BIGINT
      )
    WITH (
        'connector' = 'hologres',
        'dbname' = 'holo db nema', --Hologres的数据库名称。
        'tablename' = 'holo tablene', --Hologres用于接收数据的表名称。
        'username' = 'access id', --当前阿里云账号的AccessKey ID。
        'password' = 'access key', --当前阿里云账号的AccessKey Secret。
        'endpoint' = 'holo vpc endpoint' --当前Hologres实例VPC网络的Endpoint。
      );
    
    INSERT INTO hologres_sink
    SELECT
       content,
       operation,
       uid,
       `__topic__` ,
       `__source__` ,
        CAST (
        FROM_UNIXTIME (`__timestamp__`) AS TIMESTAMP
      ),
       CAST (__tag__['__receive_time__'] AS BIGINT) AS receive_time
    FROM
      sls_input;
  4. 查询数据。

    在Hologres中查询通过Flink写入Hologres中的SLS数据,后续您可以根据业务需求进行数据开发。查询数据

通过DataWorks数据集成将SLS数据写入Hologres

  1. 准备SLS数据。

    本次准备的SLS数据来源于SLS日志平台的模拟数据,模拟游戏登录和消费日志的数据。如果您有业务数据,请直接使用业务数据。

    1. 登录日志服务控制台

    2. 接入数据区域,单击模拟接入

    3. 模拟接入页签,单击游戏操作日志下的模拟

    4. 选择日志空间页面,选择项目Project日志库Logstore后单击下一步

    5. 模拟接入页面,设置范围频率后单击开始导入

    6. 查询模拟产生的字段和数据如下,详情请参见查询和分析日志

      其中content列为JSON类型。模拟数据

  2. 创建Hologres表。

    在Hologres中创建用于接收数据的表。并根据业务查询需求,为对应的字段创建索引,以提升查询效率。索引介绍请参见建表概述。本次示例建表DDL如下。

    说明
    • 本示例中设置uid为主键,保证数据的唯一性,实际设置可以根据业务需求选择。

    • 设置uid为Distribution Key,数据写入时相同的uid可以写入至同一个Shard,提高查询性能。

    BEGIN;
    CREATE  TABLE sls_dw_holo (
        content JSONB ,
        operation TEXT,
        uid TEXT,
        C_Topic  TEXT ,
        C_Source TEXT ,
        timestamp BIGINT,
        PRIMARY KEY (uid)
      );
      CALL set_table_property('sls_dw_holo', 'distribution_key', 'uid');
      CALL set_table_property('sls_dw_holo', 'event_time_column', 'timestamp');
    COMMIT;
                            
  3. 配置数据源。

    在进行数据同步之前,使用数据集成功能需要为DataWorks工作空间添加数据源。

  4. 实时同步数据。

    在数据集成中创建和运行实时同步任务,详情请参见配置单表增量数据实时同步实时同步任务运维

    本示例创建的实时同步任务,配置输入为Loghub数据源,输出为Hologres数据源,并配置如下字段同步映射关系。字段映射

  5. 查询数据。

    实时同步任务运行开始后,可以在Hologres中查询通过DataWorks数据集成写入Hologres中的SLS数据。查询数据