This commit is contained in:
aaron 2025-04-29 11:45:24 +08:00
parent 13c335a524
commit 51a3ea5e2f
7 changed files with 136 additions and 88 deletions

View File

@ -217,6 +217,87 @@ class CryptoAgent:
请确保回复为有效的JSON格式分析要精准专业""" 请确保回复为有效的JSON格式分析要精准专业"""
def analyze_symbol(self, symbol: str) -> Dict[str, Any]:
"""
分析单个交易对
"""
results = {}
print(f"\n开始分析{symbol}...")
# 获取并处理数据
raw_data = self.fetch_historical_data(symbol, days=self.crypto_config['historical_days'])
if not raw_data.empty:
processed_data = self.process_data(symbol, raw_data)
# 准备市场数据
market_data = {
"symbol": symbol,
"current_price": float(processed_data['close'].iloc[-1]),
# "price_change_24h": float(processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]),
# "price_change_percentage_24h": float((processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]) / processed_data['close'].iloc[-24] * 100),
"historical_prices": processed_data['close'].tail(100).tolist(),
"volumes": processed_data['volume'].tail(100).tolist(),
"technical_indicators": {
"rsi": float(processed_data['RSI'].iloc[-1]),
"macd": float(processed_data['MACD'].iloc[-1]),
"macd_signal": float(processed_data['MACD_Signal'].iloc[-1]),
"bollinger_upper": float(processed_data['Bollinger_Upper'].iloc[-1]),
"bollinger_lower": float(processed_data['Bollinger_Lower'].iloc[-1]),
"ma5": float(processed_data['MA5'].iloc[-1]),
"ma10": float(processed_data['MA10'].iloc[-1]),
"ma20": float(processed_data['MA20'].iloc[-1]),
"ma50": float(processed_data['MA50'].iloc[-1]),
"atr": float(processed_data['ATR'].iloc[-1])
},
"klines": processed_data[['open', 'high', 'low', 'close', 'volume']].tail(100).to_dict('records')
}
# 将市场数据格式化为适合大模型的格式
formatted_data = self._format_market_data(market_data)
# 构建提示词
prompt = self._build_market_analysis_prompt(formatted_data)
# 调用API获取分析和交易建议
response, usage = self.deepseek_api.call_model(prompt, task_type="交易分析", symbol=symbol)
# 解析响应
analysis_result = self.deepseek_api.extract_json_from_response(response)
# 添加token使用信息
if usage:
analysis_result["_token_usage"] = usage
# 整合结果
results[symbol] = {
"analysis": analysis_result,
"timestamp": datetime.now().isoformat()
}
# 保存分析结果到数据库
try:
# 保存到数据库
saved = self.db_manager.save_analysis_result(
agent='crypto',
symbol=symbol,
time_interval=self.time_interval,
analysis_result=analysis_result
)
if saved:
print(f"{symbol}分析结果已保存到数据库")
else:
print(f"{symbol}分析结果保存到数据库失败")
except Exception as e:
print(f"保存{symbol}分析结果到数据库时出错: {e}")
print(f"{symbol}分析完成")
else:
print(f"跳过{symbol}分析,无法获取数据")
return results
def analyze_all_symbols(self) -> Dict[str, Dict[str, Any]]: def analyze_all_symbols(self) -> Dict[str, Dict[str, Any]]:
""" """
分析所有支持的交易对 分析所有支持的交易对
@ -229,81 +310,13 @@ class CryptoAgent:
for symbol in self.symbols: for symbol in self.symbols:
print(f"\n开始分析{symbol}...") print(f"\n开始分析{symbol}...")
# 获取并处理数据 single_result = self.analyze_symbol(symbol)
raw_data = self.fetch_historical_data(symbol, days=self.crypto_config['historical_days']) results[symbol] = single_result[symbol]
if not raw_data.empty:
processed_data = self.process_data(symbol, raw_data)
# 准备市场数据
market_data = {
"symbol": symbol,
"current_price": float(processed_data['close'].iloc[-1]),
# "price_change_24h": float(processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]),
# "price_change_percentage_24h": float((processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]) / processed_data['close'].iloc[-24] * 100),
"historical_prices": processed_data['close'].tail(100).tolist(),
"volumes": processed_data['volume'].tail(100).tolist(),
"technical_indicators": {
"rsi": float(processed_data['RSI'].iloc[-1]),
"macd": float(processed_data['MACD'].iloc[-1]),
"macd_signal": float(processed_data['MACD_Signal'].iloc[-1]),
"bollinger_upper": float(processed_data['Bollinger_Upper'].iloc[-1]),
"bollinger_lower": float(processed_data['Bollinger_Lower'].iloc[-1]),
"ma5": float(processed_data['MA5'].iloc[-1]),
"ma10": float(processed_data['MA10'].iloc[-1]),
"ma20": float(processed_data['MA20'].iloc[-1]),
"ma50": float(processed_data['MA50'].iloc[-1]),
"atr": float(processed_data['ATR'].iloc[-1])
},
"klines": processed_data[['open', 'high', 'low', 'close', 'volume']].tail(100).to_dict('records')
}
# 将市场数据格式化为适合大模型的格式
formatted_data = self._format_market_data(market_data)
# 构建提示词
prompt = self._build_market_analysis_prompt(formatted_data)
# 调用API获取分析和交易建议
response, usage = self.deepseek_api.call_model(prompt, task_type="交易分析", symbol=symbol)
# 解析响应
analysis_result = self.deepseek_api.extract_json_from_response(response)
# 添加token使用信息
if usage:
analysis_result["_token_usage"] = usage
# 整合结果
results[symbol] = {
"analysis": analysis_result,
"timestamp": datetime.now().isoformat()
}
# 保存分析结果到数据库
try:
# 保存到数据库
saved = self.db_manager.save_analysis_result(
agent='crypto',
symbol=symbol,
time_interval=self.time_interval,
analysis_result=analysis_result
)
if saved:
print(f"{symbol}分析结果已保存到数据库")
else:
print(f"{symbol}分析结果保存到数据库失败")
except Exception as e:
print(f"保存{symbol}分析结果到数据库时出错: {e}")
print(f"{symbol}分析完成")
else:
print(f"跳过{symbol}分析,无法获取数据")
return results return results
def run_analysis_cycle(self) -> Dict[str, Any]: def run_analysis_cycle(self, symbol: Optional[str] = None) -> Dict[str, Any]:
""" """
运行一个完整的分析周期 运行一个完整的分析周期
@ -312,7 +325,10 @@ class CryptoAgent:
""" """
print(f"开始新的分析周期,时间:{datetime.now().isoformat()}") print(f"开始新的分析周期,时间:{datetime.now().isoformat()}")
results = self.analyze_all_symbols() if symbol:
results = self.analyze_symbol(symbol)
else:
results = self.analyze_all_symbols()
# 保存分析结果 # 保存分析结果
timestamp = datetime.now().strftime("%Y%m%d%H%M%S") timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
@ -343,7 +359,7 @@ class CryptoAgent:
把分析的JSON结果调用大模型转化成交易建议 把分析的JSON结果调用大模型转化成交易建议
""" """
prompt = f""" prompt = f"""
请对以下加密货币市场分析的JSON结果进行深入分析 转化成包含分析时间技术分析支撑位压力位 建议买点卖点止损位止盈位仓位建议 增加适当的emoji便于阅读 简单明了 请对以下加密货币市场分析的JSON结果进行深入分析 转化成包含分析时间分析时间级别技术分析支撑位压力位 建议买点卖点止损位止盈位仓位建议 增加适当的emoji便于阅读 简单明了
分析 JSON 结果: 分析 JSON 结果:
{results} {results}
@ -392,7 +408,7 @@ class CryptoAgent:
except Exception as e: except Exception as e:
print(f"导出 token 使用情况时出错: {e}") print(f"导出 token 使用情况时出错: {e}")
def start_agent(self) -> None: def start_agent(self, symbol: Optional[str] = None) -> None:
""" """
启动智能体 启动智能体
@ -402,11 +418,13 @@ class CryptoAgent:
print("启动加密货币分析智能体...") print("启动加密货币分析智能体...")
try: try:
self.run_analysis_cycle() if symbol:
self.run_analysis_cycle(symbol)
else:
self.run_analysis_cycle()
# 导出最终的token使用情况 # 导出最终的token使用情况
self._export_token_usage() self._export_token_usage()
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n智能体已停止") print("\n智能体已停止")
# 导出最终的token使用情况 # 导出最终的token使用情况

View File

@ -276,7 +276,7 @@ class GoldAgent:
"seasonal_factor": float(processed_data['seasonal_factor'].iloc[-1]), "seasonal_factor": float(processed_data['seasonal_factor'].iloc[-1]),
"gold_bull_signal": float(processed_data['gold_bull_signal'].iloc[-1]) "gold_bull_signal": float(processed_data['gold_bull_signal'].iloc[-1])
}, },
"klines": processed_data[['open', 'high', 'low', 'close', 'volume']].tail(30).to_dict('records') "klines": processed_data[['open', 'high', 'low', 'close', 'volume']].tail(100).to_dict('records')
} }
# 将市场数据格式化为适合大模型的格式 # 将市场数据格式化为适合大模型的格式
@ -332,7 +332,34 @@ class GoldAgent:
# 导出 DeepSeek API token 使用情况 # 导出 DeepSeek API token 使用情况
self._export_token_usage() self._export_token_usage()
return results # 把分析的JSON结果调用大模型转化成交易建议
trading_suggestions = self.convert_analysis_to_trading_suggestions(results)
print(f"交易建议:{trading_suggestions}")
if self.dingtalk_bot:
self.dingtalk_bot.send_markdown(title="黄金交易建议", text=trading_suggestions)
return results, trading_suggestions
def convert_analysis_to_trading_suggestions(self, results: Dict[str, Any]) -> str:
"""
把分析的JSON结果调用大模型转化成交易建议
"""
prompt = f"""
请对以下黄金市场分析的JSON结果进行深入分析 转化成包含分析时间技术分析支撑位压力位 建议买点卖点止损位止盈位仓位建议 增加适当的emoji便于阅读 简单明了
分析 JSON 结果:
{results}
"""
system_prompt = """
你是一个专业的黄金分析助手你擅长分析市场趋势预测价格走向和提供交易建议请始终使用中文回复并确保输出格式规范的Markdown
"""
response, usage = self.deepseek_api.call_model(prompt, system_prompt=system_prompt, task_type="交易建议")
message = self.deepseek_api.extract_text_from_response(response)
return message
def _export_token_usage(self) -> None: def _export_token_usage(self) -> None:
""" """

