1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-06-26 16:45:52 +00:00
Radicale/radicale/app/report.py

643 lines
27 KiB
Python
Raw Normal View History

2021-12-08 21:45:42 +01:00
# This file is part of Radicale - CalDAV and CardDAV server
2018-08-28 16:19:36 +02:00
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
2025-03-29 08:37:57 +01:00
# Copyright © 2017-2021 Unrud <unrud@outlook.com>
# Copyright © 2024-2024 Pieter Hijma <pieterhijma@users.noreply.github.com>
# Copyright © 2024-2024 Ray <ray@react0r.com>
# Copyright © 2024-2024 Georgiy <metallerok@gmail.com>
# Copyright © 2024-2025 Peter Bieringer <pb@bieringer.de>
2018-08-28 16:19:36 +02:00
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import contextlib
2024-04-06 07:30:07 +02:00
import copy
2023-03-30 19:30:59 +03:00
import datetime
2019-06-15 09:01:55 +02:00
import posixpath
2018-08-28 16:19:36 +02:00
import socket
2020-10-04 10:14:57 +02:00
import xml.etree.ElementTree as ET
2018-08-28 16:19:36 +02:00
from http import client
2024-11-07 11:03:24 +01:00
from typing import (Callable, Iterable, Iterator, List, Optional, Sequence,
Tuple, Union)
2018-08-28 16:19:36 +02:00
from urllib.parse import unquote, urlparse
2024-08-14 11:15:30 -06:00
import vobject
2024-04-06 11:02:40 +03:00
import vobject.base
2024-04-06 07:30:07 +02:00
from vobject.base import ContentLine
2021-07-26 20:56:46 +02:00
import radicale.item as radicale_item
2024-08-14 11:15:30 -06:00
from radicale import httputils, pathutils, storage, types, xmlutils
2021-07-26 20:56:46 +02:00
from radicale.app.base import Access, ApplicationBase
2018-08-28 16:19:36 +02:00
from radicale.item import filter as radicale_filter
from radicale.log import logger
2024-08-14 11:15:30 -06:00
2023-10-06 13:15:45 -06:00
def free_busy_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
                     collection: storage.BaseCollection, encoding: str,
                     unlock_storage_fn: Callable[[], None],
                     max_occurrence: int
                     ) -> Tuple[int, Union[ET.Element, str]]:
    """Answer a CalDAV ``free-busy-query`` REPORT request.

    A single VEVENT time-range filter is built from the request, the
    matching items are pulled from ``collection``, the storage lock is
    released through ``unlock_storage_fn`` and one VFREEBUSY component is
    emitted per occurrence inside the requested time range.

    ``base_prefix`` and ``encoding`` are not used by this function.
    ``max_occurrence`` limits the number of occurrences evaluated per item;
    a value of zero or less disables the limit.

    Returns a ``(status, body)`` tuple. ``body`` is an XML element for the
    empty-request and forbidden cases and the serialized calendar (a string)
    otherwise.

    Raises ``ValueError`` if the request lacks a ``C:time-range`` element,
    if an item cannot be filtered or if the occurrence limit is hit, and
    ``RuntimeError`` for any other failure while filtering an item.

    """
    # NOTE: this function returns both an Element and a string because
    # free-busy reports are an edge-case on the return type according
    # to the spec.

    multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
    if xml_request is None:
        return client.MULTI_STATUS, multistatus
    root = xml_request
    if (root.tag == xmlutils.make_clark("C:free-busy-query") and
            collection.tag != "VCALENDAR"):
        logger.warning("Invalid REPORT method %r on %r requested",
                       xmlutils.make_human_tag(root.tag), path)
        return client.FORBIDDEN, xmlutils.webdav_error("D:supported-report")

    time_range_element = root.find(xmlutils.make_clark("C:time-range"))
    if time_range_element is None:
        # The element comes from the client, so report a bad request
        # instead of relying on ``assert`` (which is stripped with -O).
        raise ValueError("Missing time-range element in free-busy-query")

    # Build a single filter from the free busy query for retrieval
    # TODO: filter for VFREEBUSY in additional to VEVENT but
    # test_filter doesn't support that yet.
    vevent_cf_element = ET.Element(xmlutils.make_clark("C:comp-filter"),
                                   attrib={'name': 'VEVENT'})
    vevent_cf_element.append(time_range_element)
    vcalendar_cf_element = ET.Element(xmlutils.make_clark("C:comp-filter"),
                                      attrib={'name': 'VCALENDAR'})
    vcalendar_cf_element.append(vevent_cf_element)
    filter_element = ET.Element(xmlutils.make_clark("C:filter"))
    filter_element.append(vcalendar_cf_element)
    filters = (filter_element,)

    # First pull from storage
    retrieved_items = list(collection.get_filtered(filters))
    # !!! Don't access storage after this !!!
    unlock_storage_fn()

    # A non-positive ``max_occurrence`` means that no limit is enforced.
    limit_enabled = max_occurrence > 0
    # Ask for one occurrence more than allowed; 0 is passed when there is
    # no limit (presumably "unlimited" for time_range_fill - confirm).
    n_occurrences = max_occurrence + 1 if limit_enabled else 0

    cal = vobject.iCalendar()
    collection_tag = collection.tag
    while retrieved_items:
        # Second filtering before evaluating occurrences.
        # ``item.vobject_item`` might be accessed during filtering.
        # Don't keep reference to ``item``, because VObject requires a lot of
        # memory.
        item, filter_matched = retrieved_items.pop(0)
        if not filter_matched:
            try:
                if not test_filter(collection_tag, item, filter_element):
                    continue
            except ValueError as e:
                raise ValueError("Failed to free-busy filter item %r from %r: %s" %
                                 (item.href, collection.path, e)) from e
            except Exception as e:
                raise RuntimeError("Failed to free-busy filter item %r from %r: %s" %
                                   (item.href, collection.path, e)) from e

        fbtype = None
        if item.component_name == 'VEVENT':
            transp = getattr(item.vobject_item.vevent, 'transp', None)
            if transp and transp.value != 'OPAQUE':
                # Events that are not opaque do not block time.
                continue

            status = getattr(item.vobject_item.vevent, 'status', None)
            if not status or status.value == 'CONFIRMED':
                fbtype = 'BUSY'
            elif status.value == 'CANCELLED':
                fbtype = 'FREE'
            elif status.value == 'TENTATIVE':
                fbtype = 'BUSY-TENTATIVE'
            else:
                # Could do fbtype = status.value for x-name, I prefer this
                fbtype = 'BUSY'

        # TODO: coalesce overlapping periods
        occurrences = radicale_filter.time_range_fill(item.vobject_item,
                                                      time_range_element,
                                                      "VEVENT",
                                                      n=n_occurrences)
        # Only enforce the limit when one is configured: comparing against
        # a non-positive limit would reject every item.
        if limit_enabled and len(occurrences) >= max_occurrence:
            raise ValueError("FREEBUSY occurrences limit of {} hit"
                             .format(max_occurrence))

        for occurrence in occurrences:
            vfb = cal.add('vfreebusy')
            vfb.add('dtstamp').value = item.vobject_item.vevent.dtstamp.value
            vfb.add('dtstart').value, vfb.add('dtend').value = occurrence
            if fbtype:
                vfb.add('fbtype').value = fbtype
    return (client.OK, cal.serialize())
