1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-06-26 16:45:52 +00:00

Add basic free-busy report

This commit is contained in:
Ray 2023-10-06 13:15:45 -06:00
parent 6b34323c1e
commit 7b0d88ff0d
5 changed files with 156 additions and 43 deletions

View file

@ -23,6 +23,7 @@ import datetime
import posixpath import posixpath
import socket import socket
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
import vobject
from http import client from http import client
from typing import (Any, Callable, Iterable, Iterator, List, Optional, from typing import (Any, Callable, Iterable, Iterator, List, Optional,
Sequence, Tuple, Union) Sequence, Tuple, Union)
@ -37,12 +38,38 @@ from radicale.app.base import Access, ApplicationBase
from radicale.item import filter as radicale_filter from radicale.item import filter as radicale_filter
from radicale.log import logger from radicale.log import logger
def free_busy_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
collection: storage.BaseCollection, encoding: str,
unlock_storage_fn: Callable[[], None]
) -> Tuple[int, str]:
multistatus = ET.Element(xmlutils.make_clark("D:multistatus"))
if xml_request is None:
return client.MULTI_STATUS, multistatus
root = xml_request
if (root.tag == xmlutils.make_clark("C:free-busy-query") and
collection.tag != "VCALENDAR"):
logger.warning("Invalid REPORT method %r on %r requested",
xmlutils.make_human_tag(root.tag), path)
return client.FORBIDDEN, xmlutils.webdav_error("D:supported-report")
time_range_element = root.find(xmlutils.make_clark("C:time-range"))
start,end = radicale_filter.time_range_timestamps(time_range_element)
items = list(collection.get_by_time(start, end))
cal = vobject.iCalendar()
for item in items:
occurrences = radicale_filter.time_range_fill(item.vobject_item, time_range_element, "VEVENT")
for occurrence in occurrences:
vfb = cal.add('vfreebusy')
vfb.add('dtstamp').value = item.vobject_item.vevent.dtstamp.value
vfb.add('dtstart').value, vfb.add('dtend').value = occurrence
return (client.OK, cal.serialize())
def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element], def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
collection: storage.BaseCollection, encoding: str, collection: storage.BaseCollection, encoding: str,
unlock_storage_fn: Callable[[], None] unlock_storage_fn: Callable[[], None]
) -> Tuple[int, ET.Element]: ) -> Tuple[int, ET.Element]:
"""Read and answer REPORT requests. """Read and answer REPORT requests that return XML.
Read rfc3253-3.6 for info. Read rfc3253-3.6 for info.
@ -426,13 +453,27 @@ class ApplicationPartReport(ApplicationBase):
else: else:
assert item.collection is not None assert item.collection is not None
collection = item.collection collection = item.collection
try:
status, xml_answer = xml_report( if xml_content is not None and \
base_prefix, path, xml_content, collection, self._encoding, xml_content.tag == xmlutils.make_clark("C:free-busy-query"):
lock_stack.close) try:
except ValueError as e: status, body = free_busy_report(
logger.warning( base_prefix, path, xml_content, collection, self._encoding,
"Bad REPORT request on %r: %s", path, e, exc_info=True) lock_stack.close)
return httputils.BAD_REQUEST except ValueError as e:
headers = {"Content-Type": "text/xml; charset=%s" % self._encoding} logger.warning(
return status, headers, self._xml_response(xml_answer) "Bad REPORT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
headers = {"Content-Type": "text/calendar; charset=%s" % self._encoding}
return status, headers, body
else:
try:
status, xml_answer = xml_report(
base_prefix, path, xml_content, collection, self._encoding,
lock_stack.close)
except ValueError as e:
logger.warning(
"Bad REPORT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
return status, headers, self._xml_response(xml_answer)

View file

@ -52,6 +52,27 @@ def date_to_datetime(d: date) -> datetime:
return d return d
def parse_time_range(time_filter: ET.Element) -> Tuple[datetime, datetime]:
start_text = time_filter.get("start")
end_text = time_filter.get("end")
if start_text:
start = datetime.strptime(
start_text, "%Y%m%dT%H%M%SZ").replace(
tzinfo=timezone.utc)
else:
start = DATETIME_MIN
if end_text:
end = datetime.strptime(
end_text, "%Y%m%dT%H%M%SZ").replace(
tzinfo=timezone.utc)
else:
end = DATETIME_MAX
return start, end
def time_range_timestamps(time_filter: ET.Element) -> Tuple[int, int]:
start, end = parse_time_range(time_filter)
return (math.floor(start.timestamp()), math.ceil(end.timestamp()))
def comp_match(item: "item.Item", filter_: ET.Element, level: int = 0) -> bool: def comp_match(item: "item.Item", filter_: ET.Element, level: int = 0) -> bool:
"""Check whether the ``item`` matches the comp ``filter_``. """Check whether the ``item`` matches the comp ``filter_``.
@ -147,21 +168,10 @@ def time_range_match(vobject_item: vobject.base.Component,
"""Check whether the component/property ``child_name`` of """Check whether the component/property ``child_name`` of
``vobject_item`` matches the time-range ``filter_``.""" ``vobject_item`` matches the time-range ``filter_``."""
start_text = filter_.get("start") if not filter_.get("start") and not filter_.get("end"):
end_text = filter_.get("end")
if not start_text and not end_text:
return False return False
if start_text:
start = datetime.strptime(start_text, "%Y%m%dT%H%M%SZ")
else:
start = datetime.min
if end_text:
end = datetime.strptime(end_text, "%Y%m%dT%H%M%SZ")
else:
end = datetime.max
start = start.replace(tzinfo=timezone.utc)
end = end.replace(tzinfo=timezone.utc)
start, end = parse_time_range(filter_)
matched = False matched = False
def range_fn(range_start: datetime, range_end: datetime, def range_fn(range_start: datetime, range_end: datetime,
@ -181,6 +191,34 @@ def time_range_match(vobject_item: vobject.base.Component,
return matched return matched
def time_range_fill(vobject_item: vobject.base.Component,
filter_: ET.Element, child_name: str, n: int = 1
) -> List[Tuple[datetime, datetime]]:
"""Create a list of ``n`` occurances from the component/property ``child_name``
of ``vobject_item``."""
if not filter_.get("start") and not filter_.get("end"):
return []
start, end = parse_time_range(filter_)
ranges: List[Tuple[datetime, datetime]] = []
def range_fn(range_start: datetime, range_end: datetime,
is_recurrence: bool) -> bool:
nonlocal ranges
if start < range_end and range_start < end:
ranges.append((range_start, range_end))
if n > 0 and len(ranges) >= n:
return True
if end < range_start and not is_recurrence:
return True
return False
def infinity_fn(range_start: datetime) -> bool:
return False
visit_time_ranges(vobject_item, child_name, range_fn, infinity_fn)
return ranges
def visit_time_ranges(vobject_item: vobject.base.Component, child_name: str, def visit_time_ranges(vobject_item: vobject.base.Component, child_name: str,
range_fn: Callable[[datetime, datetime, bool], bool], range_fn: Callable[[datetime, datetime, bool], bool],
infinity_fn: Callable[[datetime], bool]) -> None: infinity_fn: Callable[[datetime], bool]) -> None:
@ -543,20 +581,7 @@ def simplify_prefilters(filters: Iterable[ET.Element], collection_tag: str
if time_filter.tag != xmlutils.make_clark("C:time-range"): if time_filter.tag != xmlutils.make_clark("C:time-range"):
simple = False simple = False
continue continue
start_text = time_filter.get("start") start, end = time_range_timestamps(time_filter)
end_text = time_filter.get("end")
if start_text:
start = math.floor(datetime.strptime(
start_text, "%Y%m%dT%H%M%SZ").replace(
tzinfo=timezone.utc).timestamp())
else:
start = TIMESTAMP_MIN
if end_text:
end = math.ceil(datetime.strptime(
end_text, "%Y%m%dT%H%M%SZ").replace(
tzinfo=timezone.utc).timestamp())
else:
end = TIMESTAMP_MAX
return tag, start, end, simple return tag, start, end, simple
return tag, TIMESTAMP_MIN, TIMESTAMP_MAX, simple return tag, TIMESTAMP_MIN, TIMESTAMP_MAX, simple
return None, TIMESTAMP_MIN, TIMESTAMP_MAX, simple return None, TIMESTAMP_MIN, TIMESTAMP_MAX, simple

View file

@ -158,6 +158,24 @@ class BaseCollection:
continue continue
yield item, simple and (start <= istart or iend <= end) yield item, simple and (start <= istart or iend <= end)
def get_by_time(self, start: int , end: int
) -> Iterable["radicale_item.Item"]:
"""Fetch all items within a start and end time range.
Returns a iterable of ``item``s.
"""
if not self.tag:
return
for item in self.get_all():
# TODO: Any other component_name here?
if item.component_name not in ("VEVENT",):
continue
istart, iend = item.time_range
if istart >= end or iend <= start:
continue
yield item
def has_uid(self, uid: str) -> bool: def has_uid(self, uid: str) -> bool:
"""Check if a UID exists in the collection.""" """Check if a UID exists in the collection."""
for item in self.get_all(): for item in self.get_all():

View file

@ -27,6 +27,7 @@ import sys
import tempfile import tempfile
import wsgiref.util import wsgiref.util
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
import vobject
from io import BytesIO from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
@ -35,7 +36,7 @@ import defusedxml.ElementTree as DefusedET
import radicale import radicale
from radicale import app, config, types, xmlutils from radicale import app, config, types, xmlutils
RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]]]] RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]], vobject.base.Component]]
# Enable debug output # Enable debug output
radicale.log.logger.setLevel(logging.DEBUG) radicale.log.logger.setLevel(logging.DEBUG)
@ -107,8 +108,7 @@ class BaseTest:
def parse_responses(text: str) -> RESPONSES: def parse_responses(text: str) -> RESPONSES:
xml = DefusedET.fromstring(text) xml = DefusedET.fromstring(text)
assert xml.tag == xmlutils.make_clark("D:multistatus") assert xml.tag == xmlutils.make_clark("D:multistatus")
path_responses: Dict[str, Union[ path_responses: RESPONSES = {}
int, Dict[str, Tuple[int, ET.Element]]]] = {}
for response in xml.findall(xmlutils.make_clark("D:response")): for response in xml.findall(xmlutils.make_clark("D:response")):
href = response.find(xmlutils.make_clark("D:href")) href = response.find(xmlutils.make_clark("D:href"))
assert href.text not in path_responses assert href.text not in path_responses
@ -133,6 +133,12 @@ class BaseTest:
path_responses[href.text] = prop_respones path_responses[href.text] = prop_respones
return path_responses return path_responses
@staticmethod
def parse_free_busy(text: str) -> RESPONSES:
path_responses: RESPONSES = {}
path_responses[""] = vobject.readOne(text)
return path_responses
def get(self, path: str, check: Optional[int] = 200, **kwargs def get(self, path: str, check: Optional[int] = 200, **kwargs
) -> Tuple[int, str]: ) -> Tuple[int, str]:
assert "data" not in kwargs assert "data" not in kwargs
@ -177,13 +183,18 @@ class BaseTest:
return status, responses return status, responses
def report(self, path: str, data: str, check: Optional[int] = 207, def report(self, path: str, data: str, check: Optional[int] = 207,
is_xml: Optional[bool] = True,
**kwargs) -> Tuple[int, RESPONSES]: **kwargs) -> Tuple[int, RESPONSES]:
status, _, answer = self.request("REPORT", path, data, check=check, status, _, answer = self.request("REPORT", path, data, check=check,
**kwargs) **kwargs)
if status < 200 or 300 <= status: if status < 200 or 300 <= status:
return status, {} return status, {}
assert answer is not None assert answer is not None
return status, self.parse_responses(answer) if is_xml:
parsed = self.parse_responses(answer)
else:
parsed = self.parse_free_busy(answer)
return status, parsed
def delete(self, path: str, check: Optional[int] = 200, **kwargs def delete(self, path: str, check: Optional[int] = 200, **kwargs
) -> Tuple[int, RESPONSES]: ) -> Tuple[int, RESPONSES]:

