Source code for hawat.blueprints.filters.forms
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
# -------------------------------------------------------------------------------
"""
This module contains custom reporting filter management forms for Hawat.
"""
__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"
import pynspect.gparser
import wtforms
from wtforms_sqlalchemy.fields import QuerySelectField
#
# Flask related modules.
#
import flask_wtf
from flask_babel import gettext, lazy_gettext
#
# Custom modules.
#
import hawat.db
import hawat.forms
from mentat.datatype.sqldb import GroupModel
from mentat.const import REPORTING_FILTER_BASIC, REPORTING_FILTER_ADVANCED
from mentat.idea.internal import Idea, IDEAFilterCompiler
def get_available_groups():
    """
    Fetch all groups from the database, ordered by their name.

    This function is passed as ``query_factory`` to the group selection
    fields of the forms in this module.

    :return: Result of ``.all()`` called on the name-ordered group query.
    """
    group_query = hawat.db.db_query(GroupModel)
    ordered_query = group_query.order_by(GroupModel.name)
    return ordered_query.all()
def check_filter(_form, field):  # pylint: disable=locally-disabled,unused-argument
    """
    Callback for validating Pynspect filter.

    The content of the field is parsed with ``pynspect.gparser.PynspectFilterParser``
    and the resulting rule is then compiled with ``IDEAFilterCompiler``. A failure
    in either of these steps is reported as form validation error.

    :param _form: Form the field belongs to (not used).
    :param field: Form field, whose ``data`` attribute holds the filtering rule.
    :raises wtforms.validators.ValidationError: When the rule cannot be parsed or compiled.
    """
    psr = pynspect.gparser.PynspectFilterParser()
    psr.build()
    compiler = IDEAFilterCompiler()
    try:
        rule = psr.parse(field.data)
        compiler.compile(rule)
    # Deliberately broad: whatever the parser or compiler raises must end up
    # as validation error attached to the field instead of crashing the request.
    except Exception as err:
        # Chain the original exception explicitly, so that the root cause is
        # kept as ``__cause__`` of the validation error.
        raise wtforms.validators.ValidationError(
            gettext(
                'Filtering rule parse error: "%(error)s".',
                error=str(err)
            )
        ) from err
def check_event(_form, field):  # pylint: disable=locally-disabled,unused-argument
    """
    Callback for validating IDEA event JSON.

    The content of the field is handed over to ``Idea.from_json`` and any
    failure is reported as form validation error.

    :param _form: Form the field belongs to (not used).
    :param field: Form field, whose ``data`` attribute holds the event JSON.
    :raises wtforms.validators.ValidationError: When ``Idea.from_json`` fails on the data.
    """
    try:
        Idea.from_json(field.data)
    # Deliberately broad: whatever the event loader raises must end up as
    # validation error attached to the field instead of crashing the request.
    except Exception as err:
        # Chain the original exception explicitly, so that the root cause is
        # kept as ``__cause__`` of the validation error.
        raise wtforms.validators.ValidationError(
            gettext(
                'Event JSON parse error: "%(error)s".',
                error=str(err)
            )
        ) from err