2018-08-28 16:19:36 +02:00
2021-07-26 20:56:46 +02:00
def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
               collection: storage.BaseCollection, encoding: str,
               unlock_storage_fn: Callable[[], None]
               ) -> Tuple[int, ET.Element]:
    """Read and answer REPORT requests that return XML.

    Read rfc3253-3.6 for info.

    All items needed for the answer are retrieved from ``collection`` first,
    then ``unlock_storage_fn`` is called and the remaining filtering and
    property rendering happens without touching the storage.

    Returns a ``(status, element)`` tuple: a ``D:multistatus`` element on
    success or a WebDAV error element for rejected requests.

    Raises ``ValueError`` or ``RuntimeError`` if an item can't be filtered.

    """
    multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
    if xml_request is None:
        return client.MULTI_STATUS, multistatus
    root = xml_request

    unsupported_tags = (
        xmlutils.make_clark("D:principal-search-property-set"),
        xmlutils.make_clark("D:principal-property-search"),
        xmlutils.make_clark("D:expand-property"))
    if root.tag in unsupported_tags:
        # We don't support searching for principals or indirect retrieving of
        # properties, just return an empty result.
        # InfCloud asks for expand-property reports (even if we don't announce
        # support for them) and stops working if an error code is returned.
        logger.warning("Unsupported REPORT method %r on %r requested",
                       xmlutils.make_human_tag(root.tag), path)
        return client.MULTI_STATUS, multistatus

    calendar_multiget_tag = xmlutils.make_clark("C:calendar-multiget")
    addressbook_multiget_tag = xmlutils.make_clark("CR:addressbook-multiget")
    sync_collection_tag = xmlutils.make_clark("D:sync-collection")

    # Collection types on which each of these report types is valid.
    accepted_collection_tags = {
        calendar_multiget_tag: ("VCALENDAR",),
        addressbook_multiget_tag: ("VADDRESSBOOK",),
        sync_collection_tag: ("VADDRESSBOOK", "VCALENDAR"),
    }
    accepted = accepted_collection_tags.get(root.tag)
    if accepted is not None and collection.tag not in accepted:
        logger.warning("Invalid REPORT method %r on %r requested",
                       xmlutils.make_human_tag(root.tag), path)
        return client.FORBIDDEN, xmlutils.webdav_error("D:supported-report")

    # Requested properties: the children of ``D:prop`` (none if absent).
    prop_container = root.find(xmlutils.make_clark("D:prop"))
    props: Union[ET.Element, List] = (
        prop_container if prop_container is not None else [])

    hreferences: Iterable[str]
    if root.tag in (calendar_multiget_tag, addressbook_multiget_tag):
        # Read rfc4791-7.9 for info
        requested_paths = set()
        for href_element in root.findall(xmlutils.make_clark("D:href")):
            temp_url_path = urlparse(href_element.text).path
            assert isinstance(temp_url_path, str)
            href_path = pathutils.sanitize_path(unquote(temp_url_path))
            if not (href_path + "/").startswith(base_prefix + "/"):
                logger.warning("Skipping invalid path %r in REPORT request on "
                               "%r", href_path, path)
                continue
            requested_paths.add(href_path[len(base_prefix):])
        hreferences = requested_paths
    elif root.tag == sync_collection_tag:
        old_sync_token_element = root.find(
            xmlutils.make_clark("D:sync-token"))
        old_sync_token = ""
        if old_sync_token_element is not None and old_sync_token_element.text:
            old_sync_token = old_sync_token_element.text.strip()
        logger.debug("Client provided sync token: %r", old_sync_token)
        try:
            sync_token, names = collection.sync(old_sync_token)
        except ValueError as e:
            # Invalid sync token
            logger.warning("Client provided invalid sync token %r: %s",
                           old_sync_token, e, exc_info=True)
            # client.CONFLICT doesn't work with some clients (e.g. InfCloud)
            return (client.FORBIDDEN,
                    xmlutils.webdav_error("D:valid-sync-token"))
        # Lazily evaluated; consumed while retrieving the items below.
        hreferences = (pathutils.unstrip_path(
            posixpath.join(collection.path, n)) for n in names)
        # Append current sync token to response
        sync_token_element = ET.Element(xmlutils.make_clark("D:sync-token"))
        sync_token_element.text = sync_token
        multistatus.append(sync_token_element)
    else:
        hreferences = (path,)

    filters = (
        root.findall(xmlutils.make_clark("C:filter")) +
        root.findall(xmlutils.make_clark("CR:filter")))

    # Retrieve everything required for finishing the request.
    retrieved_items = list(retrieve_items(
        base_prefix, path, collection, hreferences, filters, multistatus))
    collection_tag = collection.tag
    # !!! Don't access storage after this !!!
    unlock_storage_fn()

    getetag_tag = xmlutils.make_clark("D:getetag")
    getcontenttype_tag = xmlutils.make_clark("D:getcontenttype")
    data_tags = (xmlutils.make_clark("C:calendar-data"),
                 xmlutils.make_clark("CR:address-data"))
    expand_dt_format = '%Y%m%dT%H%M%SZ'

    while retrieved_items:
        # ``item.vobject_item`` might be accessed during filtering.
        # Don't keep reference to ``item``, because VObject requires a lot of
        # memory.
        item, filters_matched = retrieved_items.pop(0)
        if filters and not filters_matched:
            try:
                if not all(test_filter(collection_tag, item, filter_)
                           for filter_ in filters):
                    continue
            except ValueError as e:
                raise ValueError("Failed to filter item %r from %r: %s" %
                                 (item.href, collection.path, e)) from e
            except Exception as e:
                raise RuntimeError("Failed to filter item %r from %r: %s" %
                                   (item.href, collection.path, e)) from e

        found_props = []
        not_found_props = []

        for prop in props:
            element = ET.Element(prop.tag)
            if prop.tag == getetag_tag:
                element.text = item.etag
                found_props.append(element)
            elif prop.tag == getcontenttype_tag:
                element.text = xmlutils.get_content_type(item, encoding)
                found_props.append(element)
            elif prop.tag in data_tags:
                element.text = item.serialize()

                expand = prop.find(xmlutils.make_clark("C:expand"))
                if expand is None or item.component_name != 'VEVENT':
                    found_props.append(element)
                    continue

                # Recurrence expansion needs both ends of the time range.
                start = expand.get('start')
                end = expand.get('end')
                if (start is None) or (end is None):
                    return client.FORBIDDEN, \
                        xmlutils.webdav_error("C:expand")

                start = datetime.datetime.strptime(
                    start, expand_dt_format
                ).replace(tzinfo=datetime.timezone.utc)
                end = datetime.datetime.strptime(
                    end, expand_dt_format
                ).replace(tzinfo=datetime.timezone.utc)

                found_props.append(
                    _expand(element, copy.copy(item), start, end))
            else:
                not_found_props.append(element)

        assert item.href
        uri = pathutils.unstrip_path(
            posixpath.join(collection.path, item.href))
        multistatus.append(xml_item_response(
            base_prefix, uri, found_props=found_props,
            not_found_props=not_found_props, found_item=True))

    return client.MULTI_STATUS, multistatus
