This commit is contained in:
aaron 2025-03-07 15:37:07 +08:00
parent 1af828d499
commit 2c115f456e
3 changed files with 45 additions and 128 deletions

View File

@ -18,49 +18,18 @@ async def extract_pickup_code(
if not file.content_type.startswith('image/'): if not file.content_type.startswith('image/'):
return error_response(code=400, message="只能上传图片文件") return error_response(code=400, message="只能上传图片文件")
url = await qcloud_manager.upload_file(file) # 读取图片内容
if not url: image_data = await file.read()
return error_response(code=500, message="上传图片失败")
# 调用 AI 客户端提取取件码 # 调用 AI 客户端提取取件码
result = await ai_client.extract_pickup_code(url) result = await ai_client.extract_pickup_code(image_data)
if "error" in result: if "error" in result:
return error_response(code=500, message=result.get("message", "提取取件码失败")) return error_response(code=500, message=result.get("message", "提取取件码失败"))
# 检查是否提取到取件码
if not result.get("stations") or not any(station.get("pickup_codes") for station in result.get("stations", [])):
return error_response(code=400, message="提取取件码信息失败")
# 格式化输出
formatted_text = format_pickup_codes(result)
# 返回原始数据和格式化文本 # 返回原始数据和格式化文本
return success_response(data={ return success_response(data=result)
"raw": result,
"formatted_text": formatted_text
})
except Exception as e: except Exception as e:
logging.exception(f"提取取件码失败: {str(e)}") logging.exception(f"提取取件码失败: {str(e)}")
return error_response(code=500, message=f"提取取件码失败: {str(e)}") return error_response(code=500, message=f"提取取件码失败: {str(e)}")
def format_pickup_codes(result):
"""将取件码结果格式化为指定格式"""
formatted_lines = []
for station in result.get("stations", []):
station_name = station.get("name", "未知驿站")
pickup_codes = station.get("pickup_codes", [])
if pickup_codes:
# 格式化取件码,用 分隔
codes_text = " ".join(pickup_codes)
# 添加驿站和取件码信息
formatted_lines.append(f"驿站:{station_name}")
formatted_lines.append(f"取件码:{codes_text}")
formatted_lines.append("") # 添加空行分隔不同驿站
# 合并所有行
return "\n".join(formatted_lines).strip()

View File

@ -12,7 +12,7 @@ class AIClient:
def __init__(self): def __init__(self):
self.timeout = 15 # 请求超时时间(秒) self.timeout = 15 # 请求超时时间(秒)
async def extract_pickup_code(self, image_url: str) -> Dict[str, Any]: async def extract_pickup_code(self, image_data: bytes) -> Dict[str, Any]:
""" """
从图片中提取取件码 从图片中提取取件码
@ -23,7 +23,7 @@ class AIClient:
Dict: 提取结果包含取件码信息 Dict: 提取结果包含取件码信息
""" """
try: try:
primary_result = await self._extract_with_qwen(image_url) primary_result = await self._extract_with_qwen(image_data)
# 检查结果是否有效 # 检查结果是否有效
if self._is_valid_result(primary_result): if self._is_valid_result(primary_result):
@ -35,12 +35,12 @@ class AIClient:
logging.exception(f"提取取件码异常: {str(e)}") logging.exception(f"提取取件码异常: {str(e)}")
return {"error": "处理失败", "message": str(e)} return {"error": "处理失败", "message": str(e)}
async def _extract_with_qwen(self, image_url: str) -> Dict[str, Any]: async def _extract_with_qwen(self, image_data: bytes) -> Dict[str, Any]:
"""使用千问提取取件码""" """使用千问提取取件码"""
try: try:
# 添加超时控制 # 添加超时控制
return await asyncio.wait_for( return await asyncio.wait_for(
qwen_client.extract_pickup_code(image_url), qwen_client.extract_pickup_code(image_data),
timeout=self.timeout timeout=self.timeout
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
@ -51,23 +51,9 @@ class AIClient:
return {"error": "处理失败", "message": str(e)} return {"error": "处理失败", "message": str(e)}
def _is_valid_result(self, result: Dict[str, Any]) -> bool: def _is_valid_result(self, result: List[Dict[str, Any]]) -> bool:
"""检查结果是否有效"""
# 检查是否有错误
if "error" in result:
return False
# 检查是否有站点信息 return isinstance(result, list)
stations = result.get("stations", [])
if not stations:
return False
# 检查是否有取件码
for station in stations:
if station.get("pickup_codes") and len(station.get("pickup_codes", [])) > 0:
return True
return False
# 创建全局实例 # 创建全局实例
ai_client = AIClient() ai_client = AIClient()

View File

@ -20,7 +20,7 @@ class QwenClient:
self.api_key = settings.QWEN_API_KEY self.api_key = settings.QWEN_API_KEY
self.model = "qwen-vl-max" # 使用千问视觉语言大模型 self.model = "qwen-vl-max" # 使用千问视觉语言大模型
async def extract_pickup_code(self, image_url: str) -> Dict[str, Any]: async def extract_pickup_code(self, imageData: bytes) -> Dict[str, Any]:
""" """
从图片中提取取件码 从图片中提取取件码
@ -31,6 +31,8 @@ class QwenClient:
Dict: 提取结果包含取件码信息 Dict: 提取结果包含取件码信息
""" """
try: try:
# 将图片转换为 base64
image_base64 = base64.b64encode(imageData).decode('utf-8')
# 构建消息 # 构建消息
messages = [ messages = [
@ -47,7 +49,7 @@ class QwenClient:
}, },
{ {
"type": "image", "type": "image",
"image": image_url "image": f"data:image/jpeg;base64,{image_base64}"
} }
] ]
} }
@ -63,88 +65,48 @@ class QwenClient:
max_tokens=1000 max_tokens=1000
) )
print(f"response_json: {response}")
# 记录响应信息(用于调试)
logging.info(f"千问 API 响应状态: {response.status_code}")
# 检查响应状态 # 检查响应状态
if response.status_code != 200: if response.status_code != 200:
logging.error(f"千问 API 请求失败: {response.code} - {response.message}") logging.error(f"千问 API 请求失败: {response.code} - {response.message}")
return {"error": "API请求失败", "details": f"{response.code}: {response.message}"} return {"error": "API请求失败", "details": f"{response.code}: {response.message}"}
# 记录响应
logging.info(f"千问 API 响应状态: {response.status_code}")
logging.info(f"千问 API 响应内容: {response}")
# 提取回复内容
try: try:
# 直接使用响应对象 output = response.output
# 提取消息内容 - 使用字典访问方式 if output:
output = response.get('output', {}) choices = output.get('choices', [])
choices = output.get('choices', [{}]) if choices and len(choices) > 0:
message = choices[0].get('message', {}) if choices else {} message = choices[0].get('message', {})
logging.info(f"消息: {message}")
print(f"消息: {message}")
# 获取文本内容
content = message.get('content', []) content = message.get('content', [])
if isinstance(content, list) and len(content) > 0: if isinstance(content, list) and len(content) > 0:
# 提取文本内容
text_content = "" text_content = ""
result = []
for item in content: for item in content:
if isinstance(item, dict) and 'text' in item: if isinstance(item, dict) and 'text' in item:
text_content = item['text'] text_content = item.get('text', '')
break print(f"提取的文本内容: {text_content}")
# 尝试直接解析
logging.info(f"提取的文本内容: {text_content}")
# 清理文本,移除 Markdown 代码块
text_content = text_content.strip()
# 移除 ```json 和 ``` 标记
if text_content.startswith("```json"):
text_content = text_content[7:]
elif text_content.startswith("```"):
text_content = text_content[3:]
if text_content.endswith("```"):
text_content = text_content[:-3]
text_content = text_content.strip()
logging.info(f"清理后的文本内容: {text_content}")
# 尝试解析 JSON
try:
pickup_data = json.loads(text_content) pickup_data = json.loads(text_content)
print(f"pickup_data: {pickup_data}")
# 确保是列表格式 # 确保是列表格式
if isinstance(pickup_data, list): if isinstance(pickup_data, list):
# 转换为统一格式 result.append({"stations": [{"name": item.get("station", ""), "pickup_codes": item.get("pickup_codes", [])} for item in pickup_data]})
return {"stations": [{"name": item.get("station", ""), "pickup_codes": item.get("pickup_codes", [])} for item in pickup_data]}
else:
logging.warning(f"解析结果不是列表格式: {pickup_data}")
return {"stations": []}
except json.JSONDecodeError as e:
logging.error(f"JSON解析错误: {str(e)}, 原始字符串: {text_content}")
# 尝试使用正则表达式提取JSON
json_match = re.search(r'(\[{.*}\])', text_content, re.DOTALL)
if json_match:
try:
json_str = json_match.group(1)
pickup_data = json.loads(json_str)
return {"stations": [{"name": item.get("station", ""), "pickup_codes": item.get("pickup_codes", [])} for item in pickup_data]}
except Exception as je:
logging.error(f"正则提取的JSON解析错误: {str(je)}, 提取的字符串: {json_match.group(1)}")
return {"stations": []} return result
else:
logging.error(f"无法提取内容列表或内容列表为空: {content}")
return {"stations": []}
except Exception as e: except Exception as e:
logging.exception(f"解析千问 API 响应失败: {str(e)}") logging.exception(f"解析千问 API 响应失败: {str(e)}")
return {"stations": []} return None
except Exception as e: except Exception as e:
logging.exception(f"调用千问 API 异常: {str(e)}") logging.exception(f"调用千问 API 异常: {str(e)}")
return {"error": "处理失败", "message": str(e)} return None
# 创建全局实例 # 创建全局实例
qwen_client = QwenClient() qwen_client = QwenClient()