数据管道与物化视图加速Kafka流量日志分析

更新时间:
复制为 MD 格式

本文将介绍如何通过AnalyticDB for MySQL数据管道与物化视图加速Kafka实时流量日志的分析。

背景说明

在线游戏与互联网行业中,应用服务器产生海量半结构化日志。业务部门需要针对这些数据进行报表分析。传统的解决方案通过同步工具将Kafka/SLS 数据导入数据湖或数仓,直接执行 SQL 查询。

这种传统架构存在以下弊端:

  • 查询性能抖动:日志流量随业务高峰波动。数据量的激增会导致下游 SQL 扫描量剧变,引发报表查询超时。

  • 存储成本高昂:全量日志实时入仓带来巨大的存储开销。冷热数据生命周期管理复杂。

  • 计算资源浪费:日志通常为追加写(Append-only)。基于全量原始数据反复执行聚合计算,消耗大量 CPU,难以实现秒级响应。

方案概览

本文介绍基于云原生数据仓库 AnalyticDB MySQL 版的端到端解决方案:

  • 实时投递:应用日志投递至阿里云 Kafka 或 SLS。

  • 内置管道:利用AnalyticDB for MySQL内置数据管道,将 Kafka 数据实时清洗并写入数仓表。

  • 增量聚合:利用增量物化视图,自动摄取增量数据并完成预计算。

  • 极速查询:BI 应用直接查询物化视图,无需扫描全量历史数据,实现亚秒级响应。

image

核心优势

  • 全链路实时:无需外部调度系统,通过数据库内核机制实现近实时的自动聚合。

  • 运维简化:从 Kafka 消费到物化视图刷新,均由AnalyticDB for MySQL统一管理,降低跨系统运维复杂度。

  • 高性能低成本:物化视图大幅减少查询扫描量,显著提升并发能力并降低计算成本。

前提条件

模拟日志数据

日志格式示例:

{
  "id": "abc12345",
  "ts": "2025-01-01T12:00:00Z",
  "src": "192.168.1.1",
  "path": "/api/login",
  "code": 200,
  "lat": 0.15
}

数据生成脚本:

请根据实际情况配置好 Kafka 连接信息(KAFKA_CONFIG),运行以下脚本持续生成测试数据。

  • SSL根证书下载下载SSL根证书

  • 如果实例未开启ACL,您可以在Kafka控制台实例详情页面的配置信息区域获取默认的用户名密码

  • 如果实例已开启ACL,请确保要使用的SASL用户已被授予向云消息队列 Kafka 版实例收发消息的权限。具体操作请参见使用ACL功能进行访问控制

更多详情请参见Python SDK收发消息
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import random
import string
import sys
import time
from datetime import datetime, timedelta
from confluent_kafka import Producer

# ======================
# Kafka 配置(可使用SSL公网接入点本地运行测试)
# ======================

KAFKA_CONFIG = {
    'bootstrap.servers': 'SSL接入点地址:9093',
    'client.id': 'log-gen-simple',
    'message.timeout.ms': 5000,
    # 使用SASL_SSL协议接入。
    'security.protocol': 'SASL_SSL',
    # 鉴权方式
    'sasl.mechanism': 'PLAIN',
    # 鉴权用户
    'sasl.username': 'USERNAME',
    'sasl.password': 'PASSWORD',
    # 设置SSL根证书的路径(绝对路径)
    'ssl.ca.location': '../com/aliyun/mix-4096-ca-cert',
}

TOPIC_NAME = 'access-logs-simple-new'
SEND_RATE_PER_SEC = 2


# ======================
# 工具函数(简化 + 脱敏)
# ======================

def random_ip():
    return ".".join(str(random.randint(1, 254)) for _ in range(4))


def random_id():
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=16))


def random_path():
    paths = ["/api/data", "/user/profile", "/order", "/static/app.js", "/health"]
    params = ["?p=1", "?id=100", "?q=abc", "?v=2", ""]
    return random.choice(paths) + random.choice(params)


def generate_simple_log():
    now = datetime.utcnow()
    event_time = now - timedelta(seconds=random.randint(1, 30))

    return {
        "id": random_id(),  # 请求ID
        "ts": event_time.strftime("%Y-%m-%dT%H:%M:%SZ"),  # ISO时间
        "src": random_ip(),  # 来源IP
        "dst": "svc-" + random.choice(["api", "web", "auth", "pay"]),  # 目标服务
        "method": random.choice(["GET", "POST", "PUT"]),  # 方法
        "path": random_path(),  # 路径(含参数)
        "code": random.choices(
            [200, 400, 404, 500],
            weights=[90, 3, 4, 3],
            k=1
        )[0],  # 状态码
        "size": random.randint(100, 10000),  # 响应大小
        "lat": round(random.uniform(0.01, 1.5), 3),  # 耗时(秒)
        "proto": random.choice(["http", "https"])  # 协议
    }


# ======================
# Kafka 回调
# ======================

def delivery_report(err, msg):
    if err:
        print(f"Delivery failed: {err}", file=sys.stderr)
    else:
        print(f"Sent to {msg.topic()}[{msg.partition()}]")


# ======================
# 主程序
# ======================

def main():
    producer = Producer(KAFKA_CONFIG)
    print(f"Sending simplified logs to topic: '{TOPIC_NAME}'")
    print(f"Rate: ~{SEND_RATE_PER_SEC}/sec. Ctrl+C to stop.\n")

    try:
        while True:
            log = generate_simple_log()
            producer.produce(
                topic=TOPIC_NAME,
                value=json.dumps(log, ensure_ascii=False),
                on_delivery=delivery_report
            )
            producer.poll(0)
            time.sleep(1.0 / SEND_RATE_PER_SEC)
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        producer.flush(timeout=10)
        print("Done.")


if __name__ == "__main__":
    main()

场景一:高性能实时数仓

此场景适用于对查询性能要求极高,且希望利用AnalyticDB for MySQL高性能存储引擎的业务。

创建目标表

为了确保物化视图能通过 Binlog 读取增量数据,需显式开启 Binlog 功能,并关闭‘默认创建V2表’的行为。

-- 关闭旧版 DDL 引擎行为,确保 Binlog 设置生效
SET ADB_CONFIG RC_DDL_ENGINE_REWRITE_XUANWUV2=false;

CREATE DATABASE IF NOT EXISTS db_streaming;

-- 创建日志明细表
CREATE TABLE IF NOT EXISTS db_streaming.access_log_simple (
    id VARCHAR(32) NOT NULL COMMENT '请求ID',
    ts TIMESTAMP NOT NULL COMMENT '请求时间',
    src VARCHAR(64) NOT NULL COMMENT '来源IP',
    dst VARCHAR(64) COMMENT '目标服务',
    method VARCHAR(10) COMMENT '方法',
    path VARCHAR(512) COMMENT '路径',
    code INT COMMENT '状态码',
    size INT COMMENT '大小',
    lat DOUBLE COMMENT '耗时',
    proto VARCHAR(10) COMMENT '协议'
)
-- 开启全索引加速查询
INDEX_ALL = 'Y'
-- 设置冷热分层策略
STORAGE_POLICY = 'MIXED' 
HOT_PARTITION_COUNT = 7
DISTRIBUTE BY HASH(src, path)
PARTITION BY VALUE (DATE_FORMAT(ts, '%Y%m%d'))
COMMENT '访问日志明细表';

-- 开启表级别 Binlog(物化视图增量刷新的前提)
SET ADB_CONFIG BINLOG_ENABLE=true;
ALTER TABLE db_streaming.access_log_simple binlog=true;

配置数据管道

创建数据同步任务,将 Kafka 数据持续写入上述表中。

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。

  2. 左侧导航栏选择数据接入 > 数据源管理 > 新建数据源

    选择对应同地域的Kafka实例,以及测试脚本中对应的Topic,消息格式为JSON。

  3. 左侧导航栏选择SLS/Kafka数据同步

    • 目标端类型:选择数仓 - ADB仓存储

    • 配置映射:选择Source Topic与目标表db_streaming.access_log_simple,确保字段映射正确。

      字段映射需要源端有数据才会自动映射。

    image

  4. 选择Job资源组,勾选加入白名单。

    image

  5. 启动任务后,验证数据写入。

    # 验证增量写入
    SELECT COUNT(*) FROM db_streaming.access_log_simple;
    
    -- 验证原始查询(替换脚本运行的实际时间)
    SELECT 
        COUNT(1) AS total,
        SUBSTRING_INDEX(path, '?', 1) AS url
    FROM db_streaming.access_log_simple
    WHERE ts > '2026-01-04 18:00:00'
    GROUP BY url
    ORDER BY total DESC
    LIMIT 10;

创建并查询物化视图

针对“按小时统计各 URL 访问量”的分析需求,构建增量物化视图。

视图设计

  • 预聚合:将TIMESTAMP精确时间聚合为小时粒度ts_h

  • 增量刷新:设置REFRESH FAST,数据库内核自动计算增量变化,无需全量重算。

  • 分区策略:物化视图按小时分区,提升查询时的裁剪效率。

DROP MATERIALIZED VIEW IF EXISTS db_streaming.access_log_simple_agg_mv;

CREATE MATERIALIZED VIEW db_streaming.access_log_simple_agg_mv(
    total int COMMENT '访问次数',
    url string COMMENT 'URL路径', 
    ts_h string COMMENT '小时级时间窗口',
    PRIMARY key (url, ts_h)
)
PARTITION BY VALUE (ts_h)
-- 设置自动刷新间隔为 3 分钟
REFRESH FAST NEXT now() + INTERVAL 3 minute
AS
SELECT
    COUNT(1) AS total,
    SUBSTRING_INDEX(path, '?', 1) AS url,
    DATE_FORMAT(ts, '%Y%m%d%H') AS ts_h
FROM db_streaming.access_log_simple
GROUP BY url, ts_h;

高性能查询

业务应用不再查询明细表,而是直接查询物化视图。

SELECT url, sum(total) as total_pv 
FROM db_streaming.access_log_simple_agg_mv
WHERE ts_h >= '2026010118' -- 快速分区裁剪
GROUP BY url
ORDER BY total_pv DESC;

场景二:低成本数据湖仓

此场景适用于数据量巨大,对存储成本敏感,且希望利用数据湖开放生态的业务。可以使用 Paimon 格式将数据存储在 OSS 上。

准备 Paimon 外部数据库

创建一个映射到 OSS 路径的外部数据库。

说明

此语句仅用于在元数据层定义表的结构和数据存储路径。它不会创建任何物理数据文件或目录,也不会校验数据存储路径是否存在或其中的数据格式是否匹配。

-- 创建基于 OSS 的 Paimon 数据库
CREATE EXTERNAL DATABASE IF NOT EXISTS ods_paimon_layer
WITH DBPROPERTIES(
    adb.paimon.warehouse = 'oss://<bucket_name>/paimon/',
    location = 'oss://<bucket_name>/paimon/ods_paimon_layer.db/'
);

更多配置详情请参见CREATE EXTERNAL DATABASE

配置数据管道

创建数据同步任务,将 Kafka 数据持续写入上述表中。

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。

  2. 左侧导航栏选择数据接入 > 数据源管理 > 新建数据源

    选择对应同地域的Kafka实例,以及测试脚本中对应的Topic,消息格式为JSON。

  3. 左侧导航栏选择SLS/Kafka数据同步

    • 目标端类型:选择数据湖 - 用户OSS

    • OSS路径:实际存储同步数据的路径,需要具备访问权限,如oss://<bucket_name>/paimon/paimondata/

    • 存储格式:选择PAIMON

      image

    • 配置映射:选择上一步创建的外部库ods_paimon_layer。表名可自定义(ods_logs),系统将自动创建 Paimon 表。

      字段映射需要源端有数据才会自动映射。

      image

  4. (可选)高级配置

    定义CheckPoint时间间隔(ms):aps.job.checkpoint.interval=60000,控制CheckPoint提交频率。

    image

  5. 启动任务后,验证数据写入。

    任务启动及提交数据写入大约需要3分钟,请3分钟后验证。
    SELECT COUNT(*) FROM `ods_paimon_layer`.`ods_logs`;
    
    -- 验证原始查询
    SELECT 
        COUNT(1) AS total,
        SUBSTRING_INDEX(path, '?', 1) AS url
    FROM `ods_paimon_layer`.`ods_logs`
    WHERE ts > '2026-01-04 18:00:00'
    GROUP BY url
    ORDER BY total DESC
    LIMIT 10;

基于 Paimon 构建物化视图

-- 开启 Paimon 增量视图支持
SET ADB_CONFIG O_MV_IVM_V2_ENABLED = true;
-- 关闭快速刷新计划缓存(特定版本优化配置)
SET ADB_CONFIG MV_FAST_REFRESH_PLAN_CACHE_ENABLED = false;

CREATE MATERIALIZED VIEW db_streaming.paimon_agg_mv (
    total int,
    url string, 
    ts_h string,
    PRIMARY key (url, ts_h)
)
PARTITION BY VALUE (ts_h)
REFRESH FAST NEXT now() + INTERVAL 3 minute
AS
SELECT
    COUNT(1) AS total,
    SUBSTRING_INDEX(path, '?', 1) AS url,
    -- 注意 Paimon 时间格式转换
    DATE_FORMAT(STR_TO_DATE(ts, '%Y-%m-%dT%H:%i:%sZ'), '%Y%m%d%H') AS ts_h
FROM ods_paimon_layer.ods_logs
GROUP BY url, ts_h;

查询物化视图

查询物化视图相较于直接查询Paimon,具有更好的性能。

SELECT * FROM db_streaming.paimon_agg_mv;