1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-09-15 20:36:55 +00:00

Fix merge conflicts.

This commit is contained in:
Dipl. Ing. Péter Varkoly 2024-08-25 14:11:48 +02:00
commit 19e5972b4f
76 changed files with 4135 additions and 1365 deletions

View file

@ -3,6 +3,7 @@
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
# Copyright © 2024-2024 Peter Bieringer <pb@bieringer.de>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -68,6 +69,7 @@ class Application(ApplicationPartDelete, ApplicationPartHead,
_max_content_length: int
_auth_realm: str
_extra_headers: Mapping[str, str]
_permit_delete_collection: bool
def __init__(self, configuration: config.Configuration) -> None:
"""Initialize Application.
@ -79,11 +81,16 @@ class Application(ApplicationPartDelete, ApplicationPartHead,
"""
super().__init__(configuration)
self._mask_passwords = configuration.get("logging", "mask_passwords")
self._bad_put_request_content = configuration.get("logging", "bad_put_request_content")
self._request_header_on_debug = configuration.get("logging", "request_header_on_debug")
self._response_content_on_debug = configuration.get("logging", "response_content_on_debug")
self._auth_delay = configuration.get("auth", "delay")
self._internal_server = configuration.get("server", "_internal_server")
self._max_content_length = configuration.get(
"server", "max_content_length")
self._auth_realm = configuration.get("auth", "realm")
self._permit_delete_collection = configuration.get("rights", "permit_delete_collection")
logger.info("permit delete of collection: %s", self._permit_delete_collection)
self._extra_headers = dict()
for key in self.configuration.options("headers"):
self._extra_headers[key] = configuration.get("headers", key)
@ -136,7 +143,10 @@ class Application(ApplicationPartDelete, ApplicationPartHead,
answers = []
if answer is not None:
if isinstance(answer, str):
logger.debug("Response content:\n%s", answer)
if self._response_content_on_debug:
logger.debug("Response content:\n%s", answer)
else:
logger.debug("Response content: suppressed by config/option [auth] response_content_on_debug")
headers["Content-Type"] += "; charset=%s" % self._encoding
answer = answer.encode(self._encoding)
accept_encoding = [
@ -182,8 +192,11 @@ class Application(ApplicationPartDelete, ApplicationPartHead,
logger.info("%s request for %r%s received from %s%s",
request_method, unsafe_path, depthinfo,
remote_host, remote_useragent)
logger.debug("Request headers:\n%s",
pprint.pformat(self._scrub_headers(environ)))
if self._request_header_on_debug:
logger.debug("Request header:\n%s",
pprint.pformat(self._scrub_headers(environ)))
else:
logger.debug("Request header: suppressed by config/option [auth] request_header_on_debug")
# SCRIPT_NAME is already removed from PATH_INFO, according to the
# WSGI specification.
@ -219,7 +232,7 @@ class Application(ApplicationPartDelete, ApplicationPartHead,
path.rstrip("/").endswith("/.well-known/carddav")):
return response(*httputils.redirect(
base_prefix + "/", client.MOVED_PERMANENTLY))
# Return NOT FOUND for all other paths containing ".well-knwon"
# Return NOT FOUND for all other paths containing ".well-known"
if path.endswith("/.well-known") or "/.well-known/" in path:
return response(*httputils.NOT_FOUND)
@ -270,7 +283,14 @@ class Application(ApplicationPartDelete, ApplicationPartHead,
if "W" in self._rights.authorization(user, principal_path):
with self._storage.acquire_lock("w", user):
try:
self._storage.create_collection(principal_path)
new_coll = self._storage.create_collection(principal_path)
if new_coll:
jsn_coll = self.configuration.get("storage", "predefined_collections")
for (name_coll, props) in jsn_coll.items():
try:
self._storage.create_collection(principal_path + name_coll, props=props)
except ValueError as e:
logger.warning("Failed to create predefined collection %r: %s", name_coll, e)
except ValueError as e:
logger.warning("Failed to create principal "
"collection %r: %s", user, e)

View file

@ -1,5 +1,6 @@
# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2020 Unrud <unrud@outlook.com>
# Copyright © 2024-2024 Peter Bieringer <pb@bieringer.de>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -21,8 +22,8 @@ import sys
import xml.etree.ElementTree as ET
from typing import Optional
from radicale import (auth, config, httputils, pathutils, rights, storage,
types, web, xmlutils)
from radicale import (auth, config, hook, httputils, pathutils, rights,
storage, types, web, xmlutils)
from radicale.log import logger
# HACK: https://github.com/tiran/defusedxml/issues/54
@ -38,6 +39,8 @@ class ApplicationBase:
_rights: rights.BaseRights
_web: web.BaseWeb
_encoding: str
_permit_delete_collection: bool
_hook: hook.BaseHook
def __init__(self, configuration: config.Configuration) -> None:
self.configuration = configuration
@ -46,6 +49,9 @@ class ApplicationBase:
self._rights = rights.load(configuration)
self._web = web.load(configuration)
self._encoding = configuration.get("encoding", "request")
self._log_bad_put_request_content = configuration.get("logging", "bad_put_request_content")
self._response_content_on_debug = configuration.get("logging", "response_content_on_debug")
self._hook = hook.load(configuration)
def _read_xml_request_body(self, environ: types.WSGIEnviron
) -> Optional[ET.Element]:
@ -66,8 +72,11 @@ class ApplicationBase:
def _xml_response(self, xml_content: ET.Element) -> bytes:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Response content:\n%s",
xmlutils.pretty_xml(xml_content))
if self._response_content_on_debug:
logger.debug("Response content:\n%s",
xmlutils.pretty_xml(xml_content))
else:
logger.debug("Response content: suppressed by config/option [auth] response_content_on_debug")
f = io.BytesIO()
ET.ElementTree(xml_content).write(f, encoding=self._encoding,
xml_declaration=True)

View file

@ -23,6 +23,7 @@ from typing import Optional
from radicale import httputils, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
def xml_delete(base_prefix: str, path: str, collection: storage.BaseCollection,
@ -67,12 +68,33 @@ class ApplicationPartDelete(ApplicationBase):
if if_match not in ("*", item.etag):
# ETag precondition not verified, do not delete item
return httputils.PRECONDITION_FAILED
hook_notification_item_list = []
if isinstance(item, storage.BaseCollection):
xml_answer = xml_delete(base_prefix, path, item)
if self._permit_delete_collection:
for i in item.get_all():
hook_notification_item_list.append(
HookNotificationItem(
HookNotificationItemTypes.DELETE,
access.path,
i.uid
)
)
xml_answer = xml_delete(base_prefix, path, item)
else:
return httputils.NOT_ALLOWED
else:
assert item.collection is not None
assert item.href is not None
hook_notification_item_list.append(
HookNotificationItem(
HookNotificationItemTypes.DELETE,
access.path,
item.uid
)
)
xml_answer = xml_delete(
base_prefix, path, item.collection, item.href)
for notification_item in hook_notification_item_list:
self._hook.notify(notification_item)
headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
return client.OK, headers, self._xml_response(xml_answer)

View file

@ -45,8 +45,8 @@ def propose_filename(collection: storage.BaseCollection) -> str:
class ApplicationPartGet(ApplicationBase):
def _content_disposition_attachement(self, filename: str) -> str:
value = "attachement"
def _content_disposition_attachment(self, filename: str) -> str:
value = "attachment"
try:
encoded_filename = quote(filename, encoding=self._encoding)
except UnicodeEncodeError:
@ -91,7 +91,7 @@ class ApplicationPartGet(ApplicationBase):
return (httputils.NOT_ALLOWED if limited_access else
httputils.DIRECTORY_LISTING)
content_type = xmlutils.MIMETYPES[item.tag]
content_disposition = self._content_disposition_attachement(
content_disposition = self._content_disposition_attachment(
propose_filename(item))
elif limited_access:
return httputils.NOT_ALLOWED

View file

@ -52,8 +52,12 @@ class ApplicationPartMkcol(ApplicationBase):
logger.warning(
"Bad MKCOL request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
if (props.get("tag") and "w" not in permissions or
not props.get("tag") and "W" not in permissions):
collection_type = props.get("tag") or "UNKNOWN"
if props.get("tag") and "w" not in permissions:
logger.warning("MKCOL request %r (type:%s): %s", path, collection_type, "rejected because of missing rights 'w'")
return httputils.NOT_ALLOWED
if not props.get("tag") and "W" not in permissions:
logger.warning("MKCOL request %r (type:%s): %s", path, collection_type, "rejected because of missing rights 'W'")
return httputils.NOT_ALLOWED
with self._storage.acquire_lock("w", user):
item = next(iter(self._storage.discover(path)), None)
@ -71,6 +75,7 @@ class ApplicationPartMkcol(ApplicationBase):
self._storage.create_collection(path, props=props)
except ValueError as e:
logger.warning(
"Bad MKCOL request on %r: %s", path, e, exc_info=True)
"Bad MKCOL request on %r (type:%s): %s", path, collection_type, e, exc_info=True)
return httputils.BAD_REQUEST
logger.info("MKCOL request %r (type:%s): %s", path, collection_type, "successful")
return client.CREATED, {}, None

View file

@ -18,6 +18,7 @@
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import posixpath
import re
from http import client
from urllib.parse import urlparse
@ -26,6 +27,22 @@ from radicale.app.base import Access, ApplicationBase
from radicale.log import logger
def get_server_netloc(environ: types.WSGIEnviron, force_port: bool = False):
if environ.get("HTTP_X_FORWARDED_HOST"):
host = environ["HTTP_X_FORWARDED_HOST"]
proto = environ.get("HTTP_X_FORWARDED_PROTO") or "http"
port = "443" if proto == "https" else "80"
port = environ["HTTP_X_FORWARDED_PORT"] or port
else:
host = environ.get("HTTP_HOST") or environ["SERVER_NAME"]
proto = environ["wsgi.url_scheme"]
port = environ["SERVER_PORT"]
if (not force_port and port == ("443" if proto == "https" else "80") or
re.search(r":\d+$", host)):
return host
return host + ":" + port
class ApplicationPartMove(ApplicationBase):
def do_MOVE(self, environ: types.WSGIEnviron, base_prefix: str,
@ -33,7 +50,11 @@ class ApplicationPartMove(ApplicationBase):
"""Manage MOVE request."""
raw_dest = environ.get("HTTP_DESTINATION", "")
to_url = urlparse(raw_dest)
if to_url.netloc != environ["HTTP_HOST"]:
to_netloc_with_port = to_url.netloc
if to_url.port is None:
to_netloc_with_port += (":443" if to_url.scheme == "https"
else ":80")
if to_netloc_with_port != get_server_netloc(environ, force_port=True):
logger.info("Unsupported destination address: %r", raw_dest)
# Remote destination server, not supported
return httputils.REMOTE_DESTINATION

View file

@ -85,7 +85,7 @@ def xml_propfind_response(
if isinstance(item, storage.BaseCollection):
is_collection = True
is_leaf = item.tag in ("VADDRESSBOOK", "VCALENDAR")
is_leaf = item.tag in ("VADDRESSBOOK", "VCALENDAR", "VSUBSCRIBED")
collection = item
# Some clients expect collections to end with `/`
uri = pathutils.unstrip_path(item.path, True)
@ -259,6 +259,10 @@ def xml_propfind_response(
child_element = ET.Element(
xmlutils.make_clark("C:calendar"))
element.append(child_element)
elif collection.tag == "VSUBSCRIBED":
child_element = ET.Element(
xmlutils.make_clark("CS:subscribed"))
element.append(child_element)
child_element = ET.Element(xmlutils.make_clark("D:collection"))
element.append(child_element)
elif tag == xmlutils.make_clark("RADICALE:displayname"):
@ -268,6 +272,12 @@ def xml_propfind_response(
element.text = displayname
else:
is404 = True
elif tag == xmlutils.make_clark("RADICALE:getcontentcount"):
# Only for internal use by the web interface
if isinstance(item, storage.BaseCollection) and not collection.is_principal:
element.text = str(sum(1 for x in item.get_all()))
else:
is404 = True
elif tag == xmlutils.make_clark("D:displayname"):
displayname = collection.get_meta("D:displayname")
if not displayname and is_leaf:
@ -286,6 +296,13 @@ def xml_propfind_response(
element.text, _ = collection.sync()
else:
is404 = True
elif tag == xmlutils.make_clark("CS:source"):
if is_leaf:
child_element = ET.Element(xmlutils.make_clark("D:href"))
child_element.text = collection.get_meta('CS:source')
element.append(child_element)
else:
is404 = True
else:
human_tag = xmlutils.make_human_tag(tag)
tag_text = collection.get_meta(human_tag)
@ -305,13 +322,13 @@ def xml_propfind_response(
responses[404 if is404 else 200].append(element)
for status_code, childs in responses.items():
if not childs:
for status_code, children in responses.items():
if not children:
continue
propstat = ET.Element(xmlutils.make_clark("D:propstat"))
response.append(propstat)
prop = ET.Element(xmlutils.make_clark("D:prop"))
prop.extend(childs)
prop.extend(children)
propstat.append(prop)
status = ET.Element(xmlutils.make_clark("D:status"))
status.text = xmlutils.make_response(status_code)

View file

@ -22,9 +22,12 @@ import xml.etree.ElementTree as ET
from http import client
from typing import Dict, Optional, cast
import defusedxml.ElementTree as DefusedET
import radicale.item as radicale_item
from radicale import httputils, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
from radicale.log import logger
@ -93,6 +96,16 @@ class ApplicationPartProppatch(ApplicationBase):
try:
xml_answer = xml_proppatch(base_prefix, path, xml_content,
item)
if xml_content is not None:
hook_notification_item = HookNotificationItem(
HookNotificationItemTypes.CPATCH,
access.path,
DefusedET.tostring(
xml_content,
encoding=self._encoding
).decode(encoding=self._encoding)
)
self._hook.notify(hook_notification_item)
except ValueError as e:
logger.warning(
"Bad PROPPATCH request on %r: %s", path, e, exc_info=True)

View file

@ -3,6 +3,7 @@
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2024-2024 Peter Bieringer <pb@bieringer.de>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -30,6 +31,7 @@ import vobject
import radicale.item as radicale_item
from radicale import httputils, pathutils, rights, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
from radicale.log import logger
MIMETYPE_TAGS: Mapping[str, str] = {value: key for key, value in
@ -132,7 +134,7 @@ class ApplicationPartPut(ApplicationBase):
try:
content = httputils.read_request_body(self.configuration, environ)
except RuntimeError as e:
logger.warning("Bad PUT request on %r: %s", path, e, exc_info=True)
logger.warning("Bad PUT request on %r (read_request_body): %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
except socket.timeout:
logger.debug("Client timed out", exc_info=True)
@ -144,7 +146,11 @@ class ApplicationPartPut(ApplicationBase):
vobject_items = radicale_item.read_components(content or "")
except Exception as e:
logger.warning(
"Bad PUT request on %r: %s", path, e, exc_info=True)
"Bad PUT request on %r (read_components): %s", path, e, exc_info=True)
if self._log_bad_put_request_content:
logger.warning("Bad PUT request content of %r:\n%s", path, content)
else:
logger.debug("Bad PUT request content: suppressed by config/option [auth] bad_put_request_content")
return httputils.BAD_REQUEST
(prepared_items, prepared_tag, prepared_write_whole_collection,
prepared_props, prepared_exc_info) = prepare(
@ -198,7 +204,7 @@ class ApplicationPartPut(ApplicationBase):
props = prepared_props
if prepared_exc_info:
logger.warning(
"Bad PUT request on %r: %s", path, prepared_exc_info[1],
"Bad PUT request on %r (prepare): %s", path, prepared_exc_info[1],
exc_info=prepared_exc_info)
return httputils.BAD_REQUEST
@ -206,9 +212,16 @@ class ApplicationPartPut(ApplicationBase):
try:
etag = self._storage.create_collection(
path, prepared_items, props).etag
for item in prepared_items:
hook_notification_item = HookNotificationItem(
HookNotificationItemTypes.UPSERT,
access.path,
item.serialize()
)
self._hook.notify(hook_notification_item)
except ValueError as e:
logger.warning(
"Bad PUT request on %r: %s", path, e, exc_info=True)
"Bad PUT request on %r (create_collection): %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
else:
assert not isinstance(item, storage.BaseCollection)
@ -222,9 +235,15 @@ class ApplicationPartPut(ApplicationBase):
href = posixpath.basename(pathutils.strip_path(path))
try:
etag = parent_item.upload(href, prepared_item).etag
hook_notification_item = HookNotificationItem(
HookNotificationItemTypes.UPSERT,
access.path,
prepared_item.serialize()
)
self._hook.notify(hook_notification_item)
except ValueError as e:
logger.warning(
"Bad PUT request on %r: %s", path, e, exc_info=True)
"Bad PUT request on %r (upload): %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
headers = {"ETag": etag}

View file

@ -18,13 +18,20 @@
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import copy
import datetime
import posixpath
import socket
import xml.etree.ElementTree as ET
from http import client
from typing import Callable, Iterable, Iterator, Optional, Sequence, Tuple
from typing import (Any, Callable, Iterable, Iterator, List, Optional,
Sequence, Tuple, Union)
from urllib.parse import unquote, urlparse
import vobject
import vobject.base
from vobject.base import ContentLine
import radicale.item as radicale_item
from radicale import httputils, pathutils, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
@ -32,11 +39,110 @@ from radicale.item import filter as radicale_filter
from radicale.log import logger
def free_busy_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
collection: storage.BaseCollection, encoding: str,
unlock_storage_fn: Callable[[], None],
max_occurrence: int
) -> Tuple[int, Union[ET.Element, str]]:
# NOTE: this function returns both an Element and a string because
# free-busy reports are an edge-case on the return type according
# to the spec.
multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
if xml_request is None:
return client.MULTI_STATUS, multistatus
root = xml_request
if (root.tag == xmlutils.make_clark("C:free-busy-query") and
collection.tag != "VCALENDAR"):
logger.warning("Invalid REPORT method %r on %r requested",
xmlutils.make_human_tag(root.tag), path)
return client.FORBIDDEN, xmlutils.webdav_error("D:supported-report")
time_range_element = root.find(xmlutils.make_clark("C:time-range"))
assert isinstance(time_range_element, ET.Element)
# Build a single filter from the free busy query for retrieval
# TODO: filter for VFREEBUSY in additional to VEVENT but
# test_filter doesn't support that yet.
vevent_cf_element = ET.Element(xmlutils.make_clark("C:comp-filter"),
attrib={'name': 'VEVENT'})
vevent_cf_element.append(time_range_element)
vcalendar_cf_element = ET.Element(xmlutils.make_clark("C:comp-filter"),
attrib={'name': 'VCALENDAR'})
vcalendar_cf_element.append(vevent_cf_element)
filter_element = ET.Element(xmlutils.make_clark("C:filter"))
filter_element.append(vcalendar_cf_element)
filters = (filter_element,)
# First pull from storage
retrieved_items = list(collection.get_filtered(filters))
# !!! Don't access storage after this !!!
unlock_storage_fn()
cal = vobject.iCalendar()
collection_tag = collection.tag
while retrieved_items:
# Second filtering before evaluating occurrences.
# ``item.vobject_item`` might be accessed during filtering.
# Don't keep reference to ``item``, because VObject requires a lot of
# memory.
item, filter_matched = retrieved_items.pop(0)
if not filter_matched:
try:
if not test_filter(collection_tag, item, filter_element):
continue
except ValueError as e:
raise ValueError("Failed to free-busy filter item %r from %r: %s" %
(item.href, collection.path, e)) from e
except Exception as e:
raise RuntimeError("Failed to free-busy filter item %r from %r: %s" %
(item.href, collection.path, e)) from e
fbtype = None
if item.component_name == 'VEVENT':
transp = getattr(item.vobject_item.vevent, 'transp', None)
if transp and transp.value != 'OPAQUE':
continue
status = getattr(item.vobject_item.vevent, 'status', None)
if not status or status.value == 'CONFIRMED':
fbtype = 'BUSY'
elif status.value == 'CANCELLED':
fbtype = 'FREE'
elif status.value == 'TENTATIVE':
fbtype = 'BUSY-TENTATIVE'
else:
# Could do fbtype = status.value for x-name, I prefer this
fbtype = 'BUSY'
# TODO: coalesce overlapping periods
if max_occurrence > 0:
n_occurrences = max_occurrence+1
else:
n_occurrences = 0
occurrences = radicale_filter.time_range_fill(item.vobject_item,
time_range_element,
"VEVENT",
n=n_occurrences)
if len(occurrences) >= max_occurrence:
raise ValueError("FREEBUSY occurrences limit of {} hit"
.format(max_occurrence))
for occurrence in occurrences:
vfb = cal.add('vfreebusy')
vfb.add('dtstamp').value = item.vobject_item.vevent.dtstamp.value
vfb.add('dtstart').value, vfb.add('dtend').value = occurrence
if fbtype:
vfb.add('fbtype').value = fbtype
return (client.OK, cal.serialize())
def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
collection: storage.BaseCollection, encoding: str,
unlock_storage_fn: Callable[[], None]
) -> Tuple[int, ET.Element]:
"""Read and answer REPORT requests.
"""Read and answer REPORT requests that return XML.
Read rfc3253-3.6 for info.
@ -64,9 +170,8 @@ def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
logger.warning("Invalid REPORT method %r on %r requested",
xmlutils.make_human_tag(root.tag), path)
return client.FORBIDDEN, xmlutils.webdav_error("D:supported-report")
prop_element = root.find(xmlutils.make_clark("D:prop"))
props = ([prop.tag for prop in prop_element]
if prop_element is not None else [])
props: Union[ET.Element, List] = root.find(xmlutils.make_clark("D:prop")) or []
hreferences: Iterable[str]
if root.tag in (
@ -138,19 +243,40 @@ def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
found_props = []
not_found_props = []
for tag in props:
element = ET.Element(tag)
if tag == xmlutils.make_clark("D:getetag"):
for prop in props:
element = ET.Element(prop.tag)
if prop.tag == xmlutils.make_clark("D:getetag"):
element.text = item.etag
found_props.append(element)
elif tag == xmlutils.make_clark("D:getcontenttype"):
elif prop.tag == xmlutils.make_clark("D:getcontenttype"):
element.text = xmlutils.get_content_type(item, encoding)
found_props.append(element)
elif tag in (
elif prop.tag in (
xmlutils.make_clark("C:calendar-data"),
xmlutils.make_clark("CR:address-data")):
element.text = item.serialize()
found_props.append(element)
expand = prop.find(xmlutils.make_clark("C:expand"))
if expand is not None:
start = expand.get('start')
end = expand.get('end')
if (start is None) or (end is None):
return client.FORBIDDEN, \
xmlutils.webdav_error("C:expand")
start = datetime.datetime.strptime(
start, '%Y%m%dT%H%M%SZ'
).replace(tzinfo=datetime.timezone.utc)
end = datetime.datetime.strptime(
end, '%Y%m%dT%H%M%SZ'
).replace(tzinfo=datetime.timezone.utc)
expanded_element = _expand(
element, copy.copy(item), start, end)
found_props.append(expanded_element)
else:
found_props.append(element)
else:
not_found_props.append(element)
@ -164,6 +290,111 @@ def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
return client.MULTI_STATUS, multistatus
def _expand(
element: ET.Element,
item: radicale_item.Item,
start: datetime.datetime,
end: datetime.datetime,
) -> ET.Element:
dt_format = '%Y%m%dT%H%M%SZ'
if type(item.vobject_item.vevent.dtstart.value) is datetime.date:
# If an event comes to us with a dt_start specified as a date
# then in the response we return the date, not datetime
dt_format = '%Y%m%d'
expanded_item, rruleset = _make_vobject_expanded_item(item, dt_format)
if rruleset:
recurrences = rruleset.between(start, end, inc=True)
expanded: vobject.base.Component = copy.copy(expanded_item.vobject_item)
is_expanded_filled: bool = False
for recurrence_dt in recurrences:
recurrence_utc = recurrence_dt.astimezone(datetime.timezone.utc)
vevent = copy.deepcopy(expanded.vevent)
vevent.recurrence_id = ContentLine(
name='RECURRENCE-ID',
value=recurrence_utc.strftime(dt_format), params={}
)
if is_expanded_filled is False:
expanded.vevent = vevent
is_expanded_filled = True
else:
expanded.add(vevent)
element.text = expanded.serialize()
else:
element.text = expanded_item.vobject_item.serialize()
return element
def _make_vobject_expanded_item(
item: radicale_item.Item,
dt_format: str,
) -> Tuple[radicale_item.Item, Optional[Any]]:
# https://www.rfc-editor.org/rfc/rfc4791#section-9.6.5
# The returned calendar components MUST NOT use recurrence
# properties (i.e., EXDATE, EXRULE, RDATE, and RRULE) and MUST NOT
# have reference to or include VTIMEZONE components. Date and local
# time with reference to time zone information MUST be converted
# into date with UTC time.
item = copy.copy(item)
vevent = item.vobject_item.vevent
if type(vevent.dtstart.value) is datetime.date:
start_utc = datetime.datetime.fromordinal(
vevent.dtstart.value.toordinal()
).replace(tzinfo=datetime.timezone.utc)
else:
start_utc = vevent.dtstart.value.astimezone(datetime.timezone.utc)
vevent.dtstart = ContentLine(name='DTSTART', value=start_utc, params=[])
dt_end = getattr(vevent, 'dtend', None)
if dt_end is not None:
if type(vevent.dtend.value) is datetime.date:
end_utc = datetime.datetime.fromordinal(
dt_end.value.toordinal()
).replace(tzinfo=datetime.timezone.utc)
else:
end_utc = dt_end.value.astimezone(datetime.timezone.utc)
vevent.dtend = ContentLine(name='DTEND', value=end_utc, params={})
rruleset = None
if hasattr(item.vobject_item.vevent, 'rrule'):
rruleset = vevent.getrruleset()
# There is something strange behaviour during serialization native datetime, so converting manually
vevent.dtstart.value = vevent.dtstart.value.strftime(dt_format)
if dt_end is not None:
vevent.dtend.value = vevent.dtend.value.strftime(dt_format)
timezones_to_remove = []
for component in item.vobject_item.components():
if component.name == 'VTIMEZONE':
timezones_to_remove.append(component)
for timezone in timezones_to_remove:
item.vobject_item.remove(timezone)
try:
delattr(item.vobject_item.vevent, 'rrule')
delattr(item.vobject_item.vevent, 'exdate')
delattr(item.vobject_item.vevent, 'exrule')
delattr(item.vobject_item.vevent, 'rdate')
except AttributeError:
pass
return item, rruleset
def xml_item_response(base_prefix: str, href: str,
found_props: Sequence[ET.Element] = (),
not_found_props: Sequence[ET.Element] = (),
@ -295,13 +526,28 @@ class ApplicationPartReport(ApplicationBase):
else:
assert item.collection is not None
collection = item.collection
try:
status, xml_answer = xml_report(
base_prefix, path, xml_content, collection, self._encoding,
lock_stack.close)
except ValueError as e:
logger.warning(
"Bad REPORT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
return status, headers, self._xml_response(xml_answer)
if xml_content is not None and \
xml_content.tag == xmlutils.make_clark("C:free-busy-query"):
max_occurrence = self.configuration.get("reporting", "max_freebusy_occurrence")
try:
status, body = free_busy_report(
base_prefix, path, xml_content, collection, self._encoding,
lock_stack.close, max_occurrence)
except ValueError as e:
logger.warning(
"Bad REPORT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
headers = {"Content-Type": "text/calendar; charset=%s" % self._encoding}
return status, headers, str(body)
else:
try:
status, xml_answer = xml_report(
base_prefix, path, xml_content, collection, self._encoding,
lock_stack.close)
except ValueError as e:
logger.warning(
"Bad REPORT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
return status, headers, self._xml_response(xml_answer)