From dd723dae5d772425b2525755cbd051d24eeccfab Mon Sep 17 00:00:00 2001 From: Tuna Celik Date: Fri, 10 Feb 2023 22:10:47 +0100 Subject: [PATCH] Resolved conflicts --- radicale/app/delete.py | 31 ++++++++------ radicale/app/proppatch.py | 90 +++++++++++++++------------------------ radicale/app/put.py | 78 +++++++++++++++++++-------------- 3 files changed, 99 insertions(+), 100 deletions(-) diff --git a/radicale/app/delete.py b/radicale/app/delete.py index a6fea029..4980a8ad 100644 --- a/radicale/app/delete.py +++ b/radicale/app/delete.py @@ -1,4 +1,4 @@ -# This file is part of Radicale Server - Calendar Server +# This file is part of Radicale - CalDAV and CardDAV server # Copyright © 2008 Nicolas Kandel # Copyright © 2008 Pascal Halter # Copyright © 2008-2017 Guillaume Ayoub @@ -17,28 +17,31 @@ # You should have received a copy of the GNU General Public License # along with Radicale. If not, see . +import xml.etree.ElementTree as ET from http import client -from xml.etree import ElementTree as ET +from typing import Optional -from radicale import app, httputils, storage, xmlutils +from radicale import httputils, storage, types, xmlutils +from radicale.app.base import Access, ApplicationBase from radicale.hook import HookNotificationItem, HookNotificationItemTypes -def xml_delete(base_prefix, path, collection, href=None): +def xml_delete(base_prefix: str, path: str, collection: storage.BaseCollection, + item_href: Optional[str] = None) -> ET.Element: """Read and answer DELETE requests. Read rfc4918-9.6 for info. """ - collection.delete(href) + collection.delete(item_href) multistatus = ET.Element(xmlutils.make_clark("D:multistatus")) response = ET.Element(xmlutils.make_clark("D:response")) multistatus.append(response) - href = ET.Element(xmlutils.make_clark("D:href")) - href.text = xmlutils.make_href(base_prefix, path) - response.append(href) + href_element = ET.Element(xmlutils.make_clark("D:href")) + href_element.text = xmlutils.make_href(base_prefix, path) + response.append(href_element) status = ET.Element(xmlutils.make_clark("D:status")) status.text = xmlutils.make_response(200) @@ -47,14 +50,16 @@ def xml_delete(base_prefix, path, collection, href=None): return multistatus -class ApplicationDeleteMixin: - def do_DELETE(self, environ, base_prefix, path, user): +class ApplicationPartDelete(ApplicationBase): + + def do_DELETE(self, environ: types.WSGIEnviron, base_prefix: str, + path: str, user: str) -> types.WSGIResponse: """Manage DELETE request.""" - access = app.Access(self._rights, user, path) + access = Access(self._rights, user, path) if not access.check("w"): return httputils.NOT_ALLOWED with self._storage.acquire_lock("w", user): - item = next(self._storage.discover(path), None) + item = next(iter(self._storage.discover(path)), None) if not item: return httputils.NOT_FOUND if not access.check("w", item): @@ -75,6 +80,8 @@ class ApplicationDeleteMixin: ) xml_answer = xml_delete(base_prefix, path, item) else: + assert item.collection is not None + assert item.href is not None hook_notification_item_list.append( HookNotificationItem( HookNotificationItemTypes.DELETE, diff --git a/radicale/app/proppatch.py b/radicale/app/proppatch.py index d2f7a03d..70eff8b6 100644 --- a/radicale/app/proppatch.py +++ b/radicale/app/proppatch.py @@ -1,4 +1,4 @@ -# This file is part of Radicale Server - Calendar Server +# This file is part of Radicale - CalDAV and CardDAV server # Copyright © 2008 Nicolas Kandel # Copyright © 2008 Pascal Halter # Copyright © 2008-2017 Guillaume Ayoub @@ -18,92 +18,71 @@ # along with Radicale. If not, see . import socket +import xml.etree.ElementTree as ET from http import client -from xml.etree import ElementTree as ET +from typing import Dict, Optional, cast -import defusedxml.ElementTree as DefusedET - -from radicale import app, httputils -from radicale import item as radicale_item -from radicale import storage, xmlutils +import radicale.item as radicale_item +from radicale import httputils, storage, types, xmlutils +from radicale.app.base import Access, ApplicationBase from radicale.hook import HookNotificationItem, HookNotificationItemTypes from radicale.log import logger -def xml_add_propstat_to(element, tag, status_number): - """Add a PROPSTAT response structure to an element. - - The PROPSTAT answer structure is defined in rfc4918-9.1. It is added to the - given ``element``, for the following ``tag`` with the given - ``status_number``. - - """ - propstat = ET.Element(xmlutils.make_clark("D:propstat")) - element.append(propstat) - - prop = ET.Element(xmlutils.make_clark("D:prop")) - propstat.append(prop) - - clark_tag = xmlutils.make_clark(tag) - prop_tag = ET.Element(clark_tag) - prop.append(prop_tag) - - status = ET.Element(xmlutils.make_clark("D:status")) - status.text = xmlutils.make_response(status_number) - propstat.append(status) - - -def xml_proppatch(base_prefix, path, xml_request, collection): +def xml_proppatch(base_prefix: str, path: str, + xml_request: Optional[ET.Element], + collection: storage.BaseCollection) -> ET.Element: """Read and answer PROPPATCH requests. Read rfc4918-9.2 for info. """ - props_to_set = xmlutils.props_from_request(xml_request, actions=("set",)) - props_to_remove = xmlutils.props_from_request(xml_request, - actions=("remove",)) - multistatus = ET.Element(xmlutils.make_clark("D:multistatus")) response = ET.Element(xmlutils.make_clark("D:response")) multistatus.append(response) - href = ET.Element(xmlutils.make_clark("D:href")) href.text = xmlutils.make_href(base_prefix, path) response.append(href) + # Create D:propstat element for props with status 200 OK + propstat = ET.Element(xmlutils.make_clark("D:propstat")) + status = ET.Element(xmlutils.make_clark("D:status")) + status.text = xmlutils.make_response(200) + props_ok = ET.Element(xmlutils.make_clark("D:prop")) + propstat.append(props_ok) + propstat.append(status) + response.append(propstat) - new_props = collection.get_meta() - for short_name, value in props_to_set.items(): - new_props[short_name] = value - xml_add_propstat_to(response, short_name, 200) - for short_name in props_to_remove: - try: - del new_props[short_name] - except KeyError: - pass - xml_add_propstat_to(response, short_name, 200) - radicale_item.check_and_sanitize_props(new_props) - collection.set_meta(new_props) + props_with_remove = xmlutils.props_from_request(xml_request) + all_props_with_remove = cast(Dict[str, Optional[str]], + dict(collection.get_meta())) + all_props_with_remove.update(props_with_remove) + all_props = radicale_item.check_and_sanitize_props(all_props_with_remove) + collection.set_meta(all_props) + for short_name in props_with_remove: + props_ok.append(ET.Element(xmlutils.make_clark(short_name))) return multistatus -class ApplicationProppatchMixin: - def do_PROPPATCH(self, environ, base_prefix, path, user): +class ApplicationPartProppatch(ApplicationBase): + + def do_PROPPATCH(self, environ: types.WSGIEnviron, base_prefix: str, + path: str, user: str) -> types.WSGIResponse: """Manage PROPPATCH request.""" - access = app.Access(self._rights, user, path) + access = Access(self._rights, user, path) if not access.check("w"): return httputils.NOT_ALLOWED try: - xml_content = self._read_xml_content(environ) + xml_content = self._read_xml_request_body(environ) except RuntimeError as e: logger.warning( "Bad PROPPATCH request on %r: %s", path, e, exc_info=True) return httputils.BAD_REQUEST except socket.timeout: - logger.debug("client timed out", exc_info=True) + logger.debug("Client timed out", exc_info=True) return httputils.REQUEST_TIMEOUT with self._storage.acquire_lock("w", user): - item = next(self._storage.discover(path), None) + item = next(iter(self._storage.discover(path)), None) if not item: return httputils.NOT_FOUND if not access.check("w", item): @@ -129,5 +108,4 @@ class ApplicationProppatchMixin: logger.warning( "Bad PROPPATCH request on %r: %s", path, e, exc_info=True) return httputils.BAD_REQUEST - return (client.MULTI_STATUS, headers, - self._write_xml_content(xml_answer)) + return client.MULTI_STATUS, headers, self._xml_response(xml_answer) diff --git a/radicale/app/put.py b/radicale/app/put.py index c08387ce..3c394ce5 100644 --- a/radicale/app/put.py +++ b/radicale/app/put.py @@ -1,4 +1,4 @@ -# This file is part of Radicale Server - Calendar Server +# This file is part of Radicale - CalDAV and CardDAV server # Copyright © 2008 Nicolas Kandel # Copyright © 2008 Pascal Halter # Copyright © 2008-2017 Guillaume Ayoub @@ -22,21 +22,31 @@ import posixpath import socket import sys from http import client +from typing import Iterator, List, Mapping, MutableMapping, Optional, Tuple import vobject -from radicale import app, httputils -from radicale import item as radicale_item -from radicale import pathutils, rights, storage, xmlutils +import radicale.item as radicale_item +from radicale import httputils, pathutils, rights, storage, types, xmlutils +from radicale.app.base import Access, ApplicationBase from radicale.hook import HookNotificationItem, HookNotificationItemTypes from radicale.log import logger +from types import TracebackType -MIMETYPE_TAGS = {value: key for key, value in xmlutils.MIMETYPES.items()} +MIMETYPE_TAGS: Mapping[str, str] = {value: key for key, value in + xmlutils.MIMETYPES.items()} -def prepare(vobject_items, path, content_type, permissions, parent_permissions, - tag=None, write_whole_collection=None): - if (write_whole_collection or permissions and not parent_permissions): +def prepare(vobject_items: List[vobject.base.Component], path: str, + content_type: str, permission: bool, parent_permission: bool, + tag: Optional[str] = None, + write_whole_collection: Optional[bool] = None) -> Tuple[ + Iterator[radicale_item.Item], # items + Optional[str], # tag + Optional[bool], # write_whole_collection + Optional[MutableMapping[str, str]], # props + Optional[Tuple[type, BaseException, Optional[TracebackType]]]]: + if (write_whole_collection or permission and not parent_permission): write_whole_collection = True tag = radicale_item.predict_tag_of_whole_collection( vobject_items, MIMETYPE_TAGS.get(content_type)) @@ -44,20 +54,20 @@ def prepare(vobject_items, path, content_type, permissions, parent_permissions, raise ValueError("Can't determine collection tag") collection_path = pathutils.strip_path(path) elif (write_whole_collection is not None and not write_whole_collection or - not permissions and parent_permissions): + not permission and parent_permission): write_whole_collection = False if tag is None: tag = radicale_item.predict_tag_of_parent_collection(vobject_items) collection_path = posixpath.dirname(pathutils.strip_path(path)) - props = None + props: Optional[MutableMapping[str, str]] = None stored_exc_info = None items = [] try: - if tag: + if tag and write_whole_collection is not None: radicale_item.check_and_sanitize_items( vobject_items, is_collection=write_whole_collection, tag=tag) if write_whole_collection and tag == "VCALENDAR": - vobject_components = [] + vobject_components: List[vobject.base.Component] = [] vobject_item, = vobject_items for content in ("vevent", "vtodo", "vjournal"): vobject_components.extend( @@ -99,37 +109,40 @@ def prepare(vobject_items, path, content_type, permissions, parent_permissions, caldesc = vobject_items[0].x_wr_caldesc.value if caldesc: props["C:calendar-description"] = caldesc - radicale_item.check_and_sanitize_props(props) + props = radicale_item.check_and_sanitize_props(props) except Exception: - stored_exc_info = sys.exc_info() + exc_info_or_none_tuple = sys.exc_info() + assert exc_info_or_none_tuple[0] is not None + stored_exc_info = exc_info_or_none_tuple - # Use generator for items and delete references to free memory - # early - def items_generator(): + # Use iterator for items and delete references to free memory early + def items_iter() -> Iterator[radicale_item.Item]: while items: yield items.pop(0) - return (items_generator(), tag, write_whole_collection, props, - stored_exc_info) + return items_iter(), tag, write_whole_collection, props, stored_exc_info -class ApplicationPutMixin: - def do_PUT(self, environ, base_prefix, path, user): +class ApplicationPartPut(ApplicationBase): + + def do_PUT(self, environ: types.WSGIEnviron, base_prefix: str, + path: str, user: str) -> types.WSGIResponse: """Manage PUT request.""" - access = app.Access(self._rights, user, path) + access = Access(self._rights, user, path) if not access.check("w"): return httputils.NOT_ALLOWED try: - content = self._read_content(environ) + content = httputils.read_request_body(self.configuration, environ) except RuntimeError as e: logger.warning("Bad PUT request on %r: %s", path, e, exc_info=True) return httputils.BAD_REQUEST except socket.timeout: - logger.debug("client timed out", exc_info=True) + logger.debug("Client timed out", exc_info=True) return httputils.REQUEST_TIMEOUT # Prepare before locking - content_type = environ.get("CONTENT_TYPE", "").split(";")[0] + content_type = environ.get("CONTENT_TYPE", "").split(";", + maxsplit=1)[0] try: - vobject_items = tuple(vobject.readComponents(content or "")) + vobject_items = radicale_item.read_components(content or "") except Exception as e: logger.warning( "Bad PUT request on %r: %s", path, e, exc_info=True) @@ -141,20 +154,20 @@ class ApplicationPutMixin: bool(rights.intersect(access.parent_permissions, "w"))) with self._storage.acquire_lock("w", user): - item = next(self._storage.discover(path), None) - parent_item = next( - self._storage.discover(access.parent_path), None) - if not parent_item: + item = next(iter(self._storage.discover(path)), None) + parent_item = next(iter( + self._storage.discover(access.parent_path)), None) + if not isinstance(parent_item, storage.BaseCollection): return httputils.CONFLICT write_whole_collection = ( isinstance(item, storage.BaseCollection) or - not parent_item.get_meta("tag")) + not parent_item.tag) if write_whole_collection: tag = prepared_tag else: - tag = parent_item.get_meta("tag") + tag = parent_item.tag if write_whole_collection: if ("w" if tag else "W") not in access.permissions: @@ -206,6 +219,7 @@ class ApplicationPutMixin: "Bad PUT request on %r: %s", path, e, exc_info=True) return httputils.BAD_REQUEST else: + assert not isinstance(item, storage.BaseCollection) prepared_item, = prepared_items if (item and item.uid != prepared_item.uid or not item and parent_item.has_uid(prepared_item.uid)):