1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-08-01 18:18:31 +00:00

More type hints

This commit is contained in:
Unrud 2021-07-26 20:56:46 +02:00 committed by Unrud
parent 12fe5ce637
commit cecb17df03
51 changed files with 1374 additions and 957 deletions

View file

@ -23,13 +23,17 @@ import posixpath
import socket
import xml.etree.ElementTree as ET
from http import client
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple
from radicale import app, httputils, pathutils, rights, storage, xmlutils
from radicale import httputils, pathutils, rights, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
from radicale.log import logger
def xml_propfind(base_prefix, path, xml_request, allowed_items, user,
encoding):
def xml_propfind(base_prefix: str, path: str,
xml_request: Optional[ET.Element],
allowed_items: Iterable[Tuple[types.CollectionOrItem, str]],
user: str, encoding: str) -> Optional[ET.Element]:
"""Read and answer PROPFIND requests.
Read rfc4918-9.1 for info.
@ -43,7 +47,7 @@ def xml_propfind(base_prefix, path, xml_request, allowed_items, user,
top_element = (xml_request[0] if xml_request is not None else
ET.Element(xmlutils.make_clark("D:allprop")))
props = ()
props: List[str] = []
allprop = False
propname = False
if top_element.tag == xmlutils.make_clark("D:allprop"):
@ -51,13 +55,13 @@ def xml_propfind(base_prefix, path, xml_request, allowed_items, user,
elif top_element.tag == xmlutils.make_clark("D:propname"):
propname = True
elif top_element.tag == xmlutils.make_clark("D:prop"):
props = [prop.tag for prop in top_element]
props.extend(prop.tag for prop in top_element)
if xmlutils.make_clark("D:current-user-principal") in props and not user:
# Ask for authentication
# Returning the DAV:unauthenticated pseudo-principal as specified in
# RFC 5397 doesn't seem to work with DAVx5.
return client.FORBIDDEN, None
return None
# Writing answer
multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
@ -68,29 +72,32 @@ def xml_propfind(base_prefix, path, xml_request, allowed_items, user,
base_prefix, path, item, props, user, encoding, write=write,
allprop=allprop, propname=propname))
return client.MULTI_STATUS, multistatus
return multistatus
def xml_propfind_response(base_prefix, path, item, props, user, encoding,
write=False, propname=False, allprop=False):
def xml_propfind_response(
base_prefix: str, path: str, item: types.CollectionOrItem,
props: Sequence[str], user: str, encoding: str, write: bool = False,
propname: bool = False, allprop: bool = False) -> ET.Element:
"""Build and return a PROPFIND response."""
if propname and allprop or (props and (propname or allprop)):
raise ValueError("Only use one of props, propname and allprops")
is_collection = isinstance(item, storage.BaseCollection)
if is_collection:
is_leaf = item.get_meta("tag") in ("VADDRESSBOOK", "VCALENDAR")
collection = item
else:
collection = item.collection
response = ET.Element(xmlutils.make_clark("D:response"))
href = ET.Element(xmlutils.make_clark("D:href"))
if is_collection:
# Some clients expect collections to end with /
if isinstance(item, storage.BaseCollection):
is_collection = True
is_leaf = item.tag in ("VADDRESSBOOK", "VCALENDAR")
collection = item
# Some clients expect collections to end with `/`
uri = pathutils.unstrip_path(item.path, True)
else:
uri = pathutils.unstrip_path(
posixpath.join(collection.path, item.href))
is_collection = is_leaf = False
assert item.collection is not None
assert item.href
collection = item.collection
uri = pathutils.unstrip_path(posixpath.join(
collection.path, item.href))
response = ET.Element(xmlutils.make_clark("D:response"))
href = ET.Element(xmlutils.make_clark("D:href"))
href.text = xmlutils.make_href(base_prefix, uri)
response.append(href)
@ -120,12 +127,12 @@ def xml_propfind_response(base_prefix, path, item, props, user, encoding,
if is_leaf:
props.append(xmlutils.make_clark("D:displayname"))
props.append(xmlutils.make_clark("D:sync-token"))
if collection.get_meta("tag") == "VCALENDAR":
if collection.tag == "VCALENDAR":
props.append(xmlutils.make_clark("CS:getctag"))
props.append(
xmlutils.make_clark("C:supported-calendar-component-set"))
meta = item.get_meta()
meta = collection.get_meta()
for tag in meta:
if tag == "tag":
continue
@ -133,11 +140,11 @@ def xml_propfind_response(base_prefix, path, item, props, user, encoding,
if clark_tag not in props:
props.append(clark_tag)
responses = collections.defaultdict(list)
responses: Dict[int, List[ET.Element]] = collections.defaultdict(list)
if propname:
for tag in props:
responses[200].append(ET.Element(tag))
props = ()
props = []
for tag in props:
element = ET.Element(tag)
is404 = False
@ -159,18 +166,18 @@ def xml_propfind_response(base_prefix, path, item, props, user, encoding,
xmlutils.make_clark("D:principal-URL"),
xmlutils.make_clark("CR:addressbook-home-set"),
xmlutils.make_clark("C:calendar-home-set")) and
collection.is_principal and is_collection):
is_collection and collection.is_principal):
child_element = ET.Element(xmlutils.make_clark("D:href"))
child_element.text = xmlutils.make_href(base_prefix, path)
element.append(child_element)
elif tag == xmlutils.make_clark("C:supported-calendar-component-set"):
human_tag = xmlutils.make_human_tag(tag)
if is_collection and is_leaf:
meta = item.get_meta(human_tag)
if meta:
components = meta.split(",")
components_text = collection.get_meta(human_tag)
if components_text:
components = components_text.split(",")
else:
components = ("VTODO", "VEVENT", "VJOURNAL")
components = ["VTODO", "VEVENT", "VJOURNAL"]
for component in components:
comp = ET.Element(xmlutils.make_clark("C:comp"))
comp.set("name", component)
@ -205,10 +212,10 @@ def xml_propfind_response(base_prefix, path, item, props, user, encoding,
"D:principal-property-search"]
if is_collection and is_leaf:
reports.append("D:sync-collection")
if item.get_meta("tag") == "VADDRESSBOOK":
if collection.tag == "VADDRESSBOOK":
reports.append("CR:addressbook-multiget")
reports.append("CR:addressbook-query")
elif item.get_meta("tag") == "VCALENDAR":
elif collection.tag == "VCALENDAR":
reports.append("C:calendar-multiget")
reports.append("C:calendar-query")
for human_tag in reports:
@ -234,20 +241,21 @@ def xml_propfind_response(base_prefix, path, item, props, user, encoding,
elif is_collection:
if tag == xmlutils.make_clark("D:getcontenttype"):
if is_leaf:
element.text = xmlutils.MIMETYPES[item.get_meta("tag")]
element.text = xmlutils.MIMETYPES[
collection.tag]
else:
is404 = True
elif tag == xmlutils.make_clark("D:resourcetype"):
if item.is_principal:
if collection.is_principal:
child_element = ET.Element(
xmlutils.make_clark("D:principal"))
element.append(child_element)
if is_leaf:
if item.get_meta("tag") == "VADDRESSBOOK":
if collection.tag == "VADDRESSBOOK":
child_element = ET.Element(
xmlutils.make_clark("CR:addressbook"))
element.append(child_element)
elif item.get_meta("tag") == "VCALENDAR":
elif collection.tag == "VCALENDAR":
child_element = ET.Element(
xmlutils.make_clark("C:calendar"))
element.append(child_element)
@ -255,38 +263,39 @@ def xml_propfind_response(base_prefix, path, item, props, user, encoding,
element.append(child_element)
elif tag == xmlutils.make_clark("RADICALE:displayname"):
# Only for internal use by the web interface
displayname = item.get_meta("D:displayname")
displayname = collection.get_meta("D:displayname")
if displayname is not None:
element.text = displayname
else:
is404 = True
elif tag == xmlutils.make_clark("D:displayname"):
displayname = item.get_meta("D:displayname")
displayname = collection.get_meta("D:displayname")
if not displayname and is_leaf:
displayname = item.path
displayname = collection.path
if displayname is not None:
element.text = displayname
else:
is404 = True
elif tag == xmlutils.make_clark("CS:getctag"):
if is_leaf:
element.text = item.etag
element.text = collection.etag
else:
is404 = True
elif tag == xmlutils.make_clark("D:sync-token"):
if is_leaf:
element.text, _ = item.sync()
element.text, _ = collection.sync()
else:
is404 = True
else:
human_tag = xmlutils.make_human_tag(tag)
meta = item.get_meta(human_tag)
if meta is not None:
element.text = meta
tag_text = collection.get_meta(human_tag)
if tag_text is not None:
element.text = tag_text
else:
is404 = True
# Not for collections
elif tag == xmlutils.make_clark("D:getcontenttype"):
assert not isinstance(item, storage.BaseCollection)
element.text = xmlutils.get_content_type(item, encoding)
elif tag == xmlutils.make_clark("D:resourcetype"):
# resourcetype must be returned empty for non-collection elements
@ -311,13 +320,16 @@ def xml_propfind_response(base_prefix, path, item, props, user, encoding,
return response
class ApplicationPropfindMixin:
def _collect_allowed_items(self, items, user):
class ApplicationPartPropfind(ApplicationBase):
def _collect_allowed_items(
self, items: Iterable[types.CollectionOrItem], user: str
) -> Iterator[Tuple[types.CollectionOrItem, str]]:
"""Get items from request that user is allowed to access."""
for item in items:
if isinstance(item, storage.BaseCollection):
path = pathutils.unstrip_path(item.path, True)
if item.get_meta("tag"):
if item.tag:
permissions = rights.intersect(
self._rights.authorization(user, path), "rw")
target = "collection with tag %r" % item.path
@ -326,6 +338,7 @@ class ApplicationPropfindMixin:
self._rights.authorization(user, path), "RW")
target = "collection %r" % item.path
else:
assert item.collection is not None
path = pathutils.unstrip_path(item.collection.path, True)
permissions = rights.intersect(
self._rights.authorization(user, path), "rw")
@ -345,9 +358,10 @@ class ApplicationPropfindMixin:
if permission:
yield item, permission
def do_PROPFIND(self, environ, base_prefix, path, user):
def do_PROPFIND(self, environ: types.WSGIEnviron, base_prefix: str,
path: str, user: str) -> types.WSGIResponse:
"""Manage PROPFIND request."""
access = app.Access(self._rights, user, path)
access = Access(self._rights, user, path)
if not access.check("r"):
return httputils.NOT_ALLOWED
try:
@ -360,22 +374,21 @@ class ApplicationPropfindMixin:
logger.debug("Client timed out", exc_info=True)
return httputils.REQUEST_TIMEOUT
with self._storage.acquire_lock("r", user):
items = self._storage.discover(
path, environ.get("HTTP_DEPTH", "0"))
items_iter = iter(self._storage.discover(
path, environ.get("HTTP_DEPTH", "0")))
# take root item for rights checking
item = next(items, None)
item = next(items_iter, None)
if not item:
return httputils.NOT_FOUND
if not access.check("r", item):
return httputils.NOT_ALLOWED
# put item back
items = itertools.chain([item], items)
allowed_items = self._collect_allowed_items(items, user)
items_iter = itertools.chain([item], items_iter)
allowed_items = self._collect_allowed_items(items_iter, user)
headers = {"DAV": httputils.DAV_HEADERS,
"Content-Type": "text/xml; charset=%s" % self._encoding}
status, xml_answer = xml_propfind(
base_prefix, path, xml_content, allowed_items, user,
self._encoding)
if status == client.FORBIDDEN and xml_answer is None:
xml_answer = xml_propfind(base_prefix, path, xml_content,
allowed_items, user, self._encoding)
if xml_answer is None:
return httputils.NOT_ALLOWED
return status, headers, self._xml_response(xml_answer)
return client.MULTI_STATUS, headers, self._xml_response(xml_answer)