#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------
"""
Unit test module for testing the :py:mod:`mentat.plugin.enricher.whois` module.
"""
__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"
import unittest
from unittest.mock import call
#
# Custom libraries
#
import mentat.services.sqlstorage
from mentat.plugin.enricher.testsuite import MentatEnricherPluginTestCase
from mentat.plugin.enricher.whois import WhoisEnricherPlugin
#-------------------------------------------------------------------------------
# NOTE: Sorry for the long lines in this file. They are deliberate, because the
# assertion permutations are (IMHO) more readable this way.
#-------------------------------------------------------------------------------
[docs]class TestMentatWhoisEnricherPlugin(MentatEnricherPluginTestCase):
"""
Unit test class for testing the :py:class:`mentat.plugin.enricher.whois.WhoisEnricherPlugin` class.
"""
networks_raw = [
{
"id" : 1,
"source" : "negistry",
"netname" : "MUNI-01",
"network" : "195.178.86.0-195.178.87.255",
"type" : "ipv4",
"resolved_abuses": ["abuse@muni.cz"]
},
{
"id" : 2,
"source" : "negistry",
"netname" : "MUNI-02",
"network" : "147.251.0.0-147.251.255.255",
"type" : "ipv4",
"resolved_abuses": ["abuse@muni.cz"]
},
{
"id" : 3,
"source" : "negistry",
"netname" : "MUNI-03",
"network" : "2001:718:800:5::/64",
"type" : "ipv6",
"resolved_abuses": ["abuse@muni.cz"]
},
{
"id" : 4,
"source" : "negistry",
"netname" : "CESNET-01",
"network" : "195.179.86.0-195.179.87.255",
"type" : "ipv4",
"resolved_abuses": ["abuse@cesnet.cz"]
},
{
"id" : 5,
"source" : "negistry",
"netname" : "CESNET-00",
"network" : "195.179.0.0-195.179.255.255",
"type" : "ipv4",
"resolved_abuses": ["abuse@cesnet.cz"]
},
{
"id" : 6,
"source" : "negistry",
"netname" : "CESNET-02",
"network" : "147.252.0.0-147.252.255.255",
"type" : "ipv4",
"resolved_abuses": ["abuse@cesnet.cz"]
},
{
"id" : 7,
"source" : "negistry",
"netname" : "CESNET-03",
"network" : "2001:718:1:6::/64",
"type" : "ipv6",
"resolved_abuses": ["abuse@cesnet.cz"]
}
]
[docs] def setUp(self):
# WARNING: Do not forget to call parent version of setUp() method !!!
super().setUp()
# Override settings for verbose output
self.verbose = False
self.plugin = WhoisEnricherPlugin()
mentat.services.sqlstorage.init(
{
"__core__database": {
"sqlstorage": {
"url": "postgresql://mentat:mentat@localhost/mentat_utest",
"echo": False
}
}
}
)
mentat.services.sqlstorage.service().database_drop()
mentat.services.sqlstorage.service().database_create()
groups = {}
for net in self.networks_raw:
obj = mentat.datatype.sqldb.NetworkModel()
obj.source = net['source']
obj.netname = net['netname']
obj.network = net['network']
for grp in net['resolved_abuses']:
if grp not in groups:
groups[grp] = mentat.datatype.sqldb.GroupModel(name = grp, source = 'manual', description = grp)
groups[grp].networks.append(obj)
mentat.services.sqlstorage.service().session.add_all(groups.values())
mentat.services.sqlstorage.service().session.commit()
mentat.services.sqlstorage.service().session.flush()
mentat.services.sqlstorage.close()
[docs] def tearDown(self):
# WARNING: Do not forget to call parent version of tearDown() method !!!
super().tearDown()
mentat.services.sqlstorage.service().session.commit()
mentat.services.sqlstorage.service().session.close()
mentat.services.sqlstorage.service().database_drop()
mentat.services.sqlstorage.close()
[docs] def test_01_setup(self):
"""
Perform test of plugin setup.
"""
self.maxDiff = None
mentat.services.sqlstorage.init(
{
"__core__database": {
"sqlstorage": {
"url": "postgresql://mentat:mentat@localhost/mentat_utest",
"echo": False
}
}
}
)
mapplication = self._build_application_mock([])
self.plugin.setup(mapplication)
self._verbose_print("TEST01: daemon mock calls after plugin 'setup'", mapplication.mock_calls)
#mapplication.assert_has_calls([
# call.logger.info("Initialized '%s' enricher plugin: %s", 'WhoisEnricherPlugin', "[{'count_ip4': 331, 'count_ip6': 0, 'name': 'FileWhoisModule'},\n {'count_ip4': 5, 'count_ip6': 2, 'name': 'SqldbWhoisModule'}]")
#])
[docs] def test_02_process_01(self):
"""
Perform tests of message processing - message that should be enriched.
"""
self.maxDiff = None
mentat.services.sqlstorage.init(
{
"__core__database": {
"sqlstorage": {
"url": "postgresql://mentat:mentat@localhost/mentat_utest",
"echo": False
}
}
}
)
mapplication = self._build_application_mock([])
msg = {
'Source': [
{'IP4': '195.179.86.50'}
]
}
self.plugin.setup(mapplication)
#mapplication.assert_has_calls([
# call.logger.info("Initialized '%s' enricher plugin: %s", 'WhoisEnricherPlugin', "[{'count_ip4': 331, 'count_ip6': 0, 'name': 'FileWhoisModule'},\n {'count_ip4': 5, 'count_ip6': 2, 'name': 'SqldbWhoisModule'}]")
#])
mapplication.reset_mock()
result = self.plugin.process(mapplication, 'msgid', msg)
self._verbose_print("TEST02: daemon mock calls after plugin 'setup'", mapplication.mock_calls)
self._verbose_print("TEST02: enriched message:", msg)
self._verbose_print("TEST02: enrichement result:", result)
mapplication.assert_has_calls([
call.logger.debug("WHOIS - processing message '%s'", 'msgid'),
call.logger.debug("WHOIS - Enriched message '%s' with attribute '_Mentat.ResolvedAbuses' and values %s", 'msgid', "['abuse@cesnet.cz']")
])
self.assertEqual(msg, {
'Source': [{'IP4': '195.179.86.50'}],
'_Mentat': {
'ResolvedAbuses': ['abuse@cesnet.cz']
}
})
self.assertEqual(result, (1, True))
[docs] def test_03_process_02(self):
"""
Perform tests of message processing - message that should not be enriched.
"""
self.maxDiff = None
mapplication = self._build_application_mock([])
msg = {
'Source': [
{'IP4': '192.168.1.1'}
]
}
self.plugin.setup(mapplication)
#mapplication.assert_has_calls([
# call.logger.info("Initialized '%s' enricher plugin: %s", 'WhoisEnricherPlugin', "[{'count_ip4': 331, 'count_ip6': 0, 'name': 'FileWhoisModule'},\n {'count_ip4': 5, 'count_ip6': 2, 'name': 'SqldbWhoisModule'}]")
#])
mapplication.reset_mock()
result = self.plugin.process(mapplication, 'msgid', msg)
self._verbose_print("TEST02: daemon mock calls after plugin 'setup'", mapplication.mock_calls)
self._verbose_print("TEST02: enriched message:", msg)
self._verbose_print("TEST02: enrichement result:", result)
mapplication.assert_has_calls([
call.logger.debug("WHOIS - processing message '%s'", 'msgid')
])
self.assertEqual(msg, {'Source': [{'IP4': '192.168.1.1'}]})
self.assertEqual(result, (1, False))
#-------------------------------------------------------------------------------
if __name__ == "__main__":
unittest.main()