Source code for mentat.daemon.component.commiter

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------


"""
Daemon component that commits all messages. It is intended to be used at the
end of the message processing chain and it will emit the 'message_commit' event
for any message it receives.

The implementation is based on :py:class:`pyzenkit.zendaemon.ZenDaemonComponent`
"""


__author__ = "Jan Mach <jan.mach@cesnet.cz>"
__credits__ = "Pavel Kácha <pavel.kacha@cesnet.cz>, Andrea Kropáčová <andrea.kropacova@cesnet.cz>"


#
# Custom libraries.
#
import pyzenkit.zendaemon


class CommiterDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
    """
    Implementation of ZenDaemonComponent committing all messages.

    For every ``message_process`` event the component schedules a
    ``message_commit`` event on the daemon queue and increments its commit
    counter. On ``log_statistics`` events it writes the commit statistics to
    the daemon log.
    """
    EVENT_MSG_PROCESS = 'message_process'
    EVENT_LOG_STATISTICS = 'log_statistics'

    STATS_CNT_COMMITS = 'cnt_commits'

    def __init__(self, **kwargs):
        """
        Perform component initializations.

        :param kwargs: Keyword arguments; all of them are passed on to the
            parent constructor. Two of them are read here:

            * ``cid`` - component identifier, defaults to ``'commiter'``.
            * ``event_map`` - dictionary mapping the default event names
              (``EVENT_MSG_PROCESS``, ``EVENT_LOG_STATISTICS``) to the event
              names this component should actually be registered for. Events
              missing from the mapping keep their default names.
        """
        super().__init__(**kwargs)

        # Unique component identifier
        self.cid = kwargs.get('cid', 'commiter')

        # Permit changing of default event mapping. The overrides are merged
        # on top of the defaults, so that a mapping which remaps only some of
        # the events (or is empty/None) does not make get_events() fail on a
        # missing key.
        self.event_map = {
            self.EVENT_MSG_PROCESS: self.EVENT_MSG_PROCESS,
            self.EVENT_LOG_STATISTICS: self.EVENT_LOG_STATISTICS,
        }
        self.event_map.update(kwargs.get('event_map') or {})

    def get_events(self):
        """
        Get the list of event names and their appropriate callback handlers.

        :return: List of dictionaries, one per handled event, each with the
            keys ``event`` (event name after applying ``self.event_map``),
            ``callback`` (bound handler method) and ``prepend`` (always
            ``False`` here).
        :rtype: list
        """
        return [
            {
                'event': self.event_map[self.EVENT_MSG_PROCESS],
                'callback': self.cbk_event_message_process,
                'prepend': False
            },
            {
                'event': self.event_map[self.EVENT_LOG_STATISTICS],
                'callback': self.cbk_event_log_statistics,
                'prepend': False
            }
        ]

    #---------------------------------------------------------------------------

    def cbk_event_message_process(self, daemon, args):
        """
        Schedule event for committing given message.

        :param daemon: Daemon object; its ``logger``, ``queue`` and
            ``FLAG_CONTINUE`` attributes are used.
        :param args: Event arguments; must support ``args['id']`` (used in the
            log message) and are handed over unchanged to the scheduled
            ``message_commit`` event.
        :return: Tuple ``(daemon.FLAG_CONTINUE, args)``.
        :rtype: tuple
        """
        daemon.logger.debug(
            "Component '{}': Scheduling message commit for '{}'".format(
                self.cid,
                args['id']
            )
        )
        daemon.queue.schedule('message_commit', args)
        # inc_statistic() is not defined in this class - presumably inherited
        # from ZenDaemonComponent.
        self.inc_statistic(self.STATS_CNT_COMMITS)
        return (daemon.FLAG_CONTINUE, args)

    def cbk_event_log_statistics(self, daemon, args):
        """
        Periodical processing statistics logging.

        :param daemon: Daemon object; its ``logger`` and ``FLAG_CONTINUE``
            attributes are used.
        :param args: Event arguments, returned unchanged.
        :return: Tuple ``(daemon.FLAG_CONTINUE, args)``.
        :rtype: tuple
        """
        # get_statistics() and pattern_stats are not defined in this class -
        # presumably provided by ZenDaemonComponent. Each statistics entry is
        # read through its 'cnt', 'inc' and 'spd' keys.
        stats = self.get_statistics()
        stats_str = ''

        for k in [self.STATS_CNT_COMMITS]:
            if k in stats:
                stats_str = self.pattern_stats.format(
                    stats_str, k, stats[k]['cnt'], stats[k]['inc'], stats[k]['spd']
                )
            else:
                # Nothing counted yet - report zeros for this statistic.
                stats_str = self.pattern_stats.format(stats_str, k, 0, 0, 0)

        daemon.logger.info(
            "Component '{}': *** Processing statistics ***{}".format(
                self.cid,
                stats_str
            )
        )
        return (daemon.FLAG_CONTINUE, args)