Source code for mentat.plugin.enricher.passivedns

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2018 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#
#-------------------------------------------------------------------------------


"""
Enricher plugins performing DNS lookup of all Source/IPx addresses using
*CESNET* PassiveDNS service.

The implementation consists of PassiveDNS connector and its Enricher plugin.
The connector provides information about domains linked to a user defined IP address.
Each domain record provides at least information when the domain name in combination
with the IP address was seen for the first and the last time from the point of
a DNS sniffer.

.. warning::

    Still a work in progress and alpha code.

"""

__author__ = "Lukáš Huták <lukas.hutak@cesnet.cz>"
__credits__ = "Václav Bartoš <bartos@cesnet.cz>, Pavel Kácha <pavel.kacha@cesnet.cz>, " \
              "Jan Mach <jan.mach@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"

from datetime import datetime, timedelta
import pprint
import time
import json
import math
import ipaddress
import requests

# Custom libraries
from pynspect.jpath import jpath_values, jpath_set
import mentat.plugin.enricher


[docs]class PassiveDNSConnectorError(RuntimeError): """ Custom error of the PassiveDNSConnector """
[docs]class PassiveDNSConnectorBase: """ The abstract base class for PassiveDNS connectors. The class provides common interface and basic record caching. """ def __init__(self, api_timeout=0.5, rec_validity=168): """ Base connector initializer :param int api_timeout: Query timeout (seconds) :param int rec_validity: Return only records X hours old """ self._cfg_api_timeout = api_timeout self._cfg_rec_validity = rec_validity def _create_rec(self, name, time_first, time_last, **kwargs): """ Create an internal record format :param str name: Domain name :param int time_first: First seen timestamp (seconds since UNIX epoch) :param int time_last: Last seen timestamp (seconds since UNIX epoch) :param kwargs: Additional extra parameters :return: Internal record format :rtype: dict """ time2str = lambda ts: time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) ret = { "Name": name, "FirstSeenTime": time2str(time_first), "LastSeenTime": time2str(time_last), } if kwargs: ret.update(**kwargs) return ret def _query_fn(self, ip_addr, timeout): """ PassiveDNS query function This function is intended to be implemented in subclasses. The function will send a request to a PassiveDNS API and return parsed records in the internal format or raise an exception. :param str ip_addr: IP address to query :param int timeout: Query timeout in seconds :return: Parsed domains as a list of internal records (can be empty) :rtype: list of dict """ raise NotImplementedError("The function is not implemented in the subclass")
[docs] def query(self, ip_addr, timeout=None): """ Get domains of an IP address based on PassiveDNS A new query is sent to the remote server and results are successfully processed and returned. :param str ip_addr: IP address to query :param int timeout: Query timeout in seconds (if None, default timeout is used) :return: Parsed domains as a list of internal records (can be empty) :rtype: list of dict """ if not timeout: timeout = self._cfg_api_timeout domains = self._query_fn(ip_addr, timeout) return domains
[docs] def query_multi(self, ip_addrs, timeout=None): """ Get domains of multiple IP addresses based on PassiveDNS Similar to the casual query, however, results of multiple IP addresses are returned as dictionary where keys are IP addresses and values are lists of parsed domains. IP addresses without known domain records are not present in the result. :param list of str ip_addrs: List of IP addresses to query :param int timeout: Single query timeout in seconds (if None, default timeout is used) :return: IP addresses and their domains (can be empty) :rtype: dict [str, list of dict] """ domain_dict = {} for i in ip_addrs: domains = self.query(i, timeout) if not domains: continue domain_dict[str(i)] = domains return domain_dict
[docs] def status(self): """ Determine and return the status of configuration :return: Dictionary containing various subkeys :rtype: dict """ stats = { 'api_timeout': self._cfg_api_timeout, 'rec_validity': self._cfg_rec_validity } return stats
[docs]class PassiveDNSConnectorCESNET(PassiveDNSConnectorBase): """ PassiveDNS connector for 'CESNET' PassiveDNS API """ # List of configuration parameters API_SERVER = "https://passivedns.cesnet.cz" API_URL = "/pdns/ip/{ip_address}?from={start}&to={end}" def __init__(self, api_limit=100, **kwargs): """ Connector initializer Due to remote API limitation the common parameter 'api_validity' (in hours) is rounded up to represent full days. :param int api_limit: Maximum number of domains per one IP address (if the value is None, no limit is applied) :param kwargs: Additional parameters to override base connector parameters (see :py:class:`PassiveDNSConnectorBase`) """ super().__init__(**kwargs) self._session = requests.Session() self._cfg_rec_validity = math.ceil(self._cfg_rec_validity / 24.0) * 24 self._cfg_api_limit = api_limit def _query_url(self, ip_addr): """ Create a query URL for a new HTTP Get request :param str ip_addr: IP address :return: Formatted URL address :rtype: str """ addr = ipaddress.ip_address(ip_addr) # Determine time range date2str = lambda date: date.strftime("%Y-%m-%d") date_start = date2str(datetime.now() - timedelta(hours=self._cfg_rec_validity)) date_end = date2str(datetime.now() + timedelta(days=1)) return self.API_SERVER + self.API_URL.format( ip_address = str(addr), start = date_start, end = date_end ) def _query_parser(self, json_txt): """ Process a JSON response retrieved from the PassiveDNS API Check validity of received DNS records and convert them into the internal format. :param str json_txt: Response from the PassiveDNS API :return: Parsed information about associated domain names (can be empty) :rtype: list of dict """ # Domain main sanitizer removes the last doc if present name_sanitizer = lambda name: name[:-1] if name[-1] == '.' else name # Timestamp parser converts date to number of seconds from Unix Epoch ts_parser = lambda ts: time.mktime(datetime.strptime(ts, "%Y-%m-%d").timetuple()) domains = [] try: data = json.loads(json_txt) for rec in data: name = name_sanitizer(str(rec["domain"])) ts_first = int(ts_parser(str(rec["time_first"]))) ts_last = int(ts_parser(str(rec["time_last"]))) rec_type = str(rec["type"]).upper() new_domain = self._create_rec(name, ts_first, ts_last, Type=rec_type) domains.append(new_domain) except json.decoder.JSONDecodeError as err: raise PassiveDNSConnectorError("Failed to parse JSON response: " + str(err)) from err except (KeyError, TypeError, ValueError) as err: raise PassiveDNSConnectorError("Unexpected response structure: " + str(err)) from err limit = self._cfg_api_limit if limit is not None and limit < len(domains): # Sort from the newest to the older and remove exceeding records cmp_fn = lambda rec: time.mktime(time.strptime(rec["LastSeenTime"], "%Y-%m-%dT%H:%M:%SZ")) domains.sort(key=cmp_fn) domains = domains[-self._cfg_api_limit:] return domains def _query_fn(self, ip_addr, timeout): """ PassiveDNS query function The function will send a request to a PassiveDNS API and return parsed records in the internal format or raise an exception. :param str ip_addr: IP address to query :param int timeout: Query timeout in seconds :return: Parsed domains as a list of internal records (can be empty) :rtype: list of dict """ url = self._query_url(ip_addr) try: response = self._session.get(url, timeout=timeout) ret_code = response.status_code except requests.exceptions.RequestException as err: raise PassiveDNSConnectorError("API request failed: " + str(err)) from err if ret_code == 200: # Success domains = self._query_parser(response.text) elif ret_code == 404: # IP address not found domains = [] else: err_msg = "Unexpected return code '{}' from the PassiveDNS server (request '{}').".format(ret_code, url) raise PassiveDNSConnectorError(err_msg) return domains
# ------------------------------------------------------------------------------------- def _format_results(source_id, pairs): """ Prepare a formatted result for an IDEA messsage. The function wraps each item in a new dictionary with identification of the type of IDEA enrichment block (key, type, reference, etc). :param str source_id: Identification string of the API provider :param dict [str, list of dict] pairs: IP address and their domains :return: Formatter result :rtype: list of dict """ res = [] for ip_addr, domains in pairs.items(): res.append({ "Key": str(ip_addr), "Type": ["PassiveDNS"], "Ref": str(source_id), "DNS": domains }) return res
[docs]class PassiveDNSCESNETEnricherPlugin(mentat.plugin.enricher.EnricherPlugin): """ Enricher plugin performing PassiveDNS lookup of all Source/IPx addresses using *CESNET* PassiveDNS service. """ SOURCE_ID = "https://passivedns.cesnet.cz/" def __init__(self): """ Initializer of the plugin """ self.connector = None
[docs] def setup(self, daemon, config_updates = None): """ Process configuration parameters and prepare PassiveDNS connector """ # Initialized connector self.connector = PassiveDNSConnectorCESNET(**config_updates) daemon.logger.info("Initialized '{}' enricher plugin: {}".format( self.__class__.__name__, pprint.pformat(self.connector.status()) ))
[docs] def process(self, daemon, message_id, message): """ Process and enrich given message. """ daemon.logger.debug("PassiveDNSCESNET - message '{}'".format(message_id)) sources = [] sources += jpath_values(message, 'Source.IP4') sources += jpath_values(message, 'Source.IP6') # Process only global IP addresses sources = [x for x in sources if ipaddress.ip_address(x).is_global] # Start PasssiveDNS lookup try: pairs = self.connector.query_multi(sources) except PassiveDNSConnectorError as err: daemon.logger.warn("PassiveDNSCESNET lookup failed: " + str(err)) return (daemon.FLAG_CONTINUE, self.FLAG_UNCHANGED) # Store results changed = False enrichments = _format_results(self.SOURCE_ID, pairs) if enrichments: data = jpath_values(message, "Enrich") data.extend(enrichments) jpath_set(message, "Enrich", data) daemon.logger.debug("Enriched message '{}' with attribute 'Enriched'".format(message_id)) changed = True return (daemon.FLAG_CONTINUE, changed)