mentat.services.eventstorage module

Event database storage abstraction layer. The current implementation requires the PostgreSQL database and is based directly on the Psycopg2 library for performance reasons.

Warning

Current implementation is for optimalization purposes using some advanced schema features provided by the PostgreSQL database and thus no other engines are currently supported.

Warning

The PostgreSQL extension ip4r must be installed.

exception mentat.services.eventstorage.DataError[source]

Bases: EventStorageException

Class for custom event storage exceptions related to data errors.

class mentat.services.eventstorage.EventStorageCursor(cursor)[source]

Bases: object

Encapsulation of psycopg2.cursor class.

close()[source]

Close current database connection.

count_events(parameters=None, qname=None)[source]

Count the number of IDEA events in database. There is an option to assign given unique name to the count query, so that it can be identified within the pg_stat_activity table.

Parameters
  • parameters (dict) – Count query parameters.

  • qname (str) – Optional unique name for the generated query.

Returns

Number of IDEA events in database.

Return type

int

delete_event(eventid)[source]

Delete IDEA event with given primary identifier from database.

Parameters

eventid (str) – Primary identifier of the message to fetch.

delete_events(parameters=None, qname=None)[source]

Delete IDEA events in database according to given parameters. There is an option to assign given unique name to the query, so that it can be identified within the pg_stat_activity table.

Parameters
  • parameters (dict) – Delete query parameters.

  • qname (str) – Optional unique name for the generated query.

Returns

Number of deleted events.

Return type

int

fetch_event(eventid)[source]

Fetch IDEA event with given primary identifier from database.

Parameters

eventid (str) – Primary identifier of the message to fetch.

Returns

Instance of IDEA event.

Return type

mentat.idea.internal

insert_event(idea_event)[source]

Insert given IDEA event into database.

Parameters

idea_event (mentat.idea.internal) – Instance of IDEA event.

query_direct(raw_query, idents=None, params=None)[source]

Perform direct database query.

Parameters
  • raw_query (str) – Raw SQL query. Will be converted to psycopg2.sql.SQL.

  • idents (list) – Optional list of SQL identifiers, will be converted to psycopg2.sql.Identifier and formatted into raw_query above.

  • params (list) – Optional list of SQL parameters, will be formatted into raw_query above.

search_column_with(column, function='min')[source]

Search given column with given aggregation function. This method is intended to produce single min or max values for given column name.

search_events(parameters=None, qtype='select', qname=None)[source]

Search IDEA events in database according to given parameters. The parameters will be passed down to the mentat.services.eventstorage.build_query() function to generate proper SQL query. There is an option to assign given unique name to the select query, so that it can be identified within the pg_stat_activity table.

Parameters
  • parameters (dict) – Search query parameters, see mentat.services.eventstorage.build_query() for details.

  • qtype (string) – Type of the select query.

  • qname (str) – Optional unique name for the generated query.

Returns

Number of IDEA events in the result and list of events.

Return type

tuple

search_events_aggr(parameters=None, qtype='aggregate', qname=None, dbtoplist=False)[source]

Search IDEA events in database according to given parameters and perform selected aggregations. The parameters will be passed down to the mentat.services.eventstorage.build_query() function to generate proper SQL query. There is an option to assign given unique name to the select query, so that it can be identified within the pg_stat_activity table.

Parameters
  • parameters (dict) – Search query parameters, see mentat.services.eventstorage.build_query() for details.

  • qtype (string) – Type of the select query.

  • qname (str) – Optional unique name for the generated query.

Returns

Number of IDEA events in the result and list of events.

Return type

tuple

search_relapsed_events(group_name, severity, ttl)[source]

Search for list of relapsed events for given group, severity and TTL. Event is considered to be relapsed, when following conditions are met:

  • there is record in thresholds table with thresholds.ttltime <= $ttl (this means that thresholding window expired)

  • there is record in events_thresholded table with events_thresholded.createtime >= thresholds.relapsetime (this meant that the event was thresholded in relapse period)

Parameters
  • group_name (str) – Name of the abuse group.

  • severity (str) – Event severity.

  • ttl (datetime.datetime) – Record TTL time.

Returns

List of relapsed events as touple of id, json of event data and list of threshold keys.

Return type

list

table_cleanup(table, column, ttl)[source]

