curl --request GET \
--url https://api.yuannengai.com/v1/tasks/task_01K8SGYNNNVBQTXNR4MM964S7K \
--header 'Authorization: Bearer <token>'
{
  "code": 200,
  "data": {
    "task_id": "task_01K8SGYNNNVBQTXNR4MM964S7K",
    "status": "processing",
    "progress": 50,
    "created_at": "2024-01-01T12:00:00Z"
  }
}
Query the processing status and result of an asynchronous task.
Authorization: Bearer YOUR_API_KEY
Possible status values: submitted, queued, processing, completed, failed.
[...] images array; video tasks return video_url and duration.
import time
import requests
def wait_for_task(task_id, max_wait=300, interval=3):
    """Poll a task until it completes, fails, or max_wait seconds elapse."""
    start_time = time.time()
    while time.time() - start_time < max_wait:
        response = requests.get(
            f"https://api.yuannengai.com/v1/tasks/{task_id}",
            headers={"Authorization": "Bearer <token>"},
            timeout=10,  # avoid hanging forever on a stalled connection
        )
        response.raise_for_status()  # surface HTTP errors before reading the body
        data = response.json()["data"]
        status = data["status"]
        if status == "completed":
            return data["result"]
        elif status == "failed":
            raise Exception(data["error"]["message"])
        time.sleep(interval)  # wait before the next poll
    raise TimeoutError("Task timed out")
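
The status values listed above fall into two groups: those where polling should continue and those where it can stop. The sketch below is one way to encode that split and to summarize the "data" object of a response without any network call. The helper names `is_terminal` and `describe_task` are hypothetical and not part of the API, and treating `progress` as optional is an assumption, since this page does not say whether it is always present.

```python
# Status values as listed in this document.
IN_PROGRESS_STATUSES = {"submitted", "queued", "processing"}
TERMINAL_STATUSES = {"completed", "failed"}


def is_terminal(status):
    """Return True when polling can stop for this status (hypothetical helper)."""
    return status in TERMINAL_STATUSES


def describe_task(data):
    """Build a one-line summary from the "data" object of a task response.

    Assumption: "progress" may be absent, so it is read with .get().
    """
    status = data["status"]
    if status not in IN_PROGRESS_STATUSES | TERMINAL_STATUSES:
        raise ValueError(f"unexpected status: {status!r}")
    progress = data.get("progress")
    if is_terminal(status) or progress is None:
        return f"{data['task_id']}: {status}"
    return f"{data['task_id']}: {status} (progress={progress})"


# The "data" object from the sample response in this document.
sample = {
    "task_id": "task_01K8SGYNNNVBQTXNR4MM964S7K",
    "status": "processing",
    "progress": 50,
    "created_at": "2024-01-01T12:00:00Z",
}
print(describe_task(sample))
```

A polling loop such as `wait_for_task` could call `is_terminal(status)` to decide when to stop, which keeps the list of status names in a single place.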