mentat.idea.internal module

This module provides classes and tools for object representation of IDEA messages in the Mentat project.

This module is based on the idea.lite module from the idea package. It reuses the already defined structures wherever possible and only appends a few custom datatypes to the existing definition.

The most important part of this module available to users is the mentat.idea.internal.Idea class. The following code snippet demonstrates a typical use case:

>>> import mentat.idea.internal

# IDEA messages usually come from regular dicts or JSON
>>> idea_raw = {...}

# Just pass the dict as parameter to constructor
>>> idea_msg = mentat.idea.internal.Idea(idea_raw)
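
For illustration, a raw message might look like the plain dict below. This is only a sketch: the key names are taken from the typedef listings in this module, but every value (the ID, the timestamp, the addresses, the abuse contact) is invented for the example. The snippet stops short of calling the constructor, so it runs without Mentat installed.

```python
import json

# Hypothetical raw IDEA message; keys follow the typedef, values are invented.
idea_raw = {
    "Format": "IDEA0",
    "ID": "4dd7cf5e-4a95-49f6-8f20-947de998012f",
    "DetectTime": "2016-06-21T13:08:27Z",
    "Category": ["Attempt.Login"],
    "Source": [{"IP4": ["192.0.2.10"]}],
    "Target": [{"IP4": ["198.51.100.7"], "Port": [22]}],
    "_CESNET": {"EventSeverity": "low", "ResolvedAbuses": ["abuse@example.org"]},
}

# Keys marked 'required': True in the Idea typedef.
REQUIRED_KEYS = {"Format", "ID", "DetectTime", "Category"}
missing = REQUIRED_KEYS - idea_raw.keys()

# The same structure often arrives as JSON text; json.loads() yields an equal dict.
idea_from_json = json.loads(json.dumps(idea_raw))
```

A dict of this shape is what would be passed to the constructor as idea_raw.
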
class mentat.idea.internal.CESNETDict(init_data=None)[source]

Bases: abc.TypedDict

This type definition represents the custom subdictionary stored under the key _CESNET in the message root dictionary.

allow_unknown = True
typedef = {'EventClass': {'description': 'Event class determined by inspection', 'type': <class 'str'>}, 'EventSeverity': {'description': 'Event severity determined by inspection', 'type': <class 'str'>}, 'EventTemplate': {'description': 'Template used to generate the event (deprecated)', 'type': <class 'str'>}, 'Impact': {'description': 'More user friendly description of event impact, used for reporting (IDMEF legacy)', 'type': <class 'str'>}, 'InspectionErrors': {'description': 'List of event peculiarities found during inspection', 'type': <class 'abc.InspectionErrorsList'>}, 'ResolvedAbuses': {'description': 'Abuse contacts related to any alert source', 'type': <class 'abc.ResolvedAbusesList'>}, 'SourceResolvedASN': {'description': 'AS numbers related to any alert source', 'type': <class 'abc.Integer'>}, 'SourceResolvedCountry': {'description': 'Country ISO codes related to any alert source', 'type': <class 'abc.String'>}, 'StorageTime': {'description': 'Unix timestamp of the moment the message was stored into database', 'type': <function Timestamp>}}
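
The combination of a typedef and allow_unknown = True can be pictured with the toy checker below. It is a hypothetical sketch and not the typedcols implementation: it assumes that allow_unknown means what its name suggests (keys missing from the typedef are tolerated), and it reduces the typedef to a plain key-to-type mapping. The names TYPEDEF and check, as well as the sample values, are invented for the example.

```python
# Hypothetical, simplified typedef: key -> expected Python type.
TYPEDEF = {"EventClass": str, "EventSeverity": str, "Impact": str}


def check(data, typedef, allow_unknown=True):
    """Return a list of problems found in data; an empty list means it passed."""
    problems = []
    for key, value in data.items():
        if key not in typedef:
            if not allow_unknown:
                problems.append("unknown key: " + key)
        elif not isinstance(value, typedef[key]):
            problems.append("wrong type for key: " + key)
    return problems


cesnet = {"EventClass": "attempt-login-ssh", "EventSeverity": "low", "Custom": 1}
lenient = check(cesnet, TYPEDEF)                      # unknown keys tolerated
strict = check(cesnet, TYPEDEF, allow_unknown=False)  # unknown keys reported
```
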
class mentat.idea.internal.IDEAFilterCompiler[source]

Bases: pynspect.compilers.IDEAFilterCompiler

Custom filter compiler tailored to the extended schema of mentat.idea.internal.Idea messages. The implementation is based on pynspect.compilers.IDEAFilterCompiler and only adds compilation directives for the extended IDEA schema. Use this compiler when working with mentat.idea.internal.Idea messages so that filtering rules are compiled correctly.

class mentat.idea.internal.Idea(init_data=None)[source]

Bases: idea.lite.Idea

This class makes only narrowly scoped changes to the original idea.lite.Idea class, so that it does not need to be updated when the underlying definition changes.

To make these changes, the addon feature of the typedcols library is used.

allow_unknown = True
static from_json(json_string)[source]

Instantiate a message object directly from the given JSON serialized string.

get_abuses()[source]

Convenience method for returning list of all resolved abuses.

Returns

Value of message attribute idea['_CESNET']['ResolvedAbuses'].

Return type

list of strings

get_addresses(node, get_v4=True, get_v6=True)[source]

Convenience method for returning list of all addresses (both v4 and v6) for given node (Source or Target).

Parameters
  • node (str) – Type of the node (Source or Target).

  • get_v4 (bool) – Fetch IPv4 addresses.

  • get_v6 (bool) – Fetch IPv6 addresses.

Returns

Value of message attributes idea[node]['IP4'] and idea[node]['IP6'].

Return type

list of ipranges
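
The documented behaviour can be approximated on a plain dict as follows. This is a hypothetical stand-in and not the method's actual code: collect_addresses is an invented name, the addresses are plain strings instead of iprange objects, and the ordering of the real result may differ.

```python
def collect_addresses(message, node, get_v4=True, get_v6=True):
    """Hypothetical stand-in: gather IP4/IP6 values from every item under message[node]."""
    result = []
    for item in message.get(node, []):
        if get_v4:
            result.extend(item.get("IP4", []))
        if get_v6:
            result.extend(item.get("IP6", []))
    return result


# Invented sample data using documentation-reserved address ranges.
message = {
    "Source": [
        {"IP4": ["192.0.2.1"], "IP6": ["2001:db8::1"]},
        {"IP4": ["192.0.2.2"]},
    ]
}

v4_only = collect_addresses(message, "Source", get_v6=False)
everything = collect_addresses(message, "Source")
missing_node = collect_addresses(message, "Target")
```
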

get_asns_src()[source]

Convenience method for returning list of all resolved source ASNs.

Returns

Value of message attribute idea['_CESNET']['SourceResolvedASN'].

Return type

list of int

get_categories()[source]

Convenience method for returning list of all message categories.

Returns

Value of message attribute idea['Category'].

Return type

list of strings

get_class()[source]

Convenience method for returning message event class.

Returns

Value of message attribute idea['_CESNET']['EventClass'].

Return type

str

get_countries_src()[source]

Convenience method for returning list of all resolved source countries.

Returns

Value of message attribute idea['_CESNET']['SourceResolvedCountry'].

Return type

list of strings

get_description()[source]

Convenience method for returning message description.

Returns

Value of message attribute idea['Description'].

Return type

str

get_detect_time()[source]

Convenience method for returning message detection time.

Returns

Value of message attribute idea['DetectTime'].

Return type

datetime.datetime

get_detectors()[source]

Convenience method for returning list of all message detectors.

Returns

Value of message attribute idea['Node']['Name'].

Return type

list of strings

get_id()[source]

Convenience method for returning message identifier.

Returns

Value of message attribute idea['ID'].

Return type

str

get_jpath_value(jpath)[source]

Return a single value (the first one in case of a list) on given JPath within the IDEA message.

Parameters

jpath (str) – JPath as defined in pynspect.jpath module.

Returns

Single (or first) value on given JPath.

get_jpath_values(jpath)[source]

Return all values on given JPath within the IDEA message.

Parameters

jpath (str) – JPath as defined in pynspect.jpath module.

Returns

List of all values on given JPath.

Return type

list
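
The difference between the two JPath accessors can be illustrated with a much simplified path walker. This sketch is not pynspect's JPath implementation: it only understands dotted key names without indices, the helper names dotted_values and dotted_value are invented, and returning None for a missing path is a choice made for this example only.

```python
def dotted_values(structure, path):
    """Return all values reachable via a dotted path, flattening lists on the way."""
    current = [structure]
    for key in path.split("."):
        found = []
        for node in current:
            if isinstance(node, dict) and key in node:
                value = node[key]
                # A list contributes each of its items; a scalar contributes itself.
                found.extend(value if isinstance(value, list) else [value])
        current = found
    return current


def dotted_value(structure, path):
    """Return the first value on the path, or None when nothing matches."""
    values = dotted_values(structure, path)
    return values[0] if values else None


# Invented sample message.
message = {
    "ID": "msg-1",
    "Source": [{"IP4": ["192.0.2.1", "192.0.2.2"]}, {"IP4": ["192.0.2.3"]}],
}

all_sources = dotted_values(message, "Source.IP4")
first_source = dotted_value(message, "Source.IP4")
no_target = dotted_value(message, "Target.IP4")
```
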

get_ports(node)[source]

Convenience method for returning list of all ports for given node (Source or Target).

Parameters

node (str) – Type of the node (Source or Target).

Returns

Value of message attributes idea[node]['Port'].

Return type

list of int

get_protocols(node)[source]

Convenience method for returning list of all protocols for given node (Source or Target).

Parameters

node (str) – Type of the node (Source or Target).

Returns

Value of message attributes idea[node]['Proto'].

Return type

list of strings

get_severity()[source]

Convenience method for returning message event severity.

Returns

Value of message attribute idea['_CESNET']['EventSeverity'].

Return type

str

get_storage_time()[source]

Convenience method for returning message storage time.

Returns

Value of message attribute idea['_CESNET']['StorageTime'].

Return type

datetime.datetime

get_types(node)[source]

Convenience method for returning list of all types for given node (Source, Target or Node).

Parameters

node (str) – Type of the node (Source, Target or Node).

Returns

Value of message attributes idea[node]['Type'].

Return type

list of strings

static json_default(o)[source]

Helper method for JSON serialization of mentat.idea.internal.Idea messages.

Example usage:

>>> import json
>>> idea_internal = ...
>>> json.dumps(idea_internal, indent=4, sort_keys=True, default=idea_internal.json_default)
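
The default argument of json.dumps is called for every object the encoder cannot serialize on its own, which is the hook this helper plugs into. The sketch below shows the mechanism with a hypothetical hook, default_hook, that is not Idea.json_default; which conversions the real helper performs is not described here, so the datetime handling is an assumption made for the example.

```python
import datetime
import json


def default_hook(o):
    """Hypothetical hook: serialize datetimes as ISO 8601 strings, fall back to str()."""
    if isinstance(o, datetime.datetime):
        return o.isoformat()
    return str(o)


# Invented sample message containing a value json cannot encode natively.
message = {"ID": "msg-1", "DetectTime": datetime.datetime(2016, 6, 21, 13, 8, 27)}

serialized = json.dumps(message, indent=4, sort_keys=True, default=default_hook)
restored = json.loads(serialized)
```

Without the default argument, the same json.dumps call would raise TypeError on the datetime value.
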
typedef = {'AggrID': {'description': 'List of identifiers of messages, which are aggregated into more concise form by this message. Should be sent mostly by intermediary nodes, which detect duplicates, or aggregate events, spanning multiple detection windows, into one longer.', 'type': <class 'abc.ID'>}, 'AltNames': {'description': 'List of alternative identifiers; strings which help to pair the event to internal system information (for example tickets in request tracker systems).', 'type': <class 'abc.String'>}, 'Attach': {'description': 'Array of additional attachments information and data.', 'type': <class 'abc.AttachList'>}, 'ByteCount': {'description': 'Number of bytes transferred.', 'type': <class 'int'>}, 'Category': {'description': 'List of event categories.', 'required': True, 'type': <class 'abc.EventTag'>}, 'CeaseTime': {'description': 'Deduced end of the event/attack.', 'type': <function Timestamp>}, 'Confidence': {'description': 'Confidence of detector in its own reliability of this particular detection. (0 - surely false, 1 - no doubts). If key is not presented, detector does not know (or has no capability to estimate the confidence).', 'type': <class 'float'>}, 'ConnCount': {'description': 'Number of individual connections attempted or taken place.', 'type': <class 'int'>}, 'CorrelID': {'description': 'List of identifiers of messages, which are information sources for creation of this message in case the message has been created based on correlation/analysis/deduction of other messages.', 'type': <class 'abc.ID'>}, 'CreateTime': {'description': 'Timestamp of the creation of the IDEA message. May point out delay between detection and processing of data.', 'type': <function Timestamp>}, 'Description': {'description': 'Short free text human readable description.', 'type': <class 'str'>}, 'DetectTime': {'description': 'Timestamp of the moment of detection of event (not necessarily time of the event taking place). 
This timestamp is mandatory, because every detector is able to know when it detected the information - for example when line about event appeared in the logfile, or when its information source says the event was detected, or at least when it accepted the information from the source.', 'required': True, 'type': <function Timestamp>}, 'EventTime': {'description': 'Deduced start of the event/attack, or just time of the event if it is solitary.', 'type': <function Timestamp>}, 'FlowCount': {'description': 'Number of individual simplex (one direction) flows.', 'type': <class 'int'>}, 'Format': {'default': 'IDEA0', 'description': 'Identifier of the IDEA container.', 'required': True, 'type': <function Version>}, 'ID': {'default': <function <lambda>>, 'description': 'Unique message identifier.', 'required': True, 'type': <function ID>}, 'Node': {'description': 'List of detector or possible intermediary (event aggregator, correlator, etc.) descriptions, last visited first (as in e-mail Received headers).', 'type': <class 'abc.NodeList'>}, 'Note': {'description': 'Free text human readable additional note, possibly longer description of incident if not obvious.', 'type': <class 'str'>}, 'PacketCount': {'description': 'Number of individual packets transferred.', 'type': <class 'int'>}, 'PredID': {'description': 'List of identifiers of messages, which are obsoleted and information in them is replaced by this message. Should be sent only by detection nodes to incorporate further data about ongoing event.', 'type': <class 'abc.ID'>}, 'Ref': {'description': 'List of references to known sources, related to attack and/or vulnerability. 
May be URL of the additional info, or URN (according to [[http://tools.ietf.org/html/rfc2141|RFC 2141]]) in registered namespace ([[http://www.iana.org/assignments/urn-namespaces/urn-namespaces.xhtml|IANA]]) or unregistered ad-hoc namespace bearing reasonable information value and uniqueness, such as "urn:cve:CVE-2013-5634".', 'type': <class 'abc.URI'>}, 'RelID': {'description': 'List of otherwise related messages identifiers.', 'type': <class 'abc.ID'>}, 'Source': {'description': 'List of dictionaries of information concerning particular source of the problem.', 'type': <class 'abc.SourceList'>}, 'Target': {'description': 'List of dictionaries of information concerning particular target of the problem.', 'type': <class 'abc.TargetList'>}, 'WinEndTime': {'description': 'End of aggregation window in which event has been observed.', 'type': <function Timestamp>}, 'WinStartTime': {'description': 'Beginning of aggregation window in which event has been observed.', 'type': <function Timestamp>}, '_CESNET': {'description': 'Custom CESNET/Mentat abominations to IDEA definition', 'type': <class 'mentat.idea.internal.CESNETDict'>}, 'ts': {'description': 'CESNET specific timestamp as NTP timestamp', 'type': <function Timestamp>}, 'ts_u': {'description': 'CESNET specific timestamp as native Unix timestamp', 'type': <class 'int'>}}
class mentat.idea.internal.IdeaGhost(init_data=None)[source]

Bases: mentat.idea.internal.Idea

This class represents simplified IDEA message objects reconstructed from SQL records retrieved from the database. These records are represented by the mentat.idea.sqldb.Idea class; however, the native database record object is used when the message is fetched from the database.

Objects of this class ARE NOT a perfect one-to-one match for the original objects based on mentat.idea.internal.Idea. During the conversion to the database representation some information is lost, because the data model is optimized for database search. From the point of view of searching, however, these objects are equivalent: they carry the same set of categories, source and target IPs and all other message attributes, so they can stand in for the original in use cases where the full message is not necessary. The main advantage of these ghost objects is that they are much cheaper to construct than the full IDEA message representation.

classmethod from_record(record)[source]

Construct the IDEA ghost object from the given SQL record.

mentat.idea.internal.cesnet_dict_typedef(flavour, list_flavour, errors_list, abuses_list, addon=None)[source]

Typedef generator helper for easy usage of custom definitions at multiple places.

mentat.idea.internal.internal_base_addon_typedef(flavour, list_flavour, cesnet_dict, addon=None)[source]

Typedef generator helper for easy usage of custom definitions at multiple places.