71 lines
2.3 KiB
Python
71 lines
2.3 KiB
Python
from datetime import date
|
|
from typing import Optional
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from database import get_db
|
|
from models import FlockHistory
|
|
from schemas import FlockHistoryCreate, FlockHistoryUpdate, FlockHistoryOut
|
|
|
|
# Router collecting the flock-history endpoints; every route path declared on it
# is prefixed with /api/flock and tagged "flock".
router = APIRouter(
    prefix="/api/flock",
    tags=["flock"],
)
|
|
|
|
|
|
@router.get("", response_model=list[FlockHistoryOut])
def list_flock_history(db: Session = Depends(get_db)):
    """Return all flock history entries, ordered by date descending."""
    newest_first = FlockHistory.date.desc()
    stmt = select(FlockHistory).order_by(newest_first)
    return db.scalars(stmt).all()
|
|
|
|
|
|
@router.get("/current", response_model=Optional[FlockHistoryOut])
def get_current_flock(db: Session = Depends(get_db)):
    """Return the flock entry with the latest date, i.e. the current flock.

    The result is ``None`` when the query matches no rows.
    """
    newest_first = FlockHistory.date.desc()
    stmt = select(FlockHistory).order_by(newest_first).limit(1)
    latest = db.scalars(stmt).first()
    return latest
|
|
|
|
|
|
@router.get("/at/{target_date}", response_model=Optional[FlockHistoryOut])
def get_flock_at_date(target_date: date, db: Session = Depends(get_db)):
    """Return the flock entry that was in effect on ``target_date``.

    That is the entry with the latest date not after ``target_date``.
    The result is ``None`` when no entry is dated on or before it.
    """
    on_or_before = FlockHistory.date <= target_date
    stmt = select(FlockHistory).where(on_or_before)
    # Newest qualifying entry first, and only that one row is needed.
    stmt = stmt.order_by(FlockHistory.date.desc()).limit(1)
    return db.scalars(stmt).first()
|
|
|
|
|
|
@router.post("", response_model=FlockHistoryOut, status_code=201)
def create_flock_entry(body: FlockHistoryCreate, db: Session = Depends(get_db)):
    """Create a flock history entry from the request body and return it."""
    payload = body.model_dump()
    entry = FlockHistory(**payload)

    db.add(entry)
    db.commit()
    # Reload the instance after the commit so the response reflects the stored row.
    db.refresh(entry)
    return entry
|
|
|
|
|
|
@router.put("/{record_id}", response_model=FlockHistoryOut)
def update_flock_entry(
    record_id: int,
    body: FlockHistoryUpdate,
    db: Session = Depends(get_db),
):
    """Apply the non-null fields of ``body`` to an existing flock entry.

    Returns the updated entry.

    Raises:
        HTTPException: 404 when no entry is found for ``record_id``.
    """
    entry = db.get(FlockHistory, record_id)
    if not entry:
        raise HTTPException(status_code=404, detail="Record not found")

    # exclude_none drops every field whose value is None, so those attributes
    # are left untouched on the stored entry.
    # NOTE(review): this also means an explicit null cannot clear a field via
    # this endpoint — confirm that is intended (exclude_unset is the alternative).
    changes = body.model_dump(exclude_none=True)
    for attr_name, new_value in changes.items():
        setattr(entry, attr_name, new_value)

    db.commit()
    db.refresh(entry)
    return entry
|
|
|
|
|
|
@router.delete("/{record_id}", status_code=204)
def delete_flock_entry(record_id: int, db: Session = Depends(get_db)):
    """Delete the flock history entry identified by ``record_id``.

    Raises:
        HTTPException: 404 when no entry is found for ``record_id``.
    """
    entry = db.get(FlockHistory, record_id)
    if not entry:
        raise HTTPException(status_code=404, detail="Record not found")

    db.delete(entry)
    db.commit()
    # Nothing is returned: the route is declared with status 204.
|