232 lines
8.8 KiB
Python
232 lines
8.8 KiB
Python
import os
|
||
import sys
|
||
from datetime import datetime, timedelta
|
||
|
||
import pytest
|
||
from fastapi.testclient import TestClient
|
||
|
||
PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||
if PROJECT_DIR not in sys.path:
|
||
sys.path.insert(0, PROJECT_DIR)
|
||
|
||
from app.db import auth_db
|
||
from app.web import web_server
|
||
|
||
|
||
@pytest.fixture
|
||
def temp_auth_db(monkeypatch, tmp_path):
|
||
db_path = tmp_path / "auth.db"
|
||
monkeypatch.setattr(auth_db, "DB_PATH", str(db_path))
|
||
monkeypatch.setattr(web_server.auth_db, "DB_PATH", str(db_path))
|
||
# 默认测试环境不配置 SMTP,注册接口返回 dev_verification_code 便于验证。
|
||
for key in [
|
||
"ASTOCK_SMTP_HOST", "ASTOCK_SMTP_PORT", "ASTOCK_SMTP_USERNAME",
|
||
"ASTOCK_SMTP_PASSWORD", "ASTOCK_SMTP_SENDER",
|
||
]:
|
||
monkeypatch.delenv(key, raising=False)
|
||
auth_db.init_auth_db()
|
||
return db_path
|
||
|
||
|
||
def test_register_creates_unverified_user_with_invite_code_and_email_verification(temp_auth_db):
|
||
result = auth_db.register_user("alice@example.com", "StrongPass123")
|
||
|
||
assert result["email"] == "alice@example.com"
|
||
assert result["email_verified"] is False
|
||
assert len(result["invite_code"]) >= 8
|
||
assert result["verification_code"] and len(result["verification_code"]) == 6
|
||
|
||
user = auth_db.get_user_by_email("alice@example.com")
|
||
assert user["password_hash"] != "StrongPass123"
|
||
assert user["status"] == "pending_email_verification"
|
||
assert user["invited_by_user_id"] is None
|
||
|
||
|
||
def test_register_with_invite_code_locks_inviter_relationship(temp_auth_db):
|
||
inviter = auth_db.register_user("inviter@example.com", "StrongPass123")
|
||
invited = auth_db.register_user("bob@example.com", "StrongPass123", invite_code=inviter["invite_code"])
|
||
|
||
user = auth_db.get_user_by_email("bob@example.com")
|
||
assert user["invited_by_user_id"] == inviter["user_id"]
|
||
assert invited["invited_by_user_id"] == inviter["user_id"]
|
||
|
||
|
||
def test_invalid_invite_code_rejects_registration(temp_auth_db):
|
||
with pytest.raises(auth_db.AuthError) as exc:
|
||
auth_db.register_user("bad@example.com", "StrongPass123", invite_code="NO_SUCH_CODE")
|
||
assert "邀请码无效" in str(exc.value)
|
||
|
||
|
||
def test_verify_email_activates_user_and_login_requires_verified_email(temp_auth_db):
|
||
reg = auth_db.register_user("alice@example.com", "StrongPass123")
|
||
|
||
with pytest.raises(auth_db.AuthError) as exc:
|
||
auth_db.login_user("alice@example.com", "StrongPass123")
|
||
assert "邮箱未验证" in str(exc.value)
|
||
|
||
verified = auth_db.verify_email("alice@example.com", reg["verification_code"])
|
||
assert verified["email_verified"] is True
|
||
assert verified["status"] == "active"
|
||
|
||
session = auth_db.login_user("alice@example.com", "StrongPass123")
|
||
assert session["token"]
|
||
assert session["user"]["email"] == "alice@example.com"
|
||
|
||
|
||
def test_free_trial_subscription_can_only_be_claimed_once(temp_auth_db):
|
||
reg = auth_db.register_user("alice@example.com", "StrongPass123")
|
||
auth_db.verify_email("alice@example.com", reg["verification_code"])
|
||
user = auth_db.get_user_by_email("alice@example.com")
|
||
|
||
sub = auth_db.claim_free_trial(user["id"])
|
||
assert sub["plan_code"] == "free_trial_1m"
|
||
assert sub["status"] == "active"
|
||
assert sub["source"] == "free_trial"
|
||
assert auth_db.get_user_by_email("alice@example.com")["free_trial_claimed"] == 1
|
||
|
||
with pytest.raises(auth_db.AuthError) as exc:
|
||
auth_db.claim_free_trial(user["id"])
|
||
assert "只能领取一次" in str(exc.value)
|
||
|
||
|
||
def test_subscription_tables_reserve_paid_usdt_order_schema(temp_auth_db):
|
||
cols = auth_db.get_table_columns("payment_order")
|
||
for required in ["user_id", "plan_code", "amount_usdt", "chain", "pay_address", "txid", "status"]:
|
||
assert required in cols
|
||
|
||
|
||
def test_auth_api_register_verify_login_and_free_trial(temp_auth_db):
|
||
client = TestClient(web_server.app)
|
||
|
||
r = client.post("/api/auth/register", json={"email": "alice@example.com", "password": "StrongPass123"})
|
||
assert r.status_code == 200
|
||
payload = r.json()
|
||
assert payload["ok"] is True
|
||
assert payload["user"]["email_verified"] is False
|
||
assert payload["dev_verification_code"] # SMTP 未配置前用于本地/测试验证;配置后隐藏
|
||
|
||
code = payload["dev_verification_code"]
|
||
r = client.post("/api/auth/verify-email", json={"email": "alice@example.com", "code": code})
|
||
assert r.status_code == 200
|
||
assert r.json()["user"]["email_verified"] is True
|
||
|
||
r = client.post("/api/auth/login", json={"email": "alice@example.com", "password": "StrongPass123"})
|
||
assert r.status_code == 200
|
||
token = r.cookies.get("altcoin_session")
|
||
assert token
|
||
|
||
r = client.post("/api/subscriptions/free-trial", cookies={"altcoin_session": token})
|
||
assert r.status_code == 200
|
||
assert r.json()["subscription"]["plan_code"] == "free_trial_1m"
|
||
|
||
r = client.post("/api/subscriptions/free-trial", cookies={"altcoin_session": token})
|
||
assert r.status_code == 400
|
||
assert "只能领取一次" in r.json()["detail"]
|
||
|
||
|
||
def test_register_sends_email_and_hides_dev_code_when_smtp_configured(temp_auth_db, monkeypatch):
|
||
sent = []
|
||
monkeypatch.setenv("ASTOCK_SMTP_HOST", "smtp.example.com")
|
||
monkeypatch.setenv("ASTOCK_SMTP_PORT", "465")
|
||
monkeypatch.setenv("ASTOCK_SMTP_USERNAME", "noreply@example.com")
|
||
monkeypatch.setenv("ASTOCK_SMTP_PASSWORD", "secret")
|
||
monkeypatch.setenv("ASTOCK_SMTP_SENDER", "noreply@example.com")
|
||
|
||
def fake_send(to_email, code):
|
||
sent.append((to_email, code))
|
||
return True
|
||
|
||
monkeypatch.setattr(auth_db, "send_verification_email", fake_send)
|
||
client = TestClient(web_server.app)
|
||
|
||
r = client.post("/api/auth/register", json={"email": "mail@example.com", "password": "StrongPass123"})
|
||
assert r.status_code == 200
|
||
payload = r.json()
|
||
assert payload["dev_verification_code"] is None
|
||
assert payload["email_sent"] is True
|
||
assert sent and sent[0][0] == "mail@example.com"
|
||
|
||
|
||
def test_resend_verification_code_has_rate_limit(temp_auth_db, monkeypatch):
|
||
sent = []
|
||
monkeypatch.setattr(auth_db, "send_verification_email", lambda email, code: sent.append((email, code)) or True)
|
||
reg = auth_db.register_user("alice@example.com", "StrongPass123")
|
||
|
||
# 注册验证码也视为一次发送;需要过冷却时间后才允许重发。
|
||
with pytest.raises(auth_db.AuthError) as exc:
|
||
auth_db.resend_verification_code("alice@example.com")
|
||
assert "请稍后再试" in str(exc.value)
|
||
|
||
conn = auth_db.get_conn()
|
||
old_time = (datetime.now() - timedelta(seconds=auth_db.RESEND_COOLDOWN_SECONDS + 5)).isoformat(timespec="seconds")
|
||
conn.execute("UPDATE email_verification_code SET created_at=? WHERE email=?", (old_time, "alice@example.com"))
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
resent = auth_db.resend_verification_code("alice@example.com")
|
||
assert resent["email"] == "alice@example.com"
|
||
assert resent["verification_code"] != reg["verification_code"]
|
||
|
||
|
||
def test_auth_page_hides_internal_requirements_and_has_modern_member_copy(temp_auth_db):
|
||
client = TestClient(web_server.app)
|
||
r = client.get("/auth")
|
||
assert r.status_code == 200
|
||
html = r.text
|
||
|
||
for forbidden in [
|
||
"先把会员系统搭起来",
|
||
"第一阶段支持",
|
||
"后续用于会员天数奖励或返佣统计",
|
||
"USDT 订阅已预留表结构",
|
||
"SMTP 未配置",
|
||
"提示词",
|
||
"领取免费体验 1 个月",
|
||
"claimTrial()",
|
||
"/api/subscriptions/free-trial",
|
||
]:
|
||
assert forbidden not in html
|
||
|
||
for expected in [
|
||
"提前发现机会,别在强信号后追高",
|
||
"登录或开启免费体验",
|
||
"创建账号",
|
||
"会员登录",
|
||
"前往订阅中心",
|
||
"AI Opportunity Radar",
|
||
]:
|
||
assert expected in html
|
||
|
||
|
||
def test_subscription_page_owns_trial_and_plan_flow(temp_auth_db):
|
||
client = TestClient(web_server.app)
|
||
r = client.get("/subscription")
|
||
assert r.status_code == 200
|
||
html = r.text
|
||
|
||
for expected in [
|
||
"订阅中心",
|
||
"免费体验 1 个月",
|
||
"月付",
|
||
"/api/subscriptions/free-trial",
|
||
]:
|
||
assert expected in html
|
||
|
||
assert "先把会员系统搭起来" not in html
|
||
assert "USDT 订阅已预留表结构" not in html
|
||
|
||
|
||
def test_app_shell_returns_200_for_all_users(temp_auth_db):
|
||
"""v2: /app 是纯壳页,不校验登录/订阅(鉴权由 JS 调用 /api/auth/me 完成)"""
|
||
client = TestClient(web_server.app)
|
||
# 未登录也能拿到壳页(JS自己判断跳转)
|
||
assert client.get("/app").status_code == 200
|
||
assert "Omnix" in client.get("/app").text
|
||
|
||
# 登录用户也一样
|
||
reg = auth_db.register_user("alice@example.com", "StrongPass123")
|
||
auth_db.verify_email("alice@example.com", reg["verification_code"])
|
||
login = auth_db.login_user("alice@example.com", "StrongPass123")
|
||
token = login["token"]
|
||
assert client.get("/app", cookies={"altcoin_session": token}).status_code == 200
|