文档

使用Java SDK管理投递任务

更新时间:

数据投递是指将日志服务采集的数据通过控制台或者SDK投递至其他阿里云产品,便于您存储数据或联合其他系统消费数据。本文介绍如何使用日志服务Java SDK管理投递任务。

前提条件

  • 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权

  • 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量

    重要
    • 阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。

    • 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

  • 已安装日志服务Java SDK。具体操作,请参见安装Java SDK

  • 已写入日志到Logstore。具体操作,请参见数据采集概述

  • 已从MaxCompute获取对应地域的MaxCompute Endpoint。更多信息,请参见各地域Endpoint对照表(阿里云VPC网络连接方式)

    创建或更新MaxCompute投递任务时,您需要配置MaxCompute Endpoint。

注意事项

本示例以华东1(杭州)的公网Endpoint为例,其公网Endpoint为https://cn-hangzhou.log.aliyuncs.com。如果您通过与Project同地域的其他阿里云产品访问日志服务,请使用内网Endpointhttps://cn-hangzhou-intranet.log.aliyuncs.com。关于日志服务支持的地域与Endpoint的对应关系,请参见服务入口

示例代码

查询OSS/MaxCompute投递任务

本示例展示如何创建一个QuerySinkDemo.java文件,用于查询目标Project下的投递任务配置信息。示例如下:

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.*;
import com.aliyun.openservices.log.response.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class QuerySinkDemo {
    // 设置日志服务的服务接入点。此处以杭州为例,其他地域请根据实际情况填写。
    private static final String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
    // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // 输入Project名称,请您根据实际情况填写。请从已创建项目中获取Project名称。
    private static final String project = "ali-test-project";
    // 输入投递任务名称。您可以投递任务的基础信息区域查看任务名称。
    private static final String jobName = "ali-test-job";

    // 创建日志服务Client。
    private static final Client client = new Client(endpoint, accessKeyId, accessKeySecret);

    private static String getSinkExportJob() throws LogException {
        GetJobResponse getJobResponse = client.getJob(new GetJobRequest(project, jobName));
        Job job = getJobResponse.getJob();
        return JSONObject.toJSONString(job);
    }

    private static List<Export> listSinkExportJob() throws LogException {
        ListExportResponse listExportResponse = client.listExport(new ListExportRequest(project));
        return listExportResponse.getResults();
    }

    private static List<String> getOdpsSinkExportJob() throws LogException {
        List<String> odpsSinkList = new ArrayList<>();
        List<Export> listExports = listSinkExportJob();
        for (Export job:listExports) {
            DataSink type = job.getConfiguration().getSink();
            Map<String, Object> map = JSONObject.parseObject(JSONArray.toJSONString(type));
            Object sinkType = map.get("type");
            if (sinkType.equals("AliyunODPS")) {
                odpsSinkList.add(JSONArray.toJSONString(job));
            }
        }
        return odpsSinkList;
    }
    private static List<String> getOssSinkExportJob() throws LogException {
        List<String> ossSinkList = new ArrayList<>();
        List<Export> listExports = listSinkExportJob();
        for (Export job:listExports) {
            DataSink type = job.getConfiguration().getSink();
            Map<String, Object> map = JSONObject.parseObject(JSONArray.toJSONString(type));
            Object sinkType = map.get("type");
            if (sinkType.equals("AliyunOSS")) {
                ossSinkList.add(JSONArray.toJSONString(job));
            }
        }
        return ossSinkList;
    }

    public static void main(String[] args) throws LogException {
        // 查看指定投递任务。
        String jobConfig = getSinkExportJob();
        System.out.println("**********查看指定投递任务配置**********");
        System.out.println(jobConfig);
        // 查看所有投递任务。
        listSinkExportJob();
        // 查看MaxCompute所有投递任务。
        List<String> odpsSinkList = getOdpsSinkExportJob();
        System.out.println("**********查看MaxCompute所有投递任务配置**********");
        System.out.println(odpsSinkList);
        // 查看OSS所有投递任务。
        List<String> ossSinkList = getOssSinkExportJob();
        System.out.println("**********查看OSS所有投递任务配置**********");
        System.out.println(ossSinkList);
    }
}

预期结果如下:

