#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------
"""
This module provides a base implementation of a generic message fetching and processing
script. The script takes care of registering the :py:class:`mentat.plugin.app.eventstorage.EventStoragePlugin`
and :py:class:`mentat.plugin.app.sqlstorage.SQLStoragePlugin` plugins and provides methods
for fetching messages from the database.
"""
__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"
import os
import time
#
# Custom libraries.
#
import mentat.const
import mentat.script.base
import mentat.plugin.app.eventstorage
import mentat.plugin.app.sqlstorage
class FetcherScript(mentat.script.base.MentatBaseScript): # pylint: disable=locally-disabled,abstract-method
    """
    Base implementation of generic message fetching and processing script.

    The constructor force-prepends the SQL storage and event storage plugins
    to the list of script plugins, and the class offers helper methods for
    fetching messages through ``self.eventservice``.
    """

    def __init__(self, **kwargs):
        """
        Initialize fetcher script. This method overrides the base implementation
        in :py:func:`pyzenkit.zenscript.ZenScript.__init__` and it aims to even
        more simplify the script object creation.

        :param kwargs: Various additional parameters passed to object constructor.
        """
        # Starts out empty; presumably populated by the event storage plugin
        # during script setup - TODO confirm against the plugin implementation.
        self.eventservice = None

        # Work on a private copy, so that a plugin list object supplied by the
        # caller is never modified behind the caller's back. A missing or
        # ``None`` value is treated as an empty list.
        custom_plugins = list(kwargs.get(self.CONFIG_PLUGINS) or [])

        # Instantiate in the same order as before (event storage first), but
        # the resulting list has the SQLStoragePlugin at the very front,
        # followed by the EventStoragePlugin and then any custom plugins.
        event_plugin = mentat.plugin.app.eventstorage.EventStoragePlugin()
        sql_plugin = mentat.plugin.app.sqlstorage.SQLStoragePlugin()
        kwargs[self.CONFIG_PLUGINS] = [sql_plugin, event_plugin] + custom_plugins

        super().__init__(**kwargs)

    #---------------------------------------------------------------------------

    @staticmethod
    def initialize_result(time_low, time_high, interval, result = None):
        """
        Initialize the result data structure.

        The following keys are (over)written: ``ts_from_s`` and ``ts_to_s``
        (string representations of the boundaries), ``ts_from`` and ``ts_to``
        (integer timestamps), ``ts_delta`` (difference of the two timestamps),
        ``interval`` and ``_id`` (``'<ts_from>_<ts_to>'``).

        :param datetime.datetime time_low: Lower time interval boundary.
        :param datetime.datetime time_high: Upper time interval boundary.
        :param str interval: Time interval, one of the interval defined in :py:mod:`pyzenkit.zenscript`.
        :param dict result: Optional result data structure to be updated in place; new one is created when omitted.
        :return: Result data structure.
        :rtype: dict
        """
        if result is None:
            result = {}

        ts_from = int(time_low.timestamp())
        ts_to = int(time_high.timestamp())

        result['ts_from_s'] = str(time_low)
        result['ts_to_s'] = str(time_high)
        result['ts_from'] = ts_from
        result['ts_to'] = ts_to
        result['ts_delta'] = ts_to - ts_from
        result['interval'] = interval
        result['_id'] = '{}_{}'.format(ts_from, ts_to)
        return result

    def fetch_messages(self, time_low, time_high):
        """
        Fetch messages from database collection within time interval defined by
        given upper and lower time boundary.

        The boundaries are passed to ``self.eventservice.search_events`` as the
        ``st_from`` and ``st_to`` search parameters.

        :param datetime.datetime time_low: Lower time interval boundary.
        :param datetime.datetime time_high: Upper time interval boundary.
        :return: List of messages fetched from database.
        :rtype: list
        """
        # Shared positional arguments for both log messages; formatting is
        # left to the logging framework.
        interval_args = (time_low, time_high, time_low.timestamp(), time_high.timestamp())

        self.logger.info(
            "Fetching messages from time interval '%s' -> '%s' (%i -> %i)",
            *interval_args
        )
        # The search returns a two item tuple; only the second item (the
        # messages) is used here.
        _count, messages = self.eventservice.search_events(
            {
                'st_from': time_low,
                'st_to': time_high
            }
        )
        self.logger.info(
            "Fetched '%i' messages from time interval '%s' -> '%s' (%i -> %i)",
            len(messages),
            *interval_args
        )
        return messages

    def fetch_all_messages(self):
        """
        Fetch all messages from database collection.

        :return: List of messages fetched from database.
        :rtype: list
        """
        self.logger.info("Fetching all messages")
        # The search returns a two item tuple; only the second item (the
        # messages) is used here.
        _count, messages = self.eventservice.search_events()
        self.logger.info("Fetched all '%i' messages", len(messages))
        return messages
