文档

管道管理

更新时间:

管道是任务的队列。当您提交异步任务后,任务会进入管道中进行排队,根据优先级和提交顺序依次被调度执行。本文提供了Java SDK管道相关功能的API调用示例,包含添加管道、更新管道、删除管道、查询管道。

前提条件

使用前请先初始化客户端,详细操作请参见初始化

创建管道

调用AddPipeline接口,完成创建管道功能。

说明
  • 管道类型选择说明,请参见创建管道

  • 如果您在添加管道时遇到 "The resource "Pipeline" quota has been used up"错误,代表您的管道配额已用完,可以通过提交工单申请管道数量的配额。

  • 关于管道消息NotifyConfig配置,请参见NotifyConfig详情

  • 管道支持队列或主题模式的消息配置。详细说明,请参见设置消息通知

/**
 * 创建管道
 * @param client
 * @return
 * @throws Exception
 */
public static AddPipelineResponse addPipeline(DefaultAcsClient client) throws Exception {

    AddPipelineRequest request = new AddPipelineRequest();
    request.setName("test-pipeline");
    //管道类型
    request.setSpeed("Standard");
    //request.setSpeedLevel(1l);
    //request.setNotifyConfig("{\"Topic\":\"mts-topic-1\"}");

    return client.getAcsResponse(request);
}

更新管道配置

调用UpdatePipeline接口,完成更新管道配置功能。

/**
 * 更新管道配置
 * @param client
 * @return
 * @throws Exception
 */
public static UpdatePipelineResponse updatePipeline(DefaultAcsClient client) throws Exception {

    UpdatePipelineRequest request = new UpdatePipelineRequest();
    //管道ID, 可以在MPS控制台 > 全局设置 > 管道及回调查看,或通过addPipeline创建
    request.setPipelineId("e03549d796fad2bcb32a****");
    request.setName("update name");
    //管道状态  开启Active   暂停Paused
    request.setState("Active");
    //request.setNotifyConfig("{\"Topic\":\"mts-topic-1\"}");

    return client.getAcsResponse(request);
}

删除管道

调用DeletePipeline接口,完成删除管道。

/**
 * 删除管道
 * @param client
 * @return
 * @throws Exception
 */
public static DeletePipelineResponse deletePipeline(DefaultAcsClient client) throws Exception {

    DeletePipelineRequest request = new DeletePipelineRequest();
    //管道ID, 可以在MPS控制台 > 全局设置 > 管道及回调查看,或通过addPipeline创建
    request.setPipelineId("e03549d796fadd82bcb32a****");

    return client.getAcsResponse(request);
}

通过ID查询管道

调用QueryPipelineList接口,完成通过ID查询管道。

/**
 * 通过ID 查询管道
 * @param client
 * @return
 * @throws Exception
 */
public static QueryPipelineListResponse queryPipelineList(DefaultAcsClient client) throws Exception {

    QueryPipelineListRequest request = new QueryPipelineListRequest();
    //管道ID, 可以在MPS控制台 > 全局设置 > 管道及回调查看,或通过addPipeline创建
    //支持查询多个管道ID,一次最多查询10个,使用半角逗号(,)分隔
    request.setPipelineIds("e03549d796fad2bcb32a****");

    return client.getAcsResponse(request);
}

通过状态查询管道

调用SearchPipeline接口,完成通过状态查询管道。

/**
 * 通过状态查询管道
 * @param client
 * @return
 * @throws Exception
 */
public static SearchPipelineResponse searchPipeline(DefaultAcsClient client) throws Exception {

    SearchPipelineRequest request = new SearchPipelineRequest();
    //管道状态  全部管道All  开启状态Active   暂停状态Paused
    request.setState("All");

    return client.getAcsResponse(request);
}

完整代码

import com.aliyun.mps.utils.InitClient;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.mts.model.v20140618.*;

