Source code for zcollection.sync

# Copyright (c) 2023 CNES
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Synchronization of concurrent accesses
======================================
"""
from __future__ import annotations

from typing import Callable
import abc
import threading

import fasteners


[docs] class Sync(abc.ABC): # pragma: no cover """Interface of the classes handling the synchronization of concurrent accesses."""
[docs] @abc.abstractmethod def __enter__(self) -> bool: ...
[docs] @abc.abstractmethod def __exit__(self, exc_type, exc_value, traceback) -> None: ...
[docs] @abc.abstractmethod def is_locked(self) -> bool: """Returns True if the lock is acquired, False otherwise."""
[docs] class NoSync(Sync): """This class is used when the user does not want to synchronize accesses to the collection, in other words, when there is no concurrency."""
[docs] def __enter__(self) -> bool: return True
[docs] def __exit__(self, exc_type, exc_value, traceback) -> None: """As this class does not perform any synchronization, this method has nothing to do."""
[docs] def is_locked(self) -> bool: """As this class does not perform any synchronization, this method always returns False.""" return False
[docs] class ProcessSync(Sync): """This class is used when the user wants to synchronize accesses to the collection, in other words, when there is concurrency.""" def __init__(self, path: str) -> None: self.lock = fasteners.InterProcessLock(path)
[docs] def __enter__(self) -> bool: return self.lock.acquire()
[docs] def __exit__(self, exc_type, exc_value, traceback) -> None: try: self.lock.release() except threading.ThreadError: pass
[docs] def __reduce__(self) -> tuple[Callable, tuple[str]]: return (ProcessSync, (str(self.lock.path), ))
[docs] def is_locked(self) -> bool: """Returns True if the lock is acquired, False otherwise.""" return self.lock.exists()