1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-10-21 22:12:00 +00:00

Type hints for multifilesystem

This commit is contained in:
Unrud 2021-07-26 20:56:47 +02:00
parent c93d7b8715
commit 0de8628952
14 changed files with 428 additions and 246 deletions

View file

@ -23,56 +23,65 @@ import shlex
import signal
import subprocess
import sys
from typing import Iterator
from radicale import pathutils
from radicale import config, pathutils, types
from radicale.log import logger
from radicale.storage.multifilesystem.base import CollectionBase, StorageBase
class CollectionLockMixin:
def _acquire_cache_lock(self, ns=""):
class CollectionPartLock(CollectionBase):
@types.contextmanager
def _acquire_cache_lock(self, ns: str = "") -> Iterator[None]:
if self._storage._lock.locked == "w":
return contextlib.ExitStack()
yield
return
cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache")
self._storage._makedirs_synced(cache_folder)
lock_path = os.path.join(cache_folder,
".Radicale.lock" + (".%s" % ns if ns else ""))
lock = pathutils.RwLock(lock_path)
return lock.acquire("w")
with lock.acquire("w"):
yield
class StorageLockMixin:
class StoragePartLock(StorageBase):
def __init__(self, configuration):
_lock: pathutils.RwLock
_hook: str
def __init__(self, configuration: config.Configuration) -> None:
super().__init__(configuration)
folder = self.configuration.get("storage", "filesystem_folder")
lock_path = os.path.join(folder, ".Radicale.lock")
lock_path = os.path.join(self._filesystem_folder, ".Radicale.lock")
self._lock = pathutils.RwLock(lock_path)
self._hook = configuration.get("storage", "hook")
@contextlib.contextmanager
def acquire_lock(self, mode, user=""):
@types.contextmanager
def acquire_lock(self, mode: str, user: str = "") -> Iterator[None]:
with self._lock.acquire(mode):
yield
# execute hook
hook = self.configuration.get("storage", "hook")
if mode == "w" and hook:
folder = self.configuration.get("storage", "filesystem_folder")
if mode == "w" and self._hook:
debug = logger.isEnabledFor(logging.DEBUG)
popen_kwargs = dict(
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE if debug else subprocess.DEVNULL,
stderr=subprocess.PIPE if debug else subprocess.DEVNULL,
shell=True, universal_newlines=True, cwd=folder)
# Use new process group for child to prevent terminals
# from sending SIGINT etc.
preexec_fn = None
creationflags = 0
if os.name == "posix":
# Process group is also used to identify child processes
popen_kwargs["preexec_fn"] = os.setpgrp
preexec_fn = os.setpgrp
elif sys.platform == "win32":
popen_kwargs["creationflags"] = (
subprocess.CREATE_NEW_PROCESS_GROUP)
command = hook % {"user": shlex.quote(user or "Anonymous")}
creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP
command = self._hook % {
"user": shlex.quote(user or "Anonymous")}
logger.debug("Running storage hook")
p = subprocess.Popen(command, **popen_kwargs)
p = subprocess.Popen(
command, stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE if debug else subprocess.DEVNULL,
stderr=subprocess.PIPE if debug else subprocess.DEVNULL,
shell=True, universal_newlines=True, preexec_fn=preexec_fn,
cwd=self._filesystem_folder, creationflags=creationflags)
try:
stdout_data, stderr_data = p.communicate()
except BaseException: # e.g. KeyboardInterrupt or SystemExit