View File

@ -25,13 +25,13 @@ alltick:
# 加密货币设置 # 加密货币设置
crypto: crypto:
base_currencies: base_currencies:
- "BTC" # - "BTC"
- "ETH" # - "ETH"
# - "SOL" # - "SOL"
# - "SUI" # - "SUI"
# - "XRP" - "CETUS"
quote_currency: "USDT" quote_currency: "USDT"
time_interval: "1h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d time_interval: "4h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d
historical_days: 30 historical_days: 30
# 黄金市场分析配置 # 黄金市场分析配置
@ -39,9 +39,9 @@ gold:
# 黄金交易对 # 黄金交易对
symbols: ["GOLD"] symbols: ["GOLD"]
# 历史数据天数 # 历史数据天数
historical_days: 180 historical_days: 30
# 时间间隔 # 时间间隔
time_interval: "1d" time_interval: "4h"
# 数据设置 # 数据设置

View File

@ -18,7 +18,7 @@ from cryptoai.utils.config_loader import ConfigLoader
def main(): def main():
try: try:
print("程序启动") print("定时程序启动")
# 设置每个4小时运行一次 # 设置每个4小时运行一次
schedule.every(4).hours.do(CryptoAgent().start_agent) schedule.every(4).hours.do(CryptoAgent().start_agent)
@ -26,7 +26,10 @@ def main():
while True: while True:
schedule.run_pending() schedule.run_pending()
time.sleep(1) time.sleep(1)
# CryptoAgent().start_agent()
# GoldAgent().start_agent()
# CryptoAgent().start_agent('BTCUSDT')
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n程序已退出") print("\n程序已退出")