# Copyright (C) 2022-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import dataclasses
import datetime
import functools
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple
import psycopg2
import psycopg2.extras
from swh.core.db import BaseDb
from swh.model.swhids import CoreSWHID, ObjectType
def now():
return datetime.datetime.now(tz=datetime.timezone.utc)
@dataclasses.dataclass(frozen=True)
class Datastore:
"""Represents a datastore being scrubbed; eg. swh-storage or swh-journal."""
package: str
"""'storage', 'journal', or 'objstorage'."""
cls: str
"""'postgresql'/'cassandra' for storage, 'kafka' for journal,
'pathslicer'/'winery'/... for objstorage."""
instance: str
"""Human readable string."""
@dataclasses.dataclass(frozen=True)
class ConfigEntry:
"""Represents a datastore being scrubbed; eg. swh-storage or swh-journal."""
name: str
datastore: Datastore
object_type: ObjectType
nb_partitions: int
check_hashes: bool
check_references: bool
@dataclasses.dataclass(frozen=True)
class CorruptObject:
id: CoreSWHID
config: ConfigEntry
first_occurrence: datetime.datetime
object_: bytes
@dataclasses.dataclass(frozen=True)
class MissingObject:
id: CoreSWHID
config: ConfigEntry
first_occurrence: datetime.datetime
@dataclasses.dataclass(frozen=True)
class MissingObjectReference:
missing_id: CoreSWHID
reference_id: CoreSWHID
config: ConfigEntry
first_occurrence: datetime.datetime
@dataclasses.dataclass(frozen=True)
class FixedObject:
id: CoreSWHID
object_: bytes
method: str
recovery_date: Optional[datetime.datetime] = None
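# Illustrative sketch of how these dataclasses fit together; the instance
# string and configuration values below are assumptions, not defaults shipped
# with this module:
#
#     datastore = Datastore(
#         package="storage", cls="postgresql", instance="service=swh-storage"
#     )
#     config = ConfigEntry(
#         name="check_snapshot_16_storage_postgresql",
#         datastore=datastore,
#         object_type=ObjectType.SNAPSHOT,
#         nb_partitions=16,
#         check_hashes=True,
#         check_references=True,
#     )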
class ScrubberDb(BaseDb):
current_version = 7
def __init__(self, db, **kwargs):
if isinstance(db, str):
conn = psycopg2.connect(db, **kwargs)
else:
conn = db
super().__init__(conn=conn)
####################################
# Shared tables
####################################
@functools.lru_cache(1000)
def datastore_get_or_add(self, datastore: Datastore) -> int:
"""Creates a datastore if it does not exist, and returns its id."""
with self.transaction() as cur:
cur.execute(
"""
WITH inserted AS (
INSERT INTO datastore (package, class, instance)
VALUES (%(package)s, %(cls)s, %(instance)s)
ON CONFLICT DO NOTHING
RETURNING id
)
SELECT id
FROM inserted
UNION (
-- If the datastore already exists, we need to fetch its id
SELECT id
FROM datastore
WHERE
package=%(package)s
AND class=%(cls)s
AND instance=%(instance)s
)
LIMIT 1
""",
                dataclasses.asdict(datastore),
)
res = cur.fetchone()
assert res is not None
(id_,) = res
return id_
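    # Example (sketch; the connection string is an assumption): thanks to the
    # ON CONFLICT clause and the lru_cache, repeated calls with an equal
    # Datastore yield the same id:
    #
    #     db = ScrubberDb("dbname=scrubber")
    #     ds = Datastore(package="storage", cls="postgresql", instance="main")
    #     assert db.datastore_get_or_add(ds) == db.datastore_get_or_add(ds)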
@functools.lru_cache(1000)
def datastore_get(self, datastore_id: int) -> Datastore:
"""Returns a datastore's id. Raises :exc:`ValueError` if it does not exist."""
with self.transaction() as cur:
cur.execute(
"""
SELECT package, class, instance
FROM datastore
WHERE id=%s
""",
(datastore_id,),
)
res = cur.fetchone()
if res is None:
raise ValueError(f"No datastore with id {datastore_id}")
(package, cls, instance) = res
return Datastore(package=package, cls=cls, instance=instance)
def config_add(
self,
name: Optional[str],
datastore: Datastore,
object_type: ObjectType,
nb_partitions: int,
check_hashes: bool = True,
check_references: bool = True,
) -> int:
"""Creates a configuration entry (and potentially a datastore);
Will fail if a config with same (datastore. object_type, nb_paritions)
already exists.
"""
if not (check_hashes or check_references):
raise ValueError(
"At least one of the 2 check_hashes and check_references flags must be set"
)
datastore_id = self.datastore_get_or_add(datastore)
if not name:
name = (
f"check_{object_type.name.lower()}_{nb_partitions}_"
f"{datastore.package}_{datastore.cls}"
)
args = {
"name": name,
"datastore_id": datastore_id,
"object_type": object_type.name.lower(),
"nb_partitions": nb_partitions,
"check_hashes": check_hashes,
"check_references": check_references,
}
with self.transaction() as cur:
cur.execute(
"""
WITH inserted AS (
INSERT INTO check_config
(name, datastore, object_type, nb_partitions,
check_hashes, check_references)
VALUES
(%(name)s, %(datastore_id)s, %(object_type)s, %(nb_partitions)s,
%(check_hashes)s, %(check_references)s)
RETURNING id
)
SELECT id
FROM inserted;
""",
args,
)
res = cur.fetchone()
if res is None:
raise ValueError(f"No config matching {args}")
(id_,) = res
return id_
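    # Example (sketch; values are assumptions): register a directory check
    # over 64 partitions, letting config_add derive the default name:
    #
    #     config_id = db.config_add(
    #         name=None,
    #         datastore=ds,
    #         object_type=ObjectType.DIRECTORY,
    #         nb_partitions=64,
    #     )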
@functools.lru_cache(1000)
def config_get(self, config_id: int) -> ConfigEntry:
with self.transaction() as cur:
cur.execute(
"""
SELECT
cc.name, cc.object_type, cc.nb_partitions,
cc.check_hashes, cc.check_references,
ds.package, ds.class, ds.instance
FROM check_config AS cc
                INNER JOIN datastore AS ds ON (cc.datastore=ds.id)
WHERE cc.id=%(config_id)s
""",
{
"config_id": config_id,
},
)
res = cur.fetchone()
if res is None:
raise ValueError(f"No config with id {config_id}")
(
name,
object_type,
nb_partitions,
chk_hashes,
chk_refs,
ds_package,
ds_class,
ds_instance,
) = res
return ConfigEntry(
name=name,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
object_type=getattr(ObjectType, object_type.upper()),
nb_partitions=nb_partitions,
check_hashes=chk_hashes,
check_references=chk_refs,
)
def config_get_by_name(
self,
name: str,
datastore: Optional[int] = None,
) -> Optional[int]:
"""Get the configuration entry for given name, if any"""
query_parts = ["SELECT id FROM check_config WHERE "]
where_parts = [" name = %s "]
query_params = [name]
if datastore:
where_parts.append(" datastore = %s ")
query_params.append(str(datastore))
query_parts.append(" AND ".join(where_parts))
query = "\n".join(query_parts)
with self.transaction() as cur:
cur.execute(query, query_params)
if cur.rowcount:
res = cur.fetchone()
if res:
(id_,) = res
return id_
return None
def config_iter(self) -> Iterator[Tuple[int, ConfigEntry]]:
with self.transaction() as cur:
cur.execute(
"""
SELECT
cc.id, cc.name, cc.object_type, cc.nb_partitions,
cc.check_hashes, cc.check_references,
ds.package, ds.class, ds.instance
FROM check_config AS cc
INNER JOIN datastore AS ds ON (cc.datastore=ds.id)
""",
)
for row in cur:
assert row is not None
(
id_,
name,
object_type,
nb_partitions,
chk_hashes,
chk_refs,
ds_package,
ds_class,
ds_instance,
) = row
yield (
id_,
ConfigEntry(
name=name,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
                    object_type=getattr(ObjectType, object_type.upper()),
nb_partitions=nb_partitions,
check_hashes=chk_hashes,
check_references=chk_refs,
),
)
def config_get_stats(
self,
config_id: int,
) -> Dict[str, Any]:
"""Return statistics for the check configuration <check_id>."""
config = self.config_get(config_id)
stats = {"config": config}
with self.transaction() as cur:
cur.execute(
"""
SELECT
min(end_date - start_date),
avg(end_date - start_date),
max(end_date - start_date)
FROM checked_partition
WHERE config_id=%s AND end_date is not NULL
""",
(config_id,),
)
row = cur.fetchone()
assert row
minv, avgv, maxv = row
stats["min_duration"] = minv.total_seconds() if minv is not None else 0.0
stats["max_duration"] = maxv.total_seconds() if maxv is not None else 0.0
stats["avg_duration"] = avgv.total_seconds() if avgv is not None else 0.0
cur.execute(
"""
SELECT count(*)
FROM checked_partition
WHERE config_id=%s AND end_date is not NULL
""",
(config_id,),
)
row = cur.fetchone()
assert row
stats["checked_partition"] = row[0]
cur.execute(
"""
SELECT count(*)
FROM checked_partition
WHERE config_id=%s AND end_date is NULL
""",
(config_id,),
)
row = cur.fetchone()
assert row
stats["running_partition"] = row[0]
cur.execute(
"""
SELECT count(*)
FROM missing_object
WHERE config_id=%s
""",
(config_id,),
)
row = cur.fetchone()
assert row
stats["missing_object"] = row[0]
cur.execute(
"""
SELECT count(distinct reference_id)
FROM missing_object_reference
WHERE config_id=%s
""",
(config_id,),
)
row = cur.fetchone()
assert row
stats["missing_object_reference"] = row[0]
cur.execute(
"""
SELECT count(*)
FROM corrupt_object
WHERE config_id=%s
""",
(config_id,),
)
row = cur.fetchone()
assert row
stats["corrupt_object"] = row[0]
return stats
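    # The returned mapping has fixed keys; the values below are illustrative
    # only (durations are in seconds):
    #
    #     {
    #         "config": <ConfigEntry>,
    #         "min_duration": 0.5, "avg_duration": 1.2, "max_duration": 3.0,
    #         "checked_partition": 42,   # partitions fully checked
    #         "running_partition": 2,    # partitions started but not finished
    #         "missing_object": 0,
    #         "missing_object_reference": 0,
    #         "corrupt_object": 1,
    #     }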
####################################
# Checkpointing/progress tracking
####################################
def checked_partition_iter_next(
self,
config_id: int,
) -> Iterator[int]:
"""Generates partitions to be checked for the given configuration
At each iteration, look for the next "free" partition in the
checked_partition, for the given config_id, reserve it and return its
id.
Reserving the partition means make sure there is a row in the table for
this partition id with the start_date column set.
To chose a "free" partition is to select either the smaller partition
is for which the start_date is NULL, or the first partition id not yet
in the table.
Stops the iteration when the number of partitions for the config id is
reached.
"""
while True:
start_time = now()
with self.transaction() as cur:
cur.execute(
"""
WITH next AS (
SELECT min(partition_id) as pid
FROM checked_partition
WHERE config_id=%(config_id)s and start_date is NULL
UNION
SELECT coalesce(max(partition_id) + 1, 0) as pid
FROM checked_partition
WHERE config_id=%(config_id)s
)
INSERT INTO checked_partition(config_id, partition_id, start_date)
select %(config_id)s, min(pid), %(start_date)s from next
where pid is not NULL
ON CONFLICT (config_id, partition_id)
DO UPDATE
SET start_date = GREATEST(
checked_partition.start_date, EXCLUDED.start_date
)
RETURNING partition_id;
""",
{"config_id": config_id, "start_date": start_time},
)
res = cur.fetchone()
assert res is not None
(partition_id,) = res
if partition_id >= self.config_get(config_id).nb_partitions:
self.conn.rollback()
return
yield partition_id
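    # Typical worker loop (sketch; check_partition is a hypothetical callback
    # doing the actual object checks):
    #
    #     for partition_id in db.checked_partition_iter_next(config_id):
    #         check_partition(config_id, partition_id)
    #         db.checked_partition_upsert(config_id, partition_id)  # mark done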
def checked_partition_reset(self, config_id: int, partition_id: int) -> bool:
"""
Reset the partition, aka clear start_date and end_date
"""
with self.transaction() as cur:
cur.execute(
"""
UPDATE checked_partition
SET start_date=NULL, end_date=NULL
WHERE config_id=%(config_id)s AND partition_id=%(partition_id)s
""",
{"config_id": config_id, "partition_id": partition_id},
)
return bool(cur.rowcount)
def checked_partition_upsert(
self,
config_id: int,
partition_id: int,
date: Optional[datetime.datetime] = None,
) -> None:
"""
Records in the database the given partition was last checked at the given date.
"""
if date is None:
date = now()
with self.transaction() as cur:
cur.execute(
"""
UPDATE checked_partition
SET end_date = GREATEST(%(date)s, end_date)
WHERE config_id=%(config_id)s AND partition_id=%(partition_id)s
""",
{
"config_id": config_id,
"partition_id": partition_id,
"date": date,
},
)
def checked_partition_get_last_date(
self,
config_id: int,
partition_id: int,
) -> Optional[datetime.datetime]:
"""
Returns the last date the given partition was checked in the given datastore,
or :const:`None` if it was never checked.
Currently, this matches partition id and number exactly, with no regard for
partitions that contain or are contained by it.
"""
with self.transaction() as cur:
cur.execute(
"""
SELECT end_date
FROM checked_partition
WHERE config_id=%s AND partition_id=%s
""",
(config_id, partition_id),
)
res = cur.fetchone()
if res is None:
return None
else:
(date,) = res
return date
def checked_partition_get_running(
self,
config_id: int,
) -> Iterator[Tuple[int, datetime.datetime]]:
"""Yields the partitions which are currently being checked; i.e. which have a
start_date but no end_date.
"""
with self.transaction() as cur:
cur.execute(
"""
SELECT partition_id, start_date
FROM checked_partition
WHERE config_id=%s AND start_date is not NULL AND end_date is NULL
""",
(config_id,),
)
for partition_id, start_date in cur:
yield (partition_id, start_date)
def checked_partition_get_stuck(
self,
config_id: int,
since: Optional[datetime.timedelta] = None,
) -> Iterator[Tuple[int, datetime.datetime]]:
"""Yields the partitions which are currently running for more than `since`; if
not set, automatically guess a reasonable delay from completed partitions.
If no such a delay can be extracted, fall back to 1 hour.
The heuristic for the automatic delay is 2x max(end_date-start_date)
for the last 10 partitions checked.
"""
with self.transaction() as cur:
if since is None:
cur.execute(
"""
WITH delays as
(
SELECT end_date - start_date as delay
FROM checked_partition
WHERE config_id=%s AND end_date is not NULL
ORDER BY start_date DESC
LIMIT 10
)
SELECT 2*max(delay) from delays
""",
(config_id,),
)
res = cur.fetchone()
assert res is not None
(since,) = res
if since is None:
since = datetime.timedelta(hours=1)
cur.execute(
"""
SELECT partition_id, start_date
FROM checked_partition
WHERE config_id=%s AND end_date is NULL AND start_date < %s
""",
(config_id, now() - since),
)
for partition_id, start_date in cur:
yield (partition_id, start_date)
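    # Sketch of reclaiming partitions stuck for more than two hours (the
    # delay is an arbitrary example value):
    #
    #     stuck = datetime.timedelta(hours=2)
    #     for partition_id, start_date in db.checked_partition_get_stuck(
    #         config_id, since=stuck
    #     ):
    #         db.checked_partition_reset(config_id, partition_id)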
def checked_partition_iter(
self, config_id: int
) -> Iterator[Tuple[int, int, datetime.datetime, Optional[datetime.datetime]]]:
"""Yields tuples of ``(partition_id, nb_partitions, start_date, end_date)``"""
with self.transaction() as cur:
cur.execute(
"""
SELECT CP.partition_id, CC.nb_partitions, CP.start_date, CP.end_date
FROM checked_partition as CP
INNER JOIN check_config AS CC on (CC.id=CP.config_id)
WHERE CC.id=%s
""",
(config_id,),
)
for row in cur:
yield tuple(row)
####################################
# Inventory of objects with issues
####################################
def corrupt_object_add(
self,
id: CoreSWHID,
config: ConfigEntry,
serialized_object: bytes,
) -> None:
config_id = self.config_get_by_name(config.name)
assert config_id is not None
with self.transaction() as cur:
cur.execute(
"""
INSERT INTO corrupt_object (id, config_id, object)
VALUES (%s, %s, %s)
ON CONFLICT DO NOTHING
""",
(str(id), config_id, serialized_object),
)
def _corrupt_object_list_from_cursor(
self, cur: psycopg2.extensions.cursor
) -> Iterator[CorruptObject]:
for row in cur:
(
id,
first_occurrence,
object_,
cc_object_type,
cc_nb_partitions,
cc_name,
cc_chk_hashes,
cc_chk_refs,
ds_package,
ds_class,
ds_instance,
) = row
yield CorruptObject(
id=CoreSWHID.from_string(id),
first_occurrence=first_occurrence,
object_=object_,
config=ConfigEntry(
name=cc_name,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
                    object_type=getattr(ObjectType, cc_object_type.upper()),
nb_partitions=cc_nb_partitions,
check_hashes=cc_chk_hashes,
check_references=cc_chk_refs,
),
)
def corrupt_object_iter(self) -> Iterator[CorruptObject]:
"""Yields all records in the 'corrupt_object' table."""
with self.transaction() as cur:
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
cc.object_type, cc.nb_partitions, cc.name,
cc.check_hashes, cc.check_references,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN check_config AS cc ON (cc.id=co.config_id)
INNER JOIN datastore AS ds ON (ds.id=cc.datastore)
"""
)
yield from self._corrupt_object_list_from_cursor(cur)
def corrupt_object_get(
self,
start_id: CoreSWHID,
end_id: CoreSWHID,
limit: int = 100,
) -> Iterator[CorruptObject]:
"""Yields a page of records in the 'corrupt_object' table, ordered by id.
Arguments:
start_id: Only return objects after this id
end_id: Only return objects before this id
in_origin: An origin URL. If provided, only returns objects that may be
found in the given origin
"""
with self.transaction() as cur:
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
cc.object_type, cc.nb_partitions, cc.name,
cc.check_hashes, cc.check_references,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN check_config AS cc ON (cc.id=co.config_id)
INNER JOIN datastore AS ds ON (ds.id=cc.datastore)
WHERE
co.id >= %s
AND co.id <= %s
ORDER BY co.id
LIMIT %s
""",
(str(start_id), str(end_id), limit),
)
yield from self._corrupt_object_list_from_cursor(cur)
def corrupt_object_grab_by_id(
self,
cur: psycopg2.extensions.cursor,
start_id: CoreSWHID,
end_id: CoreSWHID,
limit: int = 100,
) -> Iterator[CorruptObject]:
"""Returns a page of records in the 'corrupt_object' table for a fixer,
ordered by id
These records are not already fixed (ie. do not have a corresponding entry
in the 'fixed_object' table), and they are selected with an exclusive update
lock.
Arguments:
start_id: Only return objects after this id
end_id: Only return objects before this id
"""
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
cc.object_type, cc.nb_partitions, cc.name,
cc.check_hashes, cc.check_references,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN check_config AS cc ON (cc.id=co.config_id)
INNER JOIN datastore AS ds ON (ds.id=cc.datastore)
WHERE
co.id >= %(start_id)s
AND co.id <= %(end_id)s
AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
ORDER BY co.id
LIMIT %(limit)s
FOR UPDATE SKIP LOCKED
""",
dict(
start_id=str(start_id),
end_id=str(end_id),
limit=limit,
),
)
yield from self._corrupt_object_list_from_cursor(cur)
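    # Sketch of a fixer using this method; recover_object is a hypothetical
    # function returning a FixedObject. Keeping everything in one transaction
    # is what makes the FOR UPDATE SKIP LOCKED lock effective:
    #
    #     with db.transaction() as cur:
    #         corrupt = list(db.corrupt_object_grab_by_id(cur, start_id, end_id))
    #         db.fixed_object_add(cur, [recover_object(co) for co in corrupt])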
def corrupt_object_grab_by_origin(
self,
cur: psycopg2.extensions.cursor,
origin_url: str,
start_id: Optional[CoreSWHID] = None,
end_id: Optional[CoreSWHID] = None,
limit: int = 100,
) -> Iterator[CorruptObject]:
"""Returns a page of records in the 'corrupt_object' table for a fixer,
ordered by id
These records are not already fixed (ie. do not have a corresponding entry
in the 'fixed_object' table), and they are selected with an exclusive update
lock.
Arguments:
origin_url: only returns objects that may be found in the given origin
"""
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
cc.object_type, cc.nb_partitions, cc.name,
cc.check_hashes, cc.check_references,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN check_config AS cc ON (cc.id=co.config_id)
INNER JOIN datastore AS ds ON (ds.id=cc.datastore)
INNER JOIN object_origin AS oo ON (oo.object_id=co.id)
WHERE
(co.id >= %(start_id)s OR %(start_id)s IS NULL)
AND (co.id <= %(end_id)s OR %(end_id)s IS NULL)
AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
AND oo.origin_url=%(origin_url)s
ORDER BY co.id
LIMIT %(limit)s
FOR UPDATE SKIP LOCKED
""",
dict(
start_id=None if start_id is None else str(start_id),
end_id=None if end_id is None else str(end_id),
origin_url=origin_url,
limit=limit,
),
)
yield from self._corrupt_object_list_from_cursor(cur)
def missing_object_add(
self,
id: CoreSWHID,
reference_ids: Iterable[CoreSWHID],
config: ConfigEntry,
) -> None:
"""
Adds a "hole" to the inventory, ie. an object missing from a datastore
that is referenced by an other object of the same datastore.
If the missing object is already known to be missing by the scrubber database,
this only records the reference (which can be useful to locate an origin
to recover the object from).
If that reference is already known too, this is a noop.
Args:
id: SWHID of the missing object (the hole)
reference_id: SWHID of the object referencing the missing object
datastore: representation of the swh-storage/swh-journal/... instance
containing this hole
"""
        config_id = self.config_get_by_name(config.name)
        assert config_id is not None
with self.transaction() as cur:
cur.execute(
"""
INSERT INTO missing_object (id, config_id)
VALUES (%s, %s)
ON CONFLICT DO NOTHING
""",
(str(id), config_id),
)
if reference_ids:
psycopg2.extras.execute_batch(
cur,
"""
INSERT INTO missing_object_reference (missing_id, reference_id, config_id)
VALUES (%s, %s, %s)
ON CONFLICT DO NOTHING
""",
[
(str(id), str(reference_id), config_id)
for reference_id in reference_ids
],
)
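    # Example (sketch; the all-zero SWHIDs are placeholders for illustration):
    #
    #     hole = CoreSWHID.from_string(
    #         "swh:1:cnt:0000000000000000000000000000000000000000"
    #     )
    #     referrer = CoreSWHID.from_string(
    #         "swh:1:dir:0000000000000000000000000000000000000000"
    #     )
    #     db.missing_object_add(id=hole, reference_ids=[referrer], config=config)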
def missing_object_iter(self) -> Iterator[MissingObject]:
"""Yields all records in the 'missing_object' table."""
with self.transaction() as cur:
cur.execute(
"""
SELECT
mo.id, mo.first_occurrence,
cc.name, cc.object_type, cc.nb_partitions,
cc.check_hashes, cc.check_references,
ds.package, ds.class, ds.instance
FROM missing_object AS mo
INNER JOIN check_config AS cc ON (cc.id=mo.config_id)
INNER JOIN datastore AS ds ON (ds.id=cc.datastore)
"""
)
for row in cur:
(
id,
first_occurrence,
cc_name,
cc_object_type,
cc_nb_partitions,
cc_chk_hashes,
cc_chk_refs,
ds_package,
ds_class,
ds_instance,
) = row
yield MissingObject(
id=CoreSWHID.from_string(id),
first_occurrence=first_occurrence,
config=ConfigEntry(
name=cc_name,
                    object_type=getattr(ObjectType, cc_object_type.upper()),
nb_partitions=cc_nb_partitions,
check_hashes=cc_chk_hashes,
check_references=cc_chk_refs,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
),
)
def missing_object_reference_iter(
self, missing_id: CoreSWHID
) -> Iterator[MissingObjectReference]:
"""Yields all records in the 'missing_object_reference' table."""
with self.transaction() as cur:
cur.execute(
"""
SELECT
mor.reference_id, mor.first_occurrence,
cc.name, cc.object_type, cc.nb_partitions,
cc.check_hashes, cc.check_references,
ds.package, ds.class, ds.instance
FROM missing_object_reference AS mor
INNER JOIN check_config AS cc ON (cc.id=mor.config_id)
INNER JOIN datastore AS ds ON (ds.id=cc.datastore)
WHERE mor.missing_id=%s
""",
(str(missing_id),),
)
for row in cur:
(
reference_id,
first_occurrence,
cc_name,
cc_object_type,
cc_nb_partitions,
cc_chk_hashes,
cc_chk_refs,
ds_package,
ds_class,
ds_instance,
) = row
yield MissingObjectReference(
missing_id=missing_id,
reference_id=CoreSWHID.from_string(reference_id),
first_occurrence=first_occurrence,
config=ConfigEntry(
name=cc_name,
                    object_type=getattr(ObjectType, cc_object_type.upper()),
nb_partitions=cc_nb_partitions,
check_hashes=cc_chk_hashes,
check_references=cc_chk_refs,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
),
)
####################################
# Issue resolution
####################################
def object_origin_add(
self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str]
) -> None:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO object_origin (object_id, origin_url)
VALUES %s
ON CONFLICT DO NOTHING
""",
[(str(swhid), origin_url) for origin_url in origins],
)
def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]:
"""Returns origins with non-fixed corrupt objects, ordered by URL.
Arguments:
after: if given, only returns origins with an URL after this value
"""
with self.transaction() as cur:
cur.execute(
"""
SELECT DISTINCT origin_url
FROM object_origin
WHERE
origin_url > %(after)s
AND object_id IN (
(SELECT id FROM corrupt_object)
EXCEPT (SELECT id FROM fixed_object)
)
ORDER BY origin_url
LIMIT %(limit)s
""",
dict(after=after, limit=limit),
)
return [origin_url for (origin_url,) in cur]
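    # Pagination sketch: walk all origins with unfixed corrupt objects by
    # feeding the last URL of each page back as ``after`` (process is a
    # hypothetical per-page handler):
    #
    #     after = ""
    #     while page := db.object_origin_get(after=after, limit=1000):
    #         process(page)
    #         after = page[-1]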
def fixed_object_add(
self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject]
) -> None:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO fixed_object (id, object, method)
VALUES %s
ON CONFLICT DO NOTHING
""",
[
(str(fixed_object.id), fixed_object.object_, fixed_object.method)
for fixed_object in fixed_objects
],
)
def fixed_object_iter(self) -> Iterator[FixedObject]:
with self.transaction() as cur:
cur.execute("SELECT id, object, method, recovery_date FROM fixed_object")
for id, object_, method, recovery_date in cur:
yield FixedObject(
id=CoreSWHID.from_string(id),
object_=object_,
method=method,
recovery_date=recovery_date,
)