add gold agent

This commit is contained in:
aaron 2025-04-28 10:42:02 +08:00
parent 05db03e931
commit 230f501f98
10 changed files with 736 additions and 49 deletions

View File

@ -0,0 +1,451 @@
import os
import sys
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
import datetime
import time
import json
from datetime import datetime, timedelta
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from api.binance_api import BinanceAPI
from api.alltick_api import AllTickAPI
from api.deepseek_api import DeepSeekAPI
from models.data_processor import DataProcessor
from utils.config_loader import ConfigLoader
from utils.dingtalk_bot import DingTalkBot
class GoldAgent:
"""黄金分析智能体,用于分析黄金市场数据并生成交易策略"""
def __init__(self, config_path: str = None):
"""
初始化黄金分析智能体
Args:
config_path: 配置文件路径如果为None则使用默认路径
"""
# 加载配置
self.config_loader = ConfigLoader(config_path)
# 获取各部分配置
self.binance_config = self.config_loader.get_binance_config()
self.deepseek_config = self.config_loader.get_deepseek_config()
self.alltick_config = self.config_loader.get_config('alltick')
self.gold_config = self.config_loader.get_config('gold') # 黄金特定配置
self.data_config = self.config_loader.get_data_config()
self.agent_config = self.config_loader.get_agent_config()
self.dingtalk_config = self.config_loader.get_dingtalk_config()
# 初始化API客户端
self.binance_api = BinanceAPI(
api_key=self.binance_config['api_key'],
api_secret=self.binance_config['api_secret'],
test_mode=self.binance_config['test_mode']
)
# 初始化AllTick API客户端用于获取黄金数据
self.alltick_api = AllTickAPI(
api_key=self.alltick_config['api_key']
)
self.deepseek_api = DeepSeekAPI(
api_key=self.deepseek_config['api_key'],
model=self.deepseek_config['model']
)
# 初始化数据处理器
self.data_processor = DataProcessor(storage_path=os.path.join(self.data_config['storage_path'], 'gold'))
# 初始化钉钉机器人(如果启用)
self.dingtalk_bot = None
if self.dingtalk_config.get('enabled', False):
self.dingtalk_bot = DingTalkBot(
webhook_url=self.dingtalk_config['webhook_url'],
secret=self.dingtalk_config.get('secret')
)
print("钉钉机器人已启用")
# 设置黄金交易对
self.gold_symbols = self.gold_config.get('symbols', ["XAUUSD"]) # 默认黄金/美元
# 设置时间间隔
self.time_interval = self.gold_config.get('time_interval', '1d') # 默认日线
# 风险等级
self.risk_level = self.agent_config['risk_level']
def fetch_historical_data(self, symbol: str, days: int = 180) -> pd.DataFrame:
"""
获取历史数据
Args:
symbol: 黄金交易对符号例如 'XAUUSD'
days: 要获取的天数
Returns:
历史数据DataFrame
"""
print(f"获取{symbol}的历史数据({days}天)...")
# 计算开始时间
start_time = datetime.now() - timedelta(days=days)
start_str = start_time.strftime("%Y-%m-%d")
try:
# 使用AllTick API获取黄金K线数据
data = self.alltick_api.get_historical_klines(
symbol=symbol,
interval=self.time_interval,
start_str=start_str,
limit=days # 请求天数对应的K线数量
)
except Exception as e:
print(f"从AllTick获取数据出错: {e}")
if data.empty:
print(f"无法获取{symbol}的历史数据")
return pd.DataFrame()
# 保存原始数据
raw_data_path = self.data_processor.save_data(symbol, data, data_type='raw')
print(f"原始数据已保存到: {raw_data_path}")
return data
def process_data(self, symbol: str, data: pd.DataFrame) -> pd.DataFrame:
"""
处理数据
Args:
symbol: 黄金交易对符号例如 'XAUUSD'
data: 原始数据
Returns:
处理后的数据
"""
print(f"处理{symbol}的数据...")
# 预处理数据
processed_data = self.data_processor.preprocess_market_data(symbol, data)
# 计算额外的黄金特定指标
processed_data = self._calculate_gold_indicators(processed_data)
# 保存处理后的数据
processed_data_path = self.data_processor.save_data(symbol, processed_data, data_type='processed')
print(f"处理后的数据已保存到: {processed_data_path}")
return processed_data
def _calculate_gold_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
"""
计算黄金特定技术指标
Args:
data: 预处理后的数据
Returns:
添加了黄金特定指标的数据
"""
# 复制数据,避免修改原始数据
df = data.copy()
# 黄金波动率指标 (GVZ - Gold Volatility Index)
df['close_pct_change'] = df['close'].pct_change()
df['GVZ'] = df['close_pct_change'].rolling(window=20).std() * 100
# 黄金与美元指数相关性 (模拟数据,实际应用中需要获取美元指数数据)
df['USD_correlation'] = np.random.normal(-0.6, 0.2, len(df)) # 模拟负相关
# 季节性因素 (根据日期生成季节性指标)
df['month'] = pd.to_datetime(df.index).month
df['seasonal_factor'] = df['month'].apply(lambda m: 1 if m in [1, 8, 9, 12] else -1 if m in [3, 4, 6, 7] else 0)
# 黄金看涨/看跌指标 (计算基于多个指标的综合信号)
df['gold_bull_signal'] = np.where(
(df['RSI'] < 30) & (df['close'] > df['MA50']) & (df['seasonal_factor'] >= 0),
1, # 看涨信号
np.where(
(df['RSI'] > 70) & (df['close'] < df['MA50']) & (df['seasonal_factor'] <= 0),
-1, # 看跌信号
0 # 中性
)
)
return df
def _format_market_data(self, market_data: Dict[str, Any]) -> str:
"""
格式化市场数据为适合大模型的格式
Args:
market_data: 市场数据
Returns:
格式化的数据字符串
"""
# 这里可以根据实际情况调整格式化方式
return json.dumps(market_data, indent=2)
def _build_gold_analysis_prompt(self, formatted_data: str) -> str:
"""
构建黄金分析提示词
Args:
formatted_data: 格式化的市场数据
Returns:
提示词
"""
return f"""请对以下黄金市场K线数据进行深入分析并给出交易建议。请使用中文回复。
数据:
{formatted_data}
请严格按照以下步骤分析:
1. 分析主要技术指标(包括RSIMACD布林带均线等)判断当前黄金市场趋势
2. 基于K线数据和斐波那契回调水平确定关键支撑位和压力位
3. 分析黄金特定指标(GVZ波动率指标与美元相关性季节性因素)对价格的影响
4. 考虑全球宏观经济地缘政治通胀预期等因素对黄金价格的影响
5. 根据技术分析和基本面因素给出清晰的交易建议
6. 评估当前操作建议的紧迫性
请以JSON格式回复仅包含以下字段:
- market_trend: 市场趋势 (牛市, 熊市, 震荡)
- technical_analysis: 技术指标详细分析 (重点分析RSIMACD布林带均线交叉等情况)
- gold_specific_indicators: 黄金特有指标分析 (波动率美元相关性季节性)
- macro_factors: 宏观经济因素分析 (通胀利率地缘政治等)
- support_levels: 基于斐波那契回调的支撑位列表标明各个支撑位对应的斐波那契水平
- resistance_levels: 基于斐波那契回调的阻力位列表同样标明对应水平
- volume_analysis: 交易量分析重点关注量价关系
- recommendation: 操作建议 (买入卖出或等待)
- entry_points: 推荐入场点位
- exit_points: 推荐出场点位
- stop_loss: 建议止损位
- take_profit: 建议止盈位
- urgency_level: 操作紧迫性评级 (1-51为最低紧迫性5为最高紧迫性)
- urgency_reason: 紧迫性评级的原因说明
- summary: 分析总结不超过70字
请确保回复为有效的JSON格式分析要精准专业重点关注黄金的避险属性通胀对冲功能以及与主要货币的相关性"""
def analyze_gold(self) -> Dict[str, Dict[str, Any]]:
"""
分析所有支持的黄金交易对
Returns:
所有交易对的分析结果
"""
results = {}
for symbol in self.gold_symbols:
print(f"\n开始分析{symbol}...")
# 获取并处理数据
raw_data = self.fetch_historical_data(symbol, days=self.gold_config.get('historical_days', 180))
if not raw_data.empty:
processed_data = self.process_data(symbol, raw_data)
# 准备市场数据
market_data = {
"symbol": symbol,
"current_price": float(processed_data['close'].iloc[-1]),
"price_change_24h": float(processed_data['close'].iloc[-1] - processed_data['close'].iloc[-2]),
"price_change_percentage_24h": float((processed_data['close'].iloc[-1] - processed_data['close'].iloc[-2]) / processed_data['close'].iloc[-2] * 100),
"historical_prices": processed_data['close'].tail(100).tolist(),
"volumes": processed_data['volume'].tail(100).tolist(),
"technical_indicators": {
"rsi": float(processed_data['RSI'].iloc[-1]),
"macd": float(processed_data['MACD'].iloc[-1]),
"macd_signal": float(processed_data['MACD_Signal'].iloc[-1]),
"bollinger_upper": float(processed_data['Bollinger_Upper'].iloc[-1]),
"bollinger_lower": float(processed_data['Bollinger_Lower'].iloc[-1]),
"ma5": float(processed_data['MA5'].iloc[-1]),
"ma10": float(processed_data['MA10'].iloc[-1]),
"ma20": float(processed_data['MA20'].iloc[-1]),
"ma50": float(processed_data['MA50'].iloc[-1]),
"ma200": float(processed_data['MA50'].iloc[-1]) if 'MA200' not in processed_data else float(processed_data['MA200'].iloc[-1]),
"atr": float(processed_data['ATR'].iloc[-1])
},
"gold_specific_indicators": {
"gvz": float(processed_data['GVZ'].iloc[-1]),
"usd_correlation": float(processed_data['USD_correlation'].iloc[-1]),
"seasonal_factor": float(processed_data['seasonal_factor'].iloc[-1]),
"gold_bull_signal": float(processed_data['gold_bull_signal'].iloc[-1])
},
"klines": processed_data[['open', 'high', 'low', 'close', 'volume']].tail(30).to_dict('records')
}
# 将市场数据格式化为适合大模型的格式
formatted_data = self._format_market_data(market_data)
# 构建提示词
prompt = self._build_gold_analysis_prompt(formatted_data)
# 调用API获取分析和交易建议
response, usage = self.deepseek_api.call_model(prompt, task_type="黄金分析", symbol=symbol)
# 解析响应
analysis_result = self.deepseek_api.extract_json_from_response(response)
# 添加token使用信息
if usage:
analysis_result["_token_usage"] = usage
# 整合结果
results[symbol] = {
"analysis": analysis_result,
"timestamp": datetime.now().isoformat()
}
# 如果钉钉机器人已启用,发送分析报告
if self.dingtalk_bot:
try:
print(f"发送{symbol}分析报告到钉钉...")
response = self.dingtalk_bot.send_analysis_report(symbol, results[symbol])
if response.get('errcode') == 0:
print(f"{symbol}分析报告发送成功")
else:
print(f"{symbol}分析报告发送失败: {response}")
except Exception as e:
print(f"发送{symbol}分析报告时出错: {e}")
print(f"{symbol}分析完成")
else:
print(f"跳过{symbol}分析,无法获取数据")
return results
def send_notifications(self, symbol: str, analysis_data: Dict[str, Any]) -> bool:
"""
发送分析结果通知
Args:
symbol: 交易对符号
analysis_data: 分析数据
Returns:
发送是否成功
"""
if not self.dingtalk_bot:
print(f"钉钉通知未启用,跳过发送 {symbol} 的分析结果")
return False
try:
# 使用已初始化的钉钉机器人实例发送完整分析报告
response = self.dingtalk_bot.send_analysis_report(symbol, analysis_data)
if response.get('errcode') == 0:
print(f"成功发送 {symbol} 分析结果到钉钉")
return True
else:
print(f"发送 {symbol} 分析结果到钉钉失败: {response}")
return False
except Exception as e:
print(f"发送钉钉通知时出错: {e}")
return False
def run_analysis_cycle(self) -> Dict[str, Any]:
"""
运行一个完整的分析周期
Returns:
分析结果
"""
print(f"开始新的黄金分析周期,时间:{datetime.now().isoformat()}")
results = self.analyze_gold()
# 保存分析结果
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
results_dir = os.path.join(self.data_config['storage_path'], 'gold', 'analysis_results')
os.makedirs(results_dir, exist_ok=True)
results_file = os.path.join(results_dir, f"gold_analysis_results_{timestamp}.json")
with open(results_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"分析结果已保存到:{results_file}")
# 导出 DeepSeek API token 使用情况
self._export_token_usage()
return results
def _export_token_usage(self) -> None:
"""
导出 DeepSeek API token 使用情况
"""
try:
# 每天导出一次详细的JSON数据
today = datetime.now().strftime("%Y%m%d")
token_usage_dir = os.path.join(self.data_config['storage_path'], 'gold', 'token_usage')
os.makedirs(token_usage_dir, exist_ok=True)
json_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.json")
# 如果文件不存在,导出一次
if not os.path.exists(json_file):
exported_file = self.deepseek_api.export_token_usage(json_file)
if exported_file:
print(f"DeepSeek API token 使用情况已导出到:{exported_file}")
# 每次都导出CSV格式的统计数据
csv_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.csv")
self.deepseek_api.export_token_usage(csv_file, "csv")
# 输出当前使用情况统计
stats = self.deepseek_api.get_token_usage_stats()
print(f"DeepSeek API token 使用统计:")
print(f"- 总调用次数: {stats['total_calls']}")
print(f"- 总token数: {stats['total_tokens']}")
print(f"- 输入token: {stats['total_prompt_tokens']}")
print(f"- 输出token: {stats['total_completion_tokens']}")
print(f"- 平均每次调用: {stats['average_tokens_per_call']:.2f} tokens")
except Exception as e:
print(f"导出 token 使用情况时出错: {e}")
def start_agent(self, run_once: bool = False) -> None:
"""
启动智能体
Args:
run_once: 是否只运行一次分析周期
"""
print("启动黄金分析智能体...")
try:
if run_once:
self.run_analysis_cycle()
# 导出最终的token使用情况
self._export_token_usage()
else:
while True:
self.run_analysis_cycle()
# 等待下一个分析周期
analysis_interval = self.gold_config.get('analysis_interval',
self.agent_config['analysis_interval'])
print(f"等待{analysis_interval}分钟后进行下一次分析...")
time.sleep(analysis_interval * 60)
except KeyboardInterrupt:
print("\n智能体已停止")
# 导出最终的token使用情况
self._export_token_usage()
except Exception as e:
print(f"智能体运行时出错: {e}")
import traceback
traceback.print_exc()
# 发生错误时也尝试导出token使用情况
self._export_token_usage()

