#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
# -------------------------------------------------------------------------------
"""
This pluggable module provides access to network record management features. These
features include:
* general network record listing
* detailed network record view
* creating new network records
* updating existing network records
* deleting existing network records
"""
__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"
import markupsafe
import flask
import flask_login
import flask_principal
from flask_babel import gettext, lazy_gettext
from sqlalchemy import or_
from sqlalchemy import text
from mentat.datatype.sqldb import NetworkModel, GroupModel, ItemChangeLogModel
from ipranges import from_str, ip_from_str, IP4, IP4Range, IP4Net, IP6
import hawat.acl
import hawat.menu
from hawat.base import HawatBlueprint
from hawat.view import ItemListView, ItemShowView, ItemCreateView, ItemCreateForView, ItemUpdateView, ItemDeleteView, SimpleView
from hawat.view.mixin import HTMLMixin, SQLAlchemyMixin, AJAXMixin
from hawat.blueprints.networks.forms import BaseNetworkForm, MaintainerNetworkForm, AdminNetworkForm, NetworkSearchForm
# Shared identifier of this pluggable module. Within this file it is passed to
# the blueprint constructor and used to build the view icon name, the main
# menu entry identifier and the URL rules of all registered views.
BLUEPRINT_NAME = 'networks'
"""Name of the blueprint as module global constant."""
class ListView(HTMLMixin, SQLAlchemyMixin, ItemListView):
    """
    General network record listing.

    The view answers ``GET`` requests only, has authentication enabled and is
    authorized by ``hawat.acl.PERMISSION_POWER``.
    """

    methods = ['GET']

    authentication = True

    authorization = [hawat.acl.PERMISSION_POWER]

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Network management')

    @property
    def dbmodel(self):
        """Database model class handled by this view."""
        return NetworkModel

    @staticmethod
    def build_query(query, model, form_args):
        """
        Narrow down and order the given query according to the search form.

        :param query: Query object to be adjusted.
        :param model: Model class whose columns are used in the conditions.
        :param form_args: Mapping with search form values. The keys ``search``,
            ``dt_from``, ``dt_to``, ``source``, ``group`` and ``sortby`` are
            recognized; missing or empty values are skipped.
        :return: The adjusted query.
        :raises KeyError: When ``sortby`` holds an unsupported sorting key.
        """
        needle = form_args.get('search')
        if needle:
            try:
                # First assume the user entered an address, network or range
                # and convert it to an ipranges object.
                parsed = from_str(needle)
                # The explicit low-high range form avoids issues with inputs
                # such as 195.113.144.1/24.
                lower = ip_from_str(parsed.low())
                upper = ip_from_str(parsed.high())
                # 'a >>= b' means: a contains b or is equal to b. So look for
                # networks which contain the whole range entered by the user.
                containment = text("network >>= '{}-{}'".format(lower, upper))
                query = query.filter(containment)
            except ValueError:
                # Not an address: search in the network name and description.
                pattern = '%{}%'.format(needle)
                query = query.filter(
                    or_(
                        model.netname.ilike(pattern),
                        model.description.ilike(pattern)
                    )
                )

        # Lower time boundary of the record creation time.
        created_from = form_args.get('dt_from')
        if created_from:
            query = query.filter(model.createtime >= created_from)

        # Upper time boundary of the record creation time.
        created_to = form_args.get('dt_to')
        if created_to:
            query = query.filter(model.createtime <= created_to)

        # Record source selection.
        source = form_args.get('source')
        if source:
            query = query.filter(model.source == source)

        # Group selection.
        group = form_args.get('group')
        if group:
            query = query.filter(model.group_id == group.id)

        ordering = form_args.get('sortby')
        if ordering:
            # Supported sorting keys mapped to (column name, direction). The
            # column is resolved on the model only for the selected key.
            sort_specs = {
                column + '.' + direction: (column, direction)
                for column in ('createtime', 'netname', 'network', 'rank')
                for direction in ('desc', 'asc')
            }
            column_name, direction = sort_specs[ordering]
            column = getattr(model, column_name)
            query = query.order_by(getattr(column, direction)())

        return query
