Source code for swh.web.save_code_now.admin_views

# Copyright (C) 2018-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import io
import json

from django.conf import settings
from django.contrib.admin.views.decorators import staff_member_required
from django.core.exceptions import ObjectDoesNotExist
from django.core.management import call_command
from django.core.paginator import Paginator
from django.http import FileResponse, HttpResponse, JsonResponse
from django.shortcuts import render
from django.views.decorators.http import require_POST

from swh.web.save_code_now.models import (
    SaveAuthorizedOrigin,
    SaveOriginRequest,
    SaveUnauthorizedOrigin,
)
from swh.web.save_code_now.origin_save import (
    SAVE_REQUEST_PENDING,
    SAVE_REQUEST_REJECTED,
    create_save_origin_request,
)


[docs] @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_requests(request): return render(request, "admin/origin-save-requests.html")
[docs] @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_filters(request): return render(request, "admin/origin-save-filters.html")
def _datatables_origin_urls_response(request, urls_query_set): search_value = request.GET.get("search[value]") if search_value: urls_query_set = urls_query_set.filter(url__icontains=search_value) column_order = request.GET.get("order[0][column]") field_order = request.GET.get(f"columns[{column_order}][name]", "id") order_dir = request.GET.get("order[0][dir]", "desc") if order_dir == "desc": field_order = "-" + field_order urls_query_set = urls_query_set.order_by(field_order) table_data = {} table_data["draw"] = int(request.GET["draw"]) table_data["recordsTotal"] = urls_query_set.count() table_data["recordsFiltered"] = urls_query_set.count() length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(urls_query_set, length) urls_query_set = paginator.page(page).object_list table_data["data"] = [{"url": u.url} for u in urls_query_set] return JsonResponse(table_data)
[docs] @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_authorized_urls_list(request): authorized_urls = SaveAuthorizedOrigin.objects.all() return _datatables_origin_urls_response(request, authorized_urls)
[docs] @require_POST @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_add_authorized_url(request, origin_url): try: SaveAuthorizedOrigin.objects.get(url=origin_url) except ObjectDoesNotExist: # add the new authorized url SaveAuthorizedOrigin.objects.create(url=origin_url) # check if pending save requests with that url prefix exist pending_save_requests = SaveOriginRequest.objects.filter( origin_url__startswith=origin_url, status=SAVE_REQUEST_PENDING ) # create origin save tasks for previously pending requests for psr in pending_save_requests: create_save_origin_request(psr.visit_type, psr.origin_url) status_code = 200 else: status_code = 400 return HttpResponse(status=status_code)
[docs] @require_POST @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_remove_authorized_url(request, origin_url): try: entry = SaveAuthorizedOrigin.objects.get(url=origin_url) except ObjectDoesNotExist: status_code = 404 else: entry.delete() status_code = 200 return HttpResponse(status=status_code)
[docs] @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_unauthorized_urls_list(request): unauthorized_urls = SaveUnauthorizedOrigin.objects.all() return _datatables_origin_urls_response(request, unauthorized_urls)
[docs] @require_POST @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_add_unauthorized_url(request, origin_url): try: SaveUnauthorizedOrigin.objects.get(url=origin_url) except ObjectDoesNotExist: SaveUnauthorizedOrigin.objects.create(url=origin_url) # check if pending save requests with that url prefix exist pending_save_requests = SaveOriginRequest.objects.filter( origin_url__startswith=origin_url, status=SAVE_REQUEST_PENDING ) # mark pending requests as rejected for psr in pending_save_requests: psr.status = SAVE_REQUEST_REJECTED psr.save() status_code = 200 else: status_code = 400 return HttpResponse(status=status_code)
[docs] @require_POST @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_remove_unauthorized_url(request, origin_url): try: entry = SaveUnauthorizedOrigin.objects.get(url=origin_url) except ObjectDoesNotExist: status_code = 404 else: entry.delete() status_code = 200 return HttpResponse(status=status_code)
[docs] @require_POST @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_request_accept(request, visit_type, origin_url): try: SaveAuthorizedOrigin.objects.get(url=origin_url) except ObjectDoesNotExist: SaveAuthorizedOrigin.objects.create(url=origin_url) create_save_origin_request(visit_type, origin_url) return HttpResponse(status=200)
[docs] @require_POST @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_request_reject(request, visit_type, origin_url): try: sor = SaveOriginRequest.objects.get( visit_type=visit_type, origin_url=origin_url, status=SAVE_REQUEST_PENDING ) except ObjectDoesNotExist: status_code = 404 else: status_code = 200 sor.status = SAVE_REQUEST_REJECTED sor.note = json.loads(request.body).get("note") sor.save() return HttpResponse(status=status_code)
[docs] @require_POST @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_request_remove(request, sor_id): try: entry = SaveOriginRequest.objects.get(id=sor_id) except ObjectDoesNotExist: status_code = 404 else: entry.delete() status_code = 200 return HttpResponse(status=status_code)
[docs] @staff_member_required(view_func=None, login_url=settings.LOGIN_URL) def admin_origin_save_requests_csv_dump(request): output = io.StringIO() call_command("dump_savecodenow_data", stdout=output) return FileResponse( io.BytesIO(output.getvalue().encode()), filename="save_code_now_requests.csv", content_type="application/csv", as_attachment=True, )