2023-03-30 19:30:59 +03:00
def _expand(
        element: ET.Element,
        item: radicale_item.Item,
        start: datetime.datetime,
        end: datetime.datetime,
) -> ET.Element:
    """Fill ``element`` with ``item`` expanded between ``start`` and ``end``.

    If the event carries a recurrence rule, the serialized component
    contains one VEVENT per recurrence returned for the range (overridden
    instances are reused, the others are generated) and no VTIMEZONE.
    Without a recurrence rule the component is serialized with only its
    overridden instances normalized.

    Returns ``element`` with its text set to the serialized component.

    """
    calendar: vobject.base.Component = copy.copy(item.vobject_item)

    # Split the vevents included in the component into one that contains the
    # recurrence information and others that contain a recurrence id to
    # override instances.
    master, overrides = _split_overridden_vevents(calendar)

    # An exact type check is required: ``datetime.datetime`` is a subclass
    # of ``datetime.date`` and must not be treated as an all day event.
    all_day_event = type(master.dtstart.value) is datetime.date
    if all_day_event:
        # If an event comes to us with a dtstart specified as a date
        # then in the response we return the date, not datetime
        dt_format = '%Y%m%d'
        # In case of dates, we need to remove timezone information since
        # rruleset.between computes with datetimes without timezone information
        start = start.replace(tzinfo=None)
        end = end.replace(tzinfo=None)
    else:
        dt_format = '%Y%m%dT%H%M%SZ'

    for override in overrides:
        _strip_single_event(override, dt_format)

    # The duration has to be taken before the master event is stripped.
    duration = (master.dtend.value - master.dtstart.value
                if hasattr(master, "dtend") else None)

    rruleset = master.getrruleset() if hasattr(master, 'rrule') else None

    if rruleset:
        # This function uses datetimes internally without timezone info for dates
        recurrences = rruleset.between(start, end, inc=True)

        _strip_component(calendar)
        _strip_single_event(master, dt_format)

        i_overridden = 0
        for position, recurrence_dt in enumerate(recurrences):
            recurrence_utc = recurrence_dt.astimezone(datetime.timezone.utc)
            i_overridden, vevent = _find_overridden(
                i_overridden, overrides, recurrence_utc, dt_format)

            if not vevent:
                # We did not find an overridden instance, so create a new one
                vevent = copy.deepcopy(master)

                # For all day events, the system timezone may influence the
                # results, so use recurrence_dt
                recurrence_id = recurrence_dt if all_day_event else recurrence_utc
                vevent.recurrence_id = ContentLine(
                    name='RECURRENCE-ID',
                    value=recurrence_id, params={}
                )
                _convert_to_utc(vevent, 'recurrence_id', dt_format)
                vevent.dtstart = ContentLine(
                    name='DTSTART',
                    value=recurrence_id.strftime(dt_format), params={}
                )
                if duration:
                    vevent.dtend = ContentLine(
                        name='DTEND',
                        value=(recurrence_id + duration).strftime(dt_format), params={}
                    )

            if position == 0:
                # The first instance is assigned to the ``vevent`` attribute
                # of the component, later ones are added to it.
                calendar.vevent = vevent
            else:
                calendar.add(vevent)

    element.text = calendar.serialize()

    return element
