SciPost Code Repository

Skip to content
Snippets Groups Projects
Commit ce125b63 authored by Jean-Sébastien Caux's avatar Jean-Sébastien Caux
Browse files

Add management command to get Events from Mailgun Events API

parent 9dff6410
No related branches found
No related tags found
No related merge requests found
......@@ -433,3 +433,8 @@ CELERY_IMPORTS = ('submissions.tasks',)
# Automation.
ED_ASSIGMENT_DT_DELTA = timedelta(hours=6)
# Mailgun credentials
MAILGUN_DOMAIN_NAME = ''
MAILGUN_API_KEY = ''
......@@ -29,3 +29,7 @@ EMAIL_BACKEND_ORIGINAL = "mails.backends.filebased.EmailBackend"
# CSP
CSP_REPORT_URI = get_secret('CSP_SENTRY')
CSP_REPORT_ONLY = True
# Mailgun credentials
MAILGUN_DOMAIN_NAME = get_secret('MAILGUN_DOMAIN_NAME')
MAILGUN_API_KEY = get_secret('MAILGUN_API_KEY')
__copyright__ = "Copyright © Stichting SciPost (SciPost Foundation)"
__license__ = "AGPL v3"
class APIMailError(Exception):
def __init__(self, name):
self.name = name
def __str__(self):
return 'API Mail error: {}'.format(self.name)
__copyright__ = "Copyright © Stichting SciPost (SciPost Foundation)"
__license__ = "AGPL v3"
import requests
from django.conf import settings
from django.core.management import BaseCommand
from ...exceptions import APIMailError
from ...models import Event
def get_and_save_events(url=None):
"""
Get events from Mailgun Events API.
This method treats a single page and saves new Events to the database.
If no url is given, get the first page.
Returns the paging JSON, if present, so traversing can be performed.
"""
response = requests.get(
url if url else "https://api.eu.mailgun.net/v3/%s/events" % settings.MAILGUN_DOMAIN_NAME,
auth=("api", settings.MAILGUN_API_KEY)
).json()
events = response['items']
for item in events:
if not Event.objects.filter(data__timestamp=item['timestamp'],
data__id=item['id']).exists():
Event.objects.create(data=item)
info = {'nitems': len(events)}
if 'paging' in response:
info['paging'] = response['paging']
return info
class Command(BaseCommand):
"""
Perform a GET request to harvest Events from the Mailgun API, saving them to the DB.
"""
help = 'Gets Events from the Maigun Events API and saves them to the DB.'
def handle(self, *args, **kwargs):
info = get_and_save_events()
ctr = 1 # Safety: ensure no runaway requests
while ctr < 100 and info['nitems'] > 0:
info = get_and_save_events(url=info['paging']['next'])
ctr += 1
if ctr == 100:
raise APIMailError('Hard stop of mailgun_get_events: '
'harvested above 100 pages from Mailgun Events API')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment