# -*- coding: utf-8 -*-
""" Filesystem supply and drain cogs for DeadBeat """
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import os.path as pth
import codecs
import logging
import time
import socket
import errno
from io import open
from operator import itemgetter
from .movement import Object, Cog, itemsetter, uuidsetter, dummy_escapement, INotify, str_type
__all__ = ["FileWatcherSupply", "LineFileDrain", "FilerSupply", "FilerDrain"]
class _WatchedFileRecord(Object):
    """ Per-file tailing state: path, partial-line buffer, open file
    object and the last seen inode/size used for rotation detection. """
    __slots__ = ("path", "line_buffer", "f", "inode", "size")

    def __init__(self, path=None, line_buffer="", f=None, inode=-1, size=-1):
        """ Store the initial state of a single watched file. """
        # Assign positionally onto the slot names - same effect as five
        # explicit attribute assignments.
        for attr, value in zip(self.__slots__,
                               (path, line_buffer, f, inode, size)):
            setattr(self, attr, value)
class FileWatcherSupply(Cog):
    """ Supply cog akin to `tail`.

    Watches a set of files for appended data and emits one event per
    complete line.  Uses inotify when the escapement provides it,
    otherwise polls every `wait` seconds.  Rotation/truncation is
    detected by comparing inode and size (see `_check_reopen`).
    """

    def __init__(self, escapement, filenames, tail=True, wait=1, factory=dict, data_set=None, id_get=None, id_set=None):
        """ Initialize FileWatcherSupply, open files, register inotify and timeouts.

        :param escapement: `escapement` singleton.
        :param filenames: Paths of files to watch.
        :param tail: Start tailing immediately. If False, files are read from the beginning.
        :param wait: If inotify is not available, files will be polled for new data this often.
        :param factory: Callable returning new event data.
        :param data_set: Setter for inserting new lines into data.
        :param id_get: Getter of ID.
        :param id_set: Generator and setter of ID.
        """
        self.esc = self.escapement = escapement
        self.filenames = filenames
        self.watched = [_WatchedFileRecord(n, "", None, -1, -1) for n in filenames]
        # Map path -> index into self.watched, used by the inotify callback.
        self.index = {}
        for i, fname in enumerate(self.filenames):
            self.index[fname] = i
        self.wait = wait
        self.tail = tail
        self.factory = factory
        self.data_set = data_set or itemsetter("input")
        self.id_get = id_get or itemgetter("ID")
        self.id_set = id_set or uuidsetter("ID")
        self.watch_to_read = 0
        self.check_soon = False
        for watched in self.watched:
            self.open(watched)
            if self.tail and watched.f:
                # Skip existing content; report only newly appended data.
                watched.f.seek(0, os.SEEK_END)
        if not self.esc.inotify or not self.tail:
            # Without inotify (or when reading from the start) we must
            # drive ourselves through the escapement queue.
            self.esc.enqueue(self, None)

    def open(self, watched):
        """ Open watched file and record its identity (inode/size).

        On failure the record is reset so `_check_reopen` keeps retrying.

        :param watched: `_WatchedFileRecord` file state.
        """
        try:
            watched.f = open(watched.path, "r")
            st = os.fstat(watched.f.fileno())
            watched.inode, watched.size = st.st_ino, st.st_size
            if self.esc.inotify:
                self.esc.inotify_add(watched.path, INotify.MODIFY, self.handle_inotify)
        except IOError:
            watched.f = None
            watched.inode = -1
            watched.size = -1

    def _check_reopen(self, watched):
        """ Check size and inode of file for changes.

        A different inode means rotation, a shrunken file means
        truncation; in both cases the file is closed and reopened.

        :param watched: `_WatchedFileRecord` file state.
        :return: True if the file was reopened.
        """
        try:
            st = os.stat(watched.path)
            cur_inode, cur_size = st.st_ino, st.st_size
        except OSError:
            cur_inode = -1
            cur_size = -1
        if cur_inode != watched.inode or cur_size < watched.size:
            self.close(watched)
            self.open(watched)
            return True
        return False

    def handle_inotify(self, path, mask, cookie, name):
        """ `inotify` event handler """
        self.watch_to_read = self.index[path]
        return self.readline(None)

    def read(self, watched):
        """ Single file incomplete data reader.

        :param watched: `_WatchedFileRecord` file state.
        :return: One complete line (with newline), or None if no
            complete line is available yet.
        """
        file_changed = False
        res = None
        new_data = watched.f.readline() if watched.f else ""
        if not new_data:
            file_changed = self._check_reopen(watched)
            if file_changed and watched.line_buffer:
                # File was rotated/truncated: re-examine the partial
                # line buffered from the old file.
                new_data = watched.line_buffer
                watched.line_buffer = ""
            else:
                new_data = watched.f.readline() if watched.f else ""
        if new_data:
            # BUGFIX: the reopen after rotation may have failed, leaving
            # watched.f as None; fall back to "\n" instead of raising
            # AttributeError on None.newlines.
            newline = (watched.f.newlines if watched.f else None) or "\n"
            if new_data.endswith(newline):
                res = watched.line_buffer + new_data
                watched.line_buffer = ""
            else:
                # Incomplete line: keep buffering until a newline arrives.
                watched.line_buffer += new_data
        self.check_soon = new_data or file_changed or self.check_soon
        return res

    def readline(self, data):
        """ Main pipeline data handler.

        Read data from appropriate file, but return only complete lines.
        Files are serviced round-robin, starting at `watch_to_read`.
        """
        self.check_soon = False
        line = None
        for count in range(len(self.watched)):
            line = self.read(self.watched[self.watch_to_read])
            self.watch_to_read = (self.watch_to_read + 1) % len(self.watched)
            if line:
                break
        if self.check_soon:
            # Some file still has pending data - run again as soon as possible.
            self.esc.enqueue(self, None)
        elif not self.esc.inotify:
            # Nothing pending and no inotify - poll again after `wait` seconds.
            self.esc.after(self.wait, self, None)
        event = self.factory()
        event = self.id_set(event)
        event = self.data_set(event, line)
        return (event,) if line is not None else None

    __call__ = readline

    def close(self, watched):
        """ Close single file.

        :param watched: `_WatchedFileRecord` file state.
        """
        try:
            if self.esc.inotify:
                self.esc.inotify_rm(watched.path, self.handle_inotify)
            if watched.f:
                watched.f.close()
        except IOError:
            pass
        watched.inode = -1
        watched.size = -1

    def finish(self):
        """ `finish` event handler """
        self.esc.unschedule(self)
        self.esc.unschedule(self.handle_inotify)
        for watched in self.watched:
            self.close(watched)
        self.esc = dummy_escapement
