Source code for swh.search.elasticsearch

# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import base64
from typing import Any, Dict, Iterable, Iterator, List, Optional

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, scan
import msgpack

from swh.model import model
from swh.model.identifiers import origin_identifier
from swh.search.interface import PagedResult


def _sanitize_origin(origin):
    """Keep only the origin fields that are indexed: url, intrinsic_metadata,
    and has_visits."""
    origin = origin.copy()
    res = {"url": origin.pop("url")}
    for field_name in ("intrinsic_metadata", "has_visits"):
        if field_name in origin:
            res[field_name] = origin.pop(field_name)
    return res
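
# For illustration, _sanitize_origin drops every key the index does not know
# about; the extra "type" key below is a made-up example:
#
#     >>> _sanitize_origin({"url": "http://example.com", "type": "git",
#     ...                   "has_visits": True})
#     {'url': 'http://example.com', 'has_visits': True}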


def token_encode(index_to_tokenize: Dict[bytes, Any]) -> str:
    """Serialize an index page result from a search into a string page_token"""
    page_token = base64.b64encode(msgpack.dumps(index_to_tokenize))
    return page_token.decode()


def token_decode(page_token: str) -> Dict[bytes, Any]:
    """Read the page_token"""
    return msgpack.loads(base64.b64decode(page_token.encode()), raw=True)
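

# The two helpers above are inverses of each other; a minimal round trip,
# with a made-up key and offset for illustration (``raw=True`` keeps the
# decoded keys as bytes, matching the Dict[bytes, Any] annotation):
#
#     >>> token = token_encode({b"offset": 10})
#     >>> token_decode(token)
#     {b'offset': 10}

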
class ElasticSearch:
    def __init__(self, hosts: List[str]):
        self._backend = Elasticsearch(hosts=hosts)

    def check(self):
        return self._backend.ping()

    def deinitialize(self) -> None:
        """Removes all indices from the Elasticsearch backend"""
        self._backend.indices.delete(index="*")

    def initialize(self) -> None:
        """Declare Elasticsearch indices and mappings"""
        if not self._backend.indices.exists(index="origin"):
            self._backend.indices.create(index="origin")
        self._backend.indices.put_mapping(
            index="origin",
            body={
                "properties": {
                    "sha1": {"type": "keyword", "doc_values": True,},
                    "url": {
                        "type": "text",
                        # To split URLs into tokens on any character
                        # that is not alphanumerical
                        "analyzer": "simple",
                        "fields": {
                            "as_you_type": {
                                "type": "search_as_you_type",
                                "analyzer": "simple",
                            }
                        },
                    },
                    "has_visits": {"type": "boolean",},
                    "intrinsic_metadata": {
                        "type": "nested",
                        "properties": {
                            "@context": {
                                # don't bother indexing tokens
                                "type": "keyword",
                            }
                        },
                    },
                }
            },
        )

    def flush(self) -> None:
        self._backend.indices.refresh(index="_all")

    def origin_update(self, documents: Iterable[Dict]) -> None:
        documents = map(_sanitize_origin, documents)
        documents_with_sha1 = (
            (origin_identifier(document), document) for document in documents
        )
        actions = [
            {
                "_op_type": "update",
                "_id": sha1,
                "_index": "origin",
                "doc": {**document, "sha1": sha1,},
                "doc_as_upsert": True,
            }
            for (sha1, document) in documents_with_sha1
        ]
        bulk(self._backend, actions, index="origin")

    def origin_dump(self) -> Iterator[model.Origin]:
        results = scan(self._backend, index="*")
        for hit in results:
            yield self._backend.termvectors(
                index="origin", id=hit["_id"], fields=["*"]
            )
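

# A minimal end-to-end sketch of driving this backend; the host below is an
# assumption (any reachable Elasticsearch node works) and the origin URL is
# made up for illustration.
if __name__ == "__main__":
    search = ElasticSearch(hosts=["localhost:9200"])
    search.initialize()  # create the "origin" index and its mapping
    search.origin_update(
        [{"url": "https://github.com/example/repo", "has_visits": True}]
    )
    search.flush()  # make the upserts visible to subsequent queries
    assert search.check()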