deliveryman-api/app/core/wechat.py
2025-04-05 21:40:08 +08:00

499 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import json
import random
import secrets
import string
import time
import uuid
from typing import Any, Dict, Optional

import aiohttp
import requests
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.x509 import load_pem_x509_certificate
from fastapi import Request
from sqlalchemy import and_
from sqlalchemy.orm import Session

from app.core.config import settings


def generate_random_string(length=32):
    """Return a random string of ASCII letters and digits.

    Characters are drawn with the ``secrets`` module rather than ``random``
    so the result is suitable for nonces and other values that must not be
    predictable.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        str: The generated string.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
class WeChatClient:
    """WeChat client."""

    def __init__(self):
        """Read credentials from settings and load the signing key material.

        Opens the merchant private key file first and the platform
        certificate file second; both are read as PEM.
        """
        # Mini-program and merchant credentials come from application settings.
        self.appid = settings.WECHAT_APPID
        self.secret = settings.WECHAT_SECRET
        self.mch_id = settings.WECHAT_MCH_ID
        self.private_key_path = settings.WECHAT_PRIVATE_KEY_PATH
        self.cert_serial_no = settings.WECHAT_CERT_SERIAL_NO
        self.api_v3_key = settings.WECHAT_API_V3_KEY
        self.platform_cert_path = settings.WECHAT_PLATFORM_CERT_PATH

        # Merchant private key: used to sign outgoing requests.
        with open(self.private_key_path, "rb") as key_file:
            key_pem = key_file.read()
            self.private_key = serialization.load_pem_private_key(
                key_pem,
                password=None
            )

        # Platform certificate: its public key verifies WeChat's signatures.
        with open(self.platform_cert_path, "rb") as cert_file:
            cert_pem = cert_file.read()
            self.platform_cert = load_pem_x509_certificate(cert_pem)
        self.platform_public_key = self.platform_cert.public_key()
async def get_access_token(self):
    """Fetch the mini-program global API credential (access_token).

    Sends a ``client_credential`` request to the WeChat token endpoint
    using ``self.appid`` and ``self.secret``.

    Returns:
        str: The ``access_token`` value from the response.

    Raises:
        Exception: If the response is not valid JSON, carries a non-zero
            ``errcode``, or contains no ``access_token``.
    """
    url = "https://api.weixin.qq.com/cgi-bin/token"
    params = {
        "grant_type": "client_credential",
        "appid": self.appid,
        "secret": self.secret
    }
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            # Read as text and parse manually so the response's declared
            # content type does not matter.
            text = await response.text()
    try:
        result = json.loads(text)
    except json.JSONDecodeError as exc:
        raise Exception(f"解析微信返回数据失败: {text}") from exc
    if "errcode" in result and result["errcode"] != 0:
        raise Exception(result.get("errmsg", "获取access_token失败"))
    access_token = result.get("access_token")
    if not access_token:
        # Fail here instead of returning None, which callers would
        # interpolate into a URL as the literal "access_token=None".
        raise Exception("获取access_token失败")
    return access_token
async def get_phone_number(self, code: str) -> dict:
    """Exchange a phone-number ``code`` for the user's phone number.

    Obtains an access token via ``self.get_access_token()`` and posts the
    code to the ``getuserphonenumber`` endpoint.

    Args:
        code: The code passed through to WeChat in the request body.

    Returns:
        dict: ``phone_number``, ``pure_phone_number`` and ``country_code``
        taken from the response's ``phone_info`` object.

    Raises:
        Exception: If the response is not valid JSON, carries a non-zero
            ``errcode``, or has no ``phone_info``.
    """
    access_token = await self.get_access_token()
    url = "https://api.weixin.qq.com/wxa/business/getuserphonenumber"
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            params={"access_token": access_token},
            json={"code": code},
        ) as response:
            text = await response.text()
    try:
        result = json.loads(text)
    except json.JSONDecodeError as exc:
        raise Exception(f"解析微信返回数据失败: {text}") from exc
    # The response body is deliberately not printed: it contains the
    # user's phone number.
    if result.get("errcode", 0) != 0:
        raise Exception(result.get("errmsg", "获取手机号失败"))
    phone_info = result.get("phone_info")
    if not phone_info:
        raise Exception("未获取到手机号信息")
    return {
        "phone_number": phone_info.get("phoneNumber"),
        "pure_phone_number": phone_info.get("purePhoneNumber"),
        "country_code": phone_info.get("countryCode"),
    }
async def code2session(self, code: str) -> dict:
    """Exchange a login ``code`` for the user's session data (openid).

    Returns the parsed JSON response unchanged; raises ``Exception`` when
    the response is not JSON or reports a non-zero ``errcode``.
    """
    endpoint = "https://api.weixin.qq.com/sns/jscode2session"
    query = dict(
        appid=self.appid,
        secret=self.secret,
        js_code=code,
        grant_type="authorization_code",
    )
    async with aiohttp.ClientSession() as http:
        async with http.get(endpoint, params=query) as resp:
            raw_text = await resp.text()
            try:
                session_info = json.loads(raw_text)
            except json.JSONDecodeError:
                raise Exception(f"解析微信返回数据失败: {raw_text}")
            # A missing errcode is treated the same as errcode == 0.
            error_code = session_info["errcode"] if "errcode" in session_info else 0
            if error_code != 0:
                raise Exception(session_info.get("errmsg", "获取openid失败"))
            return session_info
def sign_message(self, method: str, url_path: str, body: dict) -> tuple:
    """Sign a request with the merchant private key.

    The signed text is five newline-terminated lines: method, URL path,
    timestamp, nonce and the JSON-serialized body (an empty line when
    ``body`` is falsy).

    Returns:
        tuple: (nonce_str, timestamp, signature) where ``signature`` is the
        base64 text of the SHA-256 / PKCS#1 v1.5 signature.
    """
    nonce = uuid.uuid4().hex
    issued_at = str(int(time.time()))

    # A falsy body contributes an empty line to the signed text.
    payload = json.dumps(body) if body else ""
    message = f"{method}\n{url_path}\n{issued_at}\n{nonce}\n{payload}\n"

    raw_signature = self.private_key.sign(
        message.encode('utf-8'),
        padding.PKCS1v15(),
        hashes.SHA256()
    )
    encoded_signature = base64.b64encode(raw_signature).decode()
    return nonce, issued_at, encoded_signature
def verify_response(self, headers: dict, body: bytes) -> bool:
    """Verify the signature carried in ``Wechatpay-*`` headers.

    The verified message is ``timestamp + "\\n" + nonce + "\\n" + body +
    "\\n"``, checked against ``self.platform_public_key`` with SHA-256 and
    PKCS#1 v1.5 padding.

    Args:
        headers: Mapping that provides ``Wechatpay-Timestamp``,
            ``Wechatpay-Nonce``, ``Wechatpay-Signature`` and
            ``Wechatpay-Serial``.
        body: Raw body bytes exactly as received.

    Returns:
        bool: True when the signature verifies. False when any header is
        missing or empty, or when verification fails for any reason.
    """
    timestamp = headers.get('Wechatpay-Timestamp')
    nonce = headers.get('Wechatpay-Nonce')
    signature = headers.get('Wechatpay-Signature')
    serial = headers.get('Wechatpay-Serial')
    if not all([timestamp, nonce, signature, serial]):
        return False
    try:
        # Assemble the message from the raw bytes. Decoding the body to
        # text and re-encoding it would raise on invalid UTF-8 before the
        # signature was ever checked.
        message = b"\n".join(
            (timestamp.encode('utf-8'), nonce.encode('utf-8'), body)
        ) + b"\n"
        self.platform_public_key.verify(
            base64.b64decode(signature),
            message,
            padding.PKCS1v15(),
            hashes.SHA256()
        )
    except Exception:
        # Fail closed: malformed base64, a bad signature or any other
        # error all mean the payload is not trusted.
        return False
    return True
async def create_jsapi_payment(
    self,
    openid: str,
    out_trade_no: str,
    total_amount: int,
    description: str
) -> dict:
    """Create a JSAPI payment order and build the mini-program pay parameters.

    Posts the order to ``/v3/pay/transactions/jsapi``, checks the HTTP
    status and the response signature, then signs the ``prepay_id`` package
    with the merchant private key.

    Returns:
        dict: ``prepay_id`` plus ``payment_params`` (appId, timeStamp,
        nonceStr, package, signType, paySign).

    Raises:
        Exception: Any failure during the request or signing is re-raised
            with the "创建支付订单失败" prefix.
    """
    url_path = "/v3/pay/transactions/jsapi"
    api_url = "https://api.mch.weixin.qq.com" + url_path

    # Order payload; key order is kept because the same dict is signed.
    order = dict(
        appid=self.appid,
        mchid=self.mch_id,
        description=description,
        out_trade_no=out_trade_no,
        notify_url=f"{settings.API_BASE_URL}/api/wechat/payment/notify",
        amount=dict(total=total_amount, currency="CNY"),
        payer=dict(openid=openid),
    )

    req_nonce, req_timestamp, req_signature = self.sign_message("POST", url_path, order)

    # Authorization header: scheme followed by comma-separated key="value" pairs.
    auth_fields = (
        ("mchid", self.mch_id),
        ("nonce_str", req_nonce),
        ("timestamp", req_timestamp),
        ("serial_no", self.cert_serial_no),
        ("signature", req_signature),
    )
    authorization = 'WECHATPAY2-SHA256-RSA2048 ' + ",".join(
        f'{key}="{value}"' for key, value in auth_fields
    )
    request_headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': authorization,
    }

    try:
        async with aiohttp.ClientSession() as http:
            async with http.post(api_url, json=order, headers=request_headers) as resp:
                reply = await resp.json()
                if resp.status != 200:
                    raise Exception(f"请求失败: {reply.get('message')}")
                # The response signature is checked against the raw body bytes.
                raw_reply = await resp.read()
                if not self.verify_response(resp.headers, raw_reply):
                    raise Exception("响应签名验证失败")

                prepay_id = reply.get("prepay_id")
                if not prepay_id:
                    raise Exception("未获取到prepay_id")

                # Fresh timestamp and nonce for the client-side pay signature.
                pay_timestamp = str(int(time.time()))
                pay_nonce = uuid.uuid4().hex
                package = f"prepay_id={prepay_id}"
                to_sign = f"{self.appid}\n{pay_timestamp}\n{pay_nonce}\n{package}\n"
                raw_pay_sign = self.private_key.sign(
                    to_sign.encode('utf-8'),
                    padding.PKCS1v15(),
                    hashes.SHA256()
                )
                pay_sign = base64.b64encode(raw_pay_sign).decode()

                payment_params = {
                    "appId": self.appid,
                    "timeStamp": pay_timestamp,
                    "nonceStr": pay_nonce,
                    "package": package,
                    "signType": "RSA",
                    "paySign": pay_sign
                }
                return {"prepay_id": prepay_id, "payment_params": payment_params}
    except Exception as e:
        raise Exception(f"创建支付订单失败: {str(e)}")
def decrypt_callback_data(self, ciphertext: str, nonce: str, associated_data: Optional[str] = None) -> dict:
    """Decrypt an AES-GCM encrypted callback resource.

    Args:
        ciphertext: Base64-encoded ciphertext.
        nonce: Nonce string; used as UTF-8 bytes, not base64-decoded.
        associated_data: Optional associated data; empty bytes are used
            when it is missing or empty.

    Returns:
        dict: The decrypted payload parsed as JSON.

    Raises:
        Exception: On any failure (wrong key length, bad base64, failed
            decryption, invalid JSON). The message starts with
            "解密回调数据失败" and the original error is attached as
            ``__cause__``.
    """
    try:
        # The API v3 key is used directly as the AES key and must be 32 bytes.
        key_bytes = self.api_v3_key.encode('utf-8')
        if len(key_bytes) != 32:
            print(f"API v3密钥长度不正确: {len(key_bytes)} bytes")
            raise ValueError("API v3密钥必须是32字节")
        ciphertext_bytes = base64.b64decode(ciphertext)
        nonce_bytes = nonce.encode('utf-8')
        associated_data_bytes = associated_data.encode('utf-8') if associated_data else b''
        aesgcm = AESGCM(key_bytes)
        decrypted_data = aesgcm.decrypt(
            nonce_bytes,
            ciphertext_bytes,
            associated_data_bytes
        )
        # The plaintext is intentionally not printed: it holds payment details.
        return json.loads(decrypted_data)
    except Exception as e:
        error_msg = f"解密回调数据失败: {str(e)}"
        print(error_msg)
        # Chain explicitly so the original traceback stays available.
        raise Exception(error_msg) from e
async def verify_payment_notify(self, request: Request) -> Optional[dict]:
    """Verify and decrypt a payment callback notification.

    Reads the ``Wechatpay-*`` headers and the raw body from ``request``,
    verifies the signature with ``self.verify_response`` and decrypts the
    ``resource`` object with ``self.decrypt_callback_data``.

    Args:
        request: The incoming callback request.

    Returns:
        The decrypted notification as a dict, or None when the signature
        does not verify or ``ciphertext`` / ``nonce`` are missing.

    Raises:
        Exception: On any other failure, with the "处理支付回调失败" prefix
            and the original error attached as ``__cause__``.
    """
    try:
        headers = {
            'Wechatpay-Signature': request.headers.get('Wechatpay-Signature'),
            'Wechatpay-Timestamp': request.headers.get('Wechatpay-Timestamp'),
            'Wechatpay-Nonce': request.headers.get('Wechatpay-Nonce'),
            'Wechatpay-Serial': request.headers.get('Wechatpay-Serial')
        }
        body = await request.body()
        # Headers and body are not printed: they are unverified at this
        # point and carry transaction data.
        if not self.verify_response(headers, body):
            print("签名验证失败")
            return None
        # Parse the exact bytes that were just verified.
        data = json.loads(body)
        # Guard against an explicit ``"resource": null``.
        resource = data.get("resource") or {}
        ciphertext = resource.get("ciphertext")
        nonce = resource.get("nonce")
        associated_data = resource.get("associated_data")
        if not all([ciphertext, nonce]):
            print("缺少必要的解密参数")
            return None
        return self.decrypt_callback_data(
            ciphertext,
            nonce,
            associated_data
        )
    except Exception as e:
        print(f"处理支付回调异常: {str(e)}")
        raise Exception(f"处理支付回调失败: {str(e)}") from e
async def apply_refund(
    self,
    order_id: str,
    total_amount: int,
    refund_amount: Optional[int] = None,
    reason: str = "用户申请退款"
) -> dict:
    """Request a refund for an order.

    Args:
        order_id: Merchant order number, sent as ``out_trade_no``. The
            refund number is derived from it as
            ``refund_<order_id>_<unix time>``.
        total_amount: Original order amount, in fen.
        refund_amount: Amount to refund, in fen. None means a full refund
            of ``total_amount``.
        reason: Refund reason sent with the request.

    Returns:
        dict: The parsed JSON response of the refund API.

    Raises:
        Exception: On invalid amounts, a non-200 response, a failed
            response-signature check or any transport error. The message
            starts with "申请退款失败" and the original error is attached
            as ``__cause__``.
    """
    try:
        if refund_amount is None:
            refund_amount = total_amount
        # Reject impossible amounts locally, before any request is signed
        # or sent.
        if refund_amount <= 0 or refund_amount > total_amount:
            raise ValueError("退款金额必须大于0且不超过订单总金额")
        print(f"开始申请退款: order_id={order_id}, total_amount={total_amount}, refund_amount={refund_amount}")
        url_path = "/v3/refund/domestic/refunds"
        api_url = f"https://api.mch.weixin.qq.com{url_path}"
        body = {
            "out_trade_no": order_id,
            # NOTE(review): the timestamp makes each call use a new refund
            # number, so a retried call is not idempotent - confirm this
            # is intended.
            "out_refund_no": f"refund_{order_id}_{int(time.time())}",
            "reason": reason,
            "notify_url": f"{settings.API_BASE_URL}/api/wechat/refund/notify",
            "amount": {
                "refund": refund_amount,
                "total": total_amount,
                "currency": "CNY"
            }
        }
        # sign_message generates the nonce and timestamp itself.
        nonce_str, timestamp, signature = self.sign_message("POST", url_path, body)
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Authorization': (
                f'WECHATPAY2-SHA256-RSA2048 '
                f'mchid="{self.mch_id}",'
                f'nonce_str="{nonce_str}",'
                f'timestamp="{timestamp}",'
                f'serial_no="{self.cert_serial_no}",'
                f'signature="{signature}"'
            )
        }
        # The Authorization header and signature are deliberately not printed.
        async with aiohttp.ClientSession() as session:
            async with session.post(api_url, json=body, headers=headers) as response:
                print(f"退款响应状态码: {response.status}")
                result = await response.json()
                print(f"退款响应内容: {result}")
                if response.status != 200:
                    raise Exception(f"退款申请失败: {result.get('message')}")
                if not self.verify_response(response.headers, await response.read()):
                    raise Exception("响应签名验证失败")
                return result
    except Exception as e:
        print(f"申请退款异常: {str(e)}")
        raise Exception(f"申请退款失败: {str(e)}") from e
async def get_wx_code(self, path: str, query: Optional[str] = None) -> bytes:
    """Fetch a mini-program code image for a page path.

    Args:
        path: Page path.
        query: Optional query string; appended to ``path`` after a ``?``
            only when it is non-empty.

    Returns:
        bytes: The image data returned by the ``getwxacode`` endpoint.

    Raises:
        Exception: When the endpoint answers with a non-200 status or a
            JSON error body, or when the request itself fails. The message
            starts with "获取小程序码失败".
    """
    url = f"https://api.weixin.qq.com/wxa/getwxacode?access_token={await self.get_access_token()}"
    print(f"path: {path}")
    print(f"scene: {query}")
    # ``query`` defaults to None; concatenating it unconditionally would
    # raise TypeError.
    full_path = f"{path}?{query}" if query else path
    params = {
        "path": full_path,
        "env_version": "release" if not settings.DEBUG else "develop"
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=params) as response:
                status = response.status
                payload = await response.read()
        # Failures can arrive as a JSON object even with HTTP 200; image
        # formats never begin with "{", so this cannot misfire on a
        # real image.
        if status != 200 or payload.lstrip()[:1] == b"{":
            try:
                result = json.loads(payload)
            except ValueError:
                result = None
            if not isinstance(result, dict):
                result = {"errmsg": payload[:200].decode("utf-8", errors="replace")}
            print(f"获取小程序码失败: {result}")
            raise Exception(f"获取小程序码失败: {result.get('errmsg')}")
        return payload
    except Exception as e:
        print(f"获取小程序码失败: {str(e)}")
        raise Exception(f"获取小程序码失败: {str(e)}") from e
async def get_url_link(self, path: str, query: str) -> str:
    """Generate a mini-program URL Link for a page.

    Posts ``path`` and ``query`` to the ``generate_urllink`` endpoint with
    ``expire_type`` 1 and ``expire_interval`` 30.

    Args:
        path: Page path.
        query: Query string for the page.

    Returns:
        str: The ``url_link`` value from the response.

    Raises:
        Exception: When the response is not JSON, carries a non-zero
            ``errcode`` or has no ``url_link``, or when the request fails.
            The message starts with "获取URL链接失败".
    """
    url = f"https://api.weixin.qq.com/wxa/generate_urllink?access_token={await self.get_access_token()}"
    params = {
        "path": path,
        "query": query,
        "expire_type": 1,
        "expire_interval": 30
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=params) as response:
                # Parse from text, as the other methods of this client do,
                # so the response's declared content type does not matter.
                text = await response.text()
        result = json.loads(text)
        print(f"获取URL链接结果: {result}")
        if result.get("errcode", 0) != 0:
            raise Exception(result.get("errmsg", "获取URL链接失败"))
        url_link = result.get("url_link")
        if not url_link:
            # Do not return None from a method declared to return str.
            raise Exception("未获取到url_link")
        return url_link
    except Exception as e:
        print(f"获取URL链接失败: {str(e)}")
        raise Exception(f"获取URL链接失败: {str(e)}") from e