**********查看指定投递任务配置**********
{
    "configuration": {
    ......
        "fromTime": 1,
        "instanceType": "Standard",
        "logstore": "ali-test-logstore",
    ......
    "state": "Enabled",
    "status": "RUNNING",
    "type": "Export"
}
**********查看MaxCompute所有投递任务配置**********
[{
    "configuration": {
    ......
    "status": "RUNNING",
    "type": "Export"
}, {
    "configuration": {
    ......
    "state": "Enabled",
    "status": "STOPPED",
    "type": "Export"
}]
**********查看OSS所有投递任务配置**********
[{
    "configuration": {
    ......
        "fromTime": 0,
        "instanceType": "Standard",
        "logstore": "ali-test-logstore",
    ......
    "state": "Enabled",
    "status": "RUNNING",
    "type": "Export"
}]

创建OSS投递任务

本示例展示如何创建一个CreateOssSinkDemo.java文件,用于创建OSS投递任务。示例如下:

package demo;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.*;

public class CreateOssSinkDemo {
    // 设置日志服务的服务接入点。此处以杭州为例,其他地域请根据实际情况填写。
    private static final String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
    // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    // Project名称,请您根据实际情况填写。请从已创建项目中获取Project名称。
    private static final String project = "ali-test-project";
    // Logstore名称,请您根据实际情况填写。请从目标项目中获取Logstore名称。
    private static final String logStore = "ali-test-logstore";
    // RAM角色ARN,请您根据实际情况填写。请从RAM控制台目标角色中获取。
    private static final String roleArn = "acs:ram::111111";
    // OSS Bucket名称,请您根据实际情况填写。请从已创建OSS Bucket中获取Bucket名称。
    private static final String bucket = "yourBucketName";
    // 任务名称jobName。
    private static final String jobName = "ali-test-job-name";
    // 投递任务名称。
    private static final String displayName = "ali-test-job-displayname";
    // 任务描述。
    private static final String description = "This is a OSS Shipper task.";
    // 定义OSS Bucket中的文件投递目录。
    private static final String preffix = "test";
    // 定义OSS Bucket中的文件后缀。
    private static final String suffix = "11111";
    // 定义OSS Bucket中分区格式,即按照投递时间动态生成OSS Bucket的目录。
    private static final String pathFormat = "%Y/%m/%d/%H/%M";
    // 每个Shard的投递大小。通过该值控制OSS Object大小(以未压缩计算),单位:MB。
    private static final int bufferSize = 255;
    // 每个Shard的投递周期,取值范围为300~900,单位:秒。
    private static final int bufferInterval = 300;
    // 数据被投递到OSS后,支持存储为不同的文件格式。取值包括csv、json、parquet和orc。
    private static final String contentType = "json";
    // 投递开始时间,值为1表示从最初的历史数据开始,也可以指定某时间开始。
    private static final int fromtime = 1;
    // 投递结束时间,值为0表示只要有数据就会一直投递,也可以指定某时间结束。
    private static final int totime = 0;

    private static void createOssExportJob(Client client) throws LogException {
        Export export = new Export();
        export.setName(jobName);
        export.setDisplayName(displayName);
        export.setDescription(description);

        ExportConfiguration exportConfiguration = new ExportConfiguration();
        AliyunOSSSink ossSink = new AliyunOSSSink();
        ossSink.setRoleArn(roleArn);
        ossSink.setBucket(bucket);
        ossSink.setPrefix(preffix);
        ossSink.setSuffix(suffix);
        ossSink.setPathFormat(pathFormat);
        ossSink.setBufferSize(bufferSize);
        ossSink.setBufferInterval(bufferInterval);
        ossSink.setContentType(contentType);

        ExportContentJsonDetail jsonDetail = new ExportContentJsonDetail();
        jsonDetail.setEnableTag(true);

        ossSink.setContentDetail(jsonDetail);
        exportConfiguration.setLogstore(logStore);
        exportConfiguration.setRoleArn(roleArn);
        exportConfiguration.setSink(ossSink);
        exportConfiguration.setFromTime(fromtime);
        exportConfiguration.setToTime(totime);
        exportConfiguration.setVersion("v2.0");
        export.setConfiguration(exportConfiguration);
        client.createExport(new CreateExportRequest(project, export));
    }

    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accessKeyId, accessKeySecret);
        createOssExportJob(client);
    }
}

更新OSS投递任务

本示例展示如何创建一个UpdateOssSinkDemo.java文件,用于更新OSS投递任务并重启任务使其生效。示例如下:

package demo;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.*;
import java.util.ArrayList;

public class UpdateOssSinkDemo {
    // 设置日志服务的服务接入点。此处以杭州为例,其他地域请根据实际情况填写。
    private static final String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
    // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    // Project名称,请您根据实际情况填写。请从已创建项目中获取Project名称。
    private static final String project = "ali-test-project";
    // Logstore名称,请您根据实际情况填写。请从目标项目中获取Logstore名称。
    private static final String logStore = "ali-test-logstore";
    // RAM角色ARN,请您根据实际情况填写。请从RAM控制台目标角色中获取。
    private static final String roleArn = "acs:ram::111111";
    // OSS Bucket名称,请您根据实际情况填写。请从已创建OSS Bucket中获取Bucket名称。
    private static final String bucket = "yourBucketName";
    // 需要修改的目标jobName。
    private static final String jobName = "ali-test-job-name";
    // 任务显示名称。
    private static final String displayName = "ali-test-job-displayname";
    // 任务描述信息。
    private static final String description = "This is a OSS Shipper task.";
    // 定义OSS Bucket中的文件投递目录。
    private static final String preffix = "test";
    // 定义OSS Bucket中的文件后缀。
    private static final String suffix = "11111";
    // 定义OSS Bucket中分区格式,即按照投递时间动态生成OSS Bucket的目录。
    private static final String pathFormat = "%Y/%m/%d/%H/%M";
    // 每个Shard的投递大小。通过该值控制OSS Object大小(以未压缩计算),单位:MB。
    private static final int bufferSize = 255;
    // 每个Shard的投递周期,取值范围为300~900,单位:秒。
    private static final int bufferInterval = 300;
    // 数据被投递到OSS后,支持存储为不同的文件格式。取值包括csv、json、parquet和orc。
    private static final String contentType = "csv";
    // 投递开始时间,值为1表示从最初的历史数据开始,也可以指定某时间开始。
    private static final int fromtime = 1;
    // 投递结束时间,值为0表示只要有数据就会一直投递,也可以指定某时间结束。
    private static final int totime = 0;
    private static final ArrayList<String> columns = new ArrayList<>();

    private static void updateWithRestartOssSinkJob(Client client) throws LogException {
        Export export = new Export();
        export.setName(jobName);
        export.setDisplayName(displayName);
        export.setDescription(description);

        ExportConfiguration exportConfiguration = new ExportConfiguration();

        AliyunOSSSink ossSink = new AliyunOSSSink();
        ossSink.setRoleArn(roleArn);
        ossSink.setBucket(bucket);
        ossSink.setPrefix(preffix);
        ossSink.setSuffix(suffix);
        ossSink.setPathFormat(pathFormat);
        ossSink.setBufferSize(bufferSize);
        ossSink.setBufferInterval(bufferInterval);
        ossSink.setContentType(contentType);
        ExportContentCsvDetail csvDetail = new ExportContentCsvDetail();
        csvDetail.setNullIdentifier("");
        csvDetail.setStorageColumns(columns);
        ossSink.setContentDetail(csvDetail);

        exportConfiguration.setLogstore(logStore);
        exportConfiguration.setRoleArn(roleArn);
        exportConfiguration.setSink(ossSink);
        exportConfiguration.setFromTime(fromtime);
        exportConfiguration.setToTime(totime);
        exportConfiguration.setVersion("v2.0");
        export.setConfiguration(exportConfiguration);

        client.restartExport(new RestartExportRequest(project,export));
    }

    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accessKeyId, accessKeySecret);
        // 自定义csv column
        columns.add("bucket");
        columns.add("__topic__");
        updateWithRestartOssSinkJob(client);
    }
}

