Source code for hawat.blueprints.networks

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
# -------------------------------------------------------------------------------


"""
This pluggable module provides access to network record management features. These
features include:

* general network record listing
* detailed network record view
* creating new network records
* updating existing network records
* deleting existing network records
"""

__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"

import markupsafe
import flask
import flask_login
import flask_principal
from flask_babel import gettext, lazy_gettext

from sqlalchemy import or_
from sqlalchemy import text

from mentat.datatype.sqldb import NetworkModel, GroupModel, ItemChangeLogModel
from ipranges import from_str, ip_from_str, IP4, IP4Range, IP4Net, IP6

import hawat.acl
import hawat.menu
from hawat.base import HawatBlueprint
from hawat.view import ItemListView, ItemShowView, ItemCreateView, ItemCreateForView, ItemUpdateView, ItemDeleteView, SimpleView
from hawat.view.mixin import HTMLMixin, SQLAlchemyMixin, AJAXMixin
from hawat.blueprints.networks.forms import BaseNetworkForm, MaintainerNetworkForm, AdminNetworkForm, NetworkSearchForm

# Shared identifier of this blueprint: within this module it is interpolated
# into the view icon name, the main menu entry identifier and every URL rule
# registered by get_blueprint().
BLUEPRINT_NAME = 'networks'
"""Name of the blueprint as module global constant."""


