Files
yolkbook/backend/main.py
derekc f6cc7a606e Harden security: CORS, XSS, rate limiting, CSP, SRI
- Lock down CORS to ALLOWED_ORIGINS env var (was wildcard)
- Fix admin panel XSS: use data-username attributes instead of
  interpolating usernames into onclick handlers
- Add rate limiting to /api/auth/register (3r/m) and /api/admin/*
  (10r/m); set limit_req_status 429
- Add Content-Security-Policy header restricting scripts to self
  and cdn.jsdelivr.net
- Add Subresource Integrity hash to Chart.js CDN script tag

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-19 23:18:33 -07:00

136 lines
4.4 KiB
Python

import os
import logging
import sys
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import select, update, text
from database import Base, engine, SessionLocal
from models import User, EggCollection, FlockHistory, FeedPurchase, OtherPurchase
from auth import hash_password
from routers import eggs, flock, feed, stats, other
from routers import auth_router, admin
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger("yolkbook")
_REQUIRED_ENV = ["ADMIN_USERNAME", "ADMIN_PASSWORD", "JWT_SECRET", "DATABASE_URL"]
def _validate_env():
missing = [k for k in _REQUIRED_ENV if not os.environ.get(k)]
if missing:
logger.critical("Missing required environment variables: %s", ", ".join(missing))
sys.exit(1)
def _seed_admin():
"""Create or update the admin user from environment variables.
Also assigns any records with NULL user_id to the admin (post-migration).
"""
admin_username = os.environ["ADMIN_USERNAME"]
admin_password = os.environ["ADMIN_PASSWORD"]
with SessionLocal() as db:
admin_user = db.scalars(
select(User).where(User.username == admin_username)
).first()
if admin_user is None:
admin_user = User(
username=admin_username,
hashed_password=hash_password(admin_password),
is_admin=True,
)
db.add(admin_user)
db.commit()
db.refresh(admin_user)
logger.info("Admin user '%s' created.", admin_username)
else:
# Always sync password + admin flag from env vars
admin_user.hashed_password = hash_password(admin_password)
admin_user.is_admin = True
db.commit()
# Assign orphaned records (from pre-migration data) to admin
for model in [EggCollection, FlockHistory, FeedPurchase, OtherPurchase]:
db.execute(
update(model)
.where(model.user_id == None) # noqa: E711
.values(user_id=admin_user.id)
)
db.commit()
def _run_migrations():
"""Apply incremental schema changes that create_all won't handle on existing tables."""
with SessionLocal() as db:
# v2.1 — timezone column on users
try:
db.execute(text(
"ALTER TABLE users ADD COLUMN timezone VARCHAR(64) NOT NULL DEFAULT 'UTC'"
))
db.commit()
except Exception:
db.rollback() # column already exists — safe to ignore
# v2.2 — composite (user_id, date) indexes for faster filtered queries
for sql in [
"CREATE INDEX ix_flock_history_user_date ON flock_history (user_id, date)",
"CREATE INDEX ix_feed_purchases_user_date ON feed_purchases (user_id, date)",
"CREATE INDEX ix_other_purchases_user_date ON other_purchases (user_id, date)",
]:
try:
db.execute(text(sql))
db.commit()
except Exception:
db.rollback() # index already exists — safe to ignore
# v2.3 — unique constraint on flock_history (user_id, date)
try:
db.execute(text(
"ALTER TABLE flock_history ADD CONSTRAINT uq_flock_user_date UNIQUE (user_id, date)"
))
db.commit()
except Exception:
db.rollback() # constraint already exists — safe to ignore
@asynccontextmanager
async def lifespan(app: FastAPI):
_validate_env()
Base.metadata.create_all(bind=engine)
_run_migrations()
_seed_admin()
yield
app = FastAPI(title="Yolkbook API", lifespan=lifespan)
_cors_origins = [o.strip() for o in os.environ.get("ALLOWED_ORIGINS", "").split(",") if o.strip()]
app.add_middleware(
CORSMiddleware,
allow_origins=_cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
app.include_router(auth_router.router)
app.include_router(admin.router)
app.include_router(eggs.router)
app.include_router(flock.router)
app.include_router(feed.router)
app.include_router(other.router)
app.include_router(stats.router)
@app.get("/api/health")
def health():
return {"status": "ok"}