Source code for deadbeat.email

# -*- coding: utf-8 -*-

""" Email manipulations for deadbeat """

from __future__ import absolute_import, division, print_function, unicode_literals

import os
import os.path as pth
import errno
import logging
try:
    from email.errors import HeaderParseError
    from email import message_from_bytes as parse_message
    import email.header as eheader
except ImportError:
    from email.Errors import HeaderParseError
    from email import message_from_string as parse_message
    import email.Header as eheader
from .movement import Cog, INotify, dummy_escapement, itemgetter, itemsetter, uuidsetter, nullsetter, bytes_type

__all__ = ["MaildirSupply", "EmailParse"]

class MaildirSupply(Cog):
    """ Supply cog reading new mail files from Maildir.

    Files found in ``new`` are renamed into ``cur``. Files in ``cur`` which
    carry neither the "S" nor the "F" flag are moved into ``tmp``, read and
    emitted as events; once the event is done (or has failed) the file is
    renamed back into ``cur`` with the corresponding flag added.
    """

    def __init__(self, escapement, directory, wait=1, factory=dict, create=True,
                 data_set=None, id_get=None, id_set=None):
        """ Initialize MaildirSupply, register timers and inotify.

        :param escapement: `escapement` singleton.
        :param directory: Maildir path to watch.
        :param wait: If inotify is not available, check directory this often.
        :param factory: Callable returning new event data.
        :param create: Create nonexistent Maildir directories if True; if False
                       the paths are used as given, without any check.
        :param data_set: Setter for inserting file content into data (defaults to key "input").
        :param id_get: Getter of ID (defaults to key "ID").
        :param id_set: Generator and setter of ID (defaults to key "ID").
        """
        self.esc = self.escapement = escapement
        # Must be set before the first _ensure_path call, which reads it.
        self.create = create
        self.basedir = self._ensure_path(directory)
        self.tmp = self._ensure_path(pth.join(self.basedir, "tmp"))
        self.new = self._ensure_path(pth.join(self.basedir, "new"))
        self.cur = self._ensure_path(pth.join(self.basedir, "cur"))
        self.wait = wait
        self.factory = factory
        self.data_set = data_set or itemsetter("input")
        self.id_get = id_get or itemgetter("ID")
        self.id_set = id_set or uuidsetter("ID")
        # Pending file names (not full paths) from the "new" and "cur" directories.
        self.new_dir_list = []
        self.cur_dir_list = []
        # Maps event ID to the set of file names currently parked in "tmp".
        self.on_fly = {}
        if self.esc.inotify:
            self.esc.inotify_add(self.new, INotify.CREATE|INotify.MOVED_TO, self.handle_inotify_new)
            self.esc.inotify_add(self.cur, INotify.CREATE|INotify.MOVED_TO, self.handle_inotify_cur)
        self.esc.enqueue(self, None)

    def handle_inotify_new(self, path, mask, cookie, name):
        """ `inotify` event handler for the "new" directory. """
        # An empty list gets refilled by os.listdir in __call__, which picks
        # the file up anyway; appending then would only create a duplicate.
        if self.new_dir_list:
            self.new_dir_list.append(name)
        return self(None)

    def handle_inotify_cur(self, path, mask, cookie, name):
        """ `inotify` event handler for the "cur" directory. """
        # See handle_inotify_new for why an empty list is left alone.
        if self.cur_dir_list:
            self.cur_dir_list.append(name)
        return self(None)

    def _ensure_path(self, p):
        """ Create directory `p` if `self.create` is set and return `p`.

        :raises OSError: if creation failed and `p` is not an existing directory.
        """
        if self.create:
            try:
                os.mkdir(p)
            except OSError:
                # An already existing directory is fine, anything else is not.
                if not pth.isdir(p):
                    raise
        return p

    def _split_maildir_info(self, name):
        """ Split file name into (unique name, flags).

        Flags are the characters following the ":2," marker; any other (or
        missing) info part yields an empty flag string.
        """
        # Split on the last colon only, so a name containing more colons
        # does not blow up the tuple unpacking.
        base, sep, info = name.rpartition(":")
        if not sep:
            return name, ""
        return base, (info[2:] if info.startswith("2,") else "")

    def _add_maildir_info(self, name, info):
        """ Return `name` with flags `info` merged into its sorted, deduplicated flag set. """
        name, old_info = self._split_maildir_info(name)
        return name + ":2," + "".join(sorted(set(old_info + info)))

    def __call__(self, data):
        """ Main pipeline event handler.

        Moves everything pending in "new" into "cur", then takes the first
        readable unprocessed file from "cur", parks it in "tmp" and returns
        a one-element tuple with a new event carrying its content. Returns
        None when there is nothing to emit.
        """
        if not self.new_dir_list:
            self.new_dir_list = os.listdir(self.new)
        if not self.cur_dir_list:
            self.cur_dir_list = os.listdir(self.cur)
        if not self.new_dir_list and not self.cur_dir_list:
            # Nothing to do; without inotify we have to poll.
            if not self.esc.inotify:
                self.esc.after(self.wait, self, None)
            return

        while self.new_dir_list:
            short_name = self.new_dir_list.pop()
            long_name = pth.join(self.new, short_name)
            new_short_name = self._add_maildir_info(short_name, "")
            new_name = pth.join(self.cur, new_short_name)
            try:
                os.rename(long_name, new_name)
            except OSError as e:    # os.rename raises OSError; IOError is a different class on Python 2
                if e.errno != errno.ENOENT:
                    logging.exception("Error reading %s", long_name)
                # Otherwise we've been faster than inotify or someone else have been faster than us, no big deal
            else:
                # Queue the file for reading only if it really got into "cur".
                self.cur_dir_list.append(new_short_name)
                logging.debug("%s renamed to %s", long_name, new_name)

        data = None
        while data is None and self.cur_dir_list:
            short_name = self.cur_dir_list.pop()
            maildir_info = self._split_maildir_info(short_name)[1]
            if "S" in maildir_info or "F" in maildir_info:
                # Already processed (seen or flagged as failed); skip it and
                # keep looking instead of abandoning the rest of the list.
                continue
            long_name = pth.join(self.cur, short_name)
            new_name = pth.join(self.tmp, short_name)
            try:
                os.rename(long_name, new_name)
                with open(new_name, "rb") as f:
                    data = f.read()
            except (IOError, OSError) as e:   # open()/read() raise IOError on Python 2
                if e.errno != errno.ENOENT:
                    logging.exception("Error reading %s", new_name)
                # Otherwise we've been faster than inotify or someone else have been faster than us, no big deal

        if data is not None:
            # More files may be waiting, so ask to be called again.
            self.esc.enqueue(self, None)
            event = self.factory()
            self.id_set(event)
            self.on_fly.setdefault(self.id_get(event), set()).add(short_name)
            event = self.data_set(event, data)
            return (event,)
        elif not self.esc.inotify:
            self.esc.after(self.wait, self, None)
        return

    def event_rename(self, old_id, new_id):
        """ `rename` event handler.

        Watch for event data changing id, merging, etc. to be able to remove finished file.
        """
        old_data = self.on_fly.pop(old_id, ())
        self.on_fly.setdefault(new_id, set()).update(old_data)
        logging.debug("%s renamed, now tracked as %s", old_id, new_id)

    def event_done(self, event_id, _flag="S"):
        """ `done` event handler

        Mark mail with flag and return to "cur".

        :param event_id: ID of the finished event.
        :param _flag: Maildir flag to add to the file name ("S" by default).
        """
        names = self.on_fly.pop(event_id, ())
        for name in names:
            base_name, maildir_info = self._split_maildir_info(name)
            long_name = pth.join(self.tmp, name)
            logging.debug("cur %s, base %s, info %s", self.cur, base_name, maildir_info)
            new_name = pth.join(self.cur, self._add_maildir_info(name, _flag))
            try:
                os.rename(long_name, new_name)
            except Exception:
                # Best effort: a failed rename is logged and the remaining files are still handled.
                logging.exception("%s: attempt to rename file %s failed", event_id, long_name)
            else:
                logging.debug("%s: rename %s back to %s", event_id, long_name, new_name)

    def event_error(self, event_id):
        """ `error` event handler

        Mark mail with "flagged" flag and return to "cur".
        """
        self.event_done(event_id, _flag="F")

    def finish(self):
        """ `finish` event handler.

        Cancel scheduled calls and inotify watches and replace `self.esc`
        with `dummy_escapement`.
        """
        self.esc.unschedule(self)
        if self.esc.inotify:
            self.esc.inotify_rm(self.new, self.handle_inotify_new)
            self.esc.inotify_rm(self.cur, self.handle_inotify_cur)
        self.esc = dummy_escapement

    def done(self):
        """ `done` event handler.

        Move files of still unfinished events from "tmp" back into "cur"
        under their original names (no flag added).
        """
        for eid, names in self.on_fly.items():
            for name in names:
                try:
                    os.rename(pth.join(self.tmp, name), pth.join(self.cur, name))
                    logging.debug("%s: Moved unfinished file %s into %s", eid, name, self.cur)
                except Exception:
                    logging.debug("%s: Unable to move unfinished file %s into %s", eid, name, self.cur)