class ShowView(HTMLMixin, SQLAlchemyMixin, ItemShowView):
    """
    Detailed network record view.

    The view answers ``GET`` requests only and has authentication enabled;
    access to a particular record is decided by :py:func:`authorize_item_action`.
    """

    methods = ['GET']

    authentication = True

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Show network record details')

    @property
    def dbmodel(self):
        """Database model class handled by this view."""
        return NetworkModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Decide whether the current user may display the given network record.

        Access is granted by ``PERMISSION_POWER`` or by the membership or
        management need for the group owning the record.

        :param kwargs: Must contain ``item``, the network record in question.
        :return: Result of the permission check.
        """
        group_id = kwargs['item'].group.id
        group_permission = flask_principal.Permission(
            hawat.acl.MembershipNeed(group_id),
            hawat.acl.ManagementNeed(group_id)
        )
        return hawat.acl.PERMISSION_POWER.can() or group_permission.can()

    def do_before_response(self, **kwargs):
        """
        Extend the response context with changelog related data.

        When the ``networks.update`` endpoint is accessible for the displayed
        record and the ``changelogs.search`` endpoint exists, the context
        receives the changelog action menu and up to 100 newest changelog
        entries of the record.
        """
        item = self.response_context['item']

        if not self.can_access_endpoint('networks.update', item=item):
            return
        if not self.has_endpoint('changelogs.search'):
            return

        changelogs_view = self.get_endpoint_class('changelogs.search')
        self.response_context.update(
            context_action_menu_changelogs=changelogs_view.get_context_action_menu()
        )

        # Newest entries first, capped at 100 records.
        changelog_query = (
            self.dbsession.query(ItemChangeLogModel)
            .filter(ItemChangeLogModel.model == item.__class__.__name__)
            .filter(ItemChangeLogModel.model_id == item.id)
            .order_by(ItemChangeLogModel.createtime.desc())
            .limit(100)
        )
        self.response_context.update(item_changelog=changelog_query.all())
class CreateView(HTMLMixin, SQLAlchemyMixin, ItemCreateView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    View for creating new network records.

    The view answers ``GET`` and ``POST`` requests, has authentication enabled
    and is authorized by ``hawat.acl.PERMISSION_POWER``.
    """

    methods = ['GET', 'POST']

    authentication = True

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Create new network record')

    @property
    def dbmodel(self):
        """Database model class handled by this view."""
        return NetworkModel

    @property
    def dbchlogmodel(self):
        """Database model class of the item changelog."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """Return the result of the ``PERMISSION_POWER`` permission check."""
        return hawat.acl.PERMISSION_POWER.can()

    @staticmethod
    def get_message_success(**kwargs):
        """
        Return the translated message reporting successful creation.

        :param kwargs: Must contain ``item``, the created network record.
        """
        network = kwargs['item']
        escaped_network = markupsafe.escape(str(network))
        escaped_group = markupsafe.escape(str(network.group))
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully created.',
            item_id=escaped_network,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """
        Return the translated message reporting failed creation.

        :param kwargs: Must contain ``item``, the network record being created.
        """
        escaped_group = markupsafe.escape(str(kwargs['item'].group))
        return gettext(
            'Unable to create new network record for group <strong>%(parent_id)s</strong>.',
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """Return the translated message reporting canceled creation."""
        return gettext('Canceled creating new network record.')
class CreateForView(HTMLMixin, SQLAlchemyMixin,
                    ItemCreateForView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    View for creating new network records for a given parent item.

    The parent model is :py:class:`GroupModel` and a new record gets the parent
    assigned as its ``group`` (see :py:func:`add_parent_to_item`).
    """

    methods = ['GET', 'POST']

    authentication = True

    module_name_par = 'groups'

    @classmethod
    def get_view_icon(cls):
        """Return the icon name of this view, derived from the blueprint name."""
        return 'module-' + BLUEPRINT_NAME

    @classmethod
    def get_view_url(cls, **kwargs):
        """
        Return the URL of this view.

        :param kwargs: Must contain ``item`` whose ``id`` becomes the
            ``parent_id`` URL parameter (presumably the parent group).
        """
        parent_id = kwargs['item'].id
        return flask.url_for(cls.get_view_endpoint(), parent_id=parent_id)

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Create new network record for group')

    @property
    def dbmodel(self):
        """Database model class handled by this view."""
        return NetworkModel

    @property
    def dbmodel_par(self):
        """Database model class of the parent item."""
        return GroupModel

    @property
    def dbchlogmodel(self):
        """Database model class of the item changelog."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Decide whether the current user may create a record for the given item.

        Access is granted by ``PERMISSION_POWER`` or by the management need for
        the ``id`` of ``item`` (presumably the parent group).

        :param kwargs: Must contain ``item``.
        :return: Result of the permission check.
        """
        management_permission = flask_principal.Permission(
            hawat.acl.ManagementNeed(kwargs['item'].id)
        )
        return hawat.acl.PERMISSION_POWER.can() or management_permission.can()

    @staticmethod
    def get_message_success(**kwargs):
        """
        Return the translated message reporting successful creation.

        :param kwargs: Must contain ``item`` (the created record) and ``parent``.
        """
        escaped_network = markupsafe.escape(str(kwargs['item']))
        escaped_parent = markupsafe.escape(str(kwargs['parent']))
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully created.',
            item_id=escaped_network,
            parent_id=escaped_parent
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """
        Return the translated message reporting failed creation.

        :param kwargs: Must contain ``parent``.
        """
        escaped_parent = markupsafe.escape(str(kwargs['parent']))
        return gettext(
            'Unable to create new network record for group <strong>%(parent_id)s</strong>.',
            parent_id=escaped_parent
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """Return the translated message reporting canceled creation."""
        return gettext('Canceled creating new network record.')

    @staticmethod
    def add_parent_to_item(item, parent):
        """
        Attach the parent to the given network record.

        :param item: Network record being created.
        :param parent: Object to be stored in the ``group`` attribute of ``item``.
        """
        item.group = parent
class UpdateView(HTMLMixin, SQLAlchemyMixin, ItemUpdateView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    View for updating existing network records.

    The view answers ``GET`` and ``POST`` requests and has authentication
    enabled; access to a particular record is decided by
    :py:func:`authorize_item_action`.
    """

    methods = ['GET', 'POST']

    authentication = True

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Update network record details')

    @property
    def dbmodel(self):
        """Database model class handled by this view."""
        return NetworkModel

    @property
    def dbchlogmodel(self):
        """Database model class of the item changelog."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Decide whether the current user may update the given network record.

        Access is granted by ``PERMISSION_POWER`` or by the management need for
        the group owning the record.

        :param kwargs: Must contain ``item``, the network record in question.
        :return: Result of the permission check.
        """
        management_permission = flask_principal.Permission(
            hawat.acl.ManagementNeed(kwargs['item'].group.id)
        )
        return hawat.acl.PERMISSION_POWER.can() or management_permission.can()

    @staticmethod
    def get_message_success(**kwargs):
        """
        Return the translated message reporting successful update.

        :param kwargs: Must contain ``item``, the updated network record.
        """
        network = kwargs['item']
        escaped_network = markupsafe.escape(str(network))
        escaped_group = markupsafe.escape(str(network.group))
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully updated.',
            item_id=escaped_network,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """
        Return the translated message reporting failed update.

        :param kwargs: Must contain ``item``, the network record in question.
        """
        network = kwargs['item']
        escaped_network = markupsafe.escape(str(network))
        escaped_group = markupsafe.escape(str(network.group))
        return gettext(
            'Unable to update network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=escaped_network,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """
        Return the translated message reporting canceled update.

        :param kwargs: Must contain ``item``, the network record in question.
        """
        network = kwargs['item']
        escaped_network = markupsafe.escape(str(network))
        escaped_group = markupsafe.escape(str(network.group))
        return gettext(
            'Canceled updating network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=escaped_network,
            parent_id=escaped_group
        )
class DeleteView(HTMLMixin, SQLAlchemyMixin, ItemDeleteView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    View for deleting existing network records.

    The view answers ``GET`` and ``POST`` requests and has authentication
    enabled; access to a particular record is decided by
    :py:func:`authorize_item_action`.
    """

    methods = ['GET', 'POST']

    authentication = True

    @property
    def dbmodel(self):
        """Database model class handled by this view."""
        return NetworkModel

    @property
    def dbchlogmodel(self):
        """Database model class of the item changelog."""
        return ItemChangeLogModel

    @classmethod
    def authorize_item_action(cls, **kwargs):
        """
        Decide whether the current user may delete the given network record.

        Access is granted by ``PERMISSION_POWER`` or by the management need for
        the group owning the record.

        :param kwargs: Must contain ``item``, the network record in question.
        :return: Result of the permission check.
        """
        management_permission = flask_principal.Permission(
            hawat.acl.ManagementNeed(kwargs['item'].group.id)
        )
        return hawat.acl.PERMISSION_POWER.can() or management_permission.can()

    @staticmethod
    def get_message_success(**kwargs):
        """
        Return the translated message reporting successful deletion.

        :param kwargs: Must contain ``item``, the deleted network record.
        """
        network = kwargs['item']
        escaped_network = markupsafe.escape(str(network))
        escaped_group = markupsafe.escape(str(network.group))
        return gettext(
            'Network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong> was successfully and permanently deleted.',
            item_id=escaped_network,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_failure(**kwargs):
        """
        Return the translated message reporting failed deletion.

        :param kwargs: Must contain ``item``, the network record in question.
        """
        network = kwargs['item']
        escaped_network = markupsafe.escape(str(network))
        escaped_group = markupsafe.escape(str(network.group))
        return gettext(
            'Unable to permanently delete network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=escaped_network,
            parent_id=escaped_group
        )

    @staticmethod
    def get_message_cancel(**kwargs):
        """
        Return the translated message reporting canceled deletion.

        :param kwargs: Must contain ``item``, the network record in question.
        """
        network = kwargs['item']
        escaped_network = markupsafe.escape(str(network))
        escaped_group = markupsafe.escape(str(network.group))
        return gettext(
            'Canceled deleting network record <strong>%(item_id)s</strong> for group <strong>%(parent_id)s</strong>.',
            item_id=escaped_network,
            parent_id=escaped_group
        )
# -------------------------------------------------------------------------------
class NetworksBlueprint(HawatBlueprint):
    """Pluggable module - network management (*networks*)."""

    @classmethod
    def get_module_title(cls):
        """Return the lazily translated title of this module."""
        return lazy_gettext('Network record management')

    def register_app(self, app):
        """
        Add the network listing view to the main menu of the application.

        :param app: Application object providing the ``menu_main`` menu.
        """
        entry_id = '.'.join(('admin', BLUEPRINT_NAME))
        app.menu_main.add_entry('view', entry_id, position=70, view=ListView)
class APINetworksView(AJAXMixin, SQLAlchemyMixin, SimpleView):  # pylint: disable=locally-disabled,too-many-ancestors
    """
    View responsible for presenting the list of networks in the form of a JSON document.

    The view answers ``GET`` requests only, has authentication enabled and is
    authorized by ``hawat.acl.PERMISSION_POWER``.
    """

    methods = ['GET']

    authentication = True

    authorization = [hawat.acl.PERMISSION_POWER]

    @classmethod
    def get_view_name(cls):
        """Return the name of this view."""
        return 'apinetworks'

    @classmethod
    def get_view_title(cls, **kwargs):
        """Return the lazily translated title of this view."""
        return lazy_gettext('Networks')

    @property
    def dbmodel(self):
        """Database model class handled by this view."""
        return NetworkModel

    def _get_networks(self):
        """Return the result of the search performed with empty form arguments."""
        return self.search(form_args={})

    def _get_resolved_abuses(self, network):
        """
        Collect the reporting e-mail settings of the group owning the network.

        :param network: Network record whose group settings are inspected.
        :return: For a base network a dictionary with the single key
            ``fallback`` holding the first non-empty e-mail setting in the
            order low, medium, high, critical (or an empty list). Otherwise a
            dictionary keyed by severity containing the non-empty settings only.
        """
        reporting = network.group.settings_rep
        by_severity = (
            ("low", reporting.emails_low),
            ("medium", reporting.emails_medium),
            ("high", reporting.emails_high),
            ("critical", reporting.emails_critical),
        )

        if network.is_base:
            for _severity, emails in by_severity:
                if emails:
                    return {"fallback": emails}
            return {"fallback": []}

        return {severity: emails for severity, emails in by_severity if emails}

    def _get_ip_addresses(self, ip):
        """
        Describe the given network string by its address attributes.

        :param ip: String with an address, network or range.
        :return: Dictionary with ``ip4_start`` and ``ip4_end`` for IPv4 input,
            ``ip6_addr`` and ``ip6_prefix`` for other input written with a
            slash, and ``ip6_start`` and ``ip6_end`` in the remaining cases.
        """
        parsed = from_str(ip)

        if isinstance(parsed, (IP4, IP4Range, IP4Net)):
            return {
                "ip4_start": IP4(parsed.low()),
                "ip4_end": IP4(parsed.high()),
            }

        if '/' in ip:
            # Prefix notation is passed on as written, split at the slash.
            pieces = ip.split('/')
            return {
                "ip6_addr": pieces[0],
                "ip6_prefix": int(pieces[1]),
            }

        return {
            "ip6_start": IP6(parsed.low()),
            "ip6_end": IP6(parsed.high()),
        }

    def _network_to_dict(self, network):
        """Build the dictionary describing a single network record."""
        # The address attributes are resolved first and appended last.
        addresses = self._get_ip_addresses(network.network)
        record = {
            "rank": network.rank,
            "source": network.source,
            "netname": network.netname,
            "descr": network.description,
            "resolved_abuses": self._get_resolved_abuses(network),
            "client_id": network.local_id,
        }
        record.update(addresses)
        return record

    def do_before_response(self, **kwargs):
        """Store the description of all networks under ``data`` in the response context."""
        records = [self._network_to_dict(network) for network in self._get_networks()]
        self.response_context.update({"data": records})
# -------------------------------------------------------------------------------
def get_blueprint():
    """
    Mandatory interface for :py:mod:`hawat.Hawat` and factory function. This function
    must return a valid instance of :py:class:`hawat.app.HawatBlueprint` or
    :py:class:`flask.Blueprint`.

    :return: Instance of :py:class:`NetworksBlueprint` with all views of this
        module registered.
    """
    blueprint = NetworksBlueprint(BLUEPRINT_NAME, __name__, template_folder='templates')

    # View classes with their URL rule templates, in registration order; the
    # placeholder is filled with the blueprint name.
    routes = (
        (ListView, '/{}/list'),
        (CreateView, '/{}/create'),
        (CreateForView, '/{}/createfor/<int:parent_id>'),
        (ShowView, '/{}/<int:item_id>/show'),
        (UpdateView, '/{}/<int:item_id>/update'),
        (DeleteView, '/{}/<int:item_id>/delete'),
        (APINetworksView, '/api/{}/get'),
    )
    for view_class, rule_template in routes:
        blueprint.register_view_class(view_class, rule_template.format(BLUEPRINT_NAME))

    return blueprint