class DemoFetcherScript(FetcherScript):
    """
    This is an internal implementation of :py:class:`FetcherScript` usable only
    for demonstration purposes.
    """

    def __init__(self, name = None, description = None):
        """
        Initialize demonstration fetcher script. This method overrides the base
        implementation in :py:func:`mentat.script.fetcher.FetcherScript.__init__` and
        it aims to even more simplify the script object creation.

        :param str name: Optional script name.
        :param str description: Optional script description.
        """
        # Any falsy value (None, empty string) falls back to the default.
        name = name or 'demo-fetcherscript.py'
        description = description or 'demo-fetcherscript.py - Demonstration fetcher script'

        # Core configuration directory, resolved relative to this source file.
        config_dir = os.path.abspath(
            os.path.join(
                os.path.dirname(os.path.realpath(__file__)), '../../../conf/core'
            )
        )

        super().__init__(
            name = name,
            description = description,

            #
            # Configure required script paths.
            #
            path_bin = 'tmp',
            path_cfg = 'tmp',
            path_var = 'tmp',
            path_log = 'tmp',
            path_run = 'tmp',
            path_tmp = 'tmp',

            #
            # Override default configurations.
            #
            default_debug = True,
            default_config_dir = config_dir
        )

    def get_default_command(self):
        """
        Return the name of a default script command.

        :return: Name of the default command.
        :rtype: str
        """
        return 'demo'

    def cbk_command_demo(self):
        """
        Implementation of the 'demo' command (default command).

        Calculates thresholds of the ``5_minutes`` interval ending at current
        time, initializes the result structure for it and fetches the messages
        from that interval. The messages are only counted for logging.

        :return: Result data structure built by :py:func:`initialize_result`.
        :rtype: dict
        """
        self.logger.warning("Fetcher demonstration run - begin")

        interval = '5_minutes'
        time_low, time_high = self.calculate_interval_thresholds(
            time_high = int(time.time()),
            interval = interval
        )
        result = self.initialize_result(time_low, time_high, interval)

        messages = self.fetch_messages(time_low, time_high)
        self.logger.info("Processed '%i' messages", len(messages))

        self.logger.warning("Fetcher demonstration run - end")
        return result
#-------------------------------------------------------------------------------
#
# Perform the demonstration.
#
if __name__ == "__main__":

    # Prepare demonstration environment.
    APP_NAME = 'demo-fetcherscript.py'

    # Make sure both working directories exist. An already existing directory
    # is not an error, so the demonstration can be run repeatedly.
    for directory in (
            DemoFetcherScript.get_resource_path('tmp'),
            DemoFetcherScript.get_resource_path('tmp/{}'.format(APP_NAME))
        ):
        os.makedirs(directory, exist_ok = True)

    # Write a small sample configuration file for the demonstration script.
    DemoFetcherScript.json_save(
        DemoFetcherScript.get_resource_path('tmp/{}.conf'.format(APP_NAME)),
        {
            'test_a': 1, 'test_b': 2, 'test_c': 3
        }
    )

    #
    # Perform demonstration run.
    #
    DemoFetcherScript(APP_NAME).run()