# -*- coding: utf-8 -*-
""" Text transformation cogs for DeadBeat """
from __future__ import absolute_import, division, print_function, unicode_literals
import re
import csv
import json
from collections import namedtuple
from operator import itemgetter
from .movement import Cog, nullsetter, itemsetter, dictupdater, selfgetter
__all__ = [
"ConstLexicon", "re_rule", "LinearRegexpLexicon", "RegexpLexicon",
"CSVParse", "CSVMarshall", "JSONParse", "JSONMarshall", "JinjaMarshall"
]
class ConstLexicon(Cog):
    """ Cog, which enriches data based on constant set of dictionary values. """

    def __init__(self, lexicon=None):
        """ Initialize ConstLexicon.

        :param lexicon: Description of comparisons and actions

        Comparison and action dictionary example::

            {
                itemgetter("field"): {                        # What to decide on
                    "value": {                                # Matching value
                        itemsetter("other"): "Something",     # Set "other" to "Something"
                        itemsetter("different"): "Anything"   # Set "different" to "Anything"
                    }
                }
            }
        """
        self.lexicon = lexicon

    def __call__(self, data):
        """ Main pipeline event handler.

        Looks up the value of each configured getter in its test table and
        applies the matching set of setters to `data`. A `None` key in the
        test table acts as a catch-all fallback; without one, an unmatched
        value re-raises the KeyError.
        """
        for getter, tests in self.lexicon.items():
            key = getter(data)
            try:
                actions = tests[key]
            except KeyError:
                if None not in tests:
                    raise
                # Fall back to the catch-all entry.
                actions = tests[None]
            for setter, value in actions.items():
                data = setter(data, value)
        return (data,)
# Rule record for the regexp lexicons: (name, regex, fields, example).
re_rule = namedtuple("re_rule", ("name", "regex", "fields", "example"))
# Make the trailing "example" field optional (defaults to None). The chained
# assignment sets both the Python 3 (__defaults__) and Python 2
# (func_defaults) attribute names for cross-version compatibility.
re_rule.__new__.__defaults__ = re_rule.__new__.func_defaults = (None,)
class LinearRegexpLexicon(Cog):
    """ Cog, which applies set of regular expressions to data and extracts named fields. """

    def __init__(self, rlist, str_get=None, rname_set=None, flags=0):
        """ Initialize LinearRegexpLexicon.

        :param rlist: Ruleset of names, expressions, field setters and examples
        :param str_get: Getter for matched data
        :param rname_set: Setter for rule name
        :param flags: Regular expression library flags (see :py:mod:`re`)

        Ruleset is in following form::

            (
                text.re_rule(
                    name="connect",
                    regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
                    fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
                    example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
                ...
            )
        """
        self.rlist = rlist
        self.str_get = str_get or itemgetter("input")
        self.rname_set = rname_set or nullsetter
        self.flags = flags
        # Apply flags at compile time. Pattern.match() has no flags
        # parameter -- its second positional argument is the start position
        # ("pos"), so the previous regex.match(line, self.flags) call both
        # ignored the flags and skipped `flags` leading characters.
        self.regexps = [(name, re.compile(regex, flags), fields, example) for name, regex, fields, example in rlist]

    def __call__(self, data):
        """ Main pipeline event handler.

        Tries rules in order; the first matching rule sets its fields from
        the match groups and records the rule name. Returns None (drop) when
        no rule matches.
        """
        line = self.str_get(data)
        for name, regex, fields, example in self.regexps:
            match = regex.match(line)
            if match:
                for setter, group in zip(fields, match.groups()):
                    data = setter(data, group)
                data = self.rname_set(data, name)
                return (data,)
        return None
