# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import sys
from operator import itemgetter
from Crypto.Cipher import AES
import ipranges
from . import movement
# Public API of this module: only the CryptoPan class is exported.
__all__ = ["CryptoPan"]
try:
int.to_bytes
int.from_bytes
except AttributeError:
# Python 2: slow 'bytes' substitutes
def _to_bytes(int_value, int_value_len, byteorder):
return str("").join(chr(int_value>>n&0xFF) for n in range(int_value_len*8-8, -8, -8))
def _from_bytes(byte_array, byteorder):
res = 0
for b in byte_array:
res = (res << 8) | ord(b)
return res
def _first_byte(arr):
return ord(arr[0])
else:
# Python 3: use builtins
_to_bytes = int.to_bytes
_from_bytes = int.from_bytes
_first_byte = itemgetter(0)
# 128 one-bits; shifted left/right in CryptoPan._anonymise to build bit masks.
MASK128 = (1 << 128) - 1
class CryptoPan(movement.Cog):
    """ Cog for cryptopan anonymisation.

    For every event the IP address is read with ``ip_get``, anonymised, the
    result stored with ``cpan_set`` and the original address removed with
    ``ip_del``. If anything in the read/anonymise step fails, the error is
    logged and the original address is removed with ``ip_err_del`` instead.
    """

    def __init__(self, key, id_get=None, ip_get=None, cpan_set=None, ip_del=None, ip_err_del=None):
        """ Initialize CryptoPan.

        :param key: a 32 bytes(py3) or str(py2) object used for AES key
            and padding for a block cipher operation. The first 16 bytes
            are used for the AES key, and the rest for the padding.
        :param id_get: Getter for ID (defaults to item "id").
        :param ip_get: Getter for IP address to anonymise (defaults to item "ip").
        :param cpan_set: Setter for anonymised cryptopan IP
            (defaults to item "cryptopan_ip").
        :param ip_del: Deleter for original IP address (defaults to item "ip").
        :param ip_err_del: Deleter for original IP address in case of error
            (no cryptopan calculated); defaults to ``ip_del``.
        :raises ValueError: If ``key`` is shorter than 32 bytes.
        """
        # The documented contract is a 32-byte key; check it up front so a
        # short key is reported clearly instead of surfacing later from the
        # cipher calls (or leaving no material for the padding block).
        if len(key) < 32:
            raise ValueError("CryptoPan key must be at least 32 bytes long")

        self.id_get = id_get or itemgetter("id")
        self.ip_get = ip_get or itemgetter("ip")
        self.cpan_set = cpan_set or movement.itemsetter("cryptopan_ip")
        self.ip_del = ip_del or movement.itemdeleter("ip")
        self.ip_err_del = ip_err_del or self.ip_del

        # First half of the key drives AES; the second half is encrypted once
        # and kept as a 128-bit integer used to pad every cipher input.
        self._cipher = AES.new(key[:16], AES.MODE_ECB)
        self._padding = _from_bytes(self._cipher.encrypt(key[16:32]), byteorder="big")

    def __call__(self, data):
        """ Main pipeline event handler.

        :param data: Event passed to the configured getters/setters/deleters.
        :returns: A one-element tuple holding the processed event.
        """
        try:
            ip = self.ip_get(data)
            anonymised = self._anonymise(ip)
        except Exception:
            # Best effort: report the failure and still drop the original IP.
            eid = self.id_get(data)
            logging.exception("%s: Failed to calculate cryptopan anonymisation", eid)
            data = self.ip_err_del(data)
        else:
            data = self.cpan_set(data, anonymised)
            data = self.ip_del(data)
        return (data,)

    def _anonymise(self, ip):
        """ Anonymise an IP address given as a string.

        The string is parsed with ``ipranges.ip_from_str``. For each bit
        position the leading ``pos`` bits of the address are combined with
        the remaining bits of the padding, the 16-byte block is encrypted,
        and the top bit of the ciphertext decides whether that address bit
        is flipped.

        :param ip: An IP address string.
        :returns: An anonymised IP address string.
        """
        addr = ipranges.ip_from_str(ip)
        width = addr.bit_length
        # Left-align the address inside a 128-bit block.
        ext_addr = addr.ip << (128 - width)
        flip_mask = 0
        for pos in range(width):
            # Keep only the `pos` most significant bits of the address ...
            prefix = ext_addr & (MASK128 << (128 - pos))
            # ... and fill the rest of the block from the padding.
            padded_addr = prefix | (self._padding & (MASK128 >> pos))
            block = self._cipher.encrypt(_to_bytes(padded_addr, 16, byteorder="big"))
            # Most significant bit of the ciphertext becomes the flip bit
            # for address bit `pos` (counted from the top).
            flip_mask |= (_first_byte(block) >> 7) << (width - pos - 1)
        return str(addr.single(addr.ip ^ flip_mask))