Source code for mentat.daemon.component.test_storage

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------


import unittest
from unittest.mock import Mock, call
from pprint import pprint

import json
import difflib

import mentat.services.eventstorage
from mentat.daemon.component.storage import StorageDaemonComponent


mentat.services.eventstorage.init({
    "__core__database": {
        "eventstorage": {
            'dbname':   "mentat_utest",
            'user':     "mentat",
            'password': "mentat",
            'host':     "localhost",
            'port':     5432
        }
    }
})


[docs]class TestMentatDaemonStorage(unittest.TestCase): # # Turn on more verbose output, which includes print-out of constructed # objects. This will really clutter your console, usable only for test # debugging. # verbose = False # # Test IDEA message in raw form. # idea_raw = { "Category" : [ "Attempt.Login" ], "Target" : [ { "Proto" : [ "tcp", "ssh" ], "IP4" : [ "195.113.165.128/25" ], "Port" : [ "22" ], "Anonymised" : True } ], "ID" : "4dd7cf5e-4a95-49f6-8f04-947de998012c", "WinStartTime" : "2016-06-21 11:55:02Z", "ConnCount" : 2, "Source" : [ { "IP4" : [ "188.14.166.39" ] } ], "Note" : "SSH login attempt", "Node" : [ { "Type" : [ "Relay" ], "Name" : "cz.cesnet.mentat.warden_filer" }, { "SW" : [ "Kippo" ], "AggrWin" : "00:05:00", "Name" : "cz.uhk.apate.cowrie", "Type" : [ "Connection", "Honeypot", "Recon" ] } ], "_Mentat" : { "StorageTime" : "2016-06-21T14:00:07Z" }, "WinEndTime" : "2016-06-21 12:00:02Z", "Format" : "IDEA0", "DetectTime" : "2016-06-21 13:08:27Z" } def _clear_database(self): """ Helper method, clear all contents of test event table. """ storage = mentat.services.eventstorage.service() storage.database_drop() storage.database_create() def _build_daemon_mock(self): """ Helper method, build and return daemon mock object. """ daemon = Mock(config = { "__core__database": { "eventstorage": { 'dbname': "mentat_utest", 'user': "mentat", 'password': "mentat", 'host': "localhost", 'port': 5432 } } }) daemon.attach_mock( Mock( # Define required configuration values in order they are required # from daemon instance (ussually in setup() method). side_effect = [ True, 5, 1, ] ), 'c' ) return daemon #---------------------------------------------------------------------------
[docs] def setUp(self): self._clear_database() self.obj = StorageDaemonComponent()
[docs] def test_01_setup(self): """ Perform the component setup tests. """ self.maxDiff = None # Prepare mock object representing external daemon object. daemon = self._build_daemon_mock() # Setup daemon component. self.obj.setup(daemon) if self.verbose: pprint(daemon.mock_calls) daemon.logger.assert_has_calls([ call.debug("[STATUS] Component 'storage': Set up event storage service."), call.info("[STATUS] Component 'storage': Using bulk commits with '5' as enforced commit interval"), call.info("[STATUS] Component 'storage': Using bulk commits with '1' as bulk commit threshold") ])
[docs] def test_02_process_internal(self): """ Perform the mentat.idea.internal.Idea message processing tests. """ self.maxDiff = None # Prepare mock object representing external daemon object. daemon = self._build_daemon_mock() # Setup daemon component. self.obj.setup(daemon) if self.verbose: pprint(daemon.mock_calls) # Setup was already tested, so reset the mock for less clutter. daemon.reset_mock() # This daemon component is expected to work with messages of either # mentat.idea.internal.Idea, or idea.lite.Idea class. idea_msg = mentat.idea.internal.Idea(self.idea_raw) idea_id = idea_msg['ID'] # Perform the actual processing. result = self.obj.cbk_event_message_process( daemon, { 'id': '{}.idea'.format(idea_id), 'idea_id': idea_id, 'data': json.dumps(self.idea_raw, sort_keys = True), 'idea': idea_msg } ) if self.verbose: pprint(result) pprint(daemon.mock_calls) daemon.logger.assert_has_calls([ call.debug("Component 'storage': Storing message '4dd7cf5e-4a95-49f6-8f04-947de998012c.idea':'4dd7cf5e-4a95-49f6-8f04-947de998012c'."), call.info("Component 'storage': Stored message '4dd7cf5e-4a95-49f6-8f04-947de998012c.idea':'4dd7cf5e-4a95-49f6-8f04-947de998012c' into database (bulk mode).") ]) # # Attempt to retrieve the message back from database, convert and compare. # storage = mentat.services.eventstorage.service() idea_out = storage.fetch_event(idea_id) if self.verbose: print("'mentat.idea.internal.Idea' out of PostgreSQL record based on 'mentat.idea.internal.Idea':") print(json.dumps(idea_out, indent=4, sort_keys=True, default=idea_out.json_default)) orig = json.dumps(idea_msg, indent=4, sort_keys=True, default=idea_msg.json_default) new = json.dumps(idea_out, indent=4, sort_keys=True, default=idea_out.json_default) self.assertEqual(orig, new, list(difflib.context_diff(orig.split('\n'), new.split('\n'))))
#------------------------------------------------------------------------------- if __name__ == "__main__": unittest.main()