Source code for hawat.blueprints.auth_api

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
# -------------------------------------------------------------------------------


"""
This pluggable module provides API key based authentication service. When this
module is enabled, users may generate and use API keys to authenticate themselves
when accessing various API application endpoints.

Currently, the API key may be provided via one of the following methods:

* The ``Authorization`` HTTP header.

  You may provide your API key by adding ``Authorization`` HTTP header to your
  requests. Following forms are accepted::

    Authorization: abcd1234
    Authorization: key abcd1234
    Authorization: token abcd1234

* The ``api_key`` or ``api_token`` parameter of the HTTP ``POST`` request.

  You may provide your API key as additional HTTP parameter ``api_key`` or
  ``api_token`` of your ``POST`` request to particular application endpoint.
  Using ``GET`` requests is forbidden due to the fact that request URLs are getting
  logged on various places and your keys could thus be easily compromised.


Provided endpoints
--------------------------------------------------------------------------------

``/auth_api/<user_id>/key-generate``
    Page enabling generation of new API key.

    * *Authentication:* login required
    * *Authorization:* ``admin``
    * *Methods:* ``GET``, ``POST``

``/auth_api/<user_id>/key-delete``
    Page enabling deletion of existing API key.

    * *Authentication:* login required
    * *Authorization:* ``admin``
    * *Methods:* ``GET``, ``POST``
"""

__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"

import itsdangerous
import markupsafe

import flask
import flask_principal
from flask_babel import gettext, lazy_gettext

import hawat.const
import hawat.forms
import hawat.db
import hawat.acl
from hawat.base import HawatBlueprint
from hawat.view import ItemChangeView
from hawat.view.mixin import HTMLMixin, SQLAlchemyMixin

BLUEPRINT_NAME = 'auth_api'
"""Name of the blueprint as module global constant."""


