Python integration guide

更新时间:
复制 MD 格式

Procedure

Step 1: Install dependencies

pip install requests
pip install dataclass_wizard

Step 2: Add the Client class

Add the client.py file and modify the package name as needed.

import time
import uuid
import hmac
import hashlib
import base64
import json
import io
import requests

class Client:
    def __init__(self, endpoint: str, app_key: str, app_secret: str):
        self.endpoint = endpoint
        self.app_key = app_key
        self.app_secret = app_secret

    def invoke(self, path: str, params: dict = None, method='POST', headers: dict = None, **kwargs):
        # url = f'https://{self.endpoint}{path}'
        # gen_headers = self._generate_header('POST', path, params, headers)
        # return requests.post(url, headers=gen_headers, json=params, **kwargs)

        url = f'https://{self.endpoint}{path}'
        if method == 'GET':
            gen_headers = self._generate_header('GET', path, params, headers)
            return requests.get(url, headers=gen_headers, **kwargs)
        else:
            gen_headers = self._generate_header('POST', path, params, headers)
            return requests.post(url, headers=gen_headers, json=params, **kwargs)

    def _generate_header(self, http_method: str, path: str, body: dict = None, hdrs: dict = None):
        """
        :param http_method:
        :param path:
        :param params:
        :param body:

        # https://help.aliyun.com/zh/api-gateway/traditional-api-gateway/use-digest-authentication-to-call-an-api?spm=a2c4g.11186623.0.0.52d126desp6m4B#topic-1867627
        """

        timestamp = time.time()
        date_str = time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.gmtime(timestamp)).replace('GMT', 'GMT+00:00')
        timestamp_str = str(int(timestamp * 1000))
        uuid_str = str(uuid.uuid4())
        json_header = 'application/json; charset=utf-8'

        headers = {
            'date': date_str,
            'x-ca-key': self.app_key,
            'x-ca-timestamp': timestamp_str,
            'x-ca-nonce': uuid_str,
            'x-ca-signature-method': 'HmacSHA256',
            'x-ca-signature-headers': 'x-ca-timestamp,x-ca-key,x-ca-nonce,x-ca-signature-method',
            'Content-Type': json_header,
            'Accept': json_header
        }

        o = io.StringIO()
        o.write(http_method)
        o.write("\n")

        o.write(json_header)
        o.write("\n")

        if body:
            # perform md5 and base64
            h = hashlib.md5()
            h.update(json.dumps(body).encode('utf-8'))
            body_md5_str = base64.b64encode(h.digest()).strip().decode('utf-8')
            headers["content-md5"] = body_md5_str
            o.write(body_md5_str)
        o.write("\n")

        o.write(json_header)
        o.write("\n")

        o.write(date_str)
        o.write("\n")

        o.write("x-ca-key:")
        o.write(self.app_key)
        o.write("\n")

        o.write("x-ca-nonce:")
        o.write(uuid_str)
        o.write("\n")

        o.write("x-ca-signature-method:HmacSHA256")
        o.write("\n")

        o.write("x-ca-timestamp:")
        o.write(timestamp_str)
        o.write("\n")

        o.write(path)

        h = hmac.new(bytes(self.app_secret, 'utf-8'), bytes(o.getvalue(), 'utf-8'), hashlib.sha256)
        headers["x-ca-signature"] = base64.b64encode(h.digest()).decode('utf-8')

        if hdrs and len(hdrs) > 0:
            headers.update(hdrs)

        return headers

Step 3: Add the Proto class (ComfyUI image generation service example)

Add the proto.py file.

# -*- coding: utf-8 -*-
from dataclasses import dataclass
from typing import Optional, List, Dict
from enum import Enum
from dataclass_wizard import JSONWizard, DumpMeta
from util import batch_download_images


class PredictResultStatusCode(Enum):
    TASK_INPROGRESS = "running"
    TASK_FAILED = "failed"
    TASK_QUEUE = "waiting"
    TASK_FINISH = "succeeded"

    def finished(self):
        return self in (PredictResultStatusCode.TASK_FAILED, PredictResultStatusCode.TASK_FINISH)

class JSONe(JSONWizard):
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        DumpMeta(key_transform='SNAKE').bind_to(cls)


@dataclass
class GatewayResponse(JSONe):
    status: Optional[int] = 0
    err_code: Optional[str] = ""
    err_message: Optional[str] = ""
    sub_err_code: Optional[str] = ""
    sub_err_message: Optional[str] = ""
    api_invoke_id: Optional[str] = ""


@dataclass
class ComfyRequest(JSONe):
    workflow_id: str
    version_id: Optional[str] = None
    inputs: Optional[Dict[str, any]] = None
    alias_id: Optional[str] = None



