Source code for backend.app.api.contacts

from typing import Annotated

from fastapi import APIRouter, Depends
from sqlmodel import Session, select

from backend.app.models.contact import Contact
from backend.app.models.dataset import Dataset
from backend.database import get_db

contact_router = APIRouter(prefix="/contacts", tags=["Contacts for Datasets"])


[docs] @contact_router.post("/") async def create_contact( contact: Contact, db: Annotated[Session, Depends(get_db)] ): """ Create a new contact. This endpoint allows the creation of a new contact in the database. :param contact: The contact information to be added. :type contact: Contact :param db: The database session. :type db: Session :return: The created contact. :rtype: Contact """ db.add(contact) db.commit() db.refresh(contact) return contact
[docs] @contact_router.get("/") async def get_all_contacts(db: Session = Depends(get_db)): """ Retrieve all contacts. This endpoint retrieves all contacts stored in the database. :param db: The database session. :type db: Session :return: A list of all contacts. :rtype: list[Contact] """ return db.exec(select(Contact)).all()
[docs] @contact_router.get("/by-dataset/{dataset_id}") async def get_contacts_by_dataset_id( dataset_id: int, db: Session = Depends(get_db) ): """ Retrieve contacts by dataset ID. This endpoint retrieves contacts associated with a specific dataset. :param dataset_id: The ID of the dataset. :type dataset_id: int :param db: The database session. :type db: Session :return: A list of contacts associated with the specified dataset. :rtype: list[Contact] """ statement = select(Dataset).where(Dataset.id == dataset_id) dataset = db.exec(statement).one() return dataset.contacts