deliveryman-api/app/core/wechat.py
2025-02-15 18:49:52 +08:00

517 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import aiohttp
from fastapi import Request
from app.core.config import settings
import requests
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
import json
import base64
import time
import random
import string
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import uuid
from typing import Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import and_
from app.models.subscribe import SubscribeDB
def generate_random_string(length=32):
    """Return a random string of ASCII letters and digits.

    Characters are drawn with the ``secrets`` module (OS CSPRNG) instead of
    ``random``, whose output is predictable and therefore unsuitable for
    nonce/token-style values.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        str: The generated string, ``length`` characters long.
    """
    # Local import keeps this block self-contained; the file header does not
    # import ``secrets``.
    import secrets

    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
class WeChatClient:
    """WeChat client."""

    def __init__(self):
        """Copy WeChat credentials from ``settings`` and load the PEM key material.

        Reads the merchant private key and the platform certificate from the
        paths configured in ``settings``; the certificate's public key is kept
        on ``self.platform_public_key``.
        """
        cfg = settings
        self.appid = cfg.WECHAT_APPID
        self.secret = cfg.WECHAT_SECRET
        self.mch_id = cfg.WECHAT_MCH_ID
        self.private_key_path = cfg.WECHAT_PRIVATE_KEY_PATH
        self.cert_serial_no = cfg.WECHAT_CERT_SERIAL_NO
        self.api_v3_key = cfg.WECHAT_API_V3_KEY
        self.platform_cert_path = cfg.WECHAT_PLATFORM_CERT_PATH

        # Merchant private key (unencrypted PEM).
        with open(self.private_key_path, "rb") as key_file:
            key_pem = key_file.read()
        self.private_key = serialization.load_pem_private_key(key_pem, password=None)

        # Platform certificate and the public key extracted from it.
        with open(self.platform_cert_path, "rb") as cert_file:
            cert_pem = cert_file.read()
        self.platform_cert = load_pem_x509_certificate(cert_pem)
        self.platform_public_key = self.platform_cert.public_key()
async def get_access_token(self):
    """Fetch the mini-program global API credential (``access_token``).

    Sends a GET request to the ``cgi-bin/token`` endpoint using
    ``self.appid`` and ``self.secret``.

    Returns:
        str: The ``access_token`` value from the response.

    Raises:
        Exception: If the response body is not valid JSON, if it carries a
            non-zero ``errcode``, or if it contains no ``access_token``.
    """
    url = "https://api.weixin.qq.com/cgi-bin/token"
    params = {
        "grant_type": "client_credential",
        "appid": self.appid,
        "secret": self.secret
    }
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            text = await response.text()

    try:
        result = json.loads(text)
    except json.JSONDecodeError as exc:
        raise Exception(f"解析微信返回数据失败: {text}") from exc

    if result.get("errcode", 0) != 0:
        raise Exception(result.get("errmsg", "获取access_token失败"))

    access_token = result.get("access_token")
    if not access_token:
        # Previously a missing token was returned as None and ended up
        # interpolated into request URLs as "access_token=None".
        raise Exception("获取access_token失败")
    return access_token
async def get_phone_number(self, code: str) -> dict:
    """Exchange a phone-number ``code`` for the user's phone number.

    Obtains an access token via ``get_access_token`` and POSTs ``code`` to
    the ``getuserphonenumber`` endpoint.

    Args:
        code: The code to send to the endpoint as ``{"code": code}``.

    Returns:
        dict: ``phone_number``, ``pure_phone_number`` and ``country_code``
        taken from the response's ``phone_info`` object.

    Raises:
        Exception: If the response is not valid JSON, carries a non-zero
            ``errcode``, or has no ``phone_info``.
    """
    access_token = await self.get_access_token()
    url = f"https://api.weixin.qq.com/wxa/business/getuserphonenumber?access_token={access_token}"
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json={"code": code}) as response:
            text = await response.text()

    try:
        result = json.loads(text)
    except json.JSONDecodeError as exc:
        raise Exception(f"解析微信返回数据失败: {text}") from exc

    # NOTE: the raw response is intentionally not printed; it contains the
    # user's phone number and must not leak into stdout/log files.
    if result.get("errcode", 0) != 0:
        raise Exception(result.get("errmsg", "获取手机号失败"))

    phone_info = result.get("phone_info")
    if not phone_info:
        raise Exception("未获取到手机号信息")

    return {
        "phone_number": phone_info.get("phoneNumber"),
        "pure_phone_number": phone_info.get("purePhoneNumber"),
        "country_code": phone_info.get("countryCode"),
    }
