#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
# -------------------------------------------------------------------------------
"""
This file contains pluggable module for Hawat web interface containing features
related to `IDEA <https://idea.cesnet.cz/en/index>`__ events, database searching,
viewing event details and producing event dashboards.
"""
__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"
import datetime
import pytz
import flask
from flask_babel import lazy_gettext
import mentat.stats.idea
import mentat.services.eventstorage
from mentat.datatype.sqldb import EventStatisticsModel
from mentat.const import tr_
import hawat.const
import hawat.events
import hawat.const
import hawat.acl
import hawat.forms
import hawat.menu
from hawat.base import HawatBlueprint
from hawat.view import BaseView, SimpleView, BaseSearchView, ItemShowView
from hawat.view.mixin import HTMLMixin, AJAXMixin, SQLAlchemyMixin
from hawat.utils import URLParamsBuilder
from hawat.base import PsycopgMixin
from hawat.blueprints.events.forms import SimpleEventSearchForm, EventDashboardForm
BLUEPRINT_NAME = 'events'
"""Name of the blueprint as module global constant."""
def _get_search_form(request_args=None):
choices = hawat.events.get_event_form_choices()
form = SimpleEventSearchForm(
request_args,
meta={'csrf': False},
choices_source_types=choices['source_types'],
choices_target_types=choices['target_types'],
choices_host_types=choices['host_types'],
choices_detectors=choices['detectors'],
choices_detector_types=choices['detector_types'],
choices_categories=choices['categories'],
choices_severities=choices['severities'],
choices_classes=choices['classes'],
choices_protocols=choices['protocols'],
choices_inspection_errs=choices['inspection_errs'],
)
# In case no time bounds were set adjust them manually.
if request_args and not (
'dt_from' in request_args or 'dt_to' in request_args or 'st_from' in request_args or 'st_to' in request_args):
form.dt_from.process_data(hawat.forms.default_dt_with_delta())
form.dt_to.process_data(hawat.forms.default_dt())
return form
[docs]class AbstractSearchView(PsycopgMixin, BaseSearchView): # pylint: disable=locally-disabled,abstract-method
"""
Base class for all views responsible for searching `IDEA <https://idea.cesnet.cz/en/index>`__
event database.
"""
authentication = True
[docs] @classmethod
def get_view_title(cls, **kwargs):
return lazy_gettext('Search event database')
[docs] @classmethod
def get_view_icon(cls):
return 'module-{}'.format(cls.module_name)
[docs] def do_before_search(self, form_data): # pylint: disable=locally-disabled,no-self-use,unused-argument
form_data['groups'] = [item.name for item in form_data['groups']]
[docs] def do_before_response(self, **kwargs):
self.response_context.update(
quicksearch_list=self.get_quicksearch_by_time()
)
[docs]class SearchView(HTMLMixin, AbstractSearchView): # pylint: disable=locally-disabled,too-many-ancestors
"""
View responsible for searching the `IDEA <https://idea.cesnet.cz/en/index>`__
event database and presenting the results in the form of HTML page.
"""
methods = ['GET']
has_help = True
[docs] @staticmethod
def get_qtype():
"""
Get type of the event select query.
"""
return mentat.services.eventstorage.QTYPE_SELECT_GHOST
[docs]class APISearchView(AJAXMixin, AbstractSearchView): # pylint: disable=locally-disabled,too-many-ancestors
"""
View responsible for searching the `IDEA <https://idea.cesnet.cz/en/index>`__
event database and presenting the results in the form of JSON document.
"""
methods = ['GET', 'POST']
[docs] @classmethod
def get_view_name(cls):
return 'apisearch'
[docs]class AbstractShowView(PsycopgMixin, ItemShowView): # pylint: disable=locally-disabled,abstract-method
"""
Base class responsible for fetching and presenting single `IDEA <https://idea.cesnet.cz/en/index>`__
event.
"""
authentication = True
[docs] @classmethod
def get_view_title(cls, **kwargs):
return lazy_gettext('Show event')
[docs]class ShowView(HTMLMixin, AbstractShowView): # pylint: disable=locally-disabled,too-many-ancestors
"""
Detailed `IDEA <https://idea.cesnet.cz/en/index>`__ event view that presents
the result as HTML page.
"""
methods = ['GET']
has_help = True
[docs]class APIShowView(AJAXMixin, AbstractShowView): # pylint: disable=locally-disabled,too-many-ancestors
"""
Detailed `IDEA <https://idea.cesnet.cz/en/index>`__ event view that presents
the result as HTML page.
"""
methods = ['GET', 'POST']
[docs] @classmethod
def get_view_name(cls):
return 'apishow'
[docs]class DownloadView(PsycopgMixin, BaseView):
"""
Download `IDEA <https://idea.cesnet.cz/en/index>`__ event as JSON file.
"""
methods = ['GET']
authentication = True
[docs] @classmethod
def get_view_name(cls):
return 'download'
[docs] @classmethod
def get_view_title(cls, **kwargs):
return lazy_gettext('Download event')
[docs] @classmethod
def get_view_url(cls, **kwargs):
return flask.url_for(
cls.get_view_endpoint(),
item_id=kwargs['item'].get_id()
)
# ---------------------------------------------------------------------------
[docs] def dispatch_request(self, item_id): # pylint: disable=locally-disabled,arguments-differ
"""
Mandatory interface required by the :py:func:`flask.views.View.dispatch_request`.
Will be called by the *Flask* framework to service the request.
Single item with given unique identifier will be retrieved from database
and injected into template to be displayed to the user.
"""
item = self.fetch(item_id)
if not item:
flask.abort(404)
self.logger.debug(
"Event %s is being downloaded as a standalone file.",
item['ID']
)
response = flask.make_response(
item.to_json(indent=4, sort_keys=True)
)
response.mimetype = 'application/json'
response.headers['Content-Disposition'] = 'attachment; filename={}.idea.json'.format(item_id)
return response
[docs]class AbstractDashboardView(SQLAlchemyMixin, BaseSearchView): # pylint: disable=locally-disabled,abstract-method
"""
Base class for presenting overall `IDEA <https://idea.cesnet.cz/en/index>`__
event statistics dashboard.
"""
authentication = True
[docs] @classmethod
def get_view_icon(cls):
return 'module-{}'.format(BLUEPRINT_NAME)
[docs] @classmethod
def get_view_title(cls, **kwargs):
return lazy_gettext('Overall event dashboards')
[docs] @classmethod
def get_view_template(cls):
return '{}/{}.html'.format(cls.module_name, cls.get_view_name())
# ---------------------------------------------------------------------------
@property
def dbmodel(self):
return EventStatisticsModel
[docs] @staticmethod
def build_query(query, model, form_args):
# Adjust query based on lower time boudary selection.
if 'dt_from' in form_args and form_args['dt_from']:
query = query.filter(model.dt_from >= form_args['dt_from'])
# Adjust query based on upper time boudary selection.
if 'dt_to' in form_args and form_args['dt_to']:
query = query.filter(model.dt_to <= form_args['dt_to'])
# Return the result sorted by interval.
return query.order_by(model.interval)
[docs] def do_after_search(self, items):
self.logger.debug(
"Calculating event dashboard overview from %d records.",
len(items)
)
if items:
dt_from = self.response_context['form_data'].get('dt_from', None)
if dt_from:
dt_from = dt_from.replace(tzinfo=None)
dt_to = self.response_context['form_data'].get('dt_to', None)
if dt_to:
dt_to = dt_to.replace(tzinfo=None)
if not dt_from:
dt_from = self.dbcolumn_min(self.dbmodel.createtime)
if not dt_to:
dt_to = datetime.datetime.utcnow()
self.response_context.update(
statistics=mentat.stats.idea.aggregate_timeline_groups(
items,
dt_from=dt_from,
dt_to=dt_to,
max_count=flask.current_app.config['HAWAT_CHART_TIMELINE_MAXSTEPS'],
min_step=300,
timezone=pytz.timezone(flask.session.get('timezone', 'UTC'))
)
)
[docs] def do_before_response(self, **kwargs):
self.response_context.update(
quicksearch_list=self.get_quicksearch_by_time()
)
[docs]class DashboardView(HTMLMixin, AbstractDashboardView): # pylint: disable=locally-disabled,too-many-ancestors
"""
View responsible for presenting overall `IDEA <https://idea.cesnet.cz/en/index>`__
event statistics dashboard in the form of HTML page.
"""
methods = ['GET']
[docs] @classmethod
def get_view_name(cls):
return 'dashboard'
[docs]class APIDashboardView(AJAXMixin, AbstractDashboardView): # pylint: disable=locally-disabled,too-many-ancestors
"""
View responsible for presenting overall `IDEA <https://idea.cesnet.cz/en/index>`__
event statistics dashboard in the form of JSON document.
"""
methods = ['GET', 'POST']
[docs] @classmethod
def get_view_name(cls):
return 'apidashboard'
[docs] def process_response_context(self):
super().process_response_context()
# Prevent certain response context keys to appear in final response.
for key in ('items', 'quicksearch_list'):
try:
del self.response_context[key]
except KeyError:
pass
return self.response_context
# -------------------------------------------------------------------------------
[docs]class EventsBlueprint(HawatBlueprint):
"""Pluggable module - `IDEA <https://idea.cesnet.cz/en/index>`__ event database (*events*)."""
[docs] @classmethod
def get_module_title(cls):
return lazy_gettext('<a href="https://idea.cesnet.cz/en/index">IDEA</a> event database')
[docs] def register_app(self, app):
app.menu_main.add_entry(
'view',
'dashboards.{}'.format(BLUEPRINT_NAME),
position=10,
view=DashboardView
)
app.menu_main.add_entry(
'view',
BLUEPRINT_NAME,
position=140,
view=SearchView,
resptitle=True
)
def _get_upb():
return URLParamsBuilder(
{'submit': tr_('Search')}
).add_kwrule(
'dt_from', False, True
).add_kwrule(
'dt_to', False, True
)
# Register context search actions provided by this module.
app.set_csag(
hawat.const.CSAG_ABUSE,
tr_('Search for abuse group <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('groups', True)
)
app.set_csag(
hawat.const.CSAG_ADDRESS,
tr_('Search for source <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('source_addrs', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_ADDRESS,
tr_('Search for target <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('target_addrs', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_ADDRESS,
tr_('Search for host <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('host_addrs', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_CATEGORY,
tr_('Search for category <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('categories', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_CLASS,
tr_('Search for class <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('classes', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_DETECTOR,
tr_('Search for detector <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('detectors', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_DETTYPE,
tr_('Search for detector type <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('detector_types', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_HOSTTYPE,
tr_('Search for source type <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('source_types', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_HOSTTYPE,
tr_('Search for target type <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('target_types', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_HOSTTYPE,
tr_('Search for host type <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('host_types', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_PORT,
tr_('Search for source port <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('source_ports', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_PORT,
tr_('Search for target port <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('target_ports', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_PORT,
tr_('Search for host port <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('host_ports', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_PROTOCOL,
tr_('Search for protocol <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('protocols', True).add_kwrule('groups', True, True)
)
app.set_csag(
hawat.const.CSAG_SEVERITY,
tr_('Search for severity <strong>%(name)s</strong> in event database'),
SearchView,
_get_upb().add_rule('severities', True).add_kwrule('groups', True, True)
)
# -------------------------------------------------------------------------------
[docs]def get_blueprint():
"""
Mandatory interface for :py:mod:`hawat.Hawat` and factory function. This function
must return a valid instance of :py:class:`hawat.app.HawatBlueprint` or
:py:class:`flask.Blueprint`.
"""
hbp = EventsBlueprint(
BLUEPRINT_NAME,
__name__,
template_folder='templates',
static_folder='static',
static_url_path='/{}/static'.format(BLUEPRINT_NAME)
)
hbp.register_view_class(SearchView, '/{}/search'.format(BLUEPRINT_NAME))
hbp.register_view_class(ShowView, '/{}/<item_id>/show'.format(BLUEPRINT_NAME))
hbp.register_view_class(DownloadView, '/{}/<item_id>/download'.format(BLUEPRINT_NAME))
hbp.register_view_class(DashboardView, '/{}/dashboard'.format(BLUEPRINT_NAME))
hbp.register_view_class(APISearchView, '/api/{}/search'.format(BLUEPRINT_NAME))
hbp.register_view_class(APIShowView, '/api/{}/<item_id>/show'.format(BLUEPRINT_NAME))
hbp.register_view_class(APIDashboardView, '/api/{}/dashboard'.format(BLUEPRINT_NAME))
hbp.register_view_class(APIMetadataView, '/api/{}/metadata'.format(BLUEPRINT_NAME))
return hbp