Source code for deadbeat.movement

# -*- coding: utf-8 -*-

""" Main time, poll and inotify event management and pipeline machinery for DeadBeat """

from __future__ import absolute_import, division, print_function, unicode_literals

import time
import operator
import heapq
import select
import uuid
import sys
import signal
import errno
import os
import os.path as pth
import ctypes
import struct
import fcntl
import itertools
import logging
import operator
from collections import namedtuple, deque, Mapping, Sequence, defaultdict

# Names exported by ``from deadbeat.movement import *``.
# NOTE(review): ``itemgetter`` and ``itemappender`` are defined in this module
# but are not listed here -- confirm whether that omission is intentional.
__all__ = [
    "Object", "Cog", "DummyEscapement", "Escapement", "Train", "INotify",
    "train_line", "TrainChooser", "RoundRobinChooser", "CaseChooser",
    "itemsetter", "nullsetter", "uuidsetter", "dictupdater",
    "itemdeleter", "constgetter", "selfgetter",
    "basestring", "string_types",
    "catch", "setn", "appendn"
]

# Compatibility helpers

# ``reduce`` is a builtin only on Python 2; evaluating the bare name raises
# NameError on Python 3, in which case it is taken from functools.
try:
    reduce
except NameError:
    from functools import reduce

# Text/bytes type aliases.  Evaluating ``basestring`` succeeds only on
# Python 2; on Python 3 the NameError selects the fallback branch below.
try:
    #: :py:data:`basestring` for Py3 compatibility
    basestring = basestring
    #: List of string types for Py3 compatibility
    string_types = (str, unicode)
    # Text (unicode) type and raw bytes type of the running interpreter.
    str_type = unicode
    bytes_type = str
except NameError:
    #: :py:data:`basestring` for Py3 compatibility
    basestring = str
    #: List of string types for Py3 compatibility
    string_types = (bytes, str)
    # Text (unicode) type and raw bytes type of the running interpreter.
    str_type = str
    bytes_type = bytes

# Getters/setters

def itemsetter(attr):
    """ Return setter for specific key.

        The returned callable takes ``(obj, data)``, stores `data` under key
        `attr` in `obj` and returns `obj`.

        :param attr: Key (or index) to assign to.
    """

    def setter(obj, data):
        # Item assignment is the only side effect; the container itself is
        # handed back to the caller.
        obj[attr] = data
        return obj

    return setter
def itemappender(x):
    """ Return appender for specific key.

        The returned callable takes ``(data, value)``, appends `value` to the
        list stored under key `x` in `data` (creating the list when the key is
        missing) and returns `data`.

        :param x: Key of the list to append to.
    """

    def appender(data, value):
        bucket = data.setdefault(x, [])
        bucket.append(value)
        return data

    return appender
def nullsetter(obj, attr):
    """ Setter which drops the data.

        Has the ``(obj, data)`` call shape of the setters produced by
        :py:func:`itemsetter`, but ignores its second argument.

        :param obj: Object, which is returned unchanged.
        :param attr: Ignored.
    """
    return obj
def constgetter(const):
    """ Return itemgetter of constant immutable objects.

        The returned callable ignores whatever arguments it receives and
        always returns `const`.  It can therefore be called without arguments
        as well as with an object, the way other getters are called.

        :param const: Value to be returned by the getter.
    """

    # The former ``itertools.repeat(const).next`` exists only on Python 2
    # and rejects any argument; a plain closure works on both Python
    # versions and accepts the getter call shape.
    def getter(*args, **kwargs):
        return const

    return getter
def uuidsetter(attr):
    """ Return setter, which generates and sets new UUID.

        The returned callable takes ``obj``, stores the string form of a
        freshly generated ``uuid.uuid4()`` under key `attr` and returns `obj`.

        :param attr: Key to store the generated UUID under.
    """

    def setter(obj):
        fresh = uuid.uuid4()
        obj[attr] = str(fresh)
        return obj

    return setter
def itemdeleter(attr):
    """ Return deleter (function, which deletes particular key).

        The returned callable takes ``obj``, removes key `attr` from it when
        present (a missing key is not an error) and returns `obj`.

        :param attr: Key to remove.
    """

    def deleter(obj):
        # pop() with a default keeps the deleter silent on missing keys.
        obj.pop(attr, None)
        return obj

    return deleter
def dictupdater(d1, d2):
    """ Update `d1` in place with the content of `d2` and return `d1`.

        :param d1: Dict-like object to be modified.
        :param d2: Anything accepted by ``d1.update``.
    """
    d1.update(d2)
    return d1
def selfgetter(obj):
    """ Getter which returns the object itself.

        :param obj: Any object.
    """
    return obj
# Private sentinel telling "no default given" apart from any real default,
# including None.
_undef = type(str("_undef"), (object,), {})()


def itemgetter(key, default=_undef):
    """ Return getter for specific key, optionally with default value.

        Without `default` this is plain :py:func:`operator.itemgetter`, so a
        missing key raises.  With `default` the returned callable gives
        `default` back when the lookup raises KeyError or IndexError.

        :param key: Key (or index) to fetch.
        :param default: Optional value returned for missing items.
    """
    if default is not _undef:
        def getter(obj):
            try:
                value = obj[key]
            except (KeyError, IndexError):
                value = default
            return value

        return getter

    return operator.itemgetter(key)
