cici-web/app.py
2026-05-31 22:21:37 +08:00

415 lines
15 KiB
Python

from __future__ import annotations

import contextlib
import hashlib
import hmac
import json
import os
import posixpath
import secrets
import sqlite3
import sys
from datetime import datetime
from http import HTTPStatus
from http.cookies import SimpleCookie
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import unquote
from urllib.parse import urlparse
# Directory that contains this script; static files are served from here.
BASE_DIR = Path(__file__).resolve().parent
# Location of the SQLite database file, inside the "data" sub-directory.
DB_PATH = BASE_DIR / "data" / "cici.sqlite"
# Name of the cookie that carries the owner's session token.
SESSION_COOKIE = "cici_session"
# Tokens of the currently logged-in sessions. Held only in this process's
# memory, so the set starts empty every time the server starts.
SESSIONS: set[str] = set()
def db() -> sqlite3.Connection:
    """Open a new connection to the site database.

    The directory that holds the database file is created first when it is
    missing. Rows returned through the connection are ``sqlite3.Row``
    objects, so columns can be read by name.
    """
    data_dir = DB_PATH.parent
    data_dir.mkdir(exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
def hash_password(password: str, salt: bytes | None = None) -> str:
    """Hash *password* with PBKDF2-HMAC-SHA256 (120,000 iterations).

    Args:
        password: Plain-text password; encoded as UTF-8 before hashing.
        salt: Salt bytes to use. When omitted (or empty) a fresh random
            16-byte salt is generated.

    Returns:
        The string ``"<salt hex>:<digest hex>"``.
    """
    if not salt:
        salt = secrets.token_bytes(16)
    derived = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 120_000)
    return ":".join((salt.hex(), derived.hex()))
def verify_password(password: str, stored: str) -> bool:
    """Check *password* against a stored ``"<salt hex>:<digest hex>"`` value.

    The digest is recomputed with PBKDF2-HMAC-SHA256 (120,000 iterations)
    using the stored salt and compared in constant time.

    Args:
        password: Plain-text password supplied by the client.
        stored: Value previously stored for the owner.

    Returns:
        True only when the password matches. Malformed input (a non-string
        argument, a stored value without ``:``, invalid hex, or a password
        that cannot be encoded as UTF-8) yields False instead of raising,
        so a bad request cannot crash the caller.
    """
    if not isinstance(password, str) or not isinstance(stored, str):
        return False
    salt_hex, separator, digest_hex = stored.partition(":")
    if not separator:
        return False
    try:
        salt = bytes.fromhex(salt_hex)
        expected = bytes.fromhex(digest_hex)
        # UnicodeEncodeError (e.g. lone surrogates) is a ValueError subclass.
        secret = password.encode("utf-8")
    except ValueError:
        return False
    actual = hashlib.pbkdf2_hmac("sha256", secret, salt, 120_000)
    return hmac.compare_digest(actual, expected)
def has_column(connection: sqlite3.Connection, table: str, column: str) -> bool:
    """Return True when *table* has a column named *column*.

    The table name is passed as a bound parameter to the
    ``pragma_table_info`` table-valued function rather than being spliced
    into the SQL text, so names containing spaces or quotes are handled
    correctly. A table that does not exist yields no rows, hence False.

    Args:
        connection: Open SQLite connection to inspect.
        table: Name of the table to look at.
        column: Column name to search for (exact match).
    """
    rows = connection.execute(
        "SELECT name FROM pragma_table_info(?)", (table,)
    ).fetchall()
    # Index by position so this works with or without sqlite3.Row.
    return any(row[0] == column for row in rows)
