Source code for deadbeat.text

# -*- coding: utf-8 -*-

""" Text transformation cogs for DeadBeat """

from __future__ import absolute_import, division, print_function, unicode_literals

import re
import csv
import json
from collections import namedtuple
from operator import itemgetter
from .movement import Cog, nullsetter, itemsetter, dictupdater, selfgetter

__all__ = [
    "ConstLexicon", "re_rule", "LinearRegexpLexicon", "RegexpLexicon",
    "CSVParse", "CSVMarshall", "JSONParse", "JSONMarshall", "JinjaMarshall"
]

class ConstLexicon(Cog):
    """ Cog, which enriches data based on a constant set of dictionary values. """
    def __init__(self, lexicon=None):
        """
        Initialize ConstLexicon.

        :param lexicon: Description of comparisons and actions

        Comparison and action dictionary example::

            {
                itemgetter("field"): {                      # What to decide on
                    "value": {                              # Matching value
                        itemsetter("other"): "Something",       # Set "other" to "Something"
                        itemsetter("different"): "Anything"     # Set "different" to "Anything"
                    }
                }
            }
        """
        self.lexicon = lexicon
    def __call__(self, data):
        """ Main pipeline event handler. """
        for getter, tests in self.lexicon.items():
            test_value = getter(data)
            try:
                set_def = tests[test_value]
            except KeyError:
                if None in tests:
                    set_def = tests[None]
                else:
                    raise
            for setter, set_value in set_def.items():
                data = setter(data, set_value)
        return (data,)
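
# A minimal usage sketch (not part of the original module; field names are
# illustrative, and itemsetter/itemgetter semantics follow .movement):
#
#     enrich = ConstLexicon({
#         itemgetter("proto"): {
#             "tcp": {itemsetter("service"): "ssh"},
#             None: {itemsetter("service"): "unknown"},   # fallback branch
#         },
#     })
#     (event,) = enrich({"proto": "tcp"})   # event["service"] == "ssh"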

re_rule = namedtuple("re_rule", ("name", "regex", "fields", "example"))
# Make the trailing "example" field optional; func_defaults covers Python 2.
re_rule.__new__.__defaults__ = re_rule.__new__.func_defaults = (None,)

class LinearRegexpLexicon(Cog):
    """ Cog, which applies a set of regular expressions to data and extracts named fields. """
    def __init__(self, rlist, str_get=None, rname_set=None, flags=0):
        """
        Initialize LinearRegexpLexicon.

        :param rlist: Ruleset of names, expressions, field setters and examples
        :param str_get: Getter for matched data
        :param rname_set: Setter for rule name
        :param flags: Regular expression library flags (see :py:mod:`re`)

        Ruleset is in the following form::

            (
                text.re_rule(
                    name="connect",
                    regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
                    fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"),
                            itemsetter("tgt_ip"), itemsetter("tgt_port")),
                    example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
                ...
            )
        """
        self.rlist = rlist
        self.str_get = str_get or itemgetter("input")
        self.rname_set = rname_set or nullsetter
        self.flags = flags
        # Apply the flags at compile time; Pattern.match() takes a position, not flags.
        self.regexps = [(name, re.compile(regex, flags), fields, dummy)
                        for name, regex, fields, dummy in rlist]
    def __call__(self, data):
        """ Main pipeline event handler. """
        line = self.str_get(data)
        for name, regex, fields, dummy in self.regexps:
            match = regex.match(line)
            if match:
                for setter, group in zip(fields, match.groups()):
                    data = setter(data, group)
                data = self.rname_set(data, name)
                return (data,)
        return None
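
# A usage sketch (rule and field names are illustrative; the default str_get
# reads the "input" key):
#
#     rules = (
#         re_rule(name="login",
#                 regex=r'user=(\S+) ip=(\S+)',
#                 fields=(itemsetter("user"), itemsetter("src_ip")),
#                 example="user=joe ip=192.0.2.1"),
#     )
#     lex = LinearRegexpLexicon(rules, rname_set=itemsetter("rule"))
#     (event,) = lex({"input": "user=joe ip=192.0.2.1"})
#     # event now also carries "user", "src_ip" and "rule"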

class RegexpLexicon(Cog):
    """
    Cog, which applies a set of regular expressions to data and extracts named fields.

    Note that this is a much faster version than :py:obj:`LinearRegexpLexicon`; however,
    it allows only a limited number of rules because of :py:mod:`re` limits on the
    number of groups.
    """
    def __init__(self, rlist, str_get=None, rname_set=None, flags=0):
        """
        Initialize RegexpLexicon.

        :param rlist: Ruleset of names, expressions, field setters and examples
        :param str_get: Getter for matched data
        :param rname_set: Setter for rule name
        :param flags: Regular expression library flags (see :py:mod:`re`)

        Ruleset is in the following form::

            (
                text.re_rule(
                    name="connect",
                    regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
                    fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"),
                            itemsetter("tgt_ip"), itemsetter("tgt_port")),
                    example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
                ...
            )
        """
        self.rlist = rlist
        self.str_get = str_get or itemgetter("input")
        self.rname_set = rname_set or nullsetter
        self.flags = flags
        self.restr = ""
        self.indexgroup = {}
        pos = 0
        for name, regex, fields, dummy in rlist:
            re.compile(regex)   # validate each expression early
            groups = fields
            # Wrap each rule in its own group and chain them as alternatives.
            self.restr = self.restr + r"(%s)|" % regex
            self.indexgroup[pos] = groups, name
            pos += len(groups) + 1
        self.restr = self.restr[:-1]
        self.regex = re.compile(self.restr, self.flags)
    def __call__(self, data):
        """ Main pipeline event handler. """
        line = self.str_get(data)
        match = self.regex.match(line)
        if not match:
            return None
        mg = match.groups()
        # The first non-None group is the wrapper group of the rule that matched.
        i = 0
        while mg[i] is None:
            i += 1
        groups, rname = self.indexgroup[i]
        for setter, group in zip(groups, mg[i + 1:]):
            data = setter(data, group)
        data = self.rname_set(data, rname)
        return (data,)
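
# RegexpLexicon is interface compatible with LinearRegexpLexicon, so the sketch
# above applies unchanged; only the matching strategy differs (one combined
# alternation instead of a linear scan):
#
#     lex = RegexpLexicon(rules, rname_set=itemsetter("rule"))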

class CSVParse(Cog):
    """ Cog for parsing CSV lines. """
    def __init__(self, fieldsetters, restkey_setter=None, restval=None, dialect=None, str_get=None, *args, **kwargs):
        """
        Initialize CSVParse.

        :param str_get: Getter for data to be parsed
        :param fieldsetters: Sequence whose elements are itemsetters, associated with the
            fields of the input data in order.

        Note that fieldsetters is an analogy to the fieldnames of csv.DictReader, but its
        elements have to be callable itemsetters, not mere names.

        For other arguments see :py:obj:`csv.DictReader`
        """
        self.fieldsetters = fieldsetters
        self.restkey_setter = restkey_setter
        self.restval = restval
        self.dialect = dialect or "excel"   # csv.reader does not accept None as a dialect
        self.str_get = str_get or itemgetter("input")
        self.args = args
        self.kwargs = kwargs
        self.csv_reader = csv.reader(self._line_getter(), self.dialect, *args, **kwargs)
    def _line_getter(self):
        while True:
            yield self.line

    def __call__(self, data):
        """ Main pipeline event handler. """
        # The CSV reader expects full control of the incoming iterator; however, we do not
        # iterate, because we get separate lines of data, potentially not from a raw file
        # but parsed out from bigger pieces, like syslog. So we present the reader with a
        # simple iterator, which feeds it only the one line available in self.line.
        self.line = self.str_get(data)
        row = next(self.csv_reader)
        self.line = ""
        if not row:
            return None
        # Adapted from csv.py, as the original DictReader may try to read more than we have and block.
        for setter, field in zip(self.fieldsetters, row):
            data = setter(data, field)
        lf = len(self.fieldsetters)
        lr = len(row)
        if lf < lr:
            data = self.restkey_setter(data, row[lf:])
        elif lf > lr:
            for setter in self.fieldsetters[lr:]:
                data = setter(data, self.restval)
        return (data,)
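
# A usage sketch (field names are illustrative; the default str_get reads "input"):
#
#     parse = CSVParse(
#         fieldsetters=(itemsetter("ts"), itemsetter("src_ip"), itemsetter("note")),
#         restkey_setter=itemsetter("rest"))
#     (event,) = parse({"input": "1493035442,192.0.2.1,scan"})
#     # event gains "ts", "src_ip" and "note"; surplus columns would land in "rest"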

class CSVMarshall(Cog):
    """ Cog for writing CSV lines. """
    def __init__(self, fieldgetters, dialect="excel", data_set=None, *args, **kwargs):
        """
        Initialize CSVMarshall.

        :param fieldgetters: Sequence of itemgetters that define the order in which the
            values are written to the CSV line
        :param data_set: Setter for the resulting data

        For other arguments see :py:obj:`csv.writer`
        """
        self.fieldgetters = fieldgetters
        self.dialect = dialect
        self.data_set = data_set or itemsetter("output")
        self.args = args
        self.kwargs = kwargs
        self.writer = csv.writer(self, dialect=self.dialect, *args, **kwargs)
    def write(self, s):
        """ Helper method to make self behave as a fake file-like object for csv.writer. """
        self.output = s
    def __call__(self, data):
        """ Main pipeline event handler. """
        arr = [getter(data) for getter in self.fieldgetters]
        self.writer.writerow(arr)
        data = self.data_set(data, self.output)
        return (data,)
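
# A usage sketch (field names are illustrative; the default data_set writes "output"):
#
#     marshall = CSVMarshall(fieldgetters=(itemgetter("ts"), itemgetter("src_ip")))
#     (event,) = marshall({"ts": "1493035442", "src_ip": "192.0.2.1"})
#     # event["output"] == "1493035442,192.0.2.1\r\n"  (excel dialect line ending)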

class JSONParse(Cog):
    """ Cog for parsing JSON data. """
    def __init__(self, str_get=None, data_set=None, *args, **kwargs):
        """
        Initialize JSONParse.

        :param str_get: Getter for parsed data
        :param data_set: Setter for the JSON object. By default values get merged into the `data` dict

        For other arguments see :py:obj:`json.JSONDecoder`
        """
        self.str_get = str_get or itemgetter("input")
        self.data_set = data_set or dictupdater
        self.args = args
        self.kwargs = kwargs
        self.decoder = json.JSONDecoder(*args, **kwargs)
    def __call__(self, data):
        """ Main pipeline event handler. """
        s = self.str_get(data)
        output = self.decoder.decode(s)
        data = self.data_set(data, output)
        return (data,)
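
# A usage sketch; with the default dictupdater the parsed keys are merged into
# the event itself:
#
#     parse = JSONParse()
#     (event,) = parse({"input": '{"src_ip": "192.0.2.1"}'})
#     # event["src_ip"] == "192.0.2.1"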

class JSONMarshall(Cog):
    """ Cog for converting data into JSON. """
    def __init__(self, data_get=None, str_set=None, *args, **kwargs):
        """
        Initialize JSONMarshall.

        :param data_get: Getter for data to marshall. By default returns `data` itself
        :param str_set: Setter for the resulting string

        For other arguments see :py:obj:`json.JSONEncoder`
        """
        self.data_get = data_get or selfgetter
        self.str_set = str_set or itemsetter("output")
        self.encoder = json.JSONEncoder(*args, **kwargs)
    def __call__(self, data):
        """ Main pipeline event handler. """
        indata = self.data_get(data)
        output = self.encoder.encode(indata)    # do not shadow the json module
        data = self.str_set(data, output)
        return (data,)
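
# A usage sketch; extra keyword arguments are passed through to json.JSONEncoder:
#
#     marshall = JSONMarshall(sort_keys=True)
#     (event,) = marshall({"src_ip": "192.0.2.1"})
#     # event["output"] == '{"src_ip": "192.0.2.1"}'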

class JinjaMarshall(Cog):
    """
    Cog for converting data according to a Jinja template.

    Can be used for generating mail reports, creating specifically formatted log lines,
    generating HTML documents, etc.
    """
    def __init__(self, jinja_env, data_get=None, str_set=None, template_get=None):
        """
        Initialize JinjaMarshall.

        :param jinja_env: Pre-created Jinja environment, see :py:mod:`jinja2`
        :param data_get: Getter for the dict of variables for the template. By default returns `data` itself
        :param str_set: Setter for the resulting data
        :param template_get: Getter for the template name (often :py:func:`deadbeat.constgetter`)
        """
        self.env = jinja_env
        self.data_get = data_get or selfgetter
        self.str_set = str_set or itemsetter("output")
        # The getter must be callable; a bare string default would break template_get(data).
        self.template_get = template_get or itemgetter("template")
        self.template_cache = {}
    def __call__(self, data):
        """ Main pipeline event handler. """
        template_name = self.template_get(data)
        try:
            template = self.template_cache[template_name]
        except KeyError:
            self.template_cache[template_name] = template = self.env.get_template(template_name)
        indata = self.data_get(data)
        res = template.render(indata)
        data = self.str_set(data, res)
        return (data,)
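
# A usage sketch (template name and loader path are hypothetical; constgetter is
# the helper referenced in the docstring above):
#
#     import jinja2
#     env = jinja2.Environment(loader=jinja2.FileSystemLoader("templates"))
#     marshall = JinjaMarshall(env, template_get=constgetter("report.txt"))
#     (event,) = marshall({"src_ip": "192.0.2.1"})
#     # event["output"] holds the rendered template text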