1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-06-26 16:45:52 +00:00

Resolved conflicts

This commit is contained in:
Tuna Celik 2023-02-10 22:10:47 +01:00
parent cf81d1f9a7
commit dd723dae5d
3 changed files with 99 additions and 100 deletions

View file

@ -1,4 +1,4 @@
# This file is part of Radicale Server - Calendar Server
# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
@ -17,28 +17,31 @@
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import xml.etree.ElementTree as ET
from http import client
from xml.etree import ElementTree as ET
from typing import Optional
from radicale import app, httputils, storage, xmlutils
from radicale import httputils, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
def xml_delete(base_prefix, path, collection, href=None):
def xml_delete(base_prefix: str, path: str, collection: storage.BaseCollection,
item_href: Optional[str] = None) -> ET.Element:
"""Read and answer DELETE requests.
Read rfc4918-9.6 for info.
"""
collection.delete(href)
collection.delete(item_href)
multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
response = ET.Element(xmlutils.make_clark("D:response"))
multistatus.append(response)
href = ET.Element(xmlutils.make_clark("D:href"))
href.text = xmlutils.make_href(base_prefix, path)
response.append(href)
href_element = ET.Element(xmlutils.make_clark("D:href"))
href_element.text = xmlutils.make_href(base_prefix, path)
response.append(href_element)
status = ET.Element(xmlutils.make_clark("D:status"))
status.text = xmlutils.make_response(200)
@ -47,14 +50,16 @@ def xml_delete(base_prefix, path, collection, href=None):
return multistatus
class ApplicationDeleteMixin:
def do_DELETE(self, environ, base_prefix, path, user):
class ApplicationPartDelete(ApplicationBase):
def do_DELETE(self, environ: types.WSGIEnviron, base_prefix: str,
path: str, user: str) -> types.WSGIResponse:
"""Manage DELETE request."""
access = app.Access(self._rights, user, path)
access = Access(self._rights, user, path)
if not access.check("w"):
return httputils.NOT_ALLOWED
with self._storage.acquire_lock("w", user):
item = next(self._storage.discover(path), None)
item = next(iter(self._storage.discover(path)), None)
if not item:
return httputils.NOT_FOUND
if not access.check("w", item):
@ -75,6 +80,8 @@ class ApplicationDeleteMixin:
)
xml_answer = xml_delete(base_prefix, path, item)
else:
assert item.collection is not None
assert item.href is not None
hook_notification_item_list.append(
HookNotificationItem(
HookNotificationItemTypes.DELETE,

View file

@ -1,4 +1,4 @@
# This file is part of Radicale Server - Calendar Server
# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
@ -18,92 +18,71 @@
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import socket
import xml.etree.ElementTree as ET
from http import client
from xml.etree import ElementTree as ET
from typing import Dict, Optional, cast
import defusedxml.ElementTree as DefusedET
from radicale import app, httputils
from radicale import item as radicale_item
from radicale import storage, xmlutils
import radicale.item as radicale_item
from radicale import httputils, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
from radicale.log import logger
def xml_add_propstat_to(element, tag, status_number):
"""Add a PROPSTAT response structure to an element.
The PROPSTAT answer structure is defined in rfc4918-9.1. It is added to the
given ``element``, for the following ``tag`` with the given
``status_number``.
"""
propstat = ET.Element(xmlutils.make_clark("D:propstat"))
element.append(propstat)
prop = ET.Element(xmlutils.make_clark("D:prop"))
propstat.append(prop)
clark_tag = xmlutils.make_clark(tag)
prop_tag = ET.Element(clark_tag)
prop.append(prop_tag)
status = ET.Element(xmlutils.make_clark("D:status"))
status.text = xmlutils.make_response(status_number)
propstat.append(status)
def xml_proppatch(base_prefix, path, xml_request, collection):
def xml_proppatch(base_prefix: str, path: str,
xml_request: Optional[ET.Element],
collection: storage.BaseCollection) -> ET.Element:
"""Read and answer PROPPATCH requests.
Read rfc4918-9.2 for info.
"""
props_to_set = xmlutils.props_from_request(xml_request, actions=("set",))
props_to_remove = xmlutils.props_from_request(xml_request,
actions=("remove",))
multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
response = ET.Element(xmlutils.make_clark("D:response"))
multistatus.append(response)
href = ET.Element(xmlutils.make_clark("D:href"))
href.text = xmlutils.make_href(base_prefix, path)
response.append(href)
# Create D:propstat element for props with status 200 OK
propstat = ET.Element(xmlutils.make_clark("D:propstat"))
status = ET.Element(xmlutils.make_clark("D:status"))
status.text = xmlutils.make_response(200)
props_ok = ET.Element(xmlutils.make_clark("D:prop"))
propstat.append(props_ok)
propstat.append(status)
response.append(propstat)
new_props = collection.get_meta()
for short_name, value in props_to_set.items():
new_props[short_name] = value
xml_add_propstat_to(response, short_name, 200)
for short_name in props_to_remove:
try:
del new_props[short_name]
except KeyError:
pass
xml_add_propstat_to(response, short_name, 200)
radicale_item.check_and_sanitize_props(new_props)
collection.set_meta(new_props)
props_with_remove = xmlutils.props_from_request(xml_request)
all_props_with_remove = cast(Dict[str, Optional[str]],
dict(collection.get_meta()))
all_props_with_remove.update(props_with_remove)
all_props = radicale_item.check_and_sanitize_props(all_props_with_remove)
collection.set_meta(all_props)
for short_name in props_with_remove:
props_ok.append(ET.Element(xmlutils.make_clark(short_name)))
return multistatus
class ApplicationProppatchMixin:
def do_PROPPATCH(self, environ, base_prefix, path, user):
class ApplicationPartProppatch(ApplicationBase):
def do_PROPPATCH(self, environ: types.WSGIEnviron, base_prefix: str,
path: str, user: str) -> types.WSGIResponse:
"""Manage PROPPATCH request."""
access = app.Access(self._rights, user, path)
access = Access(self._rights, user, path)
if not access.check("w"):
return httputils.NOT_ALLOWED
try:
xml_content = self._read_xml_content(environ)
xml_content = self._read_xml_request_body(environ)
except RuntimeError as e:
logger.warning(
"Bad PROPPATCH request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
except socket.timeout:
logger.debug("client timed out", exc_info=True)
logger.debug("Client timed out", exc_info=True)
return httputils.REQUEST_TIMEOUT
with self._storage.acquire_lock("w", user):
item = next(self._storage.discover(path), None)
item = next(iter(self._storage.discover(path)), None)
if not item:
return httputils.NOT_FOUND
if not access.check("w", item):
@ -129,5 +108,4 @@ class ApplicationProppatchMixin:
logger.warning(
"Bad PROPPATCH request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
return (client.MULTI_STATUS, headers,
self._write_xml_content(xml_answer))
return client.MULTI_STATUS, headers, self._xml_response(xml_answer)

View file

@ -1,4 +1,4 @@
# This file is part of Radicale Server - Calendar Server
# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
@ -22,21 +22,31 @@ import posixpath
import socket
import sys
from http import client
from typing import Iterator, List, Mapping, MutableMapping, Optional, Tuple
import vobject
from radicale import app, httputils
from radicale import item as radicale_item
from radicale import pathutils, rights, storage, xmlutils
import radicale.item as radicale_item
from radicale import httputils, pathutils, rights, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
from radicale.log import logger
from types import TracebackType
MIMETYPE_TAGS = {value: key for key, value in xmlutils.MIMETYPES.items()}
MIMETYPE_TAGS: Mapping[str, str] = {value: key for key, value in
xmlutils.MIMETYPES.items()}
def prepare(vobject_items, path, content_type, permissions, parent_permissions,
tag=None, write_whole_collection=None):
if (write_whole_collection or permissions and not parent_permissions):
def prepare(vobject_items: List[vobject.base.Component], path: str,
content_type: str, permission: bool, parent_permission: bool,
tag: Optional[str] = None,
write_whole_collection: Optional[bool] = None) -> Tuple[
Iterator[radicale_item.Item], # items
Optional[str], # tag
Optional[bool], # write_whole_collection
Optional[MutableMapping[str, str]], # props
Optional[Tuple[type, BaseException, Optional[TracebackType]]]]:
if (write_whole_collection or permission and not parent_permission):
write_whole_collection = True
tag = radicale_item.predict_tag_of_whole_collection(
vobject_items, MIMETYPE_TAGS.get(content_type))
@ -44,20 +54,20 @@ def prepare(vobject_items, path, content_type, permissions, parent_permissions,
raise ValueError("Can't determine collection tag")
collection_path = pathutils.strip_path(path)
elif (write_whole_collection is not None and not write_whole_collection or
not permissions and parent_permissions):
not permission and parent_permission):
write_whole_collection = False
if tag is None:
tag = radicale_item.predict_tag_of_parent_collection(vobject_items)
collection_path = posixpath.dirname(pathutils.strip_path(path))
props = None
props: Optional[MutableMapping[str, str]] = None
stored_exc_info = None
items = []
try:
if tag:
if tag and write_whole_collection is not None:
radicale_item.check_and_sanitize_items(
vobject_items, is_collection=write_whole_collection, tag=tag)
if write_whole_collection and tag == "VCALENDAR":
vobject_components = []
vobject_components: List[vobject.base.Component] = []
vobject_item, = vobject_items
for content in ("vevent", "vtodo", "vjournal"):
vobject_components.extend(
@ -99,37 +109,40 @@ def prepare(vobject_items, path, content_type, permissions, parent_permissions,
caldesc = vobject_items[0].x_wr_caldesc.value
if caldesc:
props["C:calendar-description"] = caldesc
radicale_item.check_and_sanitize_props(props)
props = radicale_item.check_and_sanitize_props(props)
except Exception:
stored_exc_info = sys.exc_info()
exc_info_or_none_tuple = sys.exc_info()
assert exc_info_or_none_tuple[0] is not None
stored_exc_info = exc_info_or_none_tuple
# Use generator for items and delete references to free memory
# early
def items_generator():
# Use iterator for items and delete references to free memory early
def items_iter() -> Iterator[radicale_item.Item]:
while items:
yield items.pop(0)
return (items_generator(), tag, write_whole_collection, props,
stored_exc_info)
return items_iter(), tag, write_whole_collection, props, stored_exc_info
class ApplicationPutMixin:
def do_PUT(self, environ, base_prefix, path, user):
class ApplicationPartPut(ApplicationBase):
def do_PUT(self, environ: types.WSGIEnviron, base_prefix: str,
path: str, user: str) -> types.WSGIResponse:
"""Manage PUT request."""
access = app.Access(self._rights, user, path)
access = Access(self._rights, user, path)
if not access.check("w"):
return httputils.NOT_ALLOWED
try:
content = self._read_content(environ)
content = httputils.read_request_body(self.configuration, environ)
except RuntimeError as e:
logger.warning("Bad PUT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
except socket.timeout:
logger.debug("client timed out", exc_info=True)
logger.debug("Client timed out", exc_info=True)
return httputils.REQUEST_TIMEOUT
# Prepare before locking
content_type = environ.get("CONTENT_TYPE", "").split(";")[0]
content_type = environ.get("CONTENT_TYPE", "").split(";",
maxsplit=1)[0]
try:
vobject_items = tuple(vobject.readComponents(content or ""))
vobject_items = radicale_item.read_components(content or "")
except Exception as e:
logger.warning(
"Bad PUT request on %r: %s", path, e, exc_info=True)
@ -141,20 +154,20 @@ class ApplicationPutMixin:
bool(rights.intersect(access.parent_permissions, "w")))
with self._storage.acquire_lock("w", user):
item = next(self._storage.discover(path), None)
parent_item = next(
self._storage.discover(access.parent_path), None)
if not parent_item:
item = next(iter(self._storage.discover(path)), None)
parent_item = next(iter(
self._storage.discover(access.parent_path)), None)
if not isinstance(parent_item, storage.BaseCollection):
return httputils.CONFLICT
write_whole_collection = (
isinstance(item, storage.BaseCollection) or
not parent_item.get_meta("tag"))
not parent_item.tag)
if write_whole_collection:
tag = prepared_tag
else:
tag = parent_item.get_meta("tag")
tag = parent_item.tag
if write_whole_collection:
if ("w" if tag else "W") not in access.permissions:
@ -206,6 +219,7 @@ class ApplicationPutMixin:
"Bad PUT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
else:
assert not isinstance(item, storage.BaseCollection)
prepared_item, = prepared_items
if (item and item.uid != prepared_item.uid or
not item and parent_item.has_uid(prepared_item.uid)):