class Object(object):
    """ Base object with nicer __str__. """

    __slots__ = ()

    def __str__(self):
        """ Expects instance variables with the same names as in __init__
            constructor, tries to fetch them and construct python looking
            object description.

            Attributes which are not set on the instance are shown as None.
            When `__init__` is not a Python level function (for example the
            one inherited from `object`), only ``ClassName()`` is returned.
        """
        try:
            # ``__code__`` is available on Python 2.6+ and Python 3; the old
            # ``func_code`` spelling exists only on Python 2 and made this
            # silently fall back to "ClassName()" on Python 3.
            code = self.__init__.__code__
            # Skip ``self``; take only named positional parameters.
            attrs = code.co_varnames[1:code.co_argcount]
            eq_str = ["%s=%r" % (attr, getattr(self, attr, None)) for attr in attrs]
        except AttributeError:
            eq_str = ()
        return "%s(%s)" % (type(self).__name__, ", ".join(eq_str))
class Cog(Object):
    """ Base class to distinguish cogs from other objects.

        Adds no behaviour of its own on top of :py:class:`Object`.
    """

    # Empty slots: the marker class contributes no per-instance attributes.
    __slots__ = ()
class catch(Object):
    """ Context manager for silencing and logging exceptions.

        Useful for data transformation with missing or possibly malformed data.

        Usage::

            with catch():
                1/0
            # This will only be logged and execution continues on
    """

    __slots__ = ("msg",)

    def __init__(self, msg="Silenced exception during conversion"):
        """ Initialize catch.

            :param msg: Optional msg to log along with the details.
        """
        self.msg = msg

    def __enter__(self):
        """ Nothing to set up; ``as`` target (if any) receives None. """
        return None

    def __exit__(self, exctype, excinst, exctb):
        """ Log and suppress exceptions derived from `Exception`.

            Anything else (no exception at all, or exceptions outside of the
            `Exception` hierarchy such as KeyboardInterrupt) is not suppressed.
        """
        if exctype is None or not issubclass(exctype, Exception):
            return False
        logging.debug(self.msg, exc_info=sys.exc_info())
        return True
class setn(Object):
    """ Context manager for conditional setting of dict key if value is
        not empty (more precisely: does not evaluate to False in `bool`
        context).

        Useful for data transformations which may or may not yield a value.

        Usage::

            with setn(src, "IP4", []) as ip_list:
                ip_list[:] = try_to_extract_ip_from_wherever()
            # src["IP4"] exists only if resulting ip_list is not empty
    """

    __slots__ = ("dictionary", "attribute", "mutable")

    def __init__(self, dictionary, attribute, mutable=None):
        """ Initialize setn.

            :param dictionary: Dict-like object to modify at context closing.
            :param attribute: Dict attribute to set.
            :param mutable: Mutable object, which will be assigned (or not).
                            A new empty list is used when omitted.
        """
        if mutable is None:
            mutable = []
        self.dictionary = dictionary
        self.attribute = attribute
        self.mutable = mutable

    def __enter__(self):
        """ Hand the mutable object over to the ``with`` body. """
        return self.mutable

    def __exit__(self, exctype, excinst, exctb):
        """ Assign the mutable object if the body succeeded and filled it.

            Exceptions from the body are never suppressed.
        """
        if exctype is not None:
            return
        if self.mutable:
            self.dictionary[self.attribute] = self.mutable
class appendn(Object):
    """ Context manager for conditional appending of value if value is
        not empty (more precisely: does not evaluate to False in `bool`
        context).

        Useful for data transformations which may or may not yield a value.

        Usage::

            with appendn(src_list, {}) as src:
                src.update(from_somewhere)
            # src is appended to src_list only if src is not empty
    """

    __slots__ = ("lst", "mutable")

    def __init__(self, lst, mutable=None):
        """ Initialize appendn.

            :param lst: List-like object to modify at context closing.
            :param mutable: Mutable object, which will be appended (or not).
                            A new empty dict is used when omitted.
        """
        if mutable is None:
            mutable = {}
        self.lst = lst
        self.mutable = mutable

    def __enter__(self):
        """ Hand the mutable object over to the ``with`` body. """
        return self.mutable

    def __exit__(self, exctype, excinst, exctb):
        """ Append the mutable object if the body succeeded and filled it.

            Exceptions from the body are never suppressed.
        """
        if exctype is not None:
            return
        if self.mutable:
            self.lst.append(self.mutable)
# Inotify constants _INotifyTuple = namedtuple("_INotifyTuple", ("mask", "path", "what", "handle", "args", "kwargs")) _INOTIFY_HEADER = "iIII" _INOTIFY_HEADER_LEN = struct.calcsize(_INOTIFY_HEADER)
class INotify(object):
    """ Holder for inotify flag constants.

        Every constant is a single bit, so constants can be combined by
        bitwise or into masks.
    """

    # Events
    ACCESS = 1 << 0          # File was accessed
    MODIFY = 1 << 1          # File was modified
    ATTRIB = 1 << 2          # Metadata changed
    CLOSE_WRITE = 1 << 3     # Writable file was closed
    CLOSE_NOWRITE = 1 << 4   # Unwritable file closed
    OPEN = 1 << 5            # File was opened
    MOVED_FROM = 1 << 6      # File was moved from X
    MOVED_TO = 1 << 7        # File was moved to Y
    CREATE = 1 << 8          # Subfile was created
    DELETE = 1 << 9          # Subfile was deleted
    DELETE_SELF = 1 << 10    # Watched itself was deleted
    MOVE_SELF = 1 << 11      # Watched itself was moved

    UNMOUNT = 1 << 13        # Backing fs was unmounted
    Q_OVERFLOW = 1 << 14     # Event queue overflowed
    IGNORED = 1 << 15        # File was ignored

    ONLYDIR = 1 << 24        # Only watch the path if it is a directory
    DONT_FOLLOW = 1 << 25    # Don't follow a symlink
    EXCL_UNLINK = 1 << 26    # Exclude events on unlinked objects
    MASK_ADD = 1 << 29       # Add to the mask of an already existing watch
    ISDIR = 1 << 30          # Event occurred against dir
    ONESHOT = 1 << 31        # Only send event once
