# -*- coding: utf-8 -*-
""" Filtering and anonymisation cogs for DeadBeat """
from __future__ import absolute_import, division, print_function, unicode_literals
import ipranges
import shlex
import logging
from . import conf
from operator import itemgetter
from .movement import Cog, basestring
__all__ = ["cast_ip_range_list", "anonymise_base_config", "IdeaAnonymise", "filter_base_config", "FilterBase", "IPFilter"]
[docs]def cast_ip_range_list(l):
""" Translate comma separated list of IP addresses into list of ipranges objects. """
if isinstance(l, basestring):
lex = shlex.shlex(l, posix=True)
lex.whitespace=','
lex.whitespace_split=True
strings = list(lex)
else:
strings = l
return [ipranges.from_str(r.strip()) for r in strings if r]
#: Anonymisation configuration insert
anonymise_base_config = (
conf.cfg_item(
"sources", cast_ip_range_list,
"List of source ranges, which will be reported instead of IP addresses within them", default=()),
conf.cfg_item(
"sources_exclude", cast_ip_range_list,
"List of source ranges, which will not be anonymised",
default=()),
conf.cfg_item(
"targets", cast_ip_range_list,
"List of target ranges, which will be reported instead of IP addresses within them", default=()),
conf.cfg_item(
"targets_exclude", cast_ip_range_list,
"List of target ranges, which will not be anonymised",
default=())
)
[docs]class IdeaAnonymise(Cog):
""" Cog for anonymisation of Idea events """
[docs] def __init__(self, sources=(), targets=(), sources_exclude=(), targets_exclude=()):
""" Initialize IdeaAnonymise.
:param sources: List of IP ranges to anonymise Source.IP? to.
:param targets: List of IP ranges to anonymise Target.IP? to.
:param sources_exclude: List of IP ranges to NOT anonymise in Source.IP?.
:param targets_exclude: List of IP ranges to NOT anonymise in Target.IP?.
"""
self.sources = sources
self.targets = targets
self.sources_exclude = sources_exclude
self.targets_exclude = targets_exclude
def __call__(self, idea):
""" Main pipeline event handler """
anonymised = False
for node, ranges, excludes in (
("Source", self.sources, self.sources_exclude),
("Target", self.targets, self.targets_exclude)):
for net in idea.get(node, []):
for key in ("IP4", "IP6"):
ips = net.get(key, [])
for i in range(len(ips)):
ip = ips[i]
for te in excludes:
if ip in te:
break
else:
for tr in ranges:
if ip in tr:
ips[i] = tr
anonymised = True
break
if anonymised:
type_ = net.get("Type", [])
if "Anonymised" not in type_:
type_.append("Anonymised")
net["Type"] = type_
return (idea,)
#: Filtering configuration insert
filter_base_config = (
conf.cfg_item(
"sources", cast_ip_range_list,
"List of source ranges, which will not be processed", default=()),
conf.cfg_item(
"targets", cast_ip_range_list,
"List of target ranges, which will not be processes", default=())
)
[docs]class FilterBase(Cog):
""" Base cog for creating hard filters. """
[docs] def __init__(self, train, id_get=None):
""" Initialize FilterBase.
:param train: Train singleton.
:param id_get: ID getter.
"""
self.train = train
self.id_get = id_get or itemgetter("ID")
def __call__(self, data):
""" Main pipeline event handler. """
if self.condition(data):
event_id = self.id_get(data)
if event_id is not None:
self.train.notify("event_done", event_id)
logging.debug("%s: dropped", event_id)
return None
else:
return (data,)
[docs]class IPFilter(FilterBase):
""" Cog for dropping unwanted event data based on IP ranges. """
[docs] def __init__(self, train, id_get=None, ranges=None, item_get=None):
""" Initialize IPFilter.
:param ranges: List of IP ranges.
:param item_get: Getter for data to consult.
"""
super(IPFilter, self).__init__(train, id_get)
self.ranges = ranges or []
self.item_get = item_get or itemgetter("ip")
[docs] def condition(self, data):
ip = self.item_get(data)
for r in self.ranges:
if ip in r:
return True
return False