deliveryman-api/app/core/wechat.py
2025-01-17 17:09:28 +08:00

217 lines
7.4 KiB
Python

import aiohttp
from app.core.config import settings
import requests
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
import json
import base64
import time
import random
import string
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
def generate_random_string(length=32):
"""生成指定长度的随机字符串"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
class WeChatClient:
"""微信客户端"""
def __init__(self):
self.appid = settings.WECHAT_APPID
self.secret = settings.WECHAT_SECRET
self.mchid = settings.WECHAT_MCH_ID
self.private_key = self._load_private_key()
self.cert_serial_no = settings.WECHAT_CERT_SERIAL_NO
self.access_token = None
self.api_v3_key = settings.WECHAT_API_V3_KEY
self.platform_cert = self._load_platform_cert()
def _load_private_key(self):
"""加载商户私钥"""
with open(settings.WECHAT_PRIVATE_KEY_PATH, 'rb') as f:
return serialization.load_pem_private_key(
f.read(),
password=None
)
def _load_platform_cert(self):
"""加载微信支付平台证书"""
with open(settings.WECHAT_PLATFORM_CERT_PATH, 'rb') as f:
cert_data = f.read()
return load_pem_x509_certificate(cert_data)
def sign(self, data: bytes) -> str:
"""签名数据"""
signature = self.private_key.sign(
data,
padding.PKCS1v15(),
hashes.SHA256()
)
return base64.b64encode(signature).decode()
def post(self, path: str, **kwargs) -> requests.Response:
"""发送 POST 请求到微信支付接口"""
url = f"https://api.mch.weixin.qq.com{path}"
timestamp = str(int(time.time()))
nonce = generate_random_string()
# 准备签名数据
body = kwargs.get('json', '')
body_str = json.dumps(body) if body else ''
sign_str = f"POST\n{path}\n{timestamp}\n{nonce}\n{body_str}\n"
# 计算签名
signature = self.sign(sign_str.encode())
# 设置认证信息
auth = f'WECHATPAY2-SHA256-RSA2048 mchid="{self.mchid}",nonce_str="{nonce}",signature="{signature}",timestamp="{timestamp}",serial_no="{self.cert_serial_no}"'
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': auth
}
return requests.post(url, headers=headers, **kwargs)
async def get_access_token(self):
"""获取接口调用凭证"""
if self.access_token:
return self.access_token
async with aiohttp.ClientSession() as session:
url = f"https://api.weixin.qq.com/cgi-bin/token"
params = {
"grant_type": "client_credential",
"appid": self.appid,
"secret": self.secret
}
async with session.get(url, params=params) as response:
result = await response.json()
if "access_token" in result:
self.access_token = result["access_token"]
return self.access_token
raise Exception(result.get("errmsg", "获取access_token失败"))
async def get_phone_number(self, code: str):
"""获取用户手机号"""
access_token = await self.get_access_token()
async with aiohttp.ClientSession() as session:
url = f"https://api.weixin.qq.com/wxa/business/getuserphonenumber"
params = {"access_token": access_token}
data = {"code": code}
async with session.post(url, params=params, json=data) as response:
result = await response.json()
if result.get("errcode") == 0:
return result.get("phone_info")
raise Exception(result.get("errmsg", "获取手机号失败"))
def create_jsapi_payment(self, orderid: str, amount: int, openid: str, description: str) -> dict:
"""创建 JSAPI 支付订单
Args:
orderid: 订单号
amount: 支付金额(分)
openid: 用户openid
description: 商品描述
"""
pay_data = {
"appid": settings.WECHAT_APPID,
"mchid": settings.WECHAT_MCH_ID,
"description": description,
"out_trade_no": orderid,
"notify_url": f"{settings.API_BASE_URL}/api/wechat/pay/notify",
"amount": {
"total": amount,
"currency": "CNY"
},
"payer": {
"openid": openid
}
}
# 调用微信支付API
resp = self.post("/v3/pay/transactions/jsapi", json=pay_data)
if resp.status_code != 200:
raise Exception(f"微信支付下单失败: {resp.json().get('message')}")
return resp.json()
async def code2session(self, code: str) -> dict:
"""通过 code 获取用户 openid
Args:
code: 登录凭证
Returns:
dict: 包含 openid 等信息
"""
async with aiohttp.ClientSession() as session:
url = "https://api.weixin.qq.com/sns/jscode2session"
params = {
"appid": self.appid,
"secret": self.secret,
"js_code": code,
"grant_type": "authorization_code"
}
async with session.get(url, params=params) as response:
result = await response.json()
if "openid" in result:
return result
raise Exception(result.get("errmsg", "获取openid失败"))
def verify_signature(self, message: bytes, signature: str, serial_no: str) -> bool:
"""验证微信支付回调签名
Args:
message: 待验证的消息
signature: 签名字符串
serial_no: 证书序列号
"""
if serial_no != self.cert_serial_no:
return False
try:
# 解码签名
signature_bytes = base64.b64decode(signature)
# 使用公钥验证签名
self.platform_cert.public_key().verify(
signature_bytes,
message,
padding.PKCS1v15(),
hashes.SHA256()
)
return True
except Exception:
return False
def decrypt_callback(self, associated_data: str, nonce: str, ciphertext: str) -> str:
"""解密回调数据
Args:
associated_data: 附加数据
nonce: 随机串
ciphertext: 密文
Returns:
解密后的明文
"""
# 解码密文
encrypted_data = base64.b64decode(ciphertext)
# 使用 AEAD_AES_256_GCM 算法解密
aesgcm = AESGCM(base64.b64decode(self.api_v3_key))
data = aesgcm.decrypt(
nonce.encode(),
encrypted_data,
associated_data.encode()
)
return data.decode()