deliveryman-api/app/core/wechat.py
2025-01-17 16:51:50 +08:00

135 lines
4.7 KiB
Python

import base64
import json
import random
import secrets
import string
import time

import aiohttp
import requests
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding

from app.core.config import settings
def generate_random_string(length=32):
    """Return a random string of ASCII letters and digits.

    The characters are drawn with the ``secrets`` module rather than
    ``random``: the result is used as a request nonce, so it should not be
    predictable from earlier outputs.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        A string of ``length`` characters taken from ``[A-Za-z0-9]``.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
class WeChatClient:
    """Client for the WeChat platform API and the WeChat Pay v3 merchant API.

    Credentials (app id, app secret, merchant id, merchant private key path
    and certificate serial number) are read from ``settings`` when the client
    is constructed.
    """

    # Seconds to wait on a WeChat Pay request when the caller gives no timeout.
    REQUEST_TIMEOUT = 10
    # Refresh the cached access token this many seconds before it expires.
    TOKEN_REFRESH_MARGIN = 300
    # Lifetime (seconds) assumed when the token response has no "expires_in".
    # NOTE(review): 7200 is the lifetime WeChat documents — confirm.
    DEFAULT_TOKEN_TTL = 7200

    def __init__(self):
        self.appid = settings.WECHAT_APPID
        self.secret = settings.WECHAT_SECRET
        self.mchid = settings.WECHAT_MCH_ID
        self.private_key = self._load_private_key()
        self.cert_serial_no = settings.WECHAT_CERT_SERIAL_NO
        self.access_token = None
        # time.monotonic() value after which the cached token must be refetched.
        self._access_token_expires_at = 0.0

    def _load_private_key(self):
        """Load the merchant private key (unencrypted PEM) from disk."""
        with open(settings.WECHAT_PRIVATE_KEY_PATH, 'rb') as f:
            return serialization.load_pem_private_key(
                f.read(),
                password=None
            )

    def sign(self, data: bytes) -> str:
        """Sign ``data`` with the merchant key (PKCS#1 v1.5, SHA-256).

        Returns:
            The signature, base64-encoded as text.
        """
        signature = self.private_key.sign(
            data,
            padding.PKCS1v15(),
            hashes.SHA256()
        )
        return base64.b64encode(signature).decode()

    @staticmethod
    def _serialize_body(payload) -> str:
        """Return the exact request-body text for ``payload`` ('' when None)."""
        if payload is None:
            return ''
        return json.dumps(payload)

    @staticmethod
    def _build_sign_message(method: str, path: str, timestamp: str,
                            nonce: str, body: str) -> str:
        """Return the newline-terminated string that gets signed for a request."""
        return f"{method}\n{path}\n{timestamp}\n{nonce}\n{body}\n"

    def post(self, path: str, **kwargs) -> "requests.Response":
        """Send a signed POST request to the WeChat Pay API.

        Args:
            path: Request path appended to ``https://api.mch.weixin.qq.com``.
            **kwargs: Forwarded to ``requests.post``. A ``json`` value is
                serialised once and sent as the raw body. ``headers`` are
                merged into the defaults, except ``Authorization``, which
                is always set by this method. ``timeout`` defaults to
                ``REQUEST_TIMEOUT``. A body passed via ``data`` is not
                signed.

        Returns:
            The ``requests.Response`` from WeChat Pay.
        """
        url = f"https://api.mch.weixin.qq.com{path}"
        timestamp = str(int(time.time()))
        nonce = generate_random_string()

        # Serialise once so the signed bytes and the transmitted bytes are
        # identical. Letting requests re-serialise ``json`` made them differ
        # for falsy payloads such as ``{}``.
        payload = kwargs.pop('json', None)
        body_str = self._serialize_body(payload)
        if payload is not None:
            kwargs['data'] = body_str.encode('utf-8')

        sign_str = self._build_sign_message("POST", path, timestamp, nonce, body_str)
        signature = self.sign(sign_str.encode('utf-8'))

        auth = (
            'WECHATPAY2-SHA256-RSA2048 '
            f'mchid="{self.mchid}",'
            f'nonce_str="{nonce}",'
            f'signature="{signature}",'
            f'timestamp="{timestamp}",'
            f'serial_no="{self.cert_serial_no}"'
        )
        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
        }
        # Merge caller headers instead of passing ``headers`` twice, which
        # would raise TypeError.
        headers.update(kwargs.pop('headers', None) or {})
        headers['Authorization'] = auth

        # Without a timeout requests waits indefinitely on a stalled connection.
        kwargs.setdefault('timeout', self.REQUEST_TIMEOUT)
        return requests.post(url, headers=headers, **kwargs)

    async def get_access_token(self):
        """Return the platform access token, fetching a new one when needed.

        The token is cached on the instance together with an expiry time
        derived from the ``expires_in`` field of the token response, and is
        refetched shortly before that time.

        Raises:
            Exception: If the response contains no ``access_token``. The
                message is the ``errmsg`` reported by WeChat when present.
        """
        now = time.monotonic()
        if self.access_token and now < self._access_token_expires_at:
            return self.access_token

        url = "https://api.weixin.qq.com/cgi-bin/token"
        params = {
            "grant_type": "client_credential",
            "appid": self.appid,
            "secret": self.secret
        }
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                result = await response.json()

        if "access_token" not in result:
            raise Exception(result.get("errmsg", "获取access_token失败"))

        try:
            ttl = int(result.get("expires_in", self.DEFAULT_TOKEN_TTL))
        except (TypeError, ValueError):
            ttl = self.DEFAULT_TOKEN_TTL
        self.access_token = result["access_token"]
        self._access_token_expires_at = now + max(ttl - self.TOKEN_REFRESH_MARGIN, 0)
        return self.access_token

    async def get_phone_number(self, code: str):
        """Exchange a phone-number ``code`` for the user's phone info.

        Args:
            code: The code passed to the ``getuserphonenumber`` endpoint.

        Returns:
            The ``phone_info`` value of the response.

        Raises:
            Exception: If the response ``errcode`` is not 0. The message is
                the ``errmsg`` reported by WeChat when present.
        """
        access_token = await self.get_access_token()
        url = "https://api.weixin.qq.com/wxa/business/getuserphonenumber"
        params = {"access_token": access_token}
        data = {"code": code}
        async with aiohttp.ClientSession() as session:
            async with session.post(url, params=params, json=data) as response:
                result = await response.json()

        if result.get("errcode") == 0:
            return result.get("phone_info")
        raise Exception(result.get("errmsg", "获取手机号失败"))

    def create_jsapi_payment(self, orderid: str, amount: int, openid: str, description: str) -> dict:
        """Create a JSAPI payment order through WeChat Pay.

        Args:
            orderid: Merchant order number (sent as ``out_trade_no``).
            amount: Amount to pay, in fen (1/100 CNY).
            openid: The paying user's openid.
            description: Product description.

        Returns:
            The decoded JSON body of the WeChat Pay response.

        Raises:
            Exception: If WeChat Pay answers with a status other than 200.
        """
        pay_data = {
            # Use the instance credentials so the body's mchid always matches
            # the one placed in the Authorization header by ``post``.
            "appid": self.appid,
            "mchid": self.mchid,
            "description": description,
            "out_trade_no": orderid,
            "notify_url": f"{settings.API_BASE_URL}/api/wechat/pay/notify",
            "amount": {
                "total": amount,
                "currency": "CNY"
            },
            "payer": {
                "openid": openid
            }
        }

        resp = self.post("/v3/pay/transactions/jsapi", json=pay_data)
        if resp.status_code != 200:
            # An error response is not guaranteed to carry a JSON object;
            # fall back to the raw text instead of raising a decode error.
            try:
                message = resp.json().get('message')
            except (ValueError, AttributeError):
                message = resp.text
            raise Exception(f"微信支付下单失败: {message}")
        return resp.json()