管道管理

管道是任务的队列。当您提交异步任务后,任务会进入管道中进行排队,根据优先级和提交顺序依次被调度执行。本文提供了Java SDK V2.0管道相关功能的API调用示例,包含添加管道、更新管道、删除管道、查询管道。

前提条件

使用前请先初始化客户端,详细操作请参见初始化

创建管道

调用AddPipeline接口,完成创建管道功能。

说明
  • 管道类型选择说明,请参见创建管道

  • 如果您在添加管道时遇到 "The resource "Pipeline" quota has been used up"错误,代表您的管道配额已用完,可以通过提交工单申请管道数量的配额。

  • 关于管道消息NotifyConfig配置,请参见NotifyConfig详情

  • 管道支持队列或主题模式的消息配置。详细说明,请参见设置消息通知

package com.aliyun.sample;

import com.aliyun.tea.*;

public class Sample {

    /**
     * <b>description</b> :
     * <p>使用AK&amp;SK初始化账号Client</p>
     * @return Client
     *
     * @throws Exception
     */
    public static com.aliyun.mts20140618.Client createClient() throws Exception {

        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
                .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
                .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.endpoint = "mts.cn-qingdao.aliyuncs.com";
        return new com.aliyun.mts20140618.Client(config);
    }

    public static void main(String[] args_) throws Exception {
        java.util.List<String> args = java.util.Arrays.asList(args_);
        com.aliyun.mts20140618.Client client = Sample.createClient();
        com.aliyun.mts20140618.models.AddPipelineRequest addPipelineRequest = new com.aliyun.mts20140618.models.AddPipelineRequest()
                //管道名称
                .setName("test-pipeline")
                //管道类型
                .setSpeed("Standard")
                //管道级别
                .setSpeedLevel(1L)
                //MNS通知配置
                .setNotifyConfig("{\"Topic\":\"mts-topic-1\"}")
                //角色
                .setRole("AliyunMTSDefaultRole");
        com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
        try {
            // 复制代码运行请自行打印 API 的返回值
            client.addPipelineWithOptions(addPipelineRequest, runtime);
        } catch (TeaException error) {
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        } catch (Exception _error) {
            TeaException error = new TeaException(_error.getMessage(), _error);
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        }
    }
}

更新管道配置

调用UpdatePipeline接口,完成更新管道配置功能。

package com.aliyun.sample;

import com.aliyun.tea.*;

public class Sample {

    /**
     * <b>description</b> :
     * <p>使用AK&amp;SK初始化账号Client</p>
     * @return Client
     *
     * @throws Exception
     */
    public static com.aliyun.mts20140618.Client createClient() throws Exception {

        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
                .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
                .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.endpoint = "mts.cn-qingdao.aliyuncs.com";
        return new com.aliyun.mts20140618.Client(config);
    }

    public static void main(String[] args_) throws Exception {
        java.util.List<String> args = java.util.Arrays.asList(args_);
        com.aliyun.mts20140618.Client client = Sample.createClient();
        com.aliyun.mts20140618.models.UpdatePipelineRequest updatePipelineRequest = new com.aliyun.mts20140618.models.UpdatePipelineRequest()
                //需要更新的管道ID
                .setPipelineId("d1ce4d3efcb549419193f50f1fcd****")
                //修改后的管道名称
                .setName("example-pipeline-****")
                //修改后的管道状态
                .setState("Paused")
                //阿里云消息服务配置
                .setNotifyConfig("{\"Topic\":\"example-topic-****\"}")
                //当前RAM用户关联的角色
                .setRole("AliyunMTSDefaultRole");
        com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
        try {
            // 复制代码运行请自行打印 API 的返回值
            client.updatePipelineWithOptions(updatePipelineRequest, runtime);
        } catch (TeaException error) {
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        } catch (Exception _error) {
            TeaException error = new TeaException(_error.getMessage(), _error);
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        }
    }
}

删除管道

调用DeletePipeline接口,完成删除管道。

package com.aliyun.sample;

import com.aliyun.tea.*;

