From 8add0b941b017d68d85395455093e19d516591fa Mon Sep 17 00:00:00 2001 From: Boris Ponsioen <b.g.t.ponsioen@uva.nl> Date: Wed, 23 May 2018 09:27:09 +0200 Subject: [PATCH] Removes mongoengine rest framework, implements incremental updates on Citables per journal --- SciPost_v1/settings/base.py | 1 - metacore/admin.py | 17 +++- metacore/services.py | 181 ++++++++++++++++++++++-------------- requirements.txt | 1 - 4 files changed, 125 insertions(+), 75 deletions(-) diff --git a/SciPost_v1/settings/base.py b/SciPost_v1/settings/base.py index dfbbc6f38..db35cf629 100644 --- a/SciPost_v1/settings/base.py +++ b/SciPost_v1/settings/base.py @@ -83,7 +83,6 @@ INSTALLED_APPS = ( 'guardian', 'haystack', 'rest_framework', - 'rest_framework_mongoengine', 'sphinxdoc', 'affiliations', 'colleges', diff --git a/metacore/admin.py b/metacore/admin.py index 2cefe7421..a35b5a53d 100644 --- a/metacore/admin.py +++ b/metacore/admin.py @@ -1,13 +1,13 @@ from django.contrib import admin from django.contrib import messages from .models import Citable, CitableWithDOI, Journal -from .services import get_crossref_test, import_journal_full, get_crossref_work_count, add_journal_to_existing +from .services import import_journal_full, import_journal_incremental, get_crossref_work_count, add_journal_to_existing # Register your models here. class JournalAdmin(admin.ModelAdmin): fields = ('name', 'ISSN_digital', 'last_full_sync') list_display = ('name', 'ISSN_digital', 'last_full_sync', 'count_metacore', 'count_crossref', 'last_update') - actions = ['import_full', 'update_counts', 'add_journal_to_items', 'delete_all_citables'] + actions = ['import_full', 'import_incremental', 'update_counts', 'add_journal_to_items', 'delete_all_citables'] def import_full(self, request, queryset): """ Starts background task to import all works by this journal """ @@ -17,6 +17,19 @@ class JournalAdmin(admin.ModelAdmin): messages.add_message(request, messages.INFO, 'Import task for journal {} added. Go to Background Tasks -> Tasks in admin to view'.format(journal.name)) messages.add_message(request, messages.WARNING, 'Make sure that "./manage.py process_tasks" is running (otherwise start it).') + + def import_incremental(self, request, queryset): + """ Starts background task to import all works by this journal """ + + for journal in queryset: + if journal.last_update: + t = import_journal_incremental(journal.ISSN_digital, journal.last_update.strftime('%Y-%m-%d')) + messages.add_message(request, messages.INFO, 'Import task for journal {} added. Go to Background Tasks -> Tasks in admin to view'.format(journal.name)) + else: + messages.add_message(request, messages.INFO, 'Incremental import task for journal {} could not be started, since date of last full sync is not set.'.format(journal.name)) + + messages.add_message(request, messages.WARNING, 'Make sure that "./manage.py process_tasks" is running (otherwise start it).') + def update_counts(self, request, queryset): for journal in queryset: journal.count_metacore = Citable.objects(metadata__ISSN=journal.ISSN_digital).count() diff --git a/metacore/services.py b/metacore/services.py index 2e02932c6..701d9ffe7 100644 --- a/metacore/services.py +++ b/metacore/services.py @@ -2,7 +2,7 @@ import requests from .models import Citable, CitableWithDOI, Journal from background_task import background from rest_framework import serializers -from rest_framework_mongoengine.serializers import DocumentSerializer +from mongoengine.python_support import pymongo from django.utils import timezone import logging @@ -14,7 +14,17 @@ def import_journal_full(issn, cursor='*'): Task to query CrossRef for all works of a journal with given ISSN and store them in the Metacore mongo database """ + import_journal(issn=issn, cursor=cursor, from_index_date=None) +@background() +def import_journal_incremental(issn, from_index_date, cursor='*'): + """ + Task to query CrossRef for all works of a journal with given ISSN + from a given date onward and store them in the Metacore mongo database + """ + import_journal(issn=issn, cursor=cursor, from_index_date=from_index_date) + +def import_journal(issn, cursor='*', from_index_date=None): # Get journal to track progress # Formulate the CR query @@ -36,7 +46,11 @@ def import_journal_full(issn, cursor='*'): logger.info("Last cursor: ", last_cursor) logger.info("Current cursor: ", cursor) - params = {'cursor': cursor, 'rows': rows, 'mailto': 'b.g.t.ponsioen@uva.nl'} + if from_index_date: + params = {'cursor': cursor, 'rows': rows, 'mailto': 'b.g.t.ponsioen@uva.nl', 'filter': 'from-index-date:{}'.format(from_index_date)} + else: + params = {'cursor': cursor, 'rows': rows, 'mailto': 'b.g.t.ponsioen@uva.nl'} + last_cursor = cursor r = requests.get(url, params=params) r_json = r.json() @@ -45,7 +59,20 @@ def import_journal_full(issn, cursor='*'): cursor = r_json['message']['next-cursor'] number_of_results = len(r_json['message']['items']) - citables = [parse_crossref_citable(it) for it in citables_json] + # citables = [parse_crossref_citable(it) for it in citables_json] + citables = [] + serialized_objects = [] + validation_errors = [] + for cit in citables_json: + serialized_object = CitableCrossrefSerializer(data=cit) + if serialized_object.is_valid(): + citables.append(CitableWithDOI(**serialized_object.validated_data)) + serialized_objects.append(serialized_object) + else: + # TODO: insert the actual validation errors instead + citables.append(False) + validation_errors.append(serialized_object.errors) + # Parser returns False if there's an error errors = any([not i for i in citables if i == False]) orig_citables = citables @@ -54,9 +81,12 @@ def import_journal_full(issn, cursor='*'): # Mass insert in database (will fail on encountering existing documents # with same DOI if citables: - Citable.objects.insert(citables) - - citable = [] + if from_index_date: + operations = [obj.to_UpdateOne() for obj in serialized_objects] + col = Citable._get_collection() + col.bulk_write(operations, ordered=False) + else: + Citable.objects.insert(citables, {'ordered': False}) # Save current count so progress can be tracked in the admin page # TODO: make this work (currently only executed after whole import @@ -105,65 +135,6 @@ def get_crossref_work_count(issn): if 'total-results' in result: return result['total-results'] -def get_crossref_test(cursor='*'): - """ - For testing purposes - retrieves a "small" dataset from CrossRef and saves it - in de database, after parsing - """ - - # Member 16 is APS - # url = 'https://api.crossref.org/members/16/works' - # Last cursor I used (after 100.000 records from APS) for this - # cursor = 'AoJ79tDrpd8CPwtodHRwOi8vZHguZG9pLm9yZy8xMC4xMTAzL3BoeXNyZXZiLjQyLjgxMjU=' - - # This is PRL - url = 'https://api.crossref.org/journals/0031-9007/works' - # cursor = 'AoJ2/dSFrt8CPxFodHRwOi8vZHguZG9pLm9yZy8xMC4xMTAzL3BoeXNyZXZsZXR0LjExMy4yMzY2MDM=' - - # If the loop is allowed to complete, it fetches (rows * batches) records - rows = 500 - batches = 2000 - last_cursor = cursor - - for i in range(0,batches): - # print("-------------------------------") - # print("Batch %s" % (i, )) - # print("Last cursor: ", last_cursor) - # print("Current cursor: ", cursor) - logger.info("-------------------------------") - logger.info("Batch %s" % (i, )) - logger.info("Last cursor: ", last_cursor) - logger.info("Current cursor: ", cursor) - - params = {'cursor': cursor, 'rows': rows, 'mailto': 'b.g.t.ponsioen@uva.nl'} - last_cursor = cursor - r = requests.get(url, params=params) - r_json = r.json() - - citables_json = r_json['message']['items'] - cursor = r_json['message']['next-cursor'] - number_of_results = len(r_json['message']['items']) - - citables = [parse_crossref_citable(it) for it in citables_json] - # Parser returns None if there's an error - errors = any([not i for i in citables if i == False]) - orig_citables = citables - citables = [citable for citable in citables if citable] - - # Mass insert in database (will fail on encountering existing documents - # with same DOI - if citables: - Citable.objects.insert(citables) - - citable = [] - - if number_of_results < rows: - # print(number_of_results) - # print('End reached.') - logger.info(number_of_results) - logger.info('End reached.') - break - def convert_doi_to_lower_case(): # If you accidentally import 100.000+ records that have random uppercase characters # in their reference DOI list @@ -259,11 +230,79 @@ def parse_crossref_citable(citable_item): logger.error(citable_item.keys()) return False -class CitableCrossrefSerializer(DocumentSerializer): - # metadata = serializers.JSONField(source='te') - test = serializers.CharField(source='te') - class Meta: - model = CitableWithDOI - fields = '__all__' +class CitableCrossrefSerializer(serializers.BaseSerializer): + """ + Class for deserializing a JSON object into the correct form to create a CitableWithDOI out of. + Specifically for Crossref REST API format + + Usage: + json_data = { ... } + serialized_object = CitableCrossrefSerializer(data=json_data) + serialized_object.is_valid() + # Validated/parsed data: serialized_object.validated_data + CitableWithDOI.create(**serialized_object.validated_data) + """ + + def to_internal_value(self, data): + authors_raw = data.get('author') + references_raw = data.get('reference') + + doi = data.get('DOI') + publisher = data.get('publisher') + # {'issued': {'date-parts': ['...']}} + publication_date_raw = data.get('issued', {}).get('date-parts', [''])[0] + # {'title': ['...']} + title = data.get('title', [''])[0] + # {'container-title': ['...']} + journal = data.get('container-title', [''])[0] + # {'license': [{'url': '...'}]} + license = data.get('license', [{}])[0].get('URL') + metadata = data + + # Validation errors + if not doi: + raise serializers.ValidationError({'DOI': 'DOI not given.'}) + if not authors_raw: + raise serializers.ValidationError({'authors': 'Author list is empty.'}) + if not title: + raise serializers.ValidationError({'title': 'Title is not present.'}) + if not publication_date_raw: + raise serializers.ValidationError({'publication_date': 'Publication date is missing.'}) + + # More complex parsing logic + publication_date = '-'.join([str(date_part) for date_part in publication_date_raw]) + + authors = [] + for author_names in authors_raw: + author = [] + if 'given' in author_names: + author.append(author_names['given']) + if 'family' in author_names: + author.append(author_names['family']) + authors.append(' '.join(author)) + + if references_raw: + references_with_doi = [ref for ref in references_raw if 'DOI' in ref] + references = [ref['DOI'].lower() for ref in references_with_doi] + else: + references = [] + + return { + 'authors': authors, + 'doi': doi.lower(), + 'references': references, + 'publisher': publisher, + 'publication-date': publication_date, + 'title': title, + 'journal': journal, + 'license': license, + 'metadata': metadata + } + + def to_UpdateOne(self): + filters = {'doi': self.validated_data.pop('doi')} + mods = {'$set': self.validated_data} + + return pymongo.UpdateOne(filters, mods, upsert=True) diff --git a/requirements.txt b/requirements.txt index 822386963..d60aa612c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -52,7 +52,6 @@ html2text # Mongo (Metacore) mongoengine==0.15.0 django-background-tasks==1.1.13 -django-rest-framework-mongoengine==3.3.0 # Possibly dead (most probably not used anymore and possibly not up-to-date packages) -- JdW (August 15th, 2017) imagesize==0.7.1 -- GitLab