alphax/app/cli.py
2026-05-13 23:50:02 +08:00

79 lines
3.3 KiB
Python

"""Unified CLI entrypoint for AlphaX jobs."""
import argparse
import json
from datetime import datetime

from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, price_tracker, review_engine, sentiment_monitor
def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the unified AlphaX CLI.

    One sub-command is registered per job and the chosen name is stored on
    the parsed namespace as ``command``. A sub-command is mandatory, so
    invoking the CLI without one is reported by argparse as a usage error.

    Sub-commands and their flags:
        screener:  ``--compact``
        confirm:   ``--compact``
        tracker:   (no flags)
        review:    ``--compact``, ``--no-push``
        event:     ``--no-process-existing``
        sentiment: ``--collect``, ``--check``, ``--scores``

    Every flag is a ``store_true`` switch, so it defaults to ``False``.

    Returns:
        The fully configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description="AlphaX unified CLI")
    subparsers = parser.add_subparsers(dest="command", required=True)

    screener = subparsers.add_parser("screener", help="运行粗筛/细筛")
    screener.add_argument("--compact", action="store_true", help="输出紧凑 JSON")

    confirm = subparsers.add_parser("confirm", help="运行确认流程")
    confirm.add_argument("--compact", action="store_true", help="输出紧凑 JSON")

    # The tracker job takes no options, so the sub-parser object is not kept.
    subparsers.add_parser("tracker", help="运行价格跟踪")

    review = subparsers.add_parser("review", help="运行复盘")
    review.add_argument("--compact", action="store_true", help="输出紧凑 JSON")
    review.add_argument("--no-push", action="store_true", help="只运行复盘,不发飞书")

    event = subparsers.add_parser("event", help="运行事件驱动筛选")
    event.add_argument(
        "--no-process-existing",
        action="store_true",
        help="只处理本轮新采集事件",
    )

    # The three sentiment switches are independent flags (not a mutually
    # exclusive group); several may be passed at once.
    sentiment = subparsers.add_parser("sentiment", help="运行舆情任务")
    sentiment.add_argument("--collect", action="store_true", help="采集并存储")
    sentiment.add_argument("--check", action="store_true", help="输出舆情异动")
    sentiment.add_argument("--scores", action="store_true", help="输出评分")

    return parser
def _print_json(payload, **dumps_options):
    """Print *payload* to stdout as JSON without escaping non-ASCII text."""
    # Use the standard-library ``json`` directly instead of reaching through a
    # service module's namespace (``some_service.json``), which only works as
    # long as that module happens to import ``json`` itself.
    print(json.dumps(payload, ensure_ascii=False, **dumps_options))


def _run_sentiment(args):
    """Run the ``sentiment`` sub-command in the mode selected by its flags.

    Modes are checked in a fixed order, so when several flags are passed the
    first match wins: ``--collect``, then ``--scores``, then ``--check``.
    With no flag, a combined report is produced.

    Args:
        args: Parsed CLI namespace carrying the boolean attributes
            ``collect``, ``scores`` and ``check``.

    Returns:
        The object that was printed: the collect result, the scores, the
        alerts, or the combined report dict.
    """
    if args.collect:
        collected = sentiment_monitor.collect_and_store()
        # Collect output is printed on a single line (no ``indent``).
        _print_json(collected)
        return collected

    if args.scores:
        scores = sentiment_monitor.get_sentiment_scores()
        _print_json(scores, indent=2)
        return scores

    # Both remaining modes need the alerts computed for the active holdings.
    holdings = sentiment_monitor.get_active_holdings()
    alerts = sentiment_monitor.get_sentiment_alert(holdings=holdings)
    if args.check:
        _print_json(alerts, indent=2)
        return alerts

    report = {
        "alerts": alerts,
        "sentiment_scores": sentiment_monitor.get_sentiment_scores(),
        "holdings_count": len(holdings),
        # Local wall-clock time of this run (naive, no timezone attached).
        "check_time": datetime.now().isoformat(),
    }
    _print_json(report, indent=2)
    return report


def main():
    """Parse the command line and dispatch to the selected AlphaX job.

    Dispatch table:
        screener  -> ``altcoin_screener.main(compact=...)``
        confirm   -> ``altcoin_confirm.main(compact=...)``
        tracker   -> ``price_tracker.main()``
        review    -> ``review_engine.run_review(push_enabled=..., compact=...)``
        event     -> ``event_driven_screener.run_once(process_existing=...)``;
                     the result is also printed as indented JSON
        sentiment -> collect / scores / check / combined report, printed as JSON

    Returns:
        Whatever the selected job returns.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, or via
            ``parser.error`` if an unrecognised command name slips through.
    """
    parser = build_parser()
    args = parser.parse_args()
    command = args.command

    if command == "screener":
        return altcoin_screener.main(compact=args.compact)

    if command == "confirm":
        return altcoin_confirm.main(compact=args.compact)

    if command == "tracker":
        return price_tracker.main()

    if command == "review":
        # The CLI flags are negative ("--no-push"); the service takes the
        # positive form.
        return review_engine.run_review(
            push_enabled=not args.no_push,
            compact=args.compact,
        )

    if command == "event":
        outcome = event_driven_screener.run_once(
            process_existing=not args.no_process_existing,
        )
        # ``default=str`` stringifies any value json cannot encode natively.
        _print_json(outcome, indent=2, default=str)
        return outcome

    if command == "sentiment":
        return _run_sentiment(args)

    # Defensive fallback: the sub-command is declared as required in
    # build_parser, so normal parsing should never reach this point.
    parser.error(f"unknown command: {args.command}")
# Script entry point: dispatch only when this file is executed directly,
# not when it is imported as a module. The value returned by main() is
# intentionally not used here.
if __name__ == "__main__":
    main()