Source code for hawat.blueprints.timeline

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------


"""
This file contains pluggable module for Hawat web interface containing features
related to `IDEA <https://idea.cesnet.cz/en/index>`__ event timeline based
visualisations.
"""


__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"


import copy
import datetime
import pytz

import flask
from flask_babel import lazy_gettext

import mentat.stats.idea
import mentat.services.eventstorage
from mentat.const import tr_
from mentat.services.eventstorage import QTYPE_TIMELINE

import hawat.events
import vial.const
import vial.acl
from vial.app import VialBlueprint
from vial.view import BaseSearchView, CustomSearchView
from vial.view.mixin import HTMLMixin, AJAXMixin
from vial.utils import URLParamsBuilder
from hawat.base import PsycopgMixin
from hawat.blueprints.timeline.forms import SimpleTimelineSearchForm


BLUEPRINT_NAME = 'timeline'
"""Name of the blueprint as module global constant."""

AGGREGATIONS = (
    (mentat.stats.idea.ST_SKEY_CATEGORIES,  {}, {"aggr_set": "category"}),
    (mentat.stats.idea.ST_SKEY_IPS,         {}, {"aggr_set": "source_ip"}),
    #('', {"aggr_set": "target_ip"}),
    (mentat.stats.idea.ST_SKEY_SRCPORTS,    {}, {"aggr_set": "source_port"}),
    (mentat.stats.idea.ST_SKEY_TGTPORTS,    {}, {"aggr_set": "target_port"}),
    (mentat.stats.idea.ST_SKEY_SRCTYPES,    {}, {"aggr_set": "source_type"}),
    (mentat.stats.idea.ST_SKEY_TGTTYPES,    {}, {"aggr_set": "target_type"}),
    (mentat.stats.idea.ST_SKEY_PROTOCOLS,   {}, {"aggr_set": "protocol"}),
    (mentat.stats.idea.ST_SKEY_DETECTORS,   {}, {"aggr_set": "node_name"}),
    (mentat.stats.idea.ST_SKEY_DETECTORTPS, {}, {"aggr_set": "node_type"}),
    (mentat.stats.idea.ST_SKEY_ABUSES,      {}, {"aggr_set": "cesnet_resolvedabuses"}),
    (mentat.stats.idea.ST_SKEY_CLASSES,     {}, {"aggr_set": "cesnet_eventclass"}),
    (mentat.stats.idea.ST_SKEY_SEVERITIES,  {}, {"aggr_set": "cesnet_eventseverity"}),
)


def _get_search_form(request_args = None):
    choices = hawat.events.get_event_form_choices()
    aggrchc = list(
        zip(
            map(lambda x: x[0], AGGREGATIONS),
            map(lambda x: x[0], AGGREGATIONS)
        )
    )

    form = SimpleTimelineSearchForm(
        request_args,
        meta = {'csrf': False},
        choices_source_types    = choices['source_types'],
        choices_target_types    = choices['target_types'],
        choices_host_types      = choices['host_types'],
        choices_detectors       = choices['detectors'],
        choices_detector_types  = choices['detector_types'],
        choices_categories      = choices['categories'],
        choices_severities      = choices['severities'],
        choices_classes         = choices['classes'],
        choices_protocols       = choices['protocols'],
        choices_inspection_errs = choices['inspection_errs'],
        choices_aggregations    = aggrchc
    )

    # In case no time bounds were set adjust them manually.
    if request_args and not ('dt_from' in request_args or 'dt_to' in request_args or 'st_from' in request_args or 'st_to' in request_args):
        form.dt_from.process_data(vial.forms.default_dt_with_delta())
        form.dt_to.process_data(vial.forms.default_dt())

    return form