2023-03-30 19:30:59 +03:00
2024-11-05 11:33:27 +01:00
def _convert_timezone(vevent: vobject.icalendar.RecurringComponent,
                      name_prop: str,
                      name_content_line: str):
    """Replace property ``name_prop`` of ``vevent`` by its value in UTC.

    A plain date becomes midnight of that day with UTC attached, any other
    value is converted with ``astimezone``. The replacement content line is
    named ``name_content_line``. Missing or falsy properties are left alone.

    """
    prop = getattr(vevent, name_prop, None)
    if not prop:
        return

    utc = datetime.timezone.utc
    # Exact type check: ``datetime.datetime`` is a subclass of ``date``.
    if type(prop.value) is datetime.date:
        midnight = datetime.datetime.fromordinal(prop.value.toordinal())
        date_time = midnight.replace(tzinfo=utc)
    else:
        date_time = prop.value.astimezone(utc)

    replacement = ContentLine(
        name=name_content_line, value=date_time, params=[])
    setattr(vevent, name_prop, replacement)
def _convert_to_utc(vevent: vobject.icalendar.RecurringComponent,
                    name_prop: str,
                    dt_format: str):
    """Replace property ``name_prop`` of ``vevent`` by its formatted text.

    The value is rendered with ``strftime(dt_format)`` and stored in a new
    content line that keeps the name of the old one. No timezone conversion
    happens here. Missing or falsy properties are left alone.

    """
    prop = getattr(vevent, name_prop, None)
    if not prop:
        return

    formatted = prop.value.strftime(dt_format)
    replacement = ContentLine(name=prop.name, value=formatted, params=[])
    setattr(vevent, name_prop, replacement)
