Source code for mentat.services.nerd

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#
# This product includes GeoLite2 data created by MaxMind, available from
# http://www.maxmind.com.
#-------------------------------------------------------------------------------


"""
Implementation of internal **NERD** service connector.
"""


__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"


import copy
import requests

from mentat.const import CKEY_CORE_SERVICES, CKEY_CORE_SERVICES_NERD


_MANAGER = None


[docs]class NerdConfigException(ValueError): pass
[docs]class NerdRuntimeException(RuntimeError): pass
[docs]class NerdService: """ Implementation of internal **NERD** access service. """ def __init__(self, base_url, base_api_url, api_key): """ Initialize geolocation service with paths to desired database files. """ self.base_url = base_url self.base_api_url = base_api_url self.api_key = api_key # Check presence and validity of config. if not self.base_url: raise NerdConfigException( "NERD service is used but base URL is not configured" ) if not (self.base_url.startswith("https://") or self.base_url.startswith("http://")): raise NerdConfigException( "Invalid NERD service base URL" ) if not self.base_api_url: raise NerdConfigException( "NERD service is used but base API URL is not configured" ) if not (self.base_api_url.startswith("https://") or self.base_api_url.startswith("http://")): raise NerdConfigException( "Invalid NERD service base API URL" ) if not self.api_key: raise NerdConfigException( "NERD service is used but api_key is not configured" ) # Ensure both base_url and base_api_url end with slash. if not self.base_url.endswith("/"): self.base_url += "/" if not self.base_api_url.endswith("/"): self.base_api_url += "/"
[docs] def setup(self): """ Additional internal setup currently not necessary. """
[docs] def status(self): """ Display status of the service. """ return { 'base_url': self.base_url, 'base_api_url': self.base_api_url, }
[docs] def get_url_lookup_ip(self, ipaddr): """ Get URL for looking up given IP address in NERD service. """ return "{}ip/{}".format( self.base_url, str(ipaddr) )
[docs] def get_api_url_lookup_ip(self, ipaddr): """ Get URL for looking up given IP address in NERD service. """ return "{}ip/{}".format( self.base_api_url, str(ipaddr) )
[docs] def lookup_ip(self, ipaddr): """ Lookup given IP address in NERD service. """ # Prepare request to NERD API url = self.get_api_url_lookup_ip(ipaddr) headers = { "Authorization": self.api_key } # Send request try: resp = requests.get(url, headers = headers) except Exception as exc: raise NerdRuntimeException( "Can't get data from NERD service: {}".format(str(exc)) ) from exc if resp.status_code == requests.codes.not_found: return None resp.raise_for_status() # Parse response try: return resp.json() except Exception as exc: raise NerdRuntimeException( "Invalid data received from NERD service: {}".format(str(exc)) ) from exc
[docs]class NerdServiceManager: """ Class representing a custom NerdServiceManager capable of understanding and parsing Mentat system core configurations and enabling easy way of unified bootstrapping of :py:class:`mentat.services.nerd.NerdService` service. """ def __init__(self, core_config, updates = None): """ Initialize NerdServiceManager object with full core configuration tree structure. :param dict core_config: Mentat core configuration structure. :param dict updates: Optional configuration updates (same structure as ``core_config``). """ self._nerdconfig = {} self._service = None self._configure_nerd(core_config, updates) def _configure_nerd(self, core_config, updates): """ Internal sub-initialization helper: Configure database structure parameters and optionally merge them with additional updates. :param dict core_config: Mentat core configuration structure. :param dict updates: Optional configuration updates (same structure as ``core_config``). """ self._nerdconfig = copy.deepcopy( core_config[CKEY_CORE_SERVICES][CKEY_CORE_SERVICES_NERD] ) if updates and CKEY_CORE_SERVICES in updates and CKEY_CORE_SERVICES_NERD in updates[CKEY_CORE_SERVICES]: self._nerdconfig.update( updates[CKEY_CORE_SERVICES][CKEY_CORE_SERVICES_NERD] )
[docs] def service(self): """ Return handle to NERD service according to internal configurations. :return: Reference to NERD service object. :rtype: mentat.services.nerd.NerdService """ if not self._service: self._service = NerdService(**self._nerdconfig) self._service.setup() return self._service
#-------------------------------------------------------------------------------
[docs]def init(core_config, updates = None): """ (Re-)Initialize :py:class:`NerdServiceManager` instance at module level and store the refence within module. """ global _MANAGER # pylint: disable=locally-disabled,global-statement _MANAGER = NerdServiceManager(core_config, updates)
[docs]def manager(): """ Obtain reference to :py:class:`NerdServiceManager` instance stored at module level. """ return _MANAGER
[docs]def service(): """ Obtain reference to :py:class:`NerdService` instance from module level manager. """ return manager().service()