class LineFileDrain(Cog):
    """ Drain cog for appending lines of data to file. """

    def __init__(self, train, path, timestamp=True, flush=False,
                 charset="utf-8", id_get=None, data_get=None):
        """ Initialize LineFileDrain, open file.

        :param train: Train singleton.
        :param path: File path to save to.
        :param timestamp: Prepend timestamp.
        :param flush: Flush after each line.
        :param charset: Output file encoding.
        :param id_get: ID getter.
        :param data_get: Final data getter.
        """
        self.train = train
        self.path = path
        self.timestamp = timestamp
        self.flush = flush
        self.charset = charset
        self.id_get = id_get or itemgetter("ID")
        self.data_get = data_get or itemgetter("output")
        self.f = open(self.path, "a", encoding=self.charset)

    def _time_str(self):
        """ Return current UTC time as 'YYYY-MM-DDTHH:MM:SS[.frac]Z '. """
        epoch = time.time()
        secs = int(epoch)
        us = int((epoch - secs) * 1000000 + 0.5)
        if us >= 1000000:
            # Rounding overflowed the fraction - carry into the seconds.
            secs += 1
            us -= 1000000
        # BUGFIX: microseconds must be zero-padded to six digits before
        # stripping trailing zeros, otherwise e.g. 5 us rendered as ".5"
        # instead of ".000005".
        usstr = "." + ("%06d" % us).rstrip("0") if us else ""
        tstr = "%04d-%02d-%02dT%02d:%02d:%02d%sZ " % (
            time.gmtime(secs)[:6] + (usstr,))
        return tstr

    def __call__(self, data):
        """ Main pipeline event handler """
        if self.timestamp:
            self.f.write(self._time_str())
        self.f.write(str_type(self.data_get(data)))
        self.f.write("\n")
        if self.flush:
            self.f.flush()
        event_id = self.id_get(data)
        if event_id is not None:
            self.train.notify("event_done", event_id)

    def done(self):
        """ `done` event handler. """
        self.f.close()