Clean expired table records according to given TTL.

Parameters
  • table (str) – Name of the table to cleanup.

  • column (str) – Name of the column holding the time information.

  • ttl (datetime.datetime) – Maximal valid TTL.

Returns

Number of cleaned up records.

Return type

int

threshold_check(key, ttl)[source]

Check thresholding cache for record with given key.

Parameters
  • key (str) – Record key to the thresholding cache.

  • ttl (datetime.datetime) – Upper TTL boundary for valid record.

Returns

Full cache record as tuple.

Return type

tuple

threshold_save(eventid, keyid, group_name, severity, createtime)[source]

Save given event to the list of thresholded events.

Parameters
  • eventid (str) – Unique event identifier.

  • keyid (str) – Record key to the thresholding cache.

  • group_name (str) – Name of the abuse group.

  • severity (str) – Event severity.

  • createtime (datetime.datetime) – Record creation time.

threshold_set(key, thresholdtime, relapsetime, ttl)[source]

Insert new threshold record into the thresholding cache.

Parameters
  • key (str) – Record key to the thresholding cache.

  • thresholdtime (datetime.datetime) – Threshold window start time.

  • relapsetime (datetime.datetime) – Relapse window start time.

  • ttl (datetime.datetime) – Record TTL.

thresholded_events_clean()[source]

Clean no longer valid records from list of thresholded events. Record is no longer valid in following cases:

  • there is no appropriate record in thresholds table (there is no longer active thresholding window)

  • the events_thresholded.createtime < thresholds.relapsetime (there is an active thresholding window, but event does not belong to relapse interval)

Returns

Number of cleaned up records.

Return type

int

thresholded_events_count()[source]

Count number of records in list of thresholded events.

Returns

Number of records in list of thresholded events.

Return type

int

thresholds_clean(ttl)[source]

Clean no longer valid threshold records from thresholding cache.

Parameters

ttl (datetime.datetime) – Maximal valid TTL.

Returns

Number of cleaned up records.

Return type

int

thresholds_count()[source]

Count threshold records in thresholding cache.

Returns

Number of records in thresholding cache.

Return type

int

watchdog_events(interval)[source]

Perform watchdog operation on event database: Check if any new events were added into the database within given time interval.

Parameters

interval (int) – Desired time interval in seconds.

Returns

True in case any events were stored within given interval, False otherwise.

Return type

bool

exception mentat.services.eventstorage.EventStorageException[source]

Bases: Exception

Class for custom event storage exceptions.

class mentat.services.eventstorage.EventStorageService(**conncfg)[source]

Bases: object

Proxy object for working with persistent SQL based event storages. Maintains and provides access to database connection.

close()[source]

Close current database connection.

commit(*args, **kwargs)[source]
commit_bulk(*args, **kwargs)[source]
count_events(*args, **kwargs)[source]
cursor_new(*args, **kwargs)[source]
database_create(*args, **kwargs)[source]
database_drop(*args, **kwargs)[source]
database_status(brief=False)[source]

Determine status of all tables within current database and general PostgreSQL configuration.

delete_event(*args, **kwargs)[source]
delete_events(*args, **kwargs)[source]
distinct_values(*args, **kwargs)[source]
fetch_event(*args, **kwargs)[source]
handle_db_exceptions()[source]

Handle exceptions raised during database interfacing operations.

index_create(*args, **kwargs)[source]
index_drop(*args, **kwargs)[source]
insert_event(*args, **kwargs)[source]
insert_event_bulkci(*args, **kwargs)[source]
mogrify(*args, **kwargs)[source]
queries_status(qpattern=None)[source]

Determine status of all currently running queries.

query_cancel(qname)[source]

Cancel given query.

query_direct(*args, **kwargs)[source]
query_status(qname)[source]

Determine status of given query.

rollback(*args, **kwargs)[source]
savepoint_create(*args, **kwargs)[source]
savepoint_release(*args, **kwargs)[source]
savepoint_rollback(*args, **kwargs)[source]
search_column_with(*args, **kwargs)[source]
search_events(*args, **kwargs)[source]
search_events_aggr(*args, **kwargs)[source]
search_relapsed_events(*args, **kwargs)[source]
table_cleanup(*args, **kwargs)[source]
table_status(table_name, time_column)[source]