async def code2session(self, code: str) -> dict:
    """Exchange a login ``code`` for the user's session data (openid lookup).

    Args:
        code: Value sent to the ``jscode2session`` endpoint as ``js_code``.

    Returns:
        dict: The decoded JSON response, returned unchanged.

    Raises:
        Exception: If the response is not valid JSON or reports a non-zero
            ``errcode``.
    """
    endpoint = "https://api.weixin.qq.com/sns/jscode2session"
    query = dict(
        appid=self.appid,
        secret=self.secret,
        js_code=code,
        grant_type="authorization_code",
    )
    async with aiohttp.ClientSession() as http:
        async with http.get(endpoint, params=query) as resp:
            raw = await resp.text()
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                raise Exception(f"解析微信返回数据失败: {raw}")
            if "errcode" in payload:
                if payload["errcode"] != 0:
                    raise Exception(payload.get("errmsg", "获取openid失败"))
            return payload
def sign_message(self, method: str, url_path: str, body: dict) -> tuple:
    """Sign an outgoing API request with the merchant private key.

    The signed text consists of the HTTP method, the URL path, a fresh
    timestamp, a fresh nonce and the JSON-serialised body, each terminated
    by a newline. A falsy ``body`` contributes an empty line.

    Args:
        method: HTTP method placed on the first line of the signed text.
        url_path: URL path placed on the second line of the signed text.
        body: Request body; serialised with ``json.dumps`` when truthy.

    Returns:
        tuple: ``(nonce_str, timestamp, signature)`` where ``signature`` is
        the Base64-encoded signature (PKCS#1 v1.5 padding, SHA-256).
    """
    timestamp = str(int(time.time()))
    # uuid4().hex is the dash-less 32-character form of the UUID.
    nonce_str = uuid.uuid4().hex

    serialized_body = json.dumps(body) if body else ""
    message = f"{method}\n{url_path}\n{timestamp}\n{nonce_str}\n{serialized_body}\n"

    raw_signature = self.private_key.sign(
        message.encode('utf-8'),
        padding.PKCS1v15(),
        hashes.SHA256()
    )
    encoded_signature = base64.b64encode(raw_signature).decode()
    return nonce_str, timestamp, encoded_signature
def verify_response(self, headers: dict, body: bytes) -> bool:
    """Verify the ``Wechatpay-Signature`` of a response or callback body.

    The verified message is ``timestamp + "\\n" + nonce + "\\n" + body + "\\n"``
    and is checked against ``self.platform_public_key`` (PKCS#1 v1.5
    padding, SHA-256).

    Args:
        headers: Mapping providing the ``Wechatpay-Timestamp``,
            ``Wechatpay-Nonce``, ``Wechatpay-Signature`` and
            ``Wechatpay-Serial`` headers via ``.get``.
        body: Raw body bytes exactly as received.

    Returns:
        bool: ``True`` only if all four headers are present and the
        signature verifies; ``False`` for any missing header or any
        verification failure. This method does not raise for malformed
        input.
    """
    timestamp = headers.get('Wechatpay-Timestamp')
    nonce = headers.get('Wechatpay-Nonce')
    signature = headers.get('Wechatpay-Signature')
    serial = headers.get('Wechatpay-Serial')
    if not all([timestamp, nonce, signature, serial]):
        return False

    # Assemble the message as bytes around the untouched body. Decoding the
    # body first would raise UnicodeDecodeError on a non-UTF-8 payload
    # (escaping this method instead of yielding False) and is a needless
    # decode/encode round trip for valid payloads.
    message = f"{timestamp}\n{nonce}\n".encode('utf-8') + body + b"\n"

    try:
        self.platform_public_key.verify(
            base64.b64decode(signature),
            message,
            padding.PKCS1v15(),
            hashes.SHA256()
        )
    except Exception:
        # Fail closed: bad Base64, bad signature or any other verification
        # error all mean "not verified".
        return False
    return True
