1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-08-04 18:22:26 +00:00
Radicale/radicale/app/report.py
2025-07-21 19:43:38 +02:00

810 lines
35 KiB
Python

# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2021 Unrud <unrud@outlook.com>
# Copyright © 2024-2024 Pieter Hijma <pieterhijma@users.noreply.github.com>
# Copyright © 2024-2024 Ray <ray@react0r.com>
# Copyright © 2024-2025 Georgiy <metallerok@gmail.com>
# Copyright © 2024-2025 Peter Bieringer <pb@bieringer.de>
# Copyright © 2025-2025 David Greaves <david@dgreaves.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import copy
import datetime
import posixpath
import socket
import xml.etree.ElementTree as ET
from http import client
from typing import (Callable, Iterable, Iterator, List, Optional, Sequence,
Tuple, Union)
from urllib.parse import unquote, urlparse
import vobject
import vobject.base
from vobject.base import ContentLine
import radicale.item as radicale_item
from radicale import httputils, pathutils, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
from radicale.item import filter as radicale_filter
from radicale.log import logger
DT_FORMAT_TIMESTAMP: str = '%Y%m%dT%H%M%SZ'
DT_FORMAT_DATE: str = '%Y%m%d'
def free_busy_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
collection: storage.BaseCollection, encoding: str,
unlock_storage_fn: Callable[[], None],
max_occurrence: int
) -> Tuple[int, Union[ET.Element, str]]:
# NOTE: this function returns both an Element and a string because
# free-busy reports are an edge-case on the return type according
# to the spec.
multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
if xml_request is None:
return client.MULTI_STATUS, multistatus
root = xml_request
if (root.tag == xmlutils.make_clark("C:free-busy-query") and
collection.tag != "VCALENDAR"):
logger.warning("Invalid REPORT method %r on %r requested",
xmlutils.make_human_tag(root.tag), path)
return client.FORBIDDEN, xmlutils.webdav_error("D:supported-report")
time_range_element = root.find(xmlutils.make_clark("C:time-range"))
assert isinstance(time_range_element, ET.Element)
# Build a single filter from the free busy query for retrieval
# TODO: filter for VFREEBUSY in additional to VEVENT but
# test_filter doesn't support that yet.
vevent_cf_element = ET.Element(xmlutils.make_clark("C:comp-filter"),
attrib={'name': 'VEVENT'})
vevent_cf_element.append(time_range_element)
vcalendar_cf_element = ET.Element(xmlutils.make_clark("C:comp-filter"),
attrib={'name': 'VCALENDAR'})
vcalendar_cf_element.append(vevent_cf_element)
filter_element = ET.Element(xmlutils.make_clark("C:filter"))
filter_element.append(vcalendar_cf_element)
filters = (filter_element,)
# First pull from storage
retrieved_items = list(collection.get_filtered(filters))
# !!! Don't access storage after this !!!
unlock_storage_fn()
cal = vobject.iCalendar()
collection_tag = collection.tag
while retrieved_items:
# Second filtering before evaluating occurrences.
# ``item.vobject_item`` might be accessed during filtering.
# Don't keep reference to ``item``, because VObject requires a lot of
# memory.
item, filter_matched = retrieved_items.pop(0)
if not filter_matched:
try:
if not test_filter(collection_tag, item, filter_element):
continue
except ValueError as e:
raise ValueError("Failed to free-busy filter item %r from %r: %s" %
(item.href, collection.path, e)) from e
except Exception as e:
raise RuntimeError("Failed to free-busy filter item %r from %r: %s" %
(item.href, collection.path, e)) from e
fbtype = None
if item.component_name == 'VEVENT':
transp = getattr(item.vobject_item.vevent, 'transp', None)
if transp and transp.value != 'OPAQUE':
continue
status = getattr(item.vobject_item.vevent, 'status', None)
if not status or status.value == 'CONFIRMED':
fbtype = 'BUSY'
elif status.value == 'CANCELLED':
fbtype = 'FREE'
elif status.value == 'TENTATIVE':
fbtype = 'BUSY-TENTATIVE'
else:
# Could do fbtype = status.value for x-name, I prefer this
fbtype = 'BUSY'
# TODO: coalesce overlapping periods
if max_occurrence > 0:
n_occurrences = max_occurrence+1
else:
n_occurrences = 0
occurrences = radicale_filter.time_range_fill(item.vobject_item,
time_range_element,
"VEVENT",
n=n_occurrences)
if len(occurrences) >= max_occurrence:
raise ValueError("FREEBUSY occurrences limit of {} hit"
.format(max_occurrence))
for occurrence in occurrences:
vfb = cal.add('vfreebusy')
vfb.add('dtstamp').value = item.vobject_item.vevent.dtstamp.value
vfb.add('dtstart').value, vfb.add('dtend').value = occurrence
if fbtype:
vfb.add('fbtype').value = fbtype
return (client.OK, cal.serialize())
def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
collection: storage.BaseCollection, encoding: str,
unlock_storage_fn: Callable[[], None],
max_occurrence: int = 0,
) -> Tuple[int, ET.Element]:
"""Read and answer REPORT requests that return XML.
Read rfc3253-3.6 for info.
"""
multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
if xml_request is None:
return client.MULTI_STATUS, multistatus
root = xml_request
if root.tag in (xmlutils.make_clark("D:principal-search-property-set"),
xmlutils.make_clark("D:principal-property-search"),
xmlutils.make_clark("D:expand-property")):
# We don't support searching for principals or indirect retrieving of
# properties, just return an empty result.
# InfCloud asks for expand-property reports (even if we don't announce
# support for them) and stops working if an error code is returned.
logger.warning("Unsupported REPORT method %r on %r requested",
xmlutils.make_human_tag(root.tag), path)
return client.MULTI_STATUS, multistatus
if (root.tag == xmlutils.make_clark("C:calendar-multiget") and
collection.tag != "VCALENDAR" or
root.tag == xmlutils.make_clark("CR:addressbook-multiget") and
collection.tag != "VADDRESSBOOK" or
root.tag == xmlutils.make_clark("D:sync-collection") and
collection.tag not in ("VADDRESSBOOK", "VCALENDAR")):
logger.warning("Invalid REPORT method %r on %r requested",
xmlutils.make_human_tag(root.tag), path)
return client.FORBIDDEN, xmlutils.webdav_error("D:supported-report")
props: Union[ET.Element, List]
if root.find(xmlutils.make_clark("D:prop")) is not None:
props = root.find(xmlutils.make_clark("D:prop")) # type: ignore[assignment]
else:
props = []
hreferences: Iterable[str]
if root.tag in (
xmlutils.make_clark("C:calendar-multiget"),
xmlutils.make_clark("CR:addressbook-multiget")):
# Read rfc4791-7.9 for info
hreferences = set()
for href_element in root.findall(xmlutils.make_clark("D:href")):
temp_url_path = urlparse(href_element.text).path
assert isinstance(temp_url_path, str)
href_path = pathutils.sanitize_path(unquote(temp_url_path))
if (href_path + "/").startswith(base_prefix + "/"):
hreferences.add(href_path[len(base_prefix):])
else:
logger.warning("Skipping invalid path %r in REPORT request on "
"%r", href_path, path)
elif root.tag == xmlutils.make_clark("D:sync-collection"):
old_sync_token_element = root.find(
xmlutils.make_clark("D:sync-token"))
old_sync_token = ""
if old_sync_token_element is not None and old_sync_token_element.text:
old_sync_token = old_sync_token_element.text.strip()
logger.debug("Client provided sync token: %r", old_sync_token)
try:
sync_token, names = collection.sync(old_sync_token)
except ValueError as e:
# Invalid sync token
logger.warning("Client provided invalid sync token %r: %s",
old_sync_token, e, exc_info=True)
# client.CONFLICT doesn't work with some clients (e.g. InfCloud)
return (client.FORBIDDEN,
xmlutils.webdav_error("D:valid-sync-token"))
hreferences = (pathutils.unstrip_path(
posixpath.join(collection.path, n)) for n in names)
# Append current sync token to response
sync_token_element = ET.Element(xmlutils.make_clark("D:sync-token"))
sync_token_element.text = sync_token
multistatus.append(sync_token_element)
else:
hreferences = (path,)
filters = (
root.findall(xmlutils.make_clark("C:filter")) +
root.findall(xmlutils.make_clark("CR:filter")))
expand = root.find(".//" + xmlutils.make_clark("C:expand"))
# if we have expand prop we use "filter (except time range) -> expand -> filter (only time range)" approach
time_range_element = None
main_filters = []
for filter_ in filters:
# extract time-range filter for processing after main filters
# for expand request
filter_copy = copy.deepcopy(filter_)
if expand is not None:
for comp_filter in filter_copy.findall(".//" + xmlutils.make_clark("C:comp-filter")):
if comp_filter.get("name", "").upper() == "VCALENDAR":
continue
time_range_element = comp_filter.find(xmlutils.make_clark("C:time-range"))
if time_range_element is not None:
comp_filter.remove(time_range_element)
main_filters.append(filter_copy)
# Retrieve everything required for finishing the request.
retrieved_items = list(retrieve_items(
base_prefix, path, collection, hreferences, main_filters, multistatus))
collection_tag = collection.tag
# !!! Don't access storage after this !!!
unlock_storage_fn()
n_vevents = 0
while retrieved_items:
# ``item.vobject_item`` might be accessed during filtering.
# Don't keep reference to ``item``, because VObject requires a lot of
# memory.
item, filters_matched = retrieved_items.pop(0)
if filters and not filters_matched:
try:
if not all(test_filter(collection_tag, item, filter_)
for filter_ in main_filters):
continue
except ValueError as e:
raise ValueError("Failed to filter item %r from %r: %s" %
(item.href, collection.path, e)) from e
except Exception as e:
raise RuntimeError("Failed to filter item %r from %r: %s" %
(item.href, collection.path, e)) from e
found_props = []
not_found_props = []
for prop in props:
element = ET.Element(prop.tag)
if prop.tag == xmlutils.make_clark("D:getetag"):
element.text = item.etag
found_props.append(element)
elif prop.tag == xmlutils.make_clark("D:getcontenttype"):
element.text = xmlutils.get_content_type(item, encoding)
found_props.append(element)
elif prop.tag in (
xmlutils.make_clark("C:calendar-data"),
xmlutils.make_clark("CR:address-data")):
element.text = item.serialize()
if (expand is not None) and item.component_name == 'VEVENT':
starts = expand.get('start')
ends = expand.get('end')
if (starts is None) or (ends is None):
return client.FORBIDDEN, \
xmlutils.webdav_error("C:expand")
start = datetime.datetime.strptime(
starts, DT_FORMAT_TIMESTAMP
).replace(tzinfo=datetime.timezone.utc)
end = datetime.datetime.strptime(
ends, DT_FORMAT_TIMESTAMP
).replace(tzinfo=datetime.timezone.utc)
time_range_start = None
time_range_end = None
if time_range_element is not None:
time_range_start, time_range_end = radicale_filter.parse_time_range(time_range_element)
(expanded_element, n_vev) = _expand(
element=element, item=copy.copy(item),
start=start, end=end,
time_range_start=time_range_start, time_range_end=time_range_end,
max_occurrence=max_occurrence,
)
if n_vev == 0:
logger.debug("No VEVENTs found after expansion for %r, skipping", item.href)
continue
n_vevents += n_vev
found_props.append(expanded_element)
else:
found_props.append(element)
if hasattr(item.vobject_item, "vevent_list"):
n_vevents += len(item.vobject_item.vevent_list)
# Avoid DoS with too many events
if max_occurrence and n_vevents > max_occurrence:
raise ValueError("REPORT occurrences limit of {} hit"
.format(max_occurrence))
else:
not_found_props.append(element)
assert item.href
uri = pathutils.unstrip_path(
posixpath.join(collection.path, item.href))
if found_props or not_found_props:
multistatus.append(xml_item_response(
base_prefix, uri, found_props=found_props,
not_found_props=not_found_props, found_item=True))
return client.MULTI_STATUS, multistatus
def _expand(
element: ET.Element,
item: radicale_item.Item,
start: datetime.datetime,
end: datetime.datetime,
time_range_start: Optional[datetime.datetime] = None,
time_range_end: Optional[datetime.datetime] = None,
max_occurrence: int = 0,
) -> Tuple[ET.Element, int]:
vevent_component: vobject.base.Component = copy.copy(item.vobject_item)
logger.info("Expanding event %s", item.href)
logger.debug(f"Expand range: {start} to {end}")
logger.debug(f"Time range: {time_range_start} to {time_range_end}")
# Split the vevents included in the component into one that contains the
# recurrence information and others that contain a recurrence id to
# override instances.
vevent_recurrence, vevents_overridden = _split_overridden_vevents(vevent_component)
dt_format = DT_FORMAT_TIMESTAMP
all_day_event = False
if type(vevent_recurrence.dtstart.value) is datetime.date:
# If an event comes to us with a dtstart specified as a date
# then in the response we return the date, not datetime
dt_format = DT_FORMAT_DATE
all_day_event = True
# In case of dates, we need to remove timezone information since
# rruleset.between computes with datetimes without timezone information
start = start.replace(tzinfo=None)
end = end.replace(tzinfo=None)
if time_range_start is not None and time_range_end is not None:
time_range_start = time_range_start.replace(tzinfo=None)
time_range_end = time_range_end.replace(tzinfo=None)
for vevent in vevents_overridden:
_strip_single_event(vevent, dt_format)
duration = None
if hasattr(vevent_recurrence, "dtend"):
duration = vevent_recurrence.dtend.value - vevent_recurrence.dtstart.value
elif hasattr(vevent_recurrence, "duration"):
try:
duration = vevent_recurrence.duration.value
if duration.total_seconds() <= 0:
logger.warning("Invalid DURATION: %s", duration)
duration = None
except (AttributeError, TypeError) as e:
logger.warning("Failed to parse DURATION: %s", e)
duration = None
# Generate EXDATE to remove from expansion range
exdates_set: set[datetime.datetime] = set()
if hasattr(vevent_recurrence, 'exdate'):
exdates = vevent_recurrence.exdate.value
if not isinstance(exdates, list):
exdates = [exdates]
exdates_set = {
exdate.astimezone(datetime.timezone.utc) if isinstance(exdate, datetime.datetime)
else datetime.datetime.fromordinal(exdate.toordinal()).replace(tzinfo=None)
for exdate in exdates
}
logger.debug("EXDATE values: %s", exdates_set)
rruleset = None
if hasattr(vevent_recurrence, 'rrule'):
rruleset = vevent_recurrence.getrruleset()
filtered_vevents = []
if rruleset:
# This function uses datetimes internally without timezone info for dates
# A vobject rruleset is for the event dtstart.
# Expanded over a given time range this will not include
# events which started before the time range but are still
# ongoing at the start of the range
# To accomodate this, reduce the start time by the duration of
# the event. If this introduces an extra reccurence point then
# that event should be included as it is still ongoing. If no
# extra point is generated then it was a no-op.
rstart = start - duration if duration and duration.total_seconds() > 0 else start
recurrences = rruleset.between(rstart, end, inc=True, count=max_occurrence)
if max_occurrence and len(recurrences) >= max_occurrence:
# this shouldn't be > and if it's == then assume a limit
# was hit and ignore that maybe some would be filtered out
# by EXDATE etc. This is anti-DoS, not precise limits
raise ValueError("REPORT occurrences limit of {} hit"
.format(max_occurrence))
_strip_component(vevent_component)
_strip_single_event(vevent_recurrence, dt_format)
i_overridden = 0
for recurrence_dt in recurrences:
recurrence_utc = recurrence_dt if all_day_event else recurrence_dt.astimezone(datetime.timezone.utc)
logger.debug("Processing recurrence: %s (all_day_event: %s)", recurrence_utc, all_day_event)
# Apply time-range filter
if time_range_start is not None and time_range_end is not None:
dtstart = recurrence_utc
dtend = dtstart + duration if duration else dtstart
# Start includes the time, end does not
if not (dtstart <= time_range_end and dtend > time_range_start):
logger.debug("Recurrence %s filtered out by time-range", recurrence_utc)
continue
# Check exdate
if recurrence_utc in exdates_set:
logger.debug("Recurrence %s excluded by EXDATE", recurrence_utc)
continue
# Check for overridden instances
i_overridden, vevent = _find_overridden(i_overridden, vevents_overridden, recurrence_utc, dt_format)
if not vevent:
# Create new instance from recurrence
vevent = copy.deepcopy(vevent_recurrence)
# For all day events, the system timezone may influence the
# results, so use recurrence_dt
recurrence_id = recurrence_dt if all_day_event else recurrence_utc
logger.debug("Creating new VEVENT with RECURRENCE-ID: %s", recurrence_id)
vevent.recurrence_id = ContentLine(
name='RECURRENCE-ID',
value=recurrence_id, params={}
)
_convert_to_utc(vevent, 'recurrence_id', dt_format)
suffix = ''
if (dt_format == DT_FORMAT_DATE):
suffix = ';VALUE=DATE'
else:
suffix = ''
vevent.dtstart = ContentLine(
name='DTSTART' + suffix,
value=recurrence_id.strftime(dt_format), params={}
)
# if there is a DTEND, override it. Duration does not need changing
if hasattr(vevent, "dtend"):
vevent.dtend = ContentLine(
name='DTEND' + suffix,
value=(recurrence_id + duration).strftime(dt_format), params={}
)
filtered_vevents.append(vevent)
# Filter overridden and recurrence base events
if time_range_start is not None and time_range_end is not None:
for vevent in vevents_overridden:
dtstart = vevent.dtstart.value
# Handle string values for DTSTART/DTEND
if isinstance(dtstart, str):
try:
dtstart = datetime.datetime.strptime(dtstart, dt_format)
if all_day_event:
dtstart = dtstart.date()
except ValueError as e:
logger.warning("Invalid DTSTART format: %s, error: %s", dtstart, e)
continue
dtend = dtstart + duration if duration else dtstart
logger.debug(
"Filtering VEVENT with DTSTART: %s (type: %s), DTEND: %s (type: %s)",
dtstart, type(dtstart), dtend, type(dtend))
# Convert to datetime for comparison
if all_day_event and isinstance(dtstart, datetime.date) and not isinstance(dtstart, datetime.datetime):
dtstart = datetime.datetime.fromordinal(dtstart.toordinal()).replace(tzinfo=None)
dtend = datetime.datetime.fromordinal(dtend.toordinal()).replace(tzinfo=None)
elif not all_day_event and isinstance(dtstart, datetime.datetime) \
and isinstance(dtend, datetime.datetime):
dtstart = dtstart.replace(tzinfo=datetime.timezone.utc)
dtend = dtend.replace(tzinfo=datetime.timezone.utc)
else:
logger.warning("Unexpected DTSTART/DTEND type: dtstart=%s, dtend=%s", type(dtstart), type(dtend))
continue
if dtstart < time_range_end and dtend > time_range_start:
if vevent not in filtered_vevents: # Avoid duplicates
logger.debug("VEVENT passed time-range filter: %s", dtstart)
filtered_vevents.append(vevent)
else:
logger.debug("VEVENT filtered out: %s", dtstart)
# Rebuild component
if not filtered_vevents:
element.text = ""
return element, 0
else:
vevent_component.vevent_list = filtered_vevents
logger.debug("lbt: vevent_component %s", vevent_component)
element.text = vevent_component.serialize()
return element, len(filtered_vevents)
def _convert_timezone(vevent: vobject.icalendar.RecurringComponent,
name_prop: str,
name_content_line: str):
prop = getattr(vevent, name_prop, None)
if prop:
if type(prop.value) is datetime.date:
date_time = datetime.datetime.fromordinal(
prop.value.toordinal()
).replace(tzinfo=datetime.timezone.utc)
else:
date_time = prop.value.astimezone(datetime.timezone.utc)
setattr(vevent, name_prop, ContentLine(name=name_content_line, value=date_time, params=[]))
def _convert_to_utc(vevent: vobject.icalendar.RecurringComponent,
name_prop: str,
dt_format: str):
prop = getattr(vevent, name_prop, None)
if prop:
setattr(vevent, name_prop, ContentLine(name=prop.name, value=prop.value.strftime(dt_format), params=[]))
def _strip_single_event(vevent: vobject.icalendar.RecurringComponent, dt_format: str) -> None:
_convert_timezone(vevent, 'dtstart', 'DTSTART')
_convert_timezone(vevent, 'dtend', 'DTEND')
_convert_timezone(vevent, 'recurrence_id', 'RECURRENCE-ID')
# There is something strange behaviour during serialization native datetime, so converting manually
_convert_to_utc(vevent, 'dtstart', dt_format)
_convert_to_utc(vevent, 'dtend', dt_format)
_convert_to_utc(vevent, 'recurrence_id', dt_format)
try:
delattr(vevent, 'rrule')
delattr(vevent, 'exdate')
delattr(vevent, 'exrule')
delattr(vevent, 'rdate')
except AttributeError:
pass
def _strip_component(vevent: vobject.base.Component) -> None:
timezones_to_remove = []
for component in vevent.components():
if component.name == 'VTIMEZONE':
timezones_to_remove.append(component)
for timezone in timezones_to_remove:
vevent.remove(timezone)
def _split_overridden_vevents(
component: vobject.base.Component,
) -> Tuple[
vobject.icalendar.RecurringComponent,
List[vobject.icalendar.RecurringComponent]
]:
vevent_recurrence = None
vevents_overridden = []
for vevent in component.vevent_list:
if hasattr(vevent, 'recurrence_id'):
vevents_overridden += [vevent]
elif vevent_recurrence:
raise ValueError(
f"component with UID {vevent.uid} "
f"has more than one vevent with recurrence information"
)
else:
vevent_recurrence = vevent
if vevent_recurrence:
return (
vevent_recurrence, sorted(
vevents_overridden,
key=lambda vevent: vevent.recurrence_id.value
)
)
else:
raise ValueError(
f"component with UID {vevent.uid} "
f"does not have a vevent without a recurrence_id"
)
def _find_overridden(
start: int,
vevents: List[vobject.icalendar.RecurringComponent],
dt: datetime.datetime,
dt_format: str
) -> Tuple[int, Optional[vobject.icalendar.RecurringComponent]]:
for i in range(start, len(vevents)):
dt_event = datetime.datetime.strptime(
vevents[i].recurrence_id.value,
dt_format
).replace(tzinfo=datetime.timezone.utc)
if dt_event == dt:
return (i + 1, vevents[i])
return (start, None)
def xml_item_response(base_prefix: str, href: str,
found_props: Sequence[ET.Element] = (),
not_found_props: Sequence[ET.Element] = (),
found_item: bool = True) -> ET.Element:
response = ET.Element(xmlutils.make_clark("D:response"))
href_element = ET.Element(xmlutils.make_clark("D:href"))
href_element.text = xmlutils.make_href(base_prefix, href)
response.append(href_element)
if found_item:
for code, props in ((200, found_props), (404, not_found_props)):
if props:
propstat = ET.Element(xmlutils.make_clark("D:propstat"))
status = ET.Element(xmlutils.make_clark("D:status"))
status.text = xmlutils.make_response(code)
prop_element = ET.Element(xmlutils.make_clark("D:prop"))
for prop in props:
prop_element.append(prop)
propstat.append(prop_element)
propstat.append(status)
response.append(propstat)
else:
status = ET.Element(xmlutils.make_clark("D:status"))
status.text = xmlutils.make_response(404)
response.append(status)
return response
def retrieve_items(
base_prefix: str, path: str, collection: storage.BaseCollection,
hreferences: Iterable[str], filters: Sequence[ET.Element],
multistatus: ET.Element) -> Iterator[Tuple[radicale_item.Item, bool]]:
"""Retrieves all items that are referenced in ``hreferences`` from
``collection`` and adds 404 responses for missing and invalid items
to ``multistatus``."""
collection_requested = False
def get_names() -> Iterator[str]:
"""Extracts all names from references in ``hreferences`` and adds
404 responses for invalid references to ``multistatus``.
If the whole collections is referenced ``collection_requested``
gets set to ``True``."""
nonlocal collection_requested
for hreference in hreferences:
try:
name = pathutils.name_from_path(hreference, collection)
except ValueError as e:
logger.warning("Skipping invalid path %r in REPORT request on "
"%r: %s", hreference, path, e)
response = xml_item_response(base_prefix, hreference,
found_item=False)
multistatus.append(response)
continue
if name:
# Reference is an item
yield name
else:
# Reference is a collection
collection_requested = True
for name, item in collection.get_multi(get_names()):
if not item:
uri = pathutils.unstrip_path(posixpath.join(collection.path, name))
response = xml_item_response(base_prefix, uri, found_item=False)
multistatus.append(response)
else:
yield item, False
if collection_requested:
yield from collection.get_filtered(filters)
def test_filter(collection_tag: str, item: radicale_item.Item,
filter_: ET.Element) -> bool:
"""Match an item against a filter."""
if (collection_tag == "VCALENDAR" and
filter_.tag != xmlutils.make_clark("C:%s" % filter_)):
if len(filter_) == 0:
return True
if len(filter_) > 1:
raise ValueError("Filter with %d children" % len(filter_))
if filter_[0].tag != xmlutils.make_clark("C:comp-filter"):
raise ValueError("Unexpected %r in filter" % filter_[0].tag)
return radicale_filter.comp_match(item, filter_[0])
if (collection_tag == "VADDRESSBOOK" and
filter_.tag != xmlutils.make_clark("CR:%s" % filter_)):
for child in filter_:
if child.tag != xmlutils.make_clark("CR:prop-filter"):
raise ValueError("Unexpected %r in filter" % child.tag)
test = filter_.get("test", "anyof")
if test == "anyof":
return any(radicale_filter.prop_match(item.vobject_item, f, "CR")
for f in filter_)
if test == "allof":
return all(radicale_filter.prop_match(item.vobject_item, f, "CR")
for f in filter_)
raise ValueError("Unsupported filter test: %r" % test)
raise ValueError("Unsupported filter %r for %r" %
(filter_.tag, collection_tag))
class ApplicationPartReport(ApplicationBase):
def do_REPORT(self, environ: types.WSGIEnviron, base_prefix: str,
path: str, user: str) -> types.WSGIResponse:
"""Manage REPORT request."""
access = Access(self._rights, user, path)
if not access.check("r"):
return httputils.NOT_ALLOWED
try:
xml_content = self._read_xml_request_body(environ)
except RuntimeError as e:
logger.warning("Bad REPORT request on %r: %s", path, e,
exc_info=True)
return httputils.BAD_REQUEST
except socket.timeout:
logger.debug("Client timed out", exc_info=True)
return httputils.REQUEST_TIMEOUT
with contextlib.ExitStack() as lock_stack:
lock_stack.enter_context(self._storage.acquire_lock("r", user))
item = next(iter(self._storage.discover(path)), None)
if not item:
return httputils.NOT_FOUND
if not access.check("r", item):
return httputils.NOT_ALLOWED
if isinstance(item, storage.BaseCollection):
collection = item
else:
assert item.collection is not None
collection = item.collection
max_occurrence = self.configuration.get("reporting", "max_freebusy_occurrence")
if xml_content is not None and \
xml_content.tag == xmlutils.make_clark("C:free-busy-query"):
try:
status, body = free_busy_report(
base_prefix, path, xml_content, collection, self._encoding,
lock_stack.close, max_occurrence)
except ValueError as e:
logger.warning(
"Bad REPORT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
headers = {"Content-Type": "text/calendar; charset=%s" % self._encoding}
return status, headers, str(body)
else:
try:
status, xml_answer = xml_report(
base_prefix, path, xml_content, collection, self._encoding,
lock_stack.close, max_occurrence)
except ValueError as e:
logger.warning(
"Bad REPORT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
return status, headers, self._xml_response(xml_answer)