Source code for mentat.daemon.component.testsuite

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------

import unittest
from unittest.mock import Mock
from pprint import pformat

import os
import json
import copy
import difflib

import pyzenkit
import idea.lite
import mentat.daemon.piper
import mentat.idea.internal

messages_raw = [
        {
            "ID" : "message01",
            "Category" : [
                "Attempt.Login"
            ],
            "Target" : [
                {
                    "IP4" : [
                        "195.113.165.128/25"
                    ],
                    "Port" : [
                        22
                    ]
                }
            ],
            "Source" : [
                {
                    "IP4" : [
                        "188.14.166.39"
                    ]
                }
            ],
            "Note" : "SSH login attempt",
            "Node" : [
                {
                    "SW" : [
                        "Kippo"
                    ],
                    "Name" : "cz.uhk.apate.cowrie",
                    "Type" : [
                        "Connection",
                        "Honeypot"
                    ]
                }
            ],
            "_Mentat" : {
                "StorageTime" : "2016-06-21T14:00:07Z"
            },
            "Format" : "IDEA0",
            "DetectTime" : "2016-06-21T13:08:27Z"
        }
    ]

def _load_messages():
    result = []
    for msg in messages_raw:
        msgid = msg['ID']
        msgc = copy.deepcopy(msg)
        result.append({
            'id':       msgid,
            'raw':      msgc,
            'lite':     idea.lite.Idea(msgc),
            'internal': mentat.idea.internal.Idea(msgc)
        })
    return result

[docs]class DaemonComponentTestCase(unittest.TestCase):
[docs] def setUp(self): # # Turn on more verbose output, which includes print-out of constructed # objects. This will really clutter your console, usable only for test # debugging. # self.verbose = False cdir_path = os.path.dirname(os.path.realpath(__file__)) self.core_config = pyzenkit.jsonconf.config_load_dir( os.path.realpath( os.path.join(cdir_path, '../../../../conf/core') ) ) # Process all test messages into expected data structure. self.messages = _load_messages()
def _verbose_print(self, label, data): """ Helper method, print additional information in verbose mode. """ if self.verbose: print("{}\n{}\n".format(label, pformat(data))) def _assert_result(self, result, expected_results): """ Compare given result with expected results. """ res = result['idea'] if isinstance(res, idea.base.IdeaBase): res = json.dumps(res, indent=4, sort_keys=True, default=res.json_default) else: res = json.dumps(res, indent=4, sort_keys=True) exp = expected_results[result['idea_id']] exp = json.dumps(exp, indent=4, sort_keys=True) self.assertEqual(res, exp, list(difflib.context_diff(res.split('\n'), exp.split('\n')))) def _build_daemon_mock(self, config_list, core_config_list = None): """ Helper method, build and return daemon mock object. """ daemon = Mock( config = self.core_config, CORE_FILEQUEUE = mentat.daemon.piper.PiperDaemon.CORE_FILEQUEUE, CONFIG_QUEUE_IN_DIR = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_IN_DIR, CONFIG_QUEUE_IN_WAIT = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_IN_WAIT, CONFIG_QUEUE_OUT_DIR = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_OUT_DIR, CONFIG_QUEUE_OUT_LIMIT = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_OUT_LIMIT, CONFIG_QUEUE_OUT_WAIT = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_OUT_WAIT, is_done = lambda: False ) daemon.attach_mock( Mock( # Define required configuration values in order they are required # from daemon instance (ussually in setup() method). side_effect = config_list ), 'c' ) if core_config_list: daemon.attach_mock( Mock( # Define required configuration values in order they are required # from daemon instance (ussually in setup() method). side_effect = core_config_list ), 'cc' ) return daemon