class ListView(HTMLMixin, SQLAlchemyMixin, ItemListView):
    """
    Listing of all network records with searching, filtering and sorting.
    """

    methods = ['GET']

    authentication = True

    authorization = [hawat.acl.PERMISSION_POWER]

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the title displayed for this view."""
        return lazy_gettext('Network management')

    @property
    def dbmodel(self):
        """Database model class this view operates on."""
        return NetworkModel

    @classmethod
    def get_action_menu(cls):
        """Build the view-level action menu (a single 'create' entry)."""
        menu = hawat.menu.Menu()
        menu.add_entry('endpoint', 'create', endpoint='networks.create', resptitle=True)
        return menu

    @classmethod
    def get_context_action_menu(cls):
        """Build the per-item action menu with show, update and delete entries."""
        entries = (
            ('show', 'networks.show'),
            ('update', 'networks.update'),
            ('delete', 'networks.delete'),
        )
        menu = hawat.menu.Menu()
        for ident, endpoint in entries:
            menu.add_entry('endpoint', ident, endpoint=endpoint, hidetitle=True)
        return menu

    @staticmethod
    def get_search_form(request_args):
        """
        Build the form used for searching network records.

        The form is a ``NetworkSearchForm`` populated from ``request_args``
        and created with CSRF protection switched off.
        """
        return NetworkSearchForm(request_args, meta={'csrf': False})

    @staticmethod
    def build_query(query, model, form_args):
        """
        Narrow down and order ``query`` according to the search form values.

        :param query: Query object to be adjusted.
        :param model: Model class whose columns are used in the filters.
        :param form_args: Mapping with the submitted search form values.
        :return: Adjusted query object.
        """

        def chosen(key):
            # Value stored under 'key', or None when the key is not present.
            return form_args[key] if key in form_args else None

        needle = chosen('search')
        if needle:
            try:
                # First assume the user typed an address, network or range.
                net = from_str(needle)
                # 'a >>= b' means: a contains b or is equal to b, so this looks
                # for networks containing the user input. The input is turned
                # into an explicit low-high range to cope with values such as
                # 195.113.144.1/24.
                query = query.filter(
                    text("network >>= '{}-{}'".format(ip_from_str(net.low()), ip_from_str(net.high())))
                )
            except ValueError:
                # Not an IP expression: match it against name and description.
                pattern = '%{}%'.format(needle)
                query = query.filter(
                    or_(
                        model.netname.ilike(pattern),
                        model.description.ilike(pattern)
                    )
                )

        # Lower boundary of the creation time.
        dt_from = chosen('dt_from')
        if dt_from:
            query = query.filter(model.createtime >= dt_from)

        # Upper boundary of the creation time.
        dt_to = chosen('dt_to')
        if dt_to:
            query = query.filter(model.createtime <= dt_to)

        # Origin of the record.
        source = chosen('source')
        if source:
            query = query.filter(model.source == source)

        # Owning group.
        group = chosen('group')
        if group:
            query = query.filter(model.group_id == group.id)

        sortby = chosen('sortby')
        if sortby:
            # Sort key -> (column attribute name, direction method name).
            orderings = {
                'createtime.desc': ('createtime', 'desc'),
                'createtime.asc': ('createtime', 'asc'),
                'netname.desc': ('netname', 'desc'),
                'netname.asc': ('netname', 'asc'),
                'network.desc': ('network', 'desc'),
                'network.asc': ('network', 'asc'),
                'rank.desc': ('rank', 'desc'),
                'rank.asc': ('rank', 'asc'),
            }
            column_name, direction = orderings[sortby]
            column = getattr(model, column_name)
            query = query.order_by(getattr(column, direction)())

        return query
class ShowView(HTMLMixin, SQLAlchemyMixin, ItemShowView):
    """
    Page displaying the details of a single network record.
    """

    methods = ['GET']

    authentication = True

    @classmethod
    def get_menu_legend(cls, **kwargs):
        """Return the menu legend mentioning the (escaped) network name."""
        netname = markupsafe.escape(kwargs['item'].netname)
        return lazy_gettext(
            'View details of network record &quot;%(item)s&quot;',
            item=netname
        )

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the title displayed for this view."""
        return lazy_gettext('Show network record details')

    @property
    def dbmodel(self):
        """Database model class this view operates on."""
        return NetworkModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Allow access to power users and to members or managers of the group
        owning the network record passed as ``item``.
        """
        group = kwargs['item'].group
        membership_or_management = flask_principal.Permission(
            hawat.acl.MembershipNeed(group.id),
            hawat.acl.ManagementNeed(group.id)
        )
        return hawat.acl.PERMISSION_POWER.can() or membership_or_management.can()

    @classmethod
    def get_action_menu(cls):
        """Build the action menu with update and delete entries."""
        menu = hawat.menu.Menu()
        for ident, endpoint in (('update', 'networks.update'), ('delete', 'networks.delete')):
            menu.add_entry('endpoint', ident, endpoint=endpoint)
        return menu

    def do_before_response(self, **kwargs):
        """
        Extend the response context with the changelog of the displayed item.

        Nothing is added unless the current user may access the
        'networks.update' endpoint for the item and the 'changelogs.search'
        endpoint is available.
        """
        record = self.response_context['item']
        may_update = self.can_access_endpoint('networks.update', item=record)
        if not (may_update and self.has_endpoint('changelogs.search')):
            return

        changelog_view = self.get_endpoint_class('changelogs.search')
        self.response_context.update(
            context_action_menu_changelogs=changelog_view.get_context_action_menu()
        )

        # Newest changelog entries of this particular record, capped at 100.
        history = (
            self.dbsession.query(ItemChangeLogModel)
            .filter(ItemChangeLogModel.model == record.__class__.__name__)
            .filter(ItemChangeLogModel.model_id == record.id)
            .order_by(ItemChangeLogModel.createtime.desc())
            .limit(100)
            .all()
        )
        self.response_context.update(item_changelog=history)
class CreateView(HTMLMixin, SQLAlchemyMixin, ItemCreateView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    Form view for adding a new network record.
    """

    methods = ['GET', 'POST']

    authentication = True

    @classmethod
    def get_menu_title(cls, **kwargs):
        """Return the title used for this view in menus."""
        return lazy_gettext('Create network record')

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the title displayed for this view."""
        return lazy_gettext('Create new network record')

    @property
    def dbmodel(self):
        """Database model class this view operates on."""
        return NetworkModel

    @property
    def dbchlogmodel(self):
        """Database model class used for item changelog entries."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """Allow the action to holders of the power permission only."""
        return hawat.acl.PERMISSION_POWER.can()

    @staticmethod
    def get_message_success(**kwargs):
        """Return the flash message for a successfully created record."""
        record = kwargs['item']
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully created.',
            item_id=markupsafe.escape(str(record)),
            parent_id=markupsafe.escape(str(record.group))
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """Return the flash message for a failed creation attempt."""
        group_label = markupsafe.escape(str(kwargs['item'].group))
        return gettext(
            'Unable to create new network record for group <strong>%(parent_id)s</strong>.',
            parent_id=group_label
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """Return the flash message for a cancelled creation."""
        return gettext('Canceled creating new network record.')

    @staticmethod
    def get_item_form(item):
        """Return an empty ``AdminNetworkForm``; ``item`` is not used."""
        return AdminNetworkForm()
class CreateForView(HTMLMixin, SQLAlchemyMixin, ItemCreateForView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    Form view for adding a new network record to a particular parent group.
    """

    methods = ['GET', 'POST']

    authentication = True

    module_name_par = 'groups'

    @classmethod
    def get_view_icon(cls):
        """Return the icon name derived from the blueprint name."""
        return 'module-{}'.format(BLUEPRINT_NAME)

    @classmethod
    def get_menu_title(cls, **kwargs):
        """Return the title used for this view in menus."""
        return lazy_gettext('Create network record')

    @classmethod
    def get_menu_legend(cls, **kwargs):
        """Return the menu legend mentioning the (escaped) parent group."""
        group_label = markupsafe.escape(str(kwargs['item']))
        return lazy_gettext(
            'Create network record for group &quot;%(item)s&quot;',
            item=group_label
        )

    @classmethod
    def get_view_url(cls, **kwargs):
        """Return the URL of this view for the group passed as ``item``."""
        endpoint = cls.get_view_endpoint()
        return flask.url_for(endpoint, parent_id=kwargs['item'].id)

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the title displayed for this view."""
        return lazy_gettext('Create new network record for group')

    @property
    def dbmodel(self):
        """Database model class of the created item."""
        return NetworkModel

    @property
    def dbmodel_par(self):
        """Database model class of the parent item."""
        return GroupModel

    @property
    def dbchlogmodel(self):
        """Database model class used for item changelog entries."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Allow the action to power users and to managers of the group passed
        as ``item``.
        """
        management = flask_principal.Permission(
            hawat.acl.ManagementNeed(kwargs['item'].id)
        )
        return hawat.acl.PERMISSION_POWER.can() or management.can()

    @staticmethod
    def get_message_success(**kwargs):
        """Return the flash message for a successfully created record."""
        record_label = markupsafe.escape(str(kwargs['item']))
        group_label = markupsafe.escape(str(kwargs['parent']))
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully created.',
            item_id=record_label,
            parent_id=group_label
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """Return the flash message for a failed creation attempt."""
        group_label = markupsafe.escape(str(kwargs['parent']))
        return gettext(
            'Unable to create new network record for group <strong>%(parent_id)s</strong>.',
            parent_id=group_label
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """Return the flash message for a cancelled creation."""
        return gettext('Canceled creating new network record.')

    @staticmethod
    def get_item_form(item):
        """
        Pick the form class according to the roles of the current user:
        admins and maintainers get ``MaintainerNetworkForm``, everybody else
        ``BaseNetworkForm``. The ``item`` argument is not used.
        """
        user = flask_login.current_user
        # Both roles are queried up front, regardless of the first answer.
        role_flags = [user.has_role(role) for role in ('admin', 'maintainer')]
        form_class = MaintainerNetworkForm if any(role_flags) else BaseNetworkForm
        return form_class()

    @staticmethod
    def add_parent_to_item(item, parent):
        """Attach the freshly created ``item`` to its ``parent`` group."""
        item.group = parent
class UpdateView(HTMLMixin, SQLAlchemyMixin, ItemUpdateView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    Form view for editing an existing network record.
    """

    methods = ['GET', 'POST']

    authentication = True

    @classmethod
    def get_menu_legend(cls, **kwargs):
        """Return the menu legend mentioning the (escaped) network name."""
        netname = markupsafe.escape(kwargs['item'].netname)
        return lazy_gettext(
            'Update details of network record &quot;%(item)s&quot;',
            item=netname
        )

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the title displayed for this view."""
        return lazy_gettext('Update network record details')

    @property
    def dbmodel(self):
        """Database model class this view operates on."""
        return NetworkModel

    @property
    def dbchlogmodel(self):
        """Database model class used for item changelog entries."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Allow the action to power users and to managers of the group owning
        the network record passed as ``item``.
        """
        management = flask_principal.Permission(
            hawat.acl.ManagementNeed(kwargs['item'].group.id)
        )
        return hawat.acl.PERMISSION_POWER.can() or management.can()

    @staticmethod
    def get_message_success(**kwargs):
        """Return the flash message for a successful update."""
        record = kwargs['item']
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully updated.',
            item_id=markupsafe.escape(str(record)),
            parent_id=markupsafe.escape(str(record.group))
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """Return the flash message for a failed update."""
        record = kwargs['item']
        return gettext(
            'Unable to update network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=markupsafe.escape(str(record)),
            parent_id=markupsafe.escape(str(record.group))
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """Return the flash message for a cancelled update."""
        record = kwargs['item']
        return gettext(
            'Canceled updating network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=markupsafe.escape(str(record)),
            parent_id=markupsafe.escape(str(record.group))
        )

    @staticmethod
    def get_item_form(item):
        """
        Return the edit form pre-filled from ``item``; the form class depends
        on the role of the current user (admin, maintainer, or anybody else).
        """
        # Checked in order, the first matching role decides the form class.
        forms_by_role = (
            ('admin', AdminNetworkForm),
            ('maintainer', MaintainerNetworkForm),
        )
        for role, form_class in forms_by_role:
            if flask_login.current_user.has_role(role):
                return form_class(obj=item)
        return BaseNetworkForm(obj=item)
class DeleteView(HTMLMixin, SQLAlchemyMixin, ItemDeleteView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    Confirmation view for removing an existing network record.
    """

    methods = ['GET', 'POST']

    authentication = True

    @classmethod
    def get_menu_legend(cls, **kwargs):
        """Return the menu legend mentioning the (escaped) network name."""
        netname = markupsafe.escape(kwargs['item'].netname)
        return lazy_gettext(
            'Delete network record &quot;%(item)s&quot;',
            item=netname
        )

    @property
    def dbmodel(self):
        """Database model class this view operates on."""
        return NetworkModel

    @property
    def dbchlogmodel(self):
        """Database model class used for item changelog entries."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Allow the action to power users and to managers of the group owning
        the network record passed as ``item``.
        """
        management = flask_principal.Permission(
            hawat.acl.ManagementNeed(kwargs['item'].group.id)
        )
        return hawat.acl.PERMISSION_POWER.can() or management.can()

    @staticmethod
    def get_message_success(**kwargs):
        """Return the flash message for a successful deletion."""
        record = kwargs['item']
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully and permanently deleted.',
            item_id=markupsafe.escape(str(record)),
            parent_id=markupsafe.escape(str(record.group))
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """Return the flash message for a failed deletion."""
        record = kwargs['item']
        return gettext(
            'Unable to permanently delete network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=markupsafe.escape(str(record)),
            parent_id=markupsafe.escape(str(record.group))
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """Return the flash message for a cancelled deletion."""
        record = kwargs['item']
        return gettext(
            'Canceled deleting network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=markupsafe.escape(str(record)),
            parent_id=markupsafe.escape(str(record.group))
        )
# -------------------------------------------------------------------------------
class NetworksBlueprint(HawatBlueprint):
    """Pluggable module - network management (*networks*)."""

    @classmethod
    def get_module_title(cls):
        """Return the human readable title of this module."""
        return lazy_gettext('Network record management')

    def register_app(self, app):
        """
        Hook the network listing view into the main menu of ``app``.

        :param app: Application object providing the ``menu_main`` menu.
        """
        entry_id = 'admin.{}'.format(BLUEPRINT_NAME)
        app.menu_main.add_entry('view', entry_id, position=70, view=ListView)
class APINetworksView(AJAXMixin, SQLAlchemyMixin, SimpleView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    API view exporting the list of all network records as a JSON document.
    """

    methods = ['GET']

    authentication = True

    authorization = [hawat.acl.PERMISSION_POWER]

    @classmethod
    def get_view_name(cls):
        """Return the name of this view."""
        return 'apinetworks'

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the title displayed for this view."""
        return lazy_gettext('Networks')

    @property
    def dbmodel(self):
        """Database model class this view operates on."""
        return NetworkModel

    def _get_networks(self):
        """Run the search with empty form arguments and return its result."""
        return self.search(form_args={})

    def _get_resolved_abuses(self, network):
        """
        Collect abuse contacts from the settings of the group owning ``network``.

        For a base network the result has a single ``fallback`` key holding the
        first non-empty e-mail list (taken in the order low, medium, high,
        critical) or an empty list. Otherwise the result maps every severity
        with a non-empty e-mail list to that list.
        """
        settings = network.group.settings_rep
        is_base = network.is_base
        by_severity = (
            ("low", settings.emails_low),
            ("medium", settings.emails_medium),
            ("high", settings.emails_high),
            ("critical", settings.emails_critical),
        )
        if is_base:
            return {"fallback": next((emails for _, emails in by_severity if emails), [])}
        return {severity: emails for severity, emails in by_severity if emails}

    def _get_ip_addresses(self, ip):
        """
        Describe the network string ``ip`` as a dictionary of address fields.

        IPv4 input yields ``ip4_start`` and ``ip4_end``. Any other input
        containing a slash yields ``ip6_addr`` and ``ip6_prefix``, and the
        remaining input yields ``ip6_start`` and ``ip6_end``.
        """
        parsed = from_str(ip)
        if isinstance(parsed, (IP4, IP4Range, IP4Net)):
            return {
                "ip4_start": IP4(parsed.low()),
                "ip4_end": IP4(parsed.high()),
            }
        if '/' in ip:
            address, prefix = ip.split('/')[:2]
            return {
                "ip6_addr": address,
                "ip6_prefix": int(prefix),
            }
        return {
            "ip6_start": IP6(parsed.low()),
            "ip6_end": IP6(parsed.high()),
        }

    def do_before_response(self, **kwargs):
        """Store the exported list of networks under ``data`` in the response context."""

        def export(network):
            # Address fields are resolved first, abuse contacts afterwards.
            address_fields = self._get_ip_addresses(network.network)
            return {
                "rank": network.rank,
                "source": network.source,
                "netname": network.netname,
                "descr": network.description,
                "resolved_abuses": self._get_resolved_abuses(network),
                "client_id": network.local_id,
                **address_fields
            }

        exported = [export(network) for network in self._get_networks()]
        self.response_context.update({"data": exported})
# -------------------------------------------------------------------------------
def get_blueprint():
    """
    Factory function required by :py:mod:`hawat.Hawat`.

    Creates the :py:class:`NetworksBlueprint` instance, registers all view
    classes of this module under their URL rules and returns the blueprint.
    """
    # View class -> URL rule template; the placeholder takes the blueprint name.
    routing = (
        (ListView, '/{}/list'),
        (CreateView, '/{}/create'),
        (CreateForView, '/{}/createfor/<int:parent_id>'),
        (ShowView, '/{}/<int:item_id>/show'),
        (UpdateView, '/{}/<int:item_id>/update'),
        (DeleteView, '/{}/<int:item_id>/delete'),
        (APINetworksView, '/api/{}/get'),
    )

    blueprint = NetworksBlueprint(BLUEPRINT_NAME, __name__, template_folder='templates')
    for view_class, rule in routing:
        blueprint.register_view_class(view_class, rule.format(BLUEPRINT_NAME))
    return blueprint