public class Sample {

    /**
     * <b>description</b> :
     * <p>使用AK&amp;SK初始化账号Client</p>
     * @return Client
     *
     * @throws Exception
     */
    public static com.aliyun.mts20140618.Client createClient() throws Exception {

        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
                .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
                .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.endpoint = "mts.cn-qingdao.aliyuncs.com";
        return new com.aliyun.mts20140618.Client(config);
    }

    public static void main(String[] args_) throws Exception {
        java.util.List<String> args = java.util.Arrays.asList(args_);
        com.aliyun.mts20140618.Client client = Sample.createClient();
        com.aliyun.mts20140618.models.DeletePipelineRequest deletePipelineRequest = new com.aliyun.mts20140618.models.DeletePipelineRequest()
                //需要删除的管道ID
                .setPipelineId("d1ce4d3efcb549419193f50f1fcd****");
        com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
        try {
            // 复制代码运行请自行打印 API 的返回值
            client.deletePipelineWithOptions(deletePipelineRequest, runtime);
        } catch (TeaException error) {
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        } catch (Exception _error) {
            TeaException error = new TeaException(_error.getMessage(), _error);
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        }
    }
}

通过ID查询管道

调用QueryPipelineList接口,完成通过ID查询管道。

package com.aliyun.sample;

import com.aliyun.tea.*;

public class Sample {

    /**
     * <b>description</b> :
     * <p>使用AK&amp;SK初始化账号Client</p>
     * @return Client
     *
     * @throws Exception
     */
    public static com.aliyun.mts20140618.Client createClient() throws Exception {

        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
                .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
                .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.endpoint = "mts.cn-qingdao.aliyuncs.com";
        return new com.aliyun.mts20140618.Client(config);
    }

    public static void main(String[] args_) throws Exception {
        java.util.List<String> args = java.util.Arrays.asList(args_);
        com.aliyun.mts20140618.Client client = Sample.createClient();
        com.aliyun.mts20140618.models.QueryPipelineListRequest queryPipelineListRequest = new com.aliyun.mts20140618.models.QueryPipelineListRequest()
                //需要查询的管道ID列表
                .setPipelineIds("d1ce4d3efcb549419193f50f1fcd****,72dfa5e679ab4be9a3ed9974c736****");
        com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
        try {
            // 复制代码运行请自行打印 API 的返回值
            client.queryPipelineListWithOptions(queryPipelineListRequest, runtime);
        } catch (TeaException error) {
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        } catch (Exception _error) {
            TeaException error = new TeaException(_error.getMessage(), _error);
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        }
    }
}

通过状态查询管道

调用SearchPipeline接口,完成通过状态查询管道。

// This file is auto-generated, don't edit it. Thanks.
package com.aliyun.sample;

import com.aliyun.tea.*;

public class Sample {

    /**
     * <b>description</b> :
     * <p>使用AK&amp;SK初始化账号Client</p>
     * @return Client
     *
     * @throws Exception
     */
    public static com.aliyun.mts20140618.Client createClient() throws Exception {

        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
                .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
                .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.endpoint = "mts.cn-qingdao.aliyuncs.com";
        return new com.aliyun.mts20140618.Client(config);
    }

    public static void main(String[] args_) throws Exception {
        java.util.List<String> args = java.util.Arrays.asList(args_);
        com.aliyun.mts20140618.Client client = Sample.createClient();
        com.aliyun.mts20140618.models.SearchPipelineRequest searchPipelineRequest = new com.aliyun.mts20140618.models.SearchPipelineRequest()
                //需要搜索的管道状态
                .setState("Paused")
                //分页查询时设置的每页行数大小
                .setPageSize(10L)
                //当前页号
                .setPageNumber(1L);
        com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
        try {
            // 复制代码运行请自行打印 API 的返回值
            client.searchPipelineWithOptions(searchPipelineRequest, runtime);
        } catch (TeaException error) {
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        } catch (Exception _error) {
            TeaException error = new TeaException(_error.getMessage(), _error);
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        }
    }
}

相关文档