建一个包含自定义算子的任务

本文为您介绍如何在自动驾驶数据管理中编写一个用户自定义算子构建一个简单的自动驾驶云上数据预处理流程。

当平台内置的算子无法解决您的数据处理需求时,您需要自己开发一个算子(如天气识别算子)并打包成像,上传到自定义算子管理模块,点击上线后,该算子将会在任务配置的算子列表中出现,即可参与任务的编排。大致可分为以下两个步骤:

  1. 上传自定义算子。

  2. 创建一个包含自定义算子的任务。

视频教程

Step 1 上传自定义算子

自定义算子的上传分为以下三个步骤:

  • 在您的算子代码中引入平台提供的开发框架

  • 将您的算子打包成镜像并上传到阿里云ACR代码仓库

  • 自定义算子管理模块上传该镜像

以下为您详细介绍这三个步骤。

Step 1.1 引入平台提供的开发框架

1) 安装SDK

  • 项目根目录创建sdk目录,下载SDK文件并放置在该目录下:

    ali_autodrive-0.0.1.tar.gz

  • 安装模块:pip install sdk/ali_autodrive-0.0.1.tar.gz

2) 实现抽象类

  • 项目根目录创建模块(例如example),在example模块中创建数据处理类(如DataProcessor)

  • DataProcessor实现抽象方法DataProcessTaskAbstract,重写拆分方法和处理方法。

import json
import time

import av
import os.path

from abc import ABC
from ali_autodrive.parallel_compute.model.FileContent import FileContent
from ali_autodrive.parallel_compute.utils.tree_util import *
from ali_autodrive.parallel_compute.DataProcessTaskAbstract import DataProcessTaskAbstract

MAX_RECORD_NUM = 200


class DataProcessor(DataProcessTaskAbstract, ABC):
    def __init__(self):
        super(DataProcessor, self).__init__()

    def data_partition(self, context):
        self.get_logger().info("Data processor, data partition start.")
        file_tree_reader = FileTreeReader(self.get_file_tree())
        while file_tree_reader.finish is False:
            file_list = file_tree_reader.get_sub_node_file_list(1)
            for file in file_list:
                if "file_path" in file and file["file_path"].endswith('.mp4'):
                    self.save_data_partition([json.dumps(file)])

        self.get_logger().info("Data processor, data partition end.")

    def data_process(self, context):
        self.get_logger().info("Data processor, data process start.")

        # 视频抽帧
        local_file_list = get_sub_node_local_file_list(self.get_file_tree())
        for file_path in local_file_list:
            self.framing(file_path)

        self.get_logger().info("Data processor, data process end.")

    def framing(self, file_path):
        file_name = self.__get_file_name(file_path)
        container = av.open(file_path)
        stream = container.streams.video[0]
        stream.codec_context.skip_frame = 'NONREF'
        path = self.get_user_workspace() + "/img/" + file_name
        if not os.path.exists(path):
            os.makedirs(path)

        file_list = []
        timestamp_ns = time.time_ns()
        for frame in container.decode(stream):
            file_path = path + "/frame-%04d.jpg" % frame.index
            frame.to_image().save(file_path)

            # 文件打标
            file_tag = {}
            file_tag["timestamp_ns"] = timestamp_ns + int(frame.time * 1000000000)
            content = FileContent()
            content.file_tag = file_tag
            content.file_path = file_path

            if len(file_list) >= MAX_RECORD_NUM:
                self.save_data_partition(file_list)
                file_list = []
            else:
                file_list.append(json.dumps(content.__dict__))
        if len(file_list) > 0:
            self.save_data_partition(file_list)

    @staticmethod
    def __get_file_name(file_path):
        return file_path.split("/")[-1].split(".")[0]

Step 1.2 本地测试

1)创建配置文件

项目根目录创建配置文件(如config.ini)

[init]
#表示工作目录,替换为自己机器工作空间
workspace = /Users/icyore/workspace
#模拟OSS地址,替换为自己的测试文件目录
ossInput = /Users/icyore/oss/input 
#模拟OSS地址,替换为测试输出目录
ossOutput = /Users/icyore/oss/output
#【可空】算子初始化参数,可以在算子中获取,同数据管理平台标准化转换节点的“转换程序参数”,JSON字符串类型。
transformParams ={"vehicleId":"并行计算测试车"}

2)创建启动脚本

项目根目录创建启动脚本(如test_start.py)

from ali_autodrive.parallel_compute.service_startup import *

# ./config.ini   配置文件路径,可以是相对当前目录路径,也可以是绝对路径
# example.DataProcessor  算子所在模块
# DataProcessor  算子实现类
test("./config.ini", "example.DataProcessor", "DataProcessor")

3)运行启动脚本

运行启动脚本

python test_start.py

Step 1.3创建镜像

1)打包模块

项目根目录创建setup.py文件,执行打包命令:python setup.py sdist

注意:打包生成的文件存储在dist目录下

# -*- coding:utf-8 -*-
from setuptools import (setup, find_packages)

setup(
    # 包名
    name="example",
    # 版本
    version="0.0.1",
    # 需要包含的子包列表
    packages=find_packages(),
    # 添加依赖
    install_requires=[
        #'python-lzf==0.2.4',
    ]
)

2)创建Dockerfile

项目根目录创建Dockerfile文件

FROM python:3.8
COPY . /app
WORKDIR /app

RUN pip install sdk/ali_autodrive-0.0.1.tar.gz
ADD   sdk/ali_autodrive-0.0.1.tar.gz  ali_autodrive
RUN pip install dist/example-0.0.1.tar.gz
WORKDIR /app/ali_autodrive/ali_autodrive-0.0.1/ali_autodrive/parallel_compute

EXPOSE 5000
#example.DataProcessor  算子所在模块 
#DataProcessor  算子实现类
CMD ["python","service_startup.py" ,"example.DataProcessor","DataProcessor"]
说明

为了提升镜像打包速度,可以将上一个版本当作基础镜像,基于基础镜像打包,节省模块安装耗时。

3)上传镜像

制作镜像并上传到ACR

docker login --username=jier****@city-brain-pro auto-driver-registry.cn-hangzhou.cr.aliyuncs.com
docker build -t parallel-compute-example:0.0.1 .
docker tag parallel-compute-example:0.0.1 auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1
docker push auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1

其中auto-driver-registry.cn-hangzhou.cr.aliyuncs.com为ACR仓库地址,parallel_compute为ACR命名空间,需要替换为自己的地址,并使用自己的账号进行登录。

其中parallel-compute-example:0.0.1为镜像名:版本号,可以自定义名称和版本号。

Step 1.4 上传自定义算子

  • 打开产品左侧目录,在数据定义模块下面,找到自定义算子管理。

  • 点击右上角的添加算子,选择对应类别,按照提示上传算子。

  • 在算子列表中找到您刚刚上传的算子,点击上线

Step 2 创建包含自定义算子的任务

以上步骤完成后,您既可在任务配置页面的系统节点列表中,找到您刚刚上传的算子。此时,该算子和系统内置算子一样,可以单独使用,也可以和内置算子一起混合编排。

具体步骤,可参考 使用内置算子建一个简单的数据处理任务。