Source code for mentat.daemon.component.test_inspector

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------


"""
Unit test module for testing the :py:mod:`mentat.daemon.component.inspector` module.
"""


__author__  = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"


import unittest
from unittest.mock import call

from mentat.daemon.component.testsuite import DaemonComponentTestCase
from mentat.daemon.component.inspector import InspectorDaemonComponent


[docs]class TestMentatDaemonInspector(DaemonComponentTestCase): """ Unit test class for testing the :py:mod:`mentat.daemon.component.inspector` module. """ results = { 'tag': { 'message01': { 'Category': ['Attempt.Login'], 'DetectTime': '2016-06-21T13:08:27Z', 'Format': 'IDEA0', 'ID': 'message01', 'Node': [{'Name': 'cz.uhk.apate.cowrie', 'SW': ['Kippo'], 'Type': ['Connection', 'Honeypot']}], 'Note': 'SSH login attempt', 'Source': [{'IP4': ['188.14.166.39']}], 'Target': [{'IP4': ['195.113.165.128/25'], 'Port': [22]}], 'TestTag': {'ValueA1': 'A2', 'ValueA2': ['A4', 'A5']}, '_CESNET': {'StorageTime': '2016-06-21T14:00:07Z'} } }, 'set': { 'message01': { 'Category': ['Attempt.Login'], 'DetectTime': '2016-06-21T13:08:27Z', 'Format': 'IDEA0', 'ID': 'message01', 'Node': [{'Name': 'cz.uhk.apate.cowrie', 'SW': ['Kippo'], 'Type': ['Connection', 'Honeypot']}], 'Note': 'SSH login attempt', 'Source': [{'IP4': ['188.14.166.39']}], 'Target': [{'IP4': ['195.113.165.128/25'], 'Port': [22]}], 'TestTag': {'ValueA1': 1466910407, 'ValueA2': [1466514007, 1466517607]}, '_CESNET': {'StorageTime': '2016-06-21T14:00:07Z'} } }, 'report': { 'message01': { 'Category': ['Attempt.Login'], 'DetectTime': '2016-06-21T13:08:27Z', 'Format': 'IDEA0', 'ID': 'message01', 'Node': [{'Name': 'cz.uhk.apate.cowrie', 'SW': ['Kippo'], 'Type': ['Connection', 'Honeypot']}], 'Note': 'SSH login attempt', 'Source': [{'IP4': ['188.14.166.39']}], 'Target': [{'IP4': ['195.113.165.128/25'], 'Port': [22]}], '_CESNET': {'StorageTime': '2016-06-21T14:00:07Z'} } }, 'drop': { 'message01': { 'Category': ['Attempt.Login'], 'DetectTime': '2016-06-21T13:08:27Z', 'Format': 'IDEA0', 'ID': 'message01', 'Node': [{'Name': 'cz.uhk.apate.cowrie', 'SW': ['Kippo'], 'Type': ['Connection', 'Honeypot']}], 'Note': 'SSH login attempt', 'Source': [{'IP4': ['188.14.166.39']}], 'Target': [{'IP4': ['195.113.165.128/25'], 'Port': [22]}], '_CESNET': {'StorageTime': '2016-06-21T14:00:07Z'} } }, 'dispatch': { 'message01': { 'Category': ['Attempt.Login'], 'DetectTime': '2016-06-21T13:08:27Z', 'Format': 'IDEA0', 'ID': 'message01', 'Node': [{'Name': 'cz.uhk.apate.cowrie', 'SW': ['Kippo'], 'Type': ['Connection', 'Honeypot']}], 'Note': 'SSH login attempt', 'Source': [{'IP4': ['188.14.166.39']}], 'Target': [{'IP4': ['195.113.165.128/25'], 'Port': [22]}], '_CESNET': {'StorageTime': '2016-06-21T14:00:07Z'} } }, 'duplicate': { 'message01': { 'Category': ['Attempt.Login'], 'DetectTime': '2016-06-21T13:08:27Z', 'Format': 'IDEA0', 'ID': 'message01', 'Node': [{'Name': 'cz.uhk.apate.cowrie', 'SW': ['Kippo'], 'Type': ['Connection', 'Honeypot']}], 'Note': 'SSH login attempt', 'Source': [{'IP4': ['188.14.166.39']}], 'Target': [{'IP4': ['195.113.165.128/25'], 'Port': [22]}], '_CESNET': {'StorageTime': '2016-06-21T14:00:07Z'} } }, 'log': { 'message01': { 'Category': ['Attempt.Login'], 'DetectTime': '2016-06-21T13:08:27Z', 'Format': 'IDEA0', 'ID': 'message01', 'Node': [{'Name': 'cz.uhk.apate.cowrie', 'SW': ['Kippo'], 'Type': ['Connection', 'Honeypot']}], 'Note': 'SSH login attempt', 'Source': [{'IP4': ['188.14.166.39']}], 'Target': [{'IP4': ['195.113.165.128/25'], 'Port': [22]}], '_CESNET': {'StorageTime': '2016-06-21T14:00:07Z'} } } }
[docs] def setUp(self): # WARNING: Do not forget to call parent version of setUp() method !!! super().setUp() # Override settings for verbose output self.verbose = False self.component = InspectorDaemonComponent()
[docs] def test_01_setup(self): """ Perform the component setup tests. """ self.maxDiff = None # Prepare mock object representing external daemon object. daemon = self._build_daemon_mock([ # daemon.c(self.inspection_rules_key) [ { 'rule': 'Note EQ "SSH login attempts"', 'actions': [ {'action': 'tag', 'args': {'path': 'TestA.ValueA1', 'value': 'A1'}} ] }, { 'rule': 'Note EQ "SSH login attempt"', 'actions': [ {'action': 'tag', 'args': {'path': 'TestB.ValueB1', 'value': 'B3', 'overwrite': False}} ] }, ], # daemon.c(self.fallback_actions_key) [ {'action': 'tag', 'args': {'path': 'TestB.ValueB2[*]', 'value': 'B3', 'unique': True}} ] ]) # Setup daemon component. self.component.setup(daemon) self._verbose_print("TEST01: Daemon mock calls after component setup", daemon.mock_calls) daemon.c.assert_has_calls([ call('inspection_rules'), call('fallback_actions') ]) daemon.logger.info.assert_has_calls([ call('[STATUS] Component \'inspector\': Inspection rule action \'Note EQ "SSH login attempts"\':\'tag\' successfully loaded'), call('[STATUS] Component \'inspector\': Inspection rule \'Note EQ "SSH login attempts"\' successfully loaded'), call('[STATUS] Component \'inspector\': Inspection rule action \'Note EQ "SSH login attempt"\':\'tag\' successfully loaded'), call('[STATUS] Component \'inspector\': Inspection rule \'Note EQ "SSH login attempt"\' successfully loaded'), call('[STATUS] Component \'inspector\': Fallback action \'tag\' successfully loaded') ])
[docs] def test_02_tag(self): """ Perform tests of 'tag' action. """ self.maxDiff = None # Prepare mock object representing external daemon object. daemon = self._build_daemon_mock([ # daemon.c(self.inspection_rules_key) [ { 'rule': 'Note EQ "SSH login attempt"', 'name': 'rule_01', 'actions': [ {'action': 'tag', 'name': 'action_01', 'args': {'path': 'TestTag.ValueA1', 'value': 'A1'}}, {'action': 'tag', 'name': 'action_02', 'args': {'path': 'TestTag.ValueA1', 'value': 'A2'}}, {'action': 'tag', 'name': 'action_03', 'args': {'path': 'TestTag.ValueA1', 'value': 'A3', 'overwrite': False}}, {'action': 'tag', 'name': 'action_04', 'args': {'path': 'TestTag.ValueA2[*]', 'value': 'A4', 'unique': True}}, {'action': 'tag', 'name': 'action_05', 'args': {'path': 'TestTag.ValueA2[*]', 'value': 'A5', 'unique': True}}, {'action': 'tag', 'name': 'action_06', 'args': {'path': 'TestTag.ValueA2[*]', 'value': 'A5', 'unique': True}}, ] }, ], # daemon.c(self.fallback_actions_key) [] ]) # Setup daemon component. self.component.setup(daemon) self._verbose_print("TEST02: Daemon mock calls after component setup", daemon.mock_calls) daemon.logger.info.assert_has_calls([ call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_01' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_02' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_03' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_04' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_05' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_06' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule 'rule_01' successfully loaded"), ]) # Reset mock, so that further testing is more simple. daemon.reset_mock() # Attempt to process the message and analyze the result. for tmsg in self.messages: result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['raw']}) self._verbose_print("TEST02: Result after processing JSON raw message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['tag']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['lite']}) self._verbose_print("TEST02: Result after processing idea.lite message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['tag']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['internal']}) self._verbose_print("TEST02: Result after processing mentat.idea.internal message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['tag']) self._verbose_print("TEST02: Daemon mock calls after message processing", daemon.mock_calls) daemon.logger.assert_has_calls([ call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA1':'A1' key successfully set (O:True U:False)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA1':'A2' key successfully set (O:True U:False)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA1':'A3' key already exists, not overwriting (O:False U:False)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA2[*]':'A4' key successfully set (O:True U:True)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA2[*]':'A5' key successfully set (O:True U:True)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA2[*]':'A5' value already exists, not inserting (O:True U:True)"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA1':'A1' key successfully set (O:True U:False)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA1':'A2' key successfully set (O:True U:False)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA1':'A3' key already exists, not overwriting (O:False U:False)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA2[*]':'A4' key successfully set (O:True U:True)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA2[*]':'A5' key successfully set (O:True U:True)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA2[*]':'A5' value already exists, not inserting (O:True U:True)"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA1':'A1' key successfully set (O:True U:False)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA1':'A2' key successfully set (O:True U:False)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA1':'A3' key already exists, not overwriting (O:False U:False)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA2[*]':'A4' key successfully set (O:True U:True)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA2[*]':'A5' key successfully set (O:True U:True)"), call.debug("Component 'inspector': Tag - Message 'message01' path 'TestTag.ValueA2[*]':'A5' value already exists, not inserting (O:True U:True)") ])
#daemon.queue.schedule_after.assert_called_with(10, 'message_next') #def test_03_set(self): #""" #Perform tests of 'set' action. #""" #self.maxDiff = None ## Prepare mock object representing external daemon object. #daemon = self._build_daemon_mock([ ## daemon.c(self.inspection_rules_key) #[ #{ #'rule': 'Note EQ "SSH login attempt"', #'name': 'rule_01', #'actions': [ #{'action': 'set', 'name': 'action_01', 'args': {'path': 'TestTag.ValueA1', 'expression': '_CESNET.StorageTime + 3600'}}, #{'action': 'set', 'name': 'action_02', 'args': {'path': 'TestTag.ValueA1', 'expression': '_CESNET.StorageTime + 400000'}}, #{'action': 'set', 'name': 'action_03', 'args': {'path': 'TestTag.ValueA1', 'expression': '_CESNET.StorageTime + 800000', 'overwrite': False}}, #{'action': 'set', 'name': 'action_04', 'args': {'path': 'TestTag.ValueA2[*]', 'expression': '_CESNET.StorageTime + 3600', 'unique': True}}, #{'action': 'set', 'name': 'action_05', 'args': {'path': 'TestTag.ValueA2[*]', 'expression': '_CESNET.StorageTime + 7200', 'unique': True}}, #{'action': 'set', 'name': 'action_06', 'args': {'path': 'TestTag.ValueA2[*]', 'expression': '_CESNET.StorageTime + 7200', 'unique': True}}, #] #}, #], ## daemon.c(self.fallback_actions_key) #[] #]) ## Setup daemon component. #self.component.setup(daemon) #self._verbose_print("TEST03: Daemon mock calls after component setup", daemon.mock_calls) #daemon.logger.info.assert_has_calls([ #call("Inspection rule action 'rule_01':'action_01' successfully loaded"), #call("Inspection rule action 'rule_01':'action_02' successfully loaded"), #call("Inspection rule action 'rule_01':'action_03' successfully loaded"), #call("Inspection rule action 'rule_01':'action_04' successfully loaded"), #call("Inspection rule action 'rule_01':'action_05' successfully loaded"), #call("Inspection rule action 'rule_01':'action_06' successfully loaded"), #call("Inspection rule 'rule_01' successfully loaded"), #]) ## Reset mock, so that further testing is more simple. #daemon.reset_mock() ## Attempt to process the message and analyze the result. #for tmsg in self.messages: #result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['raw']}) #self._verbose_print("TEST03: Result after processing JSON raw message '{}'".format(tmsg['id']), result) #self._assert_result(result[1], self.results['set']) #result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['lite']}) #self._verbose_print("TEST03: Result after processing idea.lite message '{}'".format(tmsg['id']), result) #self._assert_result(result[1], self.results['set']) #result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['internal']}) #self._verbose_print("TEST03: Result after processing mentat.idea.internal message '{}'".format(tmsg['id']), result) #self._assert_result(result[1], self.results['set']) #self._verbose_print("TEST03: Daemon mock calls after message processing", daemon.mock_calls) #daemon.logger.assert_has_calls([ #call.info("Inspecting message 'message01':'message01'"), #call.info("Message 'message01':'message01' matched inspection rule 'rule_01'"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA1':'_CESNET.StorageTime + 3600'=>1466514007 key successfully set (O:True U:False)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA1':'_CESNET.StorageTime + 400000'=>1466910407 key successfully set (O:True U:False)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA1':'_CESNET.StorageTime + 800000'=>1467310407 key already exists, not overwriting (O:False U:False)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA2[*]':'_CESNET.StorageTime + 3600'=>1466514007 key successfully set (O:True U:True)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA2[*]':'_CESNET.StorageTime + 7200'=>1466517607 key successfully set (O:True U:True)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA2[*]':'_CESNET.StorageTime + 7200'=>1466517607 value already exists, not inserting (O:True U:True)"), #call.info("Inspecting message 'message01':'message01'"), #call.info("Message 'message01':'message01' matched inspection rule 'rule_01'"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA1':'_CESNET.StorageTime + 3600'=>1466514007 key successfully set (O:True U:False)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA1':'_CESNET.StorageTime + 400000'=>1466910407 key successfully set (O:True U:False)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA1':'_CESNET.StorageTime + 800000'=>1467310407 key already exists, not overwriting (O:False U:False)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA2[*]':'_CESNET.StorageTime + 3600'=>1466514007 key successfully set (O:True U:True)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA2[*]':'_CESNET.StorageTime + 7200'=>1466517607 key successfully set (O:True U:True)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA2[*]':'_CESNET.StorageTime + 7200'=>1466517607 value already exists, not inserting (O:True U:True)"), #call.info("Inspecting message 'message01':'message01'"), #call.info("Message 'message01':'message01' matched inspection rule 'rule_01'"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA1':'_CESNET.StorageTime + 3600'=>1466514007 key successfully set (O:True U:False)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA1':'_CESNET.StorageTime + 400000'=>1466910407 key successfully set (O:True U:False)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA1':'_CESNET.StorageTime + 800000'=>1467310407 key already exists, not overwriting (O:False U:False)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA2[*]':'_CESNET.StorageTime + 3600'=>1466514007 key successfully set (O:True U:True)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA2[*]':'_CESNET.StorageTime + 7200'=>1466517607 key successfully set (O:True U:True)"), #call.debug("Set - Message 'message01' path 'TestTag.ValueA2[*]':'_CESNET.StorageTime + 7200'=>1466517607 value already exists, not inserting (O:True U:True)") #])
[docs] def test_04_report(self): """ Perform tests of 'report' action. """ self.maxDiff = None # Prepare mock object representing external daemon object. daemon = self._build_daemon_mock([ # daemon.c(self.inspection_rules_key) [ { 'rule': 'Note EQ "SSH login attempt"', 'name': 'rule_01', 'actions': [ {'action': 'report', 'name': 'action_01', 'args': {'to': 'recipient@organization.com', 'from': 'sender@organization.com', 'subject': 'Inspection alert', 'report_type': 'inspection_alert'}} ] }, ], # daemon.c(self.fallback_actions_key) [] ]) # Setup daemon component. self.component.setup(daemon) self._verbose_print("TEST04: Daemon mock calls after component setup", daemon.mock_calls) daemon.logger.info.assert_has_calls([ call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_01' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule 'rule_01' successfully loaded"), ]) # Reset mock, so that further testing is more simple. daemon.reset_mock() # Attempt to process the message and analyze the result. for tmsg in self.messages: result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['raw']}) self._verbose_print("TEST04: Result after processing JSON raw message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['report']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['lite']}) self._verbose_print("TEST04: Result after processing idea.lite message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['report']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['internal']}) self._verbose_print("TEST04: Result after processing mentat.idea.internal message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['report']) self._verbose_print("TEST04: Daemon mock calls after message processing", daemon.mock_calls) daemon.logger.assert_has_calls([ call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Report - Message 'message01' to 'recipient@organization.com' with subject 'Inspection alert'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Report - Message 'message01' to 'recipient@organization.com' with subject 'Inspection alert'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Report - Message 'message01' to 'recipient@organization.com' with subject 'Inspection alert'") ])
[docs] def test_05_drop(self): """ Perform tests of 'drop' action. """ self.maxDiff = None # Prepare mock object representing external daemon object. daemon = self._build_daemon_mock([ # daemon.c(self.inspection_rules_key) [ { 'rule': 'Note EQ "SSH login attempt"', 'name': 'rule_01', 'actions': [ {'action': 'drop', 'name': 'action_01'} ] }, ], # daemon.c(self.fallback_actions_key) [] ]) # Setup daemon component. self.component.setup(daemon) self._verbose_print("TEST05: Daemon mock calls after component setup", daemon.mock_calls) daemon.logger.info.assert_has_calls([ call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_01' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule 'rule_01' successfully loaded"), ]) # Reset mock, so that further testing is more simple. daemon.reset_mock() # Attempt to process the message and analyze the result. for tmsg in self.messages: result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['raw']}) self._verbose_print("TEST05: Result after processing JSON raw message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['drop']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['lite']}) self._verbose_print("TEST05: Result after processing idea.lite message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['drop']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['internal']}) self._verbose_print("TEST05: Result after processing mentat.idea.internal message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['drop']) self._verbose_print("TEST05: Daemon mock calls after message processing", daemon.mock_calls) daemon.logger.assert_has_calls([ call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Drop - Message 'message01':'message01': 'Inspection rule matched'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Drop - Message 'message01':'message01': 'Inspection rule matched'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Drop - Message 'message01':'message01': 'Inspection rule matched'"), ])
[docs] def test_06_dispatch(self): """ Perform tests of 'dispatch' action. """ self.maxDiff = None # Prepare mock object representing external daemon object. daemon = self._build_daemon_mock([ # daemon.c(self.inspection_rules_key) [ { 'rule': 'Note EQ "SSH login attempt"', 'name': 'rule_01', 'actions': [ {'action': 'dispatch', 'name': 'action_01', 'args': {'queue_tgt': '/var/tmp'}} ] }, ], # daemon.c(self.fallback_actions_key) [] ]) # Setup daemon component. self.component.setup(daemon) self._verbose_print("TEST06: Daemon mock calls after component setup", daemon.mock_calls) daemon.logger.info.assert_has_calls([ call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_01' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule 'rule_01' successfully loaded"), ]) # Reset mock, so that further testing is more simple. daemon.reset_mock() # Attempt to process the message and analyze the result. for tmsg in self.messages: result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['raw']}) self._verbose_print("TEST06: Result after processing JSON raw message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['dispatch']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['lite']}) self._verbose_print("TEST06: Result after processing idea.lite message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['dispatch']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['internal']}) self._verbose_print("TEST06: Result after processing mentat.idea.internal message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['dispatch']) self._verbose_print("TEST06: Daemon mock calls after message processing", daemon.mock_calls) daemon.logger.assert_has_calls([ call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Dispatch - Message 'message01' to target queue '/var/tmp'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Dispatch - Message 'message01' to target queue '/var/tmp'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Dispatch - Message 'message01' to target queue '/var/tmp'") ]) daemon.queue.assert_has_calls([ call.schedule('message_dispatch', {'queue_tgt': '/var/tmp', 'idea_id': 'message01', 'id': 'message01'}), call.schedule('message_dispatch', {'queue_tgt': '/var/tmp', 'idea_id': 'message01', 'id': 'message01'}), call.schedule('message_dispatch', {'queue_tgt': '/var/tmp', 'idea_id': 'message01', 'id': 'message01'}) ])
[docs] def test_06_duplicate(self): """ Perform tests of 'duplicate' action. """ self.maxDiff = None # Prepare mock object representing external daemon object. daemon = self._build_daemon_mock([ # daemon.c(self.inspection_rules_key) [ { 'rule': 'Note EQ "SSH login attempt"', 'name': 'rule_01', 'actions': [ {'action': 'duplicate', 'name': 'action_01', 'args': {'queue_tgt': '/var/tmp'}} ] }, ], # daemon.c(self.fallback_actions_key) [] ]) # Setup daemon component. self.component.setup(daemon) self._verbose_print("TEST06: Daemon mock calls after component setup", daemon.mock_calls) daemon.logger.info.assert_has_calls([ call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_01' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule 'rule_01' successfully loaded"), ]) # Reset mock, so that further testing is more simple. daemon.reset_mock() # Attempt to process the message and analyze the result. for tmsg in self.messages: result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['raw']}) self._verbose_print("TEST06: Result after processing JSON raw message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['duplicate']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['lite']}) self._verbose_print("TEST06: Result after processing idea.lite message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['duplicate']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['internal']}) self._verbose_print("TEST06: Result after processing mentat.idea.internal message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['duplicate']) self._verbose_print("TEST06: Daemon mock calls after message processing", daemon.mock_calls) daemon.logger.assert_has_calls([ call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Duplicate - Message 'message01' to target queue '/var/tmp'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Duplicate - Message 'message01' to target queue '/var/tmp'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.debug("Component 'inspector': Duplicate - Message 'message01' to target queue '/var/tmp'") ]) daemon.queue.assert_has_calls([ call.schedule('message_duplicate', {'queue_tgt': '/var/tmp', 'idea_id': 'message01', 'id': 'message01'}), call.schedule('message_duplicate', {'queue_tgt': '/var/tmp', 'idea_id': 'message01', 'id': 'message01'}), call.schedule('message_duplicate', {'queue_tgt': '/var/tmp', 'idea_id': 'message01', 'id': 'message01'}) ])
[docs] def test_07_log(self): """ Perform tests of 'log' action. """ self.maxDiff = None # Prepare mock object representing external daemon object. daemon = self._build_daemon_mock([ # daemon.c(self.inspection_rules_key) [ { 'rule': 'Note EQ "SSH login attempt"', 'name': 'rule_01', 'actions': [ {'action': 'log', 'name': 'action_01', 'args': {'label': 'TEST LABEL'}} ] }, ], # daemon.c(self.fallback_actions_key) [] ]) # Setup daemon component. self.component.setup(daemon) self._verbose_print("TEST07: Daemon mock calls after component setup", daemon.mock_calls) daemon.logger.info.assert_has_calls([ call("[STATUS] Component 'inspector': Inspection rule action 'rule_01':'action_01' successfully loaded"), call("[STATUS] Component 'inspector': Inspection rule 'rule_01' successfully loaded"), ]) # Reset mock, so that further testing is more simple. daemon.reset_mock() # Attempt to process the message and analyze the result. for tmsg in self.messages: result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['raw']}) self._verbose_print("TEST07: Result after processing JSON raw message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['log']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['lite']}) self._verbose_print("TEST07: Result after processing idea.lite message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['log']) result = self.component.cbk_event_message_process(daemon, {'id': tmsg['id'], 'idea_id': tmsg['id'], 'idea': tmsg['internal']}) self._verbose_print("TEST07: Result after processing mentat.idea.internal message '{}'".format(tmsg['id']), result) self._assert_result(result[1], self.results['log']) self._verbose_print("TEST07: Daemon mock calls after message processing", daemon.mock_calls) daemon.logger.assert_has_calls([ call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.warning("Component 'inspector': Log - Message 'message01':'message01': 'TEST LABEL'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.warning("Component 'inspector': Log - Message 'message01':'message01': 'TEST LABEL'"), call.debug("Component 'inspector': Inspecting message 'message01':'message01'"), call.info("Component 'inspector': Message 'message01':'message01' matched inspection rule 'rule_01'"), call.warning("Component 'inspector': Log - Message 'message01':'message01': 'TEST LABEL'") ])
#------------------------------------------------------------------------------- if __name__ == "__main__": unittest.main()