def init_db() -> None:
    """Create the schema, add missing columns and seed the default rows.

    Every statement is written so it can run on each start: tables use
    ``CREATE TABLE IF NOT EXISTS``, the two column additions are skipped
    when the column is already present, and the seed rows use
    ``INSERT OR IGNORE`` so existing data is never overwritten.
    """
    schema = (
        "\n"
        "CREATE TABLE IF NOT EXISTS owner (\n"
        "id INTEGER PRIMARY KEY CHECK (id = 1),\n"
        "password_hash TEXT NOT NULL\n"
        ");\n"
        "CREATE TABLE IF NOT EXISTS profile (\n"
        "id INTEGER PRIMARY KEY CHECK (id = 1),\n"
        "name TEXT NOT NULL,\n"
        "bio TEXT NOT NULL,\n"
        "avatar TEXT NOT NULL DEFAULT ''\n"
        ");\n"
        "CREATE TABLE IF NOT EXISTS folders (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "name TEXT NOT NULL,\n"
        "created_at TEXT NOT NULL\n"
        ");\n"
        "CREATE TABLE IF NOT EXISTS photos (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "folder_id INTEGER NOT NULL DEFAULT 1,\n"
        "name TEXT NOT NULL,\n"
        "src TEXT NOT NULL,\n"
        "created_at TEXT NOT NULL\n"
        ");\n"
        "CREATE TABLE IF NOT EXISTS diary (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "title TEXT NOT NULL,\n"
        "text TEXT NOT NULL,\n"
        "image TEXT NOT NULL DEFAULT '',\n"
        "created_at TEXT NOT NULL\n"
        ");\n"
    )
    # (table, column, statement that adds the column when it is missing)
    column_migrations = (
        (
            "photos",
            "folder_id",
            "ALTER TABLE photos ADD COLUMN folder_id INTEGER NOT NULL DEFAULT 1",
        ),
        (
            "diary",
            "image",
            "ALTER TABLE diary ADD COLUMN image TEXT NOT NULL DEFAULT ''",
        ),
    )
    profile_insert = (
        "\n"
        "INSERT OR IGNORE INTO profile (id, name, bio, avatar)\n"
        "VALUES (1, ?, ?, '')\n"
    )

    with db() as conn:
        conn.executescript(schema)

        for table, column, statement in column_migrations:
            if has_column(conn, table, column):
                continue
            conn.execute(statement)

        created_at = datetime.now().strftime("%Y-%m-%d %H:%M")
        # Folder 1 is the default album that photos fall back to.
        conn.execute(
            "INSERT OR IGNORE INTO folders (id, name, created_at) VALUES (1, ?, ?)",
            ("默认相册", created_at),
        )
        # Seed the single owner row with the built-in default password.
        default_hash = hash_password("cici123")
        conn.execute(
            "INSERT OR IGNORE INTO owner (id, password_hash) VALUES (1, ?)",
            (default_hash,),
        )
        conn.execute(
            profile_insert,
            (
                "CICI",
                "我是一名小学生女孩,喜欢画画、看书、拍照、记录生活。我的风格是甜甜的、酷酷的,心里装着亮晶晶的快乐。",
            ),
        )
