api/app/services/dashscope_service.py
2025-04-14 14:16:45 +08:00

354 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import logging
import dashscope
from dashscope import Generation
# 修改导入语句dashscope的API响应可能改变了结构
from typing import List, Dict, Any, Optional
import asyncio
import httpx
import json
import re
from app.core.config import settings
# 导入 DashScope SDK
try:
from dashscope import MultiModalConversation
from dashscope.api_entities.dashscope_response import DashScopeAPIResponse
except ImportError:
logging.error("请安装 DashScope SDK: pip install dashscope")
raise
logger = logging.getLogger(__name__)
class DashScopeService:
"""DashScope服务类提供对DashScope API的调用封装"""
def __init__(self):
self.api_key = settings.DASHSCOPE_API_KEY
# 配置DashScope
dashscope.api_key = self.api_key
# 配置API URL
self.image_synthesis_url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2image/image-synthesis"
self.comment_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
async def chat_completion(
self,
messages: List[Dict[str, str]],
model: str = "qwen-max",
max_tokens: int = 2048,
temperature: float = 0.7,
stream: bool = False
):
"""
调用DashScope的大模型API进行对话
Args:
messages: 对话历史记录
model: 模型名称
max_tokens: 最大生成token数
temperature: 温度参数,控制随机性
stream: 是否流式输出
Returns:
ApiResponse: DashScope的API响应
"""
try:
# 为了不阻塞FastAPI的异步性能我们使用run_in_executor运行同步API
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: Generation.call(
model=model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
result_format='message',
stream=stream,
)
)
if response.status_code != 200:
logger.error(f"DashScope API请求失败状态码{response.status_code}, 错误信息:{response.message}")
raise Exception(f"API调用失败: {response.message}")
return response
except Exception as e:
logger.error(f"DashScope聊天API调用出错: {str(e)}")
raise e
async def generate_image(
self,
prompt: str,
negative_prompt: Optional[str] = None,
model: str = "stable-diffusion-xl",
n: int = 1,
size: str = "1024*1024"
):
"""
调用DashScope的图像生成API
Args:
prompt: 生成图像的文本描述
negative_prompt: 负面提示词
model: 模型名称
n: 生成图像数量
size: 图像尺寸
Returns:
ApiResponse: DashScope的API响应
"""
try:
# 构建请求参数
params = {
"model": model,
"prompt": prompt,
"n": n,
"size": size,
}
if negative_prompt:
params["negative_prompt"] = negative_prompt
# 异步调用图像生成API
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: dashscope.ImageSynthesis.call(**params)
)
if response.status_code != 200:
logger.error(f"DashScope 图像生成API请求失败状态码{response.status_code}, 错误信息:{response.message}")
raise Exception(f"图像生成API调用失败: {response.message}")
return response
except Exception as e:
logger.error(f"DashScope图像生成API调用出错: {str(e)}")
raise e
async def generate_tryon(
self,
person_image_url: str,
top_garment_url: Optional[str] = None,
bottom_garment_url: Optional[str] = None,
resolution: int = -1,
restore_face: bool = True
):
"""
调用阿里百炼平台的试衣服务
Args:
person_image_url: 人物图片URL
top_garment_url: 上衣图片URL
bottom_garment_url: 下衣图片URL
resolution: 分辨率,-1表示自动
restore_face: 是否修复面部
Returns:
Dict: 包含任务ID和请求ID的响应
"""
try:
# 验证参数
if not person_image_url:
raise ValueError("人物图片URL不能为空")
if not top_garment_url and not bottom_garment_url:
raise ValueError("上衣和下衣图片至少需要提供一个")
# 构建请求数据
request_data = {
"model": "aitryon",
"input": {
"person_image_url": person_image_url
},
"parameters": {
"resolution": resolution,
"restore_face": restore_face
}
}
# 添加可选字段
if top_garment_url:
request_data["input"]["top_garment_url"] = top_garment_url
if bottom_garment_url:
request_data["input"]["bottom_garment_url"] = bottom_garment_url
# 构建请求头
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"X-DashScope-Async": "enable"
}
# 发送请求
async with httpx.AsyncClient() as client:
logger.info(f"发送试穿请求: {request_data}")
response = await client.post(
self.image_synthesis_url,
json=request_data,
headers=headers,
timeout=30.0
)
response_data = response.json()
logger.info(f"试穿API响应: {response_data}")
if response.status_code == 200 or response.status_code == 202: # 202表示异步任务已接受
# 提取任务ID适应不同的API响应格式
task_id = None
if 'output' in response_data and 'task_id' in response_data['output']:
task_id = response_data['output']['task_id']
elif 'task_id' in response_data:
task_id = response_data['task_id']
if task_id:
logger.info(f"试穿请求发送成功任务ID: {task_id}")
return {
"task_id": task_id,
"request_id": response_data.get('request_id'),
"status": "processing"
}
else:
# 如果没有任务ID这可能是同步响应
logger.info("收到同步响应没有任务ID")
return {
"status": "completed",
"result": response_data.get('output', {}),
"request_id": response_data.get('request_id')
}
else:
error_msg = f"试穿请求失败: {response.status_code} - {response.text}"
logger.error(error_msg)
raise Exception(error_msg)
except Exception as e:
logger.error(f"DashScope试穿API调用出错: {str(e)}")
raise e
async def check_tryon_status(self, task_id: str):
"""
检查试穿任务状态
Args:
task_id: 任务ID
Returns:
Dict: 任务状态信息
"""
try:
# 构建请求头
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
# 构建请求URL
status_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
# 发送请求
async with httpx.AsyncClient() as client:
response = await client.get(
status_url,
headers=headers,
timeout=30.0
)
if response.status_code == 200:
response_data = response.json()
print(response_data)
status = response_data.get('output', {}).get('task_status', '')
logger.info(f"试穿任务状态查询成功: {status}")
# 检查是否完成并返回结果
if status.lower() == 'succeeded':
image_url = response_data.get('output', {}).get('image_url')
if image_url:
logger.info(f"试穿任务完成结果URL: {image_url}")
return {
"status": status,
"task_id": task_id,
"image_url": image_url,
"result": response_data.get('output', {})
}
return {
"status": status,
"task_id": task_id
}
else:
error_msg = f"试穿任务状态查询失败: {response.status_code} - {response.text}"
logger.error(error_msg)
raise Exception(error_msg)
except Exception as e:
logger.error(f"查询试穿任务状态出错: {str(e)}")
raise e
# 传入图片url通过 DashScope 的 qwen-vl-plus 模型,生成穿搭点评
async def generate_dressing_comment(self, image_url: str):
"""
调用DashScope的qwen-vl-plus模型生成穿搭点评
Args:
image_url: 穿搭图片URL
Returns:
Dict: 包含任务ID和请求ID的响应
"""
try:
# 构建请求头
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
# 构建请求数据
messages = [
{
"role": "user",
"content": [
{
"type": "text",
"text": "你是一个穿搭点评师,请根据图片中的穿搭,给出穿搭点评,点评内容包括穿搭风格、穿搭亮点等。不超过 80字并给出一个打分满分100分。返回格式以json格式返回返回内容包括'comment': '穿搭点评', 'score': '打分'"
},
{
"type": "image",
"image": image_url
}
]
}
]
#使用Dashscope sdk 进行请求
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: dashscope.MultiModalConversation.call(
api_key=self.api_key,
model="qwen-vl-plus",
messages=messages,
result_format='json'
)
)
logger.info(f"穿搭点评API响应: {response.output.choices[0].message.content}")
content = response.output.choices[0].message.content
# 将内容转换为json
if isinstance(content, list) and len(content) > 0:
text_content = ""
for item in content:
if isinstance(item, dict) and 'text' in item:
text_content = item.get('text', '')
print(f"提取的文本内容: {text_content}")
# 只获取 ```json 和 ``` 之间的内容
text_content = re.search(r'```json(.*)```', text_content, re.DOTALL).group(1)
return json.loads(text_content)
else:
return None
else:
return None
except Exception as e:
logger.error(f"DashScope穿搭点评API调用出错: {str(e)}")
raise e