207 lines
8.6 KiB
Python
207 lines
8.6 KiB
Python
import logging
|
|
import json
|
|
import base64
|
|
from typing import Dict, Any, Optional, List
|
|
import re
|
|
from app.core.config import settings
|
|
|
|
# 导入 DashScope SDK
|
|
try:
|
|
from dashscope import MultiModalConversation
|
|
from dashscope.api_entities.dashscope_response import DashScopeAPIResponse
|
|
except ImportError:
|
|
logging.error("请安装 DashScope SDK: pip install dashscope")
|
|
raise
|
|
|
|
class QwenClient:
|
|
"""千问 API 客户端 (使用 DashScope SDK)"""
|
|
|
|
def __init__(self):
|
|
self.api_key = settings.QWEN_API_KEY
|
|
self.model = "qwen-vl-max" # 使用千问视觉语言大模型
|
|
|
|
async def extract_pickup_code(self, imageData: bytes = None, url: str = None) -> Dict[str, Any]:
|
|
"""
|
|
从图片中提取取件码
|
|
|
|
Args:
|
|
image_content: 图片二进制内容
|
|
|
|
Returns:
|
|
Dict: 提取结果,包含取件码信息
|
|
"""
|
|
try:
|
|
# 构建消息
|
|
messages = [
|
|
{
|
|
"role": "system",
|
|
"content": "你现在是一个专门提取驿站名字、快递取件码和运单号等信息的助手"
|
|
},
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{
|
|
"type": "text",
|
|
"text": "请识别图中信息,提取驿站的名字、取件码、以及运单号(如:顺丰、圆通等快递单号),\
|
|
取件码和运单号格式一般有:数字加-分割,字母+数字,字母+数字+字母等。\
|
|
以JSON 格式:[{\"station\":\"驿站名字\",\"pickup_codes\":[\"\",\"\"]}] 返回"
|
|
},
|
|
{
|
|
"type": "image",
|
|
"image": f"data:image/jpeg;base64,{base64.b64encode(imageData).decode('utf-8')}" if imageData else url,
|
|
}
|
|
]
|
|
}
|
|
]
|
|
|
|
# 使用 SDK 调用 API
|
|
response = MultiModalConversation.call(
|
|
model=self.model,
|
|
messages=messages,
|
|
api_key=self.api_key,
|
|
result_format='message',
|
|
temperature=0.1,
|
|
max_tokens=1000
|
|
)
|
|
|
|
print(f"response_json: {response}")
|
|
|
|
# 记录响应信息(用于调试)
|
|
logging.info(f"千问 API 响应状态: {response.status_code}")
|
|
|
|
# 检查响应状态
|
|
if response.status_code != 200:
|
|
logging.error(f"千问 API 请求失败: {response.code} - {response.message}")
|
|
return {"error": "API请求失败", "details": f"{response.code}: {response.message}"}
|
|
|
|
try:
|
|
output = response.output
|
|
print(f"千问output: {output}")
|
|
if output:
|
|
choices = output.get('choices', [])
|
|
if choices and len(choices) > 0:
|
|
message = choices[0].get('message', {})
|
|
content = message.get('content', [])
|
|
|
|
if isinstance(content, list) and len(content) > 0:
|
|
text_content = ""
|
|
result = []
|
|
for item in content:
|
|
if isinstance(item, dict) and 'text' in item:
|
|
text_content = item.get('text', '')
|
|
print(f"提取的文本内容: {text_content}")
|
|
# 只获取 ```json 和 ``` 之间的内容
|
|
text_content = re.search(r'```json(.*)```', text_content, re.DOTALL).group(1)
|
|
|
|
# 剔除 ```json 和 ```
|
|
# text_content = text_content.replace('```json', '').replace('```', '')
|
|
|
|
# 尝试直接解析
|
|
pickup_data = json.loads(text_content)
|
|
print(f"pickup_data: {pickup_data}")
|
|
# 确保是列表格式
|
|
if isinstance(pickup_data, list):
|
|
result.append({"stations": [{"name": item.get("station", ""), "pickup_codes": item.get("pickup_codes", [])} for item in pickup_data]})
|
|
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logging.exception(f"解析千问 API 响应失败: {str(e)}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logging.exception(f"调用千问 API 异常: {str(e)}")
|
|
return None
|
|
|
|
async def vl(self, imageData: bytes = None, url: str = None, prompt: str = None, user_content : str = None) -> Dict[str, Any]:
|
|
"""
|
|
从图片中提取取件码
|
|
|
|
Args:
|
|
image_content: 图片二进制内容
|
|
|
|
Returns:
|
|
Dict: 提取结果,包含取件码信息
|
|
"""
|
|
try:
|
|
# 构建消息
|
|
messages = [
|
|
{
|
|
"role": "system",
|
|
"content": prompt
|
|
},
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{
|
|
"type": "text",
|
|
"text": user_content
|
|
},
|
|
{
|
|
"type": "image",
|
|
"image": f"data:image/jpeg;base64,{base64.b64encode(imageData).decode('utf-8')}" if imageData else url,
|
|
}
|
|
]
|
|
}
|
|
]
|
|
|
|
# 使用 SDK 调用 API
|
|
response = MultiModalConversation.call(
|
|
model=self.model,
|
|
messages=messages,
|
|
api_key=self.api_key,
|
|
result_format='message',
|
|
temperature=0.1,
|
|
max_tokens=1000
|
|
)
|
|
|
|
print(f"response_json: {response}")
|
|
|
|
# 记录响应信息(用于调试)
|
|
logging.info(f"千问 API 响应状态: {response.status_code}")
|
|
|
|
# 检查响应状态
|
|
if response.status_code != 200:
|
|
logging.error(f"千问 API 请求失败: {response.code} - {response.message}")
|
|
return {"error": "API请求失败", "details": f"{response.code}: {response.message}"}
|
|
|
|
try:
|
|
output = response.output
|
|
print(f"千问output: {output}")
|
|
if output:
|
|
choices = output.get('choices', [])
|
|
if choices and len(choices) > 0:
|
|
message = choices[0].get('message', {})
|
|
content = message.get('content', [])
|
|
|
|
if isinstance(content, list) and len(content) > 0:
|
|
text_content = ""
|
|
result = []
|
|
for item in content:
|
|
if isinstance(item, dict) and 'text' in item:
|
|
text_content = item.get('text', '')
|
|
print(f"提取的文本内容: {text_content}")
|
|
# 只获取 ```json 和 ``` 之间的内容
|
|
text_content = re.search(r'```json(.*)```', text_content, re.DOTALL).group(1)
|
|
|
|
# 剔除 ```json 和 ```
|
|
# text_content = text_content.replace('```json', '').replace('```', '')
|
|
|
|
# 尝试直接解析
|
|
pickup_data = json.loads(text_content)
|
|
print(f"pickup_data: {pickup_data}")
|
|
result = pickup_data
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logging.exception(f"解析千问 API 响应失败: {str(e)}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logging.exception(f"调用千问 API 异常: {str(e)}")
|
|
return None
|
|
|
|
# 创建全局实例
|
|
qwen_client = QwenClient() |