Source code for mentat.services.pdnsr

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------


"""
Implementation of internal **PassiveDNS** service connector.
"""


__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"


import copy
import requests

from mentat.const import CKEY_CORE_SERVICES, CKEY_CORE_SERVICES_PDNS


# Module-level PDNSRServiceManager instance; assigned by init() and handed out
# by manager(). Stays None until init() has been called.
_MANAGER = None


class PDNSRConfigException(ValueError):
    """
    Raised when the PassiveDNS service configuration is missing or invalid.
    """
class PDNSRRuntimeException(RuntimeError):
    """
    Raised when talking to the PassiveDNS service fails at runtime (the
    request cannot be made or the response cannot be processed).
    """
class PDNSRService:
    """
    Implementation of internal **PassiveDNS** access service.
    """

    def __init__(self, base_url, base_api_url, api_key):
        """
        Initialize the service connector and validate its configuration.

        Both URLs are normalized so that they always end with a slash.

        :param str base_url: Base URL used to build lookup links.
        :param str base_api_url: Base URL used to build API request URLs.
        :param str api_key: Token sent to the API as the ``token`` query parameter.
        :raises PDNSRConfigException: If any value is empty, or if one of the
            URLs does not start with ``http://`` or ``https://``.
        """
        self.base_url = self._check_url(
            base_url,
            "PassiveDNS service is used but base URL is not configured",
            "Invalid PassiveDNS service base URL"
        )
        self.base_api_url = self._check_url(
            base_api_url,
            "PassiveDNS service is used but base API URL is not configured",
            "Invalid PassiveDNS service base API URL"
        )
        if not api_key:
            raise PDNSRConfigException(
                "PassiveDNS service is used but api_key is not configured"
            )
        self.api_key = api_key

    @staticmethod
    def _check_url(url, missing_msg, invalid_msg):
        """
        Validate single configured URL and return it with a trailing slash.

        :param str url: Configured URL value.
        :param str missing_msg: Error message used when the value is empty.
        :param str invalid_msg: Error message used when the scheme is not HTTP(S).
        :return: The URL, guaranteed to end with ``/``.
        :rtype: str
        :raises PDNSRConfigException: If the value is empty or has a wrong scheme.
        """
        if not url:
            raise PDNSRConfigException(missing_msg)
        if not url.startswith(("https://", "http://")):
            raise PDNSRConfigException(invalid_msg)
        return url if url.endswith("/") else url + "/"

    @staticmethod
    def _sort_and_limit(records, sortby = None, limit = None):
        """
        Optionally sort and truncate records decoded from the API response.

        :param records: Decoded response payload (presumably a list of dicts,
            TODO confirm against the PassiveDNS API).
        :param str sortby: Sorting rule in the form ``<field>.<direction>``. The
            direction ``desc`` sorts in descending order, anything else
            (including no direction at all) sorts in ascending order.
        :param limit: Maximal number of records to return; anything accepted by
            :py:class:`int`. Falsy or zero value means no limit.
        :return: Processed records.
        """
        if sortby:
            field, _, direction = sortby.partition('.')

            def sort_key(record):
                # Records missing the field would otherwise yield ``None`` and
                # make the sort fail on ``None < value`` comparison. The leading
                # flag orders them after all present values (in ascending order)
                # without ever comparing ``None`` with a real value.
                value = record.get(field)
                return (value is None, value)

            records = sorted(records, key = sort_key, reverse = direction == 'desc')
        if limit and int(limit):
            records = records[:int(limit)]
        return records

    def setup(self):
        """
        Additional internal setup currently not necessary.
        """

    def status(self):
        """
        Display status of the service.

        :return: Dictionary with configured ``base_url`` and ``base_api_url``
            (the API key is intentionally not included).
        :rtype: dict
        """
        return {
            'base_url': self.base_url,
            'base_api_url': self.base_api_url,
        }

    def get_url_lookup_ip(self, ipaddr):
        """
        Get URL for looking up given IP address in PassiveDNS service.

        :param ipaddr: IP address to look up (converted with :py:class:`str`).
        :return: Lookup URL.
        :rtype: str
        """
        return "{}?query={}&type=ip&since=&until=".format(
            self.base_url,
            str(ipaddr)
        )

    def get_api_url_lookup_ip(self, ipaddr):
        """
        Get API URL for looking up given IP address in PassiveDNS service.

        Note that the returned URL contains the API key.

        :param ipaddr: IP address to look up (converted with :py:class:`str`).
        :return: API request URL.
        :rtype: str
        """
        return "{}ip/{}?token={}".format(
            self.base_api_url,
            str(ipaddr),
            self.api_key
        )

    def lookup_ip(self, ipaddr, sortby = None, limit = None, timeout = 300):
        """
        Lookup given IP address in PassiveDNS service.

        :param ipaddr: IP address to look up.
        :param str sortby: Optional sorting rule ``<field>.<direction>``, where
            direction ``desc`` means descending order.
        :param limit: Optional maximal number of returned records.
        :param timeout: Timeout passed to :py:func:`requests.get` (seconds).
        :return: Decoded (and optionally sorted and limited) response data, or
            ``None`` when the service responds with *404 Not Found*.
        :raises PDNSRRuntimeException: If the request cannot be performed or
            the response cannot be decoded and processed.
        :raises requests.HTTPError: For other HTTP error status codes (raised
            by ``raise_for_status()``, deliberately not wrapped).
        """
        # Prepare request to PassiveDNS API
        url = self.get_api_url_lookup_ip(ipaddr)

        # Send request
        try:
            resp = requests.get(url, timeout = timeout)
        except Exception as exc:
            # The message is built without the token so that the API key does
            # not leak into logs or user-facing errors.
            raise PDNSRRuntimeException(
                "Can't get data from PassiveDNS service: {}ip/{}".format(self.base_api_url, str(ipaddr))
            ) from exc

        if resp.status_code == requests.codes.not_found:
            return None
        resp.raise_for_status()

        # Parse response
        try:
            return self._sort_and_limit(resp.json(), sortby, limit)
        except Exception as exc:
            raise PDNSRRuntimeException(
                "Invalid data received from PassiveDNS service: {}".format(str(exc))
            ) from exc