# -------------------------------------------------------------------------------
class BaseFilterForm(hawat.forms.BaseItemForm):
    """
    Common base of the reporting filter forms.

    Declares the reporting filter fields and the submit/preview/cancel
    buttons. The options of the ``detectors`` and ``categories`` selects are
    not fixed in the class definition, they must be handed over to the
    constructor in the ``choices_detectors`` and ``choices_categories``
    keyword arguments.
    """

    # --- Identification and description of the filter ------------------------
    name = wtforms.StringField(
        lazy_gettext('Name:'),
        validators=[
            wtforms.validators.DataRequired(),
            wtforms.validators.Length(min=3, max=250),
            hawat.forms.check_null_character,
        ],
    )
    type = wtforms.SelectField(
        lazy_gettext('Type:'),
        validators=[wtforms.validators.DataRequired()],
        choices=[
            (REPORTING_FILTER_BASIC, lazy_gettext('Basic')),
            (REPORTING_FILTER_ADVANCED, lazy_gettext('Advanced')),
        ],
    )
    description = wtforms.TextAreaField(
        lazy_gettext('Description:'),
        validators=[wtforms.validators.DataRequired()],
    )

    # --- Filtering conditions --------------------------------------------------
    # The rule itself is optional, but when given it must pass ``check_filter``.
    filter = wtforms.TextAreaField(
        lazy_gettext('Filter:'),
        validators=[wtforms.validators.Optional(), check_filter],
    )
    # Falsy data of both multiselects is turned into an empty list by the filter.
    detectors = wtforms.SelectMultipleField(
        lazy_gettext('Detectors:'),
        validators=[wtforms.validators.Optional()],
        filters=[lambda value: value or []],
    )
    categories = wtforms.SelectMultipleField(
        lazy_gettext('Categories:'),
        validators=[wtforms.validators.Optional()],
        filters=[lambda value: value or []],
    )
    sources = hawat.forms.CommaListField(
        lazy_gettext('Source IPs:'),
        validators=[
            wtforms.validators.Optional(),
            hawat.forms.check_network_record_list,
        ],
        widget=wtforms.widgets.TextArea(),
    )

    # --- Validity interval and state -------------------------------------------
    valid_from = hawat.forms.SmartDateTimeField(
        lazy_gettext('Valid from:'),
        validators=[wtforms.validators.Optional()],
    )
    valid_to = hawat.forms.SmartDateTimeField(
        lazy_gettext('Valid to:'),
        validators=[wtforms.validators.Optional()],
    )
    enabled = wtforms.RadioField(
        lazy_gettext('State:'),
        validators=[wtforms.validators.InputRequired()],
        choices=[
            (True, lazy_gettext('Enabled')),
            (False, lazy_gettext('Disabled')),
        ],
        coerce=hawat.forms.str_to_bool,
        filters=[hawat.forms.str_to_bool],
    )

    # --- Form buttons ----------------------------------------------------------
    submit = wtforms.SubmitField(lazy_gettext('Submit'))
    preview = wtforms.SubmitField(lazy_gettext('Preview'))
    cancel = wtforms.SubmitField(lazy_gettext('Cancel'))

    def __init__(self, *args, **kwargs):
        """
        Set up the form and fill in the options of the multiselect fields.

        All arguments are passed on to the parent constructor unchanged.

        :param choices_detectors: Options for the ``detectors`` field (mandatory keyword).
        :param choices_categories: Options for the ``categories`` field (mandatory keyword).
        :raises KeyError: When any of the two mandatory keywords is missing.
        """
        super().__init__(*args, **kwargs)
        for field_name in ('detectors', 'categories'):
            getattr(self, field_name).choices = kwargs['choices_' + field_name]
class AdminFilterForm(BaseFilterForm):
    """
    Reporting filter form extended with the selection of the group.

    On top of everything inherited from :py:class:`BaseFilterForm` it adds
    the ``group`` select, which offers the groups returned by
    :py:func:`get_available_groups` and does not allow blank choice.
    """

    group = QuerySelectField(
        lazy_gettext('Group:'),
        allow_blank=False,
        query_factory=get_available_groups,
    )
class PlaygroundFilterForm(flask_wtf.FlaskForm):
    """
    Form taking a filtering rule together with an IDEA event.

    Both text areas are mandatory. The rule is validated with
    :py:func:`check_filter` and the event JSON with :py:func:`check_event`.
    """

    # Filtering rule, must be parseable and compilable.
    filter = wtforms.TextAreaField(
        lazy_gettext('Filtering rule:'),
        validators=[wtforms.validators.DataRequired(), check_filter],
    )
    # IDEA event in JSON, must be loadable.
    event = wtforms.TextAreaField(
        lazy_gettext('IDEA event:'),
        validators=[wtforms.validators.DataRequired(), check_event],
    )
    submit = wtforms.SubmitField(lazy_gettext('Check'))