创建MaxCompute投递任务

本示例展示如何创建一个CreateOdpsSinkDemo.java文件,用于创建MaxCompute投递任务。示例如下:

package demo;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.CreateExportRequest;

public class CreateOdpsSinkDemo {
    // 设置日志服务的服务接入点。此处以杭州为例,其他地域请根据实际情况填写。
    private static final String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
    // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // RAM角色ARN,请您根据实际情况填写。请从RAM控制台目标角色中获取。
    private static final String roleArn = "acs:ram::111111";
    // Project名称,请您根据实际情况填写。请从已创建项目中获取Project名称。
    private static final String projectName = "ali-test-project";
    // Logstore名称,请您根据实际情况填写。请从已创建项目中获取Logstore名称。
    private static final String logstore = "ali-test-logstore";
    // 投递任务显示名称。
    private static final String displayName = "ali-test-displayname";
    // 投递任务名称。
    private static final String jobName = "ali-test-yname";
    // 投递任务描述信息。
    private static final String description = "This is a MaxCompute Shipper task.";
    // 投递开始时间,值为1表示从最初的历史数据开始,也可以指定某时间开始。
    private static final int fromtime = 1;
    // 投递结束时间,值为0表示只要有数据就会一直投递,也可以指定某时间结束。
    private static final int totime = 0;
    // 投递类型。AliyunODPS表示投递到MaxCompute。
    private static final String type = "AliyunODPS";
    // MaxCompute的项目名称,请您根据实际情况填写。您可以从MaxCompute控制台中获取目标Project名称。
    private static final String odpsProject = "yourodpsProjectName";
    // MaxCompute的表名称,请您根据实际情况填写。您可以从MaxCompute控制台中获取已创建好的表名称。
    private static final String odpsTable = "yourodpsTabletName";
    // 此处以杭州为例,其他地域请根据实际情况填写,也可以通过日志服务控制台的MaxCompute投递配置页面或云原生大数据计算服务MaxCompute官网帮助中心查看MaxCompute Endpoint。
    private static final String odpsEndpoint = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api";
    private static final String odpsTunnelEndpoint = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
    // 以时间为分区时格式。
    private static final String partitionTimeFormat = "%Y_%m_%d_%H_%M";
    private static final String timeZone = "+0800";
    // 是否过滤无效字段。
    private static final Boolean filterInvalid = true;
    // RAM角色ARN,请您根据实际情况填写。请从RAM控制台目标角色中获取。
    private static final String odpsRolearn = "acs:ram::111111";
    // 日志服务的日志字段名。
    private static final String[] fields = {"11111","22222","33333"};
    // 分区列名。
    private static final String[] partitionColumn = {"__partition_time__"};
    // 投递模式,当前支持stream和interval两种模式。
    //stream表示实时投递,即时读取Logstore中的数据,并投递到MaxCompute。interval表示批投递,即读取Logstore中早于当前时间5分钟~10分钟之间的数据,并投递到MaxCompute中。
    private static final String mode = "stream";

