Para que os clientes se autentiquem na nossa aplicação, precisamos criar um endpoint que receba as credenciais. Vamos chamá-lo de /token
.
Alguns pontos:
OAuth2PasswordRequestForm
Token
Session
com Depends
datetime.timedelta
# app.py
from fastapi.security import OAuth2PasswordRequestForm
# ...
@app.post('/token')
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
session: Session = Depends(get_session),
):
...
OAuth2 É um protocolo aberto para autorização. O FastAPI disponibiliza alguns schemas prontos para usar OAuth2, como o OAuth2PasswordRequestForm
. Traduzindo de forma literal: "Formulário de Requisição de Senha OAuth2"
Mostrar como isso ficará no Swagger!
@app.post('/token')
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
session: Session = Depends(get_session),
):
user = session.scalar(select(User).where(User.email == form_data.username))
if not user:
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail='Incorrect email or password'
)
if not verify_password(form_data.password, user.password):
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail='Incorrect email or password'
)
Para que os clientes se autentiquem na nossa aplicação, precisamos criar um endpoint que receba as credenciais. Vamos chamá-lo de /token
.
Alguns pontos:
De forma simples, o JWT (Json Web Token) é uma forma de assinatura do servidor.
O token diz que o cliente foi autenticado com a assinatura desse servidor. Ele é divido em 3 partes:
{
"sub": "teste@test.com",
"exp": 1690258153
}
Onde as chaves deste exemplo:
sub
: identifica o "assunto" (subject), basicamente uma forma de identificar o cliente. Pode ser um id, um uuid, email, ...exp
: tempo de expiração do token. O backend vai usar esse dado para validar se o token ainda é válido ou existe a necessidade de uma atualização do token.Existem diversas bibliotecas para geração de tokens, usemos o pyjwt
.
poetry add pyjwt
import jwt
jwt.encode(dados, key) # Os dados devem ser um dict, retorna o token
jwt.decode(token, key) # Isso retorna o dict dos dados
A chave deve ser secreta, ela é o que define em conjunto com o algorítimo que foi assinado pelo nosso servidor. O Python tem uma biblioteca embutida que gera segredos:
import secrets
secrets.token_hex() # Retorna um token randômico
Aqui podemos ver o token e validar a integridade da assinatura.
# schemas.py
class Token(BaseModel):
access_token: str # O token JWT que vamos gerar
token_type: str # O modelo que o cliente deve usar para Autorização
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from jwt import encode
SECRET_KEY = 'your-secret-key' # Isso é privisório, vamos ajustar!
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict):
to_encode = data.copy()
# Adiciona um tempo de 30 minutos para expiração
expire = datetime.now(tz=ZoneInfo('UTC')) + timedelta(
minutes=ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({'exp': expire})
encoded_jwt = encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# tests/test_security.py
from jwt import decode
from fast_zero.security import create_access_token, SECRET_KEY
def test_jwt():
data = {'test': 'test'}
token = create_access_token(data)
decoded = decode(token, SECRET_KEY)
assert decoded['test'] == data['test']
assert 'exp' in decoded # Testa se o valor de exp foi adicionado ao token
Pode ser que esse teste falhe!
Dependendo da compilação do python, as propriedades de timezone podem não estar disponíveis.
Para resolver isso:
poetry add tzdata
Agora podemos executar o teste novamente!
/token
# app.py
from fast_zero.schemas import ..., Token, ...
from fast_zero.security import create_access_token, ...
@app.post('/token', response_model=Token)
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
session: Session = Depends(get_session),
):
# ...
access_token = create_access_token(data={'sub': user.email})
return {'access_token': access_token, 'token_type': 'Bearer'}
/token
# test_app.py
def test_get_token(client, user):
response = client.post(
'/token',
data={'username': user.email, 'password': user.password},
)
token = response.json()
assert response.status_code == HTTPStatus.OK
assert token['token_type'] == 'Bearer'
assert 'access_token' in token
from fast_zero.security import get_password_hash
# ...
@pytest.fixture
def user(session):
user = User(
username='test',
email='test@test.com',
# Criando com a senha suja!
password=get_password_hash('testtest'),
)
session.add(user)
session.commit()
session.refresh(user)
return user
A fixture de User
que estamos criando salva a senha limpa. Isso dá erro na hora de comparar se a senha está correta na criação do token.
# conftest.py
@pytest.fixture
def user(session):
password = 'testtest'
user = User(
username='test',
email='test@test.com',
password=get_password_hash(password),
)
session.add(user)
session.commit()
session.refresh(user)
user.clean_password = password # hack!
return user
Embora a senha agora consiga ser comparada, a senha que enviamos na requisição está indo suja também.
tests/test_app.py::test_get_token PASSED
tests/test_app.py::test_root_deve_retornar_ok_e_ola_mundo PASSED
tests/test_app.py::test_create_user PASSED
tests/test_app.py::test_read_users_empty PASSED
tests/test_app.py::test_read_users PASSED
tests/test_app.py::test_update_user PASSED
tests/test_app.py::test_delete_user PASSED
tests/test_models.py::test_create_user PASSED
tests/test_security.py::test_jwt PASSED
A ideia por trás da autorização é garantir que somente pessoas autorizadas possam executar determinadas operações. Como:
Agora que temos os tokens, podemos garantir que só clientes com uma conta já criada e logada possam ter acesso aos endpoints.
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from fast_zero.database import get_session
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_current_user(
session: Session = Depends(get_session),
token: str = Depends(oauth2_scheme),
):
...
Assim como nos formulários, o FastAPI também conta com um validador de Tokens passados nos cabeçalhos: OAuth2PasswordBearer
Só com essa exigência de receber o token, podemos aplicar isso em nosso endpoint de listagem.
@app.get('/users/', response_model=UserList)
def list_users(
session: Session = Depends(get_session),
current_user=Depends(get_current_user),
):
database = session.scalars(select(User)).all()
return {'users': database}
Mostar o cadeado no Swagger!
Ao rodar os testes...
Precisamos de um token para enviar aos endpoints agora!
@pytest.fixture
def token(client, user):
response = client.post(
'/token',
data={'username': user.email, 'password': user.clean_password},
)
return response.json()['access_token']
Agora podemos enviar o token no cabeçalho da requisição
# test_app.py
def test_read_users(client: TestClient, token):
response = client.get(
'/users/', headers={'Authorization': f'Bearer {token}'}
)
assert response.status_code == HTTPStatus.OK
assert response.json() == {
'users': [
{
'id': 1,
'username': 'test',
'email': 'test@test.com',
},
],
}
maaaaaaaaaasssssssssss não validamos o payload do token ainda!
def get_current_user(...):
credentials_exception = HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail='Could not validate credentials',
headers={'WWW-Authenticate': 'Bearer'},
)
try:
payload = jwt.decode(token, SECRET_KEY)
subject_email = payload.get('sub')
if not subject_email:
raise credentials_exception
except DecodeError:
raise credentials_exception
# ...
Caso esteja tudo correto com o token:
def get_current_user(...):
# ...
user = session.scalar(
select(User).where(User.email == subject_email)
)
if not user:
raise credentials_exception
return user
Com isso podemos alterar os endpoints para depender do usuário corrente:
@app.put('/users/{user_id}', response_model=UserPublic)
def update_user(
user_id: int,
user: UserSchema,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user),
):
if current_user.id != user_id:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail='Not enough permissions'
)
try:
...
def test_update_user(client, user, token):
response = client.put(
f'/users/{user.id}',
headers={'Authorization': f'Bearer {token}'},
json={
'username': 'bob',
'email': 'bob@test.com',
'password': 'mynewpassword',
},
)
assert response.status_code == HTTPStatus.OK
assert response.json() == {
'username': 'bob',
'email': 'bob@test.com',
'id': 1,
}
def test_update_integrity_error(client, user, token):
# ... bloco de código omitido
# Alterando o user das fixture para fausto
response_update = client.put(
f'/users/{user.id}',
headers={'Authorization': f'Bearer {token}'},
json={
'username': 'fausto',
'email': 'bob@example.com',
'password': 'mynewpassword',
},
)
@app.delete('/users/{user_id}', response_model=Message)
def delete_user(
user_id: int,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user),
):
if current_user.id != user_id:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail='Not enough permissions',
)
# ...
def test_delete_user(client, user, token):
response = client.delete(
f'/users/{user.id}',
headers={'Authorization': f'Bearer {token}'},
)
assert response.status_code == HTTPStatus.OK
assert response.json() == {'message': 'User deleted'}
# test_security.py
def test_jwt_invalid_token(client):
response = client.delete(
'/users/1', headers={'Authorization': 'Bearer token-invalido'}
)
assert response.status_code == HTTPStatus.UNAUTHORIZED
assert response.json() == {'detail': 'Could not validate credentials'}
Faça um teste para cobrir o cenário que levanta exception credentials_exception
na autenticação caso o User
não seja encontrado. Ao olhar a cobertura de security.py
você vai notar que esse contexto não está coberto.
Faça um teste para cobrir o cenário que levanta exception credentials_exception
na autenticação caso o email seja enviado, mas não exista um User correspondente cadastrado na base de dados. Ao olhar a cobertura de security.py
você vai notar que esse contexto não está coberto.
Reveja os testes criados até a aula 5 e veja se eles ainda fazem sentido (testes envolvendo 409)
Não se esqueça de responder ao Quiz dessa aula também!
git status
git add .
git commit -m "Protege os endpoints GET, PUT e DELETE com autenticação"
mermaid.js