The metadata incremental log records the details of DLF metadata access (such as the request type, the specific operation, and the request time) and is written in real time, as a stream, to the request_logs system table in the system database. You can query the request_logs table directly with a compute engine such as Flink, or sync the table's data to Kafka for flexible consumption by downstream systems. This topic describes how to record and consume incremental logs with the request_logs table.
request_logs table details
Table schema
The request_logs table records DLF metadata access logs. Its schema is defined as follows, and a short query sketch follows the DDL.
CREATE TABLE `request_logs` (
`version` STRING COMMENT 'Version of catalog event',
`event` STRING COMMENT 'Enum value',
  `detail` STRING COMMENT 'Detail data in JSON format',
`identifier` STRING COMMENT 'identifier for this event',
`requested_by` STRING COMMENT 'Requested User',
`requested_at` BIGINT COMMENT 'Requested time, in millis',
`dt` STRING COMMENT 'Day time, for example, 20250505'
) PARTITIONED BY (dt) WITH (
'file.format' = 'avro',
'partition.expiration-time' = '30 d',
'partition.timestamp-formatter' = 'yyyyMMdd'
);
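For a quick look at the data, the table can be queried like a regular table. The following is a minimal sketch run in batch mode; the catalog name dlf_catalog is a placeholder, and the partition value 20250505 is the example format from the schema comment above.
-- Read one day's partition of the access log (dt uses the yyyyMMdd format).
SELECT requested_at, requested_by, event, identifier
FROM `dlf_catalog`.`system`.`request_logs`
WHERE dt = '20250505'
LIMIT 100;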
Event enum
The event field indicates the event type and takes one of the enumerated values below. The identifier and detail structures for each event type are listed in the following table; a query sketch that uses these columns follows the table.
Event | Identifier structure | Detail structure |
GetDatabase | database | null |
ListDatabases | null | {"maxResults":"","pageToken":"","databaseNamePattern":""} (if parameters are passed) |
CreateDatabase | database | CreateDatabaseRequest. Note: the xxxRequest definitions in this table all come from the org.apache.paimon.rest.requests package. |
DropDatabase | database | null |
AlterDatabase | database | AlterDatabaseRequest |
GetTable | database.object | null |
ListTables | database | {"maxResults":"","pageToken":"","tableNamePattern":""} (if parameters are passed) |
ListTableDetails | database | {"maxResults":"","pageToken":"","tableNamePattern":""} (if parameters are passed) |
CreateTable | database.object | CreateTableRequest |
AlterTable | database.object | AlterTableRequest |
RenameTable | database.object | RenameTableRequest |
DropTable | database.object | null |
CommitTable | database.object | CommitTableRequest |
LoadTableSnapshot | database.object | null |
ListTableSnapshots | database.object | {"maxResults":"","pageToken":""} (if parameters are passed) |
RollbackTable | database.object | RollbackTableRequest |
ListBranches | database.object | {"maxResults":"","pageToken":""} (if parameters are passed) |
CreateBranch | database.object | CreateBranchRequest |
DropBranch | database.object | null |
ForwardBranch | database.object | ForwardBranchRequest |
ListPartitions | database.object | {"maxResults":"","pageToken":"","partitionNamePattern":""} (if parameters are passed) |
MarkDonePartitions | database.object | MarkDonePartitionsRequest |
GetView | database.object | null |
ListViews | database | {"maxResults":"","pageToken":"","viewNamePattern":""} (if parameters are passed) |
ListViewDetails | database | {"maxResults":"","pageToken":"","viewNamePattern":""} (if parameters are passed) |
CreateView | database.object | CreateViewRequest |
AlterView | database.object | AlterViewRequest |
RenameView | database.object | RenameTableRequest |
DropView | database.object | null |
GrantPermission | null | Example of granting SELECT on a table: { "principal": "", "access": "SELECT", "catalog": "", "database": "", "table": "", "resourceType": "TABLE" } |
RevokePermission | null | Same as above |
BatchGrantPermissions | null | Example of granting SELECT on a table; the permissions array can contain multiple grants: { "permissions": [ { "principal": "", "access": "SELECT", "catalog": "", "database": "", "table": "", "resourceType": "TABLE" } ] } |
BatchRevokePermissions | null | Same as above |
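As an illustration of how the event and identifier columns can be combined, the following sketch audits table-drop operations for a single day. It assumes a Flink SQL session with the DLF catalog registered; dlf_catalog and the dt value are placeholders.
-- Who dropped which tables (identifier is database.object for DropTable events).
SELECT requested_at, requested_by, identifier
FROM `dlf_catalog`.`system`.`request_logs`
WHERE dt = '20250505'
  AND event = 'DropTable';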
Integration steps
Batch query
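A minimal sketch of a batch query in a Flink SQL session, assuming the DLF catalog has been registered under the placeholder name dlf_catalog:
-- Switch to batch execution and aggregate one day's access log.
SET 'execution.runtime-mode' = 'batch';

SELECT event, requested_by, COUNT(*) AS request_count
FROM `dlf_catalog`.`system`.`request_logs`
WHERE dt = '20250505'
GROUP BY event, requested_by;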
Streaming consumption
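A minimal sketch of streaming consumption, using the same scan.mode hint as the Kafka sync job below; dlf_catalog is a placeholder catalog name:
-- Continuously read newly written log entries in streaming mode.
SET 'execution.runtime-mode' = 'streaming';

SELECT version, event, identifier, requested_by, requested_at, dt
FROM `dlf_catalog`.`system`.`request_logs` /*+ OPTIONS('scan.mode' = 'latest') */;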
Sync incremental logs to Kafka
You can query the system.request_logs system table directly with a compute engine such as Flink, or sync its data to Kafka and then develop a custom program to flexibly meet your business needs.
Log on to the ApsaraMQ for Kafka console, then create and deploy a Kafka instance.
On the ETL page, create a Flink streaming job, then deploy and start it.
CREATE TEMPORARY TABLE `kafka_sink` (
  version STRING,
  event STRING,
  detail STRING,
  identifier STRING,
  requested_by STRING,
  requested_at BIGINT,
  dt STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'properties.enable.idempotence' = 'false',
  'format' = 'json'
);

INSERT INTO `kafka_sink`
SELECT * FROM `flink_test`.`system`.`request_logs` /*+ OPTIONS('scan.mode' = 'latest') */;
The parameters are described below:
Parameter | Description |
connector | Specifies Kafka as the target connector for the data output. |
topic | The name of the topic in the Kafka instance to which the data is written. You can obtain it on the Topic Management page of the ApsaraMQ for Kafka console. |
properties.bootstrap.servers | The endpoint of the Kafka cluster. You can obtain it in the Endpoint Information section of the instance details page in the ApsaraMQ for Kafka console. |
properties.enable.idempotence | Topics of the cloud storage type in ApsaraMQ for Kafka instances do not support idempotence by default, so idempotent writes must be disabled. For details, see Storage Engine Comparison. |
format | The data serialization format. Set it to JSON to match the data exchange format used in Kafka. |
Kafka consumption: you can write a custom program that consumes data from Kafka to meet your business requirements.
Parse the log data
Based on the request_logs schema above, define the RequestLog class as follows.
import com.fasterxml.jackson.annotation.JsonProperty;

public class RequestLog {

    @JsonProperty("version")
    private String version;

    @JsonProperty("event")
    private String event;

    @JsonProperty("detail")
    private String detail;

    @JsonProperty("identifier")
    private String identifier;

    @JsonProperty("requested_by")
    private String requestedBy;

    @JsonProperty("requested_at")
    private long requestedAt;

    @JsonProperty("dt")
    private String dt;

    public RequestLog() {}

    public String getVersion() { return version; }

    public void setVersion(String version) { this.version = version; }

    public String getEvent() { return event; }

    public void setEvent(String event) { this.event = event; }

    public String getDetail() { return detail; }

    public void setDetail(String detail) { this.detail = detail; }

    public String getIdentifier() { return identifier; }

    public void setIdentifier(String identifier) { this.identifier = identifier; }

    public String getRequestedBy() { return requestedBy; }

    public void setRequestedBy(String requestedBy) { this.requestedBy = requestedBy; }

    public long getRequestedAt() { return requestedAt; }

    public void setRequestedAt(long requestedAt) { this.requestedAt = requestedAt; }

    public String getDt() { return dt; }

    public void setDt(String dt) { this.dt = dt; }
}
Dynamically parse the detail field.
Convert detail into a concrete request object based on the event type. Refer to the API usage guide to configure the required Maven dependencies.
package com.aliyun.morax.flink;

import org.apache.paimon.rest.RESTApi;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;

import java.util.HashMap;
import java.util.Map;

public class RequestLogParser {

    // Mapping from event type to the request class of its detail payload.
    // Only events whose detail is non-null (see the event table above) are listed.
    private static final Map<String, Class<?>> EVENT_CLASS_MAP = new HashMap<>();

    static {
        EVENT_CLASS_MAP.put("CreateDatabase", CreateDatabaseRequest.class);
        EVENT_CLASS_MAP.put("AlterDatabase", AlterDatabaseRequest.class);
        EVENT_CLASS_MAP.put("CreateTable", CreateTableRequest.class);
        EVENT_CLASS_MAP.put("AlterTable", AlterTableRequest.class);
        EVENT_CLASS_MAP.put("RenameTable", RenameTableRequest.class);
        // extend as needed
    }

    public Object parseDetail(RequestLog log) {
        // Events such as DropTable and GetDatabase carry no detail payload.
        if (log.getDetail() == null) {
            return null;
        }
        Class<?> clazz = EVENT_CLASS_MAP.get(log.getEvent());
        if (clazz == null) {
            throw new UnsupportedOperationException("Unsupported event: " + log.getEvent());
        }
        return fromJson(log.getDetail(), clazz);
    }

    private <T> T fromJson(String json, Class<T> clazz) {
        try {
            return RESTApi.fromJson(json, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse JSON for class: " + clazz.getSimpleName(), e);
        }
    }
}
Kafka consumer example
You can refer to Use instance endpoints to send and receive messages and write a custom Kafka consumer program. An example follows.
package com.aliyun.morax.flink;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerWithRequestLogParser {

    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final RequestLogParser parser = new RequestLogParser();

    public static void main(String[] args) {
        // See the ApsaraMQ for Kafka developer documentation for the full parameter
        // descriptions; the configuration here is only an example.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // other consumer properties (group.id, key/value deserializers, security settings, and so on)

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<String> topics = new ArrayList<>();
        String topicStr = kafkaProperties.getProperty("topic");
        for (String topic : topicStr.split(",")) {
            topics.add(topic.trim());
        }
        consumer.subscribe(topics);

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Parse the JSON message into a RequestLog.
                        RequestLog log = objectMapper.readValue(record.value(), RequestLog.class);
                        // Convert the detail field into a concrete request object.
                        Object detail = parser.parseDetail(log);
                        // Process the detail object according to your business logic.
                    } catch (Exception e) {
                        System.err.println("Error processing record: " + record.offset());
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                System.err.println("Consumer error, retrying...");
                e.printStackTrace();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignore) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}