This commit is contained in:
aaron 2025-03-29 22:08:02 +08:00
parent 87dda933e2
commit be8a478676
4 changed files with 151 additions and 0 deletions

View File

@ -1529,6 +1529,7 @@ async def get_orders(
"complete_images": order.optimized_complete_images, "complete_images": order.optimized_complete_images,
"completed_time": order.completed_time, "completed_time": order.completed_time,
"final_amount": order.final_amount, "final_amount": order.final_amount,
"is_first_order": order.is_first_order,
"packages": package_list, "packages": package_list,
"address": { "address": {
"name": order.address_customer_name, "name": order.address_customer_name,

View File

@ -134,6 +134,12 @@ class Settings(BaseSettings):
QWEN_API_KEY: str = "sk-caa199589f1c451aaac471fad2986e28" QWEN_API_KEY: str = "sk-caa199589f1c451aaac471fad2986e28"
QWEN_API_URL: str = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation" QWEN_API_URL: str = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
# UniSMS 配置
UNISMS_ACCESS_KEY_ID: str = "xxxxxx" # 替换为您的 UniSMS Access Key ID
UNISMS_ACCESS_KEY_SECRET: str = "xxxxxx" # 替换为您的 UniSMS Access Key Secret
UNISMS_SIGNATURE: str = "蜂快到家" # 短信签名
UNISMS_VERIFICATION_TEMPLATE_ID: str = "pub_verif_8dgk" # 验证码短信模板ID
class Config: class Config:
case_sensitive = True case_sensitive = True
env_file = ".env" env_file = ".env"

144
app/core/unisms.py Normal file
View File

@ -0,0 +1,144 @@
import json
import time
import hmac
import hashlib
import base64
import uuid
import logging
import aiohttp
from typing import List, Dict, Any, Optional
from app.core.config import settings
class UniSMSClient:
"""UniSMS短信客户端"""
def __init__(self):
self.access_key_id = settings.UNISMS_ACCESS_KEY_ID
self.access_key_secret = settings.UNISMS_ACCESS_KEY_SECRET
self.base_url = "https://uni.apistd.com"
self.endpoint = "/2022-12-28/sms/send"
self.signature_method = "HMAC-SHA256"
self.signature_version = "1"
def _generate_signature(self, string_to_sign: str) -> str:
"""生成签名"""
key = self.access_key_secret.encode('utf-8')
message = string_to_sign.encode('utf-8')
sign = hmac.new(key, message, digestmod=hashlib.sha256).digest()
return base64.b64encode(sign).decode('utf-8')
def _build_request_headers(self, body: Dict[str, Any]) -> Dict[str, str]:
"""构建请求头"""
timestamp = str(int(time.time()))
nonce = str(uuid.uuid4()).replace('-', '')
content_md5 = hashlib.md5(json.dumps(body).encode('utf-8')).hexdigest()
# 构建待签名字符串
string_to_sign = "\n".join([
self.endpoint,
timestamp,
nonce,
content_md5
])
# 生成签名
signature = self._generate_signature(string_to_sign)
# 构建请求头
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"X-Uni-Timestamp": timestamp,
"X-Uni-Nonce": nonce,
"X-Uni-Content-MD5": content_md5,
"X-Uni-Signature-Method": self.signature_method,
"X-Uni-Signature-Version": self.signature_version,
"X-Uni-Signature": signature,
"X-Uni-AccessKeyId": self.access_key_id,
}
return headers
async def send_sms(
self,
to: str,
signature: str,
template_id: str,
template_data: Dict[str, Any] = None
) -> Dict[str, Any]:
"""
发送短信
Args:
to: 接收短信的手机号码
signature: 短信签名
template_id: 短信模板ID
template_data: 短信模板参数
Returns:
Dict[str, Any]: 发送结果
"""
try:
# 构建请求体
body = {
"to": to,
"signature": signature,
"templateId": template_id,
}
if template_data:
body["templateData"] = template_data
# 构建请求头
headers = self._build_request_headers(body)
# 发送请求
url = f"{self.base_url}{self.endpoint}"
async with aiohttp.ClientSession() as session:
async with session.post(url, json=body, headers=headers) as response:
result = await response.json()
logging.info(f"UniSMS响应: {result}")
if result.get("status") != "success":
logging.error(f"发送短信失败: {result}")
return {
"success": False,
"error": result.get("message", "发送短信失败")
}
return {
"success": True,
"message_id": result.get("data", {}).get("messageId", ""),
"fee": result.get("data", {}).get("fee", 0)
}
except Exception as e:
logging.error(f"发送短信异常: {str(e)}")
return {
"success": False,
"error": f"发送短信异常: {str(e)}"
}
async def send_verification_code(self, phone: str, code: str) -> Dict[str, Any]:
"""
发送验证码短信
Args:
phone: 手机号码
code: 验证码
Returns:
Dict[str, Any]: 发送结果
"""
# 使用验证码短信模板
template_id = settings.UNISMS_VERIFICATION_TEMPLATE_ID
signature = settings.UNISMS_SIGNATURE
template_data = {"code": code}
return await self.send_sms(
to=phone,
signature=signature,
template_id=template_id,
template_data=template_data
)

Binary file not shown.