Source code for hawat.command

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
# -------------------------------------------------------------------------------


"""
This module contains custom commands for ``hawat-cli`` command line interface.
"""

__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"

import re
import sys
import traceback
import functools
import sqlalchemy

import click
import flask
from flask.cli import AppGroup

import hawat.const
import hawat.db


[docs]def account_exists(func): """ Decorator: Catch SQLAlchemy exceptions for non-existing user accounts. """ @functools.wraps(func) def wrapper_account_exists(login, *args, **kwargs): try: return func(login, *args, **kwargs) except sqlalchemy.orm.exc.NoResultFound: click.secho( "[FAIL] User account '{}' was not found.".format(login), fg='red' ) except Exception: # pylint: disable=locally-disabled,broad-except hawat.db.db_session().rollback() click.echo( traceback.TracebackException(*sys.exc_info()) ) return wrapper_account_exists
[docs]def validate_email(_ctx, _param, value): """Validate ``login/email`` command line parameter.""" if value: if hawat.const.CRE_EMAIL.match(value): return value raise click.BadParameter( "Value '{}' does not look like valid email address.".format(value) )
[docs]def validate_roles(_ctx, _param, value): """Validate ``role`` command line parameter.""" if value: for val in value: if val not in hawat.const.ROLES: raise click.BadParameter( "Value '{}' does not look like valid user role.".format(val) ) return value
user_cli = AppGroup('users', help="User account management module.") @user_cli.command('create') @click.argument('login', callback=validate_email) @click.argument('fullname') @click.option('--email', callback=validate_email, help='Optional email, login will be used as default') @click.password_option() @click.option('--enabled/--no-enabled', default=False) @click.option('--role', callback=validate_roles, help='Role to be assigned to the user (multiple)', multiple=True) def users_create(login, fullname, email, password, enabled, role): """Create new user account.""" user_model = flask.current_app.get_model(hawat.const.MODEL_USER) sqlobj = user_model() sqlobj.login = login sqlobj.fullname = fullname sqlobj.email = email or login sqlobj.roles = role or [hawat.const.ROLE_USER] sqlobj.enabled = enabled if password: sqlobj.set_password(password) click.echo("Creating new user account:") click.echo(" - Login: {}".format(sqlobj.login)) click.echo(" - Full name: {}".format(sqlobj.fullname)) click.echo(" - Email: {}".format(sqlobj.email)) click.echo(" - Roles: {}".format(','.join(sqlobj.roles))) click.echo(" - Enabled: {}".format(sqlobj.enabled)) click.echo(" - Password: {}".format(sqlobj.password)) try: hawat.db.db_session().add(sqlobj) hawat.db.db_session().commit() click.secho( "[OK] User account was successfully created", fg='green' ) except sqlalchemy.exc.IntegrityError as exc: hawat.db.db_session().rollback() match = re.search(r'Key \((\w+)\)=\(([^)]+)\) already exists.', str(exc)) if match: click.secho( "[FAIL] User account with {} '{}' already exists.".format( match.group(1), match.group(2) ), fg='red' ) else: click.secho( "[FAIL] There already is an user account with similar data.", fg='red' ) click.secho( "\n{}".format(exc), fg='blue' ) except Exception: # pylint: disable=locally-disabled,broad-except hawat.db.db_session().rollback() click.echo( traceback.TracebackException(*sys.exc_info()) ) @user_cli.command('passwd') @click.argument('login', callback=validate_email) @click.password_option() @account_exists def users_passwd(login, password): """Change/set password to given user account.""" user_model = flask.current_app.get_model(hawat.const.MODEL_USER) item = hawat.db.db_session().query( user_model ).filter( user_model.login == login ).one() if password: click.echo("Setting password for user account '{}'".format(login)) item.set_password(password) hawat.db.db_session().add(item) hawat.db.db_session().commit() click.secho( "[OK] User account was successfully updated", fg='green' ) @user_cli.command('roleadd') @click.argument('login', callback=validate_email) @click.argument('role', callback=validate_roles, nargs=-1) @account_exists def users_roleadd(login, role): """Add role(s) to given user account.""" if not role: return click.echo("Adding roles '{}' to user account '{}'".format(','.join(role), login)) user_model = flask.current_app.get_model(hawat.const.MODEL_USER) item = hawat.db.db_session().query( user_model ).filter( user_model.login == login ).one() current_roles = set(item.roles) for i in role: current_roles.add(i) item.roles = list(current_roles) hawat.db.db_session().add(item) hawat.db.db_session().commit() click.secho( "[OK] User account was successfully updated", fg='green' ) @user_cli.command('roledel') @click.argument('login', callback=validate_email) @click.argument('role', callback=validate_roles, nargs=-1) @account_exists def users_roledel(login, role): """Delete role(s) to given user account.""" click.echo("Deleting roles '{}' from user account '{}'".format(','.join(role), login)) user_model = flask.current_app.get_model(hawat.const.MODEL_USER) item = hawat.db.db_session().query( user_model ).filter( user_model.login == login ).one() current_roles = set(item.roles) for i in role: try: current_roles.remove(i) except KeyError: pass item.roles = list(current_roles) hawat.db.db_session().add(item) hawat.db.db_session().commit() click.secho( "[OK] User account was successfully updated", fg='green' ) @user_cli.command('enable') @click.argument('login', callback=validate_email) @account_exists def users_enable(login): """Enable given user account.""" click.echo("Enabling user account '{}'".format(login)) user_model = flask.current_app.get_model(hawat.const.MODEL_USER) item = hawat.db.db_session().query( user_model ).filter( user_model.login == login ).one() if not item.enabled: item.enabled = True hawat.db.db_session().add(item) hawat.db.db_session().commit() click.secho( "[OK] User account was successfully enabled", fg='green' ) else: click.secho( "[OK] User account was already enabled", fg='green' ) @user_cli.command('disable') @click.argument('login', callback=validate_email) @account_exists def users_disable(login): """Disable given user account.""" click.echo("Disabling user account '{}'".format(login)) user_model = flask.current_app.get_model(hawat.const.MODEL_USER) item = hawat.db.db_session().query( user_model ).filter( user_model.login == login ).one() if item.enabled: item.enabled = False hawat.db.db_session().add(item) hawat.db.db_session().commit() click.secho( "[OK] User account was successfully disabled", fg='green' ) else: click.secho( "[OK] User account was already disabled", fg='green' ) @user_cli.command('delete') @click.argument('login', callback=validate_email) @account_exists def users_delete(login): """Delete existing user account.""" click.echo("Deleting user account '{}'".format(login)) user_model = flask.current_app.get_model(hawat.const.MODEL_USER) item = hawat.db.db_session().query( user_model ).filter( user_model.login == login ).one() hawat.db.db_session().delete(item) hawat.db.db_session().commit() click.secho( "[OK] User account was successfully deleted", fg='green' ) @user_cli.command('list') def users_list(): """List all available user accounts.""" try: user_model = flask.current_app.get_model(hawat.const.MODEL_USER) items = hawat.db.db_session().query(user_model).all() if items: click.echo("List of existing user accounts:") for item in items: click.echo(" - {}: {} ({})".format(item.login, item.fullname, ','.join(item.roles))) else: click.echo("There are currently no user accounts in the database.") except Exception: # pylint: disable=locally-disabled,broad-except hawat.db.db_session().rollback() click.echo( traceback.TracebackException(*sys.exc_info()) ) # -------------------------------------------------------------------------------
[docs]def setup_cli(app): """ Setup custom application CLI commands. """ app.cli.add_command(user_cli)