class CiciHandler(SimpleHTTPRequestHandler):
    """Serve the static site from BASE_DIR together with the JSON API under /api/.

    GET endpoints are public. Every POST/PUT/DELETE endpoint except login
    and logout requires a valid owner session cookie. Malformed requests
    (bad JSON, non-numeric ids) are answered with HTTP 400 instead of
    raising out of the handler.
    """

    # Top-level directories and file suffixes that are never served as static files.
    _PRIVATE_DIRS = ("data",)
    _PRIVATE_SUFFIXES = (".py", ".sqlite")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, directory=str(BASE_DIR), **kwargs)

    def log_message(self, format: str, *args) -> None:
        """Log through the base class only when a stderr stream exists."""
        if sys.stderr:
            super().log_message(format, *args)

    def end_headers(self) -> None:
        """Add ``Cache-Control: no-store`` to every response."""
        self.send_header("Cache-Control", "no-store")
        super().end_headers()

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @classmethod
    def _is_forbidden_path(cls, raw_path: str) -> bool:
        """Return True when *raw_path* points at a private file.

        The static file handler percent-decodes and normalizes the request
        path before mapping it to a file, so the check runs on that same
        decoded, normalized form. Checking the raw text instead would let
        ``/app.p%79`` or ``/x/../data/...`` slip through. The comparison
        ignores case and trailing dots/spaces because some file systems
        treat such names as the same file.
        """
        target = raw_path.split("?", 1)[0].split("#", 1)[0]
        normalized = posixpath.normpath(unquote(target))
        segments = [
            part.rstrip(" .").casefold()
            for part in normalized.split("/")
            if part and part != "."
        ]
        if not segments:
            return False
        return segments[0] in cls._PRIVATE_DIRS or segments[-1].endswith(
            cls._PRIVATE_SUFFIXES
        )

    @staticmethod
    def _trailing_id(path: str) -> int | None:
        """Parse the last path segment as an integer id; None when it is not one."""
        try:
            return int(path.rsplit("/", 1)[-1])
        except ValueError:
            return None

    @staticmethod
    @contextlib.contextmanager
    def _database():
        """Yield a connection inside a transaction and always close it.

        Using a sqlite3 connection as a context manager only commits or
        rolls back; it does not close the connection, so that is done here.
        """
        connection = db()
        try:
            with connection:
                yield connection
        finally:
            connection.close()

    def _json_body(self) -> dict | None:
        """Return the parsed JSON body, or send HTTP 400 and return None."""
        try:
            return self.read_json()
        except ValueError:
            self.send_json_error("请求内容格式不正确。", HTTPStatus.BAD_REQUEST)
            return None

    def _authorized_id(self, path: str) -> int | None:
        """Check the owner session, then parse the id at the end of *path*.

        Returns None after sending the error response (401 or 400) when
        either step fails.
        """
        if not self.require_owner():
            return None
        resource_id = self._trailing_id(path)
        if resource_id is None:
            self.send_json_error("编号无效。", HTTPStatus.BAD_REQUEST)
        return resource_id

    # ------------------------------------------------------------------
    # HTTP verbs
    # ------------------------------------------------------------------

    def do_GET(self) -> None:
        """Route API reads; everything else is served as a static file."""
        path = urlparse(self.path).path
        if self._is_forbidden_path(self.path) or self._is_forbidden_path(path):
            self.send_error(HTTPStatus.FORBIDDEN)
            return
        if path == "/api/session":
            self.send_json({"authenticated": self.is_authenticated()})
        elif path == "/api/profile":
            self.get_profile()
        elif path == "/api/folders":
            self.get_folders()
        elif path == "/api/photos":
            self.get_photos()
        elif path == "/api/diary":
            self.get_diary()
        else:
            if path == "/":
                self.path = "/index.html"
            super().do_GET()

    def do_HEAD(self) -> None:
        """Apply the same private-file rule to HEAD requests as to GET."""
        path = urlparse(self.path).path
        if self._is_forbidden_path(self.path) or self._is_forbidden_path(path):
            self.send_error(HTTPStatus.FORBIDDEN)
            return
        super().do_HEAD()

    def do_POST(self) -> None:
        """Handle login/logout and the owner-only create endpoints."""
        path = urlparse(self.path).path
        if path == "/api/login":
            self.login()
            return
        if path == "/api/logout":
            self.logout()
            return
        owner_routes = {
            "/api/folders": self.create_folder,
            "/api/photos": self.create_photos,
            "/api/diary": self.create_diary,
        }
        action = owner_routes.get(path)
        if action is None:
            self.send_error(HTTPStatus.NOT_FOUND)
            return
        if self.require_owner():
            action()

    def do_PUT(self) -> None:
        """Handle the owner-only update endpoints."""
        path = urlparse(self.path).path
        if path == "/api/profile":
            if self.require_owner():
                self.update_profile()
            return
        if path.startswith("/api/folders/"):
            folder_id = self._authorized_id(path)
            if folder_id is not None:
                self.update_folder(folder_id)
            return
        self.send_error(HTTPStatus.NOT_FOUND)

    def do_DELETE(self) -> None:
        """Handle the owner-only delete endpoints."""
        path = urlparse(self.path).path
        routes = (
            ("/api/folders/", self.delete_folder),
            ("/api/photos/", self._delete_photo),
            ("/api/diary/", self._delete_diary_entry),
        )
        for prefix, remover in routes:
            if path.startswith(prefix):
                resource_id = self._authorized_id(path)
                if resource_id is not None:
                    remover(resource_id)
                return
        self.send_error(HTTPStatus.NOT_FOUND)

    # ------------------------------------------------------------------
    # Request / response plumbing
    # ------------------------------------------------------------------

    def read_json(self) -> dict:
        """Read the request body and parse it as a JSON object.

        Returns:
            The parsed object; ``{}`` when the request has no body.

        Raises:
            ValueError: when Content-Length is not a non-negative integer,
                the body is not valid UTF-8 JSON, or the JSON value is not
                an object.
        """
        length = int(self.headers.get("Content-Length", "0"))
        if length < 0:
            # read(-1) would block until the client closes the connection.
            raise ValueError("negative Content-Length")
        if not length:
            return {}
        raw = self.rfile.read(length).decode("utf-8")
        payload = json.loads(raw or "{}")
        if not isinstance(payload, dict):
            raise ValueError("JSON body must be an object")
        return payload

    def send_json(self, data: dict, status: int = 200, headers: dict | None = None) -> None:
        """Send *data* as a UTF-8 JSON response.

        Args:
            data: JSON-serializable payload.
            status: HTTP status code.
            headers: Optional extra response headers (name -> value).
        """
        payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        for header_name, header_value in (headers or {}).items():
            self.send_header(header_name, header_value)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def send_json_error(self, message: str, status: int) -> None:
        """Send ``{"error": message}`` with the given status."""
        self.send_json({"error": message}, status)

    # ------------------------------------------------------------------
    # Sessions
    # ------------------------------------------------------------------

    def session_token(self) -> str:
        """Return the session token from the request cookie, or ``""``."""
        cookie = SimpleCookie(self.headers.get("Cookie"))
        morsel = cookie.get(SESSION_COOKIE)
        return morsel.value if morsel else ""

    def is_authenticated(self) -> bool:
        """Return True when the request carries a known session token."""
        return self.session_token() in SESSIONS

    def require_owner(self) -> bool:
        """Return True for an authenticated request; otherwise send HTTP 401."""
        if not self.is_authenticated():
            self.send_json_error("需要主人登录后才能编辑。", HTTPStatus.UNAUTHORIZED)
            return False
        return True

    def login(self) -> None:
        """Verify the owner password and start a session on success."""
        data = self._json_body()
        if data is None:
            return
        password = data.get("password", "")
        if not isinstance(password, str):
            # A non-string can never match; treat it as a wrong password.
            password = ""
        with self._database() as connection:
            owner = connection.execute(
                "SELECT password_hash FROM owner WHERE id = 1"
            ).fetchone()
        if not owner or not verify_password(password, owner["password_hash"]):
            self.send_json_error("密码不正确。", HTTPStatus.UNAUTHORIZED)
            return
        token = secrets.token_urlsafe(32)
        SESSIONS.add(token)
        self.send_json(
            {"ok": True},
            HTTPStatus.OK,
            {"Set-Cookie": f"{SESSION_COOKIE}={token}; HttpOnly; SameSite=Lax; Path=/"},
        )

    def logout(self) -> None:
        """Forget the current session token and expire the cookie."""
        SESSIONS.discard(self.session_token())
        self.send_json(
            {"ok": True},
            HTTPStatus.OK,
            {"Set-Cookie": f"{SESSION_COOKIE}=; Max-Age=0; Path=/"},
        )

    # ------------------------------------------------------------------
    # Profile
    # ------------------------------------------------------------------

    def get_profile(self) -> None:
        """Send the single profile row (name, bio, avatar)."""
        with self._database() as connection:
            profile = connection.execute(
                "SELECT name, bio, avatar FROM profile WHERE id = 1"
            ).fetchone()
        if profile is None:
            # No profile row stored yet: answer with empty-ish defaults
            # instead of failing on dict(None).
            self.send_json({"name": "CICI", "bio": "", "avatar": ""})
            return
        self.send_json(dict(profile))

    def update_profile(self) -> None:
        """Update name and bio; the avatar changes only when a non-empty string is sent."""
        data = self._json_body()
        if data is None:
            return
        name = str(data.get("name") or "CICI").strip()[:18]
        bio = str(data.get("bio") or "").strip()[:220]
        avatar = data.get("avatar")
        with self._database() as connection:
            if isinstance(avatar, str) and avatar:
                connection.execute(
                    "UPDATE profile SET name = ?, bio = ?, avatar = ? WHERE id = 1",
                    (name, bio, avatar),
                )
            else:
                connection.execute(
                    "UPDATE profile SET name = ?, bio = ? WHERE id = 1",
                    (name, bio),
                )
        self.send_json({"ok": True})

    # ------------------------------------------------------------------
    # Folders
    # ------------------------------------------------------------------

    def get_folders(self) -> None:
        """Send all folders with the number of photos in each."""
        query = (
            "\n"
            "SELECT folders.id, folders.name, folders.created_at, COUNT(photos.id) AS photo_count\n"
            "FROM folders\n"
            "LEFT JOIN photos ON photos.folder_id = folders.id\n"
            "GROUP BY folders.id\n"
            "ORDER BY folders.id ASC\n"
        )
        with self._database() as connection:
            rows = connection.execute(query).fetchall()
        self.send_json({"folders": [dict(row) for row in rows]})

    def create_folder(self) -> None:
        """Create a folder; the name is trimmed and limited to 24 characters."""
        data = self._json_body()
        if data is None:
            return
        name = str(data.get("name") or "").strip()[:24]
        if not name:
            self.send_json_error("文件夹名称不能为空。", HTTPStatus.BAD_REQUEST)
            return
        now = datetime.now().strftime("%Y-%m-%d %H:%M")
        with self._database() as connection:
            cursor = connection.execute(
                "INSERT INTO folders (name, created_at) VALUES (?, ?)",
                (name, now),
            )
            new_id = cursor.lastrowid
        self.send_json({"id": new_id, "name": name})

    def update_folder(self, folder_id: int) -> None:
        """Rename a folder; folder 1 (the default album) cannot be renamed."""
        if folder_id == 1:
            self.send_json_error("默认相册不能改名。", HTTPStatus.BAD_REQUEST)
            return
        data = self._json_body()
        if data is None:
            return
        name = str(data.get("name") or "").strip()[:24]
        if not name:
            self.send_json_error("文件夹名称不能为空。", HTTPStatus.BAD_REQUEST)
            return
        with self._database() as connection:
            connection.execute(
                "UPDATE folders SET name = ? WHERE id = ?", (name, folder_id)
            )
        self.send_json({"ok": True, "id": folder_id, "name": name})

    def delete_folder(self, folder_id: int) -> None:
        """Delete a folder after moving its photos to folder 1."""
        if folder_id == 1:
            self.send_json_error("默认相册不能删除。", HTTPStatus.BAD_REQUEST)
            return
        with self._database() as connection:
            connection.execute(
                "UPDATE photos SET folder_id = 1 WHERE folder_id = ?", (folder_id,)
            )
            connection.execute("DELETE FROM folders WHERE id = ?", (folder_id,))
        self.send_json({"ok": True})

    # ------------------------------------------------------------------
    # Photos
    # ------------------------------------------------------------------

    def get_photos(self) -> None:
        """Send the 120 most recent photos."""
        with self._database() as connection:
            rows = connection.execute(
                "SELECT id, folder_id, name, src, created_at FROM photos ORDER BY id DESC LIMIT 120"
            ).fetchall()
        self.send_json({"photos": [dict(row) for row in rows]})

    def create_photos(self) -> None:
        """Store up to 12 uploaded photos whose ``src`` is a ``data:image/`` URL.

        An unknown or unparsable ``folder_id`` falls back to folder 1, and
        items that are not JSON objects are skipped.
        """
        data = self._json_body()
        if data is None:
            return
        photos = data.get("photos") or []
        if not isinstance(photos, list):
            photos = []
        try:
            folder_id = int(data.get("folder_id") or 1)
        except (TypeError, ValueError):
            folder_id = 1
        now = datetime.now().strftime("%Y-%m-%d %H:%M")
        with self._database() as connection:
            folder = connection.execute(
                "SELECT id FROM folders WHERE id = ?", (folder_id,)
            ).fetchone()
            if not folder:
                folder_id = 1
            for photo in photos[:12]:
                if not isinstance(photo, dict):
                    continue
                name = str(photo.get("name") or "CICI 的照片").strip()[:40]
                src = str(photo.get("src") or "")
                if src.startswith("data:image/"):
                    connection.execute(
                        "INSERT INTO photos (folder_id, name, src, created_at) VALUES (?, ?, ?, ?)",
                        (folder_id, name, src, now),
                    )
        self.send_json({"ok": True})

    def _delete_photo(self, photo_id: int) -> None:
        """Delete one photo by id."""
        with self._database() as connection:
            connection.execute("DELETE FROM photos WHERE id = ?", (photo_id,))
        self.send_json({"ok": True})

    # ------------------------------------------------------------------
    # Diary
    # ------------------------------------------------------------------

    def get_diary(self) -> None:
        """Send the 30 most recent diary entries."""
        with self._database() as connection:
            rows = connection.execute(
                "SELECT id, title, text, image, created_at FROM diary ORDER BY id DESC LIMIT 30"
            ).fetchall()
        self.send_json({"entries": [dict(row) for row in rows]})

    def create_diary(self) -> None:
        """Create a diary entry; it needs text or an image (``data:image/`` URL)."""
        data = self._json_body()
        if data is None:
            return
        title = str(data.get("title") or "今天的小日记").strip()[:24]
        text = str(data.get("text") or "").strip()[:360]
        image = str(data.get("image") or "")
        if not text and not image:
            self.send_json_error("日记内容或图片至少需要一个。", HTTPStatus.BAD_REQUEST)
            return
        if image and not image.startswith("data:image/"):
            image = ""
        now = datetime.now().strftime("%Y-%m-%d %H:%M")
        with self._database() as connection:
            connection.execute(
                "INSERT INTO diary (title, text, image, created_at) VALUES (?, ?, ?, ?)",
                (title, text, image, now),
            )
        self.send_json({"ok": True})

    def _delete_diary_entry(self, entry_id: int) -> None:
        """Delete one diary entry by id."""
        with self._database() as connection:
            connection.execute("DELETE FROM diary WHERE id = ?", (entry_id,))
        self.send_json({"ok": True})
def main() -> None:
    """Initialize the database and serve requests until the process stops.

    The listening address comes from the ``HOST`` and ``PORT`` environment
    variables (defaults ``127.0.0.1`` and ``8001``). The server is used as
    a context manager so its listening socket is closed when
    ``serve_forever`` exits or raises.

    Raises:
        ValueError: when ``PORT`` is not an integer.
        OSError: when the address cannot be bound.
    """
    init_db()
    port = int(os.environ.get("PORT", "8001"))
    host = os.environ.get("HOST", "127.0.0.1")
    with ThreadingHTTPServer((host, port), CiciHandler) as server:
        # stdout can be missing (e.g. when started without a console).
        if sys.stdout:
            print(f"CICI website running at http://{host}:{port}")
        server.serve_forever()
if __name__ == "__main__":
    try:
        main()
    except Exception as error:
        # Leave a one-line record of the failure next to the script, then
        # let the exception propagate as usual.
        report = f"{type(error).__name__}: {error}\n"
        (BASE_DIR / "server.err").write_text(report, encoding="utf-8")
        raise