import base64
import json
import random
import secrets
import string
import time
import traceback
import uuid
from typing import Any, Dict, Optional

import aiohttp
import requests
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.x509 import load_pem_x509_certificate
from fastapi import Request
from sqlalchemy import and_
from sqlalchemy.orm import Session

from app.core.config import settings

# Characters used by generate_random_string.
_RANDOM_ALPHABET = string.ascii_letters + string.digits


def generate_random_string(length=32):
    """Return a random string of ``length`` ASCII letters and digits.

    Characters are drawn with ``secrets`` so the result is suitable for
    nonces and similar security-sensitive values.
    """
    return ''.join(secrets.choice(_RANDOM_ALPHABET) for _ in range(length))


class WeChatClient:
    """Client for the WeChat mini-program and WeChat Pay v3 HTTP APIs."""

    def __init__(self):
        """Read credentials from ``settings`` and load the key material.

        Opens the merchant private key (PEM, no password) and the platform
        certificate (PEM) from the paths configured in ``settings``.
        """
        self.appid = settings.WECHAT_APPID
        self.secret = settings.WECHAT_SECRET
        self.mch_id = settings.WECHAT_MCH_ID
        self.private_key_path = settings.WECHAT_PRIVATE_KEY_PATH
        self.cert_serial_no = settings.WECHAT_CERT_SERIAL_NO
        self.api_v3_key = settings.WECHAT_API_V3_KEY
        self.platform_cert_path = settings.WECHAT_PLATFORM_CERT_PATH

        # Merchant private key: signs outgoing requests and payment params.
        with open(self.private_key_path, "rb") as f:
            self.private_key = serialization.load_pem_private_key(
                f.read(),
                password=None
            )

        # Platform certificate: its public key verifies WeChat's signatures.
        with open(self.platform_cert_path, "rb") as f:
            self.platform_cert = load_pem_x509_certificate(f.read())
            self.platform_public_key = self.platform_cert.public_key()

    @staticmethod
    def _parse_json(text: str) -> dict:
        """Decode a response body as JSON, raising with the raw text on failure."""
        try:
            return json.loads(text)
        except json.JSONDecodeError as e:
            raise Exception(f"解析微信返回数据失败: {text}") from e

    def _build_request_headers(self, method: str, url_path: str, body: dict) -> dict:
        """Sign a request and return its JSON + Authorization headers."""
        nonce_str, timestamp, signature = self.sign_message(method, url_path, body)
        return {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Authorization': (
                f'WECHATPAY2-SHA256-RSA2048 '
                f'mchid="{self.mch_id}",'
                f'nonce_str="{nonce_str}",'
                f'timestamp="{timestamp}",'
                f'serial_no="{self.cert_serial_no}",'
                f'signature="{signature}"'
            )
        }

    async def get_access_token(self):
        """Fetch a mini-program access token.

        Returns:
            The ``access_token`` string from the response.

        Raises:
            Exception: if the body is not JSON, carries a non-zero
                ``errcode``, or contains no ``access_token``.
        """
        url = "https://api.weixin.qq.com/cgi-bin/token"
        params = {
            "grant_type": "client_credential",
            "appid": self.appid,
            "secret": self.secret
        }

        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                result = self._parse_json(await response.text())

        if result.get("errcode", 0) != 0:
            raise Exception(result.get("errmsg", "获取access_token失败"))

        access_token = result.get("access_token")
        if not access_token:
            # Without this guard a missing token would be interpolated into
            # later request URLs as the literal string "None".
            raise Exception("获取access_token失败")
        return access_token

    async def get_phone_number(self, code: str) -> dict:
        """Exchange a phone-number ``code`` for the user's phone number.

        Returns:
            dict with ``phone_number``, ``pure_phone_number`` and
            ``country_code`` taken from the response's ``phone_info``.

        Raises:
            Exception: on a non-JSON body, a non-zero ``errcode`` or a
                missing ``phone_info``.
        """
        access_token = await self.get_access_token()
        url = "https://api.weixin.qq.com/wxa/business/getuserphonenumber"

        async with aiohttp.ClientSession() as session:
            async with session.post(
                url,
                params={"access_token": access_token},
                json={"code": code},
            ) as response:
                result = self._parse_json(await response.text())

        # Debug output of the raw response.
        print(f"微信返回数据: {result}")

        if result.get("errcode", 0) != 0:
            raise Exception(result.get("errmsg", "获取手机号失败"))

        phone_info = result.get("phone_info")
        if not phone_info:
            raise Exception("未获取到手机号信息")

        return {
            "phone_number": phone_info.get("phoneNumber"),
            "pure_phone_number": phone_info.get("purePhoneNumber"),
            "country_code": phone_info.get("countryCode"),
        }

    async def code2session(self, code: str) -> dict:
        """Exchange a login ``code`` for the session data (including openid).

        Returns:
            The decoded JSON response, unmodified.

        Raises:
            Exception: on a non-JSON body or a non-zero ``errcode``.
        """
        url = "https://api.weixin.qq.com/sns/jscode2session"
        params = {
            "appid": self.appid,
            "secret": self.secret,
            "js_code": code,
            "grant_type": "authorization_code"
        }

        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                result = self._parse_json(await response.text())

        if result.get("errcode", 0) != 0:
            raise Exception(result.get("errmsg", "获取openid失败"))

        return result

    def sign_message(self, method: str, url_path: str, body: dict) -> tuple:
        """Sign a request with the merchant private key.

        The signed text is ``method``, ``url_path``, timestamp, nonce and the
        JSON-serialized body, each followed by a newline (an empty body
        contributes just the newline).

        NOTE: the body is serialized here with ``json.dumps`` defaults; the
        request must be sent with the identical serialization or the
        signature will not match.

        Returns:
            tuple: (nonce_str, timestamp, signature) where signature is
            base64 text.
        """
        nonce_str = str(uuid.uuid4()).replace('-', '')
        timestamp = str(int(time.time()))

        sign_str = f"{method}\n{url_path}\n{timestamp}\n{nonce_str}\n"
        if body:
            sign_str += f"{json.dumps(body)}\n"
        else:
            sign_str += "\n"

        signature = self.private_key.sign(
            sign_str.encode('utf-8'),
            padding.PKCS1v15(),
            hashes.SHA256()
        )

        return nonce_str, timestamp, base64.b64encode(signature).decode()

    def verify_response(self, headers: dict, body: bytes) -> bool:
        """Check the ``Wechatpay-*`` signature headers against ``body``.

        Returns:
            True when the signature verifies with the platform public key;
            False when a header is missing or verification fails for any
            reason. Never raises.
        """
        timestamp = headers.get('Wechatpay-Timestamp')
        nonce = headers.get('Wechatpay-Nonce')
        signature = headers.get('Wechatpay-Signature')
        serial = headers.get('Wechatpay-Serial')

        if not all([timestamp, nonce, signature, serial]):
            return False

        try:
            # Assemble the signed message from the raw bytes so a body that
            # is not valid UTF-8 yields False instead of UnicodeDecodeError.
            message = f"{timestamp}\n{nonce}\n".encode('utf-8') + body + b"\n"
            self.platform_public_key.verify(
                base64.b64decode(signature),
                message,
                padding.PKCS1v15(),
                hashes.SHA256()
            )
            return True
        except Exception:
            # Contract is a boolean answer: any failure means "not verified".
            return False

    async def create_jsapi_payment(
        self,
        openid: str,
        out_trade_no: str,
        total_amount: int,
        description: str
    ) -> dict:
        """Create a JSAPI payment order and build the client payment params.

        Args:
            openid: payer's openid.
            out_trade_no: merchant order number.
            total_amount: order amount, sent as ``amount.total`` with
                currency CNY.
            description: order description.

        Returns:
            dict with ``prepay_id`` and ``payment_params`` (appId, timeStamp,
            nonceStr, package, signType, paySign).

        Raises:
            Exception: on a non-200 response, a failed response-signature
                check, or a missing ``prepay_id``.
        """
        url_path = "/v3/pay/transactions/jsapi"
        api_url = f"https://api.mch.weixin.qq.com{url_path}"

        body = {
            "appid": self.appid,
            "mchid": self.mch_id,
            "description": description,
            "out_trade_no": out_trade_no,
            "notify_url": f"{settings.API_BASE_URL}/api/wechat/payment/notify",
            "amount": {
                "total": total_amount,
                "currency": "CNY"
            },
            "payer": {
                "openid": openid
            }
        }

        headers = self._build_request_headers("POST", url_path, body)

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(api_url, json=body, headers=headers) as response:
                    result = await response.json()

                    if response.status != 200:
                        raise Exception(f"请求失败: {result.get('message')}")

                    if not self.verify_response(response.headers, await response.read()):
                        raise Exception("响应签名验证失败")

            prepay_id = result.get("prepay_id")
            if not prepay_id:
                raise Exception("未获取到prepay_id")

            # Sign the parameters the mini program uses to invoke payment.
            timestamp = str(int(time.time()))
            nonce_str = str(uuid.uuid4()).replace('-', '')
            package = f"prepay_id={prepay_id}"
            sign_str = f"{self.appid}\n{timestamp}\n{nonce_str}\n{package}\n"
            pay_sign = base64.b64encode(
                self.private_key.sign(
                    sign_str.encode('utf-8'),
                    padding.PKCS1v15(),
                    hashes.SHA256()
                )
            ).decode()

            return {
                "prepay_id": prepay_id,
                "payment_params": {
                    "appId": self.appid,
                    "timeStamp": timestamp,
                    "nonceStr": nonce_str,
                    "package": package,
                    "signType": "RSA",
                    "paySign": pay_sign
                }
            }

        except Exception as e:
            raise Exception(f"创建支付订单失败: {str(e)}") from e

    def decrypt_callback_data(self, ciphertext: str, nonce: str, associated_data: str = None) -> dict:
        """Decrypt an AES-GCM encrypted callback resource.

        Args:
            ciphertext: base64-encoded ciphertext.
            nonce: nonce string, used as its UTF-8 bytes (not base64).
            associated_data: optional additional authenticated data.

        Returns:
            dict: the decrypted payload parsed as JSON.

        Raises:
            Exception: if the API v3 key is not 32 bytes, or decryption or
                JSON parsing fails.
        """
        try:
            key_bytes = self.api_v3_key.encode('utf-8')
            if len(key_bytes) != 32:
                print(f"API v3密钥长度不正确: {len(key_bytes)} bytes")
                raise ValueError("API v3密钥必须是32字节")

            ciphertext_bytes = base64.b64decode(ciphertext)
            nonce_bytes = nonce.encode('utf-8')
            associated_data_bytes = associated_data.encode('utf-8') if associated_data else b''

            aesgcm = AESGCM(key_bytes)
            decrypted_data = aesgcm.decrypt(
                nonce_bytes,
                ciphertext_bytes,
                associated_data_bytes
            )

            print(f"解密后的原始数据: {decrypted_data}")

            return json.loads(decrypted_data)

        except Exception as e:
            error_msg = f"解密回调数据失败: {str(e)}"
            print(error_msg)
            print("详细错误信息:", traceback.format_exc())
            raise Exception(error_msg) from e

    async def verify_payment_notify(self, request: Request) -> Optional[dict]:
        """Verify and decrypt a payment callback notification.

        Returns:
            The decrypted resource as a dict, or ``None`` when the signature
            check fails or the resource lacks ``ciphertext``/``nonce``.

        Raises:
            Exception: if reading, parsing or decrypting the request fails.
        """
        try:
            headers = {
                'Wechatpay-Signature': request.headers.get('Wechatpay-Signature'),
                'Wechatpay-Timestamp': request.headers.get('Wechatpay-Timestamp'),
                'Wechatpay-Nonce': request.headers.get('Wechatpay-Nonce'),
                'Wechatpay-Serial': request.headers.get('Wechatpay-Serial')
            }
            print("微信支付回调请求头:", headers)

            body = await request.body()
            print("微信支付回调请求体:", body.decode('utf-8'))

            if not self.verify_response(headers, body):
                print("签名验证失败")
                return None

            # Parse the bytes that were just verified.
            data = json.loads(body)
            print("解析的JSON数据:", data)

            resource = data.get("resource", {})
            ciphertext = resource.get("ciphertext")
            nonce = resource.get("nonce")
            associated_data = resource.get("associated_data")

            if not all([ciphertext, nonce]):
                print("缺少必要的解密参数")
                return None

            decrypted_data = self.decrypt_callback_data(
                ciphertext,
                nonce,
                associated_data
            )
            print("解密后的数据:", decrypted_data)
            return decrypted_data

        except Exception as e:
            print(f"处理支付回调异常: {str(e)}")
            print("详细错误信息:", traceback.format_exc())
            raise Exception(f"处理支付回调失败: {str(e)}") from e

    async def apply_refund(
        self,
        order_id: str,
        total_amount: int,
        refund_amount: int = None,
        reason: str = "用户申请退款"
    ) -> dict:
        """Request a refund for an order.

        Args:
            order_id: merchant order number; the refund number sent is
                ``refund_<order_id>_<unix time>`` so repeated requests do not
                collide.
            total_amount: original order amount.
            refund_amount: amount to refund; ``None`` means a full refund
                (``total_amount``).
            reason: refund reason.

        Returns:
            dict: the decoded JSON response.

        Raises:
            Exception: on a non-200 response or a failed response-signature
                check.
        """
        try:
            if refund_amount is None:
                refund_amount = total_amount

            print(f"开始申请退款: order_id={order_id}, total_amount={total_amount}, refund_amount={refund_amount}")

            url_path = "/v3/refund/domestic/refunds"
            api_url = f"https://api.mch.weixin.qq.com{url_path}"

            body = {
                "out_trade_no": order_id,
                "out_refund_no": f"refund_{order_id}_{int(time.time())}",
                "reason": reason,
                "notify_url": f"{settings.API_BASE_URL}/api/wechat/refund/notify",
                "amount": {
                    "refund": refund_amount,
                    "total": total_amount,
                    "currency": "CNY"
                }
            }

            # The signature and Authorization header are deliberately not
            # printed: they are credentials and do not belong in logs.
            headers = self._build_request_headers("POST", url_path, body)

            async with aiohttp.ClientSession() as session:
                async with session.post(api_url, json=body, headers=headers) as response:
                    print(f"退款响应状态码: {response.status}")
                    result = await response.json()
                    print(f"退款响应内容: {result}")

                    if response.status != 200:
                        raise Exception(f"退款申请失败: {result.get('message')}")

                    if not self.verify_response(response.headers, await response.read()):
                        raise Exception("响应签名验证失败")

                    return result

        except Exception as e:
            print(f"申请退款异常: {str(e)}")
            raise Exception(f"申请退款失败: {str(e)}") from e

    async def get_wx_code(self, path: str, query: str = None) -> bytes:
        """Fetch a mini-program code image for ``path``.

        Args:
            path: page path.
            query: optional query string appended to ``path`` after ``?``.

        Returns:
            The raw image bytes.

        Raises:
            Exception: on a non-200 response or when the response is a JSON
                error body instead of an image.
        """
        access_token = await self.get_access_token()
        url = "https://api.weixin.qq.com/wxa/getwxacode"
        print(f"path: {path}")
        print(f"scene: {query}")

        params = {
            # Only append the query when one is given; the default of None
            # previously raised TypeError on string concatenation.
            "path": f"{path}?{query}" if query else path,
            "env_version": "release" if not settings.DEBUG else "develop"
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url,
                    params={"access_token": access_token},
                    json=params,
                ) as response:
                    # Errors may arrive as a JSON body even with HTTP 200, so
                    # a JSON content type is treated as failure too.
                    if response.status != 200 or "json" in response.content_type:
                        result = await response.json(content_type=None)
                        print(f"获取小程序码失败: {result}")
                        raise Exception(f"获取小程序码失败: {result.get('errmsg')}")

                    return await response.read()
        except Exception as e:
            print(f"获取小程序码失败: {str(e)}")
            raise Exception(f"获取小程序码失败: {str(e)}") from e

    async def get_url_link(self, path: str, query: str) -> str:
        """Generate a URL link for a mini-program page.

        The request uses ``expire_type`` 1 and ``expire_interval`` 30.

        Returns:
            The ``url_link`` value from the response.

        Raises:
            Exception: on a request failure or a non-zero ``errcode``.
        """
        access_token = await self.get_access_token()
        url = "https://api.weixin.qq.com/wxa/generate_urllink"
        params = {
            "path": path,
            "query": query,
            "expire_type": 1,
            "expire_interval": 30
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url,
                    params={"access_token": access_token},
                    json=params,
                ) as response:
                    result = await response.json()
                    print(f"获取URL链接结果: {result}")

            # Surface API errors instead of silently returning None, in line
            # with the other methods of this client.
            if result.get("errcode", 0) != 0:
                raise Exception(result.get("errmsg", "获取URL链接失败"))
            return result.get("url_link")
        except Exception as e:
            print(f"获取URL链接失败: {str(e)}")
            raise Exception(f"获取URL链接失败: {str(e)}") from e