crypto.ai/cryptoai/agents/gold_agent.py
2025-04-30 23:53:45 +08:00

430 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
import datetime
import time
import json
from datetime import datetime, timedelta
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from api.binance_api import BinanceAPI
from api.alltick_api import AllTickAPI
from api.deepseek_api import DeepSeekAPI
from models.data_processor import DataProcessor
from utils.config_loader import ConfigLoader
from utils.dingtalk_bot import DingTalkBot
class GoldAgent:
"""黄金分析智能体,用于分析黄金市场数据并生成交易策略"""
def __init__(self, config_path: str = None):
"""
初始化黄金分析智能体
Args:
config_path: 配置文件路径如果为None则使用默认路径
"""
# 加载配置
self.config_loader = ConfigLoader(config_path)
# 获取各部分配置
self.binance_config = self.config_loader.get_binance_config()
self.deepseek_config = self.config_loader.get_deepseek_config()
self.alltick_config = self.config_loader.get_alltick_config()
self.gold_config = self.config_loader.get_gold_config() # 黄金特定配置
self.data_config = self.config_loader.get_data_config()
self.dingtalk_config = self.config_loader.get_dingtalk_config()
# 初始化API客户端
self.binance_api = BinanceAPI(
api_key=self.binance_config['api_key'],
api_secret=self.binance_config['api_secret'],
test_mode=self.binance_config['test_mode']
)
# 初始化AllTick API客户端用于获取黄金数据
self.alltick_api = AllTickAPI(
api_key=self.alltick_config['api_key']
)
self.deepseek_api = DeepSeekAPI()
# 初始化数据处理器
self.data_processor = DataProcessor(storage_path=os.path.join(self.data_config['storage_path'], 'gold'))
# 初始化钉钉机器人(如果启用)
self.dingtalk_bot = None
if self.dingtalk_config.get('enabled', False):
self.dingtalk_bot = DingTalkBot(
webhook_url=self.dingtalk_config['webhook_url'],
secret=self.dingtalk_config.get('secret')
)
print("钉钉机器人已启用")
# 设置黄金交易对
self.gold_symbols = self.gold_config.get('symbols', ["XAUUSD"]) # 默认黄金/美元
# 设置时间间隔
self.time_interval = self.gold_config.get('time_interval', '1d') # 默认日线
def fetch_historical_data(self, symbol: str, days: int = 180) -> pd.DataFrame:
"""
获取历史数据
Args:
symbol: 黄金交易对符号,例如 'XAUUSD'
days: 要获取的天数
Returns:
历史数据DataFrame
"""
print(f"获取{symbol}的历史数据({days}天)...")
# 计算开始时间
start_time = datetime.now() - timedelta(days=days)
start_str = start_time.strftime("%Y-%m-%d")
try:
# 使用AllTick API获取黄金K线数据
data = self.alltick_api.get_historical_klines(
symbol=symbol,
interval=self.time_interval,
start_str=start_str,
limit=days # 请求天数对应的K线数量
)
except Exception as e:
print(f"从AllTick获取数据出错: {e}")
if data.empty:
print(f"无法获取{symbol}的历史数据")
return pd.DataFrame()
# 保存原始数据
raw_data_path = self.data_processor.save_data(symbol, data, data_type='raw')
print(f"原始数据已保存到: {raw_data_path}")
return data
def process_data(self, symbol: str, data: pd.DataFrame) -> pd.DataFrame:
"""
处理数据
Args:
symbol: 黄金交易对符号,例如 'XAUUSD'
data: 原始数据
Returns:
处理后的数据
"""
print(f"处理{symbol}的数据...")
# 预处理数据
processed_data = self.data_processor.preprocess_market_data(symbol, data)
# 计算额外的黄金特定指标
processed_data = self._calculate_gold_indicators(processed_data)
# 保存处理后的数据
processed_data_path = self.data_processor.save_data(symbol, processed_data, data_type='processed')
print(f"处理后的数据已保存到: {processed_data_path}")
return processed_data
def _calculate_gold_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
"""
计算黄金特定技术指标
Args:
data: 预处理后的数据
Returns:
添加了黄金特定指标的数据
"""
# 复制数据,避免修改原始数据
df = data.copy()
# 黄金波动率指标 (GVZ - Gold Volatility Index)
df['close_pct_change'] = df['close'].pct_change()
df['GVZ'] = df['close_pct_change'].rolling(window=20).std() * 100
# 黄金与美元指数相关性 (模拟数据,实际应用中需要获取美元指数数据)
df['USD_correlation'] = np.random.normal(-0.6, 0.2, len(df)) # 模拟负相关
# 季节性因素 (根据日期生成季节性指标)
df['month'] = pd.to_datetime(df.index).month
df['seasonal_factor'] = df['month'].apply(lambda m: 1 if m in [1, 8, 9, 12] else -1 if m in [3, 4, 6, 7] else 0)
# 黄金看涨/看跌指标 (计算基于多个指标的综合信号)
df['gold_bull_signal'] = np.where(
(df['RSI'] < 30) & (df['close'] > df['MA50']) & (df['seasonal_factor'] >= 0),
1, # 看涨信号
np.where(
(df['RSI'] > 70) & (df['close'] < df['MA50']) & (df['seasonal_factor'] <= 0),
-1, # 看跌信号
0 # 中性
)
)
return df
def _format_market_data(self, market_data: Dict[str, Any]) -> str:
"""
格式化市场数据为适合大模型的格式
Args:
market_data: 市场数据
Returns:
格式化的数据字符串
"""
# 这里可以根据实际情况调整格式化方式
return json.dumps(market_data, indent=2)
def _build_gold_analysis_prompt(self, formatted_data: str) -> str:
"""
构建黄金分析提示词
Args:
formatted_data: 格式化的市场数据
Returns:
提示词
"""
return f"""请对以下黄金市场K线数据进行深入分析并给出交易建议。请使用中文回复。
数据:
{formatted_data}
请严格按照以下步骤分析:
1. 分析主要技术指标(包括RSI、MACD、布林带、均线等),判断当前黄金市场趋势
2. 基于K线数据和斐波那契回调水平确定关键支撑位和压力位
3. 分析黄金特定指标(GVZ波动率指标、与美元相关性、季节性因素)对价格的影响
4. 考虑全球宏观经济、地缘政治、通胀预期等因素对黄金价格的影响
5. 根据技术分析和基本面因素给出清晰的交易建议
6. 评估当前操作建议的紧迫性
请以JSON格式回复仅包含以下字段:
- market_trend: 市场趋势 (牛市, 熊市, 震荡)
- technical_analysis: 技术指标详细分析 (重点分析RSI、MACD、布林带、均线交叉等情况)
- gold_specific_indicators: 黄金特有指标分析 (波动率、美元相关性、季节性)
- macro_factors: 宏观经济因素分析 (通胀、利率、地缘政治等)
- support_levels: 基于斐波那契回调的支撑位列表(标明各个支撑位对应的斐波那契水平)
- resistance_levels: 基于斐波那契回调的阻力位列表(同样标明对应水平)
- volume_analysis: 交易量分析,重点关注量价关系
- recommendation: 操作建议 (买入、卖出或等待)
- entry_points: 推荐入场点位
- exit_points: 推荐出场点位
- stop_loss: 建议止损位
- take_profit: 建议止盈位
- urgency_level: 操作紧迫性评级 (1-51为最低紧迫性5为最高紧迫性)
- urgency_reason: 紧迫性评级的原因说明
- summary: 分析总结不超过70字
请确保回复为有效的JSON格式分析要精准专业。重点关注黄金的避险属性、通胀对冲功能以及与主要货币的相关性。"""
def analyze_gold(self) -> Dict[str, Dict[str, Any]]:
"""
分析所有支持的黄金交易对
Returns:
所有交易对的分析结果
"""
results = {}
for symbol in self.gold_symbols:
print(f"\n开始分析{symbol}...")
# 获取并处理数据
raw_data = self.fetch_historical_data(symbol, days=self.gold_config.get('historical_days', 180))
if not raw_data.empty:
processed_data = self.process_data(symbol, raw_data)
# 准备市场数据
market_data = {
"symbol": symbol,
"current_price": float(processed_data['close'].iloc[-1]),
"price_change_24h": float(processed_data['close'].iloc[-1] - processed_data['close'].iloc[-2]),
"price_change_percentage_24h": float((processed_data['close'].iloc[-1] - processed_data['close'].iloc[-2]) / processed_data['close'].iloc[-2] * 100),
"historical_prices": processed_data['close'].tail(100).tolist(),
"volumes": processed_data['volume'].tail(100).tolist(),
"technical_indicators": {
"rsi": float(processed_data['RSI'].iloc[-1]),
"macd": float(processed_data['MACD'].iloc[-1]),
"macd_signal": float(processed_data['MACD_Signal'].iloc[-1]),
"bollinger_upper": float(processed_data['Bollinger_Upper'].iloc[-1]),
"bollinger_lower": float(processed_data['Bollinger_Lower'].iloc[-1]),
"ma5": float(processed_data['MA5'].iloc[-1]),
"ma10": float(processed_data['MA10'].iloc[-1]),
"ma20": float(processed_data['MA20'].iloc[-1]),
"ma50": float(processed_data['MA50'].iloc[-1]),
"ma200": float(processed_data['MA50'].iloc[-1]) if 'MA200' not in processed_data else float(processed_data['MA200'].iloc[-1]),
"atr": float(processed_data['ATR'].iloc[-1])
},
"gold_specific_indicators": {
"gvz": float(processed_data['GVZ'].iloc[-1]),
"usd_correlation": float(processed_data['USD_correlation'].iloc[-1]),
"seasonal_factor": float(processed_data['seasonal_factor'].iloc[-1]),
"gold_bull_signal": float(processed_data['gold_bull_signal'].iloc[-1])
},
"klines": processed_data[['open', 'high', 'low', 'close', 'volume']].tail(100).to_dict('records')
}
# 将市场数据格式化为适合大模型的格式
formatted_data = self._format_market_data(market_data)
# 构建提示词
prompt = self._build_gold_analysis_prompt(formatted_data)
# 调用API获取分析和交易建议
response, usage = self.deepseek_api.call_model(prompt, task_type="黄金分析", symbol=symbol)
# 解析响应
analysis_result = self.deepseek_api.extract_json_from_response(response)
# 添加token使用信息
if usage:
analysis_result["_token_usage"] = usage
# 整合结果
results[symbol] = {
"analysis": analysis_result,
"timestamp": datetime.now().isoformat()
}
print(f"{symbol}分析完成")
else:
print(f"跳过{symbol}分析,无法获取数据")
return results
def run_analysis_cycle(self) -> Dict[str, Any]:
"""
运行一个完整的分析周期
Returns:
分析结果
"""
print(f"开始新的黄金分析周期,时间:{datetime.now().isoformat()}")
results = self.analyze_gold()
# 保存分析结果
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
results_dir = os.path.join(self.data_config['storage_path'], 'gold', 'analysis_results')
os.makedirs(results_dir, exist_ok=True)
results_file = os.path.join(results_dir, f"gold_analysis_results_{timestamp}.json")
with open(results_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"分析结果已保存到:{results_file}")
# 导出 DeepSeek API token 使用情况
self._export_token_usage()
# 把分析的JSON结果调用大模型转化成交易建议
trading_suggestions = self.convert_analysis_to_trading_suggestions(results)
print(f"交易建议:{trading_suggestions}")
if self.dingtalk_bot:
self.dingtalk_bot.send_markdown(title="黄金交易建议", text=trading_suggestions)
return results, trading_suggestions
def convert_analysis_to_trading_suggestions(self, results: Dict[str, Any]) -> str:
"""
把分析的JSON结果调用大模型转化成交易建议
"""
prompt = f"""
请对以下黄金市场分析的JSON结果进行归纳总结
需要输出的内容包括:
标题AI Agent 黄金分析报告
1. 对目标交易对行情进行总结
2. 对所有交易对给出操作建议:
2.1 操作建议(做多、做空、观望)
2.2 操作点位
2.3 止损止盈点位
2.4 操作评级
2.5 建议原因
以下是每个交易对的分析结果:
{results}
请以优美的Markdown格式输出通过 emoji 标签来增加可读性。
"""
system_prompt = """
你是一个专业的黄金分析高手,你擅长分析市场趋势、预测价格走向和提供交易建议,请始终使用中文回复。
"""
response, usage = self.deepseek_api.call_model(prompt, system_prompt=system_prompt, task_type="交易建议")
message = self.deepseek_api.extract_text_from_response(response)
return message
def _export_token_usage(self) -> None:
"""
导出 DeepSeek API token 使用情况
"""
try:
# 每天导出一次详细的JSON数据
today = datetime.now().strftime("%Y%m%d")
token_usage_dir = os.path.join(self.data_config['storage_path'], 'gold', 'token_usage')
os.makedirs(token_usage_dir, exist_ok=True)
json_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.json")
# 如果文件不存在,导出一次
if not os.path.exists(json_file):
exported_file = self.deepseek_api.export_token_usage(json_file)
if exported_file:
print(f"DeepSeek API token 使用情况已导出到:{exported_file}")
# 每次都导出CSV格式的统计数据
csv_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.csv")
self.deepseek_api.export_token_usage(csv_file, "csv")
# 输出当前使用情况统计
stats = self.deepseek_api.get_token_usage_stats()
print(f"DeepSeek API token 使用统计:")
print(f"- 总调用次数: {stats['total_calls']}")
print(f"- 总token数: {stats['total_tokens']}")
print(f"- 输入token: {stats['total_prompt_tokens']}")
print(f"- 输出token: {stats['total_completion_tokens']}")
print(f"- 平均每次调用: {stats['average_tokens_per_call']:.2f} tokens")
except Exception as e:
print(f"导出 token 使用情况时出错: {e}")
def start_agent(self) -> None:
"""
启动智能体
Args:
run_once: 是否只运行一次分析周期
"""
print("启动黄金分析智能体...")
try:
self.run_analysis_cycle()
# 导出最终的token使用情况
self._export_token_usage()
except KeyboardInterrupt:
print("\n智能体已停止")
# 导出最终的token使用情况
self._export_token_usage()
except Exception as e:
print(f"智能体运行时出错: {e}")
import traceback
traceback.print_exc()
# 发生错误时也尝试导出token使用情况
self._export_token_usage()