Source code for gmail_client.message

import datetime
import email
import re
import time
import os
from imaplib import ParseFlags

from gmail_client.codecs import decode_email_header


def parse_flags(headers):
    """
    Parses flags from headers using Python's `ParseFlags`.

    The leading backslash of system flags is dropped (for example the
    protocol-level "Seen" flag is returned simply as ``Seen``), to hide
    the details of the protocol.

    :param headers: the raw FETCH response line, as text or bytes.
    :return: a set of flag names as text; empty when the response
        carries no FLAGS section.

    """
    # On Python 3 imaplib's pattern is a bytes regex, so text input has to
    # be encoded first. On Python 2 ``str`` is already ``bytes``.
    if not isinstance(headers, bytes):
        headers = headers.encode('utf-8')

    flags = set()
    for flag in ParseFlags(headers):
        # ParseFlags hands back bytes on Python 3; normalise to text so
        # callers can compare against plain string literals.
        if not isinstance(flag, str):
            flag = flag.decode('utf-8')
        flags.add(flag[1:] if flag.startswith('\\') else flag)

    return flags

def parse_headers(message):
    """
    Copy the headers of an email message into a plain dictionary.

    Python's Message class behaves like a mapping but does not implement
    the full dictionary interface (for example copy or iteritems), so the
    header pairs are materialised into a real dict here. When a header
    name occurs more than once, the last occurrence wins.

    """
    return {name: value for name, value in message.items()}

def parse_labels(headers):
    """
    Extract the Gmail labels from the X-GM-LABELS section of a FETCH
    response.

    Labels are either bare atoms or double-quoted strings. Quoted labels
    may contain spaces, and use a backslash to escape a double quote or
    another backslash. Quotes are removed and escapes resolved in the
    returned values.

    :param headers: the raw FETCH response line, as text or bytes.
    :return: a list of label names, in the order they appear; empty when
        there is no X-GM-LABELS section or it lists no labels.

    """
    # Python 3 imaplib returns bytes; the patterns below are text patterns.
    if not isinstance(headers, str):
        headers = headers.decode('utf-8')

    # The section body is a run of quoted strings and other characters; a
    # closing parenthesis only terminates it when it is outside quotes.
    section = re.search(
        r'X-GM-LABELS \(((?:"(?:\\.|[^"\\])*"|[^)"])*)\)', headers)
    if section is None:
        return list()

    labels = []
    for token in re.finditer(r'"((?:\\.|[^"\\])*)"|([^\s"]+)',
                             section.group(1)):
        quoted, atom = token.groups()
        if quoted is not None:
            # Resolve backslash escapes inside the quoted string.
            labels.append(re.sub(r'\\(.)', r'\1', quoted))
        else:
            labels.append(atom)
    return labels

def parse_subject(encoded_header):
    """
    Return the subject header encoded as UTF-8.

    A missing header (``None``) yields an empty string instead.

    """
    if encoded_header is None:
        return ''

    return encoded_header.encode('UTF-8')


class Attachment(object):
    """
    Attachments are files sent in the email.

    Attributes:
        name: file name of the attachment as given by the email.
        content_type: MIME content type of the attachment.
        content: the decoded payload, or None when there is none.
        size: length of ``content``; 0 when ``content`` is None.
    """

    def __init__(self, name, content_type, content):
        self.name = name
        self.content_type = content_type
        self.content = content
        self.size = 0 if content is None else len(self.content)

    def _safe_name(self):
        """Return the attachment name stripped of any directory part."""
        # The name comes from the email and is untrusted: drop directory
        # components (with either separator) so that a name such as
        # "../../x" cannot place the file outside the chosen directory.
        flattened = os.path.basename(self.name).replace('\\', '/')
        return os.path.basename(flattened)

    def save(self, path=None):
        """
        Write the attachment content to disk and return the path used.

        :param path: destination file, or an existing directory in which
            the file is created under the attachment's own name. When
            omitted, the attachment's name is used as a relative path.
            An explicit file path is used exactly as given.
        :return: the path the content was written to.
        """
        if path is None:
            # Save as name of attachment if there is no path specified
            path = self._safe_name()
        elif os.path.isdir(path):
            # If the path is a directory, save as name of attachment in
            # that directory
            path = os.path.join(path, self._safe_name())

        # An attachment without content has size 0, so it is written as an
        # empty file rather than failing after the file has been created.
        data = b'' if self.content is None else self.content
        with open(path, 'wb') as f:
            f.write(data)

        return path
