Python接入指南

更新时间:

操作步骤

步骤一:安装依赖

pip install requests
pip install dataclass_wizard

步骤二:增加Client类

增加 client.py,按需修改 package 名称

import time
import uuid
import hmac
import hashlib
import base64
import json
import io
import requests

class Client:
    def __init__(self, endpoint: str, app_key: str, app_secret: str):
        self.endpoint = endpoint
        self.app_key = app_key
        self.app_secret = app_secret

    def invoke(self, path: str, params: dict = None, method='POST', headers: dict = None, **kwargs):
        # url = f'https://{self.endpoint}{path}'
        # gen_headers = self._generate_header('POST', path, params, headers)
        # return requests.post(url, headers=gen_headers, json=params, **kwargs)

        url = f'https://{self.endpoint}{path}'
        if method == 'GET':
            gen_headers = self._generate_header('GET', path, params, headers)
            return requests.get(url, headers=gen_headers, **kwargs)
        else:
            gen_headers = self._generate_header('POST', path, params, headers)
            return requests.post(url, headers=gen_headers, json=params, **kwargs)

    def _generate_header(self, http_method: str, path: str, body: dict = None, hdrs: dict = None):
        """
        :param http_method:
        :param path:
        :param params:
        :param body:

        # https://help.aliyun.com/zh/api-gateway/traditional-api-gateway/use-digest-authentication-to-call-an-api?spm=a2c4g.11186623.0.0.52d126desp6m4B#topic-1867627
        """

        timestamp = time.time()
        date_str = time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.gmtime(timestamp)).replace('GMT', 'GMT+00:00')
        timestamp_str = str(int(timestamp * 1000))
        uuid_str = str(uuid.uuid4())
        json_header = 'application/json; charset=utf-8'

        headers = {
            'date': date_str,
            'x-ca-key': self.app_key,
            'x-ca-timestamp': timestamp_str,
            'x-ca-nonce': uuid_str,
            'x-ca-signature-method': 'HmacSHA256',
            'x-ca-signature-headers': 'x-ca-timestamp,x-ca-key,x-ca-nonce,x-ca-signature-method',
            'Content-Type': json_header,
            'Accept': json_header
        }

        o = io.StringIO()
        o.write(http_method)
        o.write("\n")

        o.write(json_header)
        o.write("\n")

        if body:
            # perform md5 and base64
            h = hashlib.md5()
            h.update(json.dumps(body).encode('utf-8'))
            body_md5_str = base64.b64encode(h.digest()).strip().decode('utf-8')
            headers["content-md5"] = body_md5_str
            o.write(body_md5_str)
        o.write("\n")

        o.write(json_header)
        o.write("\n")

        o.write(date_str)
        o.write("\n")

        o.write("x-ca-key:")
        o.write(self.app_key)
        o.write("\n")

        o.write("x-ca-nonce:")
        o.write(uuid_str)
        o.write("\n")

        o.write("x-ca-signature-method:HmacSHA256")
        o.write("\n")

        o.write("x-ca-timestamp:")
        o.write(timestamp_str)
        o.write("\n")

        o.write(path)

        h = hmac.new(bytes(self.app_secret, 'utf-8'), bytes(o.getvalue(), 'utf-8'), hashlib.sha256)
        headers["x-ca-signature"] = base64.b64encode(h.digest()).decode('utf-8')

        if hdrs and len(hdrs) > 0:
            headers.update(hdrs)

        return headers

步骤三:增加Proto类(以ComfyUI生图服务举例)

增加 proto.py

# -*- coding: utf-8 -*-
from dataclasses import dataclass
from typing import Optional, List, Dict
from enum import Enum
from dataclass_wizard import JSONWizard, DumpMeta
from util import batch_download_images


class PredictResultStatusCode(Enum):
    TASK_INPROGRESS = "running"
    TASK_FAILED = "failed"
    TASK_QUEUE = "waiting"
    TASK_FINISH = "succeeded"

    def finished(self):
        return self in (PredictResultStatusCode.TASK_FAILED, PredictResultStatusCode.TASK_FINISH)

class JSONe(JSONWizard):
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        DumpMeta(key_transform='SNAKE').bind_to(cls)


@dataclass
class GatewayResponse(JSONe):
    status: Optional[int] = 0
    err_code: Optional[str] = ""
    err_message: Optional[str] = ""
    sub_err_code: Optional[str] = ""
    sub_err_message: Optional[str] = ""
    api_invoke_id: Optional[str] = ""


@dataclass
class ComfyRequest(JSONe):
    workflow_id: str
    version_id: Optional[str] = None
    inputs: Optional[Dict[str, any]] = None
    alias_id: Optional[str] = None



