1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-09-12 20:30:57 +00:00

Type hints for multifilesystem

This commit is contained in:
Unrud 2021-07-26 20:56:47 +02:00 committed by Unrud
parent cecb17df03
commit 698ae875ce
14 changed files with 428 additions and 246 deletions

View file

@ -18,18 +18,33 @@
import json
import os
from typing import Mapping, Optional, TextIO, Union, cast, overload
import radicale.item as radicale_item
from radicale.storage import multifilesystem
from radicale.storage.multifilesystem.base import CollectionBase
class CollectionMetaMixin:
def __init__(self):
super().__init__()
class CollectionPartMeta(CollectionBase):
_meta_cache: Optional[Mapping[str, str]]
_props_path: str
def __init__(self, storage_: "multifilesystem.Storage", path: str,
filesystem_path: Optional[str] = None) -> None:
super().__init__(storage_, path, filesystem_path)
self._meta_cache = None
self._props_path = os.path.join(
self._filesystem_path, ".Radicale.props")
def get_meta(self, key=None):
@overload
def get_meta(self, key: None = None) -> Mapping[str, str]: ...
@overload
def get_meta(self, key: str) -> Optional[str]: ...
def get_meta(self, key: Optional[str] = None) -> Union[Mapping[str, str],
Optional[str]]:
# reuse cached value if the storage is read-only
if self._storage._lock.locked == "w" or self._meta_cache is None:
try:
@ -45,6 +60,7 @@ class CollectionMetaMixin:
"%r: %s" % (self.path, e)) from e
return self._meta_cache if key is None else self._meta_cache.get(key)
def set_meta(self, props):
with self._atomic_write(self._props_path, "w") as f:
def set_meta(self, props: Mapping[str, str]) -> None:
with self._atomic_write(self._props_path, "w") as fo:
f = cast(TextIO, fo)
json.dump(props, f, sort_keys=True)