import logging import json import base64 from typing import Dict, Any, Optional, List import re from app.core.config import settings # 导入 DashScope SDK try: from dashscope import MultiModalConversation from dashscope.api_entities.dashscope_response import DashScopeAPIResponse except ImportError: logging.error("请安装 DashScope SDK: pip install dashscope") raise class QwenClient: """千问 API 客户端 (使用 DashScope SDK)""" def __init__(self): self.api_key = settings.QWEN_API_KEY self.model = "qwen-vl-max" # 使用千问视觉语言大模型 async def extract_pickup_code(self, image_url: str) -> Dict[str, Any]: """ 从图片中提取取件码 Args: image_content: 图片二进制内容 Returns: Dict: 提取结果,包含取件码信息 """ try: # 构建消息 messages = [ { "role": "system", "content": "你是一个专门识别快递取件码的助手。请准确提取图片中的所有取件码信息。" }, { "role": "user", "content": [ { "type": "text", "text": "请识别图中驿站的所有取件码,以[{\"station\":\"驿站名字\",\"pickup_codes\":[\"3232\",\"2323\"]}]的格式返回。只返回JSON格式数据,不要其他解释。" }, { "type": "image", "image": image_url } ] } ] # 使用 SDK 调用 API response = MultiModalConversation.call( model=self.model, messages=messages, api_key=self.api_key, result_format='message', temperature=0.1, max_tokens=1000 ) # 检查响应状态 if response.status_code != 200: logging.error(f"千问 API 请求失败: {response.code} - {response.message}") return {"error": "API请求失败", "details": f"{response.code}: {response.message}"} # 记录响应 logging.info(f"千问 API 响应状态: {response.status_code}") logging.info(f"千问 API 响应内容: {response}") # 提取回复内容 try: # 直接使用响应对象 # 提取消息内容 - 使用字典访问方式 output = response.get('output', {}) choices = output.get('choices', [{}]) message = choices[0].get('message', {}) if choices else {} logging.info(f"消息: {message}") print(f"消息: {message}") # 获取文本内容 content = message.get('content', []) if isinstance(content, list) and len(content) > 0: # 提取文本内容 text_content = "" for item in content: if isinstance(item, dict) and 'text' in item: text_content = item['text'] break logging.info(f"提取的文本内容: {text_content}") # 清理文本,移除 Markdown 代码块 text_content = text_content.strip() # 移除 ```json 和 ``` 标记 if text_content.startswith("```json"): text_content = text_content[7:] elif text_content.startswith("```"): text_content = text_content[3:] if text_content.endswith("```"): text_content = text_content[:-3] text_content = text_content.strip() logging.info(f"清理后的文本内容: {text_content}") # 尝试解析 JSON try: pickup_data = json.loads(text_content) # 确保是列表格式 if isinstance(pickup_data, list): # 转换为统一格式 return {"stations": [{"name": item.get("station", ""), "pickup_codes": item.get("pickup_codes", [])} for item in pickup_data]} else: logging.warning(f"解析结果不是列表格式: {pickup_data}") return {"stations": []} except json.JSONDecodeError as e: logging.error(f"JSON解析错误: {str(e)}, 原始字符串: {text_content}") # 尝试使用正则表达式提取JSON json_match = re.search(r'(\[{.*}\])', text_content, re.DOTALL) if json_match: try: json_str = json_match.group(1) pickup_data = json.loads(json_str) return {"stations": [{"name": item.get("station", ""), "pickup_codes": item.get("pickup_codes", [])} for item in pickup_data]} except Exception as je: logging.error(f"正则提取的JSON解析错误: {str(je)}, 提取的字符串: {json_match.group(1)}") return {"stations": []} else: logging.error(f"无法提取内容列表或内容列表为空: {content}") return {"stations": []} except Exception as e: logging.exception(f"解析千问 API 响应失败: {str(e)}") return {"stations": []} except Exception as e: logging.exception(f"调用千问 API 异常: {str(e)}") return {"error": "处理失败", "message": str(e)} # 创建全局实例 qwen_client = QwenClient()