class FilerSupply(Cog):
    """ Supply cog reading new files from directory.

    Files dropped into `<directory>/incoming` are atomically moved into
    `<directory>/tmp` while being processed; on error they end up in
    `<directory>/errors`.
    """

    def __init__(self, escapement, directory, wait=1, factory=dict, data_set=None, id_get=None, id_set=None):
        """ Initialize FilerSupply, register timers and inotify.

        :param escapement: `escapement` singleton.
        :param directory: Path to watch.
        :param wait: If inotify is not available, check directory this often.
        :param factory: Callable returning new event data.
        :param data_set: Setter for inserting new lines into data.
        :param id_get: Getter of ID.
        :param id_set: Generator and setter of ID.
        """
        self.esc = self.escapement = escapement
        self.basedir = self._ensure_path(directory)
        self.tmp = self._ensure_path(pth.join(self.basedir, "tmp"))
        self.incoming = self._ensure_path(pth.join(self.basedir, "incoming"))
        self.errors = self._ensure_path(pth.join(self.basedir, "errors"))
        self.wait = wait
        self.factory = factory
        self.data_set = data_set or itemsetter("input")
        self.id_get = id_get or itemgetter("ID")
        self.id_set = id_set or uuidsetter("ID")
        # Pending names from the last listing of `incoming`.
        self.dirlist = []
        # Map event id -> set of tmp file names still owned by that event.
        self.on_fly = {}
        if self.esc.inotify:
            self.esc.inotify_add(self.incoming, INotify.CREATE|INotify.MOVED_TO, self.handle_inotify)
        self.esc.enqueue(self, None)

    def handle_inotify(self, path, mask, cookie, name):
        """ `inotify` event handler. """
        if self.dirlist:
            # Still working through a previous listing - just append the
            # new name; otherwise __call__ will re-list the directory.
            self.dirlist.append(name)
        return self(None)

    def _ensure_path(self, p):
        """ Create directory `p` if missing; return it.

        :raises OSError: if `p` cannot be created and is not a directory.
        """
        try:
            os.mkdir(p)
        except OSError:
            if not pth.isdir(p):
                raise
        return p

    def __call__(self, data):
        """ Main pipeline event handler. """
        if not self.dirlist:
            self.dirlist = os.listdir(self.incoming)
            if not self.dirlist:
                if not self.esc.inotify:
                    self.esc.after(self.wait, self, None)
                return
        data = None
        while data is None and self.dirlist:
            short_name = self.dirlist.pop()
            long_name = pth.join(self.tmp, short_name)
            try:
                # Claim the file by moving it into tmp, then read it.
                os.rename(pth.join(self.incoming, short_name), long_name)
                with codecs.open(long_name, "rb", encoding="utf-8") as f:
                    data = f.read()
            except (OSError, IOError) as e:
                if e.errno != errno.ENOENT:
                    logging.exception("Error reading %s", pth.join(self.incoming, short_name))
                # Otherwise we've been faster than inotify or someone else have been faster than us, no big deal
        self.esc.enqueue(self, None)
        if data is not None:
            event = self.factory()
            # BUGFIX: keep the setter's return value, consistently with
            # FileWatcherSupply - id_set may return a new/derived mapping.
            event = self.id_set(event)
            self.on_fly.setdefault(self.id_get(event), set()).add(short_name)
            event = self.data_set(event, data)
            return (event,)

    def event_rename(self, old_id, new_id):
        """ `rename` event handler.

        Watch for event data changing id, merging, etc. to be able to remove finished file.
        """
        old_data = self.on_fly.pop(old_id, ())
        self.on_fly.setdefault(new_id, set()).update(old_data)
        logging.debug("%s renamed, now tracked as %s", old_id, new_id)

    def event_done(self, event_id):
        """ `event_done` event handler

        Remove file, associated with original event data.
        """
        names = self.on_fly.pop(event_id, ())
        for name in names:
            long_name = pth.join(self.tmp, name)
            try:
                os.unlink(long_name)
            except Exception:
                logging.exception("%s: attempt to remove nonexistent file %s", event_id, long_name)
            else:
                logging.debug("%s: removed %s", event_id, long_name)

    def event_error(self, event_id):
        """ `event_error` event handler.

        Move unfinished files aside into `errors` for later inspection.
        """
        names = self.on_fly.pop(event_id, ())
        for name in names:
            try:
                os.rename(pth.join(self.tmp, name), pth.join(self.errors, name))
                logging.debug("%s: Moved unfinished file %s into %s", event_id, name, self.errors)
            except Exception:
                logging.debug("%s: Unable to move unfinished file %s into %s", event_id, name, self.errors)

    def finish(self):
        """ `finish` event handler. """
        self.esc.unschedule(self)
        if self.esc.inotify:
            self.esc.inotify_rm(self.incoming, self.handle_inotify)
        self.esc = dummy_escapement

    def done(self):
        """ `done` event handler.

        Return unfinished files to `incoming` so they get reprocessed
        on the next run.
        """
        for eid, names in self.on_fly.items():
            for name in names:
                try:
                    os.rename(pth.join(self.tmp, name), pth.join(self.incoming, name))
                    logging.debug("%s: Moved unfinished file %s into %s", eid, name, self.incoming)
                except Exception:
                    logging.debug("%s: Unable to move unfinished file %s into %s", eid, name, self.incoming)