async def create_jsapi_payment(
    self,
    openid: str,
    out_trade_no: str,
    total_amount: int,
    description: str
) -> dict:
    """Create a JSAPI payment order and build the client-side payment parameters.

    POSTs the order to ``/v3/pay/transactions/jsapi`` with a signed
    ``Authorization`` header, verifies the response signature, then signs
    the ``prepay_id`` package for the client that invokes the payment.

    Args:
        openid: Payer openid, sent as ``payer.openid``.
        out_trade_no: Merchant order number.
        total_amount: Sent as ``amount.total`` with currency ``CNY``
            (presumably in fen — confirm against callers).
        description: Order description.

    Returns:
        dict: ``prepay_id`` plus ``payment_params`` (``appId``,
        ``timeStamp``, ``nonceStr``, ``package``, ``signType``, ``paySign``).

    Raises:
        Exception: Any failure inside the request/response handling is
            re-raised with the prefix ``创建支付订单失败: ``.
    """
    url_path = "/v3/pay/transactions/jsapi"
    api_url = f"https://api.mch.weixin.qq.com{url_path}"

    # Order payload (key order kept stable: it is what gets signed and sent).
    order = {
        "appid": self.appid,
        "mchid": self.mch_id,
        "description": description,
        "out_trade_no": out_trade_no,
        "notify_url": f"{settings.API_BASE_URL}/api/wechat/payment/notify",
        "amount": {
            "total": total_amount,
            "currency": "CNY"
        },
        "payer": {
            "openid": openid
        }
    }

    # Sign the request and assemble the Authorization header.
    req_nonce, req_timestamp, req_signature = self.sign_message("POST", url_path, order)
    auth_fields = ",".join((
        f'mchid="{self.mch_id}"',
        f'nonce_str="{req_nonce}"',
        f'timestamp="{req_timestamp}"',
        f'serial_no="{self.cert_serial_no}"',
        f'signature="{req_signature}"',
    ))
    request_headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': f'WECHATPAY2-SHA256-RSA2048 {auth_fields}',
    }

    try:
        async with aiohttp.ClientSession() as http:
            async with http.post(api_url, json=order, headers=request_headers) as resp:
                reply = await resp.json()
                if resp.status != 200:
                    raise Exception(f"请求失败: {reply.get('message')}")

                # Verify the response signature over the raw body bytes.
                raw_body = await resp.read()
                if not self.verify_response(resp.headers, raw_body):
                    raise Exception("响应签名验证失败")

                prepay_id = reply.get("prepay_id")
                if not prepay_id:
                    raise Exception("未获取到prepay_id")

                # Parameters the mini-program needs to invoke the payment.
                pay_timestamp = str(int(time.time()))
                pay_nonce = uuid.uuid4().hex
                package = f"prepay_id={prepay_id}"
                to_sign = f"{self.appid}\n{pay_timestamp}\n{pay_nonce}\n{package}\n"
                raw_pay_signature = self.private_key.sign(
                    to_sign.encode('utf-8'),
                    padding.PKCS1v15(),
                    hashes.SHA256()
                )
                pay_sign = base64.b64encode(raw_pay_signature).decode()

                payment_params = {
                    "appId": self.appid,
                    "timeStamp": pay_timestamp,
                    "nonceStr": pay_nonce,
                    "package": package,
                    "signType": "RSA",
                    "paySign": pay_sign
                }
                return {
                    "prepay_id": prepay_id,
                    "payment_params": payment_params
                }
    except Exception as e:
        raise Exception(f"创建支付订单失败: {str(e)}")