View file

@ -22,6 +22,7 @@ Radicale tests with simple requests.
import os import os
import posixpath import posixpath
import vobject
from typing import Any, Callable, ClassVar, Iterable, List, Optional, Tuple from typing import Any, Callable, ClassVar, Iterable, List, Optional, Tuple
import defusedxml.ElementTree as DefusedET import defusedxml.ElementTree as DefusedET
@ -1360,10 +1361,27 @@ permissions: RrWw""")
</C:calendar-query>""") </C:calendar-query>""")
assert len(responses) == 1 assert len(responses) == 1
response = responses[event_path] response = responses[event_path]
assert not isinstance(response, int) assert isinstance(response, dict)
status, prop = response["D:getetag"] status, prop = response["D:getetag"]
assert status == 200 and prop.text assert status == 200 and prop.text
def test_report_free_busy(self) -> None:
"""Test free busy report on a few items"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
for i in (1,2):
filename = "event{}.ics".format(i)
event = get_file_content(filename)
self.put(posixpath.join(calendar_path, filename), event)
code, responses = self.report(calendar_path, """\
<?xml version="1.0" encoding="utf-8" ?>
<C:free-busy-query xmlns:C="urn:ietf:params:xml:ns:caldav">
<C:time-range start="20130901T140000Z" end="20130908T220000Z"/>
</C:free-busy-query>""", 200, is_xml = False)
assert len(responses) == 1
for response in responses.values():
assert isinstance(response, vobject.base.Component)
def _report_sync_token( def _report_sync_token(
self, calendar_path: str, sync_token: Optional[str] = None self, calendar_path: str, sync_token: Optional[str] = None
) -> Tuple[str, RESPONSES]: ) -> Tuple[str, RESPONSES]: