# -*- coding: utf-8 -*-
""" Email manipulations for deadbeat """
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import os.path as pth
import errno
import logging
try:
from email.errors import HeaderParseError
from email import message_from_bytes as parse_message
import email.header as eheader
except ImportError:
from email.Errors import HeaderParseError
from email import message_from_string as parse_message
import email.Header as eheader
from .movement import Cog, INotify, dummy_escapement, itemgetter, itemsetter, uuidsetter, nullsetter, bytes_type
# Public names of this module (what `from <module> import *` exports).
__all__ = ["MaildirSupply", "EmailParse"]
class MaildirSupply(Cog):
    """ Supply cog reading new mail files from Maildir.

    Files appearing in "new" are first moved into "cur" (getting a ":2,"
    info suffix). Every "cur" message not yet flagged "S" or "F" is then
    moved into "tmp", read and emitted as new event data. Once the event is
    done (`event_done`) or failed (`event_error`), the file is moved back
    into "cur" with the "S" or "F" flag added, so it is not picked up again.
    """

    def __init__(self, escapement, directory, wait=1, factory=dict, create=True,
                 data_set=None, id_get=None, id_set=None):
        """ Initialize MaildirSupply, register timers and inotify.

        :param escapement: `escapement` singleton.
        :param directory: Maildir path to watch.
        :param wait: If inotify is not available, check directory this often.
        :param factory: Callable returning new event data.
        :param create: Create nonexistent Maildir directories if True, balk
            (raise OSError) otherwise.
        :param data_set: Setter for inserting raw mail contents into event data.
        :param id_get: Getter of ID.
        :param id_set: Generator and setter of ID.
        """
        self.esc = self.escapement = escapement
        self.create = create
        self.basedir = self._ensure_path(directory)
        self.tmp = self._ensure_path(pth.join(self.basedir, "tmp"))
        self.new = self._ensure_path(pth.join(self.basedir, "new"))
        self.cur = self._ensure_path(pth.join(self.basedir, "cur"))
        self.wait = wait
        self.factory = factory
        self.data_set = data_set or itemsetter("input")
        self.id_get = id_get or itemgetter("ID")
        self.id_set = id_set or uuidsetter("ID")
        # File names waiting to be processed from "new" resp. "cur".
        self.new_dir_list = []
        self.cur_dir_list = []
        # Event ID -> set of file names (living in "tmp") belonging to that event.
        self.on_fly = {}
        if self.esc.inotify:
            self.esc.inotify_add(self.new, INotify.CREATE|INotify.MOVED_TO, self.handle_inotify_new)
            self.esc.inotify_add(self.cur, INotify.CREATE|INotify.MOVED_TO, self.handle_inotify_cur)
        self.esc.enqueue(self, None)

    def handle_inotify_new(self, path, mask, cookie, name):
        """ "new" directory `inotify` event handler. """
        # An empty list gets refilled by os.listdir() in __call__ anyway,
        # so the name only needs remembering while the list is non-empty.
        if self.new_dir_list:
            self.new_dir_list.append(name)
        return self(None)

    def handle_inotify_cur(self, path, mask, cookie, name):
        """ "cur" directory `inotify` event handler. """
        # Same reasoning as in handle_inotify_new.
        if self.cur_dir_list:
            self.cur_dir_list.append(name)
        return self(None)

    def _ensure_path(self, p):
        """ Return directory path `p`, making sure the directory exists.

        Creates it when `self.create` is true, otherwise raises OSError
        (ENOENT) if it is missing.
        """
        if self.create:
            try:
                os.mkdir(p)
            except OSError:
                # Already existing directory is fine, anything else is not.
                if not pth.isdir(p):
                    raise
        elif not pth.isdir(p):
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), p)
        return p

    def _split_maildir_info(self, name):
        """ Split Maildir file name into (base name, flags).

        Flags are the characters after the last ":2,"; names without info
        part, or with an info part of other version, give empty flags.
        """
        base, sep, info = name.rpartition(":")
        if not sep:
            return name, ""
        return base, info[2:] if info.startswith("2,") else ""

    def _add_maildir_info(self, name, info):
        """ Return `name` with flags `info` merged into its ":2," info part (sorted, deduplicated). """
        name, old_info = self._split_maildir_info(name)
        return name + ":2," + "".join(sorted(set(old_info + info)))

    def __call__(self, data):
        """ Main pipeline event handler.

        Moves everything waiting in "new" into "cur", then moves the first
        not yet seen/flagged message from "cur" into "tmp", reads it and
        returns it as a one-element tuple of new event data. Returns None
        when there is nothing to process; without inotify it then schedules
        itself again after `wait`.
        """
        if not self.new_dir_list:
            self.new_dir_list = os.listdir(self.new)
        if not self.cur_dir_list:
            self.cur_dir_list = os.listdir(self.cur)
        if not self.new_dir_list and not self.cur_dir_list:
            if not self.esc.inotify:
                self.esc.after(self.wait, self, None)
            return

        while self.new_dir_list:
            short_name = self.new_dir_list.pop()
            long_name = pth.join(self.new, short_name)
            new_short_name = self._add_maildir_info(short_name, "")
            new_name = pth.join(self.cur, new_short_name)
            try:
                os.rename(long_name, new_name)
            except (IOError, OSError) as e:
                # os.rename raises OSError; IOError is its alias on Python 3 only.
                if e.errno != errno.ENOENT:
                    logging.exception("Error renaming %s", long_name)
                # ENOENT: we've been faster than inotify or someone else has been
                # faster than us, no big deal. Either way the file did not move,
                # so do not queue it for "cur" processing.
                continue
            self.cur_dir_list.append(new_short_name)
            logging.debug("%s renamed to %s", long_name, new_name)

        data = None
        while data is None and self.cur_dir_list:
            short_name = self.cur_dir_list.pop()
            maildir_info = self._split_maildir_info(short_name)[1]
            if "S" in maildir_info or "F" in maildir_info:
                # Already done ("S") or failed ("F") earlier - skip it, but keep
                # looking for unprocessed messages further down the list.
                continue
            long_name = pth.join(self.cur, short_name)
            new_name = pth.join(self.tmp, short_name)
            try:
                os.rename(long_name, new_name)
                with open(new_name, "rb") as f:
                    data = f.read()
            except (IOError, OSError) as e:
                # NOTE(review): if the rename succeeded but reading failed, the file
                # stays in "tmp" and is not tracked in on_fly.
                if e.errno != errno.ENOENT:
                    logging.exception("Error reading %s", new_name)
                # Otherwise we've been faster than inotify or someone else have been faster than us, no big deal

        if data is not None:
            # More mail may be waiting, run again soon.
            self.esc.enqueue(self, None)
            event = self.factory()
            # NOTE(review): id_set's return value is ignored, so it is assumed to
            # set the ID in place (unlike data_set) - confirm.
            self.id_set(event)
            self.on_fly.setdefault(self.id_get(event), set()).add(short_name)
            event = self.data_set(event, data)
            return (event,)
        elif not self.esc.inotify:
            self.esc.after(self.wait, self, None)
        return

    def event_rename(self, old_id, new_id):
        """ `rename` event handler.

        Watch for event data changing id, merging, etc. to be able to remove finished file.
        Files tracked under `old_id` are moved under `new_id`.
        """
        old_data = self.on_fly.pop(old_id, ())
        self.on_fly.setdefault(new_id, set()).update(old_data)
        logging.debug("%s renamed, now tracked as %s", old_id, new_id)

    def event_done(self, event_id, _flag="S"):
        """ `done` event handler

        Mark mail files of the event with flag `_flag` and return them from "tmp" to "cur".
        Rename failures are logged, not raised.
        """
        names = self.on_fly.pop(event_id, ())
        for name in names:
            base_name, maildir_info = self._split_maildir_info(name)
            long_name = pth.join(self.tmp, name)
            logging.debug("cur %s, base %s, info %s", self.cur, base_name, maildir_info)
            new_name = pth.join(self.cur, self._add_maildir_info(name, _flag))
            try:
                os.rename(long_name, new_name)
            except Exception:
                logging.exception("%s: attempt to rename file %s failed", event_id, long_name)
            else:
                logging.debug("%s: rename %s back to %s", event_id, long_name, new_name)

    def event_error(self, event_id):
        """ `error` event handler

        Mark mail with "flagged" flag and return to "cur".
        """
        self.event_done(event_id, _flag="F")

    def finish(self):
        """ `finish` event handler.

        Stop scheduled runs and inotify watches, detach from escapement.
        """
        self.esc.unschedule(self)
        if self.esc.inotify:
            self.esc.inotify_rm(self.new, self.handle_inotify_new)
            self.esc.inotify_rm(self.cur, self.handle_inotify_cur)
        self.esc = dummy_escapement

    def done(self):
        """ `done` event handler.

        Move files of still unfinished events from "tmp" back into "cur"
        without adding flags, so they are processed again on next run.
        """
        for eid, names in self.on_fly.items():
            for name in names:
                try:
                    os.rename(pth.join(self.tmp, name), pth.join(self.cur, name))
                    logging.debug("%s: Moved unfinished file %s into %s", eid, name, self.cur)
                except Exception:
                    logging.debug("%s: Unable to move unfinished file %s into %s", eid, name, self.cur)
