Source code for swh.web.browse.utils

# Copyright (C) 2017-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import base64
import magic
import stat
import textwrap

from threading import Lock

from django.core.cache import cache
from django.utils.safestring import mark_safe
from django.utils.html import escape
import sentry_sdk

from swh.web.common import highlightjs, service
from swh.web.common.exc import http_status_code_message
from swh.web.common.utils import (
from swh.web.config import get_config

[docs]def get_directory_entries(sha1_git): """Function that retrieves the content of a directory from the archive. The directories entries are first sorted in lexicographical order. Sub-directories and regular files are then extracted. Args: sha1_git: sha1_git identifier of the directory Returns: A tuple whose first member corresponds to the sub-directories list and second member the regular files list Raises: NotFoundExc if the directory is not found """ cache_entry_id = "directory_entries_%s" % sha1_git cache_entry = cache.get(cache_entry_id) if cache_entry: return cache_entry entries = list(service.lookup_directory(sha1_git)) for e in entries: e["perms"] = stat.filemode(e["perms"]) if e["type"] == "rev": # modify dir entry name to explicitly show it points # to a revision e["name"] = "%s @ %s" % (e["name"], e["target"][:7]) dirs = [e for e in entries if e["type"] in ("dir", "rev")] files = [e for e in entries if e["type"] == "file"] dirs = sorted(dirs, key=lambda d: d["name"]) files = sorted(files, key=lambda f: f["name"]) cache.set(cache_entry_id, (dirs, files)) return dirs, files
_lock = Lock()
[docs]def get_mimetype_and_encoding_for_content(content): """Function that returns the mime type and the encoding associated to a content buffer using the magic module under the hood. Args: content (bytes): a content buffer Returns: A tuple (mimetype, encoding), for instance ('text/plain', 'us-ascii'), associated to the provided content. """ # # packaged as python3-magic in debian buster if hasattr(magic, "from_buffer"): m = magic.Magic(mime=True, mime_encoding=True) mime_encoding = m.from_buffer(content) mime_type, encoding = mime_encoding.split(";") encoding = encoding.replace(" charset=", "") # # packaged as python3-magic in debian stretch else: # TODO: Remove that code when production environment is upgraded # to debian buster # calls to the file-magic API are not thread-safe so they must # be protected with a Lock to guarantee they will succeed _lock.acquire() magic_result = magic.detect_from_content(content) _lock.release() mime_type = magic_result.mime_type encoding = magic_result.encoding return mime_type, encoding
# maximum authorized content size in bytes for HTML display # with code highlighting content_display_max_size = get_config()["content_display_max_size"] def _re_encode_content(mimetype, encoding, content_data): # encode textual content to utf-8 if needed if mimetype.startswith("text/"): # probably a malformed UTF-8 content, re-encode it # by replacing invalid chars with a substitution one if encoding == "unknown-8bit": content_data = content_data.decode("utf-8", "replace").encode("utf-8") elif encoding not in ["utf-8", "binary"]: content_data = content_data.decode(encoding, "replace").encode("utf-8") elif mimetype.startswith("application/octet-stream"): # file may detect a text content as binary # so try to decode it for display encodings = ["us-ascii", "utf-8"] encodings += ["iso-8859-%s" % i for i in range(1, 17)] for enc in encodings: try: content_data = content_data.decode(enc).encode("utf-8") except Exception as exc: sentry_sdk.capture_exception(exc) else: # ensure display in content view encoding = enc mimetype = "text/plain" break return mimetype, encoding, content_data
[docs]def request_content( query_string, max_size=content_display_max_size, raise_if_unavailable=True, re_encode=True, ): """Function that retrieves a content from the archive. Raw bytes content is first retrieved, then the content mime type. If the mime type is not stored in the archive, it will be computed using Python magic module. Args: query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either ``sha1``, ``sha1_git``, ``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH the hexadecimal representation of the hash value max_size: the maximum size for a content to retrieve (default to 1MB, no size limit if None) Returns: A tuple whose first member corresponds to the content raw bytes and second member the content mime type Raises: NotFoundExc if the content is not found """ content_data = service.lookup_content(query_string) filetype = None language = None license = None # requests to the indexer db may fail so properly handle # those cases in order to avoid content display errors try: filetype = service.lookup_content_filetype(query_string) language = service.lookup_content_language(query_string) license = service.lookup_content_license(query_string) except Exception as exc: sentry_sdk.capture_exception(exc) mimetype = "unknown" encoding = "unknown" if filetype: mimetype = filetype["mimetype"] encoding = filetype["encoding"] # workaround when encountering corrupted data due to implicit # conversion from bytea to text in the indexer db (see T818) # TODO: Remove that code when all data have been correctly converted if mimetype.startswith("\\"): filetype = None content_data["error_code"] = 200 content_data["error_message"] = "" content_data["error_description"] = "" if not max_size or content_data["length"] < max_size: try: content_raw = service.lookup_content_raw(query_string) except Exception as exc: if raise_if_unavailable: raise exc else: sentry_sdk.capture_exception(exc) content_data["raw_data"] = None content_data["error_code"] = 404 content_data["error_description"] = ( "The bytes of the content are currently not available " "in the archive." ) content_data["error_message"] = http_status_code_message[ content_data["error_code"] ] else: content_data["raw_data"] = content_raw["data"] if not filetype: mimetype, encoding = get_mimetype_and_encoding_for_content( content_data["raw_data"] ) if re_encode: mimetype, encoding, raw_data = _re_encode_content( mimetype, encoding, content_data["raw_data"] ) content_data["raw_data"] = raw_data else: content_data["raw_data"] = None content_data["mimetype"] = mimetype content_data["encoding"] = encoding if language: content_data["language"] = language["lang"] else: content_data["language"] = "not detected" if license: content_data["licenses"] = ", ".join(license["facts"][0]["licenses"]) else: content_data["licenses"] = "not detected" return content_data
[docs]def prepare_content_for_display(content_data, mime_type, path): """Function that prepares a content for HTML display. The function tries to associate a programming language to a content in order to perform syntax highlighting client-side using highlightjs. The language is determined using either the content filename or its mime type. If the mime type corresponds to an image format supported by web browsers, the content will be encoded in base64 for displaying the image. Args: content_data (bytes): raw bytes of the content mime_type (string): mime type of the content path (string): path of the content including filename Returns: A dict containing the content bytes (possibly different from the one provided as parameter if it is an image) under the key 'content_data and the corresponding highlightjs language class under the key 'language'. """ language = highlightjs.get_hljs_language_from_filename(path) if not language: language = highlightjs.get_hljs_language_from_mime_type(mime_type) if not language: language = "nohighlight" elif mime_type.startswith("application/"): mime_type = mime_type.replace("application/", "text/") if mime_type.startswith("image/"): if mime_type in browsers_supported_image_mimes: content_data = base64.b64encode(content_data).decode("ascii") if mime_type.startswith("image/svg"): mime_type = "image/svg+xml" if mime_type.startswith("text/"): content_data = content_data.decode("utf-8", errors="replace") return {"content_data": content_data, "language": language, "mimetype": mime_type}
def _snapshot_context_query_params(snapshot_context): query_params = {} if not snapshot_context: return query_params if snapshot_context and snapshot_context["origin_info"]: origin_info = snapshot_context["origin_info"] snp_query_params = snapshot_context["query_params"] query_params = {"origin_url": origin_info["url"]} if "timestamp" in snp_query_params: query_params["timestamp"] = snp_query_params["timestamp"] if "visit_id" in snp_query_params: query_params["visit_id"] = snp_query_params["visit_id"] if "snapshot" in snp_query_params and "visit_id" not in query_params: query_params["snapshot"] = snp_query_params["snapshot"] elif snapshot_context: query_params = {"snapshot": snapshot_context["snapshot_id"]} if snapshot_context["release"]: query_params["release"] = snapshot_context["release"] elif snapshot_context["branch"] and snapshot_context["branch"] not in ( "HEAD", snapshot_context["revision_id"], ): query_params["branch"] = snapshot_context["branch"] elif snapshot_context["revision_id"]: query_params["revision"] = snapshot_context["revision_id"] return query_params
[docs]def gen_revision_url(revision_id, snapshot_context=None): """ Utility function for generating an url to a revision. Args: revision_id (str): a revision id snapshot_context (dict): if provided, generate snapshot-dependent browsing url Returns: str: The url to browse the revision """ query_params = _snapshot_context_query_params(snapshot_context) query_params.pop("revision", None) return reverse( "browse-revision", url_args={"sha1_git": revision_id}, query_params=query_params )
[docs]def get_revision_log_url(revision_id, snapshot_context=None): """ Utility function for getting the URL for a revision log HTML view (possibly in the context of an origin). Args: revision_id (str): revision identifier the history heads to snapshot_context (dict): if provided, generate snapshot-dependent browsing link Returns: The revision log view URL """ query_params = {} if snapshot_context: query_params = _snapshot_context_query_params(snapshot_context) query_params["revision"] = revision_id if snapshot_context and snapshot_context["origin_info"]: revision_log_url = reverse("browse-origin-log", query_params=query_params) elif snapshot_context: url_args = {"snapshot_id": snapshot_context["snapshot_id"]} del query_params["snapshot"] revision_log_url = reverse( "browse-snapshot-log", url_args=url_args, query_params=query_params ) else: revision_log_url = reverse( "browse-revision-log", url_args={"sha1_git": revision_id} ) return revision_log_url
[docs]def format_log_entries(revision_log, per_page, snapshot_context=None): """ Utility functions that process raw revision log data for HTML display. Its purpose is to: * add links to relevant browse views * format date in human readable format * truncate the message log Args: revision_log (list): raw revision log as returned by the swh-web api per_page (int): number of log entries per page snapshot_context (dict): if provided, generate snapshot-dependent browsing link """ revision_log_data = [] for i, rev in enumerate(revision_log): if i == per_page: break author_name = "None" author_fullname = "None" committer_fullname = "None" if rev["author"]: author_name = gen_person_mail_link(rev["author"]) author_fullname = rev["author"]["fullname"] if rev["committer"]: committer_fullname = rev["committer"]["fullname"] author_date = format_utc_iso_date(rev["date"]) committer_date = format_utc_iso_date(rev["committer_date"]) tooltip = "revision %s\n" % rev["id"] tooltip += "author: %s\n" % author_fullname tooltip += "author date: %s\n" % author_date tooltip += "committer: %s\n" % committer_fullname tooltip += "committer date: %s\n\n" % committer_date if rev["message"]: tooltip += textwrap.indent(rev["message"], " " * 4) revision_log_data.append( { "author": author_name, "id": rev["id"][:7], "message": rev["message"], "date": author_date, "commit_date": committer_date, "url": gen_revision_url(rev["id"], snapshot_context), "tooltip": tooltip, } ) return revision_log_data
# list of common readme names ordered by preference # (lower indices have higher priority) _common_readme_names = [ "readme.markdown", "", "readme.rst", "readme.txt", "readme", ]
[docs]def get_readme_to_display(readmes): """ Process a list of readme files found in a directory in order to find the adequate one to display. Args: readmes: a list of dict where keys are readme file names and values are readme sha1s Returns: A tuple (readme_name, readme_sha1) """ readme_name = None readme_url = None readme_sha1 = None readme_html = None lc_readmes = {k.lower(): {"orig_name": k, "sha1": v} for k, v in readmes.items()} # look for readme names according to the preference order # defined by the _common_readme_names list for common_readme_name in _common_readme_names: if common_readme_name in lc_readmes: readme_name = lc_readmes[common_readme_name]["orig_name"] readme_sha1 = lc_readmes[common_readme_name]["sha1"] readme_url = reverse( "browse-content-raw", url_args={"query_string": readme_sha1}, query_params={"re_encode": "true"}, ) break # otherwise pick the first readme like file if any if not readme_name and len(readmes.items()) > 0: readme_name = next(iter(readmes)) readme_sha1 = readmes[readme_name] readme_url = reverse( "browse-content-raw", url_args={"query_string": readme_sha1}, query_params={"re_encode": "true"}, ) # convert rst README to html server side as there is # no viable solution to perform that task client side if readme_name and readme_name.endswith(".rst"): cache_entry_id = "readme_%s" % readme_sha1 cache_entry = cache.get(cache_entry_id) if cache_entry: readme_html = cache_entry else: try: rst_doc = request_content(readme_sha1) readme_html = rst_to_html(rst_doc["raw_data"]) cache.set(cache_entry_id, readme_html) except Exception as exc: sentry_sdk.capture_exception(exc) readme_html = "Readme bytes are not available" return readme_name, readme_url, readme_html