class ParsedEmail(object):
    """
    Sorts the parts of an email message into plain-text bodies, HTML
    bodies and attachments.

    The message is parsed on construction; results are exposed through
    the ``txt``, ``html`` and ``attachments`` properties.
    """

    def __init__(self, message):
        self._body = []
        self._html = []
        self._attachments = []
        self.parse(message)

    @staticmethod
    def is_attachment(p):
        """
        Tell whether a message part is an attachment.

        A part counts as an attachment when it carries a file name, or
        when its Content-Disposition header mentions "attachment".
        """
        if p.get_filename() is not None:
            return True

        content_disposition = p.get("Content-Disposition", None)
        if content_disposition is None:
            return False
        # Disposition types are case-insensitive, so compare in lower case.
        return 'attachment' in str(content_disposition).lower()

    @staticmethod
    def is_multi_part(p):
        """Tell whether the part's main content type is multipart."""
        return 'multipart' == p.get_content_maintype()

    @staticmethod
    def is_html(p):
        """Tell whether the part's content type is text/html."""
        return p.get_content_type() == 'text/html'

    @staticmethod
    def is_text(p):
        """Tell whether the part's content type is text/plain."""
        return p.get_content_type() == 'text/plain'

    def parse(self, mail):
        """Classify every part of ``mail`` (or ``mail`` itself if it is
        not multipart)."""
        if self.is_multi_part(mail):
            for part in mail.walk():
                self.parse_message_part(part)
        else:
            self.parse_message_part(mail)

    def parse_message_part(self, p):
        """Route a single part to attachment or body handling."""
        if self.is_attachment(p):
            return self.parse_attachment(p)
        else:
            return self.parse_message(p)

    def parse_attachment(self, message_part):
        """Record ``message_part`` as an Attachment and return self."""
        a = Attachment(decode_email_header(message_part.get_filename()),
                       message_part.get_content_type(),
                       message_part.get_payload(decode=True))
        self._attachments.append(a)
        return self

    def parse_message(self, p):
        """Record ``p`` as an HTML or plain-text body; other content
        types are ignored."""
        if self.is_html(p):
            self._html.append(p)
        elif self.is_text(p):
            self._body.append(p)

    def _get_content(self, l):
        """Return the decoded payload of the last part in ``l``, or ''
        when ``l`` is empty."""
        if l:
            # pass decode=True so that quoted printable characters
            # get interpreted into their unicode counterparts
            return l[-1].get_payload(decode=True)
        else:
            return ''

    @property
    def html(self):
        """Decoded payload of the last text/html part, or ''."""
        return self._get_content(self._html)

    @property
    def txt(self):
        """Decoded payload of the last text/plain part, or ''."""
        return self._get_content(self._body)

    @property
    def attachments(self):
        """List of attachments found in the message."""
        return self._attachments
