# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
import traceback
from typing import Any, Dict, Optional
from django.http import HttpResponse
from django.shortcuts import render
from django.urls import get_resolver
from django.utils.cache import add_never_cache_headers
from django.utils.html import escape
from rest_framework.exceptions import APIException
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.utils.encoders import JSONEncoder
from swh.storage.exc import StorageAPIError, StorageDBError
from swh.web.api import utils
from swh.web.config import get_config
from swh.web.utils import gen_path_info, shorten_path
from swh.web.utils.exc import (
BadInputExc,
ForbiddenExc,
LargePayloadExc,
MaskedObjectException,
NotFoundExc,
UnauthorizedExc,
masked_to_common_types,
sentry_capture_exception,
)
# Module-level logger bound to the "django" logger name; error_response()
# writes the formatted traceback to it when the "debug" config flag is set.
logger = logging.getLogger("django")
[docs]
def filter_by_fields(request: Request, data: Dict[str, Any]) -> Dict[str, Any]:
    """Restrict ``data`` to the keys named in the ``fields`` query parameter.

    The ``fields`` query parameter is read as a comma-separated list of keys.
    When it is absent or empty, ``data`` is returned untouched.

    Args:
        request: the request whose query parameters are inspected
        data: the data whose keys may be filtered

    Returns:
        ``data`` itself when no ``fields`` parameter was given, otherwise the
        result of ``utils.filter_field_keys`` applied to the requested keys.
    """
    requested = request.query_params.get("fields")
    if not requested:
        # nothing to filter on: hand the data back unchanged
        return data
    wanted_keys = set(requested.split(","))
    return utils.filter_field_keys(data, wanted_keys)
[docs]
def make_api_response(
    request: Request,
    data: Dict[str, Any],
    doc_data: Optional[Dict[str, Any]] = None,
    options: Optional[Dict[str, Any]] = None,
) -> HttpResponse:
    """Generate an API response based on the requested mimetype.

    Args:
        request: a DRF Request object
        data: raw data to return in the API response
        doc_data: documentation data for HTML response
        options: optional data that can be used to generate the response;
            the ``status`` and ``headers`` keys are read by this function

    Returns:
        the rendered ``apidoc.html`` template when ``text/html`` was
        requested, a DRF Response object otherwise
    """
    # Work on shallow copies: the function stores computed values in both
    # dicts and must not modify the ones owned by the caller.
    options = dict(options) if options else {}
    doc_data = dict(doc_data) if doc_data else {}

    if data:
        options["headers"] = compute_link_header(data, options)
        data = transform(data)
    data = filter_by_fields(request, data)

    headers = {}
    if "headers" in options:
        doc_data["headers_data"] = options["headers"]
        headers = options["headers"]

    # get request status code
    doc_data["status_code"] = options.get("status", 200)

    accepted_media_type = getattr(request, "accepted_media_type", "application/json")

    # when requesting HTML, typically when browsing the API through its
    # documented views, we need to enrich the input data with documentation
    # and render the apidoc HTML template
    if accepted_media_type == "text/html":
        doc_data["response_data"] = data
        if data is not None:
            doc_data["response_data"] = json.dumps(
                data, cls=JSONEncoder, sort_keys=True, indent=4, separators=(",", ": ")
            )
        doc_data["heading"] = shorten_path(str(request.path))

        # generate breadcrumbs data
        if "route" in doc_data:
            all_view_names = set(get_resolver().reverse_dict.keys())
            endpoint_path = gen_path_info(doc_data["route"])
            doc_data["endpoint_path"] = endpoint_path
            # every breadcrumb but the last one links to a documentation
            # page only if a view is registered under the corresponding name
            for position, crumb in enumerate(endpoint_path[:-1]):
                view_name = "api-1-" + "-".join(
                    entry["name"] for entry in endpoint_path[: position + 1]
                )
                if view_name in all_view_names:
                    crumb["path"] += "/doc/"
                else:
                    crumb["path"] = ""
            # unless the route is flagged "noargs", the last breadcrumb also
            # gets a "/doc/" suffix
            if not doc_data["noargs"]:
                endpoint_path[-1]["path"] += "/doc/"

        response = render(
            request, "apidoc.html", doc_data, status=doc_data["status_code"]
        )

    # otherwise simply return the raw data and let DRF pick
    # the correct renderer (JSON or YAML)
    else:
        response = Response(
            data,
            headers=headers,
            content_type=accepted_media_type,
            status=doc_data["status_code"],
        )

    if getattr(request, "never_cache", False):
        add_never_cache_headers(response)

    return response
[docs]
def error_response(
    request: Request, exception: Exception, doc_data: Dict[str, Any]
) -> HttpResponse:
    """Create a custom API error response from an exception.

    The response payload holds the exception class name under ``exception``
    and its string form under ``reason``, plus any ``extra_error_data`` dict
    carried by the exception. The HTTP status is derived from the exception
    type and defaults to 500.

    Args:
        request: a DRF Request object
        exception: the exception that caused the error
        doc_data: documentation data for HTML response

    Returns:
        the response built by :func:`make_api_response` with the error
        payload and the matching HTTP status code
    """
    error_data = {
        "exception": exception.__class__.__name__,
        "reason": str(exception),
    }
    extra_error_data = getattr(exception, "extra_error_data", None)
    if isinstance(extra_error_data, dict):
        error_data.update(extra_error_data)

    # The order of the checks is kept as is: the first matching type wins.
    error_code = 500
    if isinstance(exception, BadInputExc):
        error_code = 400
    elif isinstance(exception, UnauthorizedExc):
        error_code = 401
    elif isinstance(exception, NotFoundExc):
        error_code = 404
    elif isinstance(exception, ForbiddenExc):
        error_code = 403
    elif isinstance(exception, MaskedObjectException):
        error_code = 403
        error_data["masked"] = masked_to_common_types(exception)
    elif isinstance(exception, LargePayloadExc):
        error_code = 413
    elif isinstance(exception, (StorageDBError, StorageAPIError)):
        error_code = 503
    elif isinstance(exception, APIException):
        error_code = exception.status_code

    error_opts = {"status": error_code}

    # the reason ends up in the rendered HTML page, so escape it there
    if getattr(request, "accepted_media_type", None) == "text/html":
        error_data["reason"] = escape(error_data["reason"])

    if get_config()["debug"]:
        # Format the traceback of the exception we were given rather than of
        # whatever exception happens to be handled by the calling frame, so
        # the output stays meaningful outside of an ``except`` block too.
        error_data["traceback"] = "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )
        logger.error(error_data["traceback"])

    return make_api_response(request, error_data, doc_data, options=error_opts)
[docs]
def error_response_handler(
    exc: Exception, context: Dict[str, Any]
) -> Optional[HttpResponse]:
    """Custom DRF exception handler used to generate API error responses.

    The exception is first passed to ``sentry_capture_exception``; the error
    response is then built by ``error_response`` from the request found in
    ``context`` and the ``doc_data`` attribute of the exception, if any.

    Args:
        exc: the exception to turn into an error response
        context: handler context; its ``request`` entry is used

    Returns:
        the error response produced by ``error_response``
    """
    sentry_capture_exception(exc)
    # exceptions without a doc_data attribute get an empty dict
    documentation = getattr(exc, "doc_data", {})
    current_request = context["request"]
    return error_response(
        request=current_request, exception=exc, doc_data=documentation
    )