#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
# -------------------------------------------------------------------------------
"""
This pluggable module provides access to network record management features. These
features include:
* general network record listing
* detailed network record view
* creating new network records
* updating existing network records
* deleting existing network records
"""
__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"
import markupsafe
import flask
import flask_login
import flask_principal
from flask_babel import gettext, lazy_gettext
from sqlalchemy import or_
from sqlalchemy import text
from mentat.datatype.sqldb import NetworkModel, GroupModel, ItemChangeLogModel
from ipranges import from_str, ip_from_str
import hawat.acl
import hawat.menu
from hawat.base import HawatBlueprint
from hawat.view import ItemListView, ItemShowView, ItemCreateView, ItemCreateForView, ItemUpdateView, ItemDeleteView
from hawat.view.mixin import HTMLMixin, SQLAlchemyMixin
from hawat.blueprints.networks.forms import BaseNetworkForm, AdminNetworkForm, NetworkSearchForm
# Used below as the blueprint name, in the URL prefix, in the main menu entry
# identifier and in the view icon name.
BLUEPRINT_NAME = 'networks'
"""Name of the blueprint as module global constant."""
class ListView(HTMLMixin, SQLAlchemyMixin, ItemListView):
    """
    Listing of network records with optional searching, filtering and sorting.
    """

    # View configuration read by the hawat base view classes.
    methods = ['GET']
    authentication = True
    authorization = [hawat.acl.PERMISSION_POWER]

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Network management')

    @property
    def dbmodel(self):
        """Model class of the listed records (:py:class:`NetworkModel`)."""
        return NetworkModel

    @staticmethod
    def build_query(query, model, form_args):
        """
        Narrow down and order the given query according to the search form.

        :param query: Query object to be extended; every applied criterion
            produces a new query via ``filter()`` / ``order_by()``.
        :param model: Model class providing the filtered and sorted columns.
        :param form_args: Mapping of search form values. Recognized keys are
            ``search``, ``dt_from``, ``dt_to``, ``source``, ``group`` and
            ``sortby``; missing or falsy values are skipped.
        :return: The resulting query.
        :raises KeyError: When ``sortby`` holds a value outside of the
            supported ``<column>.<asc|desc>`` combinations.
        """

        def chosen(key):
            """Return the value stored under ``key``, or ``None`` when the key is absent."""
            return form_args[key] if key in form_args else None

        # Free text search: first try to interpret the input as an
        # address/net/range, then fall back to a substring match.
        needle = chosen('search')
        if needle:
            try:
                # Convert user input to an ipranges object.
                net = from_str(needle)
                # The input is normalized to an explicit low-high range to
                # solve issues with addresses such as 195.113.144.1/24.
                # Only values rendered from the parsed ipranges object are
                # interpolated into the SQL snippet, never the raw input.
                low = ip_from_str(net.low())
                high = ip_from_str(net.high())
                # 'a >>= b' means: a contains b or is equal to b, so this
                # selects networks containing the requested range.
                query = query.filter(text("network >>= '{}-{}'".format(low, high)))
            except ValueError:
                # Not an address: search the network name and description
                # case-insensitively instead.
                pattern = '%{}%'.format(needle)
                query = query.filter(
                    or_(
                        model.netname.ilike(pattern),
                        model.description.ilike(pattern)
                    )
                )

        # Lower boundary of the record creation time.
        dt_from = chosen('dt_from')
        if dt_from:
            query = query.filter(model.createtime >= dt_from)

        # Upper boundary of the record creation time.
        dt_to = chosen('dt_to')
        if dt_to:
            query = query.filter(model.createtime <= dt_to)

        # Origin (source) of the record.
        source = chosen('source')
        if source:
            query = query.filter(model.source == source)

        # Group owning the record.
        group = chosen('group')
        if group:
            query = query.filter(model.group_id == group.id)

        # Result ordering: look up the (column, direction) pair for the key.
        sortby = chosen('sortby')
        if sortby:
            orderings = {
                'createtime.desc': ('createtime', 'desc'),
                'createtime.asc': ('createtime', 'asc'),
                'netname.desc': ('netname', 'desc'),
                'netname.asc': ('netname', 'asc'),
                'network.desc': ('network', 'desc'),
                'network.asc': ('network', 'asc'),
                'rank.desc': ('rank', 'desc'),
                'rank.asc': ('rank', 'asc')
            }
            column_name, direction = orderings[sortby]
            column = getattr(model, column_name)
            query = query.order_by(getattr(column, direction)())

        return query
