diff --git a/app/api/endpoints/ai.py b/app/api/endpoints/ai.py index 872ec2f..3c0415d 100644 --- a/app/api/endpoints/ai.py +++ b/app/api/endpoints/ai.py @@ -18,49 +18,18 @@ async def extract_pickup_code( if not file.content_type.startswith('image/'): return error_response(code=400, message="只能上传图片文件") - url = await qcloud_manager.upload_file(file) - if not url: - return error_response(code=500, message="上传图片失败") + # 读取图片内容 + image_data = await file.read() # 调用 AI 客户端提取取件码 - result = await ai_client.extract_pickup_code(url) + result = await ai_client.extract_pickup_code(image_data) if "error" in result: return error_response(code=500, message=result.get("message", "提取取件码失败")) - # 检查是否提取到取件码 - if not result.get("stations") or not any(station.get("pickup_codes") for station in result.get("stations", [])): - return error_response(code=400, message="提取取件码信息失败") - - # 格式化输出 - formatted_text = format_pickup_codes(result) - # 返回原始数据和格式化文本 - return success_response(data={ - "raw": result, - "formatted_text": formatted_text - }) + return success_response(data=result) except Exception as e: logging.exception(f"提取取件码失败: {str(e)}") - return error_response(code=500, message=f"提取取件码失败: {str(e)}") - -def format_pickup_codes(result): - """将取件码结果格式化为指定格式""" - formatted_lines = [] - - for station in result.get("stations", []): - station_name = station.get("name", "未知驿站") - pickup_codes = station.get("pickup_codes", []) - - if pickup_codes: - # 格式化取件码,用 | 分隔 - codes_text = " | ".join(pickup_codes) - - # 添加驿站和取件码信息 - formatted_lines.append(f"驿站:{station_name}") - formatted_lines.append(f"取件码:{codes_text}") - formatted_lines.append("") # 添加空行分隔不同驿站 - - # 合并所有行 - return "\n".join(formatted_lines).strip() \ No newline at end of file + return error_response(code=500, message=f"提取取件码失败: {str(e)}") \ No newline at end of file diff --git a/app/core/ai_client.py b/app/core/ai_client.py index 5a18c9f..130242b 100644 --- a/app/core/ai_client.py +++ b/app/core/ai_client.py @@ -12,7 +12,7 @@ class AIClient: def __init__(self): self.timeout = 15 # 请求超时时间(秒) - async def extract_pickup_code(self, image_url: str) -> Dict[str, Any]: + async def extract_pickup_code(self, image_data: bytes) -> Dict[str, Any]: """ 从图片中提取取件码 @@ -23,7 +23,7 @@ class AIClient: Dict: 提取结果,包含取件码信息 """ try: - primary_result = await self._extract_with_qwen(image_url) + primary_result = await self._extract_with_qwen(image_data) # 检查结果是否有效 if self._is_valid_result(primary_result): @@ -35,12 +35,12 @@ class AIClient: logging.exception(f"提取取件码异常: {str(e)}") return {"error": "处理失败", "message": str(e)} - async def _extract_with_qwen(self, image_url: str) -> Dict[str, Any]: + async def _extract_with_qwen(self, image_data: bytes) -> Dict[str, Any]: """使用千问提取取件码""" try: # 添加超时控制 return await asyncio.wait_for( - qwen_client.extract_pickup_code(image_url), + qwen_client.extract_pickup_code(image_data), timeout=self.timeout ) except asyncio.TimeoutError: @@ -51,23 +51,9 @@ class AIClient: return {"error": "处理失败", "message": str(e)} - def _is_valid_result(self, result: Dict[str, Any]) -> bool: - """检查结果是否有效""" - # 检查是否有错误 - if "error" in result: - return False + def _is_valid_result(self, result: List[Dict[str, Any]]) -> bool: - # 检查是否有站点信息 - stations = result.get("stations", []) - if not stations: - return False - - # 检查是否有取件码 - for station in stations: - if station.get("pickup_codes") and len(station.get("pickup_codes", [])) > 0: - return True - - return False + return isinstance(result, list) # 创建全局实例 ai_client = AIClient() \ No newline at end of file diff --git a/app/core/qwen_client.py b/app/core/qwen_client.py index e245467..c239bd3 100644 --- a/app/core/qwen_client.py +++ b/app/core/qwen_client.py @@ -20,7 +20,7 @@ class QwenClient: self.api_key = settings.QWEN_API_KEY self.model = "qwen-vl-max" # 使用千问视觉语言大模型 - async def extract_pickup_code(self, image_url: str) -> Dict[str, Any]: + async def extract_pickup_code(self, imageData: bytes) -> Dict[str, Any]: """ 从图片中提取取件码 @@ -31,6 +31,8 @@ class QwenClient: Dict: 提取结果,包含取件码信息 """ try: + # 将图片转换为 base64 + image_base64 = base64.b64encode(imageData).decode('utf-8') # 构建消息 messages = [ @@ -47,7 +49,7 @@ class QwenClient: }, { "type": "image", - "image": image_url + "image": f"data:image/jpeg;base64,{image_base64}" } ] } @@ -62,89 +64,49 @@ class QwenClient: temperature=0.1, max_tokens=1000 ) + + print(f"response_json: {response}") + + # 记录响应信息(用于调试) + logging.info(f"千问 API 响应状态: {response.status_code}") # 检查响应状态 if response.status_code != 200: logging.error(f"千问 API 请求失败: {response.code} - {response.message}") return {"error": "API请求失败", "details": f"{response.code}: {response.message}"} - # 记录响应 - logging.info(f"千问 API 响应状态: {response.status_code}") - logging.info(f"千问 API 响应内容: {response}") - - # 提取回复内容 try: - # 直接使用响应对象 - # 提取消息内容 - 使用字典访问方式 - output = response.get('output', {}) - choices = output.get('choices', [{}]) - message = choices[0].get('message', {}) if choices else {} - - logging.info(f"消息: {message}") - print(f"消息: {message}") - - # 获取文本内容 - content = message.get('content', []) - if isinstance(content, list) and len(content) > 0: - # 提取文本内容 - text_content = "" - for item in content: - if isinstance(item, dict) and 'text' in item: - text_content = item['text'] - break - - logging.info(f"提取的文本内容: {text_content}") - - # 清理文本,移除 Markdown 代码块 - text_content = text_content.strip() - - # 移除 ```json 和 ``` 标记 - if text_content.startswith("```json"): - text_content = text_content[7:] - elif text_content.startswith("```"): - text_content = text_content[3:] - - if text_content.endswith("```"): - text_content = text_content[:-3] - - text_content = text_content.strip() - logging.info(f"清理后的文本内容: {text_content}") - - # 尝试解析 JSON - try: - pickup_data = json.loads(text_content) + output = response.output + if output: + choices = output.get('choices', []) + if choices and len(choices) > 0: + message = choices[0].get('message', {}) + content = message.get('content', []) - # 确保是列表格式 - if isinstance(pickup_data, list): - # 转换为统一格式 - return {"stations": [{"name": item.get("station", ""), "pickup_codes": item.get("pickup_codes", [])} for item in pickup_data]} - else: - logging.warning(f"解析结果不是列表格式: {pickup_data}") - return {"stations": []} - except json.JSONDecodeError as e: - logging.error(f"JSON解析错误: {str(e)}, 原始字符串: {text_content}") - - # 尝试使用正则表达式提取JSON - json_match = re.search(r'(\[{.*}\])', text_content, re.DOTALL) - if json_match: - try: - json_str = json_match.group(1) - pickup_data = json.loads(json_str) - return {"stations": [{"name": item.get("station", ""), "pickup_codes": item.get("pickup_codes", [])} for item in pickup_data]} - except Exception as je: - logging.error(f"正则提取的JSON解析错误: {str(je)}, 提取的字符串: {json_match.group(1)}") - - return {"stations": []} - else: - logging.error(f"无法提取内容列表或内容列表为空: {content}") - return {"stations": []} + if isinstance(content, list) and len(content) > 0: + text_content = "" + result = [] + for item in content: + if isinstance(item, dict) and 'text' in item: + text_content = item.get('text', '') + print(f"提取的文本内容: {text_content}") + # 尝试直接解析 + pickup_data = json.loads(text_content) + print(f"pickup_data: {pickup_data}") + # 确保是列表格式 + if isinstance(pickup_data, list): + result.append({"stations": [{"name": item.get("station", ""), "pickup_codes": item.get("pickup_codes", [])} for item in pickup_data]}) + + + return result + except Exception as e: logging.exception(f"解析千问 API 响应失败: {str(e)}") - return {"stations": []} + return None except Exception as e: logging.exception(f"调用千问 API 异常: {str(e)}") - return {"error": "处理失败", "message": str(e)} + return None # 创建全局实例 qwen_client = QwenClient() \ No newline at end of file