alphax/tests/test_user_subscription_auth.py
2026-05-13 22:32:50 +08:00

232 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
from datetime import datetime, timedelta
import pytest
from fastapi.testclient import TestClient
PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if PROJECT_DIR not in sys.path:
sys.path.insert(0, PROJECT_DIR)
import auth_db
import web_server
@pytest.fixture
def temp_auth_db(monkeypatch, tmp_path):
db_path = tmp_path / "auth.db"
monkeypatch.setattr(auth_db, "DB_PATH", str(db_path))
monkeypatch.setattr(web_server.auth_db, "DB_PATH", str(db_path))
# 默认测试环境不配置 SMTP注册接口返回 dev_verification_code 便于验证。
for key in [
"ASTOCK_SMTP_HOST", "ASTOCK_SMTP_PORT", "ASTOCK_SMTP_USERNAME",
"ASTOCK_SMTP_PASSWORD", "ASTOCK_SMTP_SENDER",
]:
monkeypatch.delenv(key, raising=False)
auth_db.init_auth_db()
return db_path
def test_register_creates_unverified_user_with_invite_code_and_email_verification(temp_auth_db):
result = auth_db.register_user("alice@example.com", "StrongPass123")
assert result["email"] == "alice@example.com"
assert result["email_verified"] is False
assert len(result["invite_code"]) >= 8
assert result["verification_code"] and len(result["verification_code"]) == 6
user = auth_db.get_user_by_email("alice@example.com")
assert user["password_hash"] != "StrongPass123"
assert user["status"] == "pending_email_verification"
assert user["invited_by_user_id"] is None
def test_register_with_invite_code_locks_inviter_relationship(temp_auth_db):
inviter = auth_db.register_user("inviter@example.com", "StrongPass123")
invited = auth_db.register_user("bob@example.com", "StrongPass123", invite_code=inviter["invite_code"])
user = auth_db.get_user_by_email("bob@example.com")
assert user["invited_by_user_id"] == inviter["user_id"]
assert invited["invited_by_user_id"] == inviter["user_id"]
def test_invalid_invite_code_rejects_registration(temp_auth_db):
with pytest.raises(auth_db.AuthError) as exc:
auth_db.register_user("bad@example.com", "StrongPass123", invite_code="NO_SUCH_CODE")
assert "邀请码无效" in str(exc.value)
def test_verify_email_activates_user_and_login_requires_verified_email(temp_auth_db):
reg = auth_db.register_user("alice@example.com", "StrongPass123")
with pytest.raises(auth_db.AuthError) as exc:
auth_db.login_user("alice@example.com", "StrongPass123")
assert "邮箱未验证" in str(exc.value)
verified = auth_db.verify_email("alice@example.com", reg["verification_code"])
assert verified["email_verified"] is True
assert verified["status"] == "active"
session = auth_db.login_user("alice@example.com", "StrongPass123")
assert session["token"]
assert session["user"]["email"] == "alice@example.com"
def test_free_trial_subscription_can_only_be_claimed_once(temp_auth_db):
reg = auth_db.register_user("alice@example.com", "StrongPass123")
auth_db.verify_email("alice@example.com", reg["verification_code"])
user = auth_db.get_user_by_email("alice@example.com")
sub = auth_db.claim_free_trial(user["id"])
assert sub["plan_code"] == "free_trial_1m"
assert sub["status"] == "active"
assert sub["source"] == "free_trial"
assert auth_db.get_user_by_email("alice@example.com")["free_trial_claimed"] == 1
with pytest.raises(auth_db.AuthError) as exc:
auth_db.claim_free_trial(user["id"])
assert "只能领取一次" in str(exc.value)
def test_subscription_tables_reserve_paid_usdt_order_schema(temp_auth_db):
cols = auth_db.get_table_columns("payment_order")
for required in ["user_id", "plan_code", "amount_usdt", "chain", "pay_address", "txid", "status"]:
assert required in cols
def test_auth_api_register_verify_login_and_free_trial(temp_auth_db):
client = TestClient(web_server.app)
r = client.post("/api/auth/register", json={"email": "alice@example.com", "password": "StrongPass123"})
assert r.status_code == 200
payload = r.json()
assert payload["ok"] is True
assert payload["user"]["email_verified"] is False
assert payload["dev_verification_code"] # SMTP 未配置前用于本地/测试验证;配置后隐藏
code = payload["dev_verification_code"]
r = client.post("/api/auth/verify-email", json={"email": "alice@example.com", "code": code})
assert r.status_code == 200
assert r.json()["user"]["email_verified"] is True
r = client.post("/api/auth/login", json={"email": "alice@example.com", "password": "StrongPass123"})
assert r.status_code == 200
token = r.cookies.get("altcoin_session")
assert token
r = client.post("/api/subscriptions/free-trial", cookies={"altcoin_session": token})
assert r.status_code == 200
assert r.json()["subscription"]["plan_code"] == "free_trial_1m"
r = client.post("/api/subscriptions/free-trial", cookies={"altcoin_session": token})
assert r.status_code == 400
assert "只能领取一次" in r.json()["detail"]
def test_register_sends_email_and_hides_dev_code_when_smtp_configured(temp_auth_db, monkeypatch):
sent = []
monkeypatch.setenv("ASTOCK_SMTP_HOST", "smtp.example.com")
monkeypatch.setenv("ASTOCK_SMTP_PORT", "465")
monkeypatch.setenv("ASTOCK_SMTP_USERNAME", "noreply@example.com")
monkeypatch.setenv("ASTOCK_SMTP_PASSWORD", "secret")
monkeypatch.setenv("ASTOCK_SMTP_SENDER", "noreply@example.com")
def fake_send(to_email, code):
sent.append((to_email, code))
return True
monkeypatch.setattr(auth_db, "send_verification_email", fake_send)
client = TestClient(web_server.app)
r = client.post("/api/auth/register", json={"email": "mail@example.com", "password": "StrongPass123"})
assert r.status_code == 200
payload = r.json()
assert payload["dev_verification_code"] is None
assert payload["email_sent"] is True
assert sent and sent[0][0] == "mail@example.com"
def test_resend_verification_code_has_rate_limit(temp_auth_db, monkeypatch):
sent = []
monkeypatch.setattr(auth_db, "send_verification_email", lambda email, code: sent.append((email, code)) or True)
reg = auth_db.register_user("alice@example.com", "StrongPass123")
# 注册验证码也视为一次发送;需要过冷却时间后才允许重发。
with pytest.raises(auth_db.AuthError) as exc:
auth_db.resend_verification_code("alice@example.com")
assert "请稍后再试" in str(exc.value)
conn = auth_db.get_conn()
old_time = (datetime.now() - timedelta(seconds=auth_db.RESEND_COOLDOWN_SECONDS + 5)).isoformat(timespec="seconds")
conn.execute("UPDATE email_verification_code SET created_at=? WHERE email=?", (old_time, "alice@example.com"))
conn.commit()
conn.close()
resent = auth_db.resend_verification_code("alice@example.com")
assert resent["email"] == "alice@example.com"
assert resent["verification_code"] != reg["verification_code"]
def test_auth_page_hides_internal_requirements_and_has_modern_member_copy(temp_auth_db):
client = TestClient(web_server.app)
r = client.get("/auth")
assert r.status_code == 200
html = r.text
for forbidden in [
"先把会员系统搭起来",
"第一阶段支持",
"后续用于会员天数奖励或返佣统计",
"USDT 订阅已预留表结构",
"SMTP 未配置",
"提示词",
"领取免费体验 1 个月",
"claimTrial()",
"/api/subscriptions/free-trial",
]:
assert forbidden not in html
for expected in [
"提前发现机会,别在强信号后追高",
"登录或开启免费体验",
"创建账号",
"会员登录",
"前往订阅中心",
"AI Opportunity Radar",
]:
assert expected in html
def test_subscription_page_owns_trial_and_plan_flow(temp_auth_db):
client = TestClient(web_server.app)
r = client.get("/subscription")
assert r.status_code == 200
html = r.text
for expected in [
"订阅中心",
"免费体验 1 个月",
"月付",
"/api/subscriptions/free-trial",
]:
assert expected in html
assert "先把会员系统搭起来" not in html
assert "USDT 订阅已预留表结构" not in html
def test_app_shell_returns_200_for_all_users(temp_auth_db):
"""v2: /app 是纯壳页,不校验登录/订阅(鉴权由 JS 调用 /api/auth/me 完成)"""
client = TestClient(web_server.app)
# 未登录也能拿到壳页JS自己判断跳转
assert client.get("/app").status_code == 200
assert "Omnix" in client.get("/app").text
# 登录用户也一样
reg = auth_db.register_user("alice@example.com", "StrongPass123")
auth_db.verify_email("alice@example.com", reg["verification_code"])
login = auth_db.login_user("alice@example.com", "StrongPass123")
token = login["token"]
assert client.get("/app", cookies={"altcoin_session": token}).status_code == 200