Python integration guide
Last updated:
Copy as Markdown
Procedure
Step 1: Install dependencies
pip install requests
pip install dataclass_wizard
Step 2: Add the Client class
Add the client.py file and modify the package name as needed.
import time
import uuid
import hmac
import hashlib
import base64
import json
import io
import requests
class Client:
    """Small HTTPS client that signs every request with HMAC-SHA256.

    The signature and the accompanying ``x-ca-*`` headers are produced by
    ``_generate_header``; ``invoke`` attaches them and performs the call.
    """

    def __init__(self, endpoint: str, app_key: str, app_secret: str):
        """Remember the target host and the credential pair.

        :param endpoint: host name; ``invoke`` prefixes it with ``https://``.
        :param app_key: value sent in the ``x-ca-key`` header.
        :param app_secret: key used for the HMAC-SHA256 signature.
        """
        self.endpoint = endpoint
        self.app_key = app_key
        self.app_secret = app_secret

    def invoke(self, path: str, params: dict = None, method='POST', headers: dict = None, **kwargs):
        """Send a signed request to ``https://{endpoint}{path}``.

        :param path: request path appended to the endpoint; also part of the signature.
        :param params: JSON-serializable body for non-GET calls.
        :param method: ``'GET'`` issues a GET; any other value issues a POST.
        :param headers: extra headers merged on top of the generated ones.
        :param kwargs: forwarded unchanged to ``requests.get`` / ``requests.post``.
        :return: the ``requests`` response object.
        """
        url = f'https://{self.endpoint}{path}'
        if method == 'GET':
            # NOTE(review): for GET, ``params`` only feeds the content-md5 header;
            # it is not transmitted as a body or query string. Confirm this is
            # what the gateway expects before relying on GET with parameters.
            gen_headers = self._generate_header('GET', path, params, headers)
            return requests.get(url, headers=gen_headers, **kwargs)

        gen_headers = self._generate_header('POST', path, params, headers)
        # Send exactly the bytes that were hashed for content-md5 instead of
        # letting requests serialize the dict a second time via ``json=``.
        payload = self._encode_body(params) if params is not None else None
        return requests.post(url, headers=gen_headers, data=payload, **kwargs)

    @staticmethod
    def _encode_body(body) -> bytes:
        """Return *body* as the UTF-8 JSON bytes that are both hashed and sent."""
        return json.dumps(body).encode('utf-8')

    def _generate_header(self, http_method: str, path: str, body: dict = None, hdrs: dict = None):
        """Build the signed header set for one request.

        The string to sign is, one item per line: HTTP method, Accept,
        Content-MD5 (empty line when there is no body), Content-Type, Date,
        the four signed ``x-ca-*`` headers in alphabetical order, and the path.

        Reference:
        https://help.aliyun.com/zh/api-gateway/traditional-api-gateway/use-digest-authentication-to-call-an-api#topic-1867627

        :param http_method: method name placed on the first line of the string to sign.
        :param path: request path placed on the last line of the string to sign.
        :param body: request body; when truthy its JSON encoding is MD5-hashed.
        :param hdrs: caller-supplied headers, merged last so they can override.
        :return: dict of headers including ``x-ca-signature``.
        """
        timestamp = time.time()
        date_str = time.strftime(
            '%a, %d %b %Y %H:%M:%S GMT', time.gmtime(timestamp)
        ).replace('GMT', 'GMT+00:00')
        timestamp_str = str(int(timestamp * 1000))  # milliseconds since the epoch
        uuid_str = str(uuid.uuid4())
        json_header = 'application/json; charset=utf-8'

        headers = {
            'date': date_str,
            'x-ca-key': self.app_key,
            'x-ca-timestamp': timestamp_str,
            'x-ca-nonce': uuid_str,
            'x-ca-signature-method': 'HmacSHA256',
            'x-ca-signature-headers': 'x-ca-timestamp,x-ca-key,x-ca-nonce,x-ca-signature-method',
            'Content-Type': json_header,
            'Accept': json_header,
        }

        body_md5_str = ''
        if body:
            # Base64 of the MD5 digest of the serialized body.
            digest = hashlib.md5(self._encode_body(body)).digest()
            body_md5_str = base64.b64encode(digest).strip().decode('utf-8')
            headers['content-md5'] = body_md5_str

        # The Content-MD5 line is always present; it is simply empty without a body.
        string_to_sign = '\n'.join([
            http_method,
            json_header,       # Accept
            body_md5_str,      # Content-MD5
            json_header,       # Content-Type
            date_str,
            'x-ca-key:' + self.app_key,
            'x-ca-nonce:' + uuid_str,
            'x-ca-signature-method:HmacSHA256',
            'x-ca-timestamp:' + timestamp_str,
            path,
        ])
        mac = hmac.new(
            bytes(self.app_secret, 'utf-8'),
            bytes(string_to_sign, 'utf-8'),
            hashlib.sha256,
        )
        headers['x-ca-signature'] = base64.b64encode(mac.digest()).decode('utf-8')

        if hdrs:
            headers.update(hdrs)
        return headers