class RegexpLexicon(Cog):
    """ Cog, which applies set of regular expressions to data and extracts named fields.

    Note that this is much faster version than :py:obj:`LinearRegexpLexicon`, however allows only for
    arbitrarily limited number of rules because of :py:mod:`re` limits.
    """

    def __init__(self, rlist, str_get=None, rname_set=None, flags=0):
        """ Initialize RegexpLexicon.

        :param rlist: Ruleset of names, expressions, field setters and examples
        :param str_get: Getter for matched data
        :param rname_set: Setter for rule name
        :param flags: Regular expression library flags (see :py:mod:`re`)

        Ruleset is in following form::

            (
                text.re_rule(
                    name="connect",
                    regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
                    fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
                    example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
                ...
            )
        """
        self.rlist = rlist
        self.str_get = str_get or itemgetter("input")
        self.rname_set = rname_set or nullsetter
        self.flags = flags
        # All rules are merged into one big alternation; each rule is wrapped
        # in its own group so the index of the first non-None match group
        # identifies which rule fired. indexgroup maps that group index to
        # the rule's (field setters, name).
        self.restr = ""
        self.indexgroup = {}
        pos = 0
        for name, regex, fields, dummy in rlist:
            # Validate each individual expression early, with the same flags
            # that the combined expression will be compiled with. Previously
            # the flags were stored but never applied to any compile call.
            re.compile(regex, flags)
            self.restr = self.restr + r"(%s)|" % regex
            self.indexgroup[pos] = fields, name
            pos += len(fields) + 1
        # Drop the trailing "|" before compiling the combined expression.
        self.restr = self.restr[:-1]
        self.regex = re.compile(self.restr, flags)

    def __call__(self, data):
        """ Main pipeline event handler.

        Matches the combined expression, finds the first non-None group to
        identify the fired rule, then sets that rule's fields from the
        following groups. Returns None (drop) when nothing matches.
        """
        line = self.str_get(data)
        match = self.regex.match(line)
        if not match:
            return None
        mg = match.groups()
        i = 0
        while mg[i] is None:
            i += 1
        groups, rname = self.indexgroup[i]
        # mg[i] is the rule's wrapping group; its field groups follow it.
        for setter, group in zip(groups, mg[i + 1:]):
            data = setter(data, group)
        data = self.rname_set(data, rname)
        return (data,)
class CSVParse(Cog):
    """ Cog for parsing CSV lines. """

    def __init__(self, fieldsetters, restkey_setter=None, restval=None, dialect=None, str_get=None, *args, **kwargs):
        """ Initialize CSVParse.

        :param str_get: Getter for data to be parsed
        :param fieldsetters: sequence, whose elements are itemsetters, associated with the fields
            of the input data in order. Note that fieldsetters is analogy to fieldnames of csv.DictReader,
            but have to be not only names, but callable itemsetters.
        :param restkey_setter: Setter for the list of surplus columns; surplus
            columns are silently dropped when not provided.
        :param restval: Value assigned to missing trailing fields.
        :param dialect: CSV dialect; defaults to "excel" (the csv module default).

        For other arguments see :py:obj:`csv.DictReader`
        """
        self.fieldsetters = fieldsetters
        # Guard against None: previously a row with surplus columns would
        # crash trying to call the missing setter; nullsetter drops them.
        self.restkey_setter = restkey_setter or nullsetter
        self.restval = restval
        # csv.reader() does not accept None as a dialect; fall back to the
        # csv module's own default.
        self.dialect = dialect or "excel"
        self.str_get = str_get or itemgetter("input")
        self.args = args
        self.kwargs = kwargs
        # Initialized so the feeder generator never sees an undefined
        # attribute, should the reader pull a line before the first event.
        self.line = ""
        self.csv_reader = csv.reader(self._line_getter(), self.dialect, *args, **kwargs)

    def _line_getter(self):
        # Endless feeder: always yields whatever line is currently staged.
        while True:
            yield self.line

    def __call__(self, data):
        """ Main pipeline event handler. """
        # CSV reader expects full control on incoming iterator, however we do not iterate because we have
        # incoming separate lines of data, potentially not from raw file, but parsed out from bigger
        # pieces, like syslog. So we present reader with simple iterator, which feeds it only the
        # one line, which is available in the self.line.
        self.line = self.str_get(data)
        row = next(self.csv_reader)
        self.line = ""
        if not row:
            return None
        # Adapted from csv.py, as original DictReader may try reading more than we have and block.
        for setter, field in zip(self.fieldsetters, row):
            data = setter(data, field)
        lf = len(self.fieldsetters)
        lr = len(row)
        if lf < lr:
            # Surplus columns go to the rest setter (analogous to restkey).
            data = self.restkey_setter(data, row[lf:])
        elif lf > lr:
            # Short row: pad missing fields with restval.
            for setter in self.fieldsetters[lr:]:
                data = setter(data, self.restval)
        return (data,)