class EmailParse(Cog):
    """ Email data parsing cog """

    def __init__(self, msg_get=None, header_set=None, text_set=None, flattext_set=None,
                 attach_set=None, default_charsets=("utf-8", "ascii", "iso-8859-1")):
        """ Initialize EmailParse.

        :param msg_get: Getter for raw email data. Defaults to getting the "input" key.
        :param header_set: Setter for dict of headers. Keys are header names, values are lists of
            multiple separate entries as appeared in message.
        :param text_set: Setter for list of all text/* parts.
        :param flattext_set: Setter for all text/* parts joined together by newline.
        :param attach_set: Setter for all nontext attachments in the form of dicts, containing
            "Name", "Content-Type" and "Data".
        :param default_charsets: Sequence of charsets to try, in order, when the message does
            not specify one or the specified one fails to decode.
        """
        self.msg_get = msg_get or itemgetter("input")
        self.header_set = header_set or nullsetter
        self.flattext_set = flattext_set
        self.text_set = text_set
        self.attach_set = attach_set
        # Private list copy: accepts any sequence and never shares the caller's object.
        self.default_charsets = list(default_charsets)

    def _decode(self, s, charset=None):
        """ Decode bytes `s` into text.

        Tries `charset` first (if given), then each of `default_charsets`;
        unknown charset names and undecodable data move on to the next one.
        If none succeeds, decodes as UTF-8 with undecodable bytes replaced.
        """
        for ch in [charset] + list(self.default_charsets):
            if not ch:
                continue
            try:
                return s.decode(ch)
            except (UnicodeDecodeError, LookupError):
                pass
        return s.decode("utf-8", "replace")

    def _decode_header(self, header):
        """ Decode a (possibly RFC 2047 encoded) header value into text. """
        try:
            pieces = eheader.decode_header(header)
        except HeaderParseError:
            # Malformed encoded words - fall back to the raw value.
            if isinstance(header, bytes_type):
                return self._decode(header)
            return header
        res = []
        for piece, charset in pieces:
            # Only byte pieces need decoding; text pieces are used as they are.
            if isinstance(piece, bytes_type):
                piece = self._decode(piece, charset)
            res.append(piece)
        return "".join(res)

    def __call__(self, data):
        """ Main pipeline event handler.

        Parses the raw email obtained by `msg_get`, stores decoded headers,
        text parts and attachments via configured setters, drops the "input"
        key and returns one-element tuple with resulting data.
        """
        msg = parse_message(self.msg_get(data))
        headers = {}
        for header, value in msg.items():
            headers.setdefault(header, []).append(self._decode_header(value))
        data = self.header_set(data, headers)

        text = []
        attach = []
        want_text = self.text_set or self.flattext_set
        if want_text or self.attach_set:
            for part in msg.walk():
                maintype = part.get_content_maintype()
                if maintype == "multipart":
                    continue  # Just another container
                if maintype == "text" and want_text:
                    # get_content_charset() reads the charset parameter of the
                    # Content-Type header; get_charset() only returns a charset
                    # set explicitly by set_charset(), i.e. None for parsed mail.
                    body = self._decode(part.get_payload(decode=True), part.get_content_charset())
                    text.append(body)
                elif self.attach_set:
                    attach.append({
                        "Name": part.get_filename(),
                        "Content-Type": part.get_content_type(),
                        "Data": part.get_payload(decode=True)
                    })
        if text:
            if self.text_set:
                data = self.text_set(data, text)
            if self.flattext_set:
                flattext = "\n".join(text)
                data = self.flattext_set(data, flattext)
        if attach:
            data = self.attach_set(data, attach)
        # Raw mail is not needed anymore; tolerate its absence (e.g. custom msg_get).
        try:
            del data["input"]
        except KeyError:
            pass
        return (data,)