class EmailParse(Cog):
    """ Email data parsing cog """

    def __init__(self, msg_get=None, header_set=None, text_set=None, flattext_set=None, attach_set=None,
                 default_charsets=("utf-8", "ascii", "iso-8859-1")):
        """ Initialize EmailParser.

        :param msg_get: Getter for email data string (defaults to key "input").
        :param header_set: Setter for dict of headers. Keys are header names, values are lists of
                           multiple separate entries as appeared in message.
        :param text_set: Setter for list of all text/* parts.
        :param flattext_set: Setter for all text/* parts joined together by newline.
        :param attach_set: Setter for all nontext attachments in the form of dicts, containing "Name",
                           "Content-Type" and "Data".
        :param default_charsets: Charsets to try, in order, when the one specified in the message
                                 is missing or does not work.
        """
        self.msg_get = msg_get or itemgetter("input")
        self.header_set = header_set or nullsetter
        self.flattext_set = flattext_set
        self.text_set = text_set
        self.attach_set = attach_set
        # Private copy, so neither the caller's sequence nor the default is shared.
        self.default_charsets = list(default_charsets)

    def _decode(self, s, charset=None):
        """ Decode byte string `s`, trying `charset` first and the default charsets after it. """
        for ch in [charset] + list(self.default_charsets):
            if not ch:
                continue
            try:
                return s.decode(ch)
            except (UnicodeDecodeError, LookupError):
                pass
        # No candidate charset worked (or none was configured); do not lose the data.
        return s.decode("utf-8", "replace")

    def _decode_header(self, header):
        """ Return header value as text with encoded words decoded. """
        try:
            pieces = eheader.decode_header(header)
        except HeaderParseError:
            # Undecodable encoded word; fall back to the raw value.
            if hasattr(header, "decode"):
                return self._decode(header)
            return "%s" % (header,)
        else:
            res = []
            for piece, charset in pieces:
                if isinstance(piece, bytes_type) or charset:
                    piece = self._decode(piece, charset)
                res.append(piece)
            return "".join(res)

    def __call__(self, data):
        """ Parse the message obtained from `data` and store the requested parts.

        Headers are always passed to `header_set`; text parts and attachments
        are collected only if the respective setter was given. The "input"
        key is removed from `data` afterwards, if present.

        :returns: One-element tuple with the updated data.
        """
        mime = self.msg_get(data)
        msg = parse_message(mime)
        headers = dict()
        for header, value in msg.items():
            value = self._decode_header(value)
            headers.setdefault(header, list()).append(value)
        data = self.header_set(data, headers)
        text = []
        attach = []
        if self.flattext_set or self.text_set or self.attach_set:
            for part in msg.walk():
                maintype = part.get_content_maintype()
                if maintype == "multipart":
                    continue    # Just another container
                if maintype == "text" and (self.text_set or self.flattext_set):
                    # Charset declared in Content-Type header, None if not specified.
                    body = self._decode(part.get_payload(decode=True), part.get_content_charset())
                    text.append(body)
                elif self.attach_set:
                    attach.append({
                        "Name": part.get_filename(),
                        "Content-Type": part.get_content_type(),
                        "Data": part.get_payload(decode=True)
                    })
        if text:
            if self.text_set:
                data = self.text_set(data, text)
            if self.flattext_set:
                flattext = "\n".join(text)
                data = self.flattext_set(data, flattext)
        if attach:
            data = self.attach_set(data, attach)
        try:
            del data["input"]
        except KeyError:
            # A custom msg_get may read the message from elsewhere.
            pass
        return (data,)