更新OSS导入任务

更新时间:2025-01-20 06:35:57

调用UpdateOSSIngestion接口更新OSS导入任务。

前提条件

  • 开通日志服务

  • 已安装Python、Python开发环境(例如PyCharm)和Python的包管理工具pip

    • 日志服务Python SDK新版支持Python3.7及以上版本。

    • 您可以执行python -V命令检查已安装的Python版本。

    • 您可以执行pip3 -V命令检查您已安装的pip版本。

  • 已安装日志服务Python SDK新版。

    • 在命令行工具中,执行如下命令完成安装。

      pip install alibabacloud_sls20201230
    • 安装SDK新版后,执行如下命令进行验证。

      pip show alibabacloud_sls20201230
  • 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见Linux、macOSWindows系统中配置环境变量

参数说明

def update_ossingestion(
        self,
        project: str,
        oss_ingestion_name: str,
        request: sls_20201230_models.UpdateOSSIngestionRequest,
) -> sls_20201230_models.UpdateOSSIngestionResponse:

请求参数

名称

类型

是否必填

描述

示例值

名称

类型

是否必填

描述

示例值

project

String

Project名称。

ali-test-project

oss_ingestion_name

String

OSS导入任务名称。

ingest-oss-123456

request

object

更新OSS导入任务配置。

-

request参数说明:

class UpdateOSSIngestionRequest(TeaModel):
    def __init__(
        self,
        configuration: OSSIngestionConfiguration = None,
        description: str = None,
        display_name: str = None,
        schedule: Schedule = None,
    ):

名称

类型

是否必填

描述

示例值

名称

类型

是否必填

描述

示例值

configuration

object

OSS导入配置。

-

description

String

任务描述。

这是一个更新描述。

display_name

String

显示名称。

OSS导入测试。

schedule

object

调度类型,一般默认不需要填写。

-

schedule参数说明

class Schedule(TeaModel):
    def __init__(
        self,
        cron_expression: str = None,
        delay: int = None,
        interval: str = None,
        run_immediately: bool = None,
        time_zone: str = None,
        type: str = None,
    ):

名称

类型

是否必填

描述

示例值

名称

类型

是否必填

描述

示例值

type

String

schedule type。支持枚举值:FixedRateCron

FixedRate

cron_expression

String

typeFixRate时,无需填写,当typeCron时,填写Cron表达式

0/5 * * * *

run_immediately

bool

任务是否立即执行。

True

time_zone

String

Cron表达式所在时区,默认为空,表示东八区。

+0800

delay

int

定时类任务延迟执行(单位是秒:s)

4

interval

String

typeFixRate时,interval参数表示执行的间隔,不能大于30天。支持 d(天)、h(小时)、m(分钟)、s(秒)四种单位。例如3h。不支持多种单位组合,例如不支持3h5m。

1m

configuration参数

class OSSIngestionConfiguration(TeaModel):
    def __init__(
        self,
        logstore: str = None,
        source: OSSIngestionConfigurationSource = None,
    ):

名称

类型

是否必填

描述

示例值

名称

类型

是否必填

描述

示例值

logstore

String

logstore名称。

test-logstore

source

object

OSS导入配置。

-

source参数说明:

class OSSIngestionConfigurationSource(TeaModel):
    def __init__(
        self,
        bucket: str = None,
        compression_codec: str = None,
        encoding: str = None,
        end_time: int = None,
        endpoint: str = None,
        format: Dict[str, Any] = None,
        interval: str = None,
        pattern: str = None,
        prefix: str = None,
        restore_object_enabled: bool = None,
        role_arn: str = None,
        start_time: int = None,
        time_field: str = None,
        time_format: str = None,
        time_pattern: str = None,
        time_zone: str = None,
        use_meta_index: bool = None,
    ):

名称

类型

是否必填

描述

示例值

名称

类型

是否必填

描述

示例值

bucket

String

oss bucket名称。

ossbucket

compression_code

String

压缩类型。

none

encoding

String

编码类型。

UTF-8

endpoint

String

oss endpoint。

oss-cn-hangzhou-internal.aliyuncs.com

format

Dictionary

OSS 数据格式。

  • 单行文本日志:{"type":"Line"}

  • CSV:

    {
      "type": "CSV",
      "fieldDelimiter": ",",   //分隔符
      "quoteChar": "\"",      //引号
      "escapeChar": "\\",    //转义符
      "firstRowAsHeader": true,    //首行是否作为字段名称
      "maxLines": 1,    //日志最大跨行数
      "skipLeadingRows": 0   //跳过行数
    }
    
    

  • 单行 JSON:{"type": "JSON"}

  • 跨行文本日志:

    {
      "type": "Multiline",
      "match": "after", //正字匹配位置
      "pattern": "\\d+", //正则表达式
      "maxLines": 10 //最大行数
    }
    
  • ORC:{"type": "ORC"}

  • Parquet:{"type": "Parquet"}

  • 阿里云 OSS 访问日志:{"type": "ossAccessLog"}

  • 阿里云 CDN 下载日志:{"type": "cdnDownloadedLog"}

{"type": "Line"}

interval

String

检查新文件周期。

30s

pattern

String

文件路径正则过滤。

.*

prefix

String

文件路径前缀过滤。

prefix

restore_object_enabled

bool

导入归档文件。

true

role_arn

String

roleArn。

acs:ram::12345:role/aliyunlogdefaultrole

start_time

int

某个时间点后修改过的文件。

1714274081

time_field

String

提取时间字段。

__time__

time_format

String

时间字段格式。

yyyy-MM-dd HH:mm:ss

time_pattern

String

提取时间正则。

[0-9]{0,2}\/[0-9a-zA-Z]+\/[0-9:,]+

time_zone

String

时间字段分区。

GMT+08:00

use_meta_index

bool

使用OSS元数据索引。

False

返回参数

返回参数说明,请参见UpdateOSSIngestion - 更新OSS导入任务

示例代码

import os

from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sls20201230 import models as sls_20201230_models
from alibabacloud_tea_util.client import Client as UtilClient


def main():
    config = open_api_models.Config(
        # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
    # 日志服务的服务接入点
    config.endpoint = "cn-hangzhou.log.aliyuncs.com"
    project_name = "ali-test-project"
    oss_ingestion_name = "ingest-oss-123456"
    client = Sls20201230Client(config)

    # 创建OSSIngestionConfigurationSource的实例
    source = sls_20201230_models.OSSIngestionConfigurationSource(
        bucket="ali-shanghai001",
        compression_codec="none",
        encoding="UTF-8",
        end_time=None,
        endpoint="oss-cn-hangzhou-internal.aliyuncs.com",
        format={"type": "Line"},
        interval="30s",
        use_meta_index=False,
    )
    # 创建OSSIngestionConfiguration的实例
    oss_config = sls_20201230_models.OSSIngestionConfiguration(
        logstore="test-logtore",
        source=source
    )

    # 创建UpdateOSSIngestionRequest的实例
    update_ossingestions_request = sls_20201230_models.UpdateOSSIngestionRequest(
        configuration=oss_config,
        description="这是一个更新描述",
        display_name="OSS导入测试"
    )

    try:
        update_ossingestions_response = client.update_ossingestion(project_name, oss_ingestion_name,
                                                                   update_ossingestions_request)
        print(update_ossingestions_response)
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    main()

相关文档

  • 本页导读 (1)
  • 前提条件
  • 参数说明
  • 请求参数
  • 返回参数
  • 示例代码
  • 相关文档