Handling Authentication and Authorization in Flask Restful

September 13th, 2018

I recommend installing the following tools:

  • Vagrant (Manage servers through the CLI)
  • VirtualBox (VM provider)
  • jq (JSON processing tool)

Setting up the environment

First, clone the Todo app:

git clone https://github.com/jordanknott/flask-auth-example
cd flask-auth-example

Inside the project folder, run vagrant up

Once it is done, SSH inside with vagrant ssh

Let's install the needed software:

sudo apt update
sudo apt install python3 python3-pip python3-pymysql mysql-server

Enter "test" as the password for the root user for the MySQL server.

Once that is done, install the needed Python packages:

pip3 install --user flask flask_restful flask_sqlalchemy
pip3 install --user --pre marshmallow

Next we'll need to create the Todo table in the database:

mysql -u root -p todo < sql/todos.sql

This will import a SQL script that will create the needed table.

With the above done, we can spin up the API:

python3 todos.py

Now you should be able to connect to the API from your own machine. I prefer httpie over tools like cURL:

export BASE=http://192.168.50.25:5000
http $BASE/todos

You should see an empty list returned from the above command.

Users & Permissions

For our Todo app, in order for a client to successfully interact with our API, they need two things:

A token that is tied to a user (authentication) and the correct permission for the endpoint (authorization).

Let's tackle authentication first.

We'll have an endpoint, /login, that takes a username and password and returns a valid opaque token.

An opaque token is simply a unique string that is stored in the database and used for authentication. It carries no information of its own; the server looks it up to find the user it belongs to.
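
To make the idea concrete, here is a minimal sketch of issuing and looking up an opaque token. It uses only the standard library, and the dictionary `_tokens` and the helper names `issue_token` and `lookup_user` are hypothetical stand-ins for the token table and the queries introduced later in this post.

```python
from uuid import uuid4

# Hypothetical in-memory stand-in for the token table: token value -> user id.
_tokens = {}


def issue_token(user_id):
    """Create a random string and remember which user it belongs to."""
    value = str(uuid4())
    _tokens[value] = user_id
    return value


def lookup_user(value):
    """Return the user id for a token, or None if the token is unknown."""
    return _tokens.get(value)


token = issue_token(1)
print(lookup_user(token))          # the user id the token was issued for
print(lookup_user("not-a-token"))  # None: unknown tokens map to no user
```

The token itself says nothing about the user; all of the meaning lives in the lookup. The standard library's secrets module (for example secrets.token_urlsafe) is another way to generate such strings.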

Let's create both the user table and the token table. Ignore the role_id field for now:

CREATE TABLE `user` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `username` VARCHAR(120) NOT NULL,
    `password` VARCHAR(120) NOT NULL,
    `role_id` INT NOT NULL,
    PRIMARY KEY (`id`)
);
CREATE TABLE `token` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `value` VARCHAR(120) NOT NULL,
    `user_id` INT NOT NULL,
    PRIMARY KEY (`id`)
);

Whenever a client hits the /login endpoint, the API checks that the username and password refer to a valid user in the database. If they do, it creates a new token, stores it in the database, and returns it to the client.

This token will then be used in the Authorization header to authenticate for any protected endpoints.

First, we need to create the models for the new tables:

from uuid import uuid4


class Token(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer)
    value = db.Column(db.String(120))

    def __init__(self, user_id):
        self.user_id = user_id
        self.value = str(uuid4())

    def serialize(self):
        return {
                "id": self.id,
                "user_id": self.user_id,
                "value": self.value,
                }


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(120))
    password = db.Column(db.String(120))
    role_id = db.Column(db.Integer)

    def __init__(self, username, password, role_id):
        self.username = username
        self.password = password
        self.role_id = role_id

    def serialize(self):
        # The password is left out so it is never sent back to clients.
        return {
                "id": self.id,
                "username": self.username,
                "role_id": self.role_id
                }

Let's create the /login endpoint next:

class LoginResource(Resource):
    def post(self):
        args = request.get_json()
        if 'username' in args and 'password' in args:
            user = User.query.filter_by(username=args['username']).first()
            if user is None:
                return {'message': "username or password is wrong"}, 404
            if user.password == args['password']:
                token = Token(user.id)
                db.session.add(token)
                db.session.commit()
                return {
                        "token": token.value
                    }
            else:
                return {'message': "username or password is wrong"}, 404
        else:
            # Return a JSON body plus the status code, not a set literal.
            return {
                    "message": "must include both a username and password field"
                    }, 400
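
The login code above compares the submitted password with the stored value directly, which means passwords sit in the database as plain text. As a hedged sketch of an alternative, the snippet below stores a salted PBKDF2 hash instead. The helper names `hash_password` and `verify_password` and the `ITERATIONS` value are assumptions for illustration, not part of the example project, and the password column would need to hold the encoded salt and digest.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # assumed work factor; tune for your hardware


def hash_password(password, salt=None):
    """Return (salt, digest) for a password using PBKDF2-HMAC-SHA256."""
    if salt is None:
        salt = os.urandom(16)  # a fresh random salt for each stored password
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, ITERATIONS
    )
    return salt, digest


def verify_password(password, salt, expected_digest):
    """Recompute the digest with the stored salt and compare in constant time."""
    _, digest = hash_password(password, salt)
    return hmac.compare_digest(digest, expected_digest)


salt, stored = hash_password("test")
print(verify_password("test", salt, stored))   # True
print(verify_password("wrong", salt, stored))  # False
```

With something like this in place, the equality check in the login endpoint would become a call to the verification helper.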

Finally, register the new endpoint:

api.add_resource(LoginResource, '/login')

Let's add a test user with a password of test:

mysql -u root -p todo -e "INSERT INTO user VALUES (1, 'test', 'test', 1);"

Now from the host machine, you can log in and get a valid token:

http $BASE/login username=test password=test

Protecting endpoints

Now that we can log in to our API, let's make the /todos endpoint require a valid token for access.

One way we can do this is through the use of method decorators, a feature of Flask-RESTful.

Let's create a decorator that handles checking if a token is valid or not:

from functools import wraps

def authenticate(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        token_raw = str(request.headers.get("Authorization"))
        token = Token.query.filter_by(value=token_raw).first()
        if token is not None:
            return func(*args, **kwargs)
        return {"message": "token is invalid or missing"}, 401
    return wrapper

Next let's create a subclass of Resource that uses this new decorator.

We'll then change the TodosResource to use the new ProtectedResource class.

class ProtectedResource(Resource):
  method_decorators = [authenticate]

...

class TodosResource(ProtectedResource):
  ...

The decorator will be called before any of the endpoint methods are called. If the token is not valid, then the endpoint will return a 401.
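
To illustrate why the decorator runs first, here is a simplified, standard-library sketch of the idea: before calling the handler, the resource wraps it in each decorator from its list. `MiniResource`, `VALID_TOKENS`, and `request_headers` are hypothetical stand-ins for Flask-RESTful's Resource, the token table, and the request headers; the real library does more than this.

```python
from functools import wraps

VALID_TOKENS = {"abc123"}  # hypothetical stand-in for the token table
request_headers = {}       # hypothetical stand-in for request.headers


def authenticate(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if request_headers.get("Authorization") in VALID_TOKENS:
            return func(*args, **kwargs)
        return {"message": "token is invalid or missing"}, 401
    return wrapper


class MiniResource:
    """Simplified stand-in for a resource base class."""
    method_decorators = []

    def dispatch(self, http_method):
        # Look up the handler (get, post, ...) and wrap it in each decorator.
        meth = getattr(self, http_method.lower())
        for decorator in self.method_decorators:
            meth = decorator(meth)
        return meth()


class Todos(MiniResource):
    method_decorators = [authenticate]

    def get(self):
        return []


print(Todos().dispatch("GET"))  # rejected with a 401 tuple: no token set
request_headers["Authorization"] = "abc123"
print(Todos().dispatch("GET"))  # the handler runs and returns []
```

Because the wrapping happens at dispatch time, every handler on the subclass is covered without decorating each method by hand.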

Now if you try to hit the /todos endpoint without a valid token, it will return the following:

401 UNAUTHORIZED
{
  "message": "token is invalid or missing"
}

But if you have a valid token, then it will work correctly:

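http $BASE/login username=test password=test | jq -r ".token" > .token
export AUTH=Authorization:`cat .token`
http $BASE/todos $AUTH

Note: The jq command above is used to parse and interact with JSON content. The -r flag prints the token as a raw string; without it, the surrounding JSON quotes would end up in the header and the token would not match.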
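
For readers who would rather script the client in Python, the following sketch builds the same two requests with the standard library. It only constructs the request objects; actually sending them would need urllib.request.urlopen and a running API. The function names and the sample token string are made up for illustration.

```python
import json
import urllib.request

BASE = "http://192.168.50.25:5000"  # same address as the BASE variable above


def build_login_request(username, password):
    """POST the credentials to /login as a JSON body."""
    body = json.dumps({"username": username, "password": password})
    return urllib.request.Request(
        BASE + "/login",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def extract_token(response_body):
    """Pull the token out of the JSON returned by /login."""
    return json.loads(response_body)["token"]


def build_todos_request(token):
    """GET /todos with the raw token in the Authorization header."""
    return urllib.request.Request(
        BASE + "/todos", headers={"Authorization": token}
    )


# A made-up response body, standing in for what /login would return.
token = extract_token('{"token": "example-token"}')
request = build_todos_request(token)
print(request.full_url)
print(request.get_header("Authorization"))
```

Parsing the response with json.loads yields the token without quotes, so no extra cleanup is needed before it goes into the header.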

Now that we can protect our resources from invalid users, what if we wanted each user to be able to access only a specific set of endpoints? This is where the concept of authorization comes into play.

While authentication is used to ensure that you are who you say you are, authorization is used to ensure that you have access to what you're trying to do.

Let's say that for our Todo app, we'll have two "roles". A standard role that can manage todos and an admin role that can manage todos as well as manage users.

Before we get started on that, let's add user and users endpoints so we can manage users.

class UsersResource(Resource):

    def get(self):
        users = User.query.all()
        users = [user.serialize() for user in users]
        return users

    def post(self):
        args = request.get_json()
        if 'username' in args and 'password' in args and 'role_id' in args:
            user = User(args['username'], args['password'], args['role_id'])
            db.session.add(user)
            db.session.commit()
            return {"id": user.id}
        else:
            return {"message": "must include username, password, and role_id fields"}, 400


class UserResource(Resource):

    def get(self, user_id):
        user = User.query.filter_by(id=user_id).first()
        if user is None:
            return {"message": "user not found"}, 404
        return user.serialize()

    def delete(self, user_id):
        user = User.query.filter_by(id=user_id).first()
        if user is None:
            return {"message": "user not found"}, 404
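        # session.delete() marks the row for deletion; commit() applies it.
        db.session.delete(user)
        db.session.commit()
        return {"message": "user deleted"}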

Roles & Permissions

In order to deal with authorization, we need to plan how we're going to define who can access what. We'll be using a system of "roles" and "permissions". A role is simply a collection of permissions. A permission is simply a name and an ID. A single permission maps to a single method of an endpoint. For example, if someone wants to interact with the /todos endpoint and get a list of Todo objects, they would need the "todos:read" permission.

A good way to represent this in a database is with three tables:

  • role (id, name)
  • permission (id, name)
  • role_permission (id, role_id, permission_id)

The role_permission table is used as the glue between a role and its list of permissions.

Let's create the three tables next:

CREATE TABLE role (
  id INT NOT NULL AUTO_INCREMENT,
  name VARCHAR(60) NOT NULL,
  PRIMARY KEY (id)
);
CREATE TABLE permission (
  id INT NOT NULL AUTO_INCREMENT,
  name VARCHAR(60) NOT NULL,
  PRIMARY KEY (id)
);
CREATE TABLE role_permission (
  id INT NOT NULL AUTO_INCREMENT,
  role_id INT NOT NULL,
  permission_id INT NOT NULL,
  PRIMARY KEY (id)
);

Then the models:

class Role(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(60))

    def __init__(self, name):
        self.name = name

    def serialize(self):
        return {"id": self.id, "name": self.name}


class Permission(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(60))

    def __init__(self, name):
        self.name = name

    def serialize(self):
        return {"id": self.id, "name": self.name}


class RolePermission(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    role_id = db.Column(db.Integer)
    permission_id = db.Column(db.Integer)

    def __init__(self, role_id, permission_id):
        self.role_id = role_id
        self.permission_id = permission_id

    def serialize(self):
        return {
                "id": self.id,
                "role_id": self.role_id,
                "permission_id": self.permission_id
                }

We won't be adding endpoints to manage the above, so we'll just have to create them directly in the database.

Let's create the standard role and its permissions, as well as glue them together.

USE todo;

INSERT INTO role VALUES (1, 'Standard');

INSERT INTO permission VALUES (1, 'todos:read');
INSERT INTO permission VALUES (2, 'todos:create');
INSERT INTO permission VALUES (3, 'todo:read');
INSERT INTO permission VALUES (4, 'todo:update');
INSERT INTO permission VALUES (5, 'todo:delete');

INSERT INTO role_permission VALUES (1, 1, 1);
INSERT INTO role_permission VALUES (2, 1, 2);
INSERT INTO role_permission VALUES (3, 1, 3);
INSERT INTO role_permission VALUES (4, 1, 4);
INSERT INTO role_permission VALUES (5, 1, 5);
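
With rows like these in place, the permissions of a role can be resolved with a single join through the glue table. The sketch below shows this using an in-memory SQLite database from the standard library rather than the MySQL server used in this post, so the column types differ slightly; the function name `permissions_for_role` is made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE role (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE permission (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE role_permission (
        id INTEGER PRIMARY KEY,
        role_id INTEGER NOT NULL,
        permission_id INTEGER NOT NULL
    );
    INSERT INTO role VALUES (1, 'Standard');
    INSERT INTO permission VALUES
        (1, 'todos:read'), (2, 'todos:create'), (3, 'todo:read'),
        (4, 'todo:update'), (5, 'todo:delete');
    INSERT INTO role_permission VALUES
        (1, 1, 1), (2, 1, 2), (3, 1, 3), (4, 1, 4), (5, 1, 5);
""")


def permissions_for_role(role_id):
    """Return the permission names attached to a role, ordered by id."""
    rows = conn.execute(
        """
        SELECT p.name
        FROM role_permission AS rp
        JOIN permission AS p ON p.id = rp.permission_id
        WHERE rp.role_id = ?
        ORDER BY p.id
        """,
        (role_id,),
    ).fetchall()
    return [name for (name,) in rows]


print(permissions_for_role(1))  # the five permissions of the Standard role
print(permissions_for_role(2))  # [] because no such role has been linked
```

The join follows role_permission.permission_id to the permission row, which is the relationship the glue table exists to express.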

When we created our test user, we set its role_id to 1, which is the id of our "Standard" role.

Now that we have our role and permissions set up, the next step is to make sure our API methods check that the authenticated user has the correct permission.

We'll be making use of decorators again but we'll have to remove our authenticate decorator.

We need to be able to specify an argument to our decorator (the required permission) and our current decorator simply can't handle that.

Remove the ProtectedResource class and switch all instances of it back to the regular Resource class.

Our end goal is to be able to add a decorator to a resource method that authenticates the user and makes sure the user has the needed permission. It will look something like this:

class TodosResource(Resource):

  @scope("todos:read")
  def get(self):
    ...

The @scope part will be our new decorator. The argument is the required permission.

The decorator looks like the following:

def scope(scope_name):
    def wrap(f):
        def inner_wrapper(*args, **kwargs):
            pass
        return inner_wrapper
    return wrap

The inner_wrapper function will get called every time one of the endpoint methods is called.

Inside inner_wrapper, we'll add the old authentication code as well as the new authorization code:

def scope(scope_name):
    def wrap(f):
        @wraps(f)  # wraps was imported from functools earlier
        def inner_wrapper(*args, **kwargs):
            # Authentication: the token must exist in the database.
            token_value = str(request.headers.get("Authorization"))

            token = Token.query.filter_by(value=token_value).first()
            if token is None:
                return {"message": "token is invalid or missing"}, 401

            # Authorization: collect the permissions attached to the user's role.
            user = User.query.filter_by(id=token.user_id).first()
            role = Role.query.filter_by(id=user.role_id).first()
            role_permissions = RolePermission.query.filter_by(role_id=role.id).all()
            # Use the linked permission's id, not the id of the glue row itself.
            permission_ids = [rp.permission_id for rp in role_permissions]
            for permission_id in permission_ids:
                permission = Permission.query.filter_by(id=permission_id).first()
                if permission.name == scope_name:
                    return f(*args, **kwargs)

            return {"message": "you do not have the correct permission"}, 403
        return inner_wrapper
    return wrap

Now all we have to do is add the scope decorator to any Resource method we want to protect and make sure the required permission is in the database.
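
To see the three possible outcomes without a database, here is a self-contained sketch of the same decorator shape. `TOKENS`, `USER_PERMISSIONS`, and `request_headers` are hypothetical in-memory stand-ins for the tables and for the request headers, and the user names and todo text are invented for the example.

```python
from functools import wraps

# Hypothetical in-memory stand-ins for the token, user and permission tables.
TOKENS = {"good-token": "alice", "limited-token": "bob"}
USER_PERMISSIONS = {"alice": {"todos:read", "todos:create"}, "bob": set()}
request_headers = {}  # stand-in for request.headers


def scope(scope_name):
    def wrap(f):
        @wraps(f)
        def inner_wrapper(*args, **kwargs):
            # Authentication: who is calling?
            user = TOKENS.get(request_headers.get("Authorization"))
            if user is None:
                return {"message": "token is invalid or missing"}, 401
            # Authorization: may this user do what the method requires?
            if scope_name not in USER_PERMISSIONS[user]:
                return {"message": "you do not have the correct permission"}, 403
            return f(*args, **kwargs)
        return inner_wrapper
    return wrap


class TodosResource:
    @scope("todos:read")
    def get(self):
        return ["buy milk"]


resource = TodosResource()
print(resource.get())  # 401 response: no token was sent
request_headers["Authorization"] = "limited-token"
print(resource.get())  # 403 response: known user, missing permission
request_headers["Authorization"] = "good-token"
print(resource.get())  # the todo list: token and permission both check out
```

A 401 means the caller could not be identified, while a 403 means the caller is known but lacks the permission named in the decorator.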