Source code for deadbeat.fs

# -*- coding: utf-8 -*-

""" Filesystem supply and drain cogs for DeadBeat """

from __future__ import absolute_import, division, print_function, unicode_literals

import os
import os.path as pth
import codecs
import logging
import time
import socket
import errno
from io import open
from operator import itemgetter
from .movement import Object, Cog, itemsetter, uuidsetter, dummy_escapement, INotify, str_type

__all__ = ["FileWatcherSupply", "LineFileDrain", "FilerSupply", "FilerDrain"]

class _WatchedFileRecord(Object):

    __slots__ = ("path", "line_buffer", "f", "inode", "size")

    def __init__(self, path=None, line_buffer="", f=None, inode=-1, size=-1):
        self.path = path
        self.line_buffer = line_buffer
        self.f = f
        self.inode = inode
        self.size = size



class FileWatcherSupply(Cog):
    """ Supply cog akin to `tail` """

    def __init__(self, escapement, filenames, tail=True, wait=1,
                 factory=dict, data_set=None, id_get=None, id_set=None):
        """ Initialize FileWatcherSupply, open files, register inotify and timeouts.

        :param escapement: `escapement` singleton.
        :param filenames: Paths of files to watch.
        :param tail: Start tailing immediately. If False, files are read from the beginning.
        :param wait: If inotify is not available, files will be polled for new data this often.
        :param factory: Callable returning new event data.
        :param data_set: Setter for inserting new lines into data.
        :param id_get: Getter of ID.
        :param id_set: Generator and setter of ID.
        """
        self.esc = self.escapement = escapement
        self.filenames = filenames
        self.watched = [_WatchedFileRecord(n, "", None, -1, -1) for n in filenames]
        self.index = {}
        for i, fname in enumerate(self.filenames):
            self.index[fname] = i
        self.wait = wait
        self.tail = tail
        self.factory = factory
        self.data_set = data_set or itemsetter("input")
        self.id_get = id_get or itemgetter("ID")
        self.id_set = id_set or uuidsetter("ID")
        self.watch_to_read = 0
        self.check_soon = False
        for watched in self.watched:
            self.open(watched)
            if self.tail and watched.f:
                watched.f.seek(0, os.SEEK_END)
        if not self.esc.inotify or not self.tail:
            self.esc.enqueue(self, None)

    def open(self, watched):
        """ Open watched file.

        :param watched: `_WatchedFileRecord` file state.
        """
        try:
            watched.f = open(watched.path, "r")
            st = os.fstat(watched.f.fileno())
            watched.inode, watched.size = st.st_ino, st.st_size
            if self.esc.inotify:
                self.esc.inotify_add(watched.path, INotify.MODIFY, self.handle_inotify)
        except IOError:
            watched.f = None
            watched.inode = -1
            watched.size = -1

    def _check_reopen(self, watched):
        """ Check size and inode of file for changes.

        :param watched: `_WatchedFileRecord` file state.
        """
        try:
            st = os.stat(watched.path)
            cur_inode, cur_size = st.st_ino, st.st_size
        except OSError:
            cur_inode = -1
            cur_size = -1
        # A changed inode or a shrunken file means rotation/truncation: reopen.
        if cur_inode != watched.inode or cur_size < watched.size:
            self.close(watched)
            self.open(watched)
            return True
        return False

    def handle_inotify(self, path, mask, cookie, name):
        """ `inotify` event handler """
        self.watch_to_read = self.index[path]
        return self.readline(None)

    def read(self, watched):
        """ Single file incomplete data reader.

        :param watched: `_WatchedFileRecord` file state.
        """
        file_changed = False
        res = None
        new_data = watched.f.readline() if watched.f else ""
        if not new_data:
            file_changed = self._check_reopen(watched)
            if file_changed and watched.line_buffer:
                # Flush what remained buffered from the rotated file.
                new_data = watched.line_buffer
                watched.line_buffer = ""
            else:
                new_data = watched.f.readline() if watched.f else ""
        if new_data:
            # Guard against watched.f being None after a failed reopen.
            newlines = (watched.f.newlines if watched.f else None) or "\n"
            if new_data.endswith(newlines):
                res = watched.line_buffer + new_data
                watched.line_buffer = ""
            else:
                watched.line_buffer += new_data
        self.check_soon = new_data or file_changed or self.check_soon
        return res

    def readline(self, data):
        """ Main pipeline data handler.

        Read data from appropriate file, but return only complete lines.
        """
        self.check_soon = False
        line = None
        for _ in range(len(self.watched)):
            line = self.read(self.watched[self.watch_to_read])
            self.watch_to_read = (self.watch_to_read + 1) % len(self.watched)
            if line:
                break
        if self.check_soon:
            self.esc.enqueue(self, None)
        elif not self.esc.inotify:
            self.esc.after(self.wait, self, None)
        event = self.factory()
        event = self.id_set(event)
        event = self.data_set(event, line)
        return (event,) if line is not None else None

    __call__ = readline

    def close(self, watched):
        """ Close single file.

        :param watched: `_WatchedFileRecord` file state.
        """
        try:
            if self.esc.inotify:
                self.esc.inotify_rm(watched.path, self.handle_inotify)
            if watched.f:
                watched.f.close()
        except IOError:
            pass
        watched.inode = -1
        watched.size = -1

    def finish(self):
        """ `finish` event handler """
        self.esc.unschedule(self)
        self.esc.unschedule(self.handle_inotify)
        for watched in self.watched:
            self.close(watched)
        self.esc = dummy_escapement
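

# --- Usage sketch (not part of the original module) --------------------------
# A minimal illustration of driving FileWatcherSupply by hand. It assumes the
# imported `dummy_escapement` is an acceptable no-op stand-in for a real
# escapement; in a real pipeline the escapement schedules the cog itself.

def _example_file_watcher():
    # Read /var/log/app.log from the beginning (tail=False); each call returns
    # a one-event tuple for a complete line, or None when no full line is ready.
    supply = FileWatcherSupply(dummy_escapement, ["/var/log/app.log"], tail=False)
    events = supply(None)  # equivalent to supply.readline(None)
    if events:
        print(events[0])   # e.g. {"ID": "...", "input": "first line\n"}
    supply.finish()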


class LineFileDrain(Cog):
    """ Drain cog for appending lines of data to file. """

    def __init__(self, train, path, timestamp=True, flush=False, charset="utf-8",
                 id_get=None, data_get=None):
        """ Initialize LineFileDrain, open file.

        :param train: Train singleton.
        :param path: File path to save to.
        :param timestamp: Prepend timestamp.
        :param flush: Flush after each line.
        :param charset: Output file encoding.
        :param id_get: ID getter.
        :param data_get: Final data getter.
        """
        self.train = train
        self.path = path
        self.timestamp = timestamp
        self.flush = flush
        self.charset = charset
        self.id_get = id_get or itemgetter("ID")
        self.data_get = data_get or itemgetter("output")
        self.f = open(self.path, "a", encoding=self.charset)

    def _time_str(self):
        epoch = time.time()
        us = int(epoch % 1 * 1000000 + 0.5)
        # Zero-pad microseconds before stripping trailing zeros, otherwise
        # e.g. 123 us would render as ".123" instead of ".000123".
        usstr = "." + ("%06d" % us).rstrip("0") if us else ""
        tstr = "%04d-%02d-%02dT%02d:%02d:%02d%sZ " % (
            time.gmtime(epoch)[:6] + (usstr,))
        return tstr

    def __call__(self, data):
        """ Main pipeline event handler """
        if self.timestamp:
            self.f.write(self._time_str())
        self.f.write(str_type(self.data_get(data)))
        self.f.write("\n")
        if self.flush:
            self.f.flush()
        event_id = self.id_get(data)
        if event_id is not None:
            self.train.notify("event_done", event_id)

    def done(self):
        """ `done` event handler. """
        self.f.close()
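

# --- Usage sketch (not part of the original module) --------------------------
# Illustrates the line format LineFileDrain appends: an optional UTC timestamp
# ("YYYY-MM-DDThh:mm:ss[.fraction]Z "), the event's "output" item and a newline.
# The _Train class is a hypothetical stand-in exposing only the notify() call
# the drain actually uses.

def _example_line_drain():
    class _Train(object):
        def notify(self, signal, event_id):
            print(signal, event_id)

    drain = LineFileDrain(_Train(), "/tmp/deadbeat-example.log", flush=True)
    drain({"ID": "42", "output": "hello"})
    # The file now ends with something like "2024-01-01T12:00:00.5Z hello\n".
    drain.done()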


class FilerSupply(Cog):
    """ Supply cog reading new files from directory. """

    def __init__(self, escapement, directory, wait=1, factory=dict,
                 data_set=None, id_get=None, id_set=None):
        """ Initialize FilerSupply, register timers and inotify.

        :param escapement: `escapement` singleton.
        :param directory: Path to watch.
        :param wait: If inotify is not available, check directory this often.
        :param factory: Callable returning new event data.
        :param data_set: Setter for inserting new lines into data.
        :param id_get: Getter of ID.
        :param id_set: Generator and setter of ID.
        """
        self.esc = self.escapement = escapement
        self.basedir = self._ensure_path(directory)
        self.tmp = self._ensure_path(pth.join(self.basedir, "tmp"))
        self.incoming = self._ensure_path(pth.join(self.basedir, "incoming"))
        self.errors = self._ensure_path(pth.join(self.basedir, "errors"))
        self.wait = wait
        self.factory = factory
        self.data_set = data_set or itemsetter("input")
        self.id_get = id_get or itemgetter("ID")
        self.id_set = id_set or uuidsetter("ID")
        self.dirlist = []
        self.on_fly = {}
        if self.esc.inotify:
            self.esc.inotify_add(self.incoming, INotify.CREATE | INotify.MOVED_TO,
                                 self.handle_inotify)
        self.esc.enqueue(self, None)

    def handle_inotify(self, path, mask, cookie, name):
        """ `inotify` event handler. """
        if self.dirlist:
            self.dirlist.append(name)
        return self(None)

    def _ensure_path(self, p):
        try:
            os.mkdir(p)
        except OSError:
            if not pth.isdir(p):
                raise
        return p

    def __call__(self, data):
        """ Main pipeline event handler. """
        if not self.dirlist:
            self.dirlist = os.listdir(self.incoming)
            if not self.dirlist:
                if not self.esc.inotify:
                    self.esc.after(self.wait, self, None)
                return
        data = None
        while data is None and self.dirlist:
            short_name = self.dirlist.pop()
            long_name = pth.join(self.tmp, short_name)
            try:
                os.rename(pth.join(self.incoming, short_name), long_name)
                with codecs.open(long_name, "rb", encoding="utf-8") as f:
                    data = f.read()
            except (OSError, IOError) as e:
                if e.errno != errno.ENOENT:
                    logging.exception("Error reading %s", pth.join(self.incoming, short_name))
                # Otherwise we've been faster than inotify, or someone else has
                # been faster than us; no big deal.
        self.esc.enqueue(self, None)
        if data is not None:
            event = self.factory()
            self.id_set(event)
            self.on_fly.setdefault(self.id_get(event), set()).add(short_name)
            event = self.data_set(event, data)
            return (event,)

    def event_rename(self, old_id, new_id):
        """ `rename` event handler.

        Watch for event data changing id, merging, etc. to be able to remove
        finished file.
        """
        old_data = self.on_fly.pop(old_id, ())
        self.on_fly.setdefault(new_id, set()).update(old_data)
        logging.debug("%s renamed, now tracked as %s", old_id, new_id)

    def event_done(self, event_id):
        """ `event_done` event handler.

        Remove the file associated with the original event data.
        """
        names = self.on_fly.pop(event_id, ())
        for name in names:
            long_name = pth.join(self.tmp, name)
            try:
                os.unlink(long_name)
            except Exception:
                logging.exception("%s: attempt to remove nonexistent file %s", event_id, long_name)
            else:
                logging.debug("%s: removed %s", event_id, long_name)

    def event_error(self, event_id):
        """ `event_error` event handler. """
        names = self.on_fly.pop(event_id, ())
        for name in names:
            try:
                os.rename(pth.join(self.tmp, name), pth.join(self.errors, name))
                logging.debug("%s: Moved unfinished file %s into %s", event_id, name, self.errors)
            except Exception:
                logging.debug("%s: Unable to move unfinished file %s into %s", event_id, name, self.errors)

    def finish(self):
        """ `finish` event handler. """
        self.esc.unschedule(self)
        if self.esc.inotify:
            self.esc.inotify_rm(self.incoming, self.handle_inotify)
        self.esc = dummy_escapement

    def done(self):
        """ `done` event handler. """
        for eid, names in self.on_fly.items():
            for name in names:
                try:
                    os.rename(pth.join(self.tmp, name), pth.join(self.incoming, name))
                    logging.debug("%s: Moved unfinished file %s into %s", eid, name, self.incoming)
                except Exception:
                    logging.debug("%s: Unable to move unfinished file %s into %s", eid, name, self.incoming)
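

# --- Usage sketch (not part of the original module) --------------------------
# FilerSupply consumes a maildir-like spool: producers must make files appear
# in <directory>/incoming atomically, i.e. write them elsewhere on the same
# filesystem and rename() them in (FilerDrain below does exactly that). A
# hedged sketch of a hand-rolled producer; the staging directory name is
# illustrative only.

def _example_filer_producer(basedir, name, payload):
    staging = pth.join(basedir, "staging", name)  # hypothetical staging dir
    with codecs.open(staging, "wb", encoding="utf-8") as f:
        f.write(payload)  # payload is expected to be text
    # rename() is atomic within one filesystem, so the consumer never sees
    # a half-written file in incoming.
    os.rename(staging, pth.join(basedir, "incoming", name))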


class FilerDrain(Cog):
    """ Drain cog for saving files into `rename` guarded directory """

    def __init__(self, train, directory, id_get=None, data_get=None):
        """ Initialize FilerDrain, set up paths.

        :param train: Train singleton.
        :param directory: Path to save to.
        :param id_get: ID getter.
        :param data_get: Final data getter.
        """
        self.train = train
        self.directory = self._ensure_path(directory)
        self.id_get = id_get or itemgetter("ID")
        self.data_get = data_get or itemgetter("output")
        self.tmp = self._ensure_path(pth.join(self.directory, "tmp"))
        self.incoming = self._ensure_path(pth.join(self.directory, "incoming"))
        self.hostname = socket.gethostname()
        self.pid = os.getpid()

    def _ensure_path(self, p):
        try:
            os.mkdir(p)
        except OSError:
            if not pth.isdir(p):
                raise
        return p

    def _get_new_name(self, fd=None):
        (inode, device) = os.fstat(fd)[1:3] if fd else (0, 0)
        return "%s.%d.%f.%d.%d" % (
            self.hostname, self.pid, time.time(), device, inode)

    def create_unique_file(self):
        """ Create file with unique name.

        :returns: tuple of file object and its name (without path).
        """
        # First find and open name unique within tmp
        tmpname = None
        while not tmpname:
            tmpname = self._get_new_name()
            try:
                fd = os.open(pth.join(self.tmp, tmpname), os.O_CREAT | os.O_RDWR | os.O_EXCL)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise  # other errors than duplicates should get noticed
                tmpname = None
        # Now we know the device/inode, rename to make unique within system
        newname = self._get_new_name(fd)
        os.rename(pth.join(self.tmp, tmpname), pth.join(self.tmp, newname))
        nf = os.fdopen(fd, "w")
        return nf, newname

    def __call__(self, data):
        """ Main pipeline event handler. """
        f, newname = self.create_unique_file()
        f.write(self.data_get(data))
        # Close (and thus flush) before publishing via rename, so a consumer
        # picking the file up from incoming never sees a partially written file.
        f.close()
        os.rename(pth.join(self.tmp, newname), pth.join(self.incoming, newname))
        event_id = self.id_get(data)
        if event_id is not None:
            self.train.notify("event_done", event_id)
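

# --- Usage sketch (not part of the original module) --------------------------
# End-to-end handoff: FilerDrain writes each event under <directory>/tmp with
# a name unique across hosts (hostname.pid.timestamp.device.inode), then
# rename()s it into <directory>/incoming, where a FilerSupply watching the same
# directory tree picks it up. The _Train stand-in is an assumption for
# illustration only.

def _example_filer_drain():
    class _Train(object):
        def notify(self, signal, event_id):
            pass

    drain = FilerDrain(_Train(), "/tmp/deadbeat-spool")
    drain({"ID": "42", "output": "payload"})
    # /tmp/deadbeat-spool/incoming now holds one uniquely named file
    # containing "payload".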