Source code for swh.objstorage.backends.seaweedfs.http
# Copyright (C) 2019-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from typing import Iterator
from urllib.parse import urljoin, urlparse
import requests
from swh.objstorage.objstorage import DEFAULT_LIMIT
LOGGER = logging.getLogger(__name__)
[docs]
class HttpFiler:
"""Simple class that encapsulates access to a seaweedfs filer service.
TODO: handle errors
"""
def __init__(self, url, pool_maxsize=100):
if not url.endswith("/"):
url = url + "/"
self.url = url
self.baseurl = urljoin(url, "/")
self.basepath = urlparse(url).path
self.session = requests.Session()
self.session.headers["Accept"] = "application/json"
adapter = requests.adapters.HTTPAdapter(
pool_connections=pool_maxsize, pool_maxsize=pool_maxsize
)
self.session.mount(self.url, adapter)
self.batchsize = DEFAULT_LIMIT
[docs]
def build_url(self, path):
assert path == self.basepath or path.startswith(self.basepath)
return urljoin(self.baseurl, path)
[docs]
def get(self, remote_path):
url = self.build_url(remote_path)
LOGGER.debug("Get file %s", url)
resp = self.session.get(url)
resp.raise_for_status()
return resp.content
[docs]
def exists(self, remote_path):
url = self.build_url(remote_path)
LOGGER.debug("Check file %s", url)
return self.session.head(url).status_code == 200
[docs]
def put(self, fp, remote_path):
url = self.build_url(remote_path)
LOGGER.debug("Put file %s", url)
return self.session.post(url, files={"file": fp})
[docs]
def delete(self, remote_path):
url = self.build_url(remote_path)
LOGGER.debug("Delete file %s", url)
return self.session.delete(url)
[docs]
def iterfiles(self, dir: str, last_file_name: str = "") -> Iterator[str]:
"""Recursively yield absolute file names
Args:
dir: retrieve file names starting from this directory; must
be an absolute path.
last_file_name: if given, starts from the file just after; must
be basename.
Yields:
absolute file names
"""
if not dir.endswith("/"):
dir = dir + "/"
# first, generates files going "down" the tree from current position
# (last_file_name)
yield from self._iter_files(dir, last_file_name)
# then, continue iterate going up the tree
while dir != self.basepath:
dir, last = dir[:-1].rsplit("/", 1)
dir += "/"
yield from self._iter_files(dir, last_file_name=last)
def _iter_files(self, dir: str, last_file_name: str = "") -> Iterator[str]:
for entry in self._iter_one_dir(dir, last_file_name):
fullpath = entry["FullPath"]
if entry["Mode"] & 1 << 31: # it's a directory, recurse
# see https://pkg.go.dev/io/fs#FileMode
yield from self._iter_files(fullpath)
else:
yield fullpath
def _iter_one_dir(self, remote_path: str, last_file_name: str = ""):
url = self.build_url(remote_path)
params = {"limit": self.batchsize}
if last_file_name:
params["lastFileName"] = last_file_name
LOGGER.debug("List directory %s", url)
while True:
rsp = self.session.get(url, params=params)
if rsp.ok:
dircontent = rsp.json()
if dircontent["Entries"]:
yield from dircontent["Entries"]
if not dircontent["ShouldDisplayLoadMore"]:
break
params["lastFileName"] = dircontent["LastFileName"]
else:
LOGGER.error('Error listing "%s". [HTTP %d]', url, rsp.status_code)
break