diff --git a/radicale/app/delete.py b/radicale/app/delete.py
index a6fea029..4980a8ad 100644
--- a/radicale/app/delete.py
+++ b/radicale/app/delete.py
@@ -1,4 +1,4 @@
-# This file is part of Radicale Server - Calendar Server
+# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
@@ -17,28 +17,31 @@
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see .
+import xml.etree.ElementTree as ET
from http import client
-from xml.etree import ElementTree as ET
+from typing import Optional
-from radicale import app, httputils, storage, xmlutils
+from radicale import httputils, storage, types, xmlutils
+from radicale.app.base import Access, ApplicationBase
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
-def xml_delete(base_prefix, path, collection, href=None):
+def xml_delete(base_prefix: str, path: str, collection: storage.BaseCollection,
+ item_href: Optional[str] = None) -> ET.Element:
"""Read and answer DELETE requests.
Read rfc4918-9.6 for info.
"""
- collection.delete(href)
+ collection.delete(item_href)
multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
response = ET.Element(xmlutils.make_clark("D:response"))
multistatus.append(response)
- href = ET.Element(xmlutils.make_clark("D:href"))
- href.text = xmlutils.make_href(base_prefix, path)
- response.append(href)
+ href_element = ET.Element(xmlutils.make_clark("D:href"))
+ href_element.text = xmlutils.make_href(base_prefix, path)
+ response.append(href_element)
status = ET.Element(xmlutils.make_clark("D:status"))
status.text = xmlutils.make_response(200)
@@ -47,14 +50,16 @@ def xml_delete(base_prefix, path, collection, href=None):
return multistatus
-class ApplicationDeleteMixin:
- def do_DELETE(self, environ, base_prefix, path, user):
+class ApplicationPartDelete(ApplicationBase):
+
+ def do_DELETE(self, environ: types.WSGIEnviron, base_prefix: str,
+ path: str, user: str) -> types.WSGIResponse:
"""Manage DELETE request."""
- access = app.Access(self._rights, user, path)
+ access = Access(self._rights, user, path)
if not access.check("w"):
return httputils.NOT_ALLOWED
with self._storage.acquire_lock("w", user):
- item = next(self._storage.discover(path), None)
+ item = next(iter(self._storage.discover(path)), None)
if not item:
return httputils.NOT_FOUND
if not access.check("w", item):
@@ -75,6 +80,8 @@ class ApplicationDeleteMixin:
)
xml_answer = xml_delete(base_prefix, path, item)
else:
+ assert item.collection is not None
+ assert item.href is not None
hook_notification_item_list.append(
HookNotificationItem(
HookNotificationItemTypes.DELETE,
diff --git a/radicale/app/proppatch.py b/radicale/app/proppatch.py
index d2f7a03d..70eff8b6 100644
--- a/radicale/app/proppatch.py
+++ b/radicale/app/proppatch.py
@@ -1,4 +1,4 @@
-# This file is part of Radicale Server - Calendar Server
+# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
@@ -18,92 +18,71 @@
# along with Radicale. If not, see .
import socket
+import xml.etree.ElementTree as ET
from http import client
-from xml.etree import ElementTree as ET
+from typing import Dict, Optional, cast
-import defusedxml.ElementTree as DefusedET
-
-from radicale import app, httputils
-from radicale import item as radicale_item
-from radicale import storage, xmlutils
+import radicale.item as radicale_item
+from radicale import httputils, storage, types, xmlutils
+from radicale.app.base import Access, ApplicationBase
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
from radicale.log import logger
-def xml_add_propstat_to(element, tag, status_number):
- """Add a PROPSTAT response structure to an element.
-
- The PROPSTAT answer structure is defined in rfc4918-9.1. It is added to the
- given ``element``, for the following ``tag`` with the given
- ``status_number``.
-
- """
- propstat = ET.Element(xmlutils.make_clark("D:propstat"))
- element.append(propstat)
-
- prop = ET.Element(xmlutils.make_clark("D:prop"))
- propstat.append(prop)
-
- clark_tag = xmlutils.make_clark(tag)
- prop_tag = ET.Element(clark_tag)
- prop.append(prop_tag)
-
- status = ET.Element(xmlutils.make_clark("D:status"))
- status.text = xmlutils.make_response(status_number)
- propstat.append(status)
-
-
-def xml_proppatch(base_prefix, path, xml_request, collection):
+def xml_proppatch(base_prefix: str, path: str,
+ xml_request: Optional[ET.Element],
+ collection: storage.BaseCollection) -> ET.Element:
"""Read and answer PROPPATCH requests.
Read rfc4918-9.2 for info.
"""
- props_to_set = xmlutils.props_from_request(xml_request, actions=("set",))
- props_to_remove = xmlutils.props_from_request(xml_request,
- actions=("remove",))
-
multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
response = ET.Element(xmlutils.make_clark("D:response"))
multistatus.append(response)
-
href = ET.Element(xmlutils.make_clark("D:href"))
href.text = xmlutils.make_href(base_prefix, path)
response.append(href)
+ # Create D:propstat element for props with status 200 OK
+ propstat = ET.Element(xmlutils.make_clark("D:propstat"))
+ status = ET.Element(xmlutils.make_clark("D:status"))
+ status.text = xmlutils.make_response(200)
+ props_ok = ET.Element(xmlutils.make_clark("D:prop"))
+ propstat.append(props_ok)
+ propstat.append(status)
+ response.append(propstat)
- new_props = collection.get_meta()
- for short_name, value in props_to_set.items():
- new_props[short_name] = value
- xml_add_propstat_to(response, short_name, 200)
- for short_name in props_to_remove:
- try:
- del new_props[short_name]
- except KeyError:
- pass
- xml_add_propstat_to(response, short_name, 200)
- radicale_item.check_and_sanitize_props(new_props)
- collection.set_meta(new_props)
+ props_with_remove = xmlutils.props_from_request(xml_request)
+ all_props_with_remove = cast(Dict[str, Optional[str]],
+ dict(collection.get_meta()))
+ all_props_with_remove.update(props_with_remove)
+ all_props = radicale_item.check_and_sanitize_props(all_props_with_remove)
+ collection.set_meta(all_props)
+ for short_name in props_with_remove:
+ props_ok.append(ET.Element(xmlutils.make_clark(short_name)))
return multistatus
-class ApplicationProppatchMixin:
- def do_PROPPATCH(self, environ, base_prefix, path, user):
+class ApplicationPartProppatch(ApplicationBase):
+
+ def do_PROPPATCH(self, environ: types.WSGIEnviron, base_prefix: str,
+ path: str, user: str) -> types.WSGIResponse:
"""Manage PROPPATCH request."""
- access = app.Access(self._rights, user, path)
+ access = Access(self._rights, user, path)
if not access.check("w"):
return httputils.NOT_ALLOWED
try:
- xml_content = self._read_xml_content(environ)
+ xml_content = self._read_xml_request_body(environ)
except RuntimeError as e:
logger.warning(
"Bad PROPPATCH request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
except socket.timeout:
- logger.debug("client timed out", exc_info=True)
+ logger.debug("Client timed out", exc_info=True)
return httputils.REQUEST_TIMEOUT
with self._storage.acquire_lock("w", user):
- item = next(self._storage.discover(path), None)
+ item = next(iter(self._storage.discover(path)), None)
if not item:
return httputils.NOT_FOUND
if not access.check("w", item):
@@ -129,5 +108,4 @@ class ApplicationProppatchMixin:
logger.warning(
"Bad PROPPATCH request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
- return (client.MULTI_STATUS, headers,
- self._write_xml_content(xml_answer))
+ return client.MULTI_STATUS, headers, self._xml_response(xml_answer)
diff --git a/radicale/app/put.py b/radicale/app/put.py
index c08387ce..3c394ce5 100644
--- a/radicale/app/put.py
+++ b/radicale/app/put.py
@@ -1,4 +1,4 @@
-# This file is part of Radicale Server - Calendar Server
+# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
@@ -22,21 +22,31 @@ import posixpath
import socket
import sys
from http import client
+from typing import Iterator, List, Mapping, MutableMapping, Optional, Tuple
import vobject
-from radicale import app, httputils
-from radicale import item as radicale_item
-from radicale import pathutils, rights, storage, xmlutils
+import radicale.item as radicale_item
+from radicale import httputils, pathutils, rights, storage, types, xmlutils
+from radicale.app.base import Access, ApplicationBase
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
from radicale.log import logger
+from types import TracebackType
-MIMETYPE_TAGS = {value: key for key, value in xmlutils.MIMETYPES.items()}
+MIMETYPE_TAGS: Mapping[str, str] = {value: key for key, value in
+ xmlutils.MIMETYPES.items()}
-def prepare(vobject_items, path, content_type, permissions, parent_permissions,
- tag=None, write_whole_collection=None):
- if (write_whole_collection or permissions and not parent_permissions):
+def prepare(vobject_items: List[vobject.base.Component], path: str,
+ content_type: str, permission: bool, parent_permission: bool,
+ tag: Optional[str] = None,
+ write_whole_collection: Optional[bool] = None) -> Tuple[
+ Iterator[radicale_item.Item], # items
+ Optional[str], # tag
+ Optional[bool], # write_whole_collection
+ Optional[MutableMapping[str, str]], # props
+ Optional[Tuple[type, BaseException, Optional[TracebackType]]]]:
+ if (write_whole_collection or permission and not parent_permission):
write_whole_collection = True
tag = radicale_item.predict_tag_of_whole_collection(
vobject_items, MIMETYPE_TAGS.get(content_type))
@@ -44,20 +54,20 @@ def prepare(vobject_items, path, content_type, permissions, parent_permissions,
raise ValueError("Can't determine collection tag")
collection_path = pathutils.strip_path(path)
elif (write_whole_collection is not None and not write_whole_collection or
- not permissions and parent_permissions):
+ not permission and parent_permission):
write_whole_collection = False
if tag is None:
tag = radicale_item.predict_tag_of_parent_collection(vobject_items)
collection_path = posixpath.dirname(pathutils.strip_path(path))
- props = None
+ props: Optional[MutableMapping[str, str]] = None
stored_exc_info = None
items = []
try:
- if tag:
+ if tag and write_whole_collection is not None:
radicale_item.check_and_sanitize_items(
vobject_items, is_collection=write_whole_collection, tag=tag)
if write_whole_collection and tag == "VCALENDAR":
- vobject_components = []
+ vobject_components: List[vobject.base.Component] = []
vobject_item, = vobject_items
for content in ("vevent", "vtodo", "vjournal"):
vobject_components.extend(
@@ -99,37 +109,40 @@ def prepare(vobject_items, path, content_type, permissions, parent_permissions,
caldesc = vobject_items[0].x_wr_caldesc.value
if caldesc:
props["C:calendar-description"] = caldesc
- radicale_item.check_and_sanitize_props(props)
+ props = radicale_item.check_and_sanitize_props(props)
except Exception:
- stored_exc_info = sys.exc_info()
+ exc_info_or_none_tuple = sys.exc_info()
+ assert exc_info_or_none_tuple[0] is not None
+ stored_exc_info = exc_info_or_none_tuple
- # Use generator for items and delete references to free memory
- # early
- def items_generator():
+ # Use iterator for items and delete references to free memory early
+ def items_iter() -> Iterator[radicale_item.Item]:
while items:
yield items.pop(0)
- return (items_generator(), tag, write_whole_collection, props,
- stored_exc_info)
+ return items_iter(), tag, write_whole_collection, props, stored_exc_info
-class ApplicationPutMixin:
- def do_PUT(self, environ, base_prefix, path, user):
+class ApplicationPartPut(ApplicationBase):
+
+ def do_PUT(self, environ: types.WSGIEnviron, base_prefix: str,
+ path: str, user: str) -> types.WSGIResponse:
"""Manage PUT request."""
- access = app.Access(self._rights, user, path)
+ access = Access(self._rights, user, path)
if not access.check("w"):
return httputils.NOT_ALLOWED
try:
- content = self._read_content(environ)
+ content = httputils.read_request_body(self.configuration, environ)
except RuntimeError as e:
logger.warning("Bad PUT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
except socket.timeout:
- logger.debug("client timed out", exc_info=True)
+ logger.debug("Client timed out", exc_info=True)
return httputils.REQUEST_TIMEOUT
# Prepare before locking
- content_type = environ.get("CONTENT_TYPE", "").split(";")[0]
+ content_type = environ.get("CONTENT_TYPE", "").split(";",
+ maxsplit=1)[0]
try:
- vobject_items = tuple(vobject.readComponents(content or ""))
+ vobject_items = radicale_item.read_components(content or "")
except Exception as e:
logger.warning(
"Bad PUT request on %r: %s", path, e, exc_info=True)
@@ -141,20 +154,20 @@ class ApplicationPutMixin:
bool(rights.intersect(access.parent_permissions, "w")))
with self._storage.acquire_lock("w", user):
- item = next(self._storage.discover(path), None)
- parent_item = next(
- self._storage.discover(access.parent_path), None)
- if not parent_item:
+ item = next(iter(self._storage.discover(path)), None)
+ parent_item = next(iter(
+ self._storage.discover(access.parent_path)), None)
+ if not isinstance(parent_item, storage.BaseCollection):
return httputils.CONFLICT
write_whole_collection = (
isinstance(item, storage.BaseCollection) or
- not parent_item.get_meta("tag"))
+ not parent_item.tag)
if write_whole_collection:
tag = prepared_tag
else:
- tag = parent_item.get_meta("tag")
+ tag = parent_item.tag
if write_whole_collection:
if ("w" if tag else "W") not in access.permissions:
@@ -206,6 +219,7 @@ class ApplicationPutMixin:
"Bad PUT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
else:
+ assert not isinstance(item, storage.BaseCollection)
prepared_item, = prepared_items
if (item and item.uid != prepared_item.uid or
not item and parent_item.has_uid(prepared_item.uid)):