ML users authentification

In the previous chapter we have created the user model. Now we need to implement the authentification of those users in the application.

We will use JWT tokens for authentification.

Each token will last for 60 minutes and will only be issued to existing and enabled users. Additionaly, when requesting the token, the passwords must match the ones in the database.

The API endpoint for token creation is:

/token - POST request that accepts the following data:

{
    "username": "username",
    "password": "password"
}

If the credentials are correct, we will return a JWT token.

Token based functionalities

All the functionalities are in the ML_API/jwt_tokens.py file.

!cat ML_API/jwt_tokens.py
# Importing the JWT library
import jwt

# The Users model 
from Users import User

# Importing the session 
from database import session

# Datetime functionality
import datetime

# Reading the configuration file 
import yaml 

# OS functionalities
import os 

# Infering the file path 
_file_dir = os.path.dirname(os.path.abspath(__file__))

conf = yaml.safe_load(open(os.path.join(_file_dir, "config.yml")))

# JWT constants
_SECRET = conf["jwt"]["secret"]
_ALGORITHM = conf["jwt"]["algorithm"]
_EXPIRATION_TIME = conf["jwt"]["expiration_time"] # Minutes until expiration

# User authenfification endpoint 
def authenticate_user_view(username: str, password: str) -> bool:
    """
    Function that authenticates a user using the username and password
    
    Parameters
    ----------
        username (str): The username of the user to authenticate
        password (str): The password of the user to authenticate
    
    Returns
    -------
        bool: True if the user is authenticated, False otherwise
    """
    # Checking if the user exists in the database
    user = session.query(User).filter(User.username == username).first()
    if user:
        # Checking if the password is correct
        if user.check_password_match(password):
            return user
        else:
            return None
    else:
        return None

# Creating the JWT token
def create_token_view(user_id: int) -> str:
    """
    Method to create a JWT token for a user using internal user_id
    
    Parameters
    ----------
        user_id (int): The user_id of the user to create the token for

    Returns
    -------
        str: The JWT token
    """
    # Creating the claims dictionary
    claims = {
        # Expiration date of the token
        "exp": datetime.datetime.now() + datetime.timedelta(minutes=_EXPIRATION_TIME),
        
        # Issue time of the token
        "iat": datetime.datetime.now(),

        # Subject of the token
        "sub": user_id
    }

    # Creating the token
    return jwt.encode(claims, _SECRET, algorithm=_ALGORITHM)

# Authenticating the JWT token
def authenticate_token_view(jwt_token: str) -> bool:
    """
    Function that decodes the token and authenticates it. 

    Parameters
    ----------
        jwt_token (str): The JWT token to authenticate

    Returns
    -------
        bool: True if the token is valid, False otherwise
    """
    try:
        # Decoding the token
        claims = jwt.decode(jwt_token, _SECRET, algorithms=[_ALGORITHM])

        # Extracting the user_id from the token
        user_id = claims["sub"]

        # Extracting the expiration date from the token
        expiration_date = claims["exp"]

        # Checking if the token is expired
        if datetime.datetime.fromtimestamp(expiration_date) < datetime.datetime.utcnow():
            return False

        # Checking if the user exists in the database
        user = session.query(User).filter(User.id == user_id).first()
        if user:
            return user
        else:
            return None
    except:
        # If the token is invalid, return False
        return None 

Creating a JWT token for a user

First of all, lets create a new user.

import requests 

# Defining the url 
url = "http://localhost:8001"

# Defining the user dict 
user_dict = {
    "username": "eligijus",
    "password": "123456",
    "email": "eligijus@testmail.com"
}

# Sending the post request to the running API 
response = requests.post(f"{url}/register-user", json=user_dict)

# Getting the user id 
user_id = response.json().get("user_id")

# Printing the response 
print(f"Response code: {response.status_code}; Response: {response.json()}")
Response code: 409; Response: {'message': 'User already exists', 'user_id': 4}

Now that the user is registered, we can try to create a token with the username and password. First, lets send a bad password:

response = requests.post(f"{url}/token", json={'username': "eligijus", 'password': "654321"})

print(f"Response code: {response.status_code}; Response: {response.json()}")
Response code: 401; Response: {'message': 'Invalid credentials'}

Now lets the send the password with which the user was registered:

response = requests.post(f"{url}/token", json=user_dict)

print(f"Response code: {response.status_code}; Response: {response.json()}")

# Saving the token 
token = response.json().get("token")
Response code: 200; Response: {'token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2NDIzNDc4MDMsImlhdCI6MTY0MjM0NDIwMywic3ViIjo0fQ.eTVFfReXzUQO4UztvvRKJyQMhmeMyy_dYn9eUnp99a4'}

If get the token successfully, for the remaining 60 minutes we should only use this token to make requests to the API.

Checking token validity

To check whether the given token is valid or not, we can use the endpoint:

/token_status/<token> - a GET request that returns either that a token is valid (status code 200) or that it is not valid (status code 401).

# Sending the request to inspect the token validity 
response = requests.get(f"{url}/token_status/{token}")

print(f"Response code: {response.status_code}; Response: {response.json()}")
Response code: 200; Response: {'message': 'Token is valid'}
# Sending a bad request to inspect the token validity 
response = requests.get(f"{url}/token_status/{token[:-1]}")

print(f"Response code: {response.status_code}; Response: {response.json()}")
Response code: 401; Response: {'message': 'Invalid token'}