# Record of one handler registered on a file descriptor by
# Escapement.fd_register; `mask` is the epoll event mask it is interested in.
_PollTuple = namedtuple("_PollTuple", ("mask", "what", "handle", "args", "kwargs"))
# Record of one scheduled or queued event: callable `what` with its `args` and
# `kwargs`, optional caller supplied `handle` and a timestamp in `time`.
_TickTuple = namedtuple("_TickTuple", ("time", "what", "handle", "args", "kwargs"))

# Event management machinery
class DummyEscapement(Object):
    """ Escapement, which does nothing, mostly silently.

        Useful during shutdown phase, when cogs don't want to schedule
        new events, however also don't want to riddle code with conditionals.
        Just clean up, set escapement to dummy_escapement singleton once for all
        and shrug.
    """

    def dummy(self, *args, **kwargs):
        """ Accept any arguments, leave a debug log line and do nothing. """
        logging.debug("Dummy Escapement call")

    # Timed and queued events
    at = dummy
    after = dummy
    unschedule = dummy
    enqueue = dummy
    dequeue = dummy

    # File descriptor polling
    fd_register = dummy
    fd_unregister = dummy

    # Signals
    signal_register = dummy
    signal_unregister = dummy

    # Inotify; the flag says inotify is not available here
    inotify_add = dummy
    inotify_rm = dummy
    inotify = False
