#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------
"""
Daemon component providing parsing and serializing functions of IDEA messages for
daemons. Is is implemented as an encapsulation of :py:mod:`mentat.idea.internal`
library.
The implementation is based on :py:class:`pyzenkit.zendaemon.ZenDaemonComponent`.
"""
__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"
import json
import traceback
#
# Custom libraries.
#
import pyzenkit.zendaemon
import mentat.idea.internal
[docs]class ParserDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
"""
Implementation of ZenDaemonComponent encapsulating IDEA library.
"""
EVENT_MSG_PROCESS = 'message_process'
EVENT_MSG_UPDATE = 'message_update'
EVENT_LOG_STATISTICS = 'log_statistics'
STATS_CNT_PARSED = 'cnt_parsed'
STATS_CNT_ENCODED = 'cnt_encoded'
STATS_CNT_ERRORS = 'cnt_errors'
def __init__(self, **kwargs):
"""
Perform component initializations.
"""
super().__init__(**kwargs)
# Unique component identifier
self.cid = kwargs.get('cid', 'parser')
# Permit changing of default event mapping
self.event_map = kwargs.get('event_map', {
self.EVENT_MSG_PROCESS: self.EVENT_MSG_PROCESS,
self.EVENT_MSG_UPDATE: self.EVENT_MSG_UPDATE,
self.EVENT_LOG_STATISTICS: self.EVENT_LOG_STATISTICS
})
[docs] def get_events(self):
"""
Get the list of event names and their appropriate callback handlers.
"""
return [
{
'event': self.event_map[self.EVENT_MSG_PROCESS],
'callback': self.cbk_event_message_process,
'prepend': False
},
{
'event': self.event_map[self.EVENT_MSG_UPDATE],
'callback': self.cbk_event_message_update,
'prepend': True
},
{
'event': self.event_map[self.EVENT_LOG_STATISTICS],
'callback': self.cbk_event_log_statistics,
'prepend': False
}
]
#---------------------------------------------------------------------------
[docs] def cbk_event_message_process(self, daemon, args):
"""
Print the message contents into the log.
"""
daemon.logger.debug(
"Component '{}': Parsing IDEA message from: '{}'".format(
self.cid,
args['id']
)
)
try:
msg_raw = json.loads(str(args['data']).strip())
args['idea'] = mentat.idea.internal.Idea(msg_raw)
self.inc_statistic(self.STATS_CNT_PARSED)
args['idea_id'] = args['idea'].get('ID', '__UNKNOWN__')
daemon.logger.debug(
"Component '{}': Parsed IDEA message '{}' from message file '{}'".format(
self.cid,
args['idea_id'],
args['id']
)
)
return (daemon.FLAG_CONTINUE, args)
except: # pylint: disable=locally-disabled,bare-except
daemon.logger.error(
"Component '{}': Unable to parse IDEA message '{}': '{}'".format(
self.cid,
args['id'],
traceback.format_exc()
)
)
daemon.queue.schedule('message_banish', args)
self.inc_statistic(self.STATS_CNT_ERRORS)
return (daemon.FLAG_STOP, args)
[docs] def cbk_event_message_update(self, daemon, args):
"""
Print the message contents into the log.
"""
daemon.logger.debug(
"Component '{}': Generating JSON from message: '{}'".format(
self.cid,
args['id']
)
)
args['data'] = args['idea'].to_json()
self.inc_statistic(self.STATS_CNT_ENCODED)
return (daemon.FLAG_CONTINUE, args)
[docs] def cbk_event_log_statistics(self, daemon, args):
"""
Periodical processing statistics logging.
"""
stats = self.get_statistics()
stats_str = ''
for k in [self.STATS_CNT_PARSED, self.STATS_CNT_ENCODED, self.STATS_CNT_ERRORS]:
if k in stats:
stats_str = self.pattern_stats.format(stats_str, k, stats[k]['cnt'], stats[k]['inc'], stats[k]['spd'])
else:
stats_str = self.pattern_stats.format(stats_str, k, 0, 0, 0)
daemon.logger.info(
"Component '{}': *** Processing statistics ***{}".format(
self.cid,
stats_str
)
)
return (daemon.FLAG_CONTINUE, args)