Source code for

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This file is part of Mentat system (https://mentat.cesnet.cz/).
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
# This product includes GeoLite2 data created by MaxMind, available from
# https://www.maxmind.com.

"""
Implementation of internal **geoip2** database library.


This library is dependent on `geoip2 <https://github.com/maxmind/GeoIP2-python>`__
(available through `PyPI <https://pypi.org/project/geoip2/>`__) and requires
GeoLite2 Free `downloadable databases <https://dev.maxmind.com/geoip/geolite2-free-geolocation-data>`__:

* ``GeoLite2-ASN.mmdb``
* ``GeoLite2-City.mmdb``
* ``GeoLite2-Country.mmdb``

.. note::

    This product includes GeoLite2 data created by MaxMind, available from
    https://www.maxmind.com.

.. warning::

    Work in progress and alpha code.
"""


# Module metadata: primary author and additional contributors.
# NOTE(review): the angle brackets below are empty -- the contact e-mail
# addresses appear to have been stripped; confirm against the upstream file.
__author__ = "Jan Mach <>"
__credits__ = "Pavel Kácha <>, Andrea Kropáčová <>"

import copy

import geoip2.database



class GeoipService:
    """
    Implementation of internal **geoip2** database service.

    The service wraps up to three ``geoip2.database.Reader`` objects (ASN,
    city and country database). The constructor only remembers the database
    file paths; the readers themselves are opened by :py:meth:`setup`.
    """

    # Names of the attributes holding the open database readers.
    _READER_ATTRS = ('asndb', 'citydb', 'countrydb')

    def __init__(self, asndb=None, citydb=None, countrydb=None):
        """
        Initialize geolocation service with paths to desired database files.

        :param asndb: Path to the ASN database file, or ``None`` to disable it.
        :param citydb: Path to the city database file, or ``None`` to disable it.
        :param countrydb: Path to the country database file, or ``None`` to disable it.
        """
        self.fn_asndb = asndb
        self.fn_citydb = citydb
        self.fn_countrydb = countrydb

        self.asndb = None
        self.citydb = None
        self.countrydb = None

    def __del__(self):
        """
        Close internal geolocation database readers.
        """
        self._close_readers()

    def _close_readers(self):
        """
        Close every currently open reader and reset its attribute to ``None``.
        """
        for attr in self._READER_ATTRS:
            # getattr() with a default keeps this safe even when __init__ did
            # not run to completion before the object is finalized.
            reader = getattr(self, attr, None)
            if reader:
                reader.close()
            setattr(self, attr, None)

    @staticmethod
    def _open_reader(filename):
        """
        Open a database reader for given file, or return ``None`` when no file is configured.
        """
        return geoip2.database.Reader(filename) if filename else None

    def setup(self):
        """
        Setup internal geolocation database readers.

        Readers left open by a previous call are closed first, so the method
        may be called repeatedly without leaking open database handles.
        """
        self._close_readers()
        self.asndb = self._open_reader(self.fn_asndb)
        self.citydb = self._open_reader(self.fn_citydb)
        self.countrydb = self._open_reader(self.fn_countrydb)

    def status(self):
        """
        Display status of internal geolocation readers.

        :return: Configured database file paths under keys ``asn``, ``city`` and ``country``.
        :rtype: dict
        """
        return {
            'asn': self.fn_asndb,
            'city': self.fn_citydb,
            'country': self.fn_countrydb,
        }

    def lookup(self, ipaddr):
        """
        Lookup given IP address in all databases.

        Only databases with an open reader are consulted and only non-empty
        results are included.

        :param ipaddr: IP address to look up (converted with ``str()``).
        :return: Dictionary with optional keys ``asn``, ``city`` and ``country``,
                 or ``None`` when nothing was found.
        :rtype: dict or None
        """
        sources = (
            ('asn', self.asndb, self.lookup_asn),
            ('city', self.citydb, self.lookup_city),
            ('country', self.countrydb, self.lookup_country),
        )
        result = {}
        for key, reader, lookup_method in sources:
            if not reader:
                continue
            found = lookup_method(ipaddr)
            if found:
                result[key] = found
        return result or None

    def lookup_asn(self, ipaddr):
        """
        Lookup given IP address in ASN database.

        The ASN reader must be open (see :py:meth:`setup`).

        :param ipaddr: IP address to look up (converted with ``str()``).
        :return: Dictionary with keys ``ip``, ``asn`` and ``org``, or ``None``
                 when the address is not present in the database.
        :rtype: dict or None
        """
        try:
            response = self.asndb.asn(str(ipaddr))
        except geoip2.errors.AddressNotFoundError:
            return None
        return {
            'ip': response.ip_address,
            'asn': response.autonomous_system_number,
            'org': response.autonomous_system_organization,
        }

    def lookup_city(self, ipaddr):
        """
        Lookup given IP address in city database.

        The city reader must be open (see :py:meth:`setup`).

        :param ipaddr: IP address to look up (converted with ``str()``).
        :return: Dictionary with city, country, continent and location details,
                 or ``None`` when the address is not present in the database.
        :rtype: dict or None
        """
        try:
            response = self.citydb.city(str(ipaddr))
        except geoip2.errors.AddressNotFoundError:
            return None
        return {
            'ip': response.traits.ip_address,
            'cty_name': response.city.name,
            'ctr_code': response.country.iso_code,
            'ctr_name': response.country.name,
            'cnt_code': response.continent.code,
            'cnt_name': response.continent.name,
            'longitude': response.location.longitude,
            'latitude': response.location.latitude,
            'timezone': response.location.time_zone,
            'accuracy': response.location.accuracy_radius,
        }

    def lookup_country(self, ipaddr):
        """
        Lookup given IP address in Country database.

        The country reader must be open (see :py:meth:`setup`).

        :param ipaddr: IP address to look up (converted with ``str()``).
        :return: Dictionary with country and continent details, or ``None``
                 when the address is not present in the database.
        :rtype: dict or None
        """
        try:
            response = self.countrydb.country(str(ipaddr))
        except geoip2.errors.AddressNotFoundError:
            return None
        return {
            'ip': response.traits.ip_address,
            'ctr_code': response.country.iso_code,
            'ctr_name': response.country.name,
            'cnt_code': response.continent.code,
            'cnt_name': response.continent.name,
        }
class GeoipServiceManager:
    """
    Custom manager that understands Mentat system core configurations and
    provides a unified way of bootstrapping the :py:class:`GeoipService`
    service from them.
    """

    def __init__(self, core_config, updates=None):
        """
        Initialize GeoipServiceManager object with full core configuration tree
        structure.

        :param dict core_config: Mentat core configuration structure.
        :param dict updates: Optional configuration updates (same structure as ``core_config``).
        """
        self._geoconfig = {}
        self._service = None

        self._configure_geoip(core_config, updates)

    def _configure_geoip(self, core_config, updates):
        """
        Internal sub-initialization helper: take a deep copy of the geoip
        section of the core configuration and optionally overlay it with the
        matching section of the additional updates.

        :param dict core_config: Mentat core configuration structure.
        :param dict updates: Optional configuration updates (same structure as ``core_config``).
        """
        # Deep copy, so that later updates never touch the caller's structure.
        self._geoconfig = copy.deepcopy(
            core_config[CKEY_CORE_SERVICES][CKEY_CORE_SERVICES_GEOIP]
        )

        if not updates or CKEY_CORE_SERVICES not in updates:
            return
        service_updates = updates[CKEY_CORE_SERVICES]
        if CKEY_CORE_SERVICES_GEOIP in service_updates:
            self._geoconfig.update(service_updates[CKEY_CORE_SERVICES_GEOIP])

    def service(self):
        """
        Return handle to geoip service according to internal configurations.
        The service object is created and set up on first use and reused on
        subsequent calls.

        :return: Reference to geoip service object.
        :rtype: GeoipService
        """
        if self._service:
            return self._service
        self._service = GeoipService(**self._geoconfig)
        self._service.setup()
        return self._service
def init(core_config, updates=None):
    """
    (Re-)Initialize :py:class:`GeoipServiceManager` instance at module level and
    store the reference within module.

    :param dict core_config: Mentat core configuration structure.
    :param dict updates: Optional configuration updates (same structure as ``core_config``).
    """
    global _MANAGER  # pylint: disable=locally-disabled,global-statement
    new_manager = GeoipServiceManager(core_config, updates)
    _MANAGER = new_manager
def manager():
    """
    Return the :py:class:`GeoipServiceManager` instance currently stored at
    module level (the one most recently assigned by :py:func:`init`).
    """
    current_manager = _MANAGER
    return current_manager
def service():
    """
    Return the :py:class:`GeoipService` instance provided by the module level
    manager.
    """
    geoip_manager = manager()
    return geoip_manager.service()