#: Dummy escapement singleton. See :py:class:`deadbeat.movement.DummyEscapement`. dummy_escapement = DummyEscapement() class EINTRException(Exception): """ Exception to make epoll interruption PEP 475 compatible. """
class Escapement(Object):
    """ Class, taking care of all event input and housekeeping.

        Every application must have one and only one this class, and use
        only non-blocking functions/calls/descriptors to be able to be really
        event driven and not stomp on its own feet.

        Other application code can use methods of this singleton to plan
        new events on specific times, set handlers on signals, put epolls
        on sockets and set inotify watches on files.

        It is also expected that this class will be wrapped within code,
        which actually acts upon returned events and calls corresponding
        callables.
    """

    def __init__(self):
        """ Initialize Escapement, set up queues, epoll and inotify. """
        # Heap of (time, sequence, _TickTuple).  The sequence number breaks
        # ties of equal times, so heapq never has to compare callables
        # (which raises TypeError on Python 3).
        self.prique = []
        self._pri_seq = itertools.count()
        self.deque = deque()
        self.epoll = select.epoll()
        self.fds = {}
        self.signals = {}
        # (what, handle) -> time of the currently valid timer; heap entries
        # which do not match are stale and get dropped lazily.
        self.pri_times = {}
        # (what, handle) -> number of live entries in self.deque; missing key
        # means entries were cancelled by dequeue().
        self.deq_counts = defaultdict(int)
        self._inotify_init()

    def at(self, timestamp, what, *args, **kwargs):
        """ Plan event at specific time.

            :param timestamp: Unix timestamp.
            :param what: Callable.
            :param handle: Optional keyword argument, by which caller can denote different instances
                           of the event to manage or remove in the future.
            :param args: Callable's positional arguments.
            :param kwargs: Callable's keyword arguments.
        """
        handle = kwargs.pop("handle", None)
        tick = _TickTuple(timestamp, what, handle, args, kwargs)
        heapq.heappush(self.prique, (timestamp, next(self._pri_seq), tick))
        # Later call for the same (what, handle) supersedes the earlier one.
        self.pri_times[(what, handle)] = timestamp

    def after(self, delay, what, *args, **kwargs):
        """ Plan event after time interval from now.

            :param delay: Interval in seconds.
            :param what: Callable.
            :param handle: Optional keyword argument, by which caller can denote different instances
                           of the event to manage or remove in the future.
            :param args: Callable's positional arguments.
            :param kwargs: Callable's keyword arguments.
        """
        self.at(time.time() + delay, what, *args, **kwargs)

    def unschedule(self, what, handle=None):
        """ Remove event scheduled by `at` or `after`. `what` and optional `handle` denote specific timed event.

            :param what: Callable.
            :param handle: Optional specific handle.
        """
        # Heap entry stays in place and is discarded when it reaches the top.
        self.pri_times.pop((what, handle), None)

    def enqueue(self, what, *args, **kwargs):
        """ Enqueue event immediately.

            :param what: Callable.
            :param handle: Optional keyword argument, by which caller can denote different instances
                           of the event to manage or remove in the future.
            :param timestamp: Optional keyword argument, timestamp will be returned along with the event,
                              however does not mean anything for Escapement itself.
            :param prio: If True, enqueue to the beginning of the queue. Default False. Meant for internal
                         usage of the scheduler, don't use yourself.
            :param args: Callable's positional arguments
            :param kwargs: Callable's keyword arguments
        """
        handle = kwargs.pop("handle", None)
        timestamp = kwargs.pop("timestamp", None)
        prio = kwargs.pop("prio", False)
        tick = _TickTuple(timestamp, what, handle, args, kwargs)
        if prio:
            self.deque.appendleft(tick)
        else:
            self.deque.append(tick)
        self.deq_counts[(what, handle)] += 1

    def dequeue(self, what, handle=None):
        """ Remove enqueued event. `what` and optional `handle` denote specific event.

            :param what: Callable
            :param handle: Optional specific handle
        """
        # Queue entries stay in place; wait() skips entries without a count.
        self.deq_counts.pop((what, handle), None)

    def fd_register(self, fd, poll_mask, what, *args, **kwargs):
        """ Register file descriptor for epoll events.

            :param fd: File descriptor (or file-like object).
            :param poll_mask: Polling event mask. Zero removes the handler.
            :param what: Callable.
            :param handle: Optional keyword argument, by which caller can denote different instances
                           of the handler to manage or remove in the future.
            :param args: Callable's positional arguments.
            :param kwargs: Callable's keyword arguments.

            For `fd` and `poll_mask` see `fd` and `eventmask` at :py:obj:`select.epoll`.
        """
        handle = kwargs.pop("handle", None)
        try:
            fd = fd.fileno()
        except AttributeError:
            pass
        key = (what, handle)
        watchers = self.fds.get(fd, {})
        previous = watchers.pop(key, None)
        # Mask requested by all the other handlers on this descriptor.
        rest_mask = reduce(operator.or_, (other.mask for other in watchers.values()), 0)
        old_mask = rest_mask | (previous.mask if previous else 0)
        if poll_mask:
            watchers[key] = _PollTuple(poll_mask, what, handle, args, kwargs)
        if watchers:
            self.fds[fd] = watchers
        else:
            # Descriptor may never have been registered at all.
            self.fds.pop(fd, None)
        full_mask = rest_mask | poll_mask
        if old_mask == full_mask:
            return
        try:
            if old_mask and full_mask:
                self.epoll.modify(fd, full_mask)
            elif old_mask:
                self.epoll.unregister(fd)
            else:
                self.epoll.register(fd, full_mask)
        except (IOError, OSError) as e:
            # epoll reports errors as IOError on Python 2, OSError on Python 3.
            if e.errno == errno.EBADF:
                # Someone out of our control probably closed fd, no big deal.
                # It may be programming error, so better log it.
                logging.debug("File descriptor probably already closed.")
            else:
                raise

    def fd_unregister(self, fd, what, handle=None):
        """ Unregister file descriptor from epoll events.

            :param fd: File descriptor (or file-like object).
            :param what: Callable.
            :param handle: Optional keyword argument, by which caller can denote different instances
                           of the handler to manage or remove in the future.
        """
        # Zero mask removes the handler; fd_register resolves file-like objects.
        self.fd_register(fd, 0, what, handle=handle)

    def signal_register(self, sig_num, what, *args, **kwargs):
        """ Register unix signal for events.

            :param sig_num: Signal number. See also :py:obj:`signal.signal`.
            :param what: Callable.
            :param handle: Optional keyword argument, by which caller can denote different instances
                           of the handler to manage or remove in the future.
            :param args: Callable's positional arguments.
            :param kwargs: Callable's keyword arguments.
        """
        handle = kwargs.pop("handle", None)
        try:
            task_list = self.signals[sig_num]
        except KeyError:
            # First handler for this signal: install the real OS level handler.
            signal.signal(sig_num, self._signal_handler)
            task_list = self.signals[sig_num] = []
        task_list.append(_TickTuple(None, what, handle, args, kwargs))

    def signal_unregister(self, sig_num, what, handle=None):
        """ Unregister unix signal from events. `what` and optional `handle` denote specific handler.

            :param sig_num: Signal number. See also :py:obj:`signal.signal`.
            :param what: Callable.
            :param handle: Optional specific handle.
            :returns: Number of handlers left for the signal (0 also for signal
                      which was never registered). Caller may decide to set
                      SIG_DFL himself if the result is 0.
        """
        task_list = self.signals.get(sig_num)
        if task_list is None:
            return 0
        # Modify in place, the list object is shared with self.signals.
        task_list[:] = [
            task for task in task_list
            if not (task.what == what and task.handle == handle)]
        return len(task_list)

    def _signal_handler(self, sig_num, _frame):
        """ Real signal handler, converting signals to events according to handler lists.

            Raises EINTRException when at least one event was queued, to break
            possibly running epoll call (PEP 475 compatibility).
        """
        task_list = self.signals[sig_num]
        timestamp = time.time()
        for handler in list(task_list):
            self.enqueue(
                handler.what, handle=handler.handle, timestamp=timestamp,
                *handler.args, **handler.kwargs)
        if task_list:
            raise EINTRException()  # PEP 475 compatibility

    def _inotify_init(self):
        """ Inotify machinery initialization.

            Sets self.inotify bool to signal successful initialization/inotify availability.
        """
        # ``import ctypes`` alone does not make ``ctypes.util`` available.
        import ctypes.util

        self.filesystem_encoding = sys.getfilesystemencoding()
        self.inotify_paths = {}   # absolute path -> watch descriptor
        self.inotify_wds = {}     # watch descriptor -> (absolute path, watchers)
        self.inotify_buf = b""
        self.inotify = False

        for name in ("c", "inotify"):
            try:
                lib = ctypes.CDLL(ctypes.util.find_library(name), use_errno=True)
                lib.inotify_init.argtypes = []
                lib.inotify_init.restype = ctypes.c_int
                lib.inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
                lib.inotify_add_watch.restype = ctypes.c_int
                lib.inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
                lib.inotify_rm_watch.restype = ctypes.c_int
            except (OSError, IOError, AttributeError):
                continue
            # First library providing all three functions wins.
            self.inotify_lib = lib
            break
        else:
            return

        inotify_fd = self.inotify_lib.inotify_init()
        if inotify_fd < 0:
            logging.debug("inotify_init failed: %s", os.strerror(ctypes.get_errno()))
            return
        self.inotify_fd = inotify_fd
        flag = fcntl.fcntl(self.inotify_fd, fcntl.F_GETFL)
        fcntl.fcntl(self.inotify_fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
        self.fd_register(self.inotify_fd, select.EPOLLIN, self._inotify_handle)
        self.inotify = True

    def _inotify_handle(self, poll_event, fd):
        """ Real inotify handler - parses data from inotify socket and converts it to events. """
        assert fd == self.inotify_fd
        while True:
            try:
                new_data = os.read(fd, 1024)
            except OSError as e:
                if e.errno == errno.EAGAIN:
                    break   # Descriptor drained
                if e.errno == errno.EINTR:
                    continue
                # Anything else is a real error; continuing would reprocess
                # stale data over and over.
                raise
            if not new_data:
                break
            self.inotify_buf += new_data
            self._inotify_parse()

    def _inotify_parse(self):
        """ Convert complete inotify records from the buffer to events, keep incomplete remainder. """
        buf = self.inotify_buf
        pos = 0
        while pos + _INOTIFY_HEADER_LEN <= len(buf):
            wd, emask, cookie, len_ = struct.unpack_from(_INOTIFY_HEADER, buf, pos)
            end = pos + _INOTIFY_HEADER_LEN + len_
            if end > len(buf):
                break   # Name is not complete yet
            fname = buf[pos + _INOTIFY_HEADER_LEN:end].rstrip(b"\0").decode(self.filesystem_encoding)
            pos = end
            try:
                _, watchers = self.inotify_wds[wd]
            except KeyError:
                # Event for a watch we no longer (or never) knew about; skip
                # just this record, not the rest of the buffer.
                continue
            timestamp = None
            for mask, path, what, handle, args, kwargs in list(watchers.values()):
                if mask & emask:
                    timestamp = timestamp or time.time()
                    # Copy, so registered kwargs are not polluted by event data.
                    event_kwargs = dict(kwargs, path=path, mask=emask, cookie=cookie, name=fname)
                    self.enqueue(what, timestamp=timestamp, handle=handle, *args, **event_kwargs)
        self.inotify_buf = buf[pos:]

    def inotify_add(self, path, mask, what, *args, **kwargs):
        """ Register path for inotify events.

            If the kernel refuses the watch, the failure is logged and nothing
            is registered.

            :param path: Path to watch.
            :param mask: Inotify mask (see constants in :py:obj:`movement.INotify`).
            :param what: Callable.
            :param handle: Optional keyword argument, by which caller can denote different instances
                           of the event to manage or remove in the future.
            :param args: Callable's positional arguments.
            :param kwargs: Callable's keyword arguments.
        """
        handle = kwargs.pop("handle", None)
        abspath = pth.abspath(path)
        strbuf = ctypes.create_string_buffer(abspath.encode(self.filesystem_encoding))
        wd = self.inotify_paths.get(abspath)
        if wd is not None:
            _, watchers = self.inotify_wds[wd]
            # Kernel keeps one mask per path, so ask for union of all watchers.
            full_mask = reduce(operator.or_, (other.mask for other in watchers.values()), mask)
        else:
            watchers = None
            full_mask = mask
        newwd = self.inotify_lib.inotify_add_watch(self.inotify_fd, strbuf, full_mask)
        if newwd < 0:
            # Recording -1 as a watch descriptor would corrupt the lookup tables.
            logging.error(
                "Unable to add inotify watch on %s: %s",
                abspath, os.strerror(ctypes.get_errno()))
            return
        if wd is None:
            watchers = {}
            self.inotify_paths[abspath] = newwd
            self.inotify_wds[newwd] = abspath, watchers
        else:
            assert wd == newwd
        watchers[(what, handle)] = _INotifyTuple(mask, path, what, handle, args, kwargs)

    def inotify_rm(self, path, what, handle=None):
        """ Unregister path from inotify events.

            :param path: Path to watch.
            :param what: Callable.
            :param handle: Optional specific handle.
        """
        abspath = pth.abspath(path)
        try:
            wd = self.inotify_paths[abspath]
        except KeyError:
            return
        _, watchers = self.inotify_wds[wd]
        watchers.pop((what, handle), None)
        if not watchers:
            # Last watcher gone, drop the kernel watch too.
            self.inotify_lib.inotify_rm_watch(self.inotify_fd, wd)
            del self.inotify_wds[wd]
            del self.inotify_paths[abspath]

    def _enqueue_due(self, now):
        """ Move timers due at `now` into event queue, return time of nearest pending timer or None. """
        while self.prique:
            timestamp, _, tick = self.prique[0]
            key = (tick.what, tick.handle)
            if self.pri_times.get(key) != timestamp:
                # Unscheduled or superseded by later at()/after() call.
                heapq.heappop(self.prique)
                continue
            if timestamp > now:
                return timestamp
            heapq.heappop(self.prique)
            self.pri_times.pop(key, None)
            self.enqueue(tick.what, timestamp=now, handle=tick.handle, *tick.args, **tick.kwargs)
        return None

    def _poll(self, timeout):
        """ Poll descriptors, treat interruption by signal as "no events". """
        try:
            return self.epoll.poll(timeout)
        except EINTRException:  # PEP 475 compatibility
            return ()
        except (IOError, OSError) as e:
            # Interpreters without PEP 475 report interrupted call this way.
            if e.errno == errno.EINTR:
                return ()
            raise

    def wait(self, blocking=True):
        """ Main event processing call.

            Application should call `wait` periodically to wait for new events
            and to get them from the queue.

            :param blocking: If True (default), wait for an event when the queue
                             is empty. If False, don't wait, just check inputs
                             and return events already in the queue. May be used
                             to check whether the queue is empty or not. Use
                             sparsely, otherwise the purpose of event management
                             gets defeated.
            :returns: Event tuple (time, what, handle, args, kwargs), or None if
                      there is no event to return.
        """
        # Throw away cancelled events at the head of the queue, so the
        # emptiness check below is meaningful.
        while self.deque:
            head = self.deque[0]
            if (head.what, head.handle) in self.deq_counts:
                break
            self.deque.popleft()

        now = time.time()
        next_time = self._enqueue_due(now)

        # Sleep only if there is nothing to hand out; events already queued
        # must not wait for a pending timer.
        if blocking and not self.deque:
            delay = next_time - now if next_time is not None else -1
            poll_events = self._poll(delay)
            now = time.time()
        else:
            poll_events = self._poll(0)

        for fd, poll_event in poll_events:
            watchers = self.fds.get(fd)
            if not watchers:
                continue    # Unregistered in the meantime
            for mask, what, handle, args, kwargs in list(watchers.values()):
                if mask & poll_event:
                    # Copy, so registered kwargs are not polluted by event data.
                    event_kwargs = dict(kwargs, poll_event=poll_event & mask, fd=fd)
                    self.enqueue(what, timestamp=now, handle=handle, *args, **event_kwargs)

        # Timers which became due during the poll.
        self._enqueue_due(now)

        while self.deque:
            tick = self.deque.popleft()
            key = (tick.what, tick.handle)
            if key not in self.deq_counts:
                continue    # Cancelled by dequeue()
            self.deq_counts[key] -= 1
            if not self.deq_counts[key]:
                del self.deq_counts[key]
            return tick
        return None
# Singleton sentinels used in train definition dicts: `_first` is the key
# under which a line stores its entry cog, `_last` is the value marking the
# exit of a line (see train_line, which rewires both when lines are nested).
_first = type(str("_First"), (Object,), {})()
_last = type(str("_Last"), (Object,), {})()
class TrainChooser(Cog):
    """ Base class for deriving data flow managers. """

    def __init__(self, train, subtrains=()):
        """ Initialize chooser.

            :param train: Object stored as ``self.train``.
            :param subtrains: Sequence of train definition dicts to choose from.
        """
        self.subtrains = subtrains
        self.train = train

    def getSubtrains(self):
        """ Return all subtrains known to this chooser. """
        return self.subtrains

    def getAllNexts(self):
        """ Return list of entry cogs of all subtrains. """
        nexts = []
        for subtrain in self.getSubtrains():
            nexts.append(subtrain[_first])
        return nexts
class RoundRobinChooser(TrainChooser):
    """ Send each incoming data into different fork. """

    def __init__(self, train, subtrains=()):
        """ Initialize chooser.

            :param train: Object stored as ``self.train``.
            :param subtrains: Sequence of train definition dicts to rotate over.
        """
        TrainChooser.__init__(self, train, subtrains)
        # Index of the subtrain which gets the next data.
        self.last_chosen = 0

    def __call__(self, data):
        """ Route `data` to the entry cog of the next subtrain in rotation.

            :param data: Data to be passed on.
            :returns: Dict ``{next_cog: (data,)}``.
        """
        nxt = self.subtrains[self.last_chosen][_first]
        # Wrap around; without it the index ran past the end of subtrains
        # after the first full round.
        self.last_chosen = (self.last_chosen + 1) % len(self.subtrains)
        return {nxt: (data,)}
class CaseChooser(TrainChooser):
    """ Send incoming data into different fork based on value of attribute. """

    def __init__(self, train, val_getter=None, id_getter=None, options=(), subtrains=(), default_subtrain=None):
        """ Initialize chooser.

            :param train: Object stored as ``self.train``.
            :param val_getter: Getter of the value to decide on (default: key "data").
            :param id_getter: Getter of the ID used in log messages (default: key "ID").
            :param options: Sequence of values, positionally paired with `subtrains`.
                            An option, which is itself a non-string sequence,
                            lists several values for one subtrain.
            :param subtrains: Sequence of train definition dicts.
            :param default_subtrain: Subtrain used when no option matches.
        """
        TrainChooser.__init__(self, train, subtrains)
        self.val_getter = val_getter or itemgetter("data")
        self.id_getter = id_getter or itemgetter("ID")
        self.options = options
        self.default_subtrain = default_subtrain
        self.default_first = self.default_subtrain[_first] if self.default_subtrain else None
        # value -> entry cog of the subtrain to continue with
        self.switch = {}
        for option, subtrain in zip(self.options, self.subtrains):
            if isinstance(option, Sequence) and not isinstance(option, string_types):
                for subopt in option:
                    self.switch[subopt] = subtrain[_first]
            else:
                self.switch[option] = subtrain[_first]

    def getSubtrains(self):
        """ Return iterator over all subtrains, including the default one if set. """
        # ``(x,) or ()`` is always the one-tuple, so a missing default used
        # to be returned as a None "subtrain".
        default = (self.default_subtrain,) if self.default_subtrain else ()
        return itertools.chain(self.subtrains, default)

    def __call__(self, data):
        """ Route `data` according to its value.

            :param data: Data to be passed on.
            :returns: Dict ``{next_cog: (data,)}``, or None if there is no
                      matching option and no default subtrain.
        """
        val = self.val_getter(data)
        eid = self.id_getter(data)
        nxt = self.switch.get(val, self.default_first)
        if not nxt:
            logging.info("%s: No next cog to continue based on %s.", eid, val)
            return None
        logging.info("%s: Branched based on %s.", eid, val)
        return {nxt: (data,)}
def _train_splice(result, subtrain, target):
    """ Copy links of nested `subtrain` into `result`, wiring its exits to `target`. """
    for key, value in subtrain.items():
        if value is _last:
            result[key] = target
        elif key is not _first:
            result[key] = value


def train_line(*seq):
    """ Create line of cogs.

        :param seq: Cogs in order of processing. An item may also be a train
                    definition dict (nested line) or a TrainChooser, whose
                    subtrains get spliced in and their exits connected to the
                    following item.
        :returns: Train definition dict mapping each cog to its successor,
                  with the entry cog under `_first` and the exits pointing
                  to `_last`.
    """
    result = {}
    for cog, nxt in zip((_first,) + seq, seq + (_last,)):
        # Successor which is a nested line is entered through its first cog.
        # This has to hold for exits of nested lines and chooser subtrains as
        # well, otherwise the dict itself would end up as the successor.
        target = nxt[_first] if isinstance(nxt, Mapping) else nxt
        if isinstance(cog, Mapping):
            _train_splice(result, cog, target)
        elif isinstance(cog, TrainChooser):
            for subtrain in cog.getSubtrains():
                if subtrain is None:
                    continue    # Chooser without default subtrain
                _train_splice(result, subtrain, target)
        else:
            result[cog] = target
    return result
class Train(Object):
    """ Class, taking care of event management, pipeline and running tasks.

        Every application must have one and only one this class, and use
        only non-blocking functions/calls/descriptors to be able to be really
        event driven and not stomp on its own feet.

        Other application code can use methods of this singleton to broadcast events, and
        to access Escapement singleton.
    """

    def __init__(self, esc=None, train=None, id_get=None):
        """ Initialize Train, register daemon signals.

            :param esc: Escapement singleton. If not specified (recommended), new default
                        instance is created and used.
            :param train: Train definition dictionary (use train_* helpers to construct).
            :param id_get: ID getter for prepending log messages.
        """
        self.esc = esc or Escapement()
        self.id_get = id_get or itemgetter("ID")
        self.notify_cache = {}
        self.update(train)
        self.state = "running"
        self.esc.signal_register(signal.SIGTERM, self._shutdown_commence)
        self.esc.signal_register(signal.SIGINT, self._shutdown_commence)

    def update(self, train):
        """ Update train.

            Sometimes cogs need access to Train during instantiation, and train definition
            is not yet complete, so creating instance without train definition and then
            updating to real thing may be necessary.

            :param train: Train definition dictionary, None means empty train.
        """
        self.train = train or {}
        self.cogs = self._topo_sort(self.train)
        # Cached notification targets were collected from the previous cog
        # list; without the reset notify() would keep calling stale cogs.
        self.notify_cache = {}

    def __call__(self):
        """ Main application.

            Takes care of all event management (by means of Escapement) and correct
            shutting down of the application.
        """
        res = None
        while self.state != "done":
            kick = self.esc.wait()
            while kick:
                _, cog, _, args, kwargs = kick
                try:
                    eid = self.id_get(args[0])
                except Exception:
                    # Not every event carries data with an ID.
                    eid = None
                logging.debug("%s Calling %s with args: %s kwargs: %s", eid or "", cog, args, kwargs)
                try:
                    res = cog(*args, **kwargs)
                except Exception:
                    logging.exception("%s exception on %s", eid or "", cog)
                    res = None
                    if eid:
                        self.notify("event_error", eid)
                if res:
                    logging.debug("%s Output of %s: %s", eid or "", type(cog).__name__, res)
                    if isinstance(cog, TrainChooser):
                        # Chooser tells itself where each piece of data goes.
                        for next_cog, data in res.items():
                            for datum in reversed(data):
                                self.esc.enqueue(next_cog, datum, prio=True)
                    else:
                        next_cog = self.train.get(cog)
                        if not next_cog and hasattr(cog, "__self__"):
                            logging.debug("%s Getting next cog from noncanonical method %s", eid or "", cog)
                            next_cog = self.train.get(cog.__self__)
                        if not next_cog:
                            logging.debug("%s Did not find next cog %s, ending event", eid or "", cog)
                        else:
                            # Reversed, as each datum goes to the head of the queue.
                            for datum in reversed(res):
                                self.esc.enqueue(next_cog, datum, prio=True)
                if self.state == "finishing":
                    self._shutdown_next()
                kick = self.esc.wait(blocking=False)
                # NOTE(review): this loop is kept inside the event loop, so an
                # event produced by a shutdown step is processed by the next
                # iteration instead of being overwritten by the blocking
                # wait above -- confirm against the upstream layout.
                while not kick:
                    if self.state == "finishing_next":
                        self._shutdown_next()
                    elif self.state == "shutting":
                        self.state = "done"
                    else:
                        break
                    kick = self.esc.wait(blocking=False)

    def notify(self, attr, *args, **kwargs):
        """ Send notification to all cogs with `attr` method.

            :param attr: Name of the method to be called. None means the cogs
                         themselves get called.
            :param event_id: Optional keyword argument, used only in log messages.
            :param args: Method's positional arguments.
            :param kwargs: Method's keyword arguments.
        """
        event_id = kwargs.pop("event_id", None)
        try:
            cog_list = self.notify_cache[attr]
        except KeyError:
            cog_list = self.notify_cache[attr] = []
            for cog in self.cogs:
                if attr is not None:
                    try:
                        method = getattr(cog, attr)
                    except AttributeError:
                        continue
                else:
                    method = cog
                if method not in cog_list:
                    cog_list.append(method)
        for method in cog_list:
            logging.debug("%s Enqueue %s, %s, %s", event_id or "", method, args, kwargs)
            self.esc.enqueue(method, *args, **kwargs)

    def _topo_sort(self, train):
        """ Helper to have handy cog list topologically sorted.

            This also simplifies shutdown code.
        """
        # successor -> set of its not yet emitted predecessors
        prev_cogs = {}
        for cog, nextcog in train.items():
            prev_cogs.setdefault(nextcog, set()).add(cog)
        # Start from cogs, which are nobody's successor.
        firsts = list(set(train).difference(prev_cogs))
        topo = []
        while firsts:
            first = firsts.pop()
            if first not in (_first, _last):
                topo.append(first)
            nxt = train.get(first)
            if nxt:
                ancestors = prev_cogs[nxt]
                ancestors.discard(first)
                if not ancestors:
                    firsts.append(nxt)
        return topo

    def _shutdown_commence(self):
        """ Start shutdown protocol. """
        if self.state == "running":
            self.state = "finishing"
            # Consumed by pop() from the end, thus in topological order.
            self.shutting_cogs = self.cogs[::-1]
            self.esc.after(10, self._shutdown_soft)

    def _shutdown_soft(self):
        """ Finish shutdown protocol. """
        self.notify("done")
        self.state = "shutting"
        self.esc.after(5, self._shutdown_hard)

    def _shutdown_hard(self):
        """ Finish unconditionally if shutdown timed out. """
        self.state = "done"

    def _shutdown_next(self):
        """ Shutdown one cog in topological order.

            This allows to tell cog to finish its function and flush yet unprocessed
            data down the pipeline, Train then waits until the event queue is empty again
            (all descendant cogs had its time to process events from finishing cog) and
            _shutdown_next is called again.
        """
        what = None
        self.state = "finishing_next"
        # Find next cog which has something to finish.
        while not what and self.shutting_cogs:
            cog = self.shutting_cogs.pop()
            what = getattr(cog, "finish", None)
        if what:
            self.esc.enqueue(what)
            logging.debug("Finishing %s", str(what))
        else:
            self.notify("done")
            self.state = "shutting"

    def to_dot(self):
        """ Generate "dot" representation of cog train graph. """
        out = list()
        out.append("digraph GearTrain {")
        out.append("# Nodes")
        for cog in self.cogs:
            shape = "hexagon" if isinstance(cog, TrainChooser) else "ellipse"
            out.append('"%x" [label="%s (%x)" shape="%s"]' % (id(cog), type(cog).__name__, id(cog), shape))
        out.append("# Connections")
        for cog, nextcog in self.train.items():
            if isinstance(nextcog, TrainChooser):
                # Edge into the chooser, then edges from chooser to its forks.
                out.append('"%x" -> "%x"' % (id(cog), id(nextcog)))
                cog = nextcog
                nextcogs = nextcog.getAllNexts()
            else:
                nextcogs = [nextcog]
            for nextcog in nextcogs:
                if cog not in (_first, _last) and nextcog not in (_first, _last):
                    out.append('"%x" -> "%x"' % (id(cog), id(nextcog)))
        out.append('{ rank=sink; Legend [shape=none, margin=0, label=<')
        out.append('<table cellspacing="0">')
        out.append("# Legend")
        for cog in self.cogs:
            # Angle brackets would break the HTML-like label.
            label = str(cog).replace(">", "").replace("<", "")
            out.append('<tr><td align="left">%s (%x)</td><td align="left">%s</td></tr>' % (
                type(cog).__name__, id(cog), label))
        out.append("</table>>];}")
        out.append("}")
        return "\n".join(out)