# Step 3: Add the Proto class (ComfyUI image generation service example)
Add the proto.py file.
# -*- coding: utf-8 -*-
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

from dataclass_wizard import JSONWizard, DumpMeta

from util import batch_download_images
class PredictResultStatusCode(Enum):
    """Task states; each member's value is the status string it stands for."""

    TASK_INPROGRESS = "running"
    TASK_FAILED = "failed"
    TASK_QUEUE = "waiting"
    TASK_FINISH = "succeeded"

    def finished(self):
        """Return True once the task has stopped, i.e. it failed or succeeded."""
        has_failed = self is PredictResultStatusCode.TASK_FAILED
        has_succeeded = self is PredictResultStatusCode.TASK_FINISH
        return has_failed or has_succeeded
class JSONe(JSONWizard):
    """Common JSONWizard base that applies one dump setting to all subclasses."""

    def __init_subclass__(cls, **kwargs):
        """Bind ``DumpMeta(key_transform='SNAKE')`` to each subclass when it is created."""
        super().__init_subclass__(**kwargs)
        # With key_transform='SNAKE', serialized keys are presumably emitted in
        # snake_case; see the dataclass-wizard Meta documentation.
        snake_dump_meta = DumpMeta(key_transform='SNAKE')
        snake_dump_meta.bind_to(cls)
@dataclass
class GatewayResponse(JSONe):
    """Envelope fields shared by the response models that extend this class.

    Every field has an empty default, so an instance can be built from a
    payload that leaves any of them out.
    """

    status: Optional[int] = 0
    # Error code / message pair, plus a second "sub" pair.
    err_code: Optional[str] = ""
    err_message: Optional[str] = ""
    sub_err_code: Optional[str] = ""
    sub_err_message: Optional[str] = ""
    api_invoke_id: Optional[str] = ""
@dataclass
class ComfyRequest(JSONe):
    """Parameters describing one workflow run to submit.

    Only ``workflow_id`` is required; the remaining fields default to ``None``.
    """

    workflow_id: str
    version_id: Optional[str] = None
    # Free-form mapping of input name to value. The value type is ``typing.Any``
    # (the original annotation used the builtin function ``any`` by mistake).
    inputs: Optional[Dict[str, Any]] = None
    alias_id: Optional[str] = None
@dataclass
class ComfyResponseData(JSONe):
    """Payload carrying a task id and that task's status."""

    task_id: str
    # Falls back to TASK_INPROGRESS when the payload carries no status.
    status: Optional[PredictResultStatusCode] = PredictResultStatusCode.TASK_INPROGRESS
@dataclass
class ComfyResponse(GatewayResponse):
    """Gateway envelope whose ``data`` is a ``ComfyResponseData`` (or ``None``)."""

    data: Optional[ComfyResponseData] = None