class CSVMarshall(Cog):
    """ Cog for writing CSV lines. """

    def __init__(self, fieldgetters, dialect="excel", data_set=None, *args, **kwargs):
        """ Initialize CSVMarshall.

        :param fieldgetters: Sequence of itemgetters that identify the order in which the values are written to CSV line
        :param data_set: Setter for the resulting data

        For other arguments see :py:obj:`csv.writer`
        """
        self.fieldgetters = fieldgetters
        self.dialect = dialect
        self.data_set = data_set or itemsetter("output")
        self.args = args
        self.kwargs = kwargs
        # self acts as the file-like target for the writer; see write().
        self.writer = csv.writer(self, dialect=self.dialect, *args, **kwargs)

    def write(self, s):
        """ Helper function to make self behave as fake filelike object for DictWriter. """
        # Stash the written string; assumes the csv writer emits a whole
        # formatted row in one write() call -- confirm against csv module.
        self.output = s

    def __call__(self, data):
        """ Main pipeline event handler. """
        row = [fetch(data) for fetch in self.fieldgetters]
        self.writer.writerow(row)
        return (self.data_set(data, self.output),)
class JSONParse(Cog):
    """ Cog for parsing JSON data. """

    def __init__(self, str_get=None, data_set=None, *args, **kwargs):
        """ Initialize JSONParse.

        :param str_get: Getter for parsed data
        :param data_set: Setter for JSON object. By default values get merged into `data` dict

        For other arguments see :py:obj:`json.JSONDecoder`
        """
        self.str_get = str_get or itemgetter("input")
        self.data_set = data_set or dictupdater
        self.args = args
        self.kwargs = kwargs
        self.decoder = json.JSONDecoder(*args, **kwargs)

    def __call__(self, data):
        """ Main pipeline event handler. """
        parsed = self.decoder.decode(self.str_get(data))
        return (self.data_set(data, parsed),)
class JSONMarshall(Cog):
    """ Cog for converting data into JSON. """

    def __init__(self, data_get=None, str_set=None, *args, **kwargs):
        """ Cog for converting data into JSON.

        :param data_get: Getter for data to marshall. By default returns `data` itself
        :param str_set: Setter for the resulting string

        For other arguments see :py:obj:`json.JSONEncoder`
        """
        self.data_get = data_get or selfgetter
        self.str_set = str_set or itemsetter("output")
        self.encoder = json.JSONEncoder(*args, **kwargs)

    def __call__(self, data):
        """ Main pipeline event handler. """
        indata = self.data_get(data)
        # Renamed from `json` so the local does not shadow the module-level
        # json import.
        encoded = self.encoder.encode(indata)
        data = self.str_set(data, encoded)
        return (data,)
class JinjaMarshall(Cog):
    """ Cog for converting data according to Jinja template.

    Can be used for generating mail reports, creating specifically formated
    log lines, generating HTML documents, etc.
    """

    def __init__(self, jinja_env, data_get=None, str_set=None, template_get=None):
        """ Cog for converting data according to Jinja template.

        :param jinja_env: Pre-created Jinja environment, see :py:mod:`jinja2`
        :param data_get: Getter for the dict of variables for template. By defaults returns `data` itself
        :param str_set: Setter for the resulting data
        :param template_get: Getter for the template name (often :py:func:`deadbeat.constgetter`)
        """
        self.env = jinja_env
        self.data_get = data_get or selfgetter
        self.str_set = str_set or itemsetter("output")
        # The default must be None, not the string "template": the truthy
        # string short-circuited the `or` fallback, leaving a non-callable
        # "getter" that raised TypeError on every default-constructed call.
        self.template_get = template_get or itemgetter("template")
        self.template_cache = {}

    def __call__(self, data):
        """ Main pipeline event handler. """
        template_name = self.template_get(data)
        try:
            template = self.template_cache[template_name]
        except KeyError:
            # Load lazily and memoize per template name.
            self.template_cache[template_name] = template = self.env.get_template(template_name)
        indata = self.data_get(data)
        res = template.render(indata)
        data = self.str_set(data, res)
        return (data,)