通过API消费

日志服务提供多语言SDK,并且都支持日志服务消费接口,本文介绍消费日志的SDK示例。

前提条件

  • 已开通日志服务。更多信息,请参见开通日志服务

  • 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权

  • 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见Linux、macOSWindows系统配置环境变量

    重要
    • 阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。

    • 强烈建议不要把AccessKey IDAccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

背景信息

调用PullLogs接口可以获取指定游标(Cursor)位置的日志数据。日志服务支持Java、Python、Go等语言的应用作为消费者或消费组消费日志服务的数据。

日志服务SPL支持在实时消费、扫描查询和Logtail采集三个日志服务场景中使用,更多信息,请参见SPL概述

使用Java SDK消费

开始使用前,请确保已安装日志服务Java SDK。具体操作,请参见安装Java SDK

SDK消费

本示例中,调用PullLogs接口读取日志数据,完成普通消费的演示。

参数说明

参数名称

类型

是否必选

说明

project

string

日志服务Project名称,更多信息,请参见管理Project

logStore

string

日志服务Logstore名称,Logstore是日志服务中日志数据的采集、存储和查询单元。更多信息,请参见管理Logstore

shardId

int

日志库的分区ID。

添加Maven依赖

Java项目的根目录下,打开pom.xml文件,添加以下代码:

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
  <artifactId>aliyun-log</artifactId>
  <version>0.6.99</version>
</dependency>

创建PullLogsDemo.java文件

示例代码如下:

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PullLogsDemo {
    // 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写
    private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
    // 本示例从环境变量中获取 AccessKey ID 和 AccessKey Secret。
    private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // Project 名称
    private static final String project = "your_project";
    // LogStore 名称
    private static final String logStore = "your_logstore";

    public static void main(String[] args) throws Exception {
        // 创建日志服务 Client
        Client client = new Client(endpoint, accessKeyId, accessKeySecret);
        // 查询 LogStore 的 Shard
        ListShardResponse resp = client.ListShard(project, logStore);
        System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
        Map<Integer, String> cursorMap = new HashMap<Integer, String>();
        for (Shard shard : resp.GetShards()) {
            int shardId = shard.getShardId();
            // 从头开始消费,获取游标。(如果是从尾部开始消费,使用 Consts.CursorMode.END)
            cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
        }
        try {
            while (true) {
                // 从每个Shard中获取日志
                for (Shard shard : resp.GetShards()) {
                    int shardId = shard.getShardId();
                    PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
                    PullLogsResponse response = client.pullLogs(request);
                    // 日志都在日志组(LogGroup)中,按照逻辑拆分即可。
                    List<LogGroupData> logGroups = response.getLogGroups();
                    System.out.printf("Get %d logGroup from logStore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
                    // 完成处理拉取的日志后,移动游标。
                    cursorMap.put(shardId, response.getNextCursor());
                }
            }
        } catch (LogException e) {
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

SDK基于SPL消费

本示例中,调用PullLogs接口读取日志数据,完成使用Java SDK基于SPL消费日志数据的演示。

参数说明

参数名称

类型

是否必选

说明

project

string

日志服务Project名称,更多信息,请参见管理Project

logStore

string

日志服务Logstore名称,Logstore是日志服务中日志数据的采集、存储和查询单元。更多信息,请参见管理Logstore

shardId

int

日志库的分区ID。

添加Maven依赖

Java项目的根目录下,打开pom.xml文件,添加以下代码:

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
  <artifactId>aliyun-log</artifactId>
  <version>0.6.99</version>
</dependency>

创建PullLogsWithSPLDemo.java文件

示例代码如下:

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PullLogsWithSPLDemo {
    // 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写
    private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
    //  本示例从环境变量中获取 AccessKey ID 和 AccessKey Secret。
    private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // Project 名称
    private static final String project = "your_project";
    // LogStore 名称
    private static final String logStore = "your_logstore";

    public static void main(String[] args) throws Exception {
        // 创建日志服务 Client
        Client client = new Client(endpoint, accessKeyId, accessKeySecret);
        // 查询 LogStore 的 Shard
        ListShardResponse resp = client.ListShard(project, logStore);
        System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
        Map<Integer, String> cursorMap = new HashMap<Integer, String>();
        for (Shard shard : resp.GetShards()) {
            int shardId = shard.getShardId();
            // 从头开始消费,获取游标。(如果是从尾部开始消费,使用 Consts.CursorMode.END)
            cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
        }
        try {
            while (true) {
                // 从每个Shard中获取日志
                for (Shard shard : resp.GetShards()) {
                    int shardId = shard.getShardId();
                    PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
                    request.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
                    request.setPullMode("scan_on_stream");
                    PullLogsResponse response = client.pullLogs(request);
                    // 日志都在日志组(LogGroup)中,按照逻辑拆分即可。
                    List<LogGroupData> logGroups = response.getLogGroups();
                    System.out.printf("Get %d logGroup from logStore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);

                    // 完成处理拉取的日志后,移动游标。
                    cursorMap.put(shardId, response.getNextCursor());
                }
            }
        } catch (LogException e) {
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

相关文档