@dataclass
class PredictResult(JSONe):
    """Result payload for one task.

    Only ``task_id`` is required; the remaining fields default to ``None``
    (``status`` defaults to ``TASK_INPROGRESS``).
    """

    task_id: str
    # Image links; these are what PredictResultResponse.download_images fetches.
    images: Optional[List[str]] = None
    info: Optional[Dict[str, str]] = None
    parameters: Optional[Dict[str, str]] = None
    status: Optional[PredictResultStatusCode] = PredictResultStatusCode.TASK_INPROGRESS
    # Raw downloaded image payloads. batch_download_images returns
    # ``response.content`` values, which are bytes, so the element type is
    # ``bytes`` (the original annotation said ``str``).
    imgs_bytes: Optional[List[bytes]] = None
    result: Optional[Dict] = None
@dataclass
class PredictResultResponse(GatewayResponse):
    """Gateway envelope whose ``data`` is a ``PredictResult`` (or ``None``)."""

    data: Optional[PredictResult] = None

    def download_images(self):
        """Fetch every link in ``data.images`` and store the payloads in ``data.imgs_bytes``.

        Does nothing when there is no ``data`` or no image links.
        """
        # ``data`` defaults to None, so guard before dereferencing it.
        if self.data is None or not self.data.images:
            return
        self.data.imgs_bytes = batch_download_images(self.data.images)
@dataclass
class ProgressData(JSONe):
    """Progress payload for one task.

    ``task_id``, ``progress`` and ``eta_relative`` are required; ``message``
    and ``status`` have defaults.
    """

    task_id: str
    progress: float
    eta_relative: int
    message: Optional[str] = ""
    # Falls back to TASK_INPROGRESS when the payload carries no status.
    status: Optional[PredictResultStatusCode] = PredictResultStatusCode.TASK_INPROGRESS
@dataclass
class ProgressResponse(GatewayResponse):
    """Gateway envelope whose ``data`` is a ``ProgressData`` (or ``None``)."""

    data: Optional[ProgressData] = None

# Step 4: Add the utility class (ComfyUI image generation service example)
Add the util.py file.
from multiprocessing.pool import ThreadPool
import logging
import requests
from dataclass_wizard.utils.string_conv import to_camel_case
logger = logging.getLogger(__name__)
def batch_download_images(image_links):
    """Download all of *image_links* concurrently and return their contents.

    Each link is tried up to three times. Links that still fail are dropped,
    so the result may be shorter than the input; successful downloads keep
    the input order.

    :param image_links: iterable of image URLs.
    :return: list of ``bytes``, one entry per successful download.
    """
    def _download(image_link):
        """Return the body of *image_link*, or None after three failed tries."""
        for _attempt in range(3):
            try:
                response = requests.get(image_link, timeout=100)
                # Treat HTTP error statuses as failures so an error page is
                # never handed back as if it were image data.
                response.raise_for_status()
                return response.content
            except Exception:
                # Deliberately broad: one bad link must not abort the batch.
                logger.warning("Failed to download image, retrying...")
        return None

    links = list(image_links)
    if not links:
        # Nothing to fetch; avoid starting a worker pool for no work.
        return []

    # The context manager tears the pool down; ``map`` has already collected
    # every result by the time the block exits.
    with ThreadPool() as pool:
        downloaded = pool.map(_download, links)
    return [content for content in downloaded if content is not None]
def convert_to_camel_case(data_dict):
    """Recursively rename dict keys with ``to_camel_case``.

    Dicts get every key converted and every value processed recursively;
    lists are processed element by element; any other value is returned
    untouched.
    """
    if isinstance(data_dict, list):
        return [convert_to_camel_case(item) for item in data_dict]
    if not isinstance(data_dict, dict):
        return data_dict

    converted = {}
    for key, value in data_dict.items():
        camel_key = to_camel_case(key)
        converted[camel_key] = convert_to_camel_case(value)
    return converted

