alphax/app/cli.py
2026-05-14 17:56:33 +08:00

89 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Unified CLI entrypoint for AlphaX Agent Crypto jobs."""
import argparse
from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, price_tracker, review_engine, sentiment_monitor
def build_parser():
parser = argparse.ArgumentParser(description="AlphaX Agent Crypto unified CLI")
subparsers = parser.add_subparsers(dest="command", required=True)
screener = subparsers.add_parser("screener", help="运行粗筛/细筛")
screener.add_argument("--compact", action="store_true", help="输出紧凑 JSON")
confirm = subparsers.add_parser("confirm", help="运行确认流程")
confirm.add_argument("--compact", action="store_true", help="输出紧凑 JSON")
tracker = subparsers.add_parser("tracker", help="运行价格跟踪")
review = subparsers.add_parser("review", help="运行复盘")
review.add_argument("--compact", action="store_true", help="输出紧凑 JSON")
review.add_argument("--no-push", action="store_true", help="只运行复盘,不发飞书")
event = subparsers.add_parser("event", help="运行事件驱动筛选")
event.add_argument("--no-process-existing", action="store_true", help="只处理本轮新采集事件")
sentiment = subparsers.add_parser("sentiment", help="运行舆情任务")
sentiment.add_argument("--collect", action="store_true", help="采集并存储")
sentiment.add_argument("--check", action="store_true", help="输出舆情异动")
sentiment.add_argument("--scores", action="store_true", help="输出评分")
llm = subparsers.add_parser("llm-insights", help="异步生成 LLM 缓存解释")
llm.add_argument("--scope", choices=["recommendations", "sentiment", "sentiment-events", "review"], default="recommendations")
llm.add_argument("--limit", type=int, default=30)
return parser
def main():
parser = build_parser()
args = parser.parse_args()
if args.command == "screener":
return altcoin_screener.main(compact=args.compact)
if args.command == "confirm":
return altcoin_confirm.main(compact=args.compact)
if args.command == "tracker":
return price_tracker.main()
if args.command == "review":
return review_engine.run_review(push_enabled=not args.no_push, compact=args.compact)
if args.command == "event":
result = event_driven_screener.run_once(process_existing=not args.no_process_existing)
print(event_driven_screener.json.dumps(result, ensure_ascii=False, indent=2, default=str))
return result
if args.command == "sentiment":
if args.collect:
result = sentiment_monitor.collect_and_store()
print(sentiment_monitor.json.dumps(result, ensure_ascii=False))
return result
if args.scores:
result = sentiment_monitor.get_sentiment_scores()
print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2))
return result
holdings = sentiment_monitor.get_active_holdings()
alerts = sentiment_monitor.get_sentiment_alert(holdings=holdings)
if args.check:
print(sentiment_monitor.json.dumps(alerts, ensure_ascii=False, indent=2))
return alerts
result = {
"alerts": alerts,
"sentiment_scores": sentiment_monitor.get_sentiment_scores(),
"holdings_count": len(holdings),
"check_time": sentiment_monitor.datetime.now().isoformat(),
}
print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2))
return result
if args.command == "llm-insights":
from app.services import llm_insights
result = llm_insights.run(scope=args.scope, limit=args.limit)
print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2))
return result
parser.error(f"unknown command: {args.command}")
if __name__ == "__main__":
main()