Source code for deadbeat.ip

# -*- coding: utf-8 -*-

""" Filtering and anonymisation cogs for DeadBeat """

from __future__ import absolute_import, division, print_function, unicode_literals

import ipranges
import shlex
import logging
from . import conf
from operator import itemgetter
from .movement import Cog, basestring

__all__ = ["cast_ip_range_list", "anonymise_base_config", "IdeaAnonymise", "filter_base_config", "FilterBase", "IPFilter"]

def cast_ip_range_list(l):
    """
    Translate comma separated list of IP addresses into list of ipranges objects.

    :param l: Either a string of comma separated addresses/ranges (tokenised
        by ``shlex`` in POSIX mode, with the comma as the only separator),
        or an iterable of already separated strings.
    :returns: List of objects returned by ``ipranges.from_str``, one for each
        non-blank item, in input order.
    """
    if isinstance(l, basestring):
        lex = shlex.shlex(l, posix=True)
        # Only commas separate items; blanks stay inside the tokens and are
        # stripped below.
        lex.whitespace = ','
        lex.whitespace_split = True
        strings = list(lex)
    else:
        strings = l
    ranges = []
    for item in strings:
        if not item:
            continue
        text = item.strip()
        # Items made only of blanks (e.g. after a trailing ", ") are skipped
        # in the same way as empty ones instead of being passed to the parser.
        if text:
            ranges.append(ipranges.from_str(text))
    return ranges


#: Anonymisation configuration insert
anonymise_base_config = tuple(
    # Every option shares the same cast and the same empty default; only the
    # option name and its help text differ.
    conf.cfg_item(option, cast_ip_range_list, description, default=())
    for option, description in (
        ("sources",
         "List of source ranges, which will be reported instead of IP addresses within them"),
        ("sources_exclude",
         "List of source ranges, which will not be anonymised"),
        ("targets",
         "List of target ranges, which will be reported instead of IP addresses within them"),
        ("targets_exclude",
         "List of target ranges, which will not be anonymised"),
    )
)


class IdeaAnonymise(Cog):
    """ Cog for anonymisation of Idea events """

    def __init__(self, sources=(), targets=(), sources_exclude=(), targets_exclude=()):
        """
        Initialize IdeaAnonymise.

        :param sources: List of IP ranges to anonymise Source.IP? to.
        :param targets: List of IP ranges to anonymise Target.IP? to.
        :param sources_exclude: List of IP ranges to NOT anonymise in Source.IP?.
        :param targets_exclude: List of IP ranges to NOT anonymise in Target.IP?.
        """
        self.sources = sources
        self.targets = targets
        self.sources_exclude = sources_exclude
        self.targets_exclude = targets_exclude

    def __call__(self, idea):
        """
        Main pipeline event handler.

        Walks the "Source" and "Target" lists of the event and, within each
        entry, the "IP4" and "IP6" lists. An address contained in one of the
        exclude ranges is left alone; otherwise it is replaced in place by the
        first configured range which contains it. Entries in which at least
        one address was replaced get "Anonymised" added to their "Type" list.

        :param idea: Event to process; it is modified in place.
        :returns: One-element tuple holding the same event object.
        """
        for node, ranges, excludes in (
                ("Source", self.sources, self.sources_exclude),
                ("Target", self.targets, self.targets_exclude)):
            for net in idea.get(node, []):
                # The flag is tracked per entry, so a replacement made in one
                # entry never causes a later, untouched entry to be tagged.
                anonymised = False
                for key in ("IP4", "IP6"):
                    ips = net.get(key, [])
                    for i, ip in enumerate(ips):
                        # Exclusions take precedence over anonymisation ranges.
                        if any(ip in te for te in excludes):
                            continue
                        for tr in ranges:
                            if ip in tr:
                                ips[i] = tr
                                anonymised = True
                                break
                if anonymised:
                    type_ = net.get("Type", [])
                    if "Anonymised" not in type_:
                        type_.append("Anonymised")
                        # Store back, "Type" may not have been present at all.
                        net["Type"] = type_
        return (idea,)


#: Filtering configuration insert
filter_base_config = (
    conf.cfg_item(
        "sources", cast_ip_range_list,
        "List of source ranges, which will not be processed",
        default=()),
    conf.cfg_item(
        "targets", cast_ip_range_list,
        "List of target ranges, which will not be processed",
        default=())
)


class FilterBase(Cog):
    """
    Base cog for creating hard filters.

    ``__call__`` relies on a ``condition(data)`` method which this class does
    not define itself; it has to be supplied by the subclass.
    """

    def __init__(self, train, id_get=None):
        """
        Initialize FilterBase.

        :param train: Train singleton.
        :param id_get: ID getter; when not given (or falsy), the "ID" item
            of the data is used.
        """
        self.train = train
        self.id_get = id_get if id_get else itemgetter("ID")

    def __call__(self, data):
        """
        Main pipeline event handler.

        :param data: Event data, passed to ``condition`` and to the ID getter.
        :returns: ``None`` when ``condition(data)`` holds (the data is dropped),
            one-element tuple ``(data,)`` otherwise.
        """
        if not self.condition(data):
            return (data,)
        dropped_id = self.id_get(data)
        # The train is told that the event is finished only when it has an ID.
        if dropped_id is not None:
            self.train.notify("event_done", dropped_id)
        logging.debug("%s: dropped", dropped_id)
        return None


class IPFilter(FilterBase):
    """ Cog for dropping unwanted event data based on IP ranges. """

    def __init__(self, train, id_get=None, ranges=None, item_get=None):
        """
        Initialize IPFilter.

        :param train: Train singleton, handed over to FilterBase.
        :param id_get: ID getter, handed over to FilterBase.
        :param ranges: List of IP ranges; empty list is used when not given.
        :param item_get: Getter for data to consult; when not given (or falsy),
            the "ip" item of the data is used.
        """
        super(IPFilter, self).__init__(train, id_get)
        self.ranges = ranges if ranges else []
        self.item_get = item_get if item_get else itemgetter("ip")

    def condition(self, data):
        """
        Tell whether the consulted item of the data lies in any of the ranges.

        :param data: Event data, passed to the item getter.
        :returns: ``True`` if at least one configured range contains the
            value, ``False`` otherwise.
        """
        address = self.item_get(data)
        return any(address in ip_range for ip_range in self.ranges)