class Message(object):
    """
    A single message in a Gmail mailbox, addressed by its IMAP UID.

    A new instance only knows its mailbox and UID; the remaining
    attributes are filled in once the message has been fetched and
    parsed. Flag and label changes are sent to the server through
    ``self.gmail.imap`` and mirrored in the local sets.
    """

    # Names under which the trash mailbox may appear.
    _TRASH_MAILBOXES = ('[Gmail]/Bin', '[Gmail]/Trash')
    _SENT_MAILBOX = '[Gmail]/Sent Mail'

    def __init__(self, mailbox, uid):
        self.uid = uid
        self.mailbox = mailbox
        self.gmail = mailbox.gmail if mailbox else None

        self.message = None
        self.headers = {}

        self.subject = None
        self.body = None
        self.html = None

        self.to = None
        self.fr = None
        self.cc = None
        self.delivered_to = None

        self.sent_at = None

        self._flags = set([])
        self._labels = set([])

        self.thread_id = None
        self.thread = []
        self.message_id = None

        self.attachments = []

    def add_flag(self, flag):
        """Set ``flag`` on the server unless already set; return self."""
        if flag not in self.flags:
            self.gmail.imap.uid('STORE', self.uid, '+FLAGS',
                                '\\{0}'.format(flag))
            self._flags.add(flag)
        return self

    def remove_flag(self, flag):
        """Clear ``flag`` on the server if it is set; return self."""
        if flag in self.flags:
            self.gmail.imap.uid('STORE', self.uid, '-FLAGS',
                                '\\{0}'.format(flag))
            self._flags.remove(flag)
        return self

    @property
    def flags(self):
        return self._flags

    @flags.setter
    def flags(self, fs):
        self._flags = set(fs)

    @property
    def labels(self):
        return self._labels

    @labels.setter
    def labels(self, fs):
        self._labels = set(fs)

    @property
    def is_read(self):
        return 'Seen' in self.flags

    @property
    def is_starred(self):
        return 'Flagged' in self.flags

    @property
    def is_draft(self):
        return 'Draft' in self.flags

    @property
    def is_deleted(self):
        return 'Deleted' in self.flags

    def mark_read(self):
        return self.add_flag('Seen')

    def mark_unread(self):
        return self.remove_flag('Seen')

    def star(self):
        return self.add_flag('Flagged')

    def un_star(self):
        return self.remove_flag('Flagged')

    def move_to(self, name):
        """Copy the message to mailbox ``name``, then delete it from the
        current mailbox unless ``name`` is the trash itself."""
        self.gmail.copy(self.uid, name, self.mailbox.name)
        if name not in self._TRASH_MAILBOXES:
            self.delete()

    def delete(self):
        """Move the message to the trash (if not already there) and flag
        it as deleted; return self."""
        if self.mailbox.name not in self._TRASH_MAILBOXES:
            if '[Gmail]/Trash' in self.gmail.labels():
                trash = '[Gmail]/Trash'
            else:
                trash = '[Gmail]/Bin'
            self.move_to(trash)
        return self.add_flag('Deleted')

    def has_label(self, label):
        return label in self.labels

    def add_label(self, label):
        """Attach ``label`` on the server unless already present; return
        self."""
        if label not in self.labels:
            self.gmail.imap.uid('STORE', self.uid, '+X-GM-LABELS', label)
            self.labels.add(label)
        return self

    def remove_label(self, label):
        """Detach ``label`` on the server if present; return self."""
        if label in self.labels:
            self.gmail.imap.uid('STORE', self.uid, '-X-GM-LABELS', label)
            self.labels.remove(label)
        return self

    def archive(self):
        self.move_to('[Gmail]/All Mail')
        return self

    @staticmethod
    def _parse_sent_at(date_header):
        """Convert a Date header into a naive local datetime, or None
        when the header is missing or cannot be parsed."""
        from email.utils import mktime_tz, parsedate_tz

        parsed = parsedate_tz(date_header) if date_header else None
        if parsed is None:
            return None
        # mktime_tz honours the UTC offset carried by the header, so
        # messages sent from different time zones order correctly.
        return datetime.datetime.fromtimestamp(mktime_tz(parsed))

    def _parse(self, raw_message):
        """
        Populate this message from one FETCH result.

        :param raw_message: a pair whose first item is the response line
            (flags, labels, thread and message ids) and whose second item
            is the raw email text.
        """
        raw_headers = raw_message[0]
        raw_email = raw_message[1]

        self.message = email.message_from_string(raw_email)
        self.headers = parse_headers(self.message)

        self.subject = parse_subject(self.message['subject'])
        self.sent_at = self._parse_sent_at(self.message['date'])

        self.to = self.message['to']
        self.fr = self.message['from']
        self.cc = self.message['cc']
        # The header is spelled with a hyphen; an underscore never matches.
        self.delivered_to = self.message['delivered-to']

        parsed_email = ParsedEmail(self.message)
        self.body = parsed_email.txt
        self.html = parsed_email.html
        self.attachments = parsed_email.attachments

        self.flags = parse_flags(raw_headers)
        self.labels = parse_labels(raw_headers)

        thread_match = re.search(r'X-GM-THRID (\d+)', raw_headers)
        if thread_match:
            self.thread_id = thread_match.group(1)

        message_match = re.search(r'X-GM-MSGID (\d+)', raw_headers)
        if message_match:
            self.message_id = message_match.group(1)

    def fetch(self):
        """Return the parsed email, fetching it only if not yet loaded."""
        return self.message if self.message else self.forced_fetch()

    @property
    def has_attachments(self):
        return len(self.attachments) > 0

    def forced_fetch(self):
        """Fetch the message from the server, parse it and return the
        parsed email."""
        _, results = self.gmail.imap.uid(
            'FETCH', self.uid,
            '(BODY.PEEK[] FLAGS X-GM-THRID X-GM-MSGID X-GM-LABELS)')
        self._parse(results[0])
        return self.message

    def _fetch_thread_messages(self, mailbox):
        """Search the currently selected mailbox for this thread, fetch
        the matches, cache them on ``mailbox`` and return them by UID."""
        response, results = self.gmail.imap.uid(
            'SEARCH', None, '(X-GM-THRID ' + self.thread_id + ')')

        # An empty search result is an empty string; splitting on
        # whitespace (not on a single space) keeps it from becoming a
        # bogus empty UID.
        raw_uids = results[0] if results else None
        uids = raw_uids.split() if raw_uids else []

        messages = {}
        if response == 'OK' and uids:
            for uid in uids:
                messages[uid] = Message(mailbox, uid)
            self.gmail.fetch_multiple_messages(messages)
            mailbox.messages.update(messages)
        return messages

    # returns a list of fetched messages (both sent and received) in
    # chronological order
    def fetch_thread(self):
        """Return every message of this message's thread found in the
        current mailbox and in Sent Mail, ordered by ``sent_at``."""
        self.fetch()
        original_mailbox = self.mailbox
        self.gmail.use_mailbox(original_mailbox.name)

        try:
            # fetch and cache messages from inbox or other received mailbox
            received_messages = self._fetch_thread_messages(original_mailbox)

            # fetch and cache messages from 'sent'
            self.gmail.use_mailbox(self._SENT_MAILBOX)
            sent_messages = self._fetch_thread_messages(
                self.gmail.mailboxes[self._SENT_MAILBOX])
        finally:
            # Always leave the connection on the mailbox we started from.
            self.gmail.use_mailbox(original_mailbox.name)

        # combine and sort sent and received messages. UIDs are only
        # unique within one mailbox, so the two groups are concatenated
        # rather than merged by UID.
        combined = (list(received_messages.values()) +
                    list(sent_messages.values()))
        return sorted(combined,
                      key=lambda m: m.sent_at or datetime.datetime.min)