Ir para o conteúdo

Exercícios da aula 10

Exercício 01

Adicione os campos created_at e updated_at na tabela Todo - Eles devem ser init=False - Deve usar func.now() para criação - O campo updated_at deve ter onupdate

Solução

Devem ser adicionados os dois campos ao modelo Todo:

fast_zero/models.py
@table_registry.mapped_as_dataclass
class Todo:
    __tablename__ = 'todos'

    id: Mapped[int] = mapped_column(init=False, primary_key=True)
    title: Mapped[str]
    description: Mapped[str]
    state: Mapped[TodoState]

    user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))

    user: Mapped[User] = relationship(init=False, back_populates='todos')

    # Exercício 01
    created_at: Mapped[datetime] = mapped_column(
        init=False, server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        init=False, server_default=func.now(), onupdate=func.now()
    )

Exercício 02

Criar uma migração para que os novos campos sejam versionados e também aplicar a migração

Solução

Se executarmos a migração com o primeiro exercício resolvido, teremos algo como:

$ Execução no terminal!
alembic revision --autogenerate -m "Adicionando created_at e updated_at na tabela de todos"
^[[AINFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.autogenerate.compare] Detected added column 'todos.created_at'
INFO  [alembic.autogenerate.compare] Detected added column 'todos.updated_at'
INFO  [alembic.autogenerate.compare] Detected added column 'users.updated_at'
  Generating /home/dunossauro/git/fastapi-do-
  zero/codigo_das_aulas/09/migrations/versions/bd7cea4a4773_adicionando_created_at_e_updated_at_na_.py ...  done
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError)

Caso você receba esse erro ao aplicar a migração, recomendo que veja a Live de Python #211 sobre migrações e bancos de dados evolutivos (recomendada no texto da aula 04 - "Caso nunca tenha trabalhado com Migrações").

No minuto 1:28:33 o motivo e a solução desse erro são abordados em mais detalhes.

Mas, em resumo, isso é um problema causado pelo modo como o python se comunica com o sqlite, fazendo com que cada alteração no banco seja aplicado linha, a linha. Para fazer todas as modificações de uma vez, usamos as operações em batch. A ideia é abrir uma única conexão com o banco de dados e executar determinadas operações para todas as linhas antes da conexão ser fechada.

Para isso será preciso alterar o arquivo de migrações manualmente. O arquivo deve se parecer com esse:

/migrations/versions/bb77f9679811_exercicio_02_aula_04.py
# ...
def upgrade():
    with op.batch_alter_table('users', schema=None) as batch_op:  
        batch_op.add_column(   
            sa.Column(
                'updated_at',
                sa.DateTime(),
                server_default=sa.text('(CURRENT_TIMESTAMP)'),
                nullable=False,
            )
        )


def downgrade():
    with op.batch_alter_table('users', schema=None) as batch_op:  
        batch_op.drop_column('updated_at')  

Gerando a seguinte migração:

"""Adicionando created_at e updated_at na tabela de todos

Revision ID: bd7cea4a4773
Revises: 3a79a86c9e4a
Create Date: 2024-10-05 01:11:38.100051

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'bd7cea4a4773'
down_revision: Union[str, None] = '3a79a86c9e4a'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('todos', sa.Column('created_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False))
    op.add_column('todos', sa.Column('updated_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False))
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column('todos', 'updated_at')
    op.drop_column('todos', 'created_at')
    # ### end Alembic commands ###

Exercício 03

Adicionar os campos created_at e updated_at no schema de saída dos endpoints. Para que esse valores sejam retornados na API.

Solução

Para adicionar os campos é necessário somente a adição dos mesmos no schema:

fast_zero/schemas.py
from datetime import datetime
# ...


class TodoPublic(TodoSchema):
    id: int
    created_at: datetime
    updated_at: datetime

A adaptação do teste, para validar o tempo, pode usar o evento de mock_db_time. Como o pydantic converte o resultado para json, ele transforma a data no formato iso. Isso deve ser levado em conta na comparação:

tests/test_todos.py
from http import HTTPStatus

from fast_zero.models import Todo, TodoState
from tests.factories import TodoFactory


def test_create_todo(client, token, mock_db_time):
    with mock_db_time(model=Todo) as time:
        response = client.post(
            '/todos/',
            headers={'Authorization': f'Bearer {token}'},
            json={
                'title': 'Test todo',
                'description': 'Test todo description',
                'state': 'draft',
            },
        )

    assert response.json() == {
        'id': 1,
        'title': 'Test todo',
        'description': 'Test todo description',
        'state': 'draft',
        'created_at': time.isoformat(),
        'updated_at': time.isoformat()
    }

Exercício 04

Crie um teste para o endpoint de busca (GET) que valide todos os campos contidos no Todo de resposta. Até o momento, todas as validações foram feitas pelo tamanho do resultado de todos.

Solução

Esse exercício é um pouco mais trabalhoso que os demais. Vamos dividir ele em etapas:

  1. Devemos ter o tempo determinístico (mock_db_time) para poder validar o json
  2. Devemos criar um todo com dados aleatórios (TodoFactory)
  3. Devemos ter um token e um usuário criado

No final das contas, algo parecido (não necessariamente idêntico) a isso:

@pytest.mark.asyncio
async def test_list_todos_should_return_all_expected_fields__exercicio(
    session, client, user, token, mock_db_time
):
    with mock_db_time(model=Todo) as time:
        todo = TodoFactory.create(user_id=user.id)
        session.add(todo)
        await session.commit()

    session.refresh(todo)
    response = client.get(
        '/todos/',
        headers={'Authorization': f'Bearer {token}'},
    )

    assert response.json()['todos'] == [{
        'created_at': time.isoformat(),
        'updated_at': time.isoformat(),
        'description': todo.description,
        'id': todo.id,
        'state': todo.state,
        'title': todo.title,
    }]

Exercício 05

  1. Crie um teste para validar o caso do Enum em state: Mapped[TodoState] na tabela TODO, onde o valor esteja fora dos valores mapeados por ele. Isso forçará um erro que pode ser validado com pytest.raises

Solução

@pytest.mark.asyncio
async def test_create_todo_error(session, user: User):
    todo = Todo(
        title='Test Todo',
        description='Test Desc',
        state='test',
        user_id=user.id,
    )

    session.add(todo)
    await session.commit()

    with pytest.raises(LookupError):
        await session.scalar(select(Todo))