Django Mail-Bot with ChatGPT
This post walks through an email retrieval and processing script written in Python. The script uses the imaplib library to connect to an email account and retrieve unread emails. It decodes each email, extracts the relevant information (sender, subject, content, attachments), saves it to a database through Django models, and then processes it. The script is intended as the backend of an email processing system that automates email-related tasks.
The script uses the OpenAI API to process the emails and generate answers.
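Before running the script, install the following packages (Django itself is assumed to be installed already):
pip install retrying
pip install ratelimiter
pip install "openai<1.0"
pip install django-crontab
The script calls the legacy openai.Completion interface, which was removed in version 1.0 of the openai package, and the "manage.py crontab" command used further down is provided by django-crontab.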
Django models:
Email:
from django.db import models
from django.utils.timezone import now


class Email(models.Model):
    """
    Model representing an email.

    Fields:
        subject (CharField): The subject of the email, with a maximum length of 255 characters.
        content (TextField): The content of the email.
        received_time (DateTimeField): The time at which the email was received, with a default value of the current time.
        account (TextField): The account from which the email was received, with a default value of an empty string.
        email_hash (CharField): A unique identifier for the email, with a maximum length of 64 characters and a default value of an empty string.
        answered (BooleanField): A boolean indicating whether the email has been answered, with a default value of False.
        sender (TextField): The sender of the email, with a default value of an empty string.
    """
    subject = models.CharField(max_length=255)
    content = models.TextField()
    received_time = models.DateTimeField(default=now, blank=True)
    account = models.TextField(default="")
    email_hash = models.CharField(max_length=64, unique=True, default="")
    answered = models.BooleanField(default=False)
    sender = models.TextField(default="")

    def __str__(self):
        """
        Return a string representation of the email instance, which is the email's subject.

        Returns:
            str: The subject of the email.
        """
        return self.subject
Mail attachment:
class EmailAttachment(models.Model):
    """
    Model representing an attachment to an email.

    Fields:
        email (ForeignKey): A foreign key to the Email model, indicating which email the attachment is associated with. The attachment will be deleted if the associated email is deleted.
        attachment (BinaryField): The binary data of the attachment.
    """
    email = models.ForeignKey(Email, on_delete=models.CASCADE)
    attachment = models.BinaryField()

    def get_attachment_content(self):
        """
        Return the content of the attachment as a string.

        Returns:
            str: The content of the attachment.
        """
        # Some database backends return a memoryview, which has no decode() method,
        # so convert to bytes first; undecodable bytes are replaced instead of raising
        content = bytes(self.attachment).decode("utf-8", errors="replace")
        return content
Run:
manage.py makemigrations
manage.py migrate
In settings.py, add 'django_crontab' to INSTALLED_APPS and register two cron jobs in the CRONJOBS list:
CRONJOBS = [
    ('* * * * *', '[pathtoscript].mail.check_gpt_account', ' >> /[pathtoproject]/mail.log 2>&1'),
    ('* * * * *', '[pathtoscript]scripts.functions.mail.check_mails_to_process', ' >> /[pathtoproject]/mail.log 2>&1'),
]
With this schedule, the mail account is checked for new mails every minute, and stored mails are processed every minute as well.
Don’t forget to install the cron jobs:
manage.py crontab add
That completes the basic preparation. Let's start with the script.
It begins with a settings block:
# The logging module is used throughout the script
import logging
# Import the Email and EmailAttachment models from the "[yourmodel].models" module
from [yourmodel].models import Email, EmailAttachment
# A list of allowed email addresses that the code should process
ALLOWED_SENDER = ["test@gmail.com", "admin@aol.com"]
# Variables for the IMAP and SMTP email accounts
MAIL_USERNAME = '[USERNAME]'
MAIL_IN_HOST = '[IMAP HOST]'
MAIL_PASSWORD = '[PASSWORD]'
MAIL_OUT_HOST = '[SMTP HOST]'
# Variables for the OpenAI API Key and engine
OPENAI_API_KEY = '[API KEY]'
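# Note: OpenAI has since retired text-davinci-003; replace it with a completion model your account can use
OPENAI_ENGINE = "text-davinci-003"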
# The subject line for the response email sent by the code
ANSWER_SUBJECT = 'chatgpt answered you'
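Hard-coding credentials in the script is convenient for a first test, but it makes them easy to leak through version control. A minimal sketch of reading them from environment variables instead; the variable names (MAILBOT_USERNAME and so on) and the helper env_setting are assumptions made for this illustration and are not part of the original script:

```python
import os


def env_setting(name, default=None):
    """Return the environment variable `name`, or `default` if it is unset.

    Hypothetical helper: raises a clear error when a required value is missing.
    """
    value = os.environ.get(name, default)
    if value is None:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Example only: seed one value so the snippet runs on its own.
os.environ.setdefault("MAILBOT_USERNAME", "bot@example.com")

MAIL_USERNAME = env_setting("MAILBOT_USERNAME")
# Optional setting with a fallback value
OPENAI_ENGINE = env_setting("MAILBOT_OPENAI_ENGINE", "text-davinci-003")
```

The password and the API key could be read the same way, without a default, so that a missing value fails loudly at start-up.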
Then we need to check for new mails. This function will be called by the cronjob:
# Function to check the GPT email account for new emails
def check_gpt_account():
    logging.warning('### looking for new mails ###')
    # Call the "retrieve_new_emails" function to retrieve and process new emails
    retrieve_new_emails(MAIL_IN_HOST, MAIL_USERNAME, MAIL_PASSWORD, MAIL_USERNAME)
    return True
Now we will retrieve new emails from the account and save them into our database:
import email
import email.utils
import imaplib


def retrieve_new_emails(host, username, password, account):
    """
    Retrieve new unseen emails from the inbox using the IMAP protocol.

    Parameters:
        host (str): IMAP host address
        username (str): IMAP username
        password (str): IMAP password
        account (str): Name of the account the emails are stored under

    Returns:
        None
    """
    # Connect to the IMAP server
    mail = imaplib.IMAP4_SSL(host)
    mail.login(username, password)
    try:
        mail.select("inbox")
        # Search for unseen emails
        result, data = mail.search(None, "UNSEEN")
        email_ids = data[0].split()
        # Loop through each email
        for email_id in email_ids:
            # Fetch the raw email data (bytes)
            result, data = mail.fetch(email_id, "(RFC822)")
            # Parse the bytes directly, so mails that are not valid UTF-8 do not raise
            email_message = email.message_from_bytes(data[0][1])
            # Get the email subject
            subject = get_email_subject(email_message)
            # Get the email content
            content = get_email_content(email_message)
            # Get the email attachments
            attachments = get_email_attachments(email_message)
            # Get the received time of the email
            received_time = email.utils.parsedate_to_datetime(email_message["Date"])
            # Get the sender of the email
            sender = email_message["From"]
            # Get the email hash
            email_hash = get_email_hash(content, email_id)
            # Save the email information to the database unless it is already stored
            if not Email.objects.filter(email_hash=email_hash).exists():
                logging.warning('### new mail found ###')
                save_email_to_db(subject, content, received_time, attachments, email_hash, account, sender)
    finally:
        # Always close the connection, even if processing fails
        mail.logout()
This function relies on several smaller helper functions, which follow.
Get attachments:
def get_email_attachments(email_message):
    """
    Given an email message, this function returns a list of attachments present in the email.

    Args:
        email_message: email.message.Message object
            The email message to extract attachments from

    Returns:
        attachments: list of binary data
            List of binary data representing the attachments in the email
    """
    attachments = []
    for part in email_message.walk():
        # Parts without a Content-Disposition header are not attachments
        if part.get("Content-Disposition") is None:
            continue
        filename = part.get_filename()
        if bool(filename):
            attachments.append(part.get_payload(decode=True))
    return attachments
Get the mail content:
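def get_email_content(email_message):
    """
    Extract the content of an email.

    Parameters:
        email_message (obj): email message object

    Returns:
        content (str): email content (empty string if no plain-text part exists)
    """
    if not email_message.is_multipart():
        # Single-part message: decode the payload with its declared charset
        payload = email_message.get_payload(decode=True) or b""
        charset = email_message.get_content_charset() or "utf-8"
        return payload.decode(charset, errors="replace")
    # walk() also visits nested parts, e.g. the text inside a mail with attachments
    for part in email_message.walk():
        if part.get_content_type() == "text/plain":
            payload = part.get_payload(decode=True)
            if payload is None:
                continue
            charset = part.get_content_charset() or "utf-8"
            return payload.decode(charset, errors="replace")
    # No plain-text part found
    return ""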
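Mails that carry an attachment usually nest their text inside a further multipart container, so it matters whether only the top-level parts or the whole message tree is inspected. The following sketch builds such a message with the standard library and lists the content types at both levels; the message contents are made up for the illustration:

```python
from email.message import EmailMessage

# Build a mail with a plain-text body, an HTML alternative and one attachment
msg = EmailMessage()
msg["Subject"] = "Demo"
msg.set_content("plain body")
msg.add_alternative("<p>html body</p>", subtype="html")
msg.add_attachment(
    b"\x00\x01\x02",
    maintype="application",
    subtype="octet-stream",
    filename="data.bin",
)

# Only the direct children of the message
top_level = [part.get_content_type() for part in msg.get_payload()]
# The message itself plus every nested part, depth first
all_parts = [part.get_content_type() for part in msg.walk()]

print(top_level)
print(all_parts)
```

In this structure the text/plain part is not a direct child of the message, so a lookup that only loops over get_payload() would not find it, while walk() does.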
Get the mail subject:
from email.header import decode_header, make_header


def get_email_subject(email_message):
    """
    Extract the subject of an email.

    Parameters:
        email_message (obj): email message object

    Returns:
        subject (str): email subject
    """
    subject = ""
    if email_message["Subject"]:
        # Decode every encoded chunk with its declared charset, not only the first one
        subject = str(make_header(decode_header(email_message["Subject"])))
    return subject
Calculate a mail hash (needed to identify mails):
import hashlib


def get_email_hash(content, email_id):
    """
    Generates a unique hash for an email based on its content and email id.

    Args:
        content: str
            The content of the email
        email_id: str
            The id of the email

    Returns:
        email_hash: str
            The generated hash for the email
    """
    sha256 = hashlib.sha256()
    sha256.update((str(content) + str(email_id)).encode('utf-8'))
    email_hash = sha256.hexdigest()
    return email_hash
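The hash above combines the content with the id returned by mail.search, which is an IMAP message sequence number; such numbers can be reassigned when messages are removed from the mailbox. One possible alternative, sketched here under the assumption that incoming mails carry a Message-ID header, is to hash that header and fall back to the content when it is missing. The function name get_stable_email_hash is hypothetical:

```python
import hashlib
from email.message import EmailMessage


def get_stable_email_hash(email_message, content):
    """Hash the Message-ID header if present, otherwise the content."""
    message_id = email_message.get("Message-ID")
    basis = str(message_id).strip() if message_id else str(content)
    return hashlib.sha256(basis.encode("utf-8")).hexdigest()


# Example message (made-up values)
msg = EmailMessage()
msg["Message-ID"] = "<1234@example.com>"
msg.set_content("hello")

# The same Message-ID yields the same hash, whatever the content argument is
first = get_stable_email_hash(msg, "hello")
second = get_stable_email_hash(msg, "something else")
```

Because the result is still a 64-character SHA-256 hex digest, it fits the email_hash field of the model unchanged.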
Finally, save the mail to the database:
def save_email_to_db(subject, content, received_time, attachments, email_hash, account, sender):
    """
    Saves an email and its attachments to the database.

    Args:
        subject: str
            The subject of the email
        content: str
            The content of the email
        received_time: datetime
            The time the email was received
        attachments: list of binary data
            List of binary data representing the attachments in the email
        email_hash: str
            The hash of the email
        account: str
            The email account the email was received on
        sender: str
            The sender of the email

    Returns:
        None
    """
    # Named email_obj so that it does not shadow the imported "email" module
    email_obj = Email(subject=subject, content=content, received_time=received_time, email_hash=email_hash, account=account, sender=sender)
    email_obj.save()
    # Loop through the attachments and save them
    for attachment in attachments:
        email_attachment = EmailAttachment(email=email_obj, attachment=attachment)
        email_attachment.save()
Now we need to process the mails (this function will also be called by a cronjob):
def check_mails_to_process():
    """
    This function checks for unanswered emails and processes them by getting the request text, sending a response through the chatgpt_request function, building an answer email and sending it to the sender.
    The processed mails are then marked as answered.
    """
    # Get a list of unanswered emails
    mails = get_unanswered_emails()
    # If there are no unanswered emails, return False
    if not mails:
        return False
    # Iterate through the mails
    for mail in mails:
        # Check if the sender is allowed
        if not sender_is_allowed(mail):
            # If not, mark the mail as answered and continue to the next iteration
            mark_as_answered(mail)
            continue
        # Get the request text from the mail
        request = get_request_text(mail)
        # Get the answer to the request through the chatgpt_request function
        answer = chatgpt_request(request)
        # Log the request and answer for debugging purposes
        logging.warning(request)
        logging.warning(answer)
        # Build the answer email
        answermail = build_answer_mail(mail, answer)
        # Send the answer email to the sender
        send_email(MAIL_OUT_HOST, MAIL_USERNAME, MAIL_PASSWORD, mail.sender, ANSWER_SUBJECT, answermail)
        # Mark the mail as answered
        mark_as_answered(mail)
This function calls several helper functions, which follow.
Check for unanswered mails:
def get_unanswered_emails():
    # Return all unanswered emails
    return Email.objects.filter(answered=False)
Check if the sender is allowed:
def sender_is_allowed(mail):
    """
    This function is used to check if the sender of an email is in a list of allowed senders.

    Args:
        - mail (object): An object representing an email, which should contain a 'sender' attribute.

    Returns:
        - bool: A boolean indicating whether the email sender is allowed or not.

    Global Variables:
        - ALLOWED_SENDER (list): A list of allowed email addresses, which the 'mail' sender will be checked against.
    """
    check_list = ALLOWED_SENDER
    for email_to_check in check_list:
        if email_to_check in mail.sender:
            return True
    return False
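The check above is a substring match, so a sender such as "nottest@gmail.com" also passes when "test@gmail.com" is on the list. A stricter variant, sketched here as one possible approach, parses the address out of the From header and compares it exactly. The function name sender_is_allowed_strict is hypothetical, and it takes the header string rather than the model instance so that it can run on its own:

```python
from email.utils import parseaddr

# Same example list as in the settings block
ALLOWED_SENDER = ["test@gmail.com", "admin@aol.com"]


def sender_is_allowed_strict(sender_header, allowed=ALLOWED_SENDER):
    """Return True only if the parsed address exactly matches an allowed one."""
    # parseaddr splits 'Name <user@host>' into ('Name', 'user@host')
    address = parseaddr(sender_header)[1].lower()
    return address in {entry.lower() for entry in allowed}


print(sender_is_allowed_strict("Test User <test@gmail.com>"))
print(sender_is_allowed_strict("Eve <nottest@gmail.com>"))
```

Note that the From header itself can be forged, so this is a filter rather than an authentication mechanism.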
Build the request text for ChatGPT:
def get_request_text(mail):
    """
    This function retrieves the request text from a given email.

    Args:
        mail (Email): An instance of the Email model, representing the email.

    Returns:
        str: The request text, including the contents of any attachments if they exist.
    """
    # Retrieve the content of the email
    request = mail.content
    # Retrieve the attachments that belong to this email
    attachments = EmailAttachment.objects.filter(email=mail)
    if attachments.exists():
        # Collect the content of each attachment
        attachmentcontents = [attachment.get_attachment_content() for attachment in attachments]
        # Join the contents of all attachments into a single string
        attachmentcontent = "\r\n\r\n".join(attachmentcontents)
        # Add the attachments to the request text
        request += "\r\n\r\n Hereafter please find the data:\r\n\r\n" + attachmentcontent
    # Return the request text
    return request
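Since attachments are appended in full, the request text can grow arbitrarily large, while completion models accept only a limited number of tokens per request. A minimal sketch of capping the text before it is sent; the helper name limit_request_text, the 8000-character default and the marker text are assumptions for this illustration, and a character count is only a rough stand-in for a token count:

```python
def limit_request_text(text, max_chars=8000, marker="\r\n[truncated]"):
    """Return `text` unchanged if it is short enough, otherwise cut it and add a marker."""
    if len(text) <= max_chars:
        return text
    # Reserve room for the marker so the result is at most max_chars long
    keep = max(max_chars - len(marker), 0)
    return text[:keep] + marker


short_text = limit_request_text("Please summarise the data.")
long_text = limit_request_text("x" * 100, max_chars=50)
```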
Mark mails as answered:
def mark_as_answered(mail):
    """
    This function marks a given email as answered.

    Args:
        mail (Email): An instance of the Email model, representing the email.

    Returns:
        None
    """
    # Set the "answered" attribute of the email to True
    mail.answered = True
    # Save the updated email instance
    mail.save()
Build the answer mail:
def build_answer_mail(mail, answer):
    """
    This function builds an answer email based on a given email and answer text.

    Args:
        mail (Email): An instance of the Email model, representing the email to be answered.
        answer (str): The text of the answer to the email.

    Returns:
        str: The complete text of the answer email, including both the answer text and the contents of the original email.
    """
    # Build the answer email text
    answer_mail = []
    answer_mail.append("ChatGPT:")
    answer_mail.append(answer)
    answer_mail.append("Here I provide our previous conversation:")
    answer_mail.append(mail.content)
    answer_mail = "\r\n\r\n".join(answer_mail)
    # Return the answer email text
    return answer_mail
Make a ChatGPT API request:
# pip install retrying
from retrying import retry
# pip install ratelimiter
from ratelimiter import RateLimiter
import openai


# Retry up to 10 times on errors and allow at most 20 calls per 60 seconds
@retry(stop_max_attempt_number=10)
@RateLimiter(max_calls=20, period=60)
def chatgpt_request(request, max_tokens=1024):
    # Set the API key
    openai.api_key = OPENAI_API_KEY
    # Request a single completion for the input text
    completion = openai.Completion.create(
        engine=OPENAI_ENGINE,
        prompt=request,
        max_tokens=max_tokens,
        n=1,
        stop=None,
        temperature=0.5,
    )
    response = completion.choices[0].text
    # Strip the "Answer: " prefix that the model sometimes adds
    response = response.replace("Answer: ", "")
    logging.warning(response)
    return response
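The two decorators cap the call rate and repeat failed calls. To make the retry part concrete, here is a minimal standard-library sketch of a retry decorator with a fixed wait between attempts. It illustrates the idea only and is not how the retrying package is implemented; the names retry_call, attempts and wait_seconds are made up for this example:

```python
import functools
import time


def retry_call(attempts=3, wait_seconds=0.0):
    """Decorator factory: call the function again after an exception, up to `attempts` times."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as error:
                    last_error = error
                    # Wait before the next attempt, but not after the last one
                    if attempt < attempts:
                        time.sleep(wait_seconds)
            # Every attempt failed: re-raise the last exception
            raise last_error
        return wrapper
    return decorator


calls = {"count": 0}


@retry_call(attempts=5)
def flaky():
    """Stand-in for an API call that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = flaky()
```

With ten attempts per mail, as configured above, a permanently failing request is only given up after the tenth exception, which is worth keeping in mind when the job runs every minute.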
Send an Email:
import smtplib
from email.mime.text import MIMEText


def send_email(host, username, password, recipient, subject, text):
    """
    This function sends an email using the Simple Mail Transfer Protocol (SMTP).

    Args:
        host (str): The hostname or IP address of the SMTP server.
        username (str): The username to use when logging into the SMTP server; also used as the sender address.
        password (str): The password to use when logging into the SMTP server.
        recipient (str): The email address of the recipient.
        subject (str): The subject of the email.
        text (str): The body of the email.

    Returns:
        None
    """
    # Create a MIME text message
    msg = MIMEText(text)
    msg['Subject'] = subject
    msg['From'] = username
    msg['To'] = recipient
    # Connect to the SMTP server, switch to TLS and log in
    server = smtplib.SMTP(host, 587)
    try:
        server.ehlo()
        server.starttls()
        server.ehlo()
        server.login(username, password)
        # Send the email
        server.sendmail(username, recipient, msg.as_string())
    finally:
        # Close the connection to the SMTP server, even if sending fails
        server.quit()