Source code for apps.core.smtpd

import asyncore
import base64
import smtpd
import email
import logging
import threading
from email.header import decode_header
from email.parser import Parser
from time import sleep

from django.conf import settings
from django.core.cache import cache
from django.db import transaction

from apps.mails import models

logger = logging.getLogger(__name__)


[docs]class Lurker(smtpd.SMTPServer): INSTANCE = None THREAD = None CLEANER = None enable_SMTPUTF8 = True
[docs] def process_message(self, peer, mailfrom, rcpttos, data, **kwargs): """ Process and save message :param peer: :param mailfrom: :param rcpttos: :param data: :param kwargs: :return: """ if not data: raise Exception('No content given!') parser = Parser() if isinstance(data, bytes): data = data.decode() msg = parser.parsestr(data) sender = self.parse_address(msg.get('From')) # Start transaction of inserting message and parts. with transaction.atomic(): # Create message message = models.Message.objects.create( peer=peer[0], port=peer[1], headers=dict(msg.items()), sender_name=sender[0], sender_address=sender[1], recipients_to=self.parse_addresses(msg.get_all('To', [])), recipients_cc=self.parse_addresses(msg.get_all('Cc', [])), recipients_bcc=self.parse_addresses(msg.get_all('Bcc', [])), subject=Lurker.clean_international_string(msg.get('Subject')), source=data, size=len(data), type=msg.get_content_type(), ) # Create parts for part in msg.walk(): body = part.get_payload(decode=True) length = len(body) if body else 0 models.MessagePart.objects.create( message=message, is_attachment=part.get_filename() is not None, filename=part.get_filename(), type=part.get_content_type(), charset=part.get_content_charset(), body=body, size=length ) try: self.clear_cache() except Exception as e: logger.debug('Cache clearance failed.') logger.debug(e) logger.info( 'Saved email from \'{}:{}\' with {} parts.'.format(message.peer, message.port, message.parts.count()) ) return
@staticmethod
[docs] def clear_cache(): if settings.DEBUG: return cache.delete_many([ 'mails.context.counts', ])
@staticmethod
[docs] def clean(): num_deleted = models.Message.cleanup() if num_deleted > 0: logger.info('Cleanup deleted \'{}\' old messages.'.format(num_deleted)) # Schedule next cleanup task. Lurker.CLEANER = threading.Timer(3600, Lurker.clean) Lurker.CLEANER.start()
@staticmethod
[docs] def clean_international_string(entry): # Remove the prefix and suffix if exists. prefix = '=?UTF-8?B?' suffix = '?=' if prefix in entry and suffix in entry: middle = entry[len(prefix):len(entry)-len(suffix)] decoded = base64.b64decode(middle) return decoded.decode('utf-8') return entry
@staticmethod
[docs] def parse_addresses(raw): """ Parse Addresses :param raw: :return: """ if type(raw) is not list: raw = [raw] addresses = list() for entry in raw: addresses.append(Lurker.clean_international_string(entry)) return email.utils.getaddresses(addresses)
@staticmethod
[docs] def parse_address(raw): """ Parse Address :param raw: :return: """ return list(email.utils.parseaddr(Lurker.clean_international_string(raw)))
@staticmethod
[docs] def start(test=False, threaded=False, cleaner=False): """ Start SMTP catcher. :param test: :param threaded: :param cleaner: :return: """ if Lurker.INSTANCE: return Lurker.INSTANCE = Lurker((settings.SMTPD_ADDRESS, settings.SMTPD_PORT), None) if cleaner: Lurker.CLEANER = threading.Timer(3600, Lurker.clean) Lurker.CLEANER.start() if threaded: Lurker.THREAD = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1}) Lurker.THREAD.start() # Sleep to make sure our Lurker instance is ready. sleep(2) logger.info('Started listening on {}:{}'.format(settings.SMTPD_ADDRESS, settings.SMTPD_PORT)) if cleaner: logger.info('Scheduled cleaner task in background.')
@staticmethod
[docs] def stop(): """ Stop threaded server. :return: """ Lurker.INSTANCE.close() if Lurker.CLEANER: Lurker.CLEANER.cancel() if Lurker.THREAD: Lurker.THREAD.join()