Source code for mentat.daemon.component.testsuite

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This file is part of Mentat system.
# Copyright (C) since 2011 CESNET, z.s.p.o.
# Use of this source is governed by the MIT license, see LICENSE file.

import unittest
from unittest.mock import Mock
from pprint import pformat

import os
import json
import copy
import difflib

import pyzenkit
import idea.lite
import mentat.daemon.piper
import mentat.idea.internal

# Raw test messages used as fixtures; _load_messages() iterates over this list
# and reads the "ID" key of every entry. The visible entry declares
# "Format": "IDEA0".
# NOTE(review): in this copy the literal is incomplete - nested values and the
# closing brackets are missing, so it does not parse as written. Restore the
# full literal from the upstream source before relying on it.
messages_raw = [
            "ID" : "message01",
            "Category" : [
            "Target" : [
                    "IP4" : [
                    "Port" : [
            "Source" : [
                    "IP4" : [
            "Note" : "SSH login attempt",
            "Node" : [
                    "SW" : [
                    "Name" : "cz.uhk.apate.cowrie",
                    "Type" : [
            "_Mentat" : {
                "StorageTime" : "2016-06-21T14:00:07Z"
            "Format" : "IDEA0",
            "DetectTime" : "2016-06-21T13:08:27Z"

def _load_messages():
    """
    Build the list of test message records from ``messages_raw``.

    Every raw message is deep-copied first, so the records never hand out the
    module-level fixture object itself. The copy is then stored as-is and also
    passed to both IDEA message constructors.

    :return: One dict per raw message with keys ``id`` (value of the ``ID``
             key), ``raw`` (the deep copy), ``lite`` (``idea.lite.Idea``
             built from the copy) and ``internal``
             (``mentat.idea.internal.Idea`` built from the copy).
    :rtype: list
    """
    result = []
    for msg in messages_raw:
        msgid = msg['ID']
        msgc = copy.deepcopy(msg)
        # The record must be appended explicitly; without the append() call
        # the dict entries are orphaned and the function returns an empty list.
        result.append({
            'id':       msgid,
            'raw':      msgc,
            'lite':     idea.lite.Idea(msgc),
            'internal': mentat.idea.internal.Idea(msgc)
        })
    return result

class DaemonComponentTestCase(unittest.TestCase):
    """
    Base test case providing shared fixtures and helpers for daemon component
    tests: core configuration, preprocessed test messages, result comparison
    and a daemon mock builder.
    """

    def setUp(self):
        """
        Prepare the fixtures used by the helper methods of this class.

        Sets ``self.verbose``, loads the core configuration from the
        ``conf/core`` directory located relative to this file and stores the
        preprocessed test messages in ``self.messages``.
        """
        #
        # Turn on more verbose output, which includes print-out of constructed
        # objects. This will really clutter your console, usable only for test
        # debugging.
        #
        self.verbose = False

        # Resolve the configuration directory relative to this file, so the
        # result does not depend on the current working directory.
        cdir_path = os.path.dirname(os.path.realpath(__file__))
        self.core_config = pyzenkit.jsonconf.config_load_dir(
            os.path.realpath(
                os.path.join(cdir_path, '../../../../conf/core')
            )
        )

        # Process all test messages into expected data structure.
        self.messages = _load_messages()

    def _verbose_print(self, label, data):
        """
        Helper method, print additional information in verbose mode.

        :param label: Text printed on the line above the data.
        :param data: Object to print; it is formatted with ``pprint.pformat``.
        """
        if self.verbose:
            print("{}\n{}\n".format(label, pformat(data)))

    def _assert_result(self, result, expected_results):
        """
        Compare given result with expected results.

        Both sides are serialized to indented JSON with sorted keys and then
        compared as strings; on mismatch the context diff of the two texts is
        attached as the assertion message.

        :param result: Mapping providing the message under key ``idea`` and
                       its identifier under key ``idea_id``.
        :param expected_results: Mapping of message identifier to the expected
                                 (JSON serializable) message content.
        :raises AssertionError: When the serialized forms differ.
        """
        res = result['idea']
        if isinstance(res, idea.base.IdeaBase):
            # IDEA message objects supply their own serializer for values the
            # json module cannot handle natively.
            res = json.dumps(res, indent=4, sort_keys=True, default=res.json_default)
        else:
            res = json.dumps(res, indent=4, sort_keys=True)

        exp = expected_results[result['idea_id']]
        exp = json.dumps(exp, indent=4, sort_keys=True)

        self.assertEqual(
            res,
            exp,
            list(difflib.context_diff(res.split('\n'), exp.split('\n')))
        )

    def _build_daemon_mock(self, config_list, core_config_list = None):
        """
        Helper method, build and return daemon mock object.

        The mock exposes ``self.core_config`` as ``config``, the queue related
        constants copied from ``mentat.daemon.piper.PiperDaemon`` and an
        ``is_done`` callable that always returns ``False``.

        :param config_list: Used as ``side_effect`` of the child mock attached
                            under the name ``c``.
        :param core_config_list: Optional; when truthy it is used as
                                 ``side_effect`` of the child mock attached
                                 under the name ``cc``.
        :return: Configured daemon mock.
        :rtype: unittest.mock.Mock
        """
        daemon = Mock(
            config = self.core_config,
            CORE_FILEQUEUE = mentat.daemon.piper.PiperDaemon.CORE_FILEQUEUE,
            CONFIG_QUEUE_IN_DIR = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_IN_DIR,
            CONFIG_QUEUE_IN_WAIT = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_IN_WAIT,
            CONFIG_QUEUE_OUT_DIR = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_OUT_DIR,
            CONFIG_QUEUE_OUT_LIMIT = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_OUT_LIMIT,
            CONFIG_QUEUE_OUT_WAIT = mentat.daemon.piper.PiperDaemon.CONFIG_QUEUE_OUT_WAIT,
            is_done = lambda: False
        )
        daemon.attach_mock(
            Mock(
                # Define required configuration values in order they are required
                # from daemon instance (usually in setup() method).
                side_effect = config_list
            ), 'c'
        )
        if core_config_list:
            daemon.attach_mock(
                Mock(
                    # Define required configuration values in order they are required
                    # from daemon instance (usually in setup() method).
                    side_effect = core_config_list
                ), 'cc'
            )
        return daemon