Determine status of given table within current database.

threshold_check(*args, **kwargs)[source]
threshold_save(*args, **kwargs)[source]
threshold_set(*args, **kwargs)[source]
thresholded_events_clean(*args, **kwargs)[source]
thresholded_events_count(*args, **kwargs)[source]
thresholds_clean(*args, **kwargs)[source]
thresholds_count(*args, **kwargs)[source]
watchdog_events(*args, **kwargs)[source]
class mentat.services.eventstorage.EventStorageServiceManager(core_config, updates=None)[source]

Bases: object

Class representing a custom _EventStorageServiceManager_ capable of understanding and parsing Mentat system core configurations.

close()[source]

Close internal storage connection.

service()[source]

Return handle to storage connection service according to internal configurations.

Returns

Reference to storage service.

Return type

mentat.services.eventstorage.EventStorageService

class mentat.services.eventstorage.IPBaseAdapter(rng)[source]

Bases: object

Adapt a ipranges.IPBase to an SQL quotable object.

Resources: http://initd.org/psycopg/docs/advanced.html#adapting-new-python-types-to-sql-syntax

getquoted()[source]

Implementation of psycopg2 adapter interface.

prepare(conn)[source]

Implementation of psycopg2 adapter interface.

class mentat.services.eventstorage.IPListAdapter(seq)[source]

Bases: object

Adapt a mentat.idea.sqldb.IPList to an SQL quotable object.

Resources: http://initd.org/psycopg/docs/advanced.html#adapting-new-python-types-to-sql-syntax

getquoted()[source]

Implementation of psycopg2 adapter interface.

prepare(conn)[source]

Implementation of psycopg2 adapter interface.

exception mentat.services.eventstorage.QueryCanceledException[source]

Bases: EventStorageException

Class for custom event storage exceptions related to canceled queries.

exception mentat.services.eventstorage.StorageConnectionException[source]

Bases: EventStorageException

Class for custom event storage exceptions related to database connection errors.

exception mentat.services.eventstorage.StorageIntegrityError[source]

Bases: EventStorageException

Class for custom event storage exceptions related to integrity errors.

mentat.services.eventstorage.build_query(parameters=None, qtype='select', qname=None, dbtoplist=False)[source]

Build SQL database query according to given parameters.

Parameters
  • parameters (dict) – Query parametersas complex dictionary structure.

  • qtype (str) – Type of the generated query (‘select’,’count’,’delete’).

  • qname (str) – Unique name for the generated query.

  • dbtoplist (bool) – Build aggregation or timeline SQL queries with toplisting within the database.

Returns

Generated query as psycopg2.sql.SQL and apropriate arguments.

Return type

tuple

mentat.services.eventstorage.close()[source]

Close database connection on mentat.services.eventstorage.EventStorageService instance from module level manager.

class mentat.services.eventstorage.incstats_decorator(stat_name, increment=1)[source]

Bases: object

Decorator for calculating usage statistics.

mentat.services.eventstorage.init(core_config, updates=None)[source]

(Re-)Initialize mentat.services.eventstorage.EventStorageServiceManager instance at module level and store the refence within module.

Parameters
  • core_config (dict) – Mentat core configuration structure.

  • updates (dict) – Optional configuration updates (same structure as core_config).

mentat.services.eventstorage.manager()[source]

Obtain reference to mentat.services.eventstorage.EventStorageServiceManager instance stored at module level.

Returns

Storage service manager reference.

Return type

mentat.services.eventstorage.EventStorageServiceManager

mentat.services.eventstorage.record_to_idea(val)[source]

Convert given SQL record object, as fetched from PostgreSQL database, directly into mentat.idea.internal.Idea object.

mentat.services.eventstorage.record_to_idea_ghost(val)[source]

Convert given SQL record object, as fetched from PostgreSQL database, directly into mentat.idea.internal.IdeaGhost object.

mentat.services.eventstorage.service()[source]

Obtain reference to mentat.services.eventstorage.EventStorageService instance from module level manager.

Returns

Storage service reference.

Return type

mentat.services.eventstorage.EventStorageService

mentat.services.eventstorage.set_manager(man)[source]

Set manager from outside of the module. This should be used only when you know exactly what you are doing.