Source code for deadbeat.twist

# -*- coding: utf-8 -*-

""" Transformation cogs for DeadBeat """

from __future__ import absolute_import, division, print_function, unicode_literals

import sys
import logging
import copy
try:
    from collections.abc import Sequence, Mapping
except ImportError:  # Python 2 fallback; collections.abc appeared in Python 3.3
    from collections import Sequence, Mapping
from operator import itemgetter
from typedcols import TypedDict, typed_dict
from .movement import Cog, selfgetter, itemsetter, uuidsetter

__all__ = [
    "Normalize", "WindowContextMgr", "Unwind"
]

class Normalize(Cog):
    """ Cog for type conversion/normalization.

    Uses :py:obj:`typedcols.TypedDict` for adapting types.

    Note that it expects `data` as a dict-like object, which will be acted upon.
    """
    def __init__(self, typedef, allow_unknown=True, one_shot=True, tname=None):
        """ Initialize Normalize.

        :param typedef: TypedDict subclass, or typedef dict suitable for typedcols.typed_dict
        :param allow_unknown: Allow keys not covered by the typedef
        :param one_shot: Return plain dict if True, or TypedDict instance otherwise
        :param tname: Name of the new TypedDict type
        """
        self.typedef = typedef
        self.allow_unknown = allow_unknown
        self.one_shot = one_shot
        if isinstance(typedef, type) and issubclass(typedef, TypedDict):
            # Ready-made TypedDict subclass, use it directly
            self.factory = typedef
            self.tname = typedef.__name__
        else:
            # Build a new TypedDict type from the typedef description
            self.tname = str(tname) if tname else type(self).__name__ + str(id(self))
            self.factory = typed_dict(self.tname, allow_unknown, typedef)
    def __call__(self, indata):
        """ Main pipeline event handler. """
        norm = self.factory(indata)
        return (norm.data,) if self.one_shot else (norm,)
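# A minimal usage sketch for Normalize (illustration only, not part of the
# module). The typedef below is hypothetical; it assumes typedcols' usual
# convention of mapping keys to descriptor dicts with a "type" callable.
#
#     normalize = Normalize({"Port": {"type": int}, "Note": {"type": str}})
#     (clean,) = normalize({"Port": "22", "Note": "ssh scan"})
#     # clean["Port"] is now the integer 22; unknown keys pass through
#     # because allow_unknown defaults to True.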
class LambdaTransform(Cog):

    def __init__(self, description, factory=None, **kwargs):
        self.factory = factory
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.description = description(self)

    def get_value(self, src, value, exception=(KeyError, IndexError)):
        try:
            if callable(value):
                value = value(src)
            elif isinstance(value, str):
                value = src[value]
            elif isinstance(value, Sequence):
                value = self.process_list(src, value)
            elif isinstance(value, Mapping):
                value = self.process_dict(src, value)
        except exception:
            # Usually raised when key not present in source Event
            value = None
        return value

    def process_list(self, src, description):
        target = []
        for desc in description:
            val = self.get_value(src, desc)
            if val is not None:
                target.append(val)
        return target or None

    def process_dict(self, src, description):
        target = {}
        for key, desc in description.items():
            val = self.get_value(src, desc)
            if val is not None:
                target[key] = val
        return target or None

    def __call__(self, s):
        res = self.process_dict(s, self.description)
        if self.factory:
            res = self.factory(res)
        return (res,)


def _get(s, path, **kwargs):
    if "default" in kwargs:
        default = kwargs.pop("default")
        throw = False
    else:
        throw = True
    try:
        # Maybe string?
        path = path.split(".")
    except Exception:
        # No, treat as sequence directly
        pass
    try:
        val = s
        for key in path:
            if isinstance(val, Sequence):
                key = int(key)
            val = val[key]
        return val
    except Exception:
        if throw:
            raise
        return default
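# Illustration only: a hypothetical description for LambdaTransform. The
# description argument is a callable receiving the cog instance and returning
# a mapping; string values are looked up as keys, callables are applied to
# the source data, and nested mappings/sequences are processed recursively.
# The _get helper resolves dotted paths with an optional default.
#
#     def description(cog):
#         return {
#             "ip": "SourceIP",                    # plain key lookup
#             "port": lambda src: _get(src, "conn.port", default=None),
#             "tags": ["Category", "Subcategory"], # built as a list
#         }
#
#     transform = LambdaTransform(description)
#     (result,) = transform({"SourceIP": "198.51.100.7", "Category": "scan"})
#     # result == {"ip": "198.51.100.7", "tags": ["scan"]}; missing keys
#     # are silently dropped.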
class WindowContextMgr(Cog):
    """ Abstract class for deriving floating window context based cogs.

    Implements a floating window - if something is added to the context
    within the `window` interval, the context is kept open, otherwise it
    closes and sends the data accumulated so far further down the pipeline.
    A context is also closed when the absolute `timeout` is reached (so it
    does not stay open indefinitely). Useful for aggregation, deduplication,
    statistics.

    A derived class should (re)define `get_context_keys`, `open_context`,
    `update_context` and possibly `close_context`.
    """
    def __init__(self, train, window=60*10, timeout=60*5, id_get=None, ctx_id_get=None):
        """ Initialize WindowContextMgr.

        :param train: Train singleton instance
        :param window: Floating window size in seconds
        :param timeout: Absolute lifetime in seconds
        :param id_get: Getter for ID
        :param ctx_id_get: Getter for context ID
        """
        self.train = train
        self.window = window
        self.timeout = timeout
        self.id_get = id_get or itemgetter("ID")
        self.ctx_id_get = ctx_id_get or itemgetter("ID")
        self.contexts = {}
    def get_context_keys(self, data):
        """ Calculates and returns an iterable of selectors, denoting which
        contexts the data will fall into. Usually a selector is a tuple of
        some combination of data fields, for example source IP + port. All
        data with the same selector fall into the same context.

        It should be relatively performant, as it is computed for every
        passing piece of data.

        The default implementation returns the ID, which (in cooperation
        with the other defaults) effectively means deduplication based on
        duplicate IDs.
        """
        return (self.id_get(data),)
    def open_context(self, data, ctx):
        """ `open_context` is called if a context with the provided selector
        does not yet exist. It returns the context structure, initialized
        with the first event. The structure can contain the full data (in
        case of deduplication), merged fields (in case of aggregation),
        computed fields (in case of statistics), whatever suits the aim.

        Note that `update_context` is not called for the first event;
        `open_context` should do it on its own, if it makes sense for the
        situation.

        :param ctx: identification of the context for planning calls on it
        """
        return data
    def close_context(self, data, ctx):
        """ `close_context` is called when the context has expired. It
        returns the final events to be sent further down the pipeline.

        :param ctx: identification of the context for planning calls on it
        """
        return (data,)
    def update_context(self, ctx_data, data, ctx):
        """ `update_context` is called when new data arrives that
        corresponds to an existing context. It is meant to update the
        context data with the new event data, possibly merging, dropping
        or recalculating whatever is necessary.

        :param ctx: identification of the context for planning calls on it
        """
        return ctx_data
    def expire(self, ctx):
        try:
            ctx_data = self.contexts.pop(ctx)
        except KeyError:
            # Context already closed, nothing to emit
            return ()
        ctx_id = self.ctx_id_get(ctx_data)
        events = self.close_context(ctx_data, ctx)
        self.train.esc.unschedule(self.timeout, handle=ctx)
        logging.debug("Closed context %s (%s)", ctx_id, ctx)
        return events
    def __call__(self, data):
        """ Main pipeline event handler. """
        ctxs = self.get_context_keys(data)
        if ctxs is None:
            # No context applies, pass the data through unchanged
            return (data,)
        for ctx in ctxs:
            ctx_data = self.contexts.get(ctx)
            if ctx_data is None:
                ctx_data = self.open_context(data, ctx)
                self.train.esc.after(self.timeout, self.expire, ctx, handle=ctx)
            else:
                ctx_data = self.update_context(ctx_data, data, ctx)
            self.contexts[ctx] = ctx_data
            old_id = self.id_get(data)
            new_id = self.ctx_id_get(ctx_data)
            logging.debug("Merged %s into context %s (%s)", old_id, new_id, str(ctx))
            self.train.notify("event_rename", old_id, new_id)
            self.train.esc.after(self.window, self.expire, ctx, handle=ctx)
    def finish(self):
        """ `finish` event handler.

        Close all open contexts and send their data further down the
        pipeline.
        """
        ret = []
        for ctx in list(self.contexts):
            self.train.esc.unschedule(self.timeout, handle=ctx)
            self.train.esc.unschedule(self.expire, handle=ctx)
            ret.extend(self.expire(ctx))
        return ret
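# Illustration only: a sketch of a derived context manager that counts
# events per source address within the floating window. The "Source" and
# "Count" field names are hypothetical placeholders.
#
#     class SourceCounter(WindowContextMgr):
#
#         def get_context_keys(self, data):
#             return ((data["Source"],),)
#
#         def open_context(self, data, ctx):
#             data["Count"] = 1
#             return data
#
#         def update_context(self, ctx_data, data, ctx):
#             ctx_data["Count"] += 1
#             return ctx_data
#
#     counter = SourceCounter(train, window=300, timeout=3600)
#     # Events sharing a "Source" within 300 s merge into one context,
#     # which is emitted with its final "Count" when the window expires.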
class Unwind(Cog):
    """ Breaks an iterable field into single values, each contained in its
    own separate data object.

    For example, an instance of:

        Unwind(train, item_get=itemgetter("array"), item_set=itemsetter("array"))

    for data:

        {"array": ["one", "two", "three"], "ID": "bflm"}

    yields:

        {"array": "one", "ID": "bflm"}
        {"array": "two", "ID": "bflm"}
        {"array": "three", "ID": "bflm"}

    Data instances must support deepcopy.
    """
    def __init__(self, train, base_get=None, item_get=None, item_set=None,
                 item_cleaners=(), id_get=None, id_set=None):
        """ Initialize Unwind.

        :param train: Train singleton.
        :param base_get: Base namespace where to operate. Defaults to the whole data.
        :param item_get: Getter for the iterable value.
        :param item_set: Setter for the single value (can write into the same value as item_get).
        :param item_cleaners: List of deleters to clean up the data before splitting.
        :param id_get: Getter for the data ID.
        :param id_set: Setter for unique IDs of created instances.
        """
        self.train = train
        self.base_get = base_get or selfgetter
        self.item_get = item_get or itemgetter("input")
        self.item_set = item_set or itemsetter("output")
        self.item_cleaners = item_cleaners
        self.id_get = id_get or itemgetter("ID")
        self.id_set = id_set or uuidsetter("ID")
        self.on_fly = {}
    def __call__(self, data):
        """ Main pipeline event handler. """
        base = self.base_get(data)
        iterable = self.item_get(base)
        eid = self.id_get(data)
        for deleter in self.item_cleaners:
            data = deleter(data)
        res = []
        backlink = dict(eid=eid)
        for value in iterable:
            new_data = copy.deepcopy(data)
            new_data = self.id_set(new_data)
            new_eid = self.id_get(new_data)
            new_base = self.base_get(new_data)
            self.item_set(new_base, value)
            res.append(new_data)
            self.on_fly[new_eid] = backlink
        backlink["count"] = len(res)
        return res
    def event_done(self, event_id):
        """ Handler for finished events. """
        backlink = self.on_fly.get(event_id)
        if not backlink:
            return None
        del self.on_fly[event_id]
        backlink["count"] -= 1
        if not backlink["count"]:
            self.train.notify("event_done", backlink["eid"])
    def event_error(self, event_id):
        """ Handler for erred events. """
        backlink = self.on_fly.get(event_id)
        if not backlink:
            return None
        del self.on_fly[event_id]
        backlink["count"] -= 1
        self.train.notify("event_error", backlink["eid"])
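# Illustration only: wiring Unwind as sketched in its docstring; "array" and
# the train instance are placeholders.
#
#     unwind = Unwind(train,
#                     item_get=itemgetter("array"),
#                     item_set=itemsetter("array"))
#     events = unwind({"array": ["one", "two"], "ID": "bflm"})
#     # yields two deep copies, each with a fresh unique ID and a single
#     # "array" value; Unwind reports "event_done" for the original "bflm"
#     # only after both copies have finished.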