mentat.daemon.piper module

This module provides a base implementation of a generic pipe-like message processing daemon, represented by the pyzenkit.zendaemon.ZenDaemon class. It builds on top of the pyzenkit.zendaemon module and adds a couple of other useful features:

  • Automated inclusion and bootstrapping of the mentat.daemon.component.filer daemon component.

  • Additional configuration and command line arguments related to the filer protocol.

Filer daemon component

This module automatically takes care of the inclusion and bootstrapping of the mentat.daemon.component.filer daemon component. This component implements the filesystem message exchange queue, also known as the filer protocol. It is a fairly simple protocol, inspired by the Postfix MTA, that uses ordinary filesystem directories for exchanging messages between multiple modules or processes. Each process has a directory assigned to it that represents its message queue. Passing a message for processing simply means putting a file into this directory, or more specifically into one of its subdirectories.

The designated filesystem message queue directory must contain and use the following subdirectories:

  • incoming: input queue containing only complete messages

  • pending: daemon working directory and pending queue

  • tmp: work directory for other processes

  • errors: messages causing problems during processing
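The directory layout listed above can be prepared with a few lines of Python. This is only a minimal sketch: the helper name init_queue_dir and the constant QUEUE_SUBDIRS are hypothetical and are not part of mentat or pyzenkit; only the four subdirectory names come from the text.

```python
import os

# Subdirectory names taken from the list above.
QUEUE_SUBDIRS = ("incoming", "pending", "tmp", "errors")


def init_queue_dir(queue_dir):
    """Create the queue directory and its four subdirectories.

    Hypothetical helper for illustration only; the real filer component
    may prepare its directories differently.
    """
    for name in QUEUE_SUBDIRS:
        # exist_ok=True makes the call safe to repeat on an existing queue.
        os.makedirs(os.path.join(queue_dir, name), exist_ok=True)
    return queue_dir
```

Keeping all four subdirectories under a single parent makes it easy to place the whole queue on one filesystem.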

Enqueue message

Enqueuing a new message can be done by anyone, including the owner of the queue, and it means creating a new file in the incoming subdirectory. However, all files in this directory are required to be complete and final, so it is usually necessary to create a temporary file in a different location (usually tmp) and then move it to the incoming queue.

The key requirement here is that the move operation must be atomic, so that multiple instances of the same daemon may run on the same queue and thus enable cheap parallelization. The atomic move can be achieved simply by keeping the whole queue directory structure on the same disk partition.

Once a message is in the incoming subdirectory, it must be considered the property of the queue owner, and no other process (except other instances of the same daemon) may modify it in any way.
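The enqueue procedure described above can be sketched as follows. This is an illustrative sketch, not the code of the filer component: the function name enqueue and the uuid-based file naming are assumptions. It relies on os.rename, which is atomic on POSIX systems when the source and destination are on the same filesystem.

```python
import os
import uuid


def enqueue(queue_dir, payload, name=None):
    """Write payload (bytes) into tmp, then atomically move it to incoming.

    Hypothetical helper; the file naming scheme is an assumption.
    """
    name = name or uuid.uuid4().hex
    tmp_path = os.path.join(queue_dir, "tmp", name)
    final_path = os.path.join(queue_dir, "incoming", name)

    # Build the complete file outside of incoming first.
    with open(tmp_path, "wb") as handle:
        handle.write(payload)
        handle.flush()
        os.fsync(handle.fileno())

    # The file becomes visible in incoming in a single step, so a reader
    # never sees a partially written message.
    os.rename(tmp_path, final_path)
    return final_path
```

A caller would use it as enqueue("/path/to/queue", b"message body"), where the queue path is a placeholder.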

Dequeue message

Dequeuing a message is done only by the owner of the queue, and it means atomically moving the related message file into the pending subdirectory. From that instant, the process that moved the message owns it and may work with it. When the work is done, there are the following options for handling the message file:

  • Move the message file to the errors subdirectory in case there was any error during processing.

  • Move the message to the incoming subdirectory of another daemon to pass the message to the next step in the processing chain.

  • Delete the message permanently from the filesystem in case it is not needed anymore.
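The dequeue step and the three outcomes listed above can be sketched like this. The function names dequeue and finish and their parameters are hypothetical and do not reflect the actual filer component API. A failed rename is treated as a sign that another instance of the daemon claimed the file first.

```python
import os


def dequeue(queue_dir):
    """Claim one message by moving it from incoming to pending.

    Returns the path of the claimed file, or None if the queue is empty.
    """
    incoming = os.path.join(queue_dir, "incoming")
    pending = os.path.join(queue_dir, "pending")
    for name in sorted(os.listdir(incoming)):
        try:
            os.rename(os.path.join(incoming, name), os.path.join(pending, name))
        except FileNotFoundError:
            # Another instance moved this file first; try the next one.
            continue
        return os.path.join(pending, name)
    return None


def finish(pending_path, queue_dir, error=False, next_queue_dir=None):
    """Dispose of a processed message in one of the three ways listed above."""
    name = os.path.basename(pending_path)
    if error:
        target = os.path.join(queue_dir, "errors", name)
    elif next_queue_dir is not None:
        # Assumes the next queue is on the same filesystem; os.rename
        # raises OSError when asked to move across filesystems.
        target = os.path.join(next_queue_dir, "incoming", name)
    else:
        os.remove(pending_path)
        return None
    os.rename(pending_path, target)
    return target
```

Because only one rename of a given source file can succeed, two instances working on the same queue cannot both claim the same message.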

Custom configuration

Custom command line options

--queue-in-dir

Name of the input queue directory.

Type: string

--queue-out-dir

Name of the output queue directory.

Type: string, default: None

--queue-out-limit

Limit on the number of files in the output queue directory.

Type: integer, default: 10000

--queue-out-wait

Waiting time, in seconds, when the output queue limit is reached.

Type: integer, default: 10
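To illustrate how the four options above fit together, here is a stand-alone sketch built on argparse. It is not the way pyzenkit registers its arguments: build_parser and wait_for_output_room are hypothetical names. Counting only the files in the incoming subdirectory of the output queue is also an assumption about how the limit is measured.

```python
import argparse
import os
import time


def build_parser():
    """Stand-alone parser mirroring the documented options and defaults."""
    parser = argparse.ArgumentParser(description="Hypothetical piper options")
    parser.add_argument("--queue-in-dir", type=str)
    parser.add_argument("--queue-out-dir", type=str, default=None)
    parser.add_argument("--queue-out-limit", type=int, default=10000)
    parser.add_argument("--queue-out-wait", type=int, default=10)
    return parser


def wait_for_output_room(queue_out_dir, limit, wait, sleep=time.sleep):
    """Block while the output queue holds `limit` or more files.

    Assumption: the limit applies to the incoming subdirectory of the
    output queue. The sleep function is injectable to ease testing.
    """
    incoming = os.path.join(queue_out_dir, "incoming")
    while len(os.listdir(incoming)) >= limit:
        sleep(wait)
```

With argparse, an option such as --queue-out-limit is stored under the attribute queue_out_limit, which has the same spelling as the value of the CONFIG_QUEUE_OUT_LIMIT constant of PiperDaemon.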

class mentat.daemon.piper.DemoPiperDaemon(name=None, description=None)

Bases: PiperDaemon

Minimalistic class for demonstration purposes.

class mentat.daemon.piper.DemoPrintComponent(**kwargs)

Bases: ZenDaemonComponent

Demonstration implementation of a daemon component capable of printing the message being processed to the log.

cbk_event_message_process(daemon, args)

Print the message contents into the log.

get_events()

Get the list of event names and their appropriate callback handlers.

class mentat.daemon.piper.PiperDaemon(**kwargs)

Bases: MentatBaseDaemon

Base implementation of a generic pipe-like message processing daemon.

CONFIG_QUEUE_IN_DIR = 'queue_in_dir'
CONFIG_QUEUE_IN_PERMS = 'queue_in_perms'
CONFIG_QUEUE_IN_WAIT = 'queue_in_wait'
CONFIG_QUEUE_OUT_DIR = 'queue_out_dir'
CONFIG_QUEUE_OUT_LIMIT = 'queue_out_limit'
CONFIG_QUEUE_OUT_WAIT = 'queue_out_wait'
CORE_FILEQUEUE = 'filequeue'