[docs]class AbstractSearchView(PsycopgMixin, CustomSearchView): # pylint: disable=locally-disabled,abstract-method """ Base class for view responsible for searching `IDEA <https://idea.cesnet.cz/en/index>`__ event database and presenting the results in timeline-based manner. """ authentication = True url_params_unsupported = ('page', 'sortby')
[docs] @classmethod def get_view_icon(cls): return 'module-{}'.format(cls.module_name)
[docs] @classmethod def get_view_title(cls, **kwargs): return lazy_gettext('Search event timeline')
[docs] @classmethod def get_menu_title(cls, **kwargs): return lazy_gettext('Timeline')
[docs] @staticmethod def get_search_form(request_args): return _get_search_form(request_args)
def _search_events_aggr(self, form_args, qtype, aggr_name, enable_toplist = True): self.mark_time( '{}_{}'.format(qtype, aggr_name), 'begin', tag = 'search', label = 'Begin aggregation calculations "{}:{}"'.format( qtype, aggr_name ), log = True ) search_result = self.get_db().search_events_aggr( form_args, qtype = qtype, dbtoplist = enable_toplist ) self.mark_time( '{}_{}'.format(qtype, aggr_name), 'end', tag = 'search', label = 'Finished aggregation calculations "{}:{}" [yield {} row(s)]'.format( qtype, aggr_name, len(search_result) ), log = True ) self.response_context['sqlqueries'].append( self.get_db().cursor.lastquery.decode('utf-8') ) self.response_context['search_result']["{}:{}".format(qtype, aggr_name)] = search_result
[docs] def do_before_response(self, **kwargs): self.response_context.update( quicksearch_list = self.get_quicksearch_by_time() )
[docs]class SearchView(HTMLMixin, AbstractSearchView): # pylint: disable=locally-disabled,too-many-ancestors """ View responsible for querying `IDEA <https://idea.cesnet.cz/en/index>`__ event database and presenting the results in the form of HTML page. """ methods = ['GET']
[docs] @classmethod def get_breadcrumbs_menu(cls): breadcrumbs_menu = vial.menu.Menu() breadcrumbs_menu.add_entry( 'endpoint', 'home', endpoint = flask.current_app.config['ENDPOINT_HOME'] ) breadcrumbs_menu.add_entry( 'endpoint', 'search', endpoint = '{}.search'.format(cls.module_name) ) return breadcrumbs_menu
[docs]class APISearchView(AJAXMixin, AbstractSearchView): # pylint: disable=locally-disabled,too-many-ancestors """ View responsible for querying `IDEA <https://idea.cesnet.cz/en/index>`__ event database and presenting the results in the form of JSON document. """ methods = ['GET','POST']
[docs] @classmethod def get_view_name(cls): return 'apisearch'
#-------------------------------------------------------------------------------
[docs]class AbstractLegacySearchView(PsycopgMixin, BaseSearchView): # pylint: disable=locally-disabled,abstract-method """ Base class for view responsible for searching `IDEA <https://idea.cesnet.cz/en/index>`__ event database and presenting the results in timeline-based manner. """ authentication = True url_params_unsupported = ('page', 'limit', 'sortby')
[docs] @classmethod def get_view_icon(cls): return 'module-{}'.format(cls.module_name)
[docs] @classmethod def get_view_title(cls, **kwargs): return lazy_gettext('Search event timeline')
[docs] @classmethod def get_menu_title(cls, **kwargs): return lazy_gettext('Timeline')
[docs] @staticmethod def get_search_form(request_args): return _get_search_form(request_args)
[docs] def do_before_response(self, **kwargs): self.response_context.update( quicksearch_list = self.get_quicksearch_by_time() )
[docs] @staticmethod def get_qtype(): """ Get type of the event select query. """ return mentat.services.eventstorage.QTYPE_SELECT_GHOST
[docs]class APILegacySearchView(AJAXMixin, AbstractSearchView): # pylint: disable=locally-disabled,too-many-ancestors """ View responsible for querying `IDEA <https://idea.cesnet.cz/en/index>`__ event database and presenting the results in the form of JSON document. *Deprecated legacy implementation, kept only for the purposes of comparison. """ methods = ['GET','POST']
[docs] @classmethod def get_view_name(cls): return 'apilegacysearch'
#-------------------------------------------------------------------------------
[docs]class TimelineBlueprint(VialBlueprint): """Pluggable module - IDEA event timelines (*timeline*)."""
[docs] @classmethod def get_module_title(cls): return lazy_gettext('<a href="https://idea.cesnet.cz/en/index">IDEA</a> event timelines')
[docs] def register_app(self, app): app.menu_main.add_entry( 'view', 'dashboards.{}'.format(BLUEPRINT_NAME), position = 30, view = SearchView, resptitle = True ) # Register context actions provided by this module. app.set_csag( hawat.const.CSAG_ADDRESS, tr_('Search for source <strong>%(name)s</strong> on IDEA event timeline'), SearchView, URLParamsBuilder({'submit': tr_('Search')}).add_rule('source_addrs', True).add_kwrule('dt_from', False, True).add_kwrule('dt_to', False, True) )
#-------------------------------------------------------------------------------
[docs]def get_blueprint(): """ Mandatory interface for :py:mod:`vial.Vial` and factory function. This function must return a valid instance of :py:class:`vial.app.VialBlueprint` or :py:class:`flask.Blueprint`. """ hbp = TimelineBlueprint( BLUEPRINT_NAME, __name__, template_folder = 'templates' ) hbp.register_view_class(SearchView, '/{}/search'.format(BLUEPRINT_NAME)) hbp.register_view_class(APISearchView, '/api/{}/search'.format(BLUEPRINT_NAME)) hbp.register_view_class(APILegacySearchView, '/api/{}/legacysearch'.format(BLUEPRINT_NAME)) return hbp