swh.core package

Submodules

swh.core.api_async module

swh.core.config module

swh.core.config.exists_accessible(file)[source]

Check whether a file exists, and is accessible.

Returns:True if the file exists and is accessible, False if the file does not exist
Raises:PermissionError if the file cannot be read.
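
The three outcomes described above (True, False, PermissionError) can be illustrated with a minimal standard-library sketch. The name exists_accessible_sketch and the use of os.access are assumptions for illustration, not the actual swh.core.config implementation.

```python
import os


def exists_accessible_sketch(path):
    """Hypothetical stand-in: True if readable, False if missing, else raise."""
    try:
        os.stat(path)
    except FileNotFoundError:
        # The file does not exist at all
        return False
    if os.access(path, os.R_OK):
        return True
    # The file exists but the current user cannot read it
    raise PermissionError(f"Permission denied: {path!r}")
```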
swh.core.config.config_basepath(config_path)[source]

Return the base path of a configuration file

swh.core.config.read_raw_config(base_config_path)[source]

Read the raw config corresponding to base_config_path.

Can read yml or ini files.

swh.core.config.config_exists(config_path)[source]

Check whether the given config exists

swh.core.config.read(conf_file=None, default_conf=None)[source]

Read the user’s configuration file.

Fill in the gap using default_conf. default_conf is similar to this:

DEFAULT_CONF = {
    'a': ('str', '/tmp/swh-loader-git/log'),
    'b': ('str', 'dbname=swhloadergit'),
    'c': ('bool', True),
    'e': ('bool', None),
    'd': ('int', 10),
}

If conf_file is None, return the default config.
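
As a rough illustration of how a typed default_conf could fill the gaps in a user configuration, here is a minimal sketch. The function fill_defaults, the CONVERTERS table and the string-to-bool rule are hypothetical; the real read function may convert values differently.

```python
# Hypothetical converters for the type names used in default_conf
CONVERTERS = {
    "str": str,
    "int": int,
    "bool": lambda v: str(v).lower() == "true",
}


def fill_defaults(user_conf, default_conf):
    """Convert known keys to their declared type, fall back to defaults."""
    conf = dict(user_conf)
    for key, (type_name, default) in default_conf.items():
        if key in conf:
            conf[key] = CONVERTERS[type_name](conf[key])
        else:
            conf[key] = default
    return conf


DEFAULT_CONF = {
    "c": ("bool", True),
    "d": ("int", 10),
    "e": ("bool", None),
}

# Values read from a file are assumed to arrive as strings
print(fill_defaults({"d": "42", "c": "false"}, DEFAULT_CONF))
```

With an empty user configuration, the sketch simply returns the default values, which mirrors the documented behavior when conf_file is None.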

swh.core.config.priority_read(conf_filenames, default_conf=None)[source]

Try reading the configuration files from conf_filenames, in order, and return the configuration from the first one that exists.

default_conf has the same specification as it does in read.
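
The "first one that exists" lookup can be sketched as below. The helper first_existing is hypothetical and only covers the file selection step, not the parsing that priority_read performs afterwards.

```python
import os


def first_existing(conf_filenames):
    """Return the first filename that exists on disk, or None."""
    for filename in conf_filenames:
        if filename and os.path.exists(filename):
            return filename
    return None
```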

swh.core.config.merge_default_configs(base_config, *other_configs)[source]

Merge several default config dictionaries, from left to right

swh.core.config.merge_configs(base, other)[source]

Merge two config dictionaries

Config dicts are merged recursively, applying the following rules to every pair of values (where ‘val’ is any value that is not a dict):

  • None + type -> type
  • type + None -> None
  • dict + dict -> dict (merged)
  • val + dict -> TypeError
  • dict + val -> TypeError
  • val + val -> val (other)

for instance:

>>> d1 = {
...   'key1': {
...     'skey1': 'value1',
...     'skey2': {'sskey1': 'value2'},
...   },
...   'key2': 'value3',
... }

with

>>> d2 = {
...   'key1': {
...     'skey1': 'value4',
...     'skey2': {'sskey2': 'value5'},
...   },
...   'key3': 'value6',
... }

will give:

>>> d3 = {
...   'key1': {
...     'skey1': 'value4',  # <-- note this
...     'skey2': {
...       'sskey1': 'value2',
...       'sskey2': 'value5',
...     },
...   },
...   'key2': 'value3',
...   'key3': 'value6',
... }
>>> assert merge_configs(d1, d2) == d3

Note that no type checking is done for anything but dicts.
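
The merge rules listed above can be written out as a short recursive function. This is a sketch under the stated rules only; merge_configs_sketch is a hypothetical name and the real merge_configs may handle edge cases (such as None combined with a dict) differently.

```python
from copy import deepcopy


def merge_configs_sketch(base, other):
    """Recursively merge two dicts following the documented rules."""
    if not isinstance(base, dict) or not isinstance(other, dict):
        raise TypeError("both arguments must be dicts")
    merged = {}
    for key in base.keys() | other.keys():
        if key not in other:
            merged[key] = deepcopy(base[key])
        elif key not in base:
            merged[key] = deepcopy(other[key])
        else:
            vb, vo = base[key], other[key]
            if isinstance(vb, dict) and isinstance(vo, dict):
                # dict + dict -> dict (merged)
                merged[key] = merge_configs_sketch(vb, vo)
            elif vb is None or vo is None:
                # None + type -> type, type + None -> None
                merged[key] = deepcopy(vo)
            elif isinstance(vb, dict) or isinstance(vo, dict):
                # val + dict or dict + val
                raise TypeError(f"cannot merge dict and non-dict for {key!r}")
            else:
                # val + val -> val (other)
                merged[key] = deepcopy(vo)
    return merged
```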

swh.core.config.swh_config_paths(base_filename)[source]

Return the Software Heritage specific configuration paths for the given filename.

swh.core.config.prepare_folders(conf, *keys)[source]

Prepare the folder mentioned in config under keys.

swh.core.config.load_global_config()[source]

Load the global Software Heritage config

swh.core.config.load_named_config(name, default_conf=None, global_conf=True)[source]

Load the config named name from the Software Heritage configuration paths.

If global_conf is True (default), read the global configuration too.

class swh.core.config.SWHConfig[source]

Bases: object

Mixin to add configuration parsing abilities to classes

The class should override the class attributes:
  • DEFAULT_CONFIG (default configuration to be parsed)
  • CONFIG_BASE_FILENAME (the filename of the configuration to be used)

This class defines one classmethod, parse_config_file, which parses a configuration file using the default config as set in the class attribute.

DEFAULT_CONFIG = {}
CONFIG_BASE_FILENAME = ''
classmethod parse_config_file(base_filename=None, config_filename=None, additional_configs=None, global_config=True)[source]

Parse the configuration file associated to the current class.

By default, parse_config_file will load the configuration cls.CONFIG_BASE_FILENAME from one of the Software Heritage configuration directories, in order, unless it is overridden by base_filename or config_filename (which shortcuts the file lookup completely).

Parameters:
  • base_filename – overrides the default cls.CONFIG_BASE_FILENAME
  • config_filename – sets the file to parse instead of the defaults set from cls.CONFIG_BASE_FILENAME
  • additional_configs – (list of default configuration dicts) allows overriding or extending the configuration set in cls.DEFAULT_CONFIG
  • global_config – load the global configuration (default: True)

swh.core.logger module

swh.core.logger.db_level_of_py_level(lvl)[source]

Convert a log level of the logging module to a log level suitable for the logging Postgres DB

swh.core.logger.get_extra_data(record, task_args=True)[source]

Get the extra data to insert into the database from the logging record

swh.core.logger.flatten(data, separator='_')[source]

Flatten the data dictionary into a flat structure
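
A minimal sketch of what flattening a nested dictionary with a separator could look like. The name flatten_sketch, the dict return type and the choice to leave non-dict values untouched are assumptions; the actual function may also handle lists or return a different structure.

```python
def flatten_sketch(data, separator="_"):
    """Flatten nested dicts into one level, joining keys with separator."""
    flat = {}

    def walk(value, prefix):
        if isinstance(value, dict):
            for key, sub in value.items():
                walk(sub, prefix + [str(key)])
        else:
            # Leaf value: store it under the joined key path
            flat[separator.join(prefix)] = value

    walk(data, [])
    return flat
```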

swh.core.logger.stringify(value)[source]

Convert value to string

class swh.core.logger.JournalHandler(level=0, sender_function=<function send>, **kwargs)[source]

Bases: systemd.journal.JournalHandler

emit(record)[source]

Write record as a journal event.

MESSAGE is taken from the message provided by the user, and PRIORITY, LOGGER, THREAD_NAME, CODE_{FILE,LINE,FUNC} fields are appended automatically. In addition, record.MESSAGE_ID will be used if present.

swh.core.pytest_plugin module

swh.core.pytest_plugin.get_response_cb(request, context, datadir, ignore_urls: List[str] = [], visits: Optional[Dict[KT, VT]] = None)[source]

Mount point callback to fetch the request’s content from disk.

This is meant to be used as ‘body’ argument of the requests_mock.get() method.

It will look for files on the local filesystem based on the requested URL, using the following rules:

  • files are searched in the datadir/<hostname> directory
  • the local file name is the path part of the URL with path hierarchy markers (aka ‘/’) replaced by ‘_’

E.g., if you use the requests_mock fixture in your test file as:

requests_mock.get('https?://nowhere.com', body=get_response_cb)
# or even
requests_mock.get(re.compile('https?://'), body=get_response_cb)

then a requests.get call to an https URL on nowhere.com will look for the content of the response in:

datadir/https_nowhere.com/path_to_resource,a=b,c=d

or a requests.get call to the same resource over http will look for the content of the response in:

datadir/http_nowhere.com/path_to_resource,a=b,c=d
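
The mapping from URL to local file can be sketched with urllib.parse. The helper local_path_for is hypothetical, and both the example URL and the handling of the query string (joining parameters with commas) are inferred from the paths shown above rather than taken from the implementation.

```python
import posixpath
from urllib.parse import urlparse


def local_path_for(url, datadir):
    """Compute the on-disk path assumed to back a mocked URL."""
    parsed = urlparse(url)
    # Directory named after scheme and hostname, e.g. https_nowhere.com
    dirname = f"{parsed.scheme}_{parsed.hostname}"
    # Path hierarchy markers ('/') are replaced by '_'
    filename = parsed.path.strip("/").replace("/", "_")
    if parsed.query:
        # Assumption: query parameters are appended, separated by commas
        filename += "," + parsed.query.replace("&", ",")
    return posixpath.join(datadir, dirname, filename)


# Assumed example URL, chosen to match the path shown above
print(local_path_for("https://nowhere.com/path/to/resource?a=b&c=d", "datadir"))
```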
Parameters:
  • request (requests.Request) – Object requests
  • context (requests.Context) – Object holding response metadata information (status_code, headers, etc…)
  • ignore_urls – urls whose status response should be 404 even if the local file exists
  • visits – Dict of url, number of visits. If None, disable multi visit support (default)
Returns:

Optional[FileDescriptor] on disk file to read from the test context

swh.core.pytest_plugin.datadir(request)[source]

By default, returns the test directory’s data directory.

This can be overridden on a per-directory-tree basis. Add an override definition in the local conftest, for example:

import pytest

from os import path

@pytest.fixture
def datadir():
    return path.join(path.abspath(path.dirname(__file__)), 'resources')
swh.core.pytest_plugin.requests_mock_datadir_factory(ignore_urls: List[str] = [], has_multi_visit: bool = False)[source]

This factory generates fixtures which look for files on the local filesystem based on the requested URL, using the following rules:

  • files are searched in the datadir/<hostname> directory
  • the local file name is the path part of the URL with path hierarchy markers (aka ‘/’) replaced by ‘_’

Multiple implementations are possible, for example:

  • requests_mock_datadir_factory([]):
    This computes the file name from the query and always returns the same result.
  • requests_mock_datadir_factory(has_multi_visit=True):
    This computes the file name from the query and returns the content of that file the first time; subsequent calls return the content of files suffixed with _visit1, and so on. If the file is not found, returns a 404.
  • requests_mock_datadir_factory(ignore_urls=['url1', 'url2']):
    This will ignore any files corresponding to url1 and url2, always returning 404.
Parameters:
  • ignore_urls – List of urls that always return 404 (whether the file exists or not)
  • has_multi_visit – Whether to activate the multiple visits behavior
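
The multi-visit naming scheme (plain file name first, then _visit1, _visit2, ...) might be tracked with a simple counter. The helper next_filename and the idea of keying the counter by file name are assumptions made for illustration.

```python
from collections import Counter


def next_filename(filename, visits):
    """Return the file name to serve for this visit and record the visit."""
    count = visits[filename]
    visits[filename] += 1
    # First visit uses the bare name, later visits get a _visitN suffix
    return filename if count == 0 else f"{filename}_visit{count}"


visits = Counter()
print([next_filename("path_to_resource", visits) for _ in range(3)])
```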
swh.core.pytest_plugin.requests_mock_datadir(requests_mock, datadir)
swh.core.pytest_plugin.requests_mock_datadir_visits(requests_mock, datadir)
swh.core.pytest_plugin.swh_rpc_client(swh_rpc_client_class, swh_rpc_adapter)[source]

This fixture generates an RPCClient instance that uses the class generated by the swh_rpc_client_class fixture as backend.

Since it uses the swh_rpc_adapter, HTTP queries will be intercepted and routed directly to the current Flask app (as provided by the app fixture).

This stack of fixtures therefore allows testing the RPCClient -> RPCServerApp communication path using a real RPCClient instance and a real Flask (RPCServerApp) app instance.

To use this fixture:

  • ensure an app fixture exists and generates a Flask application,
  • implement a swh_rpc_client_class fixture that returns the RPCClient-based class to use as client side for the tests,
  • implement your tests using this swh_rpc_client fixture.

See swh/core/api/tests/test_rpc_client_server.py for an example of usage.

swh.core.pytest_plugin.swh_rpc_adapter(app)[source]

Fixture that generates a requests.Adapter instance that can be used to test client/server code based on swh.core.api classes.

See swh/core/api/tests/test_rpc_client_server.py for an example of usage.

class swh.core.pytest_plugin.RPCTestAdapter(client)[source]

Bases: requests.adapters.BaseAdapter

__init__(client)[source]

Initialize self. See help(type(self)) for accurate signature.

build_response(req, resp)[source]
send(request, **kw)[source]

Sends PreparedRequest object. Returns Response object.

Parameters:
  • request – The PreparedRequest being sent.
  • stream – (optional) Whether to stream the request content.
  • timeout (float or tuple) – (optional) How long to wait for the server to send data before giving up, as a float, or a (connect timeout, read timeout) tuple.
  • verify – (optional) Either a boolean, in which case it controls whether we verify the server’s TLS certificate, or a string, in which case it must be a path to a CA bundle to use
  • cert – (optional) Any user-provided SSL certificate to be trusted.
  • proxies – (optional) The proxies dictionary to apply to the request.
swh.core.pytest_plugin.flask_app_client(app)[source]
swh.core.pytest_plugin._push_request_context(request)[source]

During test execution, a request context is pushed, so url_for, session, etc. can be used in tests as is:

def test_app(app, client):
    assert client.get(url_for('myview')).status_code == 200

swh.core.statsd module

class swh.core.statsd.TimedContextManagerDecorator(statsd, metric=None, error_metric=None, tags=None, sample_rate=1)[source]

Bases: object

A context manager and a decorator which will report the elapsed time in the context OR in a function call.

elapsed

the elapsed time at the point of completion

Type:float
__init__(statsd, metric=None, error_metric=None, tags=None, sample_rate=1)[source]

Initialize self. See help(type(self)) for accurate signature.

__call__(func)[source]

Decorator which returns the elapsed time of the function call.

Default to the function name if metric was not provided.

__enter__()[source]
__exit__(type, value, traceback)[source]
_send(start)[source]
_send_error()[source]
start()[source]

Start the timer

stop()[source]

Stop the timer, send the metric value
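
To show how one object can serve as both a context manager and a decorator, here is a small self-contained sketch. TimedSketch and its report callback are hypothetical; the real class sends the value through a Statsd client and supports error metrics, tags and sample rates, which are omitted here.

```python
import functools
import time


class TimedSketch:
    """Report elapsed time for a with-block or a decorated function."""

    def __init__(self, report, metric=None):
        self.report = report  # callable(metric, elapsed_seconds)
        self.metric = metric
        self.elapsed = None

    def __call__(self, func):
        # As a decorator, default the metric to module and function name
        metric = self.metric or f"{func.__module__}.{func.__name__}"

        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            start = time.monotonic()
            try:
                return func(*args, **kwargs)
            finally:
                self.report(metric, time.monotonic() - start)

        return wrapped

    def __enter__(self):
        if not self.metric:
            raise TypeError("metric is required when used as a context manager")
        self._start = time.monotonic()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.elapsed = time.monotonic() - self._start
        self.report(self.metric, self.elapsed)
        return False  # never swallow exceptions
```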


class swh.core.statsd.Statsd(host=None, port=None, max_buffer_size=50, namespace=None, constant_tags=None)[source]

Bases: object

Initialize a client to send metrics to a StatsD server.

Parameters:
  • host (str) – the host of the StatsD server. Defaults to localhost.
  • port (int) – the port of the StatsD server. Defaults to 8125.
  • max_buffer_size (int) – Maximum number of metrics to buffer before sending to the server if sending metrics in batch
  • namespace (str) – Namespace to prefix all metric names
  • constant_tags (Dict[str, str]) – Tags to attach to all metrics

Note

This class also supports the following environment variables:

STATSD_HOST
Override the default host of the statsd server
STATSD_PORT
Override the default port of the statsd server
STATSD_TAGS

Tags to attach to every metric reported. Example value:

"label:value,other_label:other_value"
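
The STATSD_TAGS value format shown above can be parsed into a dictionary along these lines. The helper parse_tags and its error handling are assumptions; the class may parse the variable differently.

```python
def parse_tags(raw):
    """Parse 'label:value,other_label:other_value' into a dict."""
    tags = {}
    for item in raw.split(","):
        if not item:
            continue  # tolerate empty input and trailing commas
        label, sep, value = item.partition(":")
        if not sep:
            raise ValueError(f"tag {item!r} is not of the form label:value")
        tags[label] = value
    return tags
```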

__init__(host=None, port=None, max_buffer_size=50, namespace=None, constant_tags=None)[source]

Initialize self. See help(type(self)) for accurate signature.

__enter__()[source]
__exit__(type, value, traceback)[source]
gauge(metric, value, tags=None, sample_rate=1)[source]

Record the value of a gauge, optionally setting a list of tags and a sample rate.

>>> statsd.gauge('users.online', 123)
>>> statsd.gauge('active.connections', 1001, tags={"protocol": "http"})
increment(metric, value=1, tags=None, sample_rate=1)[source]

Increment a counter, optionally setting a value, tags and a sample rate.

>>> statsd.increment('page.views')
>>> statsd.increment('files.transferred', 124)
decrement(metric, value=1, tags=None, sample_rate=1)[source]

Decrement a counter, optionally setting a value, tags and a sample rate.

>>> statsd.decrement('files.remaining')
>>> statsd.decrement('active.connections', 2)
histogram(metric, value, tags=None, sample_rate=1)[source]

Sample a histogram value, optionally setting tags and a sample rate.

>>> statsd.histogram('uploaded.file.size', 1445)
>>> statsd.histogram('file.count', 26, tags={"filetype": "python"})
timing(metric, value, tags=None, sample_rate=1)[source]

Record a timing, optionally setting tags and a sample rate.

>>> statsd.timing("query.response.time", 1234)
timed(metric=None, error_metric=None, tags=None, sample_rate=1)[source]

A decorator or context manager that will measure the distribution of a function’s/context’s run time. Optionally specify a list of tags or a sample rate. If no metric is given when used as a decorator, the module name and function name will be used. The metric is required when used as a context manager.

@statsd.timed('user.query.time', sample_rate=0.5)
def get_user(user_id):
    # Do what you need to ...
    pass

# Is equivalent to ...
with statsd.timed('user.query.time', sample_rate=0.5):
    # Do what you need to ...
    pass

# Is equivalent to ...
start = time.monotonic()
try:
    get_user(user_id)
finally:
    statsd.timing('user.query.time', time.monotonic() - start)
set(metric, value, tags=None, sample_rate=1)[source]

Sample a set value.

>>> statsd.set('visitors.uniques', 999)
socket

Return a connected socket.

Note: connect the socket before assigning it to the class instance to avoid bad thread race conditions.

open_buffer(max_buffer_size=50)[source]

Open a buffer to send a batch of metrics in one packet.

You can also use this as a context manager.

>>> with Statsd() as batch:
...     batch.gauge('users.online', 123)
...     batch.gauge('active.connections', 1001)
close_buffer()[source]

Flush the buffer and switch back to single metric packets.

close_socket()[source]

Closes connected socket if connected.

_report(metric, metric_type, value, tags, sample_rate)[source]

Create a metric packet and send it.
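
For orientation, a metric packet in the widely used StatsD line format (with the common tag extension) could be built as follows. The helper format_packet is hypothetical, and whether swh.core.statsd emits exactly this layout is an assumption.

```python
def format_packet(metric, metric_type, value, tags=None, sample_rate=1, namespace=None):
    """Build a 'name:value|type|@rate|#tag:value,...' packet string."""
    name = f"{namespace}.{metric}" if namespace else metric
    packet = f"{name}:{value}|{metric_type}"
    if sample_rate != 1:
        packet += f"|@{sample_rate}"
    if tags:
        # Sort tags so the packet is deterministic
        packet += "|#" + ",".join(f"{k}:{v}" for k, v in sorted(tags.items()))
    return packet


print(format_packet("users.online", "g", 123))
```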

_send_to_server(packet)[source]
_send_to_buffer(packet)[source]
_flush_buffer()[source]
_add_constant_tags(tags)[source]

swh.core.tarball module

swh.core.tarball._canonical_abspath(path)[source]

Resolve all paths to an absolute and real one.

Parameters:path – to resolve
Returns:canonical absolute path to path
swh.core.tarball._badpath(path, basepath)[source]

Determine if a path is outside basepath.

Parameters:
  • path – a relative or absolute path of a file or directory
  • basepath – the base path that path must be within
Returns:

True if path is outside basepath, False otherwise.
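
A containment check of this kind can be sketched with os.path. The function is_outside is a hypothetical stand-in, and resolving with realpath before comparing is one reasonable approach, not necessarily the one used by _badpath.

```python
import os


def is_outside(path, basepath):
    """Return True if path, resolved relative to basepath, escapes it."""
    base = os.path.realpath(basepath)
    # Absolute paths replace the base in join; relative ones are appended
    target = os.path.realpath(os.path.join(base, path))
    return os.path.commonpath([base, target]) != base
```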

Determine if the tarinfo member is outside basepath.

Parameters:
  • info – TarInfo member representing a symlink or hardlink of tar archive
  • basepath – the basepath the info member must be in
Returns:

True if info is outside basepath, False otherwise.

swh.core.tarball.is_tarball(filepath)[source]

Given a filepath, determine if it represents an archive.

Parameters:filepath – file to test for tarball property
Returns:Bool, True if it’s a tarball, False otherwise
swh.core.tarball._uncompress_zip(tarpath, dirpath)[source]

Uncompress zip archive safely.

As far as zipfile is concerned (cf. the note on https://docs.python.org/3.5/library/zipfile.html#zipfile.ZipFile.extract).

Parameters:
  • tarpath – path to the archive
  • dirpath – directory to uncompress the archive to
swh.core.tarball._safemembers(tarpath, members, basepath)[source]

Given a list of archive members, yield the members (directory, file, hard-link) that stay within the bounds of basepath. Note that symbolic links are allowed to point outside basepath, though.

Parameters:
  • tarpath – Name of the tarball
  • members – Archive members for such tarball
  • basepath – the basepath sandbox
Yields:

Safe TarInfo member

Raises:

ValueError when a member would be extracted outside basepath

swh.core.tarball._uncompress_tar(tarpath, dirpath)[source]

Uncompress tarpath if the tarpath is safe. Safe means, no file will be uncompressed outside of dirpath.

Parameters:
  • tarpath – path to the archive
  • dirpath – directory to uncompress the archive to
Raises:

ValueError when a member would be extracted outside dirpath.
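
The idea of rejecting archive members that would land outside the destination can be sketched with the standard tarfile module. The generator safe_members is hypothetical and only checks member names; link targets, which the real helpers also inspect, are left out of this sketch.

```python
import os
import tarfile


def safe_members(tar, basepath):
    """Yield members of an open TarFile, refusing any that escape basepath."""
    base = os.path.realpath(basepath)
    for member in tar.getmembers():
        target = os.path.realpath(os.path.join(base, member.name))
        if os.path.commonpath([base, target]) != base:
            raise ValueError(
                f"{member.name!r} would be extracted outside {basepath!r}"
            )
        yield member


def extract_safely(tarpath, dirpath):
    """Extract tarpath into dirpath using only the vetted members."""
    with tarfile.open(tarpath) as tar:
        tar.extractall(path=dirpath, members=safe_members(tar, dirpath))
```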

swh.core.tarball.uncompress(tarpath, dest)[source]

Uncompress tarpath to dest folder if tarball is supported and safe.

Safe means no file will be uncompressed outside of dest.

Note that this fixes permissions after successfully uncompressing the archive.

Parameters:
  • tarpath – path to tarball to uncompress
  • dest – the destination folder where to uncompress the tarball
Returns:

The nature of the tarball, zip or tar.

Raises:
  • ValueError when
    • an archive member would be extracted outside basepath
    • the archive is not supported
swh.core.tarball._ls(rootdir)[source]

Generator of filepath, filename from rootdir.

swh.core.tarball._compress_zip(tarpath, files)[source]

Compress the given files into the zip archive tarpath.

swh.core.tarball._compress_tar(tarpath, files)[source]

Compress the given files into the tar archive tarpath.

swh.core.tarball.compress(tarpath, nature, dirpath_or_files)[source]

Create a tarball tarpath with nature nature. The content of the tarball is either the content of dirpath_or_files (if it represents a directory path) or the files it iterates over (if it is an iterable).

The tarball is written to tarpath, and its nature is determined by the nature argument.

swh.core.utils module

swh.core.utils.cwd(path)[source]

Context manager that changes the working directory to path, then returns to the original location on exit.
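
Such a context manager is commonly written with contextlib. This is a minimal sketch with a hypothetical name, cwd_sketch, and is not necessarily how swh.core.utils.cwd is implemented.

```python
import os
from contextlib import contextmanager


@contextmanager
def cwd_sketch(path):
    """Temporarily change the working directory to path."""
    previous = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Always restore the original directory, even on error
        os.chdir(previous)
```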

swh.core.utils.grouper(iterable, n)[source]

Collect data into fixed-length iterables. The last block might contain fewer elements, as it holds only the remaining elements.

The invariant here is that the number of elements in the input iterable and the sum of the number of elements of all iterables generated from this function should be equal.

Parameters:
  • iterable (Iterable) – an iterable
  • n (int) – size of block to slice the iterable into
Yields:

fixed-length blocks as iterables. As mentioned, the last iterable might be less populated.
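
The chunking behavior and its invariant can be illustrated with itertools.islice. The generator grouper_sketch is a hypothetical equivalent that yields lists; the real function may yield other iterable types.

```python
from itertools import islice


def grouper_sketch(iterable, n):
    """Yield successive blocks of at most n elements from iterable."""
    iterator = iter(iterable)
    while True:
        block = list(islice(iterator, n))
        if not block:
            return  # input exhausted
        yield block


blocks = list(grouper_sketch(range(5), 2))
# Invariant: no element is lost or duplicated across blocks
assert sum(len(block) for block in blocks) == 5
```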

swh.core.utils.backslashescape_errors(exception)[source]
swh.core.utils.encode_with_unescape(value)[source]

Encode a unicode string containing \x<hex> backslash escapes

swh.core.utils.decode_with_escape(value)[source]

Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences as \x<hex value>. We also escape NUL bytes as they are invalid in JSON strings.
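
The escaping described here can be approximated with Python's built-in backslashreplace error handler. The function decode_with_escape_sketch is hypothetical; doubling existing backslashes is an assumption made so that the output stays unambiguous, and the real function may use its own error handler.

```python
def decode_with_escape_sketch(value):
    """Decode bytes as utf-8, escaping invalid bytes and NUL as \\x<hex>."""
    # Assumption: double literal backslashes so they are not mistaken for escapes
    value = value.replace(b"\\", b"\\\\")
    # NUL bytes are valid utf-8 but are escaped explicitly
    value = value.replace(b"\x00", b"\\x00")
    # Invalid utf-8 bytes become \x<hex> via the standard error handler
    return value.decode("utf-8", "backslashreplace")
```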

swh.core.utils.commonname(path0, path1, as_str=False)[source]

Compute the commonname between the path0 and path1.

swh.core.utils.numfile_sortkey(fname)[source]

Simple sort key for filenames of the form:

nnxxx.ext

where nn is a number; files are ordered according to that number.

Typically used to sort sql/nn-swh-xxx.sql files.
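
A sort key of this kind can be sketched with a regular expression. The name numfile_sortkey_sketch and the choice to place names without a leading number last are assumptions; the real function may treat them differently.

```python
import re


def numfile_sortkey_sketch(fname):
    """Return (leading number, remainder) so files sort numerically."""
    number, rest = re.match(r"(\d*)(.*)", fname).groups()
    # Assumption: names without a leading number sort after numbered ones
    return (int(number) if number else float("inf"), rest)


names = ["10-swh-b.sql", "2-swh-a.sql"]
print(sorted(names, key=numfile_sortkey_sketch))
```

A plain string sort would put 10-swh-b.sql before 2-swh-a.sql, which is why a numeric key is needed.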

Module contents