def decrypt_callback_data(self, ciphertext: str, nonce: str, associated_data: str = None) -> dict:
    """Decrypt an AES-GCM encrypted callback payload and parse it as JSON.

    The key is ``self.api_v3_key`` encoded as UTF-8, which must be exactly
    32 bytes long.

    Args:
        ciphertext: Base64-encoded ciphertext.
        nonce: Nonce string; used as raw UTF-8 bytes (not Base64-decoded).
        associated_data: Optional additional authenticated data; used as raw
            UTF-8 bytes. ``None`` or an empty string means no AAD.

    Returns:
        dict: The decrypted payload parsed with ``json.loads``.

    Raises:
        Exception: For any failure (wrong key length, bad Base64, failed
            authentication, invalid JSON). The message is prefixed with
            ``解密回调数据失败: `` and the original error is attached as
            ``__cause__``.
    """
    try:
        key_bytes = self.api_v3_key.encode('utf-8')
        if len(key_bytes) != 32:
            print(f"API v3密钥长度不正确: {len(key_bytes)} bytes")
            raise ValueError("API v3密钥必须是32字节")

        # Only the ciphertext is Base64; nonce and AAD are plain strings.
        ciphertext_bytes = base64.b64decode(ciphertext)
        nonce_bytes = nonce.encode('utf-8')
        associated_data_bytes = associated_data.encode('utf-8') if associated_data else b''

        decrypted_data = AESGCM(key_bytes).decrypt(
            nonce_bytes,
            ciphertext_bytes,
            associated_data_bytes
        )
        # NOTE: the decrypted plaintext is intentionally not printed here;
        # it is sensitive payload data and the key must never be logged.
        return json.loads(decrypted_data)
    except Exception as e:
        error_msg = f"解密回调数据失败: {str(e)}"
        print(error_msg)
        import traceback
        print("详细错误信息:", traceback.format_exc())
        raise Exception(error_msg) from e
async def verify_payment_notify(self, request: Request) -> dict:
    """Verify and decrypt a payment callback notification.

    Checks the ``Wechatpay-*`` signature headers against the raw request
    body, then decrypts the ``resource`` object of the JSON payload.

    Args:
        request: Incoming callback request.

    Returns:
        dict: The decrypted notification data, or ``None`` when the
        signature check fails or ``ciphertext``/``nonce`` is missing.

    Raises:
        Exception: Any error during processing is re-raised with the prefix
            ``处理支付回调失败: ``.
    """
    try:
        # Collect the signature-related headers.
        header_names = (
            'Wechatpay-Signature',
            'Wechatpay-Timestamp',
            'Wechatpay-Nonce',
            'Wechatpay-Serial',
        )
        headers = {name: request.headers.get(name) for name in header_names}
        print("微信支付回调请求头:", headers)

        # Raw body: this is what the signature covers.
        body = await request.body()
        print("微信支付回调请求体:", body.decode('utf-8'))

        signature_ok = self.verify_response(headers, body)
        if not signature_ok:
            print("签名验证失败")
            return None

        data = await request.json()
        print("解析的JSON数据:", data)

        resource = data.get("resource", {})
        ciphertext = resource.get("ciphertext")
        nonce = resource.get("nonce")
        associated_data = resource.get("associated_data")
        if not ciphertext or not nonce:
            print("缺少必要的解密参数")
            return None

        decrypted_data = self.decrypt_callback_data(ciphertext, nonce, associated_data)
        print("解密后的数据:", decrypted_data)
        return decrypted_data
    except Exception as e:
        print(f"处理支付回调异常: {str(e)}")
        import traceback
        print("详细错误信息:", traceback.format_exc())
        raise Exception(f"处理支付回调失败: {str(e)}")