class PDNSRServiceManager:
    """
    Class representing a custom PDNSRServiceManager capable of understanding and
    parsing Mentat system core configurations and enabling easy way of unified
    bootstrapping of :py:class:`mentat.services.pdnsr.PDNSRService` service.
    """

    def __init__(self, core_config, updates = None):
        """
        Initialize PDNSRServiceManager object with full core configuration tree
        structure.

        :param dict core_config: Mentat core configuration structure.
        :param dict updates: Optional configuration updates (same structure as ``core_config``).
        """
        self._pdnsrconfig = {}
        self._service = None

        self._configure_pdnsr(core_config, updates)

    def _configure_pdnsr(self, core_config, updates):
        """
        Internal sub-initialization helper: Extract PassiveDNS service settings
        from the core configuration and optionally merge them with additional
        updates.

        :param dict core_config: Mentat core configuration structure.
        :param dict updates: Optional configuration updates (same structure as ``core_config``).
        """
        # Work on a deep copy, so that merging the updates never modifies the
        # configuration structure passed in by the caller.
        self._pdnsrconfig = copy.deepcopy(
            core_config[CKEY_CORE_SERVICES][CKEY_CORE_SERVICES_PDNS]
        )

        if updates and CKEY_CORE_SERVICES_PDNS in updates.get(CKEY_CORE_SERVICES, {}):
            self._pdnsrconfig.update(
                updates[CKEY_CORE_SERVICES][CKEY_CORE_SERVICES_PDNS]
            )

    def service(self):
        """
        Return handle to PassiveDNS service according to internal configurations.
        The service object is created on first use and then reused.

        :return: Reference to PassiveDNS service object.
        :rtype: mentat.services.pdnsr.PDNSRService
        """
        if self._service is None:
            # Configuration keys are passed as keyword arguments and therefore
            # must match the PDNSRService constructor parameters.
            self._service = PDNSRService(**self._pdnsrconfig)
            self._service.setup()
        return self._service
#-------------------------------------------------------------------------------
def init(core_config, updates = None):
    """
    (Re-)Initialize :py:class:`PDNSRServiceManager` instance at module level and
    store the reference within module. Any previously stored manager instance is
    replaced.

    :param dict core_config: Mentat core configuration structure.
    :param dict updates: Optional configuration updates (same structure as ``core_config``).
    """
    global _MANAGER  # pylint: disable=locally-disabled,global-statement
    _MANAGER = PDNSRServiceManager(core_config, updates)
def manager():
    """
    Obtain reference to :py:class:`PDNSRServiceManager` instance stored at module
    level.

    :return: Manager instance stored by :py:func:`init`, or ``None`` when
        :py:func:`init` has not been called yet.
    """
    return _MANAGER
def service():
    """
    Obtain reference to :py:class:`PDNSRService` instance from module level
    manager.

    :return: Reference to PassiveDNS service object.
    :rtype: PDNSRService
    :raises PDNSRRuntimeException: If the module level manager has not been
        initialized with :py:func:`init` yet.
    """
    mgr = manager()
    if mgr is None:
        # Fail with an explicit message instead of an opaque AttributeError
        # raised by calling a method on ``None``.
        raise PDNSRRuntimeException(
            "PassiveDNS service manager is not initialized, call init() first"
        )
    return mgr.service()