deliveryman-api/app/core/wechat.py
2025-01-23 21:09:48 +08:00

300 lines
11 KiB
Python

import base64
import json
import random
import secrets
import string
import time
import uuid

import aiohttp
import requests
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.x509 import load_pem_x509_certificate
from fastapi import Request

from app.core.config import settings
def generate_random_string(length=32):
    """Return a random string of ``length`` ASCII letters and digits.

    The characters are drawn with ``secrets`` (the operating system's CSPRNG)
    rather than ``random``: this module signs payment requests, so any token
    produced here should not be predictable.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        A string made only of ``[A-Za-z0-9]`` characters.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
class WeChatClient:
    """Client for the WeChat Mini Program APIs and the WeChat Pay v3 JSAPI flow.

    All credentials are read from ``settings``. The merchant private key and
    the WeChat Pay platform certificate are loaded from disk when the client
    is constructed.
    """

    def __init__(self):
        """Copy the credentials from ``settings`` and load the key material."""
        self.appid = settings.WECHAT_APPID
        self.secret = settings.WECHAT_SECRET
        self.mch_id = settings.WECHAT_MCH_ID
        self.private_key_path = settings.WECHAT_PRIVATE_KEY_PATH
        self.cert_serial_no = settings.WECHAT_CERT_SERIAL_NO
        self.api_v3_key = settings.WECHAT_API_V3_KEY
        self.platform_cert_path = settings.WECHAT_PLATFORM_CERT_PATH

        # Merchant private key: signs outgoing requests and the payment
        # parameters handed to the mini program.
        with open(self.private_key_path, "rb") as f:
            self.private_key = serialization.load_pem_private_key(
                f.read(),
                password=None,
            )

        # Platform certificate: its public key verifies signatures on
        # responses and payment callbacks.
        with open(self.platform_cert_path, "rb") as f:
            self.platform_cert = load_pem_x509_certificate(f.read())
        self.platform_public_key = self.platform_cert.public_key()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    async def _request_json(self, method, url, default_error, *, params=None, payload=None):
        """Call an ``api.weixin.qq.com`` endpoint and return the decoded JSON.

        Raises:
            Exception: If the body is not valid JSON, or if it carries a
                non-zero ``errcode`` (the message is ``errmsg`` when present,
                otherwise ``default_error``).
        """
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, params=params, json=payload) as response:
                text = await response.text()

        try:
            result = json.loads(text)
        except json.JSONDecodeError as exc:
            raise Exception(f"解析微信返回数据失败: {text}") from exc

        if result.get("errcode", 0) != 0:
            raise Exception(result.get("errmsg", default_error))
        return result

    def _rsa_sign(self, message: str) -> str:
        """Sign ``message`` with the merchant key (RSA PKCS#1 v1.5 / SHA-256), base64-encoded."""
        raw_signature = self.private_key.sign(
            message.encode('utf-8'),
            padding.PKCS1v15(),
            hashes.SHA256(),
        )
        return base64.b64encode(raw_signature).decode()

    def _authorization_header(self, nonce_str: str, timestamp: str, signature: str) -> str:
        """Build the ``WECHATPAY2-SHA256-RSA2048`` Authorization header value."""
        return (
            'WECHATPAY2-SHA256-RSA2048 '
            f'mchid="{self.mch_id}",'
            f'nonce_str="{nonce_str}",'
            f'timestamp="{timestamp}",'
            f'serial_no="{self.cert_serial_no}",'
            f'signature="{signature}"'
        )

    # ------------------------------------------------------------------
    # Mini Program APIs
    # ------------------------------------------------------------------

    async def get_access_token(self):
        """Fetch the mini program's global API access token.

        A new token is requested on every call; nothing is cached here.

        Returns:
            The ``access_token`` string from the response.

        Raises:
            Exception: If the response is not JSON, reports an error, or does
                not contain an access token.
        """
        result = await self._request_json(
            "GET",
            "https://api.weixin.qq.com/cgi-bin/token",
            "获取access_token失败",
            params={
                "grant_type": "client_credential",
                "appid": self.appid,
                "secret": self.secret,
            },
        )
        access_token = result.get("access_token")
        if not access_token:
            # Fail here instead of letting callers send "access_token=None".
            raise Exception("获取access_token失败")
        return access_token

    async def get_phone_number(self, code: str) -> dict:
        """Exchange a phone-number ``code`` for the user's phone number.

        Args:
            code: The code passed through to ``getuserphonenumber``.

        Returns:
            A dict with ``phone_number``, ``pure_phone_number`` and
            ``country_code``.

        Raises:
            Exception: If the token cannot be fetched, the response is not
                JSON, reports an error, or has no ``phone_info``.
        """
        access_token = await self.get_access_token()
        # The response holds the user's phone number, so it is deliberately
        # not printed or logged.
        result = await self._request_json(
            "POST",
            "https://api.weixin.qq.com/wxa/business/getuserphonenumber",
            "获取手机号失败",
            params={"access_token": access_token},
            payload={"code": code},
        )

        phone_info = result.get("phone_info")
        if not phone_info:
            raise Exception("未获取到手机号信息")

        return {
            "phone_number": phone_info.get("phoneNumber"),
            "pure_phone_number": phone_info.get("purePhoneNumber"),
            "country_code": phone_info.get("countryCode"),
        }

    async def code2session(self, code: str) -> dict:
        """Exchange a login ``code`` for the user's session data (openid etc.).

        Returns:
            The decoded JSON response, unchanged.

        Raises:
            Exception: If the response is not JSON or reports an error.
        """
        return await self._request_json(
            "GET",
            "https://api.weixin.qq.com/sns/jscode2session",
            "获取openid失败",
            params={
                "appid": self.appid,
                "secret": self.secret,
                "js_code": code,
                "grant_type": "authorization_code",
            },
        )

    # ------------------------------------------------------------------
    # WeChat Pay v3
    # ------------------------------------------------------------------

    def sign_message(self, method: str, url_path: str, body: dict) -> tuple:
        """Sign an outgoing WeChat Pay request.

        The signed text is ``method``, ``url_path``, a timestamp, a nonce and
        the JSON body, each followed by a newline. A falsy ``body`` is signed
        as an empty body.

        Args:
            method: HTTP method of the request.
            url_path: Request path (without scheme and host).
            body: Request payload; serialised with ``json.dumps``.

        Returns:
            tuple: ``(nonce_str, timestamp, signature)`` where ``signature``
            is base64-encoded.
        """
        nonce_str = uuid.uuid4().hex
        timestamp = str(int(time.time()))
        serialized_body = json.dumps(body) if body else ""
        message = f"{method}\n{url_path}\n{timestamp}\n{nonce_str}\n{serialized_body}\n"
        return nonce_str, timestamp, self._rsa_sign(message)

    def verify_response(self, headers: dict, body: bytes) -> bool:
        """Check the WeChat Pay signature on a response or callback.

        Args:
            headers: Mapping that provides the ``Wechatpay-Timestamp``,
                ``Wechatpay-Nonce``, ``Wechatpay-Signature`` and
                ``Wechatpay-Serial`` values.
            body: Raw body bytes exactly as received.

        Returns:
            ``True`` only when all four headers are present and the signature
            verifies against the loaded platform public key; ``False`` in
            every other case (this method does not raise).
        """
        timestamp = headers.get('Wechatpay-Timestamp')
        nonce = headers.get('Wechatpay-Nonce')
        signature = headers.get('Wechatpay-Signature')
        # The serial must be present, but it is not compared with the loaded
        # certificate; a mismatching certificate simply fails verification.
        serial = headers.get('Wechatpay-Serial')
        if not all([timestamp, nonce, signature, serial]):
            return False

        try:
            # Assemble the signed message as bytes so that a body which is
            # not valid UTF-8 fails verification instead of raising.
            message = b"\n".join(
                (timestamp.encode('utf-8'), nonce.encode('utf-8'), body)
            ) + b"\n"
            self.platform_public_key.verify(
                base64.b64decode(signature),
                message,
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
        except Exception:
            # Any failure (bad base64, bad signature, wrong types) means the
            # message cannot be trusted.
            return False
        return True

    async def create_jsapi_payment(
        self,
        openid: str,
        out_trade_no: str,
        total_amount: int,
        description: str
    ) -> dict:
        """Create a JSAPI payment order and build the client payment parameters.

        Args:
            openid: Payer's openid.
            out_trade_no: Merchant-side order number.
            total_amount: Order total sent as ``amount.total`` with currency
                ``CNY`` (presumably in fen; confirm against callers).
            description: Order description.

        Returns:
            A dict with ``prepay_id`` and ``payment_params`` (``appId``,
            ``timeStamp``, ``nonceStr``, ``package``, ``signType``,
            ``paySign``) for the mini program to invoke the payment.

        Raises:
            Exception: If the request fails, the response signature does not
                verify, or no ``prepay_id`` is returned. Failures after
                signing are wrapped with the ``创建支付订单失败`` prefix.
        """
        url_path = "/v3/pay/transactions/jsapi"
        api_url = f"https://api.mch.weixin.qq.com{url_path}"

        body = {
            "appid": self.appid,
            "mchid": self.mch_id,
            "description": description,
            "out_trade_no": out_trade_no,
            "notify_url": f"{settings.API_BASE_URL}/api/wechat/payment-notify",
            "amount": {
                "total": total_amount,
                "currency": "CNY",
            },
            "payer": {
                "openid": openid,
            },
        }

        nonce_str, timestamp, signature = self.sign_message("POST", url_path, body)
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Authorization': self._authorization_header(nonce_str, timestamp, signature),
        }

        try:
            # ``json=body`` is serialised by aiohttp with ``json.dumps`` by
            # default, i.e. the same text that sign_message() signed.
            async with aiohttp.ClientSession() as session:
                async with session.post(api_url, json=body, headers=headers) as response:
                    status = response.status
                    response_headers = response.headers
                    raw_body = await response.read()

            # Parse the very bytes that are signature-checked below.
            result = json.loads(raw_body)
            if status != 200:
                raise Exception(f"请求失败: {result.get('message')}")

            if not self.verify_response(response_headers, raw_body):
                raise Exception("响应签名验证失败")

            prepay_id = result.get("prepay_id")
            if not prepay_id:
                raise Exception("未获取到prepay_id")

            # The client-side parameters get their own timestamp/nonce and
            # are signed over appId, timeStamp, nonceStr and package.
            pay_timestamp = str(int(time.time()))
            pay_nonce = uuid.uuid4().hex
            package = f"prepay_id={prepay_id}"
            pay_sign = self._rsa_sign(
                f"{self.appid}\n{pay_timestamp}\n{pay_nonce}\n{package}\n"
            )

            return {
                "prepay_id": prepay_id,
                "payment_params": {
                    "appId": self.appid,
                    "timeStamp": pay_timestamp,
                    "nonceStr": pay_nonce,
                    "package": package,
                    "signType": "RSA",
                    "paySign": pay_sign,
                },
            }
        except Exception as e:
            raise Exception(f"创建支付订单失败: {str(e)}") from e

    async def verify_payment_notify(self, request: "Request") -> dict:
        """Verify a payment callback and return its decrypted payload.

        Args:
            request: Incoming callback request; its ``Wechatpay-*`` headers
                and raw body are checked with :meth:`verify_response`.

        Returns:
            The decrypted ``resource`` of the notification, parsed from JSON.

        Raises:
            Exception: If the signature does not verify, or the notification
                cannot be parsed or decrypted.
        """
        header_names = (
            'Wechatpay-Signature',
            'Wechatpay-Timestamp',
            'Wechatpay-Nonce',
            'Wechatpay-Serial',
        )
        headers = {name: request.headers.get(name) for name in header_names}

        body = await request.body()

        if not self.verify_response(headers, body):
            raise Exception("回调通知签名验证失败")

        try:
            data = json.loads(body)
            resource = data.get('resource', {})

            # WeChat Pay sends the nonce as a plain string whose bytes are
            # the AES-GCM nonce; only the ciphertext is base64-encoded.
            nonce = resource['nonce'].encode('utf-8')
            # associated_data may be absent or null.
            associated_data = (resource.get('associated_data') or '').encode('utf-8')
            ciphertext = base64.b64decode(resource['ciphertext'])

            aesgcm = AESGCM(self.api_v3_key.encode('utf-8'))
            decrypted_data = aesgcm.decrypt(nonce, ciphertext, associated_data)
            return json.loads(decrypted_data)
        except Exception as e:
            raise Exception(f"解析回调数据失败: {str(e)}") from e