# -*- coding: utf-8 -*-
""" Main time, poll and inotify event management and pipeline machinery for DeadBeat """
from __future__ import absolute_import, division, print_function, unicode_literals
import time
import operator
import heapq
import select
import uuid
import sys
import signal
import errno
import os
import os.path as pth
import ctypes
import struct
import fcntl
import itertools
import logging
import operator
from collections import namedtuple, deque, Mapping, Sequence, defaultdict
__all__ = [
"Object", "Cog", "DummyEscapement", "Escapement", "Train", "INotify",
"train_line", "TrainChooser", "RoundRobinChooser", "CaseChooser",
"itemsetter", "nullsetter", "uuidsetter", "dictupdater",
"itemdeleter", "constgetter", "selfgetter",
"basestring", "string_types",
"catch", "setn", "appendn"
]
# Compatibility helpers
try:
reduce
except NameError:
from functools import reduce
try:
#: :py:data:`basestring` for Py3 compatibility
basestring = basestring
#: List of string types for Py3 compatibility
string_types = (str, unicode)
str_type = unicode
bytes_type = str
except NameError:
#: :py:data:`basestring` for Py3 compatibility
basestring = str
#: List of string types for Py3 compatibility
string_types = (bytes, str)
str_type = str
bytes_type = bytes
# Getters/setters
[docs]def itemsetter(attr):
""" Return setter for specific key. """
tmp = attr
def setter(obj, data):
obj[tmp] = data
return obj
return setter
def itemappender(x):
def appender(data, value):
data.setdefault(x, list()).append(value)
return data
return appender
[docs]def nullsetter(obj, attr):
""" Return setter which drops the data. """
return obj
[docs]def constgetter(const):
""" Return itemgetter of constant immutable objects. """
return itertools.repeat(const).next
[docs]def uuidsetter(attr):
""" Return setter, which generates and sets new UUID. """
tmp = attr
def setter(obj):
obj[tmp] = str(uuid.uuid4())
return obj
return setter
[docs]def itemdeleter(attr):
""" Return deleter (function, which deletes particular key). """
tmp = attr
def deleter(obj):
obj.pop(attr, None)
return obj
return deleter
[docs]def dictupdater(d1, d2):
d1.update(d2)
return d1
[docs]def selfgetter(obj):
return obj
_undef = type(str("_undef"), (object,), {})()
def itemgetter(key, default=_undef):
if default is _undef:
return operator.itemgetter(key)
else:
def getter(obj):
try:
return obj[key]
except (KeyError, IndexError):
return default
return getter
[docs]class Object(object):
""" Base object with nicer __str__. """
__slots__ = ()
def __str__(self):
""" Expects instance variables with the same names as in __init__ constructor, tries to
fetch them and construct python looking object description.
"""
try:
attrs = self.__init__.func_code.co_varnames[1:self.__init__.func_code.co_argcount]
eq_str = ["%s=%r" % (attr, getattr(self, attr, None)) for attr in attrs]
except AttributeError:
eq_str = ()
return "%s(%s)" % (type(self).__name__, ", ".join(eq_str))
[docs]class Cog(Object):
""" Base class to distinct cogs from other objects. """
__slots__ = ()
[docs]class catch(Object):
""" Context manager for silencing and logging exceptions.
Useful for data transformation with missing or possibly malformed data.
Usage::
with catch():
1=0 # This will only be logged and execution continues on
"""
__slots__ = ("msg",)
[docs] def __init__(self, msg="Silenced exception during conversion"):
""" Initialize catch.
:param msg: Optional msg to log along with the details.
"""
self.msg = msg
def __enter__(self):
pass
def __exit__(self, exctype, excinst, exctb):
if exctype is not None and issubclass(exctype, Exception):
logging.debug(self.msg, exc_info=sys.exc_info())
return True
return False
[docs]class setn(Object):
""" Context manager for conditional setting of dict key if value
is not empty (more precisely: does not evaluate to False in
`bool` context.
Useful for data transformations which may or may not yield a values.
Usage::
with setn(src, "IP4", []) as ip_list:
ip_list[:] = try_to_extract_ip_from_wherever():
# src["IP4"] exists only if resulting ip_list is not empty
"""
__slots__ = ("dictionary", "attribute", "mutable")
[docs] def __init__(self, dictionary, attribute, mutable=None):
""" Initialize setn.
:param dictionary: Dict-like object to modify at context closing.
:param attribute: Dict attribute to set.
:param mutable: Mutable object, which will be assigned (or not).
"""
self.dictionary = dictionary
self.attribute = attribute
self.mutable = [] if mutable is None else mutable
def __enter__(self):
return self.mutable
def __exit__(self, exctype, excinst, exctb):
if exctype is None and self.mutable:
self.dictionary[self.attribute] = self.mutable
[docs]class appendn(Object):
""" Context manager for conditional appending of value if value
is not empty (more precisely: does not evaluate to False in
`bool` context.
Useful for data transformations which may or may not yield a values.
Usage::
with appendn(src_list, {}) as src:
src.update(from_somewhere)
# src is appended to src_list only if src is not empty
"""
__slots__ = ("lst", "mutable")
[docs] def __init__(self, lst, mutable=None):
""" Initialize appendn.
:param lst: List-like object to modify at context closing.
:param mutable: Mutable object, which will be assigned (or not).
"""
self.lst = lst
self.mutable = {} if mutable is None else mutable
def __enter__(self):
return self.mutable
def __exit__(self, exctype, excinst, exctb):
if exctype is None and self.mutable:
self.lst.append(self.mutable)
# Inotify constants
_INotifyTuple = namedtuple("_INotifyTuple", ("mask", "path", "what", "handle", "args", "kwargs"))
_INOTIFY_HEADER = "iIII"
_INOTIFY_HEADER_LEN = struct.calcsize(_INOTIFY_HEADER)
[docs]class INotify(object):
""" Holder for inotify flag constants. """
ACCESS = 0x00000001 # File was accessed
MODIFY = 0x00000002 # File was modified
ATTRIB = 0x00000004 # Metadata changed
CLOSE_WRITE = 0x00000008 # Writable file was closed
CLOSE_NOWRITE = 0x00000010 # Unwritable file closed
OPEN = 0x00000020 # File was opened
MOVED_FROM = 0x00000040 # File was moved from X
MOVED_TO = 0x00000080 # File was moved to Y
CREATE = 0x00000100 # Subfile was created
DELETE = 0x00000200 # Subfile was deleted
DELETE_SELF = 0x00000400 # Watched itself was deleted
MOVE_SELF = 0x00000800 # Watched itself was moved
UNMOUNT = 0x00002000 # Backing fs was unmounted
Q_OVERFLOW = 0x00004000 # Event queue overflowed
IGNORED = 0x00008000 # File was ignored
ONLYDIR = 0x01000000 # Only watch the path if it is a directory
DONT_FOLLOW = 0x02000000 # Don't follow a symlink
EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects
MASK_ADD = 0x20000000 # Add to the mask of an already existing watch
ISDIR = 0x40000000 # Event occurred against dir
ONESHOT = 0x80000000 # Only send event once
_PollTuple = namedtuple("_PollTuple", ("mask", "what", "handle", "args", "kwargs"))
_TickTuple = namedtuple("_TickTuple", ("time", "what", "handle", "args", "kwargs"))
# Event management machinery
[docs]class DummyEscapement(Object):
""" Escapement, which does nothing, mostly silently.
Useful during shutdown phase, when cogs don't want to schedule new events,
however also don't want to riddle code with conditionals. Just clean up,
set escapement to dummy_escapement singleton once for all and shrug.
"""
[docs] def dummy(self, *args, **kwargs):
logging.debug("Dummy Escapement call")
at = after = unschedule = enqueue = dequeue = dummy
fd_register = fd_unregister = dummy
signal_register = signal_unregister = dummy
inotify_add = inotify_rm = dummy
inotify = False
#: Dummy escapement singleton. See :py:class:`deadbeat.movement.DummyEscapement`.
dummy_escapement = DummyEscapement()
class EINTRException(Exception):
""" Exception to make epoll interruption PEP 475 compatible. """
[docs]class Escapement(Object):
""" Class, taking care of all event input and housekeeping. Every application
must have one and only one this class, and use only non-blocking
functions/calls/descriptors to be able to be really event driven and
not stomp on its own feet.
Other application code can use methods of this singleton to plan new
events on specific times, set handlers on signals, put epolls on
sockets and set inotify watches on files.
It is also expected that this class will be wrapped within code, which
actually acts upon returned events and calls corresponding callables.
"""
[docs] def __init__(self):
""" Initialize Escapement, set up queues, epoll and inotify. """
self.prique = []
self.deque = deque()
self.epoll = select.epoll()
self.fds = {}
self.signals = {}
self.pri_times = {}
self.deq_counts = defaultdict(int)
self._inotify_init()
[docs] def at(self, timestamp, what, *args, **kwargs):
""" Plan event at specific time.
:param timestamp: Unix timestamp.
:param what: Callable.
:param handle: Optional keyword argument, by which caller can denote
different instances of the event to manage or remove in the future.
:param args: Callable's positional arguments.
:param kwargs: Callable's keyword arguments.
"""
handle = kwargs.pop("handle", None)
t = _TickTuple(timestamp, what, handle, args, kwargs)
heapq.heappush(self.prique, t)
self.pri_times[(what, handle)] = timestamp
[docs] def after(self, delay, what, *args, **kwargs):
""" Plan event after time interval from now.
:param delay: Interval in seconds.
:param what: Callable.
:param handle: Optional keyword argument, by which caller can denote
different instances of the event to manage or remove in the future.
:param args: Callable's positional arguments.
:param kwargs: Callable's keyword arguments.
"""
self.at(time.time()+delay, what, *args, **kwargs)
[docs] def unschedule(self, what, handle=None):
""" Remove event scheduled by `at` or `after`. `what` and optional `handle`
denote specific timed event.
:param what: Callable.
:param handle: Optional specific handle.
"""
self.pri_times.pop((what, handle), None)
[docs] def enqueue(self, what, *args, **kwargs):
""" Enqueue event immediately.
:param what: Callable.
:param handle: Optional keyword argument, by which caller can denote
different instances of the event to manage or remove in the future.
:param timestamp: Optional keyword argument, timestamp will be returned along
with the event, however does not mean anything for Escapement itself.
:param prio: If True, enqueue to the beginning of the queue. Default False.
Meant for internal usage of the scheduler, don't use yourself.
:param args: Callable's positional arguments
:param kwargs: Callable's keyword arguments
"""
handle = kwargs.pop("handle", None)
t = _TickTuple(kwargs.pop("timestamp", None), what, handle, args, kwargs)
if kwargs.pop("prio", False):
self.deque.appendleft(t)
else:
self.deque.append(t)
self.deq_counts[(what, handle)] += 1
[docs] def dequeue(self, what, handle=None):
""" Remove enqueued event. `what` and optional `handle` denote specific event.
:param what: Callable
:param handle: Optional specific handle
"""
self.deq_counts.pop((what, handle), None)
[docs] def fd_register(self, fd, poll_mask, what, *args, **kwargs):
""" Register file descriptor for epoll events.
:param fd: File descriptor (or file-like object).
:param poll_mask: Polling event mask.
:param what: Callable.
:param handle: Optional keyword argument, by which caller can denote
different instances of the handler to manage or remove in the future.
:param args: Callable's positional arguments.
:param kwargs: Callable's keyword arguments.
For `fd` and `poll_mask` see `fd` and `eventmask` at :py:obj:`select.epoll`.
"""
handle = kwargs.pop("handle", None)
try:
fd = fd.fileno()
except AttributeError:
pass
try:
watchers = self.fds[fd]
watcher = watchers.pop((what, handle), None)
masks = [watcher.mask for key, watcher in watchers.items()]
rest_mask = old_mask = reduce(operator.or_, masks, 0)
if watcher:
old_mask |= watcher.mask
except KeyError:
watchers = {}
rest_mask = old_mask = 0
if poll_mask:
watchers[(what, handle)] = _PollTuple(poll_mask, what, handle, args, kwargs)
if watchers:
self.fds[fd] = watchers
else:
del self.fds[fd]
full_mask = rest_mask | poll_mask
if old_mask != full_mask:
try:
if old_mask and full_mask:
self.epoll.modify(fd, full_mask)
else:
if old_mask:
self.epoll.unregister(fd)
else:
self.epoll.register(fd, full_mask)
except OSError as e:
if e.errno == errno.EBADF:
# Someone out of our control probably closed fd, no big deal.
# It may be programming error, so better log it.
logging.debug("File descriptor probably already closed.")
else:
raise
[docs] def fd_unregister(self, fd, what, handle=None):
""" Unregister file descriptor from epoll events.
:param fd: File descriptor (or file-like object).
:param what: Callable.
:param handle: Optional keyword argument, by which caller can denote
different instances of the handler to manage or remove in the future.
"""
try:
fd = fd.fileno()
except AttributeError:
pass
self.fd_register(fd, 0, what, handle=handle)
[docs] def signal_register(self, sig_num, what, *args, **kwargs):
""" Register unix signal for events.
:param sig_num: Signal number. See also :py:obj:`signal.signal`.
:param what: Callable.
:param handle: Optional keyword argument, by which caller can denote
different instances of the handler to manage or remove in the future.
:param args: Callable's positional arguments.
:param kwargs: Callable's keyword arguments.
"""
try:
task_list = self.signals[sig_num]
except KeyError:
handler = self._signal_handler
signal.signal(sig_num, handler)
self.signals[sig_num] = task_list = list()
handle = kwargs.pop("handle", None)
t = _TickTuple(None, what, handle, args, kwargs)
task_list.append(t)
[docs] def signal_unregister(self, sig_num, what, handle=None):
""" Unregister unix signal from events. `what` and optional `handle` denote
specific handler.
:param sig_num: Signal number. See also :py:obj:`signal.signal`.
:param what: Callable.
:param handle: Optional specific handle.
"""
task_list = self.signals[sig_num]
for i in reversed(range(0, len(task_list))):
if task_list[i].what == what and task_list[i].handle == handle:
del task_list[i]
return len(task_list) # Caller may decide to set SIG_DFL himself if len==0
def _signal_handler(self, sig_num, _frame):
""" Real signal handler, converting signals to events according to handler lists. """
task_list = self.signals[sig_num]
timestamp = time.time()
for handler in task_list:
self.enqueue(handler.what, handle=handler.handle, timestamp=timestamp, *handler.args, **handler.kwargs)
if task_list:
raise EINTRException() # PEP 475 compatibility
def _inotify_init(self):
""" Inotify machinery initialization. Sets self.inotify bool to signal successful
initialization/inotify availability.
"""
for name in "c", "inotify":
try:
lib_name = ctypes.util.find_library(name)
self.inotify_lib = ctypes.CDLL(lib_name, use_errno=True)
self.inotify_lib.inotify_init.argtypes = []
self.inotify_lib.inotify_init.restype = ctypes.c_int
self.inotify_lib.inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
self.inotify_lib.inotify_add_watch.restype = ctypes.c_int
self.inotify_lib.inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
self.inotify_lib.inotify_rm_watch.restype = ctypes.c_int
except (OSError, IOError, AttributeError):
pass
try:
self.inotify_fd = self.inotify_lib.inotify_init()
flag = fcntl.fcntl(self.inotify_fd, fcntl.F_GETFL)
fcntl.fcntl(self.inotify_fd, fcntl.F_SETFL, flag|os.O_NONBLOCK)
self.fd_register(self.inotify_fd, select.EPOLLIN, self._inotify_handle)
self.inotify = True
except AttributeError:
self.inotify = False
self.filesystem_encoding = sys.getfilesystemencoding()
self.inotify_paths = {}
self.inotify_wds = {}
self.inotify_buf = b""
def _inotify_handle(self, poll_event, fd):
""" Real inotify handler - parses data from inotify socket and converts it to events. """
assert fd == self.inotify_fd
while True:
try:
new_data = os.read(fd, 1024)
except OSError as e:
if e.errno == errno.EAGAIN:
break
self.inotify_buf += new_data
pos = 0
while pos <= len(self.inotify_buf)-_INOTIFY_HEADER_LEN:
wd, emask, cookie, len_ = struct.unpack(_INOTIFY_HEADER, self.inotify_buf[pos:pos+_INOTIFY_HEADER_LEN])
fname = self.inotify_buf[pos+_INOTIFY_HEADER_LEN:pos+_INOTIFY_HEADER_LEN+len_]
if len(fname) < len_:
break
fname = fname.rstrip(b"\0").decode(self.filesystem_encoding)
pos += _INOTIFY_HEADER_LEN + len_
try:
abspath, watchers = self.inotify_wds[wd]
except KeyError:
break
timestamp = None
for mask, path, what, handle, args, kwargs in watchers.values():
if mask & emask:
timestamp = timestamp or time.time()
kwargs["path"] = path
kwargs["mask"] = emask
kwargs["cookie"] = cookie
kwargs["name"] = fname
self.enqueue(what, timestamp=timestamp, handle=handle, *args, **kwargs)
self.inotify_buf = self.inotify_buf[pos:]
[docs] def inotify_add(self, path, mask, what, *args, **kwargs):
""" Register path for inotify events.
:param path: Path to watch.
:param mask: Inotify mask (see constants in :py:obj:`movement.INotify`).
:param what: Callable.
:param handle: Optional keyword argument, by which caller can denote
different instances of the event to manage or remove in the future.
:param args: Callable's positional arguments.
:param kwargs: Callable's keyword arguments.
"""
handle = kwargs.pop("handle", None)
abspath = pth.abspath(path)
strbuf = ctypes.create_string_buffer(abspath.encode(self.filesystem_encoding))
wd = self.inotify_paths.get(abspath)
if wd:
_, watchers = self.inotify_wds[wd]
masks = [watcher.mask for key, watcher in watchers.items()]
full_mask = reduce(operator.or_, masks) | mask
else:
full_mask = mask
newwd = self.inotify_lib.inotify_add_watch(self.inotify_fd, strbuf, full_mask)
if wd:
assert wd == newwd
else:
self.inotify_paths[abspath] = newwd
watchers = {}
self.inotify_wds[newwd] = abspath, watchers
watchers[(what, handle)] = _INotifyTuple(mask, path, what, handle, args, kwargs)
[docs] def inotify_rm(self, path, what, handle=None):
""" Unregister path from inotify events.
:param path: Path to watch.
:param what: Callable.
:param handle: Optional specific handle.
"""
abspath = pth.abspath(path)
try:
wd = self.inotify_paths[abspath]
except KeyError:
return
_, watchers = self.inotify_wds[wd]
watchers.pop((what, handle), None)
if not watchers:
self.inotify_lib.inotify_rm_watch(self.inotify_fd, wd)
del self.inotify_wds[wd]
del self.inotify_paths[abspath]
[docs] def wait(self, blocking=True):
""" Main event processing call.
Application should call `wait` periodically to wait for new events and to get
them from the queue.
:param blocking: Don't wait, just check inputs and return events already in the queue.
May be used to check whether the queue is empty or not.
Use sparsely, otherwise the purpose of event management gets defeated.
"""
while self.deque:
tick = self.deque[0]
if (tick.what, tick.handle) not in self.deq_counts:
self.deque.popleft()
else:
break
tick = None
now = time.time()
while self.prique:
tick = self.prique[0]
if self.pri_times.get((tick.what, tick.handle)) == tick.time:
if tick.time > now:
break
self.pri_times.pop((tick.what, tick.handle), None)
timestamp, what, handle, args, kwargs = heapq.heappop(self.prique)
self.enqueue(what, timestamp=now, handle=handle, *args, **kwargs)
else:
heapq.heappop(self.prique)
tick = None
if blocking and (not self.deque or tick and tick.time > now):
delay = tick.time - now if tick else -1
try:
poll_events = self.epoll.poll(delay)
except EINTRException: # PEP 475 compatibility
poll_events = ()
now = time.time()
else:
poll_events = self.epoll.poll(0)
for fd, poll_event in poll_events:
watchers = self.fds[fd]
for mask, what, handle, args, kwargs in watchers.values():
if mask & poll_event:
kwargs["poll_event"] = poll_event & mask
kwargs["fd"] = fd
self.enqueue(what, timestamp=now, handle=handle, *args, **kwargs)
while self.prique:
tick = self.prique[0]
if self.pri_times.get((tick.what, tick.handle)) == tick.time:
if tick.time > now:
break
self.pri_times.pop((tick.what, tick.handle), None)
timestamp, what, handle, args, kwargs = heapq.heappop(self.prique)
self.enqueue(what, timestamp=now, handle=handle, *args, **kwargs)
else:
heapq.heappop(self.prique)
while self.deque:
tick = self.deque.popleft()
if (tick.what, tick.handle) not in self.deq_counts:
continue
self.deq_counts[(tick.what, tick.handle)] -= 1
if not self.deq_counts[(tick.what, tick.handle)]:
del self.deq_counts[(tick.what, tick.handle)]
return tick
_first = type(str("_First"), (Object,), {})()
_last = type(str("_Last"), (Object,), {})()
[docs]class TrainChooser(Cog):
""" Base class for deriving data flow managers. """
[docs] def __init__(self, train, subtrains=()):
self.train = train
self.subtrains = subtrains
[docs] def getSubtrains(self):
return self.subtrains
[docs] def getAllNexts(self):
return [subtrain[_first] for subtrain in self.getSubtrains()]
[docs]class RoundRobinChooser(TrainChooser):
""" Send each incoming data into different fork. """
[docs] def __init__(self, train, subtrains=()):
TrainChooser.__init__(self, train, subtrains)
self.last_chosen = 0
def __call__(self, data):
nxt = self.subtrains[self.last_chosen][_first]
self.last_chosen += 1
return {nxt: (data,)}
[docs]class CaseChooser(TrainChooser):
""" Send incoming data into different fork based on value of attribute. """
[docs] def __init__(self, train, val_getter=None, id_getter=None,
options=(), subtrains=(), default_subtrain=None):
TrainChooser.__init__(self, train, subtrains)
self.val_getter = val_getter or itemgetter("data")
self.id_getter = id_getter or itemgetter("ID")
self.options = options
self.default_subtrain = default_subtrain
self.default_first = self.default_subtrain[_first] if self.default_subtrain else None
self.switch = {}
for option, subtrain in zip(self.options, self.subtrains):
if isinstance(option, Sequence) and not isinstance(option, string_types):
for subopt in option:
self.switch[subopt] = subtrain[_first]
else:
self.switch[option] = subtrain[_first]
[docs] def getSubtrains(self):
return itertools.chain(self.subtrains, (self.default_subtrain,) or ())
def __call__(self, data):
val = self.val_getter(data)
eid = self.id_getter(data)
nxt = self.switch.get(val, self.default_first)
if not nxt:
logging.info("%s: No next cog to continue based on %s.", eid, val)
return None
else:
logging.info("%s: Branched based on %s.", eid, val)
return {nxt: (data,)}
[docs]def train_line(*seq):
""" Create line of cogs. """
result = {}
for cog, nxt in zip((_first,) + seq, seq + (_last,)):
if isinstance(cog, Mapping):
for key, value in cog.items():
if value is _last:
result[key] = nxt
elif key is not _first:
result[key] = value
elif isinstance(cog, TrainChooser):
for subtrain in cog.getSubtrains():
for key, value in subtrain.items():
if value is _last:
result[key] = nxt
elif key is not _first:
result[key] = value
elif isinstance(nxt, Mapping):
result[cog] = nxt[_first]
else:
result[cog] = nxt
return result
[docs]class Train(Object):
""" Class, taking care of event management, pipeline and running tasks.
Every application must have one and only one this class, and use only
non-blocking functions/calls/descriptors to be able to be really event
driven and not stomp on its own feet.
Other application code can use methods of this singleton to broadcast
events, and to access Escapement singleton.
"""
[docs] def __init__(self, esc=None, train=None, id_get=None):
""" Initialize Train, register daemon signals.
:param esc: Escapement singleton. If not specified (recommended), new
default instance is created and used.
:param train: Train definition dictionary (use train_* helpers to construct).
:param id_get: ID getter for prepending log messages.
"""
self.esc = esc or Escapement()
self.id_get = id_get or itemgetter("ID")
self.update(train)
self.notify_cache = {}
self.state = "running"
self.esc.signal_register(signal.SIGTERM, self._shutdown_commence)
self.esc.signal_register(signal.SIGINT, self._shutdown_commence)
[docs] def update(self, train):
""" Update train.
Sometimes cogs need access to Train during instantiation, and train definition
is not yet complete, so creating instance without train definition and then
updating to real thing may be necessary.
"""
self.train = train or {}
self.cogs = self._topo_sort(self.train)
def __call__(self):
""" Main application.
Takes care of all event management (by means of Escapement) and correct shutting down
of the application.
"""
res = None
while self.state!="done":
kick = self.esc.wait()
while kick:
_, cog, _, args, kwargs = kick
try:
eid = self.id_get(args[0])
except Exception:
eid = None
logging.debug("%s Calling %s with args: %s kwargs: %s", eid or "", cog, args, kwargs)
try:
res = cog(*args, **kwargs)
except Exception as e:
logging.exception("%s exception on %s", eid or "", cog)
res = None
if eid:
self.notify("event_error", eid)
if res:
logging.debug("%s Output of %s: %s", eid or "", type(cog).__name__, res)
if isinstance(cog, TrainChooser):
for next_cog, data in res.items():
for datum in reversed(data):
self.esc.enqueue(next_cog, datum, prio=True)
else:
next_cog = self.train.get(cog)
if not next_cog and hasattr(cog, "__self__"):
logging.debug("%s Getting next cog from noncanonical method %s", eid or "", cog)
next_cog = self.train.get(cog.__self__)
if not next_cog:
logging.debug("%s Did not find next cog %s, ending event", eid or "", cog)
else:
for datum in reversed(res):
self.esc.enqueue(next_cog, datum, prio=True)
if self.state == "finishing":
self._shutdown_next()
kick = self.esc.wait(blocking=False)
while not kick:
if self.state == "finishing_next":
self._shutdown_next()
elif self.state == "shutting":
self.state = "done"
else:
break
kick = self.esc.wait(blocking=False)
[docs] def notify(self, attr, *args, **kwargs):
""" Send notification to all cogs with `attr` method.
:param attr: Name of the method to be called.
:param args: Method's positional arguments.
:param kwargs: Method's keyword arguments.
"""
event_id = kwargs.pop("event_id", None)
try:
cog_list = self.notify_cache[attr]
except KeyError:
cog_list = self.notify_cache[attr] = []
for cog in self.cogs:
if attr is not None:
try:
method = getattr(cog, attr)
except AttributeError:
continue
else:
method = cog
if method not in cog_list:
cog_list.append(method)
for method in cog_list:
logging.debug("%s Enqueue %s, %s, %s", event_id or "", method, args, kwargs)
self.esc.enqueue(method, *args, **kwargs)
def _topo_sort(self, train):
""" Helper to have handy cog list topologically sorted.
This also simplifies shutdown code.
"""
prev_cogs = {}
firsts = []
for cog, nextcog in train.items():
prev_cogs.setdefault(nextcog, set()).add(cog)
firsts = list(set(train).difference(prev_cogs))
topo = []
while firsts:
first = firsts.pop()
if first not in (_first, _last):
topo.append(first)
nxt = train.get(first)
if nxt:
ancestors = prev_cogs[nxt]
ancestors.discard(first)
if not ancestors:
firsts.append(nxt)
return topo
def _shutdown_commence(self):
""" Start shutdown protocol. """
if self.state == "running":
self.state = "finishing"
self.shutting_cogs = self.cogs[::-1]
self.esc.after(10, self._shutdown_soft)
def _shutdown_soft(self):
""" Finish shutdown protocol. """
self.notify("done")
self.state = "shutting"
self.esc.after(5, self._shutdown_hard)
def _shutdown_hard(self):
""" Finish unconditionally if shutdown timed out. """
self.state = "done"
def _shutdown_next(self):
""" Shutdown one cog in topological order.
This allows to tell cog to finish its function and flush yet unprocessed data
down the pipeline, Train then waits until the event queue is empty again (all
descendant cogs had its time to process events from finishing cog and
_shutdown_next is called again.
"""
what = None
self.state = "finishing_next"
while not what and self.shutting_cogs:
cog = self.shutting_cogs.pop()
what = getattr(cog, "finish", None)
if what:
self.esc.enqueue(what)
logging.debug("Finishing %s", str(what))
else:
self.notify("done")
self.state = "shutting"
[docs] def to_dot(self):
""" Generate "dot" representation of cog train graph. """
out = list()
out.append("digraph GearTrain {")
out.append("# Nodes")
for cog in self.cogs:
shape = "hexagon" if isinstance(cog, TrainChooser) else "ellipse"
out.append('"%x" [label="%s (%x)" shape="%s"]' % (id(cog), type(cog).__name__, id(cog), shape))
out.append("# Connections")
for cog, nextcog in self.train.items():
if isinstance(nextcog, TrainChooser):
out.append('"%x" -> "%x"' % (id(cog), id(nextcog)))
cog = nextcog
nextcogs = nextcog.getAllNexts()
else:
nextcogs = [nextcog]
for nextcog in nextcogs:
if cog not in (_first, _last) and nextcog not in (_first, _last):
out.append('"%x" -> "%x"' % (id(cog), id(nextcog)))
out.append('{ rank=sink; Legend [shape=none, margin=0, label=<')
out.append('<table cellspacing="0">')
out.append("# Legend")
for cog in self.cogs:
label = str(cog).replace(">", "").replace("<", "")
out.append('<tr><td align="left">%s (%x)</td><td align="left">%s</td></tr>' % (
type(cog).__name__, id(cog), label))
out.append("</table>>];}")
out.append("}")
return "\n".join(out)