# Step 5: Specify the AccessKey pair, call path, and parameters
from client import Client
from proto import ComfyRequest, ComfyResponse, PredictResultResponse, ProgressResponse
import time
import json
# Module-level client that call() sends its requests through.
# The app_key / app_secret values are placeholders: replace them with your
# own credentials before running. app_key is sent as the x-ca-key header and
# app_secret is the HMAC key used to sign each request, so keep it out of
# version control.
cli = Client(
endpoint="openai.edu-aliyun.com",
app_key="YOUR_ACCESS_KEY_ID",
app_secret="YOUR_ACCESS_KEY_SECRET"
)
# Original call method
def call(url, body, method='POST', headers=None):
    """Invoke *url* through the shared ``cli`` and decode the JSON reply.

    :param url: request path handed to ``cli.invoke``.
    :param body: request parameters handed to ``cli.invoke``.
    :param method: HTTP method name handed to ``cli.invoke``.
    :param headers: optional extra headers handed to ``cli.invoke``.
    :return: the decoded JSON body when the response object is truthy,
        otherwise ``None``.
    """
    response = cli.invoke(url, body, method, headers)
    # NOTE(review): a requests response is presumably falsy for 4xx/5xx
    # statuses, in which case the body is not read -- confirm.
    if not response:
        return None
    return response.json()
def comfy_prompt(prompt: ComfyRequest, custom_resource_config_id='default') -> PredictResultResponse:
    """Submit a workflow run, poll its progress, and return the final result.

    The request is posted to ``/scc/comfy_prompt``. Progress is then polled
    from ``/scc/comfy_get_progress`` up to 1200 times with a one-second pause
    between polls. Once the task reports a finished status, the result is
    fetched from ``/scc/comfy_get_result``.

    :param prompt: the request to submit.
    :param custom_resource_config_id: when truthy, sent as the
        ``X-SP-RESOURCE-CONFIG-ID`` header on the submit call.
    :return: the parsed ``PredictResultResponse``.
    :raises Exception: when the submit call fails or reports an error code,
        when a progress response has status 20, when the result cannot be
        fetched, or when polling runs out of attempts.
    """
    request_body = prompt.to_dict()
    print(request_body)

    headers = {}
    if custom_resource_config_id:
        headers['X-SP-RESOURCE-CONFIG-ID'] = custom_resource_config_id

    raw_submit = call("/scc/comfy_prompt", request_body, headers=headers)
    if not raw_submit:
        # call() returns None when the response object is falsy; fail with a
        # clear message instead of letting from_dict(None) blow up.
        raise Exception("Failed to call /scc/comfy_prompt: no usable response")
    r = ComfyResponse.from_dict(raw_submit)
    print(r.to_dict())
    if r.err_code:
        raise Exception(r.err_message)
    if r.data is None:
        raise Exception("Failed to call /scc/comfy_prompt: response has no data")

    # Capture the id once; later responses may lack a data section.
    task_id = r.data.task_id

    for _ in range(1200):
        # Query progress
        raw_res = call("/scc/comfy_get_progress", {"taskId": task_id}, headers=None)
        print(raw_res)
        if raw_res:
            progress = ProgressResponse.from_dict(raw_res)
            # A status of 20 is treated as a failed call; its exact meaning
            # is not documented here.
            if progress.status == 20:
                pretty_json_str = json.dumps(raw_res, indent=2, ensure_ascii=False)
                print(pretty_json_str)
                raise Exception("Failed to call , error: %s" % pretty_json_str)
            task_status = progress.data.status if progress.data is not None else None
            if task_status is not None and task_status.finished():
                # Query result
                raw_result = call("/scc/comfy_get_result", {"taskId": task_id})
                if not raw_result:
                    raise Exception("Failed to call /scc/comfy_get_result: no usable response")
                return PredictResultResponse.from_dict(raw_result)
        time.sleep(1)

    raise Exception("1200s Timeout")
if __name__ == '__main__':
    # Start the clock first so the reported time covers the whole run.
    begin = time.time()

    # Placeholders: replace with your own workflow alias and id.
    alias_id = "YOUR_WORKFLOW_ALIAS"
    workflow_id = "YOUR_WORKFLOW_ID"

    # Define parameters
    params = {"prompt": "A man is walking on the street."}

    request = ComfyRequest(workflow_id=workflow_id, inputs=params, alias_id=alias_id)
    result = comfy_prompt(request)

    print("Image generation result:", result)
    elapsed = time.time() - begin
    print(f"Time elapsed: {elapsed:.2f}s")
Was this article helpful?