aidress/app/utils/client.py
2025-03-21 22:49:03 +08:00

192 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import httpx
import json
import logging
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urljoin
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class HttpClient:
    """
    Asynchronous HTTP client helper.

    Wraps ``httpx.AsyncClient`` with per-instance defaults (base URL,
    headers, timeout, TLS verification) and applies the same response
    parsing and error handling to every request.
    """

    def __init__(
        self,
        base_url: str = "",
        timeout: int = 30,
        headers: Optional[Dict[str, str]] = None,
        verify_ssl: bool = True
    ):
        """
        Initialise the HTTP client.

        Args:
            base_url: Base URL of the API; an empty string disables joining.
            timeout: Default request timeout in seconds.
            headers: Default headers sent with every request.
            verify_ssl: Whether to verify the server's SSL certificate.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.headers = headers or {}
        self.verify_ssl = verify_ssl

    @staticmethod
    def _extract_error_message(payload: Any, fallback: str) -> Any:
        """
        Return the ``message`` field of an error payload.

        Error bodies are not guaranteed to be JSON objects; a list, string
        or ``null`` body has no ``.get`` and must not crash error handling.

        Args:
            payload: Parsed JSON error body of any type.
            fallback: Value returned when no ``message`` can be read.

        Returns:
            ``payload["message"]`` when *payload* is a dict containing that
            key, otherwise *fallback*.
        """
        if isinstance(payload, dict):
            return payload.get("message", fallback)
        return fallback

    async def request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Any] = None,
        json_data: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Send an HTTP request and return the parsed response body.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE, ...).
            url: Request URL or path. When ``base_url`` is set the two are
                combined with ``urllib.parse.urljoin``, so a path starting
                with ``/`` replaces any path component of ``base_url``.
            params: URL query parameters.
            data: Form data or binary payload.
            json_data: Payload to send as JSON.
            headers: Extra headers; they override the client defaults.
            timeout: Timeout in seconds; a falsy value uses the client default.

        Returns:
            The decoded JSON body, or ``{"content": <response text>}`` when
            the body is not valid JSON.

        Raises:
            Exception: On an HTTP error status, a transport-level error, or
                a response body whose ``success`` field is ``False``.
        """
        full_url = urljoin(self.base_url, url) if self.base_url else url

        # Per-call headers override the instance defaults key by key.
        request_headers = {**self.headers}
        if headers:
            request_headers.update(headers)

        # A falsy per-call timeout (None or 0) falls back to the default.
        timeout_value = timeout or self.timeout

        try:
            async with httpx.AsyncClient(verify=self.verify_ssl) as client:
                response = await client.request(
                    method=method,
                    url=full_url,
                    params=params,
                    data=data,
                    json=json_data,
                    headers=request_headers,
                    timeout=timeout_value,
                )

                # Record the request and the response status.
                logger.debug("HTTP请求: %s %s", method, full_url)
                logger.debug("状态码: %s", response.status_code)

                # Try to decode the body as JSON. ValueError covers both
                # json.JSONDecodeError and UnicodeDecodeError, so any
                # undecodable body falls back to its raw text.
                try:
                    response_data = response.json()
                except ValueError:
                    response_data = {"content": response.text}

                # Raise httpx.HTTPStatusError for error status codes.
                response.raise_for_status()

                # Check the application-level success flag.
                self._verify_success(response_data, response.status_code)
                return response_data
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            logger.error("HTTP状态错误: %s - %s", status, e.response.text)
            try:
                msg = self._extract_error_message(e.response.json(), str(e))
            except ValueError:
                msg = e.response.text or str(e)
            # Chain the original error so the traceback keeps the cause.
            raise Exception(f"请求失败 ({status}): {msg}") from e
        except httpx.RequestError as e:
            logger.error("请求错误: %s", e)
            raise Exception(f"请求错误: {str(e)}") from e
        except Exception as e:
            # Includes failures raised by _verify_success; logged, then
            # re-raised unchanged.
            logger.error("未知错误: %s", e)
            raise

    def _verify_success(self, data: Dict[str, Any], status_code: int) -> None:
        """
        Verify that a response represents success.

        Args:
            data: Parsed response body.
            status_code: HTTP status code of the response.

        Raises:
            Exception: If ``status_code`` is 400 or above, or if ``data`` is
                a dict whose ``success`` field is exactly ``False``.
        """
        # Check the HTTP status code.
        if status_code >= 400:
            raise Exception(f"请求失败,状态码: {status_code}")

        # Check the optional ``success`` field of the body, if present.
        if isinstance(data, dict) and "success" in data and data["success"] is False:
            msg = data.get("message", "未知错误")
            code = data.get("code", status_code)
            raise Exception(f"请求失败 (code: {code}): {msg}")

    async def get(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Send a GET request; see ``request`` for arguments and errors."""
        return await self.request("GET", url, params=params, headers=headers, timeout=timeout)

    async def post(
        self,
        url: str,
        data: Optional[Any] = None,
        json_data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Send a POST request; see ``request`` for arguments and errors."""
        return await self.request(
            "POST", url, params=params, data=data, json_data=json_data,
            headers=headers, timeout=timeout
        )

    async def put(
        self,
        url: str,
        data: Optional[Any] = None,
        json_data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Send a PUT request; see ``request`` for arguments and errors."""
        return await self.request(
            "PUT", url, params=params, data=data, json_data=json_data,
            headers=headers, timeout=timeout
        )

    async def delete(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Send a DELETE request; see ``request`` for arguments and errors."""
        return await self.request("DELETE", url, params=params, headers=headers, timeout=timeout)