134
cryptoai/api/alltick_api.py Normal file
View File

@ -0,0 +1,134 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import json
import requests
import urllib.parse
from typing import Dict, Any, Optional
from datetime import datetime
import pandas as pd
class AllTickAPI:
"""AllTick API客户端用于获取黄金等商品的K线数据"""
def __init__(self, api_key: str):
"""
初始化AllTick API客户端
Args:
api_key: AllTick API密钥
"""
self.api_key = api_key
self.base_url = "https://quote.alltick.io/quote-b-api"
self.session = requests.Session()
def get_historical_klines(self, symbol: str, interval: str, start_str: Optional[str] = None,
limit: int = 500) -> pd.DataFrame:
"""
获取历史K线数据
Args:
symbol: 交易对符号例如 "XAUUSD"
interval: K线时间间隔可选: 1m, 5m, 15m, 30m, 1h, 2h, 4h, 1d, 1w, 1M
start_str: 开始时间格式为 "YYYY-MM-DD"
limit: 获取的K线数量最大为1000
Returns:
K线数据的DataFrame
"""
# 转换时间间隔格式
kline_type = self._convert_interval(interval)
# 计算结束时间戳
end_timestamp = 0 # 0表示从当前时间开始查询
# 限制查询数量
query_kline_num = min(limit, 1000) # AllTick API每次最多查询1000根K线
# 构建查询参数
query_data = {
"trace": datetime.now().strftime("%Y%m%d%H%M%S"),
"data": {
"code": symbol,
"kline_type": kline_type,
"kline_timestamp_end": end_timestamp,
"query_kline_num": query_kline_num,
"adjust_type": 0 # 0: 除权
}
}
# 对查询参数进行编码
encoded_query = urllib.parse.quote(json.dumps(query_data))
# 构建完整的URL
url = f"{self.base_url}/kline?token={self.api_key}&query={encoded_query}"
try:
# 发送请求
response = self.session.get(url)
response.raise_for_status() # 如果响应状态码不是200则抛出异常
# 解析响应
result = response.json()
# 检查返回状态
if result.get("ret") != 200:
print(f"请求出错: {result.get('msg')}")
return pd.DataFrame()
# 获取K线数据
kline_data = result.get("data", {}).get("kline_list", [])
if not kline_data:
print(f"未获取到{symbol}的K线数据")
return pd.DataFrame()
# 转换为DataFrame
df = pd.DataFrame(kline_data)
# 转换列类型
df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="s")
df["open"] = df["open_price"].astype(float)
df["high"] = df["high_price"].astype(float)
df["low"] = df["low_price"].astype(float)
df["close"] = df["close_price"].astype(float)
df["volume"] = df["volume"].astype(float)
# 设置索引
df.set_index("timestamp", inplace=True)
# 选择需要的列
df = df[["open", "high", "low", "close", "volume"]]
return df
except Exception as e:
print(f"获取K线数据时出错: {e}")
return pd.DataFrame()
def _convert_interval(self, interval: str) -> int:
"""
将Binance格式的时间间隔转换为AllTick格式
Args:
interval: Binance格式的时间间隔例如 "1m", "1h", "1d"
Returns:
AllTick格式的K线类型
"""
interval_map = {
"1m": 1, # 1分钟K
"5m": 2, # 5分钟K
"15m": 3, # 15分钟K
"30m": 4, # 30分钟K
"1h": 5, # 小时K
"2h": 6, # 2小时K
"4h": 7, # 4小时K
"1d": 8, # 日K
"1w": 9, # 周K
"1M": 10 # 月K
}
return interval_map.get(interval, 8) # 默认返回日K

