Files
homeschool/backend/app/routers/logs.py
derekc 956df11f49 Add PDF export to Activity Log with date range filtering
- Backend: add date_from/date_to query params to timeline, strikes, and logs endpoints
- Frontend: Export PDF button opens a dialog to select child and date range, generates a printable HTML report in a new tab

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-10 23:18:49 -07:00

275 lines
9.1 KiB
Python

from datetime import date
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import func, select
from sqlalchemy.orm import selectinload
from app.dependencies import get_db, get_current_user
from app.models.activity import ActivityLog
from app.models.child import Child
from app.models.session import DailySession, TimerEvent
from app.models.schedule import ScheduleBlock
from app.models.strike import StrikeEvent
from app.models.subject import Subject # noqa: F401
from app.models.user import User
from app.schemas.activity import ActivityLogCreate, ActivityLogOut, ActivityLogUpdate, StrikeEventOut, TimelineEventOut, TimelineEventUpdate
router = APIRouter(prefix="/api/logs", tags=["logs"])
@router.get("/timeline", response_model=list[TimelineEventOut])
async def get_timeline(
child_id: int | None = None,
log_date: date | None = None,
date_from: date | None = None,
date_to: date | None = None,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
query = (
select(TimerEvent)
.join(DailySession, TimerEvent.session_id == DailySession.id)
.join(Child, DailySession.child_id == Child.id)
.where(Child.user_id == current_user.id)
.options(
selectinload(TimerEvent.block).selectinload(ScheduleBlock.subject),
selectinload(TimerEvent.session).options(
selectinload(DailySession.child),
selectinload(DailySession.template),
),
)
.where(TimerEvent.event_type != "select")
.order_by(TimerEvent.occurred_at.desc())
)
if child_id:
query = query.where(DailySession.child_id == child_id)
if log_date:
query = query.where(DailySession.session_date == log_date)
if date_from:
query = query.where(DailySession.session_date >= date_from)
if date_to:
query = query.where(DailySession.session_date <= date_to)
result = await db.execute(query)
events = result.scalars().all()
return [_to_timeline_out(e) for e in events]
def _to_timeline_out(e: TimerEvent) -> TimelineEventOut:
blk = e.block
sub = blk.subject if blk else None
if e.event_type == "session_start":
block_label = e.session.template.name if e.session.template else None
else:
block_label = (blk.label or sub.name) if blk and sub else (blk.label if blk else None)
return TimelineEventOut(
id=e.id,
event_type=e.event_type,
occurred_at=e.occurred_at,
session_date=e.session.session_date,
child_id=e.session.child_id,
child_name=e.session.child.name,
block_label=block_label,
subject_name=sub.name if sub else None,
subject_icon=sub.icon if sub else None,
subject_color=sub.color if sub else None,
)
async def _get_timer_event(event_id: int, current_user: User, db: AsyncSession) -> TimerEvent:
result = await db.execute(
select(TimerEvent)
.join(DailySession, TimerEvent.session_id == DailySession.id)
.join(Child, DailySession.child_id == Child.id)
.where(TimerEvent.id == event_id, Child.user_id == current_user.id)
.options(
selectinload(TimerEvent.block).selectinload(ScheduleBlock.subject),
selectinload(TimerEvent.session).options(
selectinload(DailySession.child),
selectinload(DailySession.template),
),
)
)
event = result.scalar_one_or_none()
if not event:
raise HTTPException(status_code=404, detail="Event not found")
return event
@router.patch("/timeline/{event_id}", response_model=TimelineEventOut)
async def update_timeline_event(
event_id: int,
body: TimelineEventUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
event = await _get_timer_event(event_id, current_user, db)
if body.event_type is not None:
event.event_type = body.event_type
if body.occurred_at is not None:
event.occurred_at = body.occurred_at
await db.commit()
await db.refresh(event)
event = await _get_timer_event(event_id, current_user, db)
return _to_timeline_out(event)
@router.delete("/timeline/{event_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_timeline_event(
event_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
event = await _get_timer_event(event_id, current_user, db)
await db.delete(event)
await db.commit()
@router.get("/strikes", response_model=list[StrikeEventOut])
async def get_strike_events(
child_id: int | None = None,
log_date: date | None = None,
date_from: date | None = None,
date_to: date | None = None,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
query = (
select(StrikeEvent)
.join(Child, StrikeEvent.child_id == Child.id)
.where(Child.user_id == current_user.id)
.options(selectinload(StrikeEvent.child))
.order_by(StrikeEvent.occurred_at.desc())
)
if child_id:
query = query.where(StrikeEvent.child_id == child_id)
if log_date:
query = query.where(func.date(StrikeEvent.occurred_at) == log_date)
if date_from:
query = query.where(func.date(StrikeEvent.occurred_at) >= date_from)
if date_to:
query = query.where(func.date(StrikeEvent.occurred_at) <= date_to)
result = await db.execute(query)
events = result.scalars().all()
return [
StrikeEventOut(
id=e.id,
child_id=e.child_id,
child_name=e.child.name,
occurred_at=e.occurred_at,
log_date=e.occurred_at.date(),
prev_strikes=e.prev_strikes,
new_strikes=e.new_strikes,
)
for e in events
]
@router.delete("/strikes/{strike_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_strike_event(
strike_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(StrikeEvent)
.join(Child, StrikeEvent.child_id == Child.id)
.where(StrikeEvent.id == strike_id, Child.user_id == current_user.id)
)
event = result.scalar_one_or_none()
if not event:
raise HTTPException(status_code=404, detail="Strike event not found")
await db.delete(event)
await db.commit()
@router.get("", response_model=list[ActivityLogOut])
async def list_logs(
child_id: int | None = None,
log_date: date | None = None,
date_from: date | None = None,
date_to: date | None = None,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
query = (
select(ActivityLog)
.join(Child)
.where(Child.user_id == current_user.id)
.order_by(ActivityLog.log_date.desc(), ActivityLog.created_at.desc())
)
if child_id:
query = query.where(ActivityLog.child_id == child_id)
if log_date:
query = query.where(ActivityLog.log_date == log_date)
if date_from:
query = query.where(ActivityLog.log_date >= date_from)
if date_to:
query = query.where(ActivityLog.log_date <= date_to)
result = await db.execute(query)
return result.scalars().all()
@router.post("", response_model=ActivityLogOut, status_code=status.HTTP_201_CREATED)
async def create_log(
body: ActivityLogCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
child_result = await db.execute(
select(Child).where(Child.id == body.child_id, Child.user_id == current_user.id)
)
if not child_result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="Child not found")
log = ActivityLog(**body.model_dump())
db.add(log)
await db.commit()
await db.refresh(log)
return log
@router.patch("/{log_id}", response_model=ActivityLogOut)
async def update_log(
log_id: int,
body: ActivityLogUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(ActivityLog)
.join(Child)
.where(ActivityLog.id == log_id, Child.user_id == current_user.id)
)
log = result.scalar_one_or_none()
if not log:
raise HTTPException(status_code=404, detail="Log not found")
for field, value in body.model_dump(exclude_none=True).items():
setattr(log, field, value)
await db.commit()
await db.refresh(log)
return log
@router.delete("/{log_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_log(
log_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(ActivityLog)
.join(Child)
.where(ActivityLog.id == log_id, Child.user_id == current_user.id)
)
log = result.scalar_one_or_none()
if not log:
raise HTTPException(status_code=404, detail="Log not found")
await db.delete(log)
await db.commit()