class FilerDrain(Cog):
    """ Drain cog for saving files into `rename` guarded directory """

    def __init__(self, train, directory, id_get=None, data_get=None):
        """ Initialize FilerDrain, set up paths.

        :param train: Train singleton.
        :param directory: Path to save to.
        :param id_get: ID getter.
        :param data_get: Final data getter.
        """
        self.train = train
        self.directory = self._ensure_path(directory)
        self.id_get = id_get or itemgetter("ID")
        self.data_get = data_get or itemgetter("output")
        self.tmp = self._ensure_path(pth.join(self.directory, "tmp"))
        self.incoming = self._ensure_path(pth.join(self.directory, "incoming"))
        # Hostname and PID feed into unique file names (see _get_new_name).
        self.hostname = socket.gethostname()
        self.pid = os.getpid()

    def _ensure_path(self, p):
        """ Create directory `p` if missing; return it.

        :raises OSError: if `p` cannot be created and is not a directory.
        """
        try:
            os.mkdir(p)
        except OSError:
            if not pth.isdir(p):
                raise
        return p

    def _get_new_name(self, fd=None):
        """ Generate file name unique within host/process/time and,
        when `fd` is given, within the whole system (device/inode). """
        # BUGFIX: compare against None - file descriptor 0 is valid.
        (inode, device) = os.fstat(fd)[1:3] if fd is not None else (0, 0)
        return "%s.%d.%f.%d.%d" % (
            self.hostname, self.pid, time.time(), device, inode)

    def create_unique_file(self):
        """ Create file with unique name.

        :returns: tuple of file object and its name (without path).
        """
        # First find and open name unique within tmp
        tmpname = None
        while not tmpname:
            tmpname = self._get_new_name()
            try:
                fd = os.open(pth.join(self.tmp, tmpname), os.O_CREAT | os.O_RDWR | os.O_EXCL)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise  # other errors than duplicates should get noticed
                tmpname = None
        # Now we know the device/inode, rename to make unique within system
        newname = self._get_new_name(fd)
        os.rename(pth.join(self.tmp, tmpname), pth.join(self.tmp, newname))
        nf = os.fdopen(fd, "w")
        return nf, newname

    def __call__(self, data):
        """ Main pipeline event handler. """
        f, newname = self.create_unique_file()
        try:
            f.write(self.data_get(data))
        finally:
            # BUGFIX: close (and thus flush) BEFORE publishing the file by
            # renaming it into `incoming`; the previous order could expose
            # a partially written file to a consumer watching `incoming`.
            f.close()
        os.rename(pth.join(self.tmp, newname), pth.join(self.incoming, newname))
        event_id = self.id_get(data)
        if event_id is not None:
            self.train.notify("event_done", event_id)