Django Mail-Bot with ChatGPT

This code is an email retrieval and processing script, written in Python. The script uses the imaplib library to connect to an email account and retrieve unread emails. It then decodes the emails, extracts the relevant information (sender, subject, content, attachments, etc.), saves the information to a database using a Django model, and processes the emails. The script is intended to be used as a backend for an email processing system, allowing users to automate certain email-related tasks.

The script makes use of the OpenAI API to process the emails and generate answers.

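Before running the script, install the following packages. Besides retrying and ratelimiter, the script imports openai, and the crontab command used below comes from django-crontab. The openai.Completion interface used later exists only in openai versions before 1.0:

pip install retrying
pip install ratelimiter
pip install "openai<1.0"
pip install django-crontab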

Django models:

Email:

from django.db import models

from django.utils.timezone import now
class Email(models.Model):
    """
    Model representing an email.
    
    Fields:
        subject (CharField): The subject of the email, with a maximum length of 255 characters.
        content (TextField): The content of the email.
        received_time (DateTimeField): The time at which the email was received, with a default value of the current time.
        account (TextField): The account from which the email was received, with a default value of an empty string.
        email_hash (CharField): A unique identifier for the email, with a maximum length of 64 characters and a default value of an empty string.
        answered (BooleanField): A boolean indicating whether the email has been answered, with a default value of False.
        sender (TextField): The sender of the email, with a default value of an empty string.
        
    """
    subject = models.CharField(max_length=255)
    content = models.TextField()
    received_time = models.DateTimeField(default=now, blank=True)
    account = models.TextField(default="")
    email_hash = models.CharField(max_length=64, unique=True, default="")
    answered = models.BooleanField(default=False)
    sender = models.TextField(default="")
    
    def __str__(self):
        """
        Return a string representation of the email instance, which is the email's subject.
        
        Returns:
            str: The subject of the email.
            
        """
        return self.subject

Mail attachment:

class EmailAttachment(models.Model):
    """
    Model representing an attachment to an email.
    
    Fields:
        email (ForeignKey): A foreign key to the Email model, indicating which email the attachment is associated with. The attachment will be deleted if the associated email is deleted.
        attachment (BinaryField): The binary data of the attachment.
        
    """
    email = models.ForeignKey(Email, on_delete=models.CASCADE)
    attachment = models.BinaryField()

    def get_attachment_content(self):
        """
        Return the content of the attachment as a string.
        
        Returns:
            str: The content of the attachment.
            
        """
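        # BinaryField may return bytes or a memoryview, depending on the database backend
        content = bytes(self.attachment).decode("utf-8", errors="replace")
        return content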

Run:

python manage.py makemigrations
python manage.py migrate

In settings.py, add 'django_crontab' to INSTALLED_APPS and add two cronjobs to the CRONJOBS list:

('* * * * *','[pathtoscript].mail.check_gpt_account',' >> /[pathtoproject]/mail.log 2>&1'),
('* * * * *','[pathtoscript]scripts.functions.mail.check_mails_to_process',' >> /[pathtoproject]/mail.log 2>&1'),

With this schedule, the mail account is checked every minute for new mails.
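
For orientation, here is a minimal sketch of how these entries might sit in settings.py, assuming the django-crontab package is what provides the crontab command. The module path myproject.scripts.mail and the log path /tmp/mail.log are hypothetical placeholders, not the paths of this project.

```python
# settings.py (excerpt): a sketch, assuming django-crontab is installed
INSTALLED_APPS = [
    # ... your other apps ...
    "django_crontab",
]

# Hypothetical log path; replace it with your own
MAIL_LOG_SUFFIX = " >> /tmp/mail.log 2>&1"

CRONJOBS = [
    # (schedule, dotted path to the function, shell suffix appended to the command)
    ("* * * * *", "myproject.scripts.mail.check_gpt_account", MAIL_LOG_SUFFIX),
    ("* * * * *", "myproject.scripts.mail.check_mails_to_process", MAIL_LOG_SUFFIX),
]
```

The schedule string uses the usual five cron fields, so "* * * * *" means every minute.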

Don’t forget to add the cronjobs:

python manage.py crontab add

Now the basic preparations are done. Let's start with the script.

First comes the settings block:

# The functions below log their progress with the standard logging module
import logging

# Import the Email and EmailAttachment models from the "[yourmodel].models" module
from [yourmodel].models import Email, EmailAttachment

# A list of allowed email addresses that the code should process
ALLOWED_SENDER  = ["test@gmail.com", "admin@aol.com"]

# Variables for the IMAP and SMTP email accounts
MAIL_USERNAME   = '[USERNAME]'
MAIL_IN_HOST    = '[IMAP HOST]'
MAIL_PASSWORD   = '[PASSWORD]'
MAIL_OUT_HOST   = '[SMTP HOST]'

# Variables for the OpenAI API Key and engine
OPENAI_API_KEY  = '[API KEY]'
OPENAI_ENGINE   = "text-davinci-003"

# The subject line for the response email sent by the code
ANSWER_SUBJECT  = 'chatgpt answered you'

Then we need to check for new mails. This function will be called by the cronjob:

# Function to check the GPT email account for new emails
def check_gpt_account():
    logging.warning('### looking for new mails ###')
    # Call the "retrieve_new_emails" function to retrieve new emails and store them in the database
    retrieve_new_emails(MAIL_IN_HOST, MAIL_USERNAME, MAIL_PASSWORD, MAIL_USERNAME)
    return True

Now we will retrieve new emails from the account and save them into our database:

import email
import email.utils
import imaplib
def retrieve_new_emails(host, username, password, account):
    """
    Retrieve new unseen emails from the inbox using IMAP protocol.

    Parameters:
    host (str): IMAP host address
    username (str): IMAP username
    password (str): IMAP password
    account (str): Name of the email account the mails are retrieved from

    Returns:
    None
    """
    
    # Connect to IMAP server
    mail = imaplib.IMAP4_SSL(host)
    mail.login(username, password)
    mail.select("inbox")
    
    # Search for unseen emails
    result, data = mail.search(None, "UNSEEN")
    email_ids = data[0].split()
    
    # Loop through each email
    for email_id in email_ids:
        
        # Fetch the raw email data (bytes)
        result, data = mail.fetch(email_id, "(RFC822)")
        raw_email = data[0][1]
        
        # Parse the raw bytes into a message object; parsing bytes avoids
        # failures on emails that are not valid UTF-8
        email_message = email.message_from_bytes(raw_email)
        
        # Get the email subject
        subject = get_email_subject(email_message)
        
        # Get the email content
        content = get_email_content(email_message)
        
        # Get the email attachments
        attachments = get_email_attachments(email_message)
        
        # Get the received time of the email
        received_time = email.utils.parsedate_to_datetime(email_message["Date"])
        
        # Get the sender of the email
        sender = email_message["From"]
        
        # Get the email hash
        email_hash = get_email_hash(content, email_id)
        
        # Save the email information to the database
        if not Email.objects.filter(email_hash=email_hash).exists():
            logging.warning('### new mail found ###')
            save_email_to_db(subject, content, received_time, attachments, email_hash, account, sender)

    # Close the mailbox and end the IMAP session
    mail.close()
    mail.logout()
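
To see what the parsing step produces without connecting to a mail server, the following sketch feeds a hand-written raw message through the standard library's email parser. The message is an invented example, not real data, and it is parsed from bytes, which is the form in which imaplib delivers it.

```python
import email
import email.utils

# Invented raw message, in the bytes form that an IMAP fetch returns
raw_email = (
    b"From: Test User <test@gmail.com>\r\n"
    b"Subject: Hello\r\n"
    b"Date: Mon, 06 Feb 2023 10:15:00 +0000\r\n"
    b"\r\n"
    b"What is the capital of France?\r\n"
)

# Parse the bytes into a message object
message = email.message_from_bytes(raw_email)

# Headers are available by name; the Date header becomes a datetime
sender = message["From"]
subject = message["Subject"]
received = email.utils.parsedate_to_datetime(message["Date"])
body = message.get_payload().strip()
```

Because the Date header carries an explicit offset, the resulting datetime is timezone-aware, which is what Django expects when time zone support is enabled.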

This relies on several smaller helper functions, which follow here:

Get attachments:

def get_email_attachments(email_message):
    """
    Given an email message, this function returns a list of attachments present in the email.
    
    Args:
    email_message: email.message.Message object
        The email message to extract attachments from
    
    Returns:
    attachments: list of binary data
        List of binary data representing the attachments in the email
    """
    attachments = []
    for part in email_message.walk():
        if part.get("Content-Disposition") is None:
            continue
        filename = part.get_filename()
        if bool(filename):
            attachments.append(part.get_payload(decode=True))
    return attachments
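
As a quick check of this helper, the sketch below builds a message with one attachment in memory, serializes it, and parses it again. The file name data.csv and its contents are invented for illustration; the function body is repeated so the example runs on its own.

```python
import email
from email.message import EmailMessage

def get_email_attachments(email_message):
    # Same logic as above: keep every part that has a disposition and a filename
    attachments = []
    for part in email_message.walk():
        if part.get("Content-Disposition") is None:
            continue
        if part.get_filename():
            attachments.append(part.get_payload(decode=True))
    return attachments

# Build an invented test message with a body and one attachment
outgoing = EmailMessage()
outgoing["Subject"] = "Report"
outgoing.set_content("Please analyse the attached data.")
outgoing.add_attachment(
    b"a,b\n1,2\n",
    maintype="application",
    subtype="octet-stream",
    filename="data.csv",
)

# Round-trip through bytes, as if the message had come from the mail server
parsed = email.message_from_bytes(outgoing.as_bytes())
attachments = get_email_attachments(parsed)
```

The body part carries no Content-Disposition header and is skipped, so only the attachment's decoded bytes end up in the list.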

Get the mail content:

def get_email_content(email_message):
    """
    Extract the content of an email.

    Parameters:
    email_message (obj): email message object

    Returns:
    content (str): email content
    """
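    # Default to an empty string so the function always returns a value
    content = ""
    if email_message.is_multipart():
        # Use the first text/plain part; walk() also reaches nested parts
        for part in email_message.walk():
            if part.get_content_type() == "text/plain":
                charset = part.get_content_charset() or "utf-8"
                content = part.get_payload(decode=True).decode(charset, errors="replace")
                break
    else:
        charset = email_message.get_content_charset() or "utf-8"
        content = email_message.get_payload(decode=True).decode(charset, errors="replace")
    return content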

Get the mail subject:

from email.header import decode_header
def get_email_subject(email_message):
    """
    Extract the subject of an email.

    Parameters:
    email_message (obj): email message object

    Returns:
    subject (str): email subject
    """
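    subject = ""
    if email_message["Subject"]:
        # decode_header returns a list of (fragment, charset) pairs
        fragments = []
        for fragment, charset in decode_header(email_message["Subject"]):
            if isinstance(fragment, bytes):
                fragment = fragment.decode(charset or "utf-8", errors="replace")
            fragments.append(fragment)
        subject = "".join(fragments)
    return subject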
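
Subjects with non-ASCII characters arrive in an encoded form, which is why decode_header is needed. The following sketch, using an invented subject line, shows the encoded header and how the (fragment, charset) pairs returned by decode_header can be turned back into text.

```python
from email.header import Header, decode_header

# Invented subject containing non-ASCII characters
original = "Grüße aus Köln"

# Encode it the way a mail client would put it on the wire
encoded = Header(original, "utf-8").encode()

# decode_header yields (fragment, charset) pairs; fragments may be bytes or str
fragments = decode_header(encoded)
decoded = "".join(
    fragment.decode(charset or "utf-8") if isinstance(fragment, bytes) else fragment
    for fragment, charset in fragments
)
```

Decoding each fragment with its own charset, rather than a fixed default, keeps subjects readable when the sender used an encoding other than UTF-8.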

Calculate a mail hash (needed to identify mails):

import hashlib
def get_email_hash(content, email_id):
    """
    Generates a unique hash for an email based on its content and email id.
    
    Args:
    content: str
        The content of the email
    email_id: bytes or str
        The IMAP message number of the email, as returned by the search
    
    Returns:
    email_hash: str
        The generated hash for the email
    """
    sha256 = hashlib.sha256()
    sha256.update((str(content) + str(email_id)).encode('utf-8'))
    email_hash = sha256.hexdigest()
    return email_hash
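
A short usage sketch, with invented inputs, shows the properties the rest of the script relies on: the hash is deterministic, it changes with the message id, and a SHA-256 hex digest is 64 characters long, which matches the max_length of the email_hash field. The function body is repeated so the example runs on its own.

```python
import hashlib

def get_email_hash(content, email_id):
    # Same logic as above: hash the content concatenated with the id
    sha256 = hashlib.sha256()
    sha256.update((str(content) + str(email_id)).encode("utf-8"))
    return sha256.hexdigest()

# imaplib returns message ids as bytes, so str(b"1") contributes "b'1'" to the hash
first = get_email_hash("Hello", b"1")
second = get_email_hash("Hello", b"1")
other = get_email_hash("Hello", b"2")
```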

Finally, save the mail to the database:

def save_email_to_db(subject, content, received_time, attachments, email_hash, account, sender):
    """
    Saves an email and its attachments to the database.
    
    Args:
    subject: str
        The subject of the email
    content: str
        The content of the email
    received_time: datetime
        The time the email was received
    attachments: list of binary data
        List of binary data representing the attachments in the email
    email_hash: str
        The hash of the email
    account: str
        The email account the email was received on
    sender: str
        The sender of the email
    
    Returns:
    None
    """
    email = Email(subject=subject, content=content, received_time=received_time, email_hash=email_hash, account=account, sender=sender)
    email.save()

    # loop through attachments and save them
    for attachment in attachments:
        email_attachment = EmailAttachment(email=email, attachment=attachment)
        email_attachment.save()

Now we need to process the mails (this function will also be called by a cronjob):

def check_mails_to_process():
    """
    This function checks for unanswered emails and processes them by getting the request text, sending a response through the chatgpt_request function, building an answer email and sending it to the sender. 
    The processed mails are then marked as answered.
    """
    # Get a list of unanswered emails
    mails = get_unanswered_emails()
    
    # If there are no unanswered emails, return False
    if not mails:
        return False
    
    # Iterate through the mails
    for mail in mails:
        
        # Check if the sender is allowed
        if not sender_is_allowed(mail):
            # If not, mark the mail as answered and continue to the next iteration
            mark_as_answered(mail)
            continue
        
        # Get the request text from the mail
        request = get_request_text(mail)
        
        # Get the answer to the request through the chatgpt_request function
        answer = chatgpt_request(request)
        
        # Log the request and answer for debugging purposes
        logging.warning(request)
        logging.warning(answer)
        
        # Build the answer email
        answermail = build_answer_mail(mail, answer)
        
        # Send the answer email to the sender
        send_email(MAIL_OUT_HOST, MAIL_USERNAME, MAIL_PASSWORD, mail.sender, ANSWER_SUBJECT, answermail)
        
        # Mark the mail as answered
        mark_as_answered(mail)

This function calls several helper functions, which follow here:

Check for unanswered mails:

def get_unanswered_emails():
    # Return all unanswered emails
    return Email.objects.filter(answered=False)

Check if the sender is allowed:

def sender_is_allowed(mail):
    """
    This function is used to check if the sender of an email is in a list of allowed senders.

    Args:
    - mail (object): An object representing an email, which should contain a 'sender' attribute.

    Returns:
    - bool: A boolean indicating whether the email sender is allowed or not.

    Global Variables:
    - ALLOWED_SENDER (list): A list of allowed email addresses, which the 'mail' sender will be checked against.
    """
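    from email.utils import parseaddr

    # Extract the bare address from headers like "Name <user@example.com>"
    # and compare it exactly, so "nottest@gmail.com" does not match "test@gmail.com"
    sender_address = parseaddr(mail.sender)[1].lower()
    return sender_address in [address.lower() for address in ALLOWED_SENDER]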
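
A From header typically has the form "Name <address>". As a small sketch of why comparing the parsed address is stricter than a substring test, the example below uses the standard library's parseaddr. The helper address_is_allowed and the sender "Mallory" are hypothetical and only serve the illustration.

```python
from email.utils import parseaddr

ALLOWED_SENDER = ["test@gmail.com", "admin@aol.com"]

def address_is_allowed(from_header):
    """Hypothetical helper: compare the parsed address, not the raw header."""
    address = parseaddr(from_header)[1].lower()
    return address in ALLOWED_SENDER

# parseaddr splits a header into display name and bare address
display_name, address = parseaddr("Test User <test@gmail.com>")

# A substring test accepts an address that merely ends with an allowed one
substring_match = "test@gmail.com" in "Mallory <nottest@gmail.com>"

# An exact comparison of the parsed address rejects it
exact_match = address_is_allowed("Mallory <nottest@gmail.com>")
```

Note that a From header can be forged, so neither check authenticates the sender; it only filters by the claimed address.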

Build the request text for ChatGPT:

def get_request_text(mail):
    """
    This function retrieves the request text from a given email. 
    
    Args:
        mail (Email): An instance of the Email model, representing the email.
    
    Returns:
        str: The request text, including the contents of any attachments if they exist.
        
    """
    
    # Retrieve the content of the email
    request = mail.content
    
    # Retrieve the attachments associated with the email, if any
    attachments = EmailAttachment.objects.filter(email=mail)
    if attachments.exists():
        
        # Create a list to store the contents of each attachment
        attachmentcontents = []
        
        # Loop through each attachment
        for attachment in attachments:
            # Retrieve the content of the attachment and add it to the list
            attachmentcontents.append(attachment.get_attachment_content())
        
        # Join the contents of all attachments into a single string
        attachmentcontent = "\r\n\r\n".join(attachmentcontents)
        
        # Add the attachments to the request text
        request += "\r\n\r\n Hereafter please find the data:\r\n\r\n" + attachmentcontent
    
    # Return the request text
    return request
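
To make the resulting prompt format concrete, here is a database-free sketch of the same assembly step. The function build_request_text is a hypothetical stand-in that takes plain strings instead of model instances; the example inputs are invented.

```python
def build_request_text(content, attachment_texts):
    """Hypothetical stand-in for get_request_text that needs no database."""
    request = content
    if attachment_texts:
        # Separate attachments with a blank line, as in the function above
        joined = "\r\n\r\n".join(attachment_texts)
        request += "\r\n\r\n Hereafter please find the data:\r\n\r\n" + joined
    return request

# Without attachments the mail content is passed through unchanged
plain = build_request_text("Summarize this.", [])

# With attachments their text is appended after the marker sentence
with_data = build_request_text("Summarize this.", ["a,b\r\n1,2", "c,d\r\n3,4"])
```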

Mark mails as answered:

def mark_as_answered(mail):
    """
    This function marks a given email as answered.
    
    Args:
        mail (Email): An instance of the Email model, representing the email.
        
    Returns:
        None
    
    """
    
    # Set the "answered" attribute of the email to True
    mail.answered = True
    
    # Save the updated email instance
    mail.save()

Build the answer mail:

def build_answer_mail(mail, answer):
    """
    This function builds an answer email based on a given email and answer text.
    
    Args:
        mail (Email): An instance of the Email model, representing the email to be answered.
        answer (str): The text of the answer to the email.
    
    Returns:
        str: The complete text of the answer email, including both the answer text and the contents of the original email.
        
    """
    
    # Build the answer email text
    answer_mail = []
    answer_mail.append("ChatGPT:")
    answer_mail.append(answer)
    answer_mail.append("Here I provide our previous conversation:")
    answer_mail.append(mail.content)
    
    answer_mail = "\r\n\r\n".join(answer_mail)
    
    # Return the answer email text
    return answer_mail

Make a ChatGPT API request:

# pip install retrying
from retrying import retry
# pip install ratelimiter
from ratelimiter import RateLimiter
@retry(stop_max_attempt_number=10)
@RateLimiter(max_calls=20, period=60)
def chatgpt_request(request, max_tokens=1024):
    import openai

    # Set the API key for the OpenAI client
    openai.api_key = OPENAI_API_KEY
    
    # Request a completion for the prompt; openai.Completion is the legacy
    # interface available in openai package versions before 1.0
    completion = openai.Completion.create(
        engine=OPENAI_ENGINE,
        prompt=request,
        max_tokens=max_tokens,
        n=1,
        stop=None,
        temperature=0.5,
    )

    response = completion.choices[0].text
    response = response.replace("Answer: ", "")
    logging.warning(response)
    return response
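
The retry decorator is what keeps a temporary API failure from aborting the cronjob run. As a rough illustration of that idea only, the sketch below implements a simplified retry decorator with the standard library. It is a hypothetical stand-in, not the retrying package's implementation, and the names retry_up_to and flaky_request are invented.

```python
import functools

def retry_up_to(max_attempts):
    """Hypothetical, simplified stand-in for a retry decorator."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for _ in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as error:
                    # Remember the failure and try again
                    last_error = error
            # All attempts failed: re-raise the last error
            raise last_error
        return wrapper
    return decorator

calls = {"count": 0}

@retry_up_to(10)
def flaky_request():
    # Simulated request that fails twice before succeeding
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"

result = flaky_request()
```

In the real function, the rate limiter sits inside the retry decorator, so each retried attempt also counts against the limit of 20 calls per 60 seconds.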

Send an Email:

import smtplib
from email.mime.text import MIMEText

def send_email(host, username, password, recipient, subject, text):
    """
    This function sends an email using the Simple Mail Transfer Protocol (SMTP).

    Args:
        host (str): The hostname or IP address of the SMTP server.
        username (str): The username to use when logging into the SMTP server.
        password (str): The password to use when logging into the SMTP server.
        recipient (str): The email address of the recipient.
        subject (str): The subject of the email.
        text (str): The body of the email.

    Returns:
        None
    """
    # Create a MIME text message with sender, recipient and subject headers
    msg = MIMEText(text)
    msg['Subject'] = subject
    msg['From'] = username
    msg['To'] = recipient

    # Connect to the SMTP server and login
    server = smtplib.SMTP(host, 587)
    server.ehlo()
    server.starttls()
    server.ehlo()
    server.login(username, password)

    # Send the email
    server.sendmail(username, recipient, msg.as_string())

    # Close the connection to the SMTP server
    server.quit()
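
The message can be inspected before it is handed to the SMTP server. The sketch below builds the same kind of MIMEText message and serializes it without sending anything; the addresses and body are invented, and setting a From header is an assumption about what most mail servers expect.

```python
from email.mime.text import MIMEText

# Build a plain-text message with invented addresses
msg = MIMEText("ChatGPT:\r\n\r\nParis is the capital of France.")
msg["Subject"] = "chatgpt answered you"
msg["From"] = "bot@example.com"
msg["To"] = "test@gmail.com"

# as_string() produces the headers and body that sendmail() would transmit
serialized = msg.as_string()
```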