@dataclass
class ComfyResponseData(JSONe):
    task_id: str
    status: Optional[PredictResultStatusCode] = PredictResultStatusCode.TASK_INPROGRESS


@dataclass
class ComfyResponse(GatewayResponse):
    data: Optional[ComfyResponseData] = None



@dataclass
class PredictResult(JSONe):
    task_id: str
    images: Optional[List[str]] = None
    info: Optional[Dict[str, str]] = None
    parameters: Optional[Dict[str, str]] = None
    status: Optional[PredictResultStatusCode] = PredictResultStatusCode.TASK_INPROGRESS
    imgs_bytes: Optional[List[str]] = None
    result: Optional[Dict] = None

@dataclass
class PredictResultResponse(GatewayResponse):
    data: Optional[PredictResult] = None

    def download_images(self):
        if self.data.images is not None and len(self.data.images) > 0:
            self.data.imgs_bytes = batch_download_images(self.data.images)


@dataclass
class ProgressData(JSONe):
    task_id: str
    progress: float
    eta_relative: int
    message: Optional[str] = ""
    status: Optional[PredictResultStatusCode] = PredictResultStatusCode.TASK_INPROGRESS


@dataclass
class ProgressResponse(GatewayResponse):
    data: Optional[ProgressData] = None

Step 4: Add the utility class (ComfyUI image generation service example)

Add the util.py file.

from multiprocessing.pool import ThreadPool
import logging
import requests
from dataclass_wizard.utils.string_conv import to_camel_case


logger = logging.getLogger(__name__)


def batch_download_images(image_links):
    def _download(image_link):
        attempts = 3
        while attempts > 0:
            try:
                response = requests.get(image_link, timeout=100)
                return response.content
            except Exception:
                logger.warning("Failed to download image, retrying...")
            attempts -= 1
        return None

    pool = ThreadPool()
    applied = []
    for img_url in image_links:
        applied.append(pool.apply_async(_download, (img_url, )))
    ret = [r.get() for r in applied]
    return [_ for _ in ret if _ is not None]


def convert_to_camel_case(data_dict):
    if isinstance(data_dict, dict):
        return {to_camel_case(key): convert_to_camel_case(value) for key, value in data_dict.items()}
    elif isinstance(data_dict, list):
        return [convert_to_camel_case(value) for value in data_dict]
    else:
        return data_dict

Step 5: Specify the AccessKey pair, call path, and parameters

from client import Client
from proto import ComfyRequest, ComfyResponse, PredictResultResponse, ProgressResponse
import time
import json

cli = Client(
    endpoint="openai.edu-aliyun.com",
    app_key="YOUR_ACCESS_KEY_ID",
    app_secret="YOUR_ACCESS_KEY_SECRET"
)


# Original call method
def call(url, body, method='POST', headers=None):
    res = cli.invoke(url, body, method, headers)
    if res:
        data = res.json()
        return data


def comfy_prompt(prompt: ComfyRequest, custom_resource_config_id='default') -> ComfyResponse:
    print(prompt.to_dict())

    headers = {}
    if custom_resource_config_id:
        headers['X-SP-RESOURCE-CONFIG-ID'] = custom_resource_config_id

    r = ComfyResponse.from_dict(call("/scc/comfy_prompt", prompt.to_dict(), headers=headers))
    print(r.to_dict())
    if r.err_code:
        raise Exception(r.err_message)

    for _ in range(1200):
        params = {}
        params["taskId"] = r.data.task_id
        # Query progress
        raw_res = call("/scc/comfy_get_progress", params, headers=None)
        print(raw_res)
        if raw_res:
            r = ProgressResponse.from_dict(raw_res)
            if r.status == 20:
                pretty_json_str = json.dumps(raw_res, indent=2, ensure_ascii=False)
                print(pretty_json_str)
                raise Exception("Failed to call , error: %s" % pretty_json_str)
            if r.data.status.finished():
                # Query result
                raw_res = call("/scc/comfy_get_result", {"taskId": r.data.task_id})
                return PredictResultResponse.from_dict(raw_res)
        time.sleep(1)
    raise Exception("1200s Timeout")


if __name__ == '__main__':
    begin = time.time()
    alias_id = "YOUR_WORKFLOW_ALIAS"
    workflow_id = "YOUR_WORKFLOW_ID"
    # Define parameters
    params = {
            "prompt": "A man is walking on the street."
    }
    result = comfy_prompt(ComfyRequest(alias_id=alias_id,
                                                workflow_id=workflow_id,
                                                inputs=params))
    print("Image generation result: " + str(result))
    print("Time elapsed: %.2fs" % (time.time() - begin))