ML users authentication¶
In the previous chapter we created the user model. Now we need to implement the authentication of those users in the application.
We will use JWT tokens for authentication.
Each token will last for 60 minutes and will only be issued to existing and enabled users. Additionally, when requesting the token, the supplied password must match the one stored in the database.
The API endpoint for token creation is:
/token - POST request that accepts the following data:
{
"username": "username",
"password": "password"
}
If the credentials are correct, we will return a JWT token.
Token based functionalities¶
All the functionalities are in the ML_API/jwt_tokens.py file.
!cat ML_API/jwt_tokens.py
# Importing the JWT library
import jwt
# The Users model
from Users import User
# Importing the session
from database import session
# Datetime functionality
import datetime
# Reading the configuration file
import yaml
# OS functionalities
import os
# Inferring the directory of this module so the config is found regardless of
# the current working directory
_file_dir = os.path.dirname(os.path.abspath(__file__))
# Reading the configuration inside a context manager so that the file handle
# is closed as soon as the YAML has been parsed
with open(os.path.join(_file_dir, "config.yml")) as _conf_file:
    conf = yaml.safe_load(_conf_file)
# JWT constants
_SECRET = conf["jwt"]["secret"]  # Key used to sign and verify the tokens
_ALGORITHM = conf["jwt"]["algorithm"]  # Algorithm name passed to the jwt library
_EXPIRATION_TIME = conf["jwt"]["expiration_time"]  # Minutes until expiration
# User authentication endpoint
def authenticate_user_view(username: str, password: str) -> "User | None":
    """
    Function that authenticates a user using the username and password

    Parameters
    ----------
    username (str): The username of the user to authenticate
    password (str): The password of the user to authenticate

    Returns
    -------
    User | None: The matching User object when the username exists and the
        password matches, None otherwise
    """
    # Checking if the user exists in the database
    user = session.query(User).filter(User.username == username).first()
    # NOTE(review): no "enabled" flag is inspected here; confirm that the
    # caller enforces it if tokens must only go to enabled users
    if not user:
        # Unknown username
        return None
    if not user.check_password_match(password):
        # Known username, wrong password
        return None
    return user
# Creating the JWT token
def create_token_view(user_id: int) -> str:
    """
    Method to create a JWT token for a user using internal user_id

    Parameters
    ----------
    user_id (int): The user_id of the user to create the token for

    Returns
    -------
    str: The JWT token
    """
    # One timezone-aware UTC timestamp is shared by "iat" and "exp". A naive
    # local datetime would be interpreted as UTC when the claims are encoded,
    # shifting the token lifetime by the UTC offset of the host
    issued_at = datetime.datetime.now(tz=datetime.timezone.utc)
    # Creating the claims dictionary
    claims = {
        # Expiration date of the token
        "exp": issued_at + datetime.timedelta(minutes=_EXPIRATION_TIME),
        # Issue time of the token
        "iat": issued_at,
        # Subject of the token
        # NOTE(review): newer PyJWT releases appear to require a string "sub"
        # when decoding; confirm against the pinned library version
        "sub": user_id,
    }
    # Creating the token
    return jwt.encode(claims, _SECRET, algorithm=_ALGORITHM)
# Authenticating the JWT token
def authenticate_token_view(jwt_token: str) -> "User | None":
    """
    Function that decodes the token and returns the user it was issued to.

    Parameters
    ----------
    jwt_token (str): The JWT token to authenticate

    Returns
    -------
    User | None: The User object referenced by the "sub" claim when the token
        is valid and the user exists, None otherwise
    """
    try:
        # Decoding the token; the library checks the signature and, by
        # default, rejects tokens whose "exp" claim is in the past, so no
        # manual (and timezone-fragile) expiration comparison is needed
        claims = jwt.decode(jwt_token, _SECRET, algorithms=[_ALGORITHM])
    except jwt.InvalidTokenError:
        # Malformed, tampered or expired token. Only token errors are caught
        # here so that unrelated failures are not reported as a bad token
        return None
    # Tokens lacking an expiration date or a subject are not accepted
    if "exp" not in claims or "sub" not in claims:
        return None
    # Checking if the user exists in the database
    user = session.query(User).filter(User.id == claims["sub"]).first()
    return user if user else None
Creating a JWT token for a user¶
First of all, let's create a new user.
import requests

# Base address of the running API
url = "http://localhost:8001"

# Payload describing the user to register
user_dict = {
    "username": "eligijus",
    "password": "123456",
    "email": "eligijus@testmail.com",
}

# Posting the payload to the registration endpoint
register_endpoint = f"{url}/register-user"
response = requests.post(register_endpoint, json=user_dict)

# Keeping the user id found in the JSON reply
user_id = response.json().get("user_id")

# Showing the status code together with the JSON reply
print(f"Response code: {response.status_code}; Response: {response.json()}")
Response code: 409; Response: {'message': 'User already exists', 'user_id': 4}
Now that the user is registered, we can try to create a token with the username and password. First, let's send a bad password:
# Credentials carrying a password that differs from the registered one
bad_credentials = {'username': "eligijus", 'password': "654321"}
response = requests.post(f"{url}/token", json=bad_credentials)
print(f"Response code: {response.status_code}; Response: {response.json()}")
Response code: 401; Response: {'message': 'Invalid credentials'}
Now let's send the password with which the user was registered:
# Requesting a token with the same payload that was used for registration
token_endpoint = f"{url}/token"
response = requests.post(token_endpoint, json=user_dict)
print(f"Response code: {response.status_code}; Response: {response.json()}")
# Keeping the token from the JSON reply for the following requests
token = response.json().get("token")
Response code: 200; Response: {'token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2NDIzNDc4MDMsImlhdCI6MTY0MjM0NDIwMywic3ViIjo0fQ.eTVFfReXzUQO4UztvvRKJyQMhmeMyy_dYn9eUnp99a4'}
If we get the token successfully, for the remaining 60 minutes we should only use this token to make requests to the API.
Checking token validity¶
To check whether the given token is valid or not, we can use the endpoint:
/token_status/<token> - a GET request that returns either that a token is valid (status code 200) or that it is not valid (status code 401).
# Asking the API whether the token obtained above is valid
status_endpoint = f"{url}/token_status/{token}"
response = requests.get(status_endpoint)
print(f"Response code: {response.status_code}; Response: {response.json()}")
Response code: 200; Response: {'message': 'Token is valid'}
# Dropping the last character of the token and asking the API about the result
truncated_token = token[:-1]
response = requests.get(f"{url}/token_status/{truncated_token}")
print(f"Response code: {response.status_code}; Response: {response.json()}")
Response code: 401; Response: {'message': 'Invalid token'}