async def apply_refund(
    self,
    order_id: str,
    total_amount: int,
    reason: str = "用户申请退款"
) -> dict:
    """Request a full refund for an order.

    POSTs to ``/v3/refund/domestic/refunds`` with a signed ``Authorization``
    header and verifies the response signature.

    Args:
        order_id: Merchant order number; the refund number is derived from
            it as ``refund_{order_id}``.
        total_amount: Amount in fen; sent as both the refund amount and the
            original order total.
        reason: Refund reason.

    Returns:
        dict: The decoded JSON response of the refund request.

    Raises:
        Exception: Any failure is re-raised with the prefix
            ``申请退款失败: ``; the original error is attached as
            ``__cause__``.
    """
    try:
        print(f"开始申请退款: order_id={order_id}, amount={total_amount}")

        url_path = "/v3/refund/domestic/refunds"
        api_url = f"https://api.mch.weixin.qq.com{url_path}"

        body = {
            "out_trade_no": order_id,  # merchant order number
            "out_refund_no": f"refund_{order_id}",  # merchant refund number
            "reason": reason,
            "notify_url": f"{settings.API_BASE_URL}/api/wechat/refund/notify",
            "amount": {
                "refund": total_amount,  # amount to refund
                "total": total_amount,  # original order amount
                "currency": "CNY"
            }
        }

        # sign_message generates the nonce and timestamp itself, so no
        # values are pre-generated here (the old ones were dead stores).
        nonce_str, timestamp, signature = self.sign_message("POST", url_path, body)

        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Authorization': (
                f'WECHATPAY2-SHA256-RSA2048 '
                f'mchid="{self.mch_id}",'
                f'nonce_str="{nonce_str}",'
                f'timestamp="{timestamp}",'
                f'serial_no="{self.cert_serial_no}",'
                f'signature="{signature}"'
            )
        }
        # NOTE: the Authorization header and signature are intentionally not
        # printed; request credentials do not belong in stdout/log files.

        async with aiohttp.ClientSession() as session:
            async with session.post(api_url, json=body, headers=headers) as response:
                print(f"退款响应状态码: {response.status}")
                result = await response.json()
                print(f"退款响应内容: {result}")

                if response.status != 200:
                    raise Exception(f"退款申请失败: {result.get('message')}")

                # Verify the response signature over the raw body bytes.
                if not self.verify_response(response.headers, await response.read()):
                    raise Exception("响应签名验证失败")

                return result
    except Exception as e:
        print(f"申请退款异常: {str(e)}")
        raise Exception(f"申请退款失败: {str(e)}") from e
async def send_subscribe_message(
    self,
    openid: str,
    template_id: str,
    data: Dict[str, Any],
    db: Session = None,
    user_id: int = None,
    page: str = None
) -> bool:
    """Send a subscribe message to a user (best effort).

    When both ``db`` and ``user_id`` are given, the user's ``SubscribeDB``
    record for ``template_id`` is checked first and nothing is sent if the
    record is missing or its ``action`` is ``"reject"``.

    Args:
        openid: Recipient openid.
        template_id: Message template id.
        data: Template fields; each value is wrapped as ``{"value": value}``.
        db: Optional database session used for the subscription check.
        user_id: Optional user id used for the subscription check.
        page: Optional page to open when the message card is tapped.

    Returns:
        bool: ``True`` if the API answered with ``errcode == 0``; ``False``
        if the user is not subscribed, the API reported an error, or any
        exception occurred (exceptions are logged, never propagated).
    """
    try:
        # Optional subscription check.
        if db and user_id:
            subscribe = db.query(SubscribeDB).filter(
                and_(
                    SubscribeDB.user_id == user_id,
                    SubscribeDB.template_id == template_id
                )
            ).first()
            # Not subscribed, or subscription rejected: do not send.
            if not subscribe or subscribe.action == "reject":
                return False

        message_data = {
            "touser": openid,
            "template_id": template_id,
            "data": {
                key: {
                    "value": value
                } for key, value in data.items()
            }
        }
        if page:
            message_data["page"] = page

        access_token = await self.get_access_token()
        url = f"https://api.weixin.qq.com/cgi-bin/message/subscribe/send?access_token={access_token}"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=message_data) as response:
                # Read as text and parse manually, like the other
                # api.weixin.qq.com calls in this class: response.json()
                # raises when the Content-Type is not application/json,
                # which would turn a successful send into a False result.
                text = await response.text()

        result = json.loads(text)
        if result.get("errcode") == 0:
            return True
        print(f"发送订阅消息失败: {result}")
        return False
    except Exception as e:
        # Deliberate best effort: a notification failure must not break
        # the calling flow.
        print(f"发送订阅消息异常: {str(e)}")
        return False