/**
 * *****   使用须知     ******
 * 本demo为基本的管道相关接口demo示例
 *
 * *****   方法介绍     ******
 * addPipeline   添加管道 https://help.aliyun.com/document_detail/602848.html
 * updatePipeline   更新管道配置 https://help.aliyun.com/document_detail/602845.html
 * deletePipeline   删除管道 https://help.aliyun.com/document_detail/602849.html
 * queryPipelineList   通过ID查询管道 https://help.aliyun.com/document_detail/602846.html
 * searchPipeline    通过状态查询管道 https://help.aliyun.com/document_detail/602847.html
 *
 * 管道通知NotifyConfig 请参考 https://help.aliyun.com/document_detail/29253.htm#section-ayf-324-y2b
 *
 * *****   main方法     ******
 * main方法中仅以addPipeline和updatePipeline作为调用示例, 如需调用其他方法请自行替换response
 */
public class Pipeline {

    public static void main(String[] args) throws ClientException {

        //初始化调用 client
        DefaultAcsClient client = InitClient.initMpsClient();

        AddPipelineResponse response;
        try {
            response = addPipeline(client);
            System.out.println("RequestId is:" + response.getRequestId());
            System.out.println("pipelineId is:" + response.getPipeline().getId());
        } catch (Exception e) {
            e.printStackTrace();
        }

//        UpdatePipelineResponse response;
//        try {
//            response = updatePipeline(client);
//            System.out.println("RequestId is:" + response.getRequestId());
//            System.out.println("Pipeline is:" + JSON.toJSON(response.getPipeline()));
//        } catch (Exception e) {
//            e.printStackTrace();
//        }

    }

    /**
     * 创建管道
     * @param client
     * @return
     * @throws Exception
     */
    public static AddPipelineResponse addPipeline(DefaultAcsClient client) throws Exception {

        AddPipelineRequest request = new AddPipelineRequest();
        request.setName("test-pipeline");
        //管道类型
        request.setSpeed("Standard");
        //request.setSpeedLevel(1l);
        //request.setNotifyConfig("{\"Topic\":\"mts-topic-1\"}");

        return client.getAcsResponse(request);
    }

    /**
     * 更新管道配置
     * @param client
     * @return
     * @throws Exception
     */
    public static UpdatePipelineResponse updatePipeline(DefaultAcsClient client) throws Exception {

        UpdatePipelineRequest request = new UpdatePipelineRequest();
        //管道ID, 可以在MPS控制台 > 全局设置 > 管道及回调查看,或通过addPipeline创建
        request.setPipelineId("e03549796fad9d72bcb32a****");
        request.setName("update name");
        //管道状态  开启Active   暂停Paused
        request.setState("Active");
        //request.setNotifyConfig("{\"Topic\":\"mts-topic-1\"}");

        return client.getAcsResponse(request);
    }

    /**
     * 删除管道
     * @param client
     * @return
     * @throws Exception
     */
    public static DeletePipelineResponse deletePipeline(DefaultAcsClient client) throws Exception {

        DeletePipelineRequest request = new DeletePipelineRequest();
        //管道ID, 可以在MPS控制台 > 全局设置 > 管道及回调查看,或通过addPipeline创建
        request.setPipelineId("e03549796fad9d7d82bcb32a****");

        return client.getAcsResponse(request);
    }

    /**
     * 通过ID 查询管道
     * @param client
     * @return
     * @throws Exception
     */
    public static QueryPipelineListResponse queryPipelineList(DefaultAcsClient client) throws Exception {

        QueryPipelineListRequest request = new QueryPipelineListRequest();
        //管道ID, 可以在MPS控制台 > 全局设置 > 管道及回调查看,或通过addPipeline创建
        //支持查询多个管道Id 一次最多查询10个,使用半角逗号(,)分隔
        request.setPipelineIds("e03549d796fad2bcb32a****");

        return client.getAcsResponse(request);
    }

    /**
     * 通过状态查询管道
     * @param client
     * @return
     * @throws Exception
     */
    public static SearchPipelineResponse searchPipeline(DefaultAcsClient client) throws Exception {

        SearchPipelineRequest request = new SearchPipelineRequest();
        //管道状态  全部管道All  开启状态Active   暂停状态Paused
        request.setState("All");

        return client.getAcsResponse(request);
    }
}
            

相关文档

  • 本页导读 (1)
文档反馈