View File

@ -9,6 +9,13 @@ deepseek:
api_key: "your_deepseek_api_key_here"
model: "deepseek-chat" # 使用的模型
# AllTick API设置用于获取黄金数据
alltick:
api_key: "6c7ba077eee07f6f270e219d4848700e-c-app"
symbols:
- "XAUUSD" # 黄金/美元
# - "XAGUSD" # 白银/美元
# 加密货币设置
crypto:
base_currencies:
@ -20,6 +27,21 @@ crypto:
quote_currency: "USDT"
time_interval: "4h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d
# 黄金市场分析配置
gold:
# 黄金交易对
symbols: ["XAUUSD"]
# 历史数据天数
historical_days: 180
# 时间间隔
time_interval: "1d"
# 分析间隔(分钟)
analysis_interval: 360
# 是否启用宏观经济因素分析
enable_macro_analysis: true
# 是否保存图表
save_charts: true
# 数据设置
data:
storage_path: "./cryptoai/data"

View File

@ -9,17 +9,39 @@ deepseek:
api_key: "sk-9f6b56f08796435d988cf202e37f6ee3"
model: "deepseek-chat" # 使用的模型
# AllTick API设置用于获取黄金数据
alltick:
api_key: "ee66d8e2868fd988fffacec40d078df8-c-app"
symbols:
- "XAUUSD" # 黄金/美元
# - "XAGUSD" # 白银/美元
# 加密货币设置
crypto:
base_currencies:
- "BTC"
# - "ONDO"
# - "ETH"
# - "BNB"
# - "SOL"
# - "SUI"
- "SUI"
quote_currency: "USDT"
time_interval: "4h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d
# 黄金市场分析配置
gold:
# 黄金交易对
symbols: ["GOLD"]
# 历史数据天数
historical_days: 180
# 时间间隔
time_interval: "1d"
# 分析间隔(分钟)
analysis_interval: 360
# 是否启用宏观经济因素分析
enable_macro_analysis: true
# 是否保存图表
save_charts: true
# 数据设置
data:
storage_path: "./cryptoai/data"

