#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------
import copy
import datetime
import unittest
from pprint import pprint

import pytz

import mentat.stats.idea
import mentat.datatype.sqldb
class TestMentatStatsIdea(unittest.TestCase):
    """
    Unit tests for the statistics helpers in :py:mod:`mentat.stats.idea`.

    The tests cover the private utilities (counter incrementation, toplist
    creation, datetime/timedelta rounding, timeline step calculation and
    statistics merging) as well as the public event evaluation, truncation,
    grouping and aggregation functions. All event based tests operate on the
    shared :py:attr:`ideas_raw` fixture.
    """

    #
    # Turn on more verbose output, which includes print-out of constructed
    # objects. This will really clutter your console, usable only for test
    # debugging.
    #
    verbose = False

    #
    # Always show full diffs on assertion failures, the compared structures
    # are large and the default truncation hides the interesting part.
    #
    maxDiff = None

    #
    # Six raw IDEA events shared by the tests below. Events msg01, msg02,
    # msg03 and msg05 carry the '_Mentat.ResolvedAbuses' key, events msg04
    # and msg06 do not. Events msg02, msg04 and msg06 have no 'Target'.
    #
    ideas_raw = [
        {
            "Format": "IDEA0",
            "ID": "msg01",
            "CreateTime": "2012-11-03T10:00:02Z",
            "DetectTime": "2012-11-03T10:00:07Z",
            "Category": ["Fraud.Phishing"],
            "Source": [
                {
                    "Type": ["Phishing"],
                    "IP4": ["192.168.0.2-192.168.0.5", "192.168.0.0/25"],
                    "IP6": ["2001:db8::ff00:42:0/112"]
                }
            ],
            "Target": [
                {
                    "IP4": ["192.168.1.2-192.168.1.5", "192.168.1.0/25"],
                    "IP6": ["2001:db8::ff01:42:0/112"]
                }
            ],
            "Node": [
                {
                    "Name": "org.example.kippo",
                    "Tags": ["Protocol", "Honeypot"],
                    "SW": ["Kippo"]
                }
            ],
            "_Mentat": {
                "ResolvedAbuses": [
                    "abuse@cesnet.cz"
                ]
            }
        },
        {
            "Format": "IDEA0",
            "ID": "msg02",
            "CreateTime": "2012-11-03T11:00:02Z",
            "DetectTime": "2012-11-03T11:00:07Z",
            "Category": ["Fraud.Phishing"],
            "Source": [
                {
                    "Type": ["Phishing"],
                    "IP4": ["192.168.0.2-192.168.0.5", "192.168.0.0/25"],
                    "IP6": ["2001:db8::ff00:42:0/112"]
                }
            ],
            "Node": [
                {
                    "Name": "org.example.kippo",
                    "Tags": ["Protocol", "Honeypot"],
                    "SW": ["Kippo"]
                }
            ],
            "_Mentat": {
                "ResolvedAbuses": [
                    "abuse@cesnet.cz"
                ]
            }
        },
        {
            "Format": "IDEA0",
            "ID": "msg03",
            "CreateTime": "2012-11-03T12:00:02Z",
            "DetectTime": "2012-11-03T12:00:07Z",
            "Category": ["Fraud.Phishing"],
            "Source": [
                {
                    "Type": ["Phishing"],
                    "IP4": ["192.168.0.2-192.168.0.5", "192.168.0.0/25"],
                    "IP6": ["2001:db8::ff00:42:0/112"]
                }
            ],
            "Target": [
                {
                    "IP4": ["192.168.1.2-192.168.1.5", "192.168.1.0/25"],
                    "IP6": ["2001:db8::ff01:42:0/112"]
                }
            ],
            "Node": [
                {
                    "Name": "org.example.dionaea",
                    "Tags": ["Protocol", "Honeypot"],
                    "SW": ["Kippo"]
                }
            ],
            "_Mentat": {
                "ResolvedAbuses": [
                    "abuse@cesnet.cz"
                ]
            }
        },
        {
            "Format": "IDEA0",
            "ID": "msg04",
            "CreateTime": "2012-11-03T15:00:02Z",
            "DetectTime": "2012-11-03T15:00:07Z",
            "Category": ["Spam"],
            "Source": [
                {
                    "Type": ["Spam"],
                    "IP4": ["192.168.0.100", "192.168.0.105"]
                }
            ],
            "Node": [
                {
                    "Name": "org.example.dionaea",
                    "Tags": ["Protocol", "Honeypot"],
                    "SW": ["Dionaea"]
                }
            ]
        },
        {
            "Format": "IDEA0",
            "ID": "msg05",
            "CreateTime": "2012-11-03T18:00:02Z",
            "DetectTime": "2012-11-03T18:00:07Z",
            "Category": ["Exploit"],
            "Source": [
                {
                    "Type": ["Exploit"],
                    "IP4": ["192.168.0.109", "192.168.0.200"]
                }
            ],
            "Target": [
                {
                    "IP4": ["192.168.1.109", "192.168.1.200"]
                }
            ],
            "Node": [
                {
                    "Name": "org.example.labrea",
                    "Tags": ["Protocol", "Honeypot"],
                    "SW": ["LaBrea"]
                }
            ],
            "_Mentat": {
                "ResolvedAbuses": [
                    "abuse@cesnet.cz"
                ]
            }
        },
        {
            "Format": "IDEA0",
            "ID": "msg06",
            "CreateTime": "2012-11-03T18:00:02Z",
            "DetectTime": "2012-11-03T18:00:07Z",
            "Category": ["Exploit"],
            "Source": [
                {
                    "Type": ["Exploit"],
                    "IP4": ["192.172.0.109", "192.172.0.200"]
                }
            ],
            "Node": [
                {
                    "Name": "org.example.labrea",
                    "Tags": ["Protocol", "Honeypot"],
                    "SW": ["LaBrea"]
                },
                {
                    "SW": [
                        "Beekeeper"
                    ],
                    "Name": "cz.cesnet.holly"
                }
            ]
        }
    ]

    def _verbose_dump(self, label, value):
        """
        Print given labeled value, but only when :py:attr:`verbose` is on.

        :param str label: Description printed between ``***`` markers.
        :param value: Object to be pretty-printed below the label.
        """
        if self.verbose:
            print('*** {} ***'.format(label))
            pprint(value)

    def test_01_counter_inc(self):
        """
        Test counter incrementation utility.
        """
        counter_inc = mentat.stats.idea._counter_inc  # pylint: disable=locally-disabled,protected-access
        stats = {}

        # The steps are cumulative (state accumulates in ``stats`` across the
        # calls), so they are deliberately NOT isolated into sub-tests: once
        # one step fails all following expectations are meaningless.
        steps = [
            ((), 1),
            ((), 2),
            ((), 3),
            ((5,), 8),   # explicit increment of 5 on top of the previous 3
        ]
        for extra_args, expected_count in steps:
            self.assertEqual(
                counter_inc(stats, 'x', 'a', *extra_args),
                {'x': {'a': expected_count}}
            )

    def test_02_make_toplist(self):
        """
        Test toplist creation utility.
        """
        make_toplist = mentat.stats.idea._make_toplist  # pylint: disable=locally-disabled,protected-access

        # Each case: (label, input statistics, remaining positional arguments,
        # expected result). Inputs and expectations are separate literals on
        # purpose, so the comparison stays meaningful even if the function
        # hands back the very object it received.
        cases = [
            (
                # Seven items under 'detectors' with limit 5 and no fourth
                # argument: the expectation is the unchanged input.
                'detectors, limit 5',
                {
                    'detectors': {
                        'org.example.holly': 1,
                        'org.example.rimmer': 5,
                        'org.example.kryten': 10,
                        'org.example.queeg': 20,
                        'org.example.dionaea': 5,
                        'org.example.kippo': 3,
                        'org.example.labrea': 2
                    }
                },
                ('detectors', 5),
                {
                    'detectors': {
                        'org.example.holly': 1,
                        'org.example.rimmer': 5,
                        'org.example.kryten': 10,
                        'org.example.queeg': 20,
                        'org.example.dionaea': 5,
                        'org.example.kippo': 3,
                        'org.example.labrea': 2
                    }
                }
            ),
            (
                # Fourth positional argument set to True (presumably a force
                # flag - confirm against the function signature): the two
                # smallest items are folded into the existing '__REST__'.
                'detectors, limit 5, fourth argument True',
                {
                    'detectors': {
                        '__REST__': 50,
                        'org.example.rimmer': 5,
                        'org.example.kryten': 10,
                        'org.example.queeg': 20,
                        'org.example.dionaea': 5,
                        'org.example.kippo': 3,
                        'org.example.labrea': 2
                    }
                },
                ('detectors', 5, True),
                {
                    'detectors': {
                        '__REST__': 55,
                        'org.example.dionaea': 5,
                        'org.example.kryten': 10,
                        'org.example.queeg': 20,
                        'org.example.rimmer': 5
                    }
                }
            ),
            (
                'detectors, fewer items than the limit',
                {
                    'detectors': {
                        '__REST__': 50,
                        'org.example.rimmer': 5,
                        'org.example.kryten': 10,
                        'org.example.queeg': 20,
                    }
                },
                ('detectors', 5),
                {
                    'detectors': {
                        '__REST__': 50,
                        'org.example.kryten': 10,
                        'org.example.queeg': 20,
                        'org.example.rimmer': 5
                    }
                }
            ),
            (
                # Unlike 'detectors', the 'ip4s' key is expected to be
                # truncated even without the fourth argument.
                'ip4s, limit 5',
                {
                    'ip4s': {
                        'org.example.holly': 1,
                        'org.example.rimmer': 5,
                        'org.example.kryten': 10,
                        'org.example.queeg': 20,
                        'org.example.dionaea': 5,
                        'org.example.kippo': 3,
                        'org.example.labrea': 2
                    }
                },
                ('ip4s', 5),
                {
                    'ip4s': {
                        '__REST__': 6,
                        'org.example.dionaea': 5,
                        'org.example.kryten': 10,
                        'org.example.queeg': 20,
                        'org.example.rimmer': 5
                    }
                }
            ),
        ]
        for label, stats, args, expected in cases:
            with self.subTest(case=label):
                self.assertEqual(make_toplist(stats, *args), expected)

    def test_03_datetime_rounding(self):
        """Test datetime rounding"""
        round_datetime = mentat.stats.idea._round_datetime  # pylint: disable=locally-disabled,protected-access

        moment = datetime.datetime(2022, 10, 11, 11, 32, 12)
        prague = pytz.timezone('Europe/Prague')
        hour = datetime.timedelta(hours=1)
        five_minutes = datetime.timedelta(minutes=5)
        day = datetime.timedelta(days=1)

        # Each case: (rounding step, keyword arguments, expected datetime).
        # Keyword arguments are passed only when listed, so the cases with
        # an empty/partial dict exercise the defaults of the function.
        cases = [
            (hour, {'direction': 'up'}, datetime.datetime(2022, 10, 11, 12, 0, 0)),
            (hour, {'direction': 'down'}, datetime.datetime(2022, 10, 11, 11, 0, 0)),
            (hour, {}, datetime.datetime(2022, 10, 11, 12, 0, 0)),
            (five_minutes, {'direction': 'up'}, datetime.datetime(2022, 10, 11, 11, 35, 0)),
            (five_minutes, {'direction': 'down'}, datetime.datetime(2022, 10, 11, 11, 30, 0)),
            (five_minutes, {}, datetime.datetime(2022, 10, 11, 11, 30, 0)),
            (hour, {'timezone': prague, 'direction': 'up'}, datetime.datetime(2022, 10, 11, 12, 0, 0)),
            (hour, {'timezone': prague, 'direction': 'down'}, datetime.datetime(2022, 10, 11, 11, 0, 0)),
            (hour, {'timezone': prague}, datetime.datetime(2022, 10, 11, 12, 0, 0)),
            (day, {'timezone': prague, 'direction': 'up'}, datetime.datetime(2022, 10, 11, 23, 0, 0)),
            (day, {'timezone': prague, 'direction': 'down'}, datetime.datetime(2022, 10, 10, 23, 0, 0)),
            (day, {'timezone': prague}, datetime.datetime(2022, 10, 11, 23, 0, 0)),
            (day, {}, datetime.datetime(2022, 10, 11, 0, 0, 0)),
        ]
        for step, kwargs, expected in cases:
            with self.subTest(step=step, kwargs=kwargs):
                self.assertEqual(round_datetime(moment, step, **kwargs), expected)

    def test_04_timedelta_rounding(self):
        """
        Test rounding of timedelta
        """
        round_timedelta = mentat.stats.idea._round_timedelta  # pylint: disable=locally-disabled,protected-access
        delta = datetime.timedelta

        # Each case: (value, rounding step, keyword arguments, expected).
        # An empty keyword dict exercises the default rounding direction.
        cases = [
            (delta(seconds=42), delta(seconds=10), {'direction': 'up'}, delta(seconds=50)),
            (delta(seconds=42), delta(seconds=10), {'direction': 'down'}, delta(seconds=40)),
            (delta(seconds=42), delta(seconds=10), {}, delta(seconds=40)),
            (delta(microseconds=687231), delta(microseconds=500), {'direction': 'up'}, delta(microseconds=687500)),
            (delta(microseconds=687231), delta(microseconds=500), {'direction': 'down'}, delta(microseconds=687000)),
            (delta(microseconds=687231), delta(microseconds=500), {}, delta(microseconds=687000)),
            (delta(microseconds=687231), delta(seconds=2), {'direction': 'up'}, delta(seconds=2)),
            (delta(microseconds=687231), delta(seconds=2), {'direction': 'down'}, delta(seconds=0)),
            (delta(microseconds=687231), delta(seconds=2), {}, delta(seconds=0)),
        ]
        for value, step, kwargs, expected in cases:
            with self.subTest(value=value, step=step, kwargs=kwargs):
                self.assertEqual(round_timedelta(value, step, **kwargs), expected)

    def test_05_timeline_steps(self):
        """
        Test timeline step calculations.
        """
        calculate_steps = mentat.stats.idea._calculate_timeline_steps  # pylint: disable=locally-disabled,protected-access
        moment = datetime.datetime
        delta = datetime.timedelta

        # Each case: (positional arguments, keyword arguments, expected
        # 3-tuple). The optional fourth positional argument is presumably
        # a lower bound for the step in seconds - confirm against the
        # function signature.
        cases = [
            (
                (moment(2018, 1, 1, 1, 11, 1), moment(2018, 1, 11, 23, 59, 31), 100),
                {},
                (moment(2018, 1, 1, 3, 0, 0), delta(seconds=10800), 88)
            ),
            (
                (moment(2022, 10, 24, 15, 17, 5), moment(2022, 10, 24, 15, 22, 32), 42),
                {},
                (moment(2022, 10, 24, 15, 17, 10), delta(seconds=10), 34)
            ),
            (
                (moment(2022, 10, 24, 15, 17, 5), moment(2022, 10, 24, 15, 18, 23), 400),
                {},
                (moment(2022, 10, 24, 15, 17, 5), delta(microseconds=200000), 390)
            ),
            (
                (moment(2022, 10, 24, 15, 17, 5), moment(2022, 10, 24, 15, 18, 23), 400, 1),
                {},
                (moment(2022, 10, 24, 15, 17, 5), delta(seconds=1), 78)
            ),
            (
                (moment(2022, 10, 24, 15, 17, 5), moment(2022, 10, 24, 15, 18, 23), 400, 0.314159),
                {},
                (moment(2022, 10, 24, 15, 17, 5), delta(microseconds=500000), 156)
            ),
            (
                # Zero-length interval.
                (moment(2022, 10, 24, 15, 17, 5), moment(2022, 10, 24, 15, 17, 5), 200),
                {},
                (moment(2022, 10, 24, 15, 17, 5), delta(microseconds=1), 0)
            ),
            (
                # Zero-length interval with the fourth argument given.
                (moment(2022, 10, 24, 15, 17, 5), moment(2022, 10, 24, 15, 17, 5), 42, 1),
                {},
                (moment(2022, 10, 24, 15, 17, 5), delta(seconds=1), 0)
            ),
            (
                (moment(2022, 10, 24, 15, 17, 0), moment(2022, 10, 24, 15, 23, 40), 200),
                {},
                (moment(2022, 10, 24, 15, 17, 0), delta(seconds=2), 200)
            ),
            (
                (moment(2022, 10, 24, 15, 17, 1), moment(2022, 10, 24, 15, 23, 41), 200),
                {},
                (moment(2022, 10, 24, 15, 17, 3), delta(seconds=3), 134)
            ),
            (
                (moment(2022, 6, 2, 2, 17, 1), moment(2022, 12, 9, 12, 28, 34), 200),
                {'timezone': pytz.timezone('Canada/Newfoundland')},
                (moment(2022, 6, 2, 3, 30, 0), delta(days=1), 192)
            ),
        ]
        for args, kwargs, expected in cases:
            with self.subTest(args=args, kwargs=kwargs):
                self.assertEqual(calculate_steps(*args, **kwargs), expected)

    def test_06_calc_timeline_cfg(self):
        """
        Test timeline config calculations.
        """
        # There are no assertions for this feature yet. Report the gap as
        # a skipped test instead of letting an empty body count as a pass.
        self.skipTest('Test of timeline config calculations is not implemented yet.')

    def test_07_evaluate_events(self):
        """
        Perform the message evaluation tests.
        """
        expected = {
            'abuses': {'__unknown__': 2, 'abuse@cesnet.cz': 4},
            'analyzers': {'Beekeeper': 1, 'Dionaea': 1, 'Kippo': 3, 'LaBrea': 1},
            'asns': {'__unknown__': 6},
            'categories': {'Exploit': 2, 'Fraud.Phishing': 3, 'Spam': 1},
            'category_sets': {'Exploit': 2, 'Fraud.Phishing': 3, 'Spam': 1},
            'classes': {'__unknown__': 6},
            'cnt_alerts': 6,
            'cnt_events': 6,
            'cnt_recurring': 0,
            'cnt_unique': 6,
            'countries': {'__unknown__': 6},
            'detectors': {
                'cz.cesnet.holly': 1,
                'org.example.dionaea': 2,
                'org.example.kippo': 2,
                'org.example.labrea': 1
            },
            'detectorsws': {
                'cz.cesnet.holly/Beekeeper': 1,
                'org.example.dionaea/Dionaea': 1,
                'org.example.dionaea/Kippo': 1,
                'org.example.kippo/Kippo': 2,
                'org.example.labrea/LaBrea': 1
            },
            'sources': {
                '192.168.0.0/25': 3,
                '192.168.0.100': 1,
                '192.168.0.105': 1,
                '192.168.0.109': 1,
                '192.168.0.2-192.168.0.5': 3,
                '192.168.0.200': 1,
                '192.172.0.109': 1,
                '192.172.0.200': 1,
                '2001:db8::ff00:42:0/112': 3
            },
            'targets': {
                '192.168.1.2-192.168.1.5': 2,
                '192.168.1.0/25': 2,
                '2001:db8::ff01:42:0/112': 2,
                '192.168.1.109': 1,
                '192.168.1.200': 1,
                '__unknown__': 3
            },
            'list_ids': ['msg01', 'msg02', 'msg03', 'msg04', 'msg05', 'msg06'],
            'severities': {'__unknown__': 6}
        }
        self.assertEqual(mentat.stats.idea.evaluate_events(self.ideas_raw), expected)

    def test_08_truncate_stats(self):
        """
        Perform the statistics truncation tests.
        """
        # Truncation to 3 items with the third argument set to True: every
        # multi-valued key in the expectation is cut down, including
        # 'analyzers', 'categories' and 'detectors'.
        evaluated = mentat.stats.idea.evaluate_events(self.ideas_raw)
        self.assertEqual(
            mentat.stats.idea.truncate_stats(evaluated, 3, True),
            {
                'abuses': {'__unknown__': 2, 'abuse@cesnet.cz': 4},
                'analyzers': {'Beekeeper': 1, 'Kippo': 3, '__REST__': 2},
                'asns': {'__unknown__': 6},
                'categories': {'Exploit': 2, 'Fraud.Phishing': 3, '__REST__': 1},
                'category_sets': {'Exploit': 2, 'Fraud.Phishing': 3, '__REST__': 1},
                'classes': {'__unknown__': 6},
                'cnt_alerts': 6,
                'cnt_events': 6,
                'cnt_recurring': 0,
                'cnt_unique': 6,
                'countries': {'__unknown__': 6},
                'detectors': {'__REST__': 2, 'org.example.dionaea': 2, 'org.example.kippo': 2},
                'detectorsws': {
                    '__REST__': 3,
                    'cz.cesnet.holly/Beekeeper': 1,
                    'org.example.kippo/Kippo': 2
                },
                'sources': {'192.168.0.0/25': 3, '192.168.0.2-192.168.0.5': 3, '__REST__': 9},
                'targets': {'__unknown__': 3, '192.168.1.0/25': 2, '__REST__': 6},
                'severities': {'__unknown__': 6}
            }
        )

        # Truncation to 2 items without the third argument: only 'sources'
        # and 'targets' are cut down in the expectation.
        evaluated = mentat.stats.idea.evaluate_events(self.ideas_raw)
        self.assertEqual(
            mentat.stats.idea.truncate_stats(evaluated, 2),
            {
                'abuses': {'__unknown__': 2, 'abuse@cesnet.cz': 4},
                'analyzers': {'Beekeeper': 1, 'Dionaea': 1, 'Kippo': 3, 'LaBrea': 1},
                'asns': {'__unknown__': 6},
                'categories': {'Exploit': 2, 'Fraud.Phishing': 3, 'Spam': 1},
                'category_sets': {'Exploit': 2, 'Fraud.Phishing': 3, 'Spam': 1},
                'classes': {'__unknown__': 6},
                'cnt_alerts': 6,
                'cnt_events': 6,
                'cnt_recurring': 0,
                'cnt_unique': 6,
                'countries': {'__unknown__': 6},
                'detectors': {
                    'cz.cesnet.holly': 1,
                    'org.example.dionaea': 2,
                    'org.example.kippo': 2,
                    'org.example.labrea': 1
                },
                'detectorsws': {
                    'cz.cesnet.holly/Beekeeper': 1,
                    'org.example.dionaea/Dionaea': 1,
                    'org.example.dionaea/Kippo': 1,
                    'org.example.kippo/Kippo': 2,
                    'org.example.labrea/LaBrea': 1
                },
                'sources': {'192.168.0.0/25': 3, '__REST__': 12},
                'targets': {'__unknown__': 3, '__REST__': 8},
                'severities': {'__unknown__': 6}
            }
        )

    def test_09_group_events(self):
        """
        Perform the event grouping tests.
        """
        # The grouped events are expected to be value-equal to the fixture
        # events. Snapshot the fixture BEFORE the call, so that the
        # expectation is independent of the objects handed to group_events()
        # and any modification of an event would still be detected.
        snapshot = {event['ID']: copy.deepcopy(event) for event in self.ideas_raw}

        def events(*ids):
            """Return snapshot events with given IDs, in given order."""
            return [snapshot[event_id] for event_id in ids]

        expected = {
            # Events without '_Mentat.ResolvedAbuses'.
            'stats_external': events('msg04', 'msg06'),
            # Events with '_Mentat.ResolvedAbuses'.
            'stats_internal': events('msg01', 'msg02', 'msg03', 'msg05'),
            # All events in the original order.
            'stats_overall': events('msg01', 'msg02', 'msg03', 'msg04', 'msg05', 'msg06'),
        }
        self.assertEqual(mentat.stats.idea.group_events(self.ideas_raw), expected)

    def test_10_evaluate_event_groups(self):
        """
        Perform the basic operativity tests.
        """
        # Only truthiness of the results is verified here.
        result = mentat.stats.idea.evaluate_event_groups(self.ideas_raw)
        self._verbose_dump('result = mentat.stats.idea.evaluate_event_groups(self.ideas_raw)', result)
        self.assertTrue(result)

        result = mentat.stats.idea.truncate_evaluations(result, 3)
        self._verbose_dump('result = mentat.stats.idea.truncate_evaluations(result, 3)', result)
        self.assertTrue(result)

    def test_11_merge_stats(self):
        """
        Perform the statistics aggregation tests.
        """
        merge_stats = mentat.stats.idea._merge_stats  # pylint: disable=locally-disabled,protected-access

        # Three independent evaluations of the same fixture; after merging,
        # every counter is expected to be three times the single value.
        batches = [mentat.stats.idea.evaluate_events(self.ideas_raw) for _ in range(3)]

        # The first call takes no accumulator, following calls merge into
        # the previous result.
        result = merge_stats(batches[0])
        for batch in batches[1:]:
            result = merge_stats(batch, result)

        self.assertEqual(
            result,
            {
                'abuses': {'__unknown__': 6, 'abuse@cesnet.cz': 12},
                'analyzers': {'Beekeeper': 3, 'Dionaea': 3, 'Kippo': 9, 'LaBrea': 3},
                'asns': {'__unknown__': 18},
                'categories': {'Exploit': 6, 'Fraud.Phishing': 9, 'Spam': 3},
                'category_sets': {'Exploit': 6, 'Fraud.Phishing': 9, 'Spam': 3},
                'classes': {'__unknown__': 18},
                'cnt_alerts': 18,
                'cnt_events': 18,
                'countries': {'__unknown__': 18},
                'detectors': {
                    'cz.cesnet.holly': 3,
                    'org.example.dionaea': 6,
                    'org.example.kippo': 6,
                    'org.example.labrea': 3
                },
                'detectorsws': {
                    'cz.cesnet.holly/Beekeeper': 3,
                    'org.example.dionaea/Dionaea': 3,
                    'org.example.dionaea/Kippo': 3,
                    'org.example.kippo/Kippo': 6,
                    'org.example.labrea/LaBrea': 3
                },
                'sources': {
                    '192.168.0.0/25': 9,
                    '192.168.0.100': 3,
                    '192.168.0.105': 3,
                    '192.168.0.109': 3,
                    '192.168.0.2-192.168.0.5': 9,
                    '192.168.0.200': 3,
                    '192.172.0.109': 3,
                    '192.172.0.200': 3,
                    '2001:db8::ff00:42:0/112': 9
                },
                'targets': {
                    '192.168.1.2-192.168.1.5': 6,
                    '192.168.1.0/25': 6,
                    '2001:db8::ff01:42:0/112': 6,
                    '192.168.1.109': 3,
                    '192.168.1.200': 3,
                    '__unknown__': 9
                },
                'severities': {'__unknown__': 18}
            }
        )

    def test_12_aggregate_stat_groups(self):
        """
        Perform the statistic group aggregation tests.
        """
        timestamp = 1485993600
        interval_length = 300
        interval_count = 3
        evaluate = mentat.stats.idea.evaluate_events
        from_timestamp = datetime.datetime.fromtimestamp

        # Build three consecutive statistics records, each with its own
        # freshly evaluated overall/internal/external statistics.
        records = []
        for index in range(interval_count):
            stats_overall = evaluate(self.ideas_raw)
            stats_internal = evaluate(self.ideas_raw)
            stats_external = evaluate(self.ideas_raw)
            records.append(
                mentat.datatype.sqldb.EventStatisticsModel(
                    interval = 'interval{}'.format(index + 1),
                    dt_from = from_timestamp(timestamp + index * interval_length),
                    dt_to = from_timestamp(timestamp + (index + 1) * interval_length),
                    count = stats_overall[mentat.stats.idea.ST_SKEY_CNT_ALERTS],
                    stats_overall = stats_overall,
                    stats_internal = stats_internal,
                    stats_external = stats_external
                )
            )

        result = mentat.stats.idea.aggregate_stat_groups(records)
        self.assertTrue(result)

        # The aggregate must span from the start of the first interval to
        # the end of the last one.
        self.assertEqual(result['dt_from'], from_timestamp(timestamp))
        self.assertEqual(
            result['dt_to'],
            from_timestamp(timestamp + interval_count * interval_length)
        )
#-------------------------------------------------------------------------------
# Run all tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()