217 lines
7.4 KiB
Python
217 lines
7.4 KiB
Python
import aiohttp
|
|
from app.core.config import settings
|
|
import requests
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives import serialization
|
|
import json
|
|
import base64
|
|
import time
|
|
import random
|
|
import string
|
|
from cryptography.x509 import load_pem_x509_certificate
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
|
|
def generate_random_string(length=32):
|
|
"""生成指定长度的随机字符串"""
|
|
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
|
|
|
|
|
|
class WeChatClient:
|
|
"""微信客户端"""
|
|
|
|
def __init__(self):
|
|
self.appid = settings.WECHAT_APPID
|
|
self.secret = settings.WECHAT_SECRET
|
|
self.mchid = settings.WECHAT_MCH_ID
|
|
self.private_key = self._load_private_key()
|
|
self.cert_serial_no = settings.WECHAT_CERT_SERIAL_NO
|
|
self.access_token = None
|
|
self.api_v3_key = settings.WECHAT_API_V3_KEY
|
|
self.platform_cert = self._load_platform_cert()
|
|
|
|
def _load_private_key(self):
|
|
"""加载商户私钥"""
|
|
with open(settings.WECHAT_PRIVATE_KEY_PATH, 'rb') as f:
|
|
return serialization.load_pem_private_key(
|
|
f.read(),
|
|
password=None
|
|
)
|
|
|
|
def _load_platform_cert(self):
|
|
"""加载微信支付平台证书"""
|
|
with open(settings.WECHAT_PLATFORM_CERT_PATH, 'rb') as f:
|
|
cert_data = f.read()
|
|
return load_pem_x509_certificate(cert_data)
|
|
|
|
def sign(self, data: bytes) -> str:
|
|
"""签名数据"""
|
|
signature = self.private_key.sign(
|
|
data,
|
|
padding.PKCS1v15(),
|
|
hashes.SHA256()
|
|
)
|
|
return base64.b64encode(signature).decode()
|
|
|
|
def post(self, path: str, **kwargs) -> requests.Response:
|
|
"""发送 POST 请求到微信支付接口"""
|
|
url = f"https://api.mch.weixin.qq.com{path}"
|
|
timestamp = str(int(time.time()))
|
|
nonce = generate_random_string()
|
|
|
|
# 准备签名数据
|
|
body = kwargs.get('json', '')
|
|
body_str = json.dumps(body) if body else ''
|
|
sign_str = f"POST\n{path}\n{timestamp}\n{nonce}\n{body_str}\n"
|
|
|
|
# 计算签名
|
|
signature = self.sign(sign_str.encode())
|
|
|
|
# 设置认证信息
|
|
auth = f'WECHATPAY2-SHA256-RSA2048 mchid="{self.mchid}",nonce_str="{nonce}",signature="{signature}",timestamp="{timestamp}",serial_no="{self.cert_serial_no}"'
|
|
|
|
headers = {
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'application/json',
|
|
'Authorization': auth
|
|
}
|
|
|
|
return requests.post(url, headers=headers, **kwargs)
|
|
|
|
async def get_access_token(self):
|
|
"""获取接口调用凭证"""
|
|
if self.access_token:
|
|
return self.access_token
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
url = f"https://api.weixin.qq.com/cgi-bin/token"
|
|
params = {
|
|
"grant_type": "client_credential",
|
|
"appid": self.appid,
|
|
"secret": self.secret
|
|
}
|
|
async with session.get(url, params=params) as response:
|
|
result = await response.json()
|
|
if "access_token" in result:
|
|
self.access_token = result["access_token"]
|
|
return self.access_token
|
|
raise Exception(result.get("errmsg", "获取access_token失败"))
|
|
|
|
async def get_phone_number(self, code: str):
|
|
"""获取用户手机号"""
|
|
access_token = await self.get_access_token()
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
url = f"https://api.weixin.qq.com/wxa/business/getuserphonenumber"
|
|
params = {"access_token": access_token}
|
|
data = {"code": code}
|
|
|
|
async with session.post(url, params=params, json=data) as response:
|
|
result = await response.json()
|
|
if result.get("errcode") == 0:
|
|
return result.get("phone_info")
|
|
raise Exception(result.get("errmsg", "获取手机号失败"))
|
|
|
|
def create_jsapi_payment(self, orderid: str, amount: int, openid: str, description: str) -> dict:
|
|
"""创建 JSAPI 支付订单
|
|
|
|
Args:
|
|
orderid: 订单号
|
|
amount: 支付金额(分)
|
|
openid: 用户openid
|
|
description: 商品描述
|
|
"""
|
|
pay_data = {
|
|
"appid": settings.WECHAT_APPID,
|
|
"mchid": settings.WECHAT_MCH_ID,
|
|
"description": description,
|
|
"out_trade_no": orderid,
|
|
"notify_url": f"{settings.API_BASE_URL}/api/wechat/pay/notify",
|
|
"amount": {
|
|
"total": amount,
|
|
"currency": "CNY"
|
|
},
|
|
"payer": {
|
|
"openid": openid
|
|
}
|
|
}
|
|
|
|
# 调用微信支付API
|
|
resp = self.post("/v3/pay/transactions/jsapi", json=pay_data)
|
|
|
|
if resp.status_code != 200:
|
|
raise Exception(f"微信支付下单失败: {resp.json().get('message')}")
|
|
|
|
return resp.json()
|
|
|
|
async def code2session(self, code: str) -> dict:
|
|
"""通过 code 获取用户 openid
|
|
|
|
Args:
|
|
code: 登录凭证
|
|
Returns:
|
|
dict: 包含 openid 等信息
|
|
"""
|
|
async with aiohttp.ClientSession() as session:
|
|
url = "https://api.weixin.qq.com/sns/jscode2session"
|
|
params = {
|
|
"appid": self.appid,
|
|
"secret": self.secret,
|
|
"js_code": code,
|
|
"grant_type": "authorization_code"
|
|
}
|
|
|
|
async with session.get(url, params=params) as response:
|
|
result = await response.json()
|
|
if "openid" in result:
|
|
return result
|
|
raise Exception(result.get("errmsg", "获取openid失败"))
|
|
|
|
def verify_signature(self, message: bytes, signature: str, serial_no: str) -> bool:
|
|
"""验证微信支付回调签名
|
|
|
|
Args:
|
|
message: 待验证的消息
|
|
signature: 签名字符串
|
|
serial_no: 证书序列号
|
|
"""
|
|
if serial_no != self.cert_serial_no:
|
|
return False
|
|
|
|
try:
|
|
# 解码签名
|
|
signature_bytes = base64.b64decode(signature)
|
|
|
|
# 使用公钥验证签名
|
|
self.platform_cert.public_key().verify(
|
|
signature_bytes,
|
|
message,
|
|
padding.PKCS1v15(),
|
|
hashes.SHA256()
|
|
)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
def decrypt_callback(self, associated_data: str, nonce: str, ciphertext: str) -> str:
|
|
"""解密回调数据
|
|
|
|
Args:
|
|
associated_data: 附加数据
|
|
nonce: 随机串
|
|
ciphertext: 密文
|
|
Returns:
|
|
解密后的明文
|
|
"""
|
|
# 解码密文
|
|
encrypted_data = base64.b64decode(ciphertext)
|
|
|
|
# 使用 AEAD_AES_256_GCM 算法解密
|
|
aesgcm = AESGCM(base64.b64decode(self.api_v3_key))
|
|
data = aesgcm.decrypt(
|
|
nonce.encode(),
|
|
encrypted_data,
|
|
associated_data.encode()
|
|
)
|
|
|
|
return data.decode() |