View File

@ -12,6 +12,7 @@ parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
from cryptoai.agents.crypto_agent import CryptoAgent
from cryptoai.agents.gold_agent import GoldAgent
from cryptoai.utils.config_loader import ConfigLoader
@ -38,6 +39,13 @@ def parse_arguments():
choices=['low', 'medium', 'high'],
help='风险等级,默认使用配置中的值')
parser.add_argument('--agent', type=str, default='crypto',
choices=['crypto', 'gold'],
help='要使用的智能体类型,默认为加密货币智能体')
parser.add_argument('--alltick-key', type=str, default=None,
help='AllTick API密钥用于获取黄金等商品数据')
return parser.parse_args()
@ -56,28 +64,41 @@ def override_config_with_args(config_loader: ConfigLoader, args) -> None:
# 覆盖配置
if args.symbol:
crypto_config['base_currencies'] = [args.symbol.replace(crypto_config['quote_currency'], '')]
if args.agent == 'crypto':
crypto_config['base_currencies'] = [args.symbol.replace(crypto_config['quote_currency'], '')]
elif args.agent == 'gold':
gold_config = config_loader.get_config('gold')
gold_config['symbols'] = [args.symbol]
if args.days:
data_config['historical_days'] = args.days
if args.interval:
crypto_config['time_interval'] = args.interval
if args.agent == 'crypto':
crypto_config['time_interval'] = args.interval
elif args.agent == 'gold':
gold_config = config_loader.get_config('gold')
gold_config['time_interval'] = args.interval
if args.risk_level:
agent_config['risk_level'] = args.risk_level
# 设置AllTick API密钥
if args.alltick_key:
alltick_config = config_loader.get_config('alltick')
alltick_config['api_key'] = args.alltick_key
# 保存修改后的配置
# 注意:这只是修改了内存中的配置,没有写入文件
# 如果需要保存到文件,可以实现一个 save_config 方法
def analyze_single_symbol(agent: CryptoAgent, symbol: str, days: int = 30) -> Dict[str, Any]:
def analyze_single_symbol(agent, symbol: str, days: int = 30) -> Dict[str, Any]:
"""
分析单个交易对
Args:
agent: 加密货币智能体
agent: 智能体实例
symbol: 交易对符号
days: 历史数据天数
@ -95,21 +116,26 @@ def analyze_single_symbol(agent: CryptoAgent, symbol: str, days: int = 30) -> Di
processed_data = agent.process_data(symbol, raw_data)
# 分析市场
analysis_result = agent.analyze_market(symbol, processed_data)
# 根据agent类型执行不同的分析
if isinstance(agent, CryptoAgent):
# 分析市场
analysis_result = agent.analyze_market(symbol, processed_data)
# 预测价格
prediction_result = agent.predict_price(symbol)
# 预测价格
prediction_result = agent.predict_price(symbol)
# 生成策略
strategy = agent.generate_strategy(symbol, analysis_result)
# 生成策略
strategy = agent.generate_strategy(symbol, analysis_result)
# 整合结果
result = {
"analysis": analysis_result,
"prediction": prediction_result,
"strategy": strategy
}
# 整合结果
result = {
"analysis": analysis_result,
"prediction": prediction_result,
"strategy": strategy
}
elif isinstance(agent, GoldAgent):
# 黄金agent没有单独的analyze_market方法使用完整分析
result = agent.analyze_gold()
print(f"{symbol}分析完成")
@ -128,20 +154,36 @@ def main():
# 使用命令行参数覆盖配置
override_config_with_args(config_loader, args)
# 创建智能体
agent = CryptoAgent(config_path=args.config)
# 根据agent类型创建不同的智能体
if args.agent == 'crypto':
agent = CryptoAgent(config_path=args.config)
print("已启动加密货币分析智能体")
elif args.agent == 'gold':
agent = GoldAgent(config_path=args.config)
print("已启动黄金分析智能体")
# 如果指定了单个交易对
if args.symbol:
result = analyze_single_symbol(
agent=agent,
symbol=args.symbol,
days=args.days or agent.data_config['historical_days']
days=args.days or (
agent.data_config['historical_days'] if isinstance(agent, CryptoAgent)
else agent.gold_config.get('historical_days', 180)
)
)
print("\n分析结果:")
print(f"市场趋势: {result.get('analysis', {}).get('market_trend', 'unknown')}")
print(f"24小时预测: {result.get('prediction', {}).get('prediction_24h', {})}")
print(f"建议操作: {result.get('strategy', {}).get('position', 'unknown')}")
if isinstance(agent, CryptoAgent):
print("\n分析结果:")
print(f"市场趋势: {result.get('analysis', {}).get('market_trend', 'unknown')}")
print(f"24小时预测: {result.get('prediction', {}).get('prediction_24h', {})}")
print(f"建议操作: {result.get('strategy', {}).get('position', 'unknown')}")
elif isinstance(agent, GoldAgent) and result:
print("\n分析结果:")
symbol_result = result.get(args.symbol, {})
analysis = symbol_result.get('analysis', {})
print(f"市场趋势: {analysis.get('market_trend', 'unknown')}")
print(f"建议操作: {analysis.get('recommendation', 'unknown')}")
else:
# 启动智能体
agent.start_agent(run_once=args.run_once)

View File

@ -1,12 +1,15 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import yaml
from typing import Dict, Any
from typing import Dict, Any, Optional
class ConfigLoader:
"""配置加载器,用于读取和获取配置信息"""
def __init__(self, config_path: str = None):
def __init__(self, config_path: Optional[str] = None):
"""
初始化配置加载器
@ -14,51 +17,61 @@ class ConfigLoader:
config_path: 配置文件路径如果为None则使用默认路径
"""
if config_path is None:
# 获取当前文件的目录
current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(current_dir, "config", "config.yaml")
# 默认配置文件路径
current_dir = os.path.dirname(os.path.abspath(__file__))
config_dir = os.path.join(os.path.dirname(current_dir), 'config')
config_path = os.path.join(config_dir, 'config.yaml')
# 如果默认配置文件不存在,则使用示例配置文件
if not os.path.exists(config_path):
config_path = os.path.join(config_dir, 'config.example.yaml')
print(f"配置文件 config.yaml 不存在,使用示例配置文件: {config_path}")
self.config_path = config_path
self.config_data = self._load_config()
# 加载配置
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""
加载配置文件
Returns:
配置数据字典
配置字典
"""
try:
with open(self.config_path, 'r', encoding='utf-8') as file:
config_data = yaml.safe_load(file)
return config_data
with open(self.config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
print(f"已加载配置文件: {self.config_path}")
return config
except Exception as e:
print(f"加载配置文件时出错: {e}")
print(f"加载配置文件失败: {e}")
return {}
def get_config(self, section: str = None) -> Dict[str, Any]:
def get_config(self, section: str) -> Dict[str, Any]:
"""
获取配置数据
获取指定部分的配置
Args:
section: 配置部分名称如果为None则返回整个配置
section: 配置部分名称
Returns:
配置数据或指定部分的配置数据
配置字典
"""
if section is None:
return self.config_data
return self.config_data.get(section, {})
return self.config.get(section, {})
def get_binance_config(self) -> Dict[str, Any]:
"""获取Binance API配置"""
"""获取Binance配置"""
return self.get_config('binance')
def get_deepseek_config(self) -> Dict[str, Any]:
"""获取DeepSeek API配置"""
"""获取DeepSeek配置"""
return self.get_config('deepseek')
def get_alltick_config(self) -> Dict[str, Any]:
"""获取AllTick配置"""
return self.get_config('alltick')
def get_crypto_config(self) -> Dict[str, Any]:
"""获取加密货币配置"""
return self.get_config('crypto')

3
run.py
View File

@ -10,6 +10,9 @@ CryptoAI 启动脚本
python run.py --symbol BTCUSDT # 只分析指定的交易对
python run.py --days 7 # 获取7天的历史数据
python run.py --risk-level low # 设置低风险等级
python run.py --agent gold # 使用黄金分析智能体
python run.py --agent crypto # 使用加密货币分析智能体
python run.py --alltick-key KEY # 设置AllTick API密钥
"""
import sys