class ShowView(HTMLMixin, SQLAlchemyMixin, ItemShowView):
    """
    Detail page of a single network record.
    """

    # View configuration read by the hawat base view classes.
    methods = ['GET']
    authentication = True

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Show network record details')

    @property
    def dbmodel(self):
        """Model class of the displayed record (:py:class:`NetworkModel`)."""
        return NetworkModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Decide whether the current user may display the given record.

        Access is granted when the power permission is satisfied, or when the
        permission built from the membership and management needs of the
        group owning ``kwargs['item']`` is satisfied.
        """
        group_id = kwargs['item'].group.id
        group_permission = flask_principal.Permission(
            hawat.acl.MembershipNeed(group_id),
            hawat.acl.ManagementNeed(group_id)
        )
        return hawat.acl.PERMISSION_POWER.can() or group_permission.can()

    def do_before_response(self, **kwargs):
        """
        Add changelog related data to the response context.

        Nothing is added unless the current user can access the
        ``networks.update`` endpoint for the item and the ``changelogs.search``
        endpoint is available. Otherwise the context receives the changelog
        action menu and up to 100 most recent changelog entries of the item.
        """
        item = self.response_context['item']

        changelog_allowed = (
            self.can_access_endpoint('networks.update', item=item)
            and self.has_endpoint('changelogs.search')
        )
        if not changelog_allowed:
            return

        changelog_view = self.get_endpoint_class('changelogs.search')
        self.response_context.update(
            context_action_menu_changelogs=changelog_view.get_context_action_menu()
        )

        # Newest changes first, capped at 100 entries.
        changelog_query = self.dbsession.query(ItemChangeLogModel)
        changelog_query = changelog_query.filter(ItemChangeLogModel.model == item.__class__.__name__)
        changelog_query = changelog_query.filter(ItemChangeLogModel.model_id == item.id)
        changelog_query = changelog_query.order_by(ItemChangeLogModel.createtime.desc())
        item_changelog = changelog_query.limit(100).all()
        self.response_context.update(item_changelog=item_changelog)
class CreateView(HTMLMixin, SQLAlchemyMixin, ItemCreateView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    Form view for adding a new network record.
    """

    # View configuration read by the hawat base view classes.
    methods = ['GET', 'POST']
    authentication = True

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Create new network record')

    @property
    def dbmodel(self):
        """Model class of the created record (:py:class:`NetworkModel`)."""
        return NetworkModel

    @property
    def dbchlogmodel(self):
        """Changelog model class (:py:class:`ItemChangeLogModel`)."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """Allow the action only when the power permission is satisfied."""
        return hawat.acl.PERMISSION_POWER.can()

    @staticmethod
    def get_message_success(**kwargs):
        """
        Return the translated success message.

        Expects the created record in ``kwargs['item']``; the record and its
        group are HTML-escaped before being placed into the message.
        """
        record = kwargs['item']
        escaped_record = markupsafe.escape(str(record))
        escaped_group = markupsafe.escape(str(record.group))
        # The message literal stays inline in the gettext() call.
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully created.',
            item_id=escaped_record,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """
        Return the translated failure message.

        Expects the record in ``kwargs['item']``; its group is HTML-escaped
        before being placed into the message.
        """
        escaped_group = markupsafe.escape(str(kwargs['item'].group))
        return gettext(
            'Unable to create new network record for group <strong>%(parent_id)s</strong>.',
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """Return the translated message for a canceled creation."""
        return gettext('Canceled creating new network record.')
class CreateForView(HTMLMixin, SQLAlchemyMixin,
                    ItemCreateForView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    Form view for adding a new network record to a given parent group.
    """

    # View configuration read by the hawat base view classes.
    methods = ['GET', 'POST']
    authentication = True
    module_name_par = 'groups'

    @classmethod
    def get_view_icon(cls):
        """Return the icon name, derived from the blueprint name."""
        return 'module-{}'.format(BLUEPRINT_NAME)

    @classmethod
    def get_view_url(cls, **kwargs):
        """
        Return the URL of this view for the object in ``kwargs['item']``.

        The ``id`` of that object is passed as the ``parent_id`` URL argument.
        """
        endpoint = cls.get_view_endpoint()
        parent_id = kwargs['item'].id
        return flask.url_for(endpoint, parent_id=parent_id)

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Create new network record for group')

    @property
    def dbmodel(self):
        """Model class of the created record (:py:class:`NetworkModel`)."""
        return NetworkModel

    @property
    def dbmodel_par(self):
        """Model class of the parent object (:py:class:`GroupModel`)."""
        return GroupModel

    @property
    def dbchlogmodel(self):
        """Changelog model class (:py:class:`ItemChangeLogModel`)."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Decide whether the current user may create a record here.

        Access is granted when the power permission is satisfied, or when the
        management need for ``kwargs['item'].id`` is satisfied.
        NOTE(review): ``kwargs['item']`` is presumably the parent group here
        (compare ``get_view_url``) - confirm against the base view.
        """
        management_permission = flask_principal.Permission(
            hawat.acl.ManagementNeed(kwargs['item'].id)
        )
        return hawat.acl.PERMISSION_POWER.can() or management_permission.can()

    @staticmethod
    def get_message_success(**kwargs):
        """
        Return the translated success message.

        Expects the created record in ``kwargs['item']`` and the parent in
        ``kwargs['parent']``; both are HTML-escaped before being placed into
        the message.
        """
        escaped_record = markupsafe.escape(str(kwargs['item']))
        escaped_parent = markupsafe.escape(str(kwargs['parent']))
        # The message literal stays inline in the gettext() call.
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully created.',
            item_id=escaped_record,
            parent_id=escaped_parent
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """
        Return the translated failure message.

        Expects the parent in ``kwargs['parent']``; it is HTML-escaped before
        being placed into the message.
        """
        escaped_parent = markupsafe.escape(str(kwargs['parent']))
        return gettext(
            'Unable to create new network record for group <strong>%(parent_id)s</strong>.',
            parent_id=escaped_parent
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """Return the translated message for a canceled creation."""
        return gettext('Canceled creating new network record.')

    @staticmethod
    def add_parent_to_item(item, parent):
        """Attach the record to its parent by assigning ``item.group``."""
        item.group = parent
class UpdateView(HTMLMixin, SQLAlchemyMixin, ItemUpdateView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    Form view for editing an existing network record.
    """

    # View configuration read by the hawat base view classes.
    methods = ['GET', 'POST']
    authentication = True

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Update network record details')

    @property
    def dbmodel(self):
        """Model class of the updated record (:py:class:`NetworkModel`)."""
        return NetworkModel

    @property
    def dbchlogmodel(self):
        """Changelog model class (:py:class:`ItemChangeLogModel`)."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Decide whether the current user may update the given record.

        Access is granted when the power permission is satisfied, or when the
        management need of the group owning ``kwargs['item']`` is satisfied.
        """
        management_permission = flask_principal.Permission(
            hawat.acl.ManagementNeed(kwargs['item'].group.id)
        )
        return hawat.acl.PERMISSION_POWER.can() or management_permission.can()

    @staticmethod
    def get_message_success(**kwargs):
        """
        Return the translated success message.

        Expects the record in ``kwargs['item']``; the record and its group are
        HTML-escaped before being placed into the message.
        """
        record = kwargs['item']
        escaped_record = markupsafe.escape(str(record))
        escaped_group = markupsafe.escape(str(record.group))
        # The message literal stays inline in the gettext() call.
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully updated.',
            item_id=escaped_record,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """
        Return the translated failure message.

        Expects the record in ``kwargs['item']``; the record and its group are
        HTML-escaped before being placed into the message.
        """
        record = kwargs['item']
        escaped_record = markupsafe.escape(str(record))
        escaped_group = markupsafe.escape(str(record.group))
        return gettext(
            'Unable to update network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=escaped_record,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """
        Return the translated message for a canceled update.

        Expects the record in ``kwargs['item']``; the record and its group are
        HTML-escaped before being placed into the message.
        """
        record = kwargs['item']
        escaped_record = markupsafe.escape(str(record))
        escaped_group = markupsafe.escape(str(record.group))
        return gettext(
            'Canceled updating network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=escaped_record,
            parent_id=escaped_group
        )
class DeleteView(HTMLMixin, SQLAlchemyMixin, ItemDeleteView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    Confirmation view for removing an existing network record.
    """

    # View configuration read by the hawat base view classes.
    methods = ['GET', 'POST']
    authentication = True

    @property
    def dbmodel(self):
        """Model class of the deleted record (:py:class:`NetworkModel`)."""
        return NetworkModel

    @property
    def dbchlogmodel(self):
        """Changelog model class (:py:class:`ItemChangeLogModel`)."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Decide whether the current user may delete the given record.

        Access is granted when the power permission is satisfied, or when the
        management need of the group owning ``kwargs['item']`` is satisfied.
        """
        management_permission = flask_principal.Permission(
            hawat.acl.ManagementNeed(kwargs['item'].group.id)
        )
        return hawat.acl.PERMISSION_POWER.can() or management_permission.can()

    @staticmethod
    def get_message_success(**kwargs):
        """
        Return the translated success message.

        Expects the record in ``kwargs['item']``; the record and its group are
        HTML-escaped before being placed into the message.
        """
        record = kwargs['item']
        escaped_record = markupsafe.escape(str(record))
        escaped_group = markupsafe.escape(str(record.group))
        # The message literal stays inline in the gettext() call.
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully and permanently deleted.',
            item_id=escaped_record,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """
        Return the translated failure message.

        Expects the record in ``kwargs['item']``; the record and its group are
        HTML-escaped before being placed into the message.
        """
        record = kwargs['item']
        escaped_record = markupsafe.escape(str(record))
        escaped_group = markupsafe.escape(str(record.group))
        return gettext(
            'Unable to permanently delete network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=escaped_record,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """
        Return the translated message for a canceled deletion.

        Expects the record in ``kwargs['item']``; the record and its group are
        HTML-escaped before being placed into the message.
        """
        record = kwargs['item']
        escaped_record = markupsafe.escape(str(record))
        escaped_group = markupsafe.escape(str(record.group))
        return gettext(
            'Canceled deleting network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=escaped_record,
            parent_id=escaped_group
        )
# -------------------------------------------------------------------------------
class NetworksBlueprint(HawatBlueprint):
    """Pluggable module - network management (*networks*)."""

    @classmethod
    def get_module_title(cls):
        """Return the lazily translated title of this module."""
        return lazy_gettext('Network record management')

    def register_app(self, app):
        """
        Hook the module into the given application.

        Adds a ``view`` entry pointing to :py:class:`ListView` into the main
        menu of ``app`` at position 70.

        :param app: Application object exposing the ``menu_main`` menu.
        """
        entry_id = 'admin.{}'.format(BLUEPRINT_NAME)
        app.menu_main.add_entry('view', entry_id, position=70, view=ListView)
# -------------------------------------------------------------------------------
def get_blueprint():
    """
    Mandatory interface for :py:mod:`hawat.Hawat` and factory function.

    Creates the :py:class:`NetworksBlueprint` instance (a subclass of
    :py:class:`hawat.base.HawatBlueprint`), mounts it under the
    ``/networks`` URL prefix and registers all view classes of this module.

    :return: Fully configured blueprint instance.
    """
    blueprint = NetworksBlueprint(
        BLUEPRINT_NAME,
        __name__,
        template_folder='templates',
        url_prefix='/{}'.format(BLUEPRINT_NAME)
    )

    # View classes with their URL rules, registered in this exact order.
    routes = (
        (ListView, '/list'),
        (CreateView, '/create'),
        (CreateForView, '/createfor/<int:parent_id>'),
        (ShowView, '/<int:item_id>/show'),
        (UpdateView, '/<int:item_id>/update'),
        (DeleteView, '/<int:item_id>/delete'),
    )
    for view_class, url_rule in routes:
        blueprint.register_view_class(view_class, url_rule)

    return blueprint