2024-11-07 11:03:24 +01:00
def _strip_single_event(vevent: vobject.icalendar.RecurringComponent, dt_format: str) -> None:
    """Normalize the dates of ``vevent`` and drop its recurrence properties.

    ``dtstart``, ``dtend`` and ``recurrence_id`` are converted to UTC and
    then replaced by their text formatted with ``dt_format``. Afterwards
    ``rrule``, ``exdate``, ``exrule`` and ``rdate`` are removed; properties
    that are not present are skipped.

    """
    date_properties = (
        ('dtstart', 'DTSTART'),
        ('dtend', 'DTEND'),
        ('recurrence_id', 'RECURRENCE-ID'),
    )

    for name_prop, name_content_line in date_properties:
        _convert_timezone(vevent, name_prop, name_content_line)

    # There is something strange behaviour during serialization native datetime, so converting manually
    for name_prop, _ in date_properties:
        _convert_to_utc(vevent, name_prop, dt_format)

    # Delete every property on its own: a missing one must not prevent the
    # removal of those that follow it.
    for name_prop in ('rrule', 'exdate', 'exrule', 'rdate'):
        try:
            delattr(vevent, name_prop)
        except AttributeError:
            pass
2024-11-07 11:03:24 +01:00
def _strip_component(vevent: vobject.base.Component) -> None:
    """Remove all ``VTIMEZONE`` sub-components from ``vevent``."""
    # Collect first, so that the component is not modified while its
    # sub-components are being iterated.
    timezones = [component for component in vevent.components()
                 if component.name == 'VTIMEZONE']
    for timezone in timezones:
        vevent.remove(timezone)
2023-03-30 23:13:00 +03:00
2024-11-05 11:33:27 +01:00
def _split_overridden_vevents(
        component: vobject.base.Component,
) -> Tuple[
    vobject.icalendar.RecurringComponent,
    List[vobject.icalendar.RecurringComponent]
]:
    """Separate the main vevent of ``component`` from overridden instances.

    Returns the single vevent without a ``recurrence_id`` and the list of
    vevents with one, sorted by the value of their ``recurrence_id``.

    Raises ``ValueError`` if there is more than one vevent without a
    ``recurrence_id`` or none at all.

    """
    def recurrence_id_value(overridden):
        return overridden.recurrence_id.value

    vevent_recurrence = None
    vevents_overridden = []

    for vevent in component.vevent_list:
        if hasattr(vevent, 'recurrence_id'):
            vevents_overridden.append(vevent)
            continue
        if vevent_recurrence:
            raise ValueError(
                f"component with UID {vevent.uid} "
                f"has more than one vevent with recurrence information"
            )
        vevent_recurrence = vevent

    if not vevent_recurrence:
        # ``vevent`` is the last vevent visited by the loop above.
        raise ValueError(
            f"component with UID {vevent.uid} "
            f"does not have a vevent without a recurrence_id"
        )

    return (
        vevent_recurrence,
        sorted(vevents_overridden, key=recurrence_id_value)
    )
