Source code for deadbeat.cryptopan

# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, unicode_literals

import logging
import sys
from operator import itemgetter
from Crypto.Cipher import AES
import ipranges
from . import movement

# Public API of this module: only the CryptoPan cog is exported.
__all__ = ["CryptoPan"]

try:
    int.to_bytes
    int.from_bytes
except AttributeError:
    # Python 2: slow 'bytes' substitutes
    def _to_bytes(int_value, int_value_len, byteorder):
        return str("").join(chr(int_value>>n&0xFF) for n in range(int_value_len*8-8, -8, -8))

    def _from_bytes(byte_array, byteorder):
        res = 0
        for b in byte_array:
            res = (res << 8) | ord(b)
        return res

    def _first_byte(arr):
        return ord(arr[0])
else:
    # Python 3: use builtins
    _to_bytes = int.to_bytes
    _from_bytes = int.from_bytes
    _first_byte = itemgetter(0)

# 128 one-bits: the width of the AES block the addresses are padded to.
MASK128 = (1 << 128) - 1

class CryptoPan(movement.Cog):
    """ Cog for cryptopan anonymisation. """

    def __init__(self, key, id_get=None, ip_get=None, cpan_set=None, ip_del=None, ip_err_del=None):
        """ Initialize CryptoPan.

            :param key: a 32 bytes(py3) or str(py2) object used for AES key and padding for
                a block cipher operation. The first 16 bytes are used for the AES key,
                and the rest for the padding.
            :param id_get: Getter for ID. Defaults to ``itemgetter("id")``.
            :param ip_get: Getter for IP address to anonymise. Defaults to ``itemgetter("ip")``.
            :param cpan_set: Setter for anonymised cryptopan IP. Defaults to
                ``movement.itemsetter("cryptopan_ip")``.
            :param ip_del: Deleter for original IP address. Defaults to
                ``movement.itemdeleter("ip")``.
            :param ip_err_del: Deleter for original IP address in case of error
                (no cryptopan calculated). Defaults to the effective ``ip_del``.
        """
        self.id_get = id_get or itemgetter("id")
        self.ip_get = ip_get or itemgetter("ip")
        self.cpan_set = cpan_set or movement.itemsetter("cryptopan_ip")
        self.ip_del = ip_del or movement.itemdeleter("ip")
        self.ip_err_del = ip_err_del or self.ip_del
        # First half of the key drives the AES cipher (single-block ECB operations).
        self._cipher = AES.new(key[:16], AES.MODE_ECB)
        # Second half of the key is encrypted once and kept as a 128 bit integer;
        # its low bits fill the part of the block not covered by the address prefix.
        self._padding = _from_bytes(self._cipher.encrypt(key[16:32]), byteorder="big")

    def __call__(self, data):
        """ Main pipeline event handler.

            Reads the IP address from the event, anonymises it, stores the result
            through ``cpan_set`` and removes the original address through ``ip_del``.
            If reading or anonymising fails, the failure is logged and the original
            address is removed through ``ip_err_del`` instead.

            :param data: Event to process.
            :returns: One-element tuple containing the processed event.
        """
        try:
            ip = self.ip_get(data)
            anonymised = self._anonymise(ip)
        except Exception:
            # NOTE(review): id_get itself may raise here (e.g. event without an ID);
            # confirm whether that should be tolerated.
            eid = self.id_get(data)
            logging.exception("%s: Failed to calculate cryptopan anonymisation", eid)
            data = self.ip_err_del(data)
        else:
            data = self.cpan_set(data, anonymised)
            data = self.ip_del(data)
        return (data,)

    def _anonymise(self, ip):
        """ Anonymise an IP address given as a string.

            For every bit position the leading ``pos`` bits of the address are
            combined with the key-derived padding, the block is encrypted, and the
            top bit of the ciphertext decides whether that address bit is flipped.

            :param ip: An IP address string, parsed by ``ipranges.ip_from_str``.
            :returns: An anonymised IP address string.
        """
        addr = ipranges.ip_from_str(ip)
        bits = addr.bit_length
        # Left-align the address inside a 128 bit block.
        ext_addr = addr.ip << (128 - bits)
        result = 0
        for pos in range(bits):
            # Keep the first 'pos' bits of the address...
            prefix = ext_addr & (MASK128 << (128 - pos))
            # ...and fill the remaining low bits from the padding.
            padded_addr = prefix | (self._padding & (MASK128 >> pos))
            f = self._cipher.encrypt(_to_bytes(padded_addr, 16, byteorder="big"))
            # Most significant bit of the ciphertext becomes the flip bit for 'pos'.
            result |= (_first_byte(f) >> 7) << (bits - pos - 1)
        return str(addr.single(addr.ip ^ result))