调用CreateOSSIngestion接口创建OSS导入任务。
前提条件
已开通日志服务。
已安装Python、Python开发环境(例如PyCharm)和Python的包管理工具pip。
日志服务Python SDK新版支持Python3.7及以上版本。
您可以执行
python -V命令检查已安装的Python版本。您可以执行
pip3 -V命令检查您已安装的pip版本。
已安装日志服务Python SDK新版。
在命令行工具中,执行如下命令完成安装。
pip install alibabacloud_sls20201230安装SDK新版后,执行如下命令进行验证。
pip show alibabacloud_sls20201230
已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统中配置环境变量。
参数说明
def create_ossingestion(
        self,
        project: str,
        request: sls_20201230_models.CreateOSSIngestionRequest,
) -> sls_20201230_models.CreateOSSIngestionResponse:请求参数
名称  | 类型  | 是否必填  | 描述  | 示例值  | 
project  | String  | 是  | Project名称。  | ali-test-project  | 
request  | object  | 是  | OSS导入任务配置。  | -  | 
request参数说明:
class CreateOSSIngestionRequest(TeaModel):
    def __init__(
        self,
        configuration: OSSIngestionConfiguration = None,
        description: str = None,
        display_name: str = None,
        name: str = None,
        schedule: Schedule = None,
    ):名称  | 类型  | 是否必填  | 描述  | 示例值  | 
configuration  | object  | 是  | OSS导入配置。  | -  | 
description  | String  | 否  | 任务描述。  | OSS导入任务。  | 
display_name  | String  | 是  | 显示名称。  | OSS导入测试。  | 
name  | String  | 是  | OSS导入任务名称。  | ingest-oss-123456  | 
schedule  | object  | 否  | 调度类型,一般默认不需要填写。  | -  | 
schedule参数说明
class Schedule(TeaModel):
    def __init__(
        self,
        cron_expression: str = None,
        delay: int = None,
        interval: str = None,
        run_immediately: bool = None,
        time_zone: str = None,
        type: str = None,
    ):名称  | 类型  | 是否必填  | 描述  | 示例值  | 
type  | String  | 是  | schedule type。支持枚举值:  | FixedRate  | 
cron_expression  | String  | 否  | 当type为  | 0/5 * * * *  | 
run_immediately  | bool  | 否  | 任务是否立即执行。  | True  | 
time_zone  | String  | 否  | Cron表达式所在时区,默认为空,表示东八区。  | +0800  | 
delay  | int  | 否  | 定时类任务延迟执行(单位是秒:s)  | 4  | 
interval  | String  | 否  | 当type为  | 1m  | 
configuration参数
class OSSIngestionConfiguration(TeaModel):
    def __init__(
        self,
        logstore: str = None,
        source: OSSIngestionConfigurationSource = None,
    ):名称  | 类型  | 是否必填  | 描述  | 示例值  | 
logstore  | String  | 是  | logstore名称。  | test-logstore  | 
source  | object  | 是  | OSS导入配置。  | -  | 
source参数说明:
class OSSIngestionConfigurationSource(TeaModel):
    def __init__(
        self,
        bucket: str = None,
        compression_codec: str = None,
        encoding: str = None,
        end_time: int = None,
        endpoint: str = None,
        format: Dict[str, Any] = None,
        interval: str = None,
        pattern: str = None,
        prefix: str = None,
        restore_object_enabled: bool = None,
        role_arn: str = None,
        start_time: int = None,
        time_field: str = None,
        time_format: str = None,
        time_pattern: str = None,
        time_zone: str = None,
        use_meta_index: bool = None,
    ):名称  | 类型  | 是否必填  | 描述  | 示例值  | 
bucket  | String  | 是  | oss bucket名称。  | ossbucket  | 
compression_code  | String  | 是  | 压缩类型。  | none  | 
encoding  | String  | 是  | 编码类型。  | UTF-8  | 
endpoint  | String  | 是  | oss-cn-hangzhou-internal.aliyuncs.com  | |
format  | Dictionary  | OSS 数据格式。 
 
 
  | {"type": "Line"}  | |
interval  | String  | 是  | 检查新文件周期。  | 30s  | 
pattern  | String  | 否  | 文件路径正则过滤。  | .*  | 
prefix  | String  | 否  | 文件路径前缀过滤。  | prefix  | 
restore_object_enabled  | bool  | 否  | 导入归档文件。  | true  | 
role_arn  | String  | 否  | roleArn,查看RAM角色的ARN。  | acs:ram::12345:role/aliyunlogdefaultrole  | 
start_time  | int  | 否  | 某个时间点后修改过的文件。  | 1714274081  | 
time_field  | String  | 否  | 提取时间字段。  | __time__  | 
time_format  | String  | 否  | 时间字段格式。  | yyyy-MM-dd HH:mm:ss  | 
time_pattern  | String  | 否  | 提取时间正则。  | [0-9]{0,2}\/[0-9a-zA-Z]+\/[0-9:,]+  | 
time_zone  | String  | 否  | 时间字段分区。  | GMT+08:00  | 
use_meta_index  | bool  | 是  | 使用OSS元数据索引。  | False  | 
返回参数
返回参数说明,请参见CreateOSSIngestion - 创建OSS导入任务。
示例代码
import os
from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sls20201230 import models as sls_20201230_models
from alibabacloud_tea_util.client import Client as UtilClient
def main():
    config = open_api_models.Config(
        # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
    # 日志服务的服务接入点
    config.endpoint = "cn-hangzhou.log.aliyuncs.com"
    project_name = "ali-test-project"
    client = Sls20201230Client(config)
    # 创建OSSIngestionConfigurationSource的实例
    source = sls_20201230_models.OSSIngestionConfigurationSource(
        bucket="oss-test",
        compression_codec="none",
        encoding="UTF-8",
        end_time=None,
        endpoint="oss-cn-hangzhou-internal.aliyuncs.com",
        format={"type": "Line"},
        interval="30s",
        use_meta_index=False,
    )
    # 创建OSSIngestionConfiguration的实例
    oss_config = sls_20201230_models.OSSIngestionConfiguration(
        logstore="test-logstore",
        source=source
    )
    # 创建CreateOSSIngestionRequest的实例
    create_ossingestions_request = sls_20201230_models.CreateOSSIngestionRequest(
        configuration=oss_config,
        description="创建OSS导入任务",
        display_name="OSS导入测试",
        name="ingest-oss-123456"
    )
    try:
        create_ossingestions_response = client.create_ossingestion(project_name, create_ossingestions_request)
        print(create_ossingestions_response)
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
    main()