Source code for hawat.test

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------


"""
Base library for web interface unit tests.
"""


import io
import unittest
import pprint
import logging
import flask_mail
import hawat
import hawat.app
import hawat.db
import hawat.events


[docs]class do_as_user_decorator: # pylint: disable=locally-disabled,invalid-name,too-few-public-methods """ Decorator class for accessing application endpoints as given user. """ def __init__(self, user_name, login_func_name = 'login_dev', login_func_params = None): self.user_name = user_name self.login_func_name = login_func_name self.login_func_params = login_func_params or {} def __call__(self, func): def wrapped_f(other_self, *args, **kwargs): login_func = getattr(other_self, self.login_func_name) response = login_func(self.user_name, **self.login_func_params) other_self.assertEqual(response.status_code, 200) func(other_self, *args, **kwargs) response = other_self.logout() other_self.assertEqual(response.status_code, 200) return wrapped_f
[docs]def app_context_wrapper_decorator(func): """ Decorator class for conditional wrapping of given function with application context. """ def wrapped_f(self, *args, **kwargs): if 'with_app_ctx' not in kwargs or not kwargs['with_app_ctx']: return func(self, *args, **kwargs) with self.app.app_context(): return func(self, *args, **kwargs) return wrapped_f
[docs]class HawatTestCase(unittest.TestCase): """ Class for testing :py:class:`hawat.app.Hawat` application. """ logger = logging.getLogger() logger.level = logging.DEBUG
[docs] def on_email_sent(self, message, app): # pylint: disable=locally-disabled,unused-argument """ Signal handler for handling :py:func:`flask_mail.email_dispatched` signal. Log subject and recipients of all emails that have been sent. """ #pprint.pprint(message) #app.logger.info( # "Sent email '%s' to '%s'", # message.subject, # ', '.join(message.recipients) #) self.mailbox.append(message)
#--------------------------------------------------------------------------
[docs] def setUp(self): self.setup_logging() self.mailbox = [] self.fixtures_db = [] self.app = self.setup_app() self.client = self.app.test_client() self.setup_db()
[docs] def setup_logging(self): """ Setup logging configuration for testing purposes. """ #for hdlr in self.logger.handlers: # self.logger.removeHandler(hdlr) self.loghdlr = logging.StreamHandler(io.StringIO()) self.logger.addHandler(self.loghdlr)
[docs] def setup_app(self): """ Setup application object. """ raise NotImplementedError()
[docs] def setup_db(self): """ Perform database setup. """ with self.app.app_context(): hawat.db.db_get().drop_all() hawat.db.db_get().create_all() self.setup_fixtures_db() hawat.events.db_init(self.app) hawat.events.db_get().database_create()
[docs] def get_fixtures_db(self, app): raise NotImplementedError()
[docs] def setup_fixtures_db(self): """ Setup general database object fixtures. """ fixture_list = self.get_fixtures_db(self.app) for dbobject in fixture_list: hawat.db.db_session().add(dbobject) hawat.db.db_session().commit() self.fixtures_db.extend(fixture_list)
#--------------------------------------------------------------------------
[docs] def tearDown(self): self.teardown_logging() self.teardown_db()
[docs] def teardown_logging(self): """ Teardown logging configuration for testing purposes. """ #print( # "CAPTURED LOG CONTENTS:\n{}".format( # self.loghdlr.stream.getvalue() # ) #) self.loghdlr.stream.close() self.logger.removeHandler(self.loghdlr)
[docs] def teardown_db(self): with self.app.app_context(): hawat.db.db_get().drop_all() hawat.events.db_get().database_drop()
#--------------------------------------------------------------------------
[docs] def login_dev(self, login): """ Login given user with *auth_dev* module. """ return self.client.post( '/auth_dev/login', data = {"login": login, "submit": 'Login'}, follow_redirects = True )
[docs] def login_pwd(self, login, password): """ Login given user with *auth_pwd* module. """ return self.client.post( '/auth_pwd/login', data ={"login": login, "password": password, "submit": 'Login'}, follow_redirects = True )
[docs] def login_env(self, login, envvar = 'eppn'): """ Login given user with *auth_env* module. """ return self.client.get( '/auth_env/login', environ_base = {envvar: login}, follow_redirects = True )
[docs] def logout(self): """ Logout current user. """ return self.client.get( '/logout', follow_redirects = True )
#--------------------------------------------------------------------------
[docs] def log_get(self): """ Get content written to log so far. """ return self.loghdlr.stream.getvalue()
[docs] def log_clear(self): """ Clear log content. """ self.loghdlr.stream.close() self.loghdlr.stream = io.StringIO()
#--------------------------------------------------------------------------
[docs] def mailbox_clear(self): """ Clear internal mailbox. """ self.mailbox = []
[docs] def mailbox_monitoring(self, state): """ Enable/disable mailbox monitoring. """ if state == 'on': flask_mail.email_dispatched.connect(self.on_email_sent) return if state == 'off': flask_mail.email_dispatched.disconnect(self.on_email_sent) return raise ValueError( 'Invalid parameter for mailbox_monitoring, must be "on" or "off", received {}'.format( str(state) ) )
#--------------------------------------------------------------------------
[docs] def user_model(self): """ Get user model class. """ return self.app.get_model(hawat.const.MODEL_USER)
[docs] @app_context_wrapper_decorator def user_get(self, user_name, **kwargs): # pylint: disable=locally-disabled,unused-argument """ Get user object according to given user name from database. """ user_model = self.user_model() return hawat.db.db_session().query(user_model).filter(user_model.login == user_name).one_or_none()
[docs] @app_context_wrapper_decorator def user_save(self, user_object, **kwargs): # pylint: disable=locally-disabled,unused-argument """ Update given user object within database. """ hawat.db.db_session().add(user_object) hawat.db.db_session().commit()
[docs] @app_context_wrapper_decorator def user_id(self, user_name, **kwargs): # pylint: disable=locally-disabled,unused-argument """ Get ID of user with given name within database. """ uobj = self.user_get(user_name) return uobj.id
[docs] @app_context_wrapper_decorator def user_enabled(self, user_name, state, **kwargs): # pylint: disable=locally-disabled,unused-argument """ Enable/disable given user within database. """ user = self.user_get(user_name) user.enabled = state self.user_save(user)
#--------------------------------------------------------------------------
[docs] def group_model(self): """ Get user model class. """ return self.app.get_model(hawat.const.MODEL_GROUP)
[docs] @app_context_wrapper_decorator def group_get(self, group_name, **kwargs): # pylint: disable=locally-disabled,unused-argument """ Get group object according to given group name within database. """ group_model = self.group_model() return hawat.db.db_session().query(group_model).filter(group_model.name == group_name).one_or_none()
[docs] @app_context_wrapper_decorator def group_save(self, group_object, **kwargs): # pylint: disable=locally-disabled,unused-argument """ Update given group object within database. """ hawat.db.db_session().add(group_object) hawat.db.db_session().commit()
[docs] @app_context_wrapper_decorator def group_id(self, group_name, **kwargs): # pylint: disable=locally-disabled,unused-argument """ Get ID of given group within database. """ gobj = self.group_get(group_name) return gobj.id
[docs] @app_context_wrapper_decorator def group_enabled(self, group_name, state, **kwargs): # pylint: disable=locally-disabled,unused-argument """ Enable/disable given group within database. """ group = self.group_get(group_name) group.enabled = state self.group_save(group)
#--------------------------------------------------------------------------
[docs] def assertGetURL(self, url, status_code = 200, content_checks = None, print_response = False, follow_redirects = True): # pylint: disable=locally-disabled,invalid-name """ Perform GET request and check some default assertions against the response. """ response = self.client.get( url, follow_redirects = follow_redirects ) if print_response: print("--------------------------------------------------------------------------------") print("Response for GET {}: {} ({})".format(url, response.status_code, response.status)) pprint.pprint(response.headers) pprint.pprint(response.data) print("--------------------------------------------------------------------------------") self.assertEqual(response.status_code, status_code) if content_checks: for cch in content_checks: self.assertTrue(cch in response.data) return response
[docs] def assertPostURL(self, url, data, status_code = 200, content_checks = None, print_response = False, follow_redirects = True): # pylint: disable=locally-disabled,invalid-name """ Perform POST request and check some default assertions against the response. """ response = self.client.post( url, data = data, follow_redirects = follow_redirects ) if print_response: print("--------------------------------------------------------------------------------") print("Response for POST {}, {}: {} ({})".format(url, pprint.pformat(data), response.status_code, response.status)) pprint.pprint(response.headers) pprint.pprint(response.data) print("--------------------------------------------------------------------------------") self.assertEqual(response.status_code, status_code) if content_checks: for cch in content_checks: self.assertTrue(cch in response.data) return response
[docs] def assertMailbox(self, checklist): # pylint: disable=locally-disabled,invalid-name """ Check internal mailbox. """ for attr_name in ('subject', 'sender', 'recipients', 'cc', 'bcc', 'body', 'html'): if attr_name in checklist: self.assertEqual( list( map( lambda x, attr_name=attr_name: getattr(x, attr_name), self.mailbox, ) ), checklist[attr_name] )
[docs]class ItemCreateHawatTestCase(HawatTestCase): """ Class for testing :py:class:`hawat.app.Hawat` application item creation views. """ maxDiff = None
[docs] def assertCreate(self, item_model, url, data, content_checks = None, print_response = False): # pylint: disable=locally-disabled,invalid-name """ Perform attempt to create given item. """ # Verify, that the item form correctly displays. response = self.assertGetURL( url, 200, [ '<form method="POST" action="{}" id="form-{}-create'.format( url, item_model.__name__.lower() ).encode('utf8'), b'<div class="btn-toolbar" role="toolbar" aria-label="Form submission buttons">' ], print_response = print_response ) # Attempt to send empty item form. There is always at least one mandatory # form field, so we should get some "This field is required." error. request_data = {'submit': 'Submit'} response = self.assertPostURL( url, request_data, 200, [ b'This field is required.', b'help-block form-error' ], print_response = print_response ) # Attempt to send form with some mandatory fields missing. #for idx, param in enumerate(data): # if idx == len(data) - 1: # break # response = self.client.post( # url, # follow_redirects = True, # data = { # i[0]: i[1] for i in data[0:idx+1] # } # ) # self.assertEqual(response.status_code, 200) # self.assertTrue(b'This field is required.' in response.data) # self.assertTrue(b'help-block form-error' in response.data) # Attempt to send form with valid data. request_data = { i[0]: i[1] for i in data } request_data['submit'] = 'Submit' response = self.assertPostURL( url, request_data, 200, [ b'<div class="alert alert-success alert-dismissible">' ], print_response = print_response ) if content_checks: for cch in content_checks: self.assertTrue(cch in response.data) return response
[docs]class RegistrationHawatTestCase(HawatTestCase): """ Class for testing :py:class:`hawat.app.Hawat` application registration views. """ maxDiff = None user_fixture = { 'apikey': None, 'email': 'test.user@domain.org', 'enabled': False, 'fullname': 'Test User', 'id': 5, 'locale': None, 'login': 'test', 'logintime': 'None', 'managements': [], 'memberships': [], 'memberships_wanted': [], 'organization': 'TEST, org.', 'roles': ['user'], 'timezone': None }
[docs] def assertRegisterFail(self, url, data, environ_base=None): # pylint: disable=locally-disabled,invalid-name if environ_base is None: environ_base = {} response = response = self.client.get( url, follow_redirects = True, environ_base = environ_base, ) self.assertEqual(response.status_code, 200) self.assertTrue(b'User account registration' in response.data) for idx, param in enumerate(data): if idx == len(data) - 1: break response = response = self.client.post( url, follow_redirects = True, environ_base = environ_base, data = { i[0]: i[1] for i in data[0:idx+1] } ) self.assertEqual(response.status_code, 200) self.assertTrue(b'This field is required.' in response.data) self.assertTrue(b'help-block form-error' in response.data) response = response = self.client.post( url, follow_redirects = True, environ_base = environ_base, data = { i[0]: i[1] for i in data } ) self.assertEqual(response.status_code, 200) self.assertTrue( b'Please use different login, the &#34;user&#34; is already taken.' in response.data or \ b'Please use different login, the "user" is already taken.' in response.data )
[docs] def assertRegister(self, url, data, emails, environ_base=None): # pylint: disable=locally-disabled,invalid-name if environ_base is None: environ_base = {} uname = 'test' self.mailbox_monitoring('on') response = response = self.client.get( url, follow_redirects = True, environ_base = environ_base ) self.assertEqual(response.status_code, 200) self.assertTrue(b'User account registration' in response.data) for idx, param in enumerate(data): if idx == len(data) - 1: break response = response = self.client.post( url, follow_redirects = True, environ_base = environ_base, data = { i[0]: i[1] for i in data[0:idx+1] } ) self.assertEqual(response.status_code, 200) self.assertTrue(b'This field is required.' in response.data) self.assertTrue(b'help-block form-error' in response.data) response = response = self.client.post( url, follow_redirects = True, environ_base = environ_base, data = { i[0]: i[1] for i in data } ) self.assertEqual(response.status_code, 200) self.assertTrue(b'User account <strong>test (Test User)</strong> was successfully registered.' in response.data) with self.app.app_context(): uobj = self.user_get(uname) self.assertTrue(uobj) self.assertMailbox( { 'subject': [ '[{}] Account registration - {}'.format(self.app.config['APPLICATION_NAME'], uname), '[{}] Account registration - {}'.format(self.app.config['APPLICATION_NAME'], uname) ], 'sender': [ 'root@unittest', 'root@unittest' ], 'recipients': [ ['admin@unittest'], ['test.user@domain.org'] ], 'cc': [[],[]], 'bcc': [[], ['admin@unittest']], 'body': emails['txt'], 'html': emails['html'] } ) self.mailbox_monitoring('off') with self.app.app_context(): user = self.user_get(uname) user_dict = user.to_dict() del user_dict['createtime'] del user_dict['password'] self.assertEqual( user_dict, self.user_fixture ) response = self.login_dev(uname) self.assertEqual(response.status_code, 403) #self.assertTrue(b'is currently disabled, you are not permitted to log in.' in response.data) with self.app.app_context(): user = self.user_get(uname) user.set_state_enabled() self.user_save(user) with self.app.app_context(): user = self.user_get(uname) user_dict = user.to_dict() del user_dict['createtime'] del user_dict['password'] response = self.login_dev(uname) self.assertEqual(response.status_code, 200) self.assertTrue(b'You have been successfully logged in as' in response.data) response = self.logout() self.assertEqual(response.status_code, 200) self.assertTrue(b'You have been successfully logged out' in response.data)