    private static void creatOdpsSinkJob(Client client) throws LogException {
        Export export = new Export();
        export.setDisplayName(displayName);
        export.setDescription(description);
        export.setName(jobName);
        ExportConfiguration configuration = new ExportConfiguration();
        configuration.setLogstore(logstore);
        configuration.setVersion("v2.0");
        configuration.setAccessKeyId(accessKeyId);
        configuration.setAccessKeySecret(accessKeySecret);
        configuration.setRoleArn(roleArn);
        configuration.setFromTime(fromtime);
        configuration.setToTime(totime);
        ExportGeneralSink sink = new ExportGeneralSink();
        sink.put("type", type);
        sink.put("odpsProject", odpsProject);
        sink.put("odpsTable", odpsTable);
        sink.put("odpsEndpoint", odpsEndpoint);
        sink.put("odpsTunnelEndpoint", odpsTunnelEndpoint);
        sink.put("partitionTimeFormat", partitionTimeFormat);
        sink.put("timeZone", timeZone);
        sink.put("filterInvalid", filterInvalid);
        sink.put("odpsRolearn", odpsRolearn);
        sink.put("fields", fields);
        sink.put("partitionColumn", partitionColumn);
        sink.put("mode", mode);
        configuration.setSink(sink);
        export.setConfiguration(configuration);
        JobSchedule jobSchedule = new JobSchedule();
        jobSchedule.setType(JobScheduleType.RESIDENT);
        export.setSchedule(jobSchedule);
        CreateExportRequest createExportRequest = new CreateExportRequest(projectName, export);
        client.createExport(createExportRequest);
    }

    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accessKeyId, accessKeySecret);
        creatOdpsSinkJob(client);
    }
}

更新MaxCompute投递任务

本示例展示如何创建一个UpdateOdpsSinkDemo.java文件,用于更新MaxCompute投递任务并重启任务使其生效。示例如下:

package demo;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.RestartExportRequest;