def _find_overridden(
        start: int,
        vevents: List[vobject.icalendar.RecurringComponent],
        dt: datetime.datetime,
        dt_format: str
) -> Tuple[int, Optional[vobject.icalendar.RecurringComponent]]:
    """Search ``vevents`` from index ``start`` for the instance at ``dt``.

    The ``recurrence_id`` value of each candidate is parsed with
    ``dt_format`` and compared to ``dt`` as a UTC datetime.

    Returns the index after the match together with the matching vevent, or
    ``(start, None)`` if no candidate matches.

    """
    utc = datetime.timezone.utc
    index = start
    while index < len(vevents):
        candidate = vevents[index]
        parsed = datetime.datetime.strptime(
            candidate.recurrence_id.value, dt_format)
        index += 1
        if parsed.replace(tzinfo=utc) == dt:
            return (index, candidate)
    return (start, None)
2021-07-26 20:56:46 +02:00
def xml_item_response(base_prefix: str, href: str,
                      found_props: Sequence[ET.Element] = (),
                      not_found_props: Sequence[ET.Element] = (),
                      found_item: bool = True) -> ET.Element:
    """Build the ``D:response`` element for the item at ``href``.

    For a found item one ``D:propstat`` is added per non-empty group of
    properties: status 200 for ``found_props`` and status 404 for
    ``not_found_props``. For a missing item the response only carries a
    404 ``D:status``.

    """
    response = ET.Element(xmlutils.make_clark("D:response"))

    href_element = ET.SubElement(response, xmlutils.make_clark("D:href"))
    href_element.text = xmlutils.make_href(base_prefix, href)

    if not found_item:
        status = ET.SubElement(response, xmlutils.make_clark("D:status"))
        status.text = xmlutils.make_response(404)
        return response

    for code, props in ((200, found_props), (404, not_found_props)):
        if not props:
            continue
        propstat = ET.SubElement(response, xmlutils.make_clark("D:propstat"))
        # The properties come first, followed by their common status.
        prop_element = ET.SubElement(propstat, xmlutils.make_clark("D:prop"))
        prop_element.extend(props)
        status = ET.SubElement(propstat, xmlutils.make_clark("D:status"))
        status.text = xmlutils.make_response(code)

    return response
2021-07-26 20:56:46 +02:00
def retrieve_items(
        base_prefix: str, path: str, collection: storage.BaseCollection,
        hreferences: Iterable[str], filters: Sequence[ET.Element],
        multistatus: ET.Element) -> Iterator[Tuple[radicale_item.Item, bool]]:
    """Retrieves all items that are referenced in ``hreferences`` from
    ``collection`` and adds 404 responses for missing and invalid items
    to ``multistatus``.

    Items referenced by name are yielded with ``False`` as second tuple
    member. If the collection itself is referenced, the result of
    ``collection.get_filtered(filters)`` is yielded afterwards.

    """
    collection_requested = False

    def item_names() -> Iterator[str]:
        """Extracts all names from references in ``hreferences`` and adds
        404 responses for invalid references to ``multistatus``.

        If the whole collections is referenced ``collection_requested``
        gets set to ``True``.

        """
        nonlocal collection_requested
        for hreference in hreferences:
            try:
                name = pathutils.name_from_path(hreference, collection)
            except ValueError as e:
                logger.warning("Skipping invalid path %r in REPORT request on "
                               "%r: %s", hreference, path, e)
                multistatus.append(xml_item_response(
                    base_prefix, hreference, found_item=False))
                continue
            if not name:
                # Reference is a collection
                collection_requested = True
                continue
            # Reference is an item
            yield name

    for name, item in collection.get_multi(item_names()):
        if item:
            yield item, False
            continue
        # The referenced item doesn't exist in the collection.
        uri = pathutils.unstrip_path(posixpath.join(collection.path, name))
        multistatus.append(
            xml_item_response(base_prefix, uri, found_item=False))

    # The flag is only final after all names have been consumed above.
    if collection_requested:
        yield from collection.get_filtered(filters)
