测试方法

本文介绍MSE XXL-JOB性能测试方法。

测试环境准备

  • 前往MSE控制台购买XXL-JOB实例。

  • 部署测试Demo应用程序,接入任务调度服务,详情可参见10分钟快速体验

  • 参考示例代码,批量创建测试任务。

  • 等待调度服务自动运行,也可在任务管理菜单批量启用/禁用任务控制运行。

  • 观察实例基础信息中的调度监控大盘。

示例代码

  • 创建任务:参考如下代码通过API批量创建压测任务。

  • 根据实际情况修改如下参数。

    说明

    通过调整时间表达式与创建的任务数,控制任务并发总量,以控制测试负载压力。

    • access_key_id:云账号 AccessKeyID。

    • access_key_secret:云账号 AccessKeySecret。

    • cluster_id:集群ID。

    • app_name:应用名称。

    • time_expression:时间表达式。

    • task_count:创建的任务数。

    • task_name_prefix:任务名前缀。

    import os
    import sys
    
    from typing import List
    
    from alibabacloud_schedulerx320240624.client import Client as SchedulerX320240624Client
    from alibabacloud_credentials.client import Client as CredentialClient
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_schedulerx320240624 import models as scheduler_x320240624_models
    from alibabacloud_tea_util import models as util_models
    from alibabacloud_tea_util.client import Client as UtilClient
    
    
    class Sample:
        def __init__(self):
            pass
    
        @staticmethod
        def create_client() -> SchedulerX320240624Client:
            """
            使用凭据初始化账号Client
            @return: Client
            @throws Exception
            """
            # 工程代码建议使用更安全的无AK方式,凭据配置方式请参见:https://help.aliyun.com/document_detail/378659.html。             
            config = open_api_models.Config(
                access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
                access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
            )
            # Endpoint 请参考 https://api.aliyun.com/product/SchedulerX3
            config.endpoint = f'schedulerx3.cn-hangzhou.aliyuncs.com'
            return SchedulerX320240624Client(config)
    
        @staticmethod
        def create_multiple_jobs(
            job_count: int,
            base_name: str = 'test-job'
        ) -> None:
            """
            创建多个任务
            @param job_count: 要创建的任务数量
            @param base_name: 任务名称前缀
            """
            client = Sample.create_client()
            runtime = util_models.RuntimeOptions()
            
            for i in range(job_count):
                job_name = f'{base_name}{i:03d}'  # 格式化任务名称,如 test-job000, test-job001...
                create_job_request = scheduler_x320240624_models.CreateJobRequest(
                    cluster_id='YOUR_CLUSTER_ID',
                    app_name='YOUR_APP_NAME',
                    name=job_name,
                    job_type='script_python',
                    route_strategy=1,
                    job_handler='testJobVoidHandler',
                    time_type=1,
                    time_expression='*/1 * * * * ?'
                )
                
                try:
                    # 复制代码运行请自行打印 API 的返回值
                    client.create_job_with_options(create_job_request, runtime)
                    print(f"成功创建任务: {job_name}")
                except Exception as error:
                    # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                    # 错误 message
                    print(f"创建任务 {job_name} 失败: {error.message}")
                    # 诊断地址
                    if error.data and error.data.get("Recommend"):
                        print(f"诊断地址: {error.data.get('Recommend')}")
        
        @staticmethod
        def main(
            args: List[str],
        ) -> None:
            client = Sample.create_client()
            create_job_request = scheduler_x320240624_models.CreateJobRequest(
                cluster_id='YOUR_CLUSTER_ID',
                app_name='YOUR_APP_NAME',
                name='test-job091',
                job_type='script_python',
                route_strategy=1,
                job_handler='testJobVoidHandler',
                time_type=1,
                time_expression='*/1 * * * * ?'
            )
            runtime = util_models.RuntimeOptions()
            try:
                # 复制代码运行请自行打印 API 的返回值
                client.create_job_with_options(create_job_request, runtime)
            except Exception as error:
                # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                # 错误 message
                print(error.message)
                # 诊断地址
                print(error.data.get("Recommend"))
                UtilClient.assert_as_string(error.message)
    
        @staticmethod
        async def main_async(
            args: List[str],
        ) -> None:
            client = Sample.create_client()
            create_job_request = scheduler_x320240624_models.CreateJobRequest(
                cluster_id='YOUR_CLUSTER_ID',
                app_name='YOUR_APP_NAME',
                name='test-job091',
                job_type='script_python',
                route_strategy=1,
                job_handler='testJobVoidHandler',
                time_type=1,
                time_expression='*/1 * * * * ?'
            )
            runtime = util_models.RuntimeOptions()
            try:
                # 复制代码运行请自行打印 API 的返回值
                await client.create_job_with_options_async(create_job_request, runtime)
            except Exception as error:
                # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                # 错误 message
                print(error.message)
                # 诊断地址
                print(error.data.get("Recommend"))
                UtilClient.assert_as_string(error.message)
    
    
    if __name__ == '__main__':
        # 任务数与任务名前缀
        task_count = 2
        task_name_prefix = 'my-task-'
        Sample.create_multiple_jobs(task_count, task_name_prefix)