public class UpdateOdpsSinkDemo {
    // 设置日志服务的服务接入点。此处以杭州为例,其他地域请根据实际情况填写。
    private static final String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
    // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // RAM角色ARN,请您根据实际情况填写。请从RAM控制台目标角色中获取。
    private static final String roleArn = "acs:ram::111111";
    // Project名称,请您根据实际情况填写。请从已创建项目中获取Project名称。
    private static final String projectName = "ali-test-project";
    // Logstore名称,请您根据实际情况填写。请从已创建项目中获取Logstore名称。
    private static final String logstore = "ali-test-logstore";
    // 投递任务显示名称。
    private static final String displayName = "ali-test-displayname";
    // 投递任务名称。
    private static final String jobName = "ali-test-job-name";
    // 投递任务描述信息。
    private static final String description = "This is a MaxCompute Shipper task.";
    // 投递开始时间,值为1表示从最初的历史数据开始,也可以指定某时间开始。
    private static final int fromtime = 1;
    // 投递结束时间,值为0表示只要有数据就会一直投递,也可以指定某时间结束。
    private static final int totime = 0;
    // 投递类型。AliyunODPS表示投递到MaxCompute。
    private static final String type = "AliyunODPS";
    // MaxCompute的项目名称,请您根据实际情况填写。您可以从MaxCompute控制台中获取目标Project名称。
    private static final String odpsProject = "yourodpsProjectName";
    // MaxCompute的表名称,请您根据实际情况填写。您可以从MaxCompute控制台中获取已创建好的表名称。
    private static final String odpsTable = "yourodpsTableName";
    // 此处以杭州为例,其他地域请根据实际情况填写,也可以通过日志服务控制台的MaxCompute投递配置页面查看MaxCompute Endpoint。
    private static final String odpsEndpoint = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api";
    private static final String odpsTunnelEndpoint = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
    // 以时间为分区时格式。
    private static final String partitionTimeFormat = "%Y_%m_%d_%H_%M";
    private static final String timeZone = "+0800";
    // 是否过滤无效字段。
    private static final Boolean filterInvalid = true;
    // RAM角色ARN,请您根据实际情况填写。请从RAM控制台目标角色中获取。
    private static final String odpsRolearn = "acs:ram::11111";
    // 日志服务的日志字段名。
    private static final String[] fields = {"11111","22222","33333"};
    // 分区列名。
    private static final String[] partitionColumn = {"__partition_time__"};
    // 投递模式,当前支持stream和interval两种模式。
    //stream表示实时投递,即时读取Logstore中的数据,并投递到MaxCompute。interval表示批投递,即读取Logstore中早于当前时间5分钟~10分钟之间的数据,并投递到MaxCompute中。
    private static final String mode = "stream";

    private static void updateWithRestartOdpsSinkJob(Client client) throws LogException {
        Export export = new Export();
        export.setDisplayName(displayName);
        export.setDescription(description);
        export.setName(jobName);
        ExportConfiguration configuration = new ExportConfiguration();
        configuration.setLogstore(logstore);
        configuration.setVersion("v2.0");
        configuration.setAccessKeyId(accessKeyId);
        configuration.setAccessKeySecret(accessKeySecret);
        configuration.setRoleArn(roleArn);
        configuration.setFromTime(fromtime);
        configuration.setToTime(totime);
        ExportGeneralSink sink = new ExportGeneralSink();
        sink.put("type", type);
        sink.put("odpsProject", odpsProject);
        sink.put("odpsTable", odpsTable);
        sink.put("odpsEndpoint", odpsEndpoint);
        sink.put("odpsTunnelEndpoint", odpsTunnelEndpoint);
        sink.put("partitionTimeFormat", partitionTimeFormat);
        sink.put("timeZone", timeZone);
        sink.put("filterInvalid", filterInvalid);
        sink.put("odpsRolearn", odpsRolearn);
        sink.put("fields", fields);
        sink.put("partitionColumn", partitionColumn);
        sink.put("mode", mode);
        configuration.setSink(sink);
        export.setConfiguration(configuration);
        JobSchedule jobSchedule = new JobSchedule();
        jobSchedule.setType(JobScheduleType.RESIDENT);
        export.setSchedule(jobSchedule);
        RestartExportRequest restartExportRequest = new RestartExportRequest(projectName, export);
        client.restartExport(restartExportRequest);
    }

    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accessKeyId, accessKeySecret);
        updateWithRestartOdpsSinkJob(client);
    }
}

删除OSS/MaxCompute投递任务

本示例展示如何创建一个DeleteSinkDemo.java文件,用于删除OSS或者MaxCompute投递任务。示例如下:

package demo;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.*;
import com.aliyun.openservices.log.response.*;

public class DeleteSinkDemo {
    // 设置日志服务的服务接入点。此处以杭州为例,其他地域请根据实际情况填写。
    private static final String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
    // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    // Project名称,请您根据实际情况填写。请从已创建项目中获取Project名称。
    private static final String project = "ali-test-project";
    // 指定要删除投递任务名称。
    private static final String jobName = "ali-test-job-name";
    private static void deleteSinkExportJob(Client client) throws LogException {
        DeleteExportResponse deleteExportResponse = client.deleteExport(new DeleteExportRequest(project, jobName));
        System.out.println(JSONObject.toJSONString(deleteExportResponse));
    }

    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accessKeyId, accessKeySecret);
        deleteSinkExportJob(client);
    }
}

相关文档

  • 本页导读 (1)
文档反馈