@dataclass
class ComfyResponseData(JSONe):
    task_id: str
    status: Optional[PredictResultStatusCode] = PredictResultStatusCode.TASK_INPROGRESS


@dataclass
class ComfyResponse(GatewayResponse):
    data: Optional[ComfyResponseData] = None



@dataclass
class PredictResult(JSONe):
    task_id: str
    images: Optional[List[str]] = None
    info: Optional[Dict[str, str]] = None
    parameters: Optional[Dict[str, str]] = None
    status: Optional[PredictResultStatusCode] = PredictResultStatusCode.TASK_INPROGRESS
    imgs_bytes: Optional[List[str]] = None
    result: Optional[Dict] = None

@dataclass
class PredictResultResponse(GatewayResponse):
    data: Optional[PredictResult] = None

    def download_images(self):
        if self.data.images is not None and len(self.data.images) > 0:
            self.data.imgs_bytes = batch_download_images(self.data.images)


@dataclass
class ProgressData(JSONe):
    task_id: str
    progress: float
    eta_relative: int
    message: Optional[str] = ""
    status: Optional[PredictResultStatusCode] = PredictResultStatusCode.TASK_INPROGRESS


@dataclass
class ProgressResponse(GatewayResponse):
    data: Optional[ProgressData] = None

步骤四:增加工具类(以ComfyUI生图服务举例)

增加 util.py

from multiprocessing.pool import ThreadPool
import logging
import requests
from dataclass_wizard.utils.string_conv import to_camel_case


logger = logging.getLogger(__name__)


def batch_download_images(image_links):
    def _download(image_link):
        attempts = 3
        while attempts > 0:
            try:
                response = requests.get(image_link, timeout=100)
                return response.content
            except Exception:
                logger.warning("Failed to download image, retrying...")
            attempts -= 1
        return None

    pool = ThreadPool()
    applied = []
    for img_url in image_links:
        applied.append(pool.apply_async(_download, (img_url, )))
    ret = [r.get() for r in applied]
    return [_ for _ in ret if _ is not None]


def convert_to_camel_case(data_dict):
    if isinstance(data_dict, dict):
        return {to_camel_case(key): convert_to_camel_case(value) for key, value in data_dict.items()}
    elif isinstance(data_dict, list):
        return [convert_to_camel_case(value) for value in data_dict]
    else:
        return data_dict

步骤五:填写调用AK/SK、调用路径、调用参数

from client import Client
from proto import ComfyRequest, ComfyResponse, PredictResultResponse, ProgressResponse
import time
import json

cli = Client(
    endpoint="openai.edu-aliyun.com",
    app_key="应用AK",
    app_secret="应用SK"
)


# 原始调用方法
def call(url, body, method='POST', headers=None):
    res = cli.invoke(url, body, method, headers)
    if res:
        data = res.json()
        return data


def comfy_prompt(prompt: ComfyRequest, custom_resource_config_id='default') -> ComfyResponse:
    print(prompt.to_dict())

    headers = {}
    if custom_resource_config_id:
        headers['X-SP-RESOURCE-CONFIG-ID'] = custom_resource_config_id

    r = ComfyResponse.from_dict(call("/scc/comfy_prompt", prompt.to_dict(), headers=headers))
    print(r.to_dict())
    if r.err_code:
        raise Exception(r.err_message)

    for _ in range(1200):
        params = {}
        params["taskId"] = r.data.task_id
        # 查询进度
        raw_res = call("/scc/comfy_get_progress", params, headers=None)
        print(raw_res)
        if raw_res:
            r = ProgressResponse.from_dict(raw_res)
            if r.status == 20:
                pretty_json_str = json.dumps(raw_res, indent=2, ensure_ascii=False)
                print(pretty_json_str)
                raise Exception("Failed to call , error: %s" % pretty_json_str)
            if r.data.status.finished():
                # 查询结果
                raw_res = call("/scc/comfy_get_result", {"taskId": r.data.task_id})
                return PredictResultResponse.from_dict(raw_res)
        time.sleep(1)
    raise Exception("1200s Timeout")


if __name__ == '__main__':
    begin = time.time()
    alias_id = "配置的工作流别名"
    workflow_id = "控制台获取的工作流id"
    # 定义的参数
    params = {
            "prompt": "A man is walking on the street."
    }
    result = comfy_prompt(ComfyRequest(alias_id=alias_id,
                                                workflow_id=workflow_id,
                                                inputs=params))
    print("生图结果:" + str(result))
    print("时间消耗: %.2fs" % (time.time() - begin))