mirror of
https://github.com/Kozea/Radicale.git
synced 2025-09-15 20:36:55 +00:00
More type hints
This commit is contained in:
parent
11f7559ef3
commit
c93d7b8715
51 changed files with 1374 additions and 957 deletions
|
@ -27,27 +27,35 @@ import binascii
|
|||
import math
|
||||
import os
|
||||
import sys
|
||||
from datetime import timedelta
|
||||
from datetime import datetime, timedelta
|
||||
from hashlib import sha256
|
||||
from typing import (Any, Callable, List, MutableMapping, Optional, Sequence,
|
||||
Tuple)
|
||||
|
||||
import vobject
|
||||
|
||||
from radicale import storage # noqa:F401
|
||||
from radicale import pathutils
|
||||
from radicale.item import filter as radicale_filter
|
||||
from radicale.log import logger
|
||||
|
||||
|
||||
def predict_tag_of_parent_collection(vobject_items):
|
||||
def predict_tag_of_parent_collection(
|
||||
vobject_items: Sequence[vobject.base.Component]) -> Optional[str]:
|
||||
"""Returns the predicted tag or `None`"""
|
||||
if len(vobject_items) != 1:
|
||||
return ""
|
||||
return None
|
||||
if vobject_items[0].name == "VCALENDAR":
|
||||
return "VCALENDAR"
|
||||
if vobject_items[0].name in ("VCARD", "VLIST"):
|
||||
return "VADDRESSBOOK"
|
||||
return ""
|
||||
return None
|
||||
|
||||
|
||||
def predict_tag_of_whole_collection(vobject_items, fallback_tag=None):
|
||||
def predict_tag_of_whole_collection(
|
||||
vobject_items: Sequence[vobject.base.Component],
|
||||
fallback_tag: Optional[str] = None) -> Optional[str]:
|
||||
"""Returns the predicted tag or `fallback_tag`"""
|
||||
if vobject_items and vobject_items[0].name == "VCALENDAR":
|
||||
return "VCALENDAR"
|
||||
if vobject_items and vobject_items[0].name in ("VCARD", "VLIST"):
|
||||
|
@ -58,9 +66,13 @@ def predict_tag_of_whole_collection(vobject_items, fallback_tag=None):
|
|||
return fallback_tag
|
||||
|
||||
|
||||
def check_and_sanitize_items(vobject_items, is_collection=False, tag=None):
|
||||
def check_and_sanitize_items(
|
||||
vobject_items: List[vobject.base.Component],
|
||||
is_collection: bool = False, tag: str = "") -> None:
|
||||
"""Check vobject items for common errors and add missing UIDs.
|
||||
|
||||
Modifies the list `vobject_items`.
|
||||
|
||||
``is_collection`` indicates that vobject_item contains unrelated
|
||||
components.
|
||||
|
||||
|
@ -169,9 +181,14 @@ def check_and_sanitize_items(vobject_items, is_collection=False, tag=None):
|
|||
(i.name, repr(tag) if tag else "generic"))
|
||||
|
||||
|
||||
def check_and_sanitize_props(props):
|
||||
"""Check collection properties for common errors."""
|
||||
for k, v in props.copy().items(): # Make copy to be able to delete items
|
||||
def check_and_sanitize_props(props: MutableMapping[Any, Any]
|
||||
) -> MutableMapping[str, str]:
|
||||
"""Check collection properties for common errors.
|
||||
|
||||
Modifies the dict `props`.
|
||||
|
||||
"""
|
||||
for k, v in list(props.items()): # Make copy to be able to delete items
|
||||
if not isinstance(k, str):
|
||||
raise ValueError("Key must be %r not %r: %r" % (
|
||||
str.__name__, type(k).__name__, k))
|
||||
|
@ -182,14 +199,13 @@ def check_and_sanitize_props(props):
|
|||
raise ValueError("Value of %r must be %r not %r: %r" % (
|
||||
k, str.__name__, type(v).__name__, v))
|
||||
if k == "tag":
|
||||
if not v:
|
||||
del props[k]
|
||||
continue
|
||||
if v not in ("VCALENDAR", "VADDRESSBOOK"):
|
||||
if v not in ("", "VCALENDAR", "VADDRESSBOOK"):
|
||||
raise ValueError("Unsupported collection tag: %r" % v)
|
||||
return props
|
||||
|
||||
|
||||
def find_available_uid(exists_fn, suffix=""):
|
||||
def find_available_uid(exists_fn: Callable[[str], bool], suffix: str = ""
|
||||
) -> str:
|
||||
"""Generate a pseudo-random UID"""
|
||||
# Prevent infinite loop
|
||||
for _ in range(1000):
|
||||
|
@ -202,7 +218,7 @@ def find_available_uid(exists_fn, suffix=""):
|
|||
raise RuntimeError("No unique random sequence found")
|
||||
|
||||
|
||||
def get_etag(text):
|
||||
def get_etag(text: str) -> str:
|
||||
"""Etag from collection or item.
|
||||
|
||||
Encoded as quoted-string (see RFC 2616).
|
||||
|
@ -213,13 +229,13 @@ def get_etag(text):
|
|||
return '"%s"' % etag.hexdigest()
|
||||
|
||||
|
||||
def get_uid(vobject_component):
|
||||
def get_uid(vobject_component: vobject.base.Component) -> str:
|
||||
"""UID value of an item if defined."""
|
||||
return (vobject_component.uid.value
|
||||
if hasattr(vobject_component, "uid") else None)
|
||||
return (vobject_component.uid.value or ""
|
||||
if hasattr(vobject_component, "uid") else "")
|
||||
|
||||
|
||||
def get_uid_from_object(vobject_item):
|
||||
def get_uid_from_object(vobject_item: vobject.base.Component) -> str:
|
||||
"""UID value of an calendar/addressbook object."""
|
||||
if vobject_item.name == "VCALENDAR":
|
||||
if hasattr(vobject_item, "vevent"):
|
||||
|
@ -230,10 +246,10 @@ def get_uid_from_object(vobject_item):
|
|||
return get_uid(vobject_item.vtodo)
|
||||
elif vobject_item.name == "VCARD":
|
||||
return get_uid(vobject_item)
|
||||
return None
|
||||
return ""
|
||||
|
||||
|
||||
def find_tag(vobject_item):
|
||||
def find_tag(vobject_item: vobject.base.Component) -> str:
|
||||
"""Find component name from ``vobject_item``."""
|
||||
if vobject_item.name == "VCALENDAR":
|
||||
for component in vobject_item.components():
|
||||
|
@ -242,22 +258,24 @@ def find_tag(vobject_item):
|
|||
return ""
|
||||
|
||||
|
||||
def find_tag_and_time_range(vobject_item):
|
||||
"""Find component name and enclosing time range from ``vobject item``.
|
||||
def find_time_range(vobject_item: vobject.base.Component, tag: str
|
||||
) -> Tuple[int, int]:
|
||||
"""Find enclosing time range from ``vobject item``.
|
||||
|
||||
Returns a tuple (``tag``, ``start``, ``end``) where ``tag`` is a string
|
||||
and ``start`` and ``end`` are POSIX timestamps (as int).
|
||||
``tag`` must be set to the return value of ``find_tag``.
|
||||
|
||||
Returns a tuple (``start``, ``end``) where ``start`` and ``end`` are
|
||||
POSIX timestamps.
|
||||
|
||||
This is intened to be used for matching against simplified prefilters.
|
||||
|
||||
"""
|
||||
tag = find_tag(vobject_item)
|
||||
if not tag:
|
||||
return (
|
||||
tag, radicale_filter.TIMESTAMP_MIN, radicale_filter.TIMESTAMP_MAX)
|
||||
return radicale_filter.TIMESTAMP_MIN, radicale_filter.TIMESTAMP_MAX
|
||||
start = end = None
|
||||
|
||||
def range_fn(range_start, range_end, is_recurrence):
|
||||
def range_fn(range_start: datetime, range_end: datetime,
|
||||
is_recurrence: bool) -> bool:
|
||||
nonlocal start, end
|
||||
if start is None or range_start < start:
|
||||
start = range_start
|
||||
|
@ -265,7 +283,7 @@ def find_tag_and_time_range(vobject_item):
|
|||
end = range_end
|
||||
return False
|
||||
|
||||
def infinity_fn(range_start):
|
||||
def infinity_fn(range_start: datetime) -> bool:
|
||||
nonlocal start, end
|
||||
if start is None or range_start < start:
|
||||
start = range_start
|
||||
|
@ -278,7 +296,7 @@ def find_tag_and_time_range(vobject_item):
|
|||
if end is None:
|
||||
end = radicale_filter.DATETIME_MAX
|
||||
try:
|
||||
return tag, math.floor(start.timestamp()), math.ceil(end.timestamp())
|
||||
return math.floor(start.timestamp()), math.ceil(end.timestamp())
|
||||
except ValueError as e:
|
||||
if str(e) == ("offset must be a timedelta representing a whole "
|
||||
"number of minutes") and sys.version_info < (3, 6):
|
||||
|
@ -289,10 +307,31 @@ def find_tag_and_time_range(vobject_item):
|
|||
class Item:
|
||||
"""Class for address book and calendar entries."""
|
||||
|
||||
def __init__(self, collection_path=None, collection=None,
|
||||
vobject_item=None, href=None, last_modified=None, text=None,
|
||||
etag=None, uid=None, name=None, component_name=None,
|
||||
time_range=None):
|
||||
collection: Optional["storage.BaseCollection"]
|
||||
href: Optional[str]
|
||||
last_modified: Optional[str]
|
||||
|
||||
_collection_path: str
|
||||
_text: Optional[str]
|
||||
_vobject_item: Optional[vobject.base.Component]
|
||||
_etag: Optional[str]
|
||||
_uid: Optional[str]
|
||||
_name: Optional[str]
|
||||
_component_name: Optional[str]
|
||||
_time_range: Optional[Tuple[int, int]]
|
||||
|
||||
def __init__(self,
|
||||
collection_path: Optional[str] = None,
|
||||
collection: Optional["storage.BaseCollection"] = None,
|
||||
vobject_item: Optional[vobject.base.Component] = None,
|
||||
href: Optional[str] = None,
|
||||
last_modified: Optional[str] = None,
|
||||
text: Optional[str] = None,
|
||||
etag: Optional[str] = None,
|
||||
uid: Optional[str] = None,
|
||||
name: Optional[str] = None,
|
||||
component_name: Optional[str] = None,
|
||||
time_range: Optional[Tuple[int, int]] = None):
|
||||
"""Initialize an item.
|
||||
|
||||
``collection_path`` the path of the parent collection (optional if
|
||||
|
@ -318,8 +357,7 @@ class Item:
|
|||
``component_name`` the name of the primary component (optional).
|
||||
See ``find_tag``.
|
||||
|
||||
``time_range`` the enclosing time range.
|
||||
See ``find_tag_and_time_range``.
|
||||
``time_range`` the enclosing time range. See ``find_time_range``.
|
||||
|
||||
"""
|
||||
if text is None and vobject_item is None:
|
||||
|
@ -344,7 +382,7 @@ class Item:
|
|||
self._component_name = component_name
|
||||
self._time_range = time_range
|
||||
|
||||
def serialize(self):
|
||||
def serialize(self) -> str:
|
||||
if self._text is None:
|
||||
try:
|
||||
self._text = self.vobject_item.serialize()
|
||||
|
@ -366,38 +404,38 @@ class Item:
|
|||
return self._vobject_item
|
||||
|
||||
@property
|
||||
def etag(self):
|
||||
def etag(self) -> str:
|
||||
"""Encoded as quoted-string (see RFC 2616)."""
|
||||
if self._etag is None:
|
||||
self._etag = get_etag(self.serialize())
|
||||
return self._etag
|
||||
|
||||
@property
|
||||
def uid(self):
|
||||
def uid(self) -> str:
|
||||
if self._uid is None:
|
||||
self._uid = get_uid_from_object(self.vobject_item)
|
||||
return self._uid
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
def name(self) -> str:
|
||||
if self._name is None:
|
||||
self._name = self.vobject_item.name or ""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def component_name(self):
|
||||
if self._component_name is not None:
|
||||
return self._component_name
|
||||
return find_tag(self.vobject_item)
|
||||
def component_name(self) -> str:
|
||||
if self._component_name is None:
|
||||
self._component_name = find_tag(self.vobject_item)
|
||||
return self._component_name
|
||||
|
||||
@property
|
||||
def time_range(self):
|
||||
def time_range(self) -> Tuple[int, int]:
|
||||
if self._time_range is None:
|
||||
self._component_name, *self._time_range = (
|
||||
find_tag_and_time_range(self.vobject_item))
|
||||
self._time_range = find_time_range(
|
||||
self.vobject_item, self.component_name)
|
||||
return self._time_range
|
||||
|
||||
def prepare(self):
|
||||
def prepare(self) -> None:
|
||||
"""Fill cache with values."""
|
||||
orig_vobject_item = self._vobject_item
|
||||
self.serialize()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue