Files
yolkbook/backend/routers/stats.py
derekc aa12648228 Add multi-user auth, admin panel, and timezone support; rename to Yolkbook
- Rename app from Eggtracker to Yolkbook throughout
- Add JWT-based authentication (python-jose, passlib/bcrypt)
- Add users table; all data tables gain user_id FK for full data isolation
- Super admin credentials sourced from ADMIN_USERNAME/ADMIN_PASSWORD env vars,
  synced on every startup; orphaned rows auto-assigned to admin post-migration
- Login page with self-registration; JWT stored in localStorage (30-day expiry)
- Admin panel (/admin): list users, reset passwords, disable/enable, delete,
  and impersonate (Login As) with Return to Admin banner
- Settings modal (gear icon in nav): timezone selector and change password
- Timezone stored per-user; stats date windows computed in user's timezone;
  date input setToday() respects user timezone via Intl API
- migrate_v2.sql for existing single-user installs
- Auto-migration adds timezone column to users on startup
- Updated README with full setup, auth, admin, and migration docs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 23:19:29 -07:00

262 lines
9.2 KiB
Python

import calendar
from datetime import date, datetime, timedelta
from decimal import Decimal
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from fastapi import APIRouter, Depends
from sqlalchemy import select, func
from sqlalchemy.orm import Session
from database import get_db
from models import EggCollection, FlockHistory, FeedPurchase, OtherPurchase, User
from schemas import DashboardStats, BudgetStats, MonthlySummary
from auth import get_current_user
router = APIRouter(prefix="/api/stats", tags=["stats"])
def _today(user_timezone: str) -> date:
try:
return datetime.now(ZoneInfo(user_timezone)).date()
except ZoneInfoNotFoundError:
return date.today()
def _avg_per_hen_30d(db: Session, user_id: int, start_30d: date) -> float | None:
flock_at_date = (
select(FlockHistory.chicken_count)
.where(FlockHistory.user_id == user_id)
.where(FlockHistory.date <= EggCollection.date)
.order_by(FlockHistory.date.desc())
.limit(1)
.correlate(EggCollection)
.scalar_subquery()
)
rows = db.execute(
select(EggCollection.eggs, flock_at_date.label('flock_count'))
.where(EggCollection.user_id == user_id)
.where(EggCollection.date >= start_30d)
).all()
valid = [(r.eggs, r.flock_count) for r in rows if r.flock_count]
if not valid:
return None
return round(sum(e / f for e, f in valid) / len(valid), 3)
def _current_flock(db: Session, user_id: int) -> int | None:
row = db.scalars(
select(FlockHistory)
.where(FlockHistory.user_id == user_id)
.order_by(FlockHistory.date.desc())
.limit(1)
).first()
return row.chicken_count if row else None
def _total_eggs(db: Session, user_id: int, start: date | None = None, end: date | None = None) -> int:
q = select(func.coalesce(func.sum(EggCollection.eggs), 0)).where(EggCollection.user_id == user_id)
if start:
q = q.where(EggCollection.date >= start)
if end:
q = q.where(EggCollection.date <= end)
return db.scalar(q)
def _total_feed_cost(db: Session, user_id: int, start: date | None = None, end: date | None = None):
q = select(
func.coalesce(func.sum(FeedPurchase.bags * FeedPurchase.price_per_bag), 0)
).where(FeedPurchase.user_id == user_id)
if start:
q = q.where(FeedPurchase.date >= start)
if end:
q = q.where(FeedPurchase.date <= end)
return db.scalar(q)
def _total_other_cost(db: Session, user_id: int, start: date | None = None, end: date | None = None):
q = select(func.coalesce(func.sum(OtherPurchase.total), 0)).where(OtherPurchase.user_id == user_id)
if start:
q = q.where(OtherPurchase.date >= start)
if end:
q = q.where(OtherPurchase.date <= end)
return db.scalar(q)
@router.get("/dashboard", response_model=DashboardStats)
def dashboard_stats(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
uid = current_user.id
today = _today(current_user.timezone)
start_30d = today - timedelta(days=30)
start_7d = today - timedelta(days=7)
total_alltime = _total_eggs(db, uid)
total_30d = _total_eggs(db, uid, start=start_30d)
total_7d = _total_eggs(db, uid, start=start_7d)
flock = _current_flock(db, uid)
days_tracked = db.scalar(
select(func.count(func.distinct(EggCollection.date)))
.where(EggCollection.user_id == uid)
)
days_with_data_30d = db.scalar(
select(func.count(func.distinct(EggCollection.date)))
.where(EggCollection.user_id == uid)
.where(EggCollection.date >= start_30d)
)
avg_per_day = round(total_30d / days_with_data_30d, 2) if days_with_data_30d else None
avg_per_hen = _avg_per_hen_30d(db, uid, start_30d)
return DashboardStats(
current_flock=flock,
total_eggs_alltime=total_alltime,
total_eggs_30d=total_30d,
total_eggs_7d=total_7d,
avg_eggs_per_day_30d=avg_per_day,
avg_eggs_per_hen_day_30d=avg_per_hen,
days_tracked=days_tracked,
)
@router.get("/monthly", response_model=list[MonthlySummary])
def monthly_stats(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
uid = current_user.id
MONTH_NAMES = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
egg_rows = db.execute(
select(
func.year(EggCollection.date).label('year'),
func.month(EggCollection.date).label('month'),
func.sum(EggCollection.eggs).label('total_eggs'),
func.count(EggCollection.date).label('days_logged'),
)
.where(EggCollection.user_id == uid)
.group_by(func.year(EggCollection.date), func.month(EggCollection.date))
.order_by(func.year(EggCollection.date).desc(), func.month(EggCollection.date).desc())
).all()
if not egg_rows:
return []
feed_rows = db.execute(
select(
func.year(FeedPurchase.date).label('year'),
func.month(FeedPurchase.date).label('month'),
func.sum(FeedPurchase.bags * FeedPurchase.price_per_bag).label('feed_cost'),
)
.where(FeedPurchase.user_id == uid)
.group_by(func.year(FeedPurchase.date), func.month(FeedPurchase.date))
).all()
feed_map = {(r.year, r.month): r.feed_cost for r in feed_rows}
other_rows = db.execute(
select(
func.year(OtherPurchase.date).label('year'),
func.month(OtherPurchase.date).label('month'),
func.sum(OtherPurchase.total).label('other_cost'),
)
.where(OtherPurchase.user_id == uid)
.group_by(func.year(OtherPurchase.date), func.month(OtherPurchase.date))
).all()
other_map = {(r.year, r.month): r.other_cost for r in other_rows}
results = []
for row in egg_rows:
y, m = int(row.year), int(row.month)
last_day = calendar.monthrange(y, m)[1]
month_end = date(y, m, last_day)
flock_row = db.scalars(
select(FlockHistory)
.where(FlockHistory.user_id == uid)
.where(FlockHistory.date <= month_end)
.order_by(FlockHistory.date.desc())
.limit(1)
).first()
flock = flock_row.chicken_count if flock_row else None
total_eggs = int(row.total_eggs)
days_logged = int(row.days_logged)
avg_per_day = round(total_eggs / days_logged, 2) if days_logged else None
avg_per_hen = round(avg_per_day / flock, 3) if (avg_per_day and flock) else None
raw_feed_cost = feed_map.get((y, m))
raw_other_cost = other_map.get((y, m))
feed_cost = round(Decimal(str(raw_feed_cost)), 2) if raw_feed_cost else None
other_cost = round(Decimal(str(raw_other_cost)), 2) if raw_other_cost else None
raw_total_cost = (raw_feed_cost or 0) + (raw_other_cost or 0)
cpe = round(Decimal(str(raw_total_cost)) / Decimal(total_eggs), 4) if (raw_total_cost and total_eggs) else None
cpd = round(cpe * 12, 4) if cpe else None
results.append(MonthlySummary(
year=y,
month=m,
month_label=f"{MONTH_NAMES[m - 1]} {y}",
total_eggs=total_eggs,
days_logged=days_logged,
avg_eggs_per_day=avg_per_day,
flock_at_month_end=flock,
avg_eggs_per_hen_per_day=avg_per_hen,
feed_cost=feed_cost,
other_cost=other_cost,
cost_per_egg=cpe,
cost_per_dozen=cpd,
))
return results
@router.get("/budget", response_model=BudgetStats)
def budget_stats(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
uid = current_user.id
today = _today(current_user.timezone)
start_30d = today - timedelta(days=30)
total_feed_cost = _total_feed_cost(db, uid)
total_feed_cost_30d = _total_feed_cost(db, uid, start=start_30d)
total_other_cost = _total_other_cost(db, uid)
total_other_cost_30d = _total_other_cost(db, uid, start=start_30d)
total_eggs = _total_eggs(db, uid)
total_eggs_30d = _total_eggs(db, uid, start=start_30d)
def cost_per_egg(cost, eggs):
if not eggs or not cost:
return None
return round(Decimal(str(cost)) / Decimal(eggs), 4)
def cost_per_dozen(cpe):
return round(cpe * 12, 4) if cpe else None
combined_cost = total_feed_cost + total_other_cost
combined_cost_30d = total_feed_cost_30d + total_other_cost_30d
cpe = cost_per_egg(combined_cost, total_eggs)
cpe_30d = cost_per_egg(combined_cost_30d, total_eggs_30d)
return BudgetStats(
total_feed_cost=round(Decimal(str(total_feed_cost)), 2) if total_feed_cost else None,
total_feed_cost_30d=round(Decimal(str(total_feed_cost_30d)), 2) if total_feed_cost_30d else None,
total_other_cost=round(Decimal(str(total_other_cost)), 2) if total_other_cost else None,
total_other_cost_30d=round(Decimal(str(total_other_cost_30d)), 2) if total_other_cost_30d else None,
total_eggs_alltime=total_eggs,
total_eggs_30d=total_eggs_30d,
cost_per_egg=cpe,
cost_per_dozen=cost_per_dozen(cpe),
cost_per_egg_30d=cpe_30d,
cost_per_dozen_30d=cost_per_dozen(cpe_30d),
)