class FilterSearchForm(hawat.forms.BaseSearchForm):
    """
    Class representing reporting filter search form.

    Enables searching by free text, creation time range, filter type, filter
    state and group, and choosing the sorting of the result.
    """
    search = wtforms.StringField(
        lazy_gettext('Name, filter, description:'),
        validators=[
            wtforms.validators.Optional(),
            wtforms.validators.Length(min=3, max=100),
            hawat.forms.check_null_character
        ],
        # Turn missing value into empty string first, so that it can be stripped.
        filters=[
            lambda x: x or '',
            str.strip
        ],
        description=lazy_gettext(
            'Filter`s name, content or description. Search is performed even in the middle of the strings.')
    )
    dt_from = hawat.forms.SmartDateTimeField(
        lazy_gettext('Creation time from:'),
        validators=[
            wtforms.validators.Optional()
        ],
        description=lazy_gettext(
            'Lower time boundary for item creation time. Timestamp is expected to be in the format <code>YYYY-MM-DD hh:mm:ss</code> and in the timezone according to the user`s preferences.')
    )
    dt_to = hawat.forms.SmartDateTimeField(
        lazy_gettext('Creation time to:'),
        validators=[
            wtforms.validators.Optional()
        ],
        description=lazy_gettext(
            'Upper time boundary for item creation time. Timestamp is expected to be in the format <code>YYYY-MM-DD hh:mm:ss</code> and in the timezone according to the user`s preferences.')
    )
    type = wtforms.SelectField(
        lazy_gettext('Type:'),
        validators=[
            wtforms.validators.Optional(),
        ],
        choices=[
            ('', lazy_gettext('Nothing selected')),
            (REPORTING_FILTER_BASIC, lazy_gettext('Basic')),
            (REPORTING_FILTER_ADVANCED, lazy_gettext('Advanced'))
        ],
        default='',
        description=lazy_gettext('Search for filters of particular type.')
    )
    state = wtforms.SelectField(
        lazy_gettext('State:'),
        validators=[
            wtforms.validators.Optional(),
        ],
        choices=[
            ('', lazy_gettext('Nothing selected')),
            ('enabled', lazy_gettext('Enabled')),
            ('disabled', lazy_gettext('Disabled'))
        ],
        default='',
        description=lazy_gettext('Search for filters with particular state.')
    )
    group = QuerySelectField(
        lazy_gettext('Group:'),
        query_factory=get_available_groups,
        allow_blank=True,
        description=lazy_gettext('Search for filters belonging to particular group.')
    )
    sortby = wtforms.SelectField(
        lazy_gettext('Sort by:'),
        validators=[
            wtforms.validators.Optional()
        ],
        choices=[
            ('createtime.desc', lazy_gettext('by creation time descending')),
            ('createtime.asc', lazy_gettext('by creation time ascending')),
            # Labels talk about "name" instead of former "netname": this form
            # searches filters and the sorting key is the ``name`` attribute.
            # NOTE(review): translation catalogs must contain these msgids.
            ('name.desc', lazy_gettext('by name descending')),
            ('name.asc', lazy_gettext('by name ascending')),
            ('hits.desc', lazy_gettext('by number of hits descending')),
            ('hits.asc', lazy_gettext('by number of hits ascending')),
            ('last_hit.desc', lazy_gettext('by time of last hit descending')),
            ('last_hit.asc', lazy_gettext('by time of last hit ascending'))
        ],
        default='name.asc'
    )

    @staticmethod
    def is_multivalue(field_name):
        """
        Check, if given form field is a multivalue field.

        This form declares no multivalue field, so the answer is always negative.

        :param str field_name: Name of the form field.
        :return: ``True``, if the field can contain multiple values, ``False`` otherwise.
        :rtype: bool
        """
        return False