Newer
Older
__copyright__ = "Copyright © Stichting SciPost (SciPost Foundation)"
__license__ = "AGPL v3"
from django.urls import reverse_lazy
from django.db.models import Q, Case, DateField, Max, Min, Sum, Value, When, F
from django.utils import timezone
from crispy_forms.layout import Layout, Div, Field, ButtonHolder, Submit
from crispy_bootstrap5.bootstrap5 import FloatingField
from common.forms import HTMXDynSelWidget
from finances.constants import (
SUBSIDY_STATUS,
SUBSIDY_TYPE_SPONSORSHIPAGREEMENT,
SUBSIDY_TYPES,
)
from organizations.models import Organization
from scipost.fields import UserModelChoiceField
from .models import Subsidy, SubsidyPayment, SubsidyAttachment, WorkLog
class SubsidyForm(forms.ModelForm):
url="/organizations/organization-autocomplete",
attrs={
"data-html": True,
"style": "width: 100%",
},
renewal_of = forms.ModelMultipleChoiceField(
queryset=Subsidy.objects.all(),
widget=autocomplete.ModelSelect2Multiple(
url=reverse_lazy("finances:subsidy_autocomplete"),
attrs={
"data-html": True,
"style": "width: 100%",
},
),
help_text=("Start typing, and select from the popup."),
class Meta:
model = Subsidy
fields = [
"organization",
"subsidy_type",
"description",
"amount",
"amount_publicly_shown",
"status",
"paid_on",
"date_from",
"date_until",
"renewable",
"renewal_of",
]
widgets = {
"paid_on": forms.DateInput(attrs={"type": "date"}),
"date_from": forms.DateInput(attrs={"type": "date"}),
"date_until": forms.DateInput(attrs={"type": "date"}),
}
organization_query = forms.CharField(
max_length=128,
required=False,
label="Organization name or acronym",
)
country = forms.CharField(
max_length=32,
required=False,
label="Country name or code",
)
status = forms.MultipleChoiceField(
label="Status",
choices=SUBSIDY_STATUS,
required=False,
)
type = forms.MultipleChoiceField(
choices=SUBSIDY_TYPES,
required=False,
)
active_year = forms.ChoiceField(
label="Active year",
required=False,
)
orderby = forms.ChoiceField(
label="Order by",
choices=(
("amount", "Amount"),
("date_from", "Date from"),
("date_until", "Date until"),
("annot_renewal_action_date", "Renewal date"),
),
required=False,
)
ordering = forms.ChoiceField(
label="Ordering",
choices=(
# FIXME: Emperically, the ordering appers to be reversed for dates?
("-", "Ascending"),
("+", "Descending"),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.helper = FormHelper()
self.helper.layout = Layout(
Div(
Div(
Div(FloatingField("organization_query"), css_class="col"),
Div(
FloatingField("active_year"),
css_class="col-4 col-sm-3 col-md-2 col-lg-4 col-xl-3 col-xxl-2",
),
css_class="row mb-0",
),
Div(
Div(FloatingField("country"), css_class="col-12 col-lg-4"),
Div(FloatingField("orderby"), css_class="col-6 col-lg-4"),
Div(FloatingField("ordering"), css_class="col-6 col-lg-4"),
css_class="row mb-0",
),
css_class="col-12 col-lg",
),
Div(Field("status", size=6), css_class="col-12 col-lg-auto"),
Div(Field("type", size=6), css_class="col-12 col-lg-auto"),
css_class="row mb-0",
min_year, max_year = (
Subsidy.objects.all()
.aggregate(min=Min("date_from"), max=Max("date_until"))
.values()
)
self.fields["active_year"].choices = [("", "---")] + [
(year, year) for year in range(min_year.year, max_year.year + 1)
]
def search_results(self, user):
if user.groups.filter(name="Financial Administrators").exists():
subsidies = Subsidy.objects.all()
else:
subsidies = Subsidy.objects.obtained()
# Include `renewal_action_date` property in queryset
subsidies = subsidies.annotate(
annot_renewal_action_date=Case(
When(
Q(subsidy_type=SUBSIDY_TYPE_SPONSORSHIPAGREEMENT),
then=F("date_until") - datetime.timedelta(days=122),
),
default=Value(None),
output_field=DateField(),
)
)
if organization_query := self.cleaned_data["organization_query"]:
Q(organization__name__unaccent__icontains=organization_query)
| Q(organization__acronym__unaccent__icontains=organization_query)
)
if self.cleaned_data["country"]:
subsidies = subsidies.filter(
organization__country__icontains=self.cleaned_data["country"],
)
if status := self.cleaned_data["status"]:
subsidies = subsidies.filter(status__in=status)
if subsidy_type := self.cleaned_data["type"]:
subsidies = subsidies.filter(subsidy_type__in=subsidy_type)
if active_year := self.cleaned_data["active_year"]:
subsidies = subsidies.filter(
Q(date_from__year__lte=int(active_year))
& Q(date_until__year__gte=int(active_year))
)
# Ordering of subsidies
# Only order if both fields are set
if (orderby_value := self.cleaned_data.get("orderby")) and (
ordering_value := self.cleaned_data.get("ordering")
):
# Remove the + from the ordering value, causes a Django error
ordering_value = ordering_value.replace("+", "")
# Ordering string is built by the ordering (+/-), and the field name
# from the orderby field split by "," and joined together
subsidies = subsidies.order_by(
*[
ordering_value + order_part
for order_part in orderby_value.split(",")
]
)
class SubsidyPaymentForm(forms.ModelForm):
class Meta:
model = SubsidyPayment
fields = (
"subsidy",
"reference",
"amount",
"date_scheduled",
)
widgets = {
"date_scheduled": forms.DateInput(attrs={"type": "date"}),
}
invoice = forms.ChoiceField(required=False)
proof_of_payment = forms.ChoiceField(required=False)
def __init__(self, *args, **kwargs):
subsidy = kwargs.pop("subsidy")
super().__init__(*args, **kwargs)
self.fields["subsidy"].initial = subsidy
self.fields["subsidy"].widget = forms.HiddenInput()
print(subsidy.attachments.invoices())
self.fields["invoice"].choices = [
(att.id, f"{att.attachment.name.split('/')[-1]}")
for att in subsidy.attachments.invoices()
]
self.fields["proof_of_payment"].choices = [
(att.id, f"{att.attachment.name.split('/')[-1]}")
for att in subsidy.attachments.proofs_of_payment()
]
self.helper = FormHelper()
self.helper.layout = Layout(
Field("subsidy"),
Div(
Div(Field("reference"), css_class="col-lg-5"),
Div(Field("amount"), css_class="col-lg-3"),
Div(Field("date_scheduled"), css_class="col-lg-4"),
css_class="row",
),
Div(
Div(Field("invoice"), css_class="col-lg-6"),
Div(Field("proof_of_payment"), css_class="col-lg-6"),
css_class="row",
),
ButtonHolder(Submit("submit", "Submit", css_class="btn-sm")),
def clean_invoice(self):
if invoice := self.cleaned_data["invoice"]:
invoice = SubsidyAttachment.objects.get(id=invoice)
return invoice
def clean_proof_of_payment(self):
if proof_of_payment := self.cleaned_data["proof_of_payment"]:
proof_of_payment = SubsidyAttachment.objects.get(id=proof_of_payment)
return proof_of_payment
def save(self, commit=True):
instance = super().save(commit=False)
if invoice := self.cleaned_data["invoice"]:
instance.invoice = invoice
if proof_of_payment := self.cleaned_data["proof_of_payment"]:
instance.proof_of_payment = proof_of_payment
if commit:
instance.save()
return instance
class SubsidyAttachmentInlineLinkForm(forms.ModelForm):
class Meta:
model = SubsidyAttachment
fields = [
"subsidy",
]
filename = forms.CharField(
label="Filename",
required=True,
)
subsidy = forms.ModelChoiceField(
queryset=Subsidy.objects.all(),
widget=HTMXDynSelWidget(
dynsel_context={
"results_page_url": reverse_lazy(
"finances:_hx_dynsel_subsidy_result_page"
),
"collection_name": "subsidies",
}
),
help_text=("Start typing, and select from the popup."),
required=False,
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.helper = FormHelper()
self.helper.layout = Layout(
Div(
Div(Field("filename"), css_class="col-6 col"),
Div(Field("subsidy"), css_class="col-6 col"),
css_class="row mb-0",
)
)
self.fields["filename"].initial = self.instance.filename
def clean(self):
orphaned = self.cleaned_data.get("subsidy") is None
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
filename = self.cleaned_data["filename"]
# Allow misnamed orphans
if orphaned:
return
filename_regex = (
"^SciPost_"
"[0-9]{4,}(-[0-9]{4,})?_[A-Z]{2,}_[\w]+_"
"(Agreement|Invoice|ProofOfPayment|Other)"
"(-[0-9]{2,})?(_[\w]+)?\.(pdf|docx|png)$"
)
pattern = re.compile(filename_regex)
if not pattern.match(filename):
self.add_error(
"filename",
"The filename does not match the required regex pattern "
f"'{filename_regex}'",
)
def save(self, commit=True):
instance: "SubsidyAttachment" = super().save(commit=False)
filename = self.cleaned_data["filename"]
old_relative_path = instance.attachment.name
new_relative_path = instance.attachment.name.replace(
instance.filename, filename
)
instance.attachment.storage.save(new_relative_path, instance.attachment)
instance.attachment.name = new_relative_path
instance.save()
instance.attachment.storage.delete(old_relative_path)
class SubsidyAttachmentForm(forms.ModelForm):
class Meta:
model = SubsidyAttachment
fields = (
"subsidy",
"attachment",
"kind",
"date",
"description",
"visibility",
)
widgets = {
"date": forms.DateInput(attrs={"type": "date"}),
}
subsidy = forms.ModelChoiceField(
queryset=Subsidy.objects.all(),
widget=autocomplete.ModelSelect2(
url=reverse_lazy("finances:subsidy_autocomplete"),
attrs={
"data-html": True,
"style": "width: 100%",
},
),
help_text=("Start typing, and select from the popup."),
required=False,
)
def clean(self):
orphaned = self.cleaned_data["subsidy"] is None
attachment_filename = self.cleaned_data["attachment"].name.split("/")[-1]
# Allow misnamed orphans
if orphaned:
return
filename_regex = (
"^SciPost_"
"[0-9]{4,}(-[0-9]{4,})?_[A-Z]{2,}_[\w]+_"
"(Agreement|Invoice|ProofOfPayment|Other)"
"(-[0-9]{2,})?(_[\w]+)?\.(pdf|docx|png)$"
pattern = re.compile(filename_regex)
#
if not pattern.match(attachment_filename):
"The filename does not match the required regex pattern "
#############
# Work logs #
#############
self.types = kwargs.pop("log_types", False)
self.fields["log_type"] = forms.ChoiceField(choices=self.types)
"comments",
"log_type",
"duration",
"comments": forms.Textarea(attrs={"rows": 4}),
"duration": forms.TextInput(attrs={"placeholder": "HH:MM:SS"}),
"""
Filter work logs given the requested date range and users.
"""
queryset=get_user_model().objects.filter(work_logs__isnull=False).distinct(),
required=True, widget=forms.DateInput(attrs={"type": "date"})
end = forms.DateField(required=True, widget=forms.DateInput(attrs={"type": "date"}))
hourly_rate = forms.FloatField(min_value=0, initial=WorkLog.HOURLY_RATE)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not any(self.fields[field].initial for field in ["start", "end"]):
current_month = datetime.date.today().replace(day=1)
last_month_end = current_month - datetime.timedelta(days=1)
last_month_start = last_month_end.replace(day=1)
self.fields["start"].initial = last_month_start
self.fields["end"].initial = last_month_end
self.helper = FormHelper()
self.helper.layout = Layout(
Div(
Div(FloatingField("employee"), css_class="col-9 col-md"),
Div(FloatingField("hourly_rate"), css_class="col-3 col-md-2"),
Div(FloatingField("start"), css_class="col-6 col-md-auto col-lg-2"),
Div(FloatingField("end"), css_class="col-6 col-md-auto col-lg-2"),
css_class="row mb-0 mt-2",
),
Submit("submit", "Filter"),
)
self.cleaned_data["months"] = [
dt
for dt in rrule(
dtstart=self.cleaned_data["start"],
until=self.cleaned_data["end"],
)
]
return self.cleaned_data.get("months", [])
def filter(self):
"""Filter work logs and return in user-grouped format."""
output = []
if self.is_valid():
if self.cleaned_data["employee"]:
user_qs = get_user_model().objects.filter(
id=self.cleaned_data["employee"].id
)
else:
user_qs = get_user_model().objects.filter(work_logs__isnull=False)
work_logs__work_date__gte=self.cleaned_data["start"],
work_logs__work_date__lte=self.cleaned_data["end"],
).distinct()
output = []
for user in user_qs:
logs = user.work_logs.filter(
work_date__gte=self.cleaned_data["start"],
work_date__lte=self.cleaned_data["end"],
).distinct()
output.append(
{
"logs": logs,
"duration": logs.aggregate(total=Sum("duration")),
"user": user,
}
)
"""Filter work logs and return in per-month format."""
if self.cleaned_data["employee"]:
user_qs = get_user_model().objects.filter(
id=self.cleaned_data["employee"].id
)
else:
user_qs = get_user_model().objects.filter(work_logs__isnull=False)
work_logs__work_date__gte=self.cleaned_data["start"],
work_logs__work_date__lte=self.cleaned_data["end"],
).distinct()
work_log_qs = WorkLog.objects.filter(
work_date__gte=self.cleaned_data["start"],
work_date__lte=self.cleaned_data["end"],
user__in=user_qs,
)
output = []
for user in user_qs:
total_time_per_month = [
work_log_qs.filter(
work_date__year=dt.year, work_date__month=dt.month, user=user
).aggregate(Sum("duration"))["duration__sum"]
for dt in self.get_months()
]
if self.cleaned_data["hourly_rate"]:
salary_per_month = [
(
duration.total_seconds()
/ 3600 # Convert to hours
* self.cleaned_data["hourly_rate"]
if duration is not None
else 0
)
for duration in total_time_per_month
]
else:
salary_per_month = []
"monthly_data": zip(
self.get_months(),
total_time_per_month,
salary_per_month,
),
"user": user,
}
)