Incremental logs

The metadata incremental log records the details of DLF metadata access (such as the request type, the specific operation, and the request time) and is written in real time, in a streaming fashion, to the request_logs system table in the system database. You can query the request_logs table directly with a compute engine such as Flink, or synchronize its data to Kafka for flexible downstream consumption. This topic describes how to use the request_logs table to record and consume incremental logs.

Details of the request_logs table

Table schema

The request_logs table records DLF metadata access logs. Its schema is defined as follows.

CREATE TABLE `request_logs` (
  `version` STRING COMMENT 'Version of catalog event',
  `event` STRING COMMENT 'Enum value',
  `detail` STRING COMMENT 'Detail data in JSON format',
  `identifier` STRING COMMENT 'identifier for this event',
  `requested_by` STRING COMMENT 'Requested User',
  `requested_at` BIGINT COMMENT 'Requested time, in millis',
  `dt` STRING COMMENT 'Day time, for example, 20250505'
) PARTITIONED BY (dt) WITH (
   'file.format' = 'avro',
   'partition.expiration-time' = '30 d',
   'partition.timestamp-formatter' = 'yyyyMMdd'
);
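
The requested_at column is an epoch timestamp in milliseconds, and dt is the day partition derived from it (partitions older than 30 days are dropped by partition.expiration-time). Below is a minimal sketch, in plain Java, of converting a requested_at value into the yyyyMMdd form used by dt; the class name is illustrative and the UTC time zone is an assumption, as the time zone used to derive dt is not stated here.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class RequestLogTime {

    private static final DateTimeFormatter DT_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Convert an epoch-millisecond requested_at value into the yyyyMMdd partition format,
    // for example 1746403200000L -> "20250505" (interpreted in UTC).
    public static String toDt(long requestedAtMillis) {
        return DT_FORMAT.format(Instant.ofEpochMilli(requestedAtMillis));
    }
}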

Event enum

The event field indicates the event type and takes enumerated values. The identifier and detail structures for each event type are listed below:

Note: The xxxRequest types in the Detail column are all defined in the org.apache.paimon.rest.requests package.

| Event | Identifier structure | Detail structure |
| --- | --- | --- |
| GetDatabase | database | null |
| ListDatabases | null | {"maxResults":"","pageToken":"","databaseNamePattern":""} (if request parameters are passed) |
| CreateDatabase | database | CreateDatabaseRequest |
| DropDatabase | database | null |
| AlterDatabase | database | AlterDatabaseRequest |
| GetTable | database.object | null |
| ListTables | database | {"maxResults":"","pageToken":"","tableNamePattern":""} (if request parameters are passed) |
| ListTableDetails | database | {"maxResults":"","pageToken":"","tableNamePattern":""} (if request parameters are passed) |
| CreateTable | database.object | CreateTableRequest |
| AlterTable | database.object | AlterTableRequest |
| RenameTable | database.object | RenameTableRequest |
| DropTable | database.object | null |
| CommitTable | database.object | CommitTableRequest |
| LoadTableSnapshot | database.object | null |
| ListTableSnapshots | database.object | {"maxResults":"","pageToken":""} (if request parameters are passed) |
| RollbackTable | database.object | RollbackTableRequest |
| ListBranches | database.object | {"maxResults":"","pageToken":""} (if request parameters are passed) |
| CreateBranch | database.object | CreateBranchRequest |
| DropBranch | database.object | null |
| ForwardBranch | database.object | ForwardBranchRequest |
| ListPartitions | database.object | {"maxResults":"","pageToken":"","partitionNamePattern":""} (if request parameters are passed) |
| MarkDonePartitions | database.object | MarkDonePartitionsRequest |
| GetView | database.object | null |
| ListViews | database | {"maxResults":"","pageToken":"","viewNamePattern":""} (if request parameters are passed) |
| ListViewDetails | database | {"maxResults":"","pageToken":"","viewNamePattern":""} (if request parameters are passed) |
| CreateView | database.object | CreateViewRequest |
| AlterView | database.object | AlterViewRequest |
| RenameView | database.object | RenameTableRequest |
| DropView | database.object | null |
| GrantPermission | null | Example of granting SELECT on a table: {"principal":"","access":"SELECT","catalog":"","database":"","table":"","resourceType":"TABLE"} |
| RevokePermission | null | Same as GrantPermission |
| BatchGrantPermissions | null | Example of granting SELECT on a table; the permissions array can contain multiple grants: {"permissions":[{"principal":"","access":"SELECT","catalog":"","database":"","table":"","resourceType":"TABLE"}]} |
| BatchRevokePermissions | null | Same as BatchGrantPermissions |
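
As the table shows, the identifier column carries the database name for database-level events, a database.object pair for table-, view-, and branch-level events, and null for permission events. The following is a minimal sketch of splitting an identifier into its two parts; the class and method names are illustrative, and it assumes database names do not contain a dot.

public class EventIdentifier {

    // Split an identifier such as "my_db.my_table" into its database and object parts.
    // Database-level events carry only the database name; permission events carry null.
    public static String[] split(String identifier) {
        if (identifier == null) {
            return new String[] {null, null};
        }
        int dot = identifier.indexOf('.');
        if (dot < 0) {
            return new String[] {identifier, null};
        }
        return new String[] {identifier.substring(0, dot), identifier.substring(dot + 1)};
    }
}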

Integration steps

Batch query

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Create a Paimon DLF catalog.

  3. On the Data Query page, enter the following SQL statement in the editor and run it.

    SELECT * FROM `<name of the Flink catalog you created>`.`system`.`request_logs` WHERE dt = '20250601';

Streaming consumption

Synchronize incremental logs to Kafka

You can query the system.request_logs system table directly with a compute engine such as Flink, or synchronize it to Kafka and then build a custom program to consume it, whichever better fits your business needs.

  1. Log on to the ApsaraMQ for Kafka console, then create and deploy a Kafka instance.

  2. Log on to the Realtime Compute for Apache Flink console and create a Paimon DLF catalog.

  3. On the ETL page, create a Flink streaming job, then deploy and start it.

    CREATE TEMPORARY TABLE `kafka_sink` (
      version STRING,
      event STRING,
      detail STRING,
      identifier STRING,
      requested_by STRING,
      requested_at BIGINT,
      dt STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'xxx',
      'properties.bootstrap.servers' = 'xxx',
      'properties.enable.idempotence'='false',
      'format' = 'json'
    );
    
    INSERT INTO `kafka_sink`
    SELECT *
    FROM `flink_test`.`system`.`request_logs` /*+ OPTIONS('scan.mode' = 'latest') */;

    The parameters are described below:

    | Parameter | Description |
    | --- | --- |
    | connector | Specifies Kafka as the connector that the output data is written to. |
    | topic | The name of the Kafka topic that the data is written to. You can obtain it on the Topic Management page of the ApsaraMQ for Kafka console. |
    | properties.bootstrap.servers | The endpoint of the Kafka cluster. You can obtain it in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console. |
    | properties.enable.idempotence | Cloud-storage topics of ApsaraMQ for Kafka instances do not support idempotence by default, so idempotent writes must be disabled. For details, see Comparison between storage engines. |
    | format | The data serialization format. Set it to JSON to match the data exchange format used in Kafka. |

  4. Consume from Kafka: write a custom program to consume the Kafka topic according to your business needs.

Parse the log data

  1. Based on the request_logs schema above, define a RequestLog class as follows.

    import com.fasterxml.jackson.annotation.JsonProperty;
    
    public class RequestLog {
    
        @JsonProperty("version")
        private String version;
    
        @JsonProperty("event")
        private String event;
    
        @JsonProperty("detail")
        private String detail;
    
        @JsonProperty("identifier")
        private String identifier;
    
        @JsonProperty("requested_by")
        private String requestedBy;
    
        @JsonProperty("requested_at")
        private long requestedAt;
    
        @JsonProperty("dt")
        private String dt;
    
        public void setVersion(String version) {
            this.version = version;
        }
    
        public void setEvent(String event) {
            this.event = event;
        }
    
        public void setDetail(String detail) {
            this.detail = detail;
        }
    
        public void setRequestedBy(String requestedBy) {
            this.requestedBy = requestedBy;
        }
    
        public void setRequestedAt(long requestedAt) {
            this.requestedAt = requestedAt;
        }
    
        public void setDt(String dt) {
            this.dt = dt;
        }
    
        public RequestLog() {}
    
        public String getVersion() {
            return version;
        }
    
        public String getEvent() {
            return event;
        }
    
        public String getDetail() {
            return detail;
        }
    
        public String getRequestedBy() {
            return requestedBy;
        }
    
        public long getRequestedAt() {
            return requestedAt;
        }
    
        public String getDt() {
            return dt;
        }
    
        public String getIdentifier() {
            return identifier;
        }
    
        public void setIdentifier(String identifier) {
            this.identifier = identifier;
        }    
    }
    
  2. Dynamically parse the detail field.

    Convert detail into a concrete request object according to the event type. See the API usage guide to configure the required Maven dependencies.

    package com.aliyun.morax.flink;
    
    import org.apache.paimon.rest.RESTApi;
    import org.apache.paimon.rest.requests.AlterTableRequest;
    import org.apache.paimon.rest.requests.CreateDatabaseRequest;
    import org.apache.paimon.rest.requests.CreateTableRequest;
    import org.apache.paimon.rest.requests.RenameTableRequest;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class RequestLogParser {
    
        // Mapping from event type to request class
        private static final Map<String, Class<?>> EVENT_CLASS_MAP = new HashMap<>();
    
        static {
            EVENT_CLASS_MAP.put("CreateTable", CreateTableRequest.class);
            EVENT_CLASS_MAP.put("RenameTable", RenameTableRequest.class);
            EVENT_CLASS_MAP.put("DropTable", DropTableRequest.class);
            EVENT_CLASS_MAP.put("CreateDatabase", CreateDatabaseRequest.class);
            EVENT_CLASS_MAP.put("DropDatabase", DropDatabaseRequest.class);
            EVENT_CLASS_MAP.put("AlterTable", AlterTableRequest.class);
            // 可继续扩展
        }
    
        public Object parseDetail(RequestLog log) {
            Class<?> clazz = EVENT_CLASS_MAP.get(log.getEvent());
            if (clazz == null) {
                throw new UnsupportedOperationException("Unsupported event: " + log.getEvent());
            }
            return fromJson(log.getDetail(), clazz);
        }
    
        private <T> T fromJson(String json, Class<T> clazz) {
            try {
                return RESTApi.fromJson(json, clazz);
            } catch (Exception e) {
                throw new RuntimeException("Failed to parse JSON for class: " + clazz.getSimpleName(), e);
            }
        }
        
    }
    

Kafka consumer program example

You can refer to "Use the instance endpoint to send and receive messages" and write a custom Kafka consumer program. An example follows.

package com.aliyun.morax.flink;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerWithRequestLogParser {

    // Use the same Jackson library as the @JsonProperty annotations on RequestLog.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    private static final RequestLogParser parser = new RequestLogParser();

    public static void main(String[] args) {
        // This is only an example. See the ApsaraMQ for Kafka developer documentation for detailed parameter descriptions.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // Other consumer properties (group.id, key/value deserializers, security settings, and so on)

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        List<String> topics = new ArrayList<>();
        String topicStr = kafkaProperties.getProperty("topic");
        for (String topic : topicStr.split(",")) {
            topics.add(topic.trim());
        }
        consumer.subscribe(topics);

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Deserialize the Kafka message into a RequestLog
                        RequestLog log = objectMapper.readValue(record.value(), RequestLog.class);
                        // Parse the detail field into a concrete request object
                        Object detail = parser.parseDetail(log);

                        // Process the parsed detail (see the dispatch sketch below)
                    } catch (Exception e) {
                        System.err.println("Error processing record: " + record.offset());
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignore) {
                    Thread.currentThread().interrupt();
                }
                System.err.println("Consumer error, retrying...");
                e.printStackTrace();
            }
        }
    }

}
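
Once detail has been resolved, the usual next step is to branch on its concrete type. Below is a minimal sketch of such a dispatcher; the class name, the println handling, and the chosen event types are illustrative only, and you would replace them with your own business logic.

import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;

public class RequestLogDetailHandler {

    // Dispatch on the concrete request type returned by RequestLogParser#parseDetail.
    public void handle(RequestLog log, Object detail) {
        if (detail instanceof CreateTableRequest) {
            System.out.println("Table created: " + log.getIdentifier());
        } else if (detail instanceof RenameTableRequest) {
            System.out.println("Table renamed: " + log.getIdentifier());
        } else if (detail instanceof AlterTableRequest) {
            System.out.println("Table altered: " + log.getIdentifier());
        } else {
            System.out.println("Unhandled event " + log.getEvent() + " requested by " + log.getRequestedBy());
        }
    }
}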