def test_filter(collection_tag: str, item: radicale_item.Item,
                filter_: ET.Element) -> bool:
    """Match an item against a filter.

    Calendar filters may hold at most one ``C:comp-filter`` child (an empty
    filter matches everything). Address book filters hold ``CR:prop-filter``
    children that are combined according to the ``test`` attribute
    (``anyof`` by default, or ``allof``).

    Raises ``ValueError`` for malformed or unsupported filters.

    """
    # NOTE(review): ``"C:%s" % filter_`` and ``"CR:%s" % filter_`` format
    # the element object itself into the tag name, so these inequality
    # checks look like they always hold - confirm the intended comparison.
    if (collection_tag == "VCALENDAR" and
            filter_.tag != xmlutils.make_clark("C:%s" % filter_)):
        child_count = len(filter_)
        if child_count == 0:
            return True
        if child_count > 1:
            raise ValueError("Filter with %d children" % child_count)
        comp_filter = filter_[0]
        if comp_filter.tag != xmlutils.make_clark("C:comp-filter"):
            raise ValueError("Unexpected %r in filter" % comp_filter.tag)
        return radicale_filter.comp_match(item, comp_filter)

    if (collection_tag == "VADDRESSBOOK" and
            filter_.tag != xmlutils.make_clark("CR:%s" % filter_)):
        # Validate all children before any of them is evaluated.
        for child in filter_:
            if child.tag != xmlutils.make_clark("CR:prop-filter"):
                raise ValueError("Unexpected %r in filter" % child.tag)
        test = filter_.get("test", "anyof")
        combine = {"anyof": any, "allof": all}.get(test)
        if combine is None:
            raise ValueError("Unsupported filter test: %r" % test)
        return combine(
            radicale_filter.prop_match(item.vobject_item, f, "CR")
            for f in filter_)

    raise ValueError("Unsupported filter %r for %r" %
                     (filter_.tag, collection_tag))
class ApplicationPartReport(ApplicationBase):
    """Application part that handles the REPORT method."""

    def do_REPORT(self, environ: types.WSGIEnviron, base_prefix: str,
                  path: str, user: str) -> types.WSGIResponse:
        """Manage REPORT request."""
        access = Access(self._rights, user, path)
        if not access.check("r"):
            return httputils.NOT_ALLOWED

        try:
            xml_content = self._read_xml_request_body(environ)
        except RuntimeError as e:
            logger.warning("Bad REPORT request on %r: %s", path, e,
                           exc_info=True)
            return httputils.BAD_REQUEST
        except socket.timeout:
            logger.debug("Client timed out", exc_info=True)
            return httputils.REQUEST_TIMEOUT

        with contextlib.ExitStack() as lock_stack:
            lock_stack.enter_context(self._storage.acquire_lock("r", user))

            item = next(iter(self._storage.discover(path)), None)
            if not item:
                return httputils.NOT_FOUND
            if not access.check("r", item):
                return httputils.NOT_ALLOWED

            # Reports always run on a collection: for an item, use the
            # collection that it belongs to.
            if isinstance(item, storage.BaseCollection):
                collection = item
            else:
                assert item.collection is not None
                collection = item.collection

            is_free_busy_query = (
                xml_content is not None and
                xml_content.tag == xmlutils.make_clark("C:free-busy-query"))

            if is_free_busy_query:
                max_occurrence = self.configuration.get("reporting", "max_freebusy_occurrence")
                try:
                    # ``lock_stack.close`` lets the report release the
                    # storage lock once it has read what it needs.
                    status, body = free_busy_report(
                        base_prefix, path, xml_content, collection, self._encoding,
                        lock_stack.close, max_occurrence)
                except ValueError as e:
                    logger.warning(
                        "Bad REPORT request on %r: %s", path, e, exc_info=True)
                    return httputils.BAD_REQUEST
                headers = {"Content-Type": "text/calendar; charset=%s" % self._encoding}
                return status, headers, str(body)

            try:
                status, xml_answer = xml_report(
                    base_prefix, path, xml_content, collection, self._encoding,
                    lock_stack.close)
            except ValueError as e:
                logger.warning(
                    "Bad REPORT request on %r: %s", path, e, exc_info=True)
                return httputils.BAD_REQUEST
            headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
            return status, headers, self._xml_response(xml_answer)