[docs]class GenerateKeyView(HTMLMixin, SQLAlchemyMixin, ItemChangeView): # pylint: disable=locally-disabled,too-many-ancestors """ View for generating API keys for user accounts. """ methods = ['GET', 'POST'] authentication = True
[docs] @classmethod def get_view_name(cls): return 'key-generate'
[docs] @classmethod def get_view_icon(cls): return 'action-genkey'
[docs] @classmethod def get_view_title(cls, **kwargs): return lazy_gettext('Generate API key')
[docs] @classmethod def get_view_template(cls): return 'auth_api/key-generate.html'
@property def dbmodel(self): return self.get_model(hawat.const.MODEL_USER) @property def dbchlogmodel(self): return self.get_model(hawat.const.MODEL_ITEM_CHANGELOG)
[docs] @classmethod def authorize_item_action(cls, **kwargs): """ Perform access authorization for current user to particular item. """ # Each user must be able to manage his/her API keys. permission_me = flask_principal.Permission( flask_principal.UserNeed(kwargs['item'].id) ) return hawat.acl.PERMISSION_POWER.can() or permission_me.can()
[docs] @staticmethod def get_message_success(**kwargs): return gettext( 'API key for user account <strong>%(item_id)s</strong> was successfully generated.', item_id=markupsafe.escape(str(kwargs['item'])) )
[docs] @staticmethod def get_message_failure(**kwargs): return gettext( 'Unable to generate API key for user account <strong>%(item_id)s</strong>.', item_id=markupsafe.escape(str(kwargs['item'])) )
[docs] @staticmethod def get_message_cancel(**kwargs): return gettext( 'Canceled generating API key for user account <strong>%(item_id)s</strong>.', item_id=markupsafe.escape(str(kwargs['item'])) )
[docs] @classmethod def change_item(cls, **kwargs): serializer = itsdangerous.URLSafeTimedSerializer( flask.current_app.config['SECRET_KEY'], salt='apikey-user' ) kwargs['item'].apikey = serializer.dumps(kwargs['item'].id)
[docs]class DeleteKeyView(HTMLMixin, SQLAlchemyMixin, ItemChangeView): # pylint: disable=locally-disabled,too-many-ancestors """ View for deleting API keys from user accounts. """ methods = ['GET', 'POST'] authentication = True
[docs] @classmethod def get_view_name(cls): return 'key-delete'
[docs] @classmethod def get_view_icon(cls): return 'action-delete'
[docs] @classmethod def get_view_title(cls, **kwargs): return lazy_gettext('Delete API key')
[docs] @classmethod def get_view_template(cls): return 'auth_api/key-delete.html'
@property def dbmodel(self): return self.get_model(hawat.const.MODEL_USER) @property def dbchlogmodel(self): return self.get_model(hawat.const.MODEL_ITEM_CHANGELOG)
[docs] @classmethod def authorize_item_action(cls, **kwargs): """ Perform access authorization for current user to particular item. """ # Each user must be able to manage his/her API keys. permission_me = flask_principal.Permission( flask_principal.UserNeed(kwargs['item'].id) ) return hawat.acl.PERMISSION_POWER.can() or permission_me.can()
[docs] @staticmethod def get_message_success(**kwargs): return gettext( 'API key for user account <strong>%(item_id)s</strong> was successfully deleted.', item_id=markupsafe.escape(str(kwargs['item'])) )
[docs] @staticmethod def get_message_failure(**kwargs): return gettext( 'Unable to delete API key for user account <strong>%(item_id)s</strong>.', item_id=markupsafe.escape(str(kwargs['item'])) )
[docs] @staticmethod def get_message_cancel(**kwargs): return gettext( 'Canceled deleting API key for user account <strong>%(item_id)s</strong>.', item_id=markupsafe.escape(str(kwargs['item'])) )
[docs] @classmethod def change_item(cls, **kwargs): kwargs['item'].apikey = None
# -------------------------------------------------------------------------------
[docs]class APIAuthBlueprint(HawatBlueprint): """Pluggable module - API key authentication service (*auth_api*)."""
[docs] @classmethod def get_module_title(cls): return lazy_gettext('API key authentication service')
[docs] def register_app(self, app): login_manager = app.get_resource(hawat.const.RESOURCE_LOGIN_MANAGER) user_model = app.get_model(hawat.const.MODEL_USER) @login_manager.request_loader def load_user_from_request(request): # pylint: disable=locally-disabled,unused-variable """ Custom login callback for login via request object. https://flask-login.readthedocs.io/en/latest/#custom-login-using-request-loader """ # Attempt to extract token from Authorization header. Following formats # may be used: # Authorization: abcd1234 # Authorization: key abcd1234 # Authorization: token abcd1234 api_key = request.headers.get("Authorization") if api_key: vals = api_key.split() if len(vals) == 1: api_key = vals[0] elif len(vals) == 2 and vals[0] in ("token", "key"): api_key = vals[1] else: api_key = None # API key may also be received via POST method, parameters 'api_key' # or 'api_token'. The GET method is forbidden due to the lack # of security, there is a possiblity for it to be stored in various # insecure places like web server logs. if not api_key: api_key = request.form.get('api_key') if not api_key: api_key = request.form.get('api_token') # Now login the user with provided API key. if api_key: dbsess = hawat.db.db_session() try: user = dbsess.query(user_model).filter(user_model.apikey == api_key).one() if user: if user.enabled: flask.current_app.logger.info( "User '{}' used API key to access the resource '{}'.".format( user.login, request.url ) ) return user flask.current_app.logger.error( "The API key for user '{}' was rejected, the account is disabled.".format( user.login ) ) except: # pylint: disable=locally-disabled,bare-except pass # Return ``None`` if API key method did not login the user. return None
# -------------------------------------------------------------------------------
[docs]def get_blueprint(): """ Mandatory interface for :py:mod:`hawat.Hawat` and factory function. This function must return a valid instance of :py:class:`hawat.app.HawatBlueprint` or :py:class:`flask.Blueprint`. """ hbp = APIAuthBlueprint( BLUEPRINT_NAME, __name__, template_folder='templates', url_prefix='/{}'.format(BLUEPRINT_NAME) ) hbp.register_view_class(GenerateKeyView, '/<int:item_id>/key-generate') hbp.register_view_class(DeleteKeyView, '/<int:item_id>/key-delete') return hbp