1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-08-10 18:40:53 +00:00

Merge pull request #1619 from pieterhijma/expand-overridden

Expand overridden
This commit is contained in:
Peter Bieringer 2024-11-08 07:36:54 +00:00 committed by GitHub
commit bf77844d34
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 351 additions and 210 deletions

View file

@ -313,25 +313,30 @@ def _expand(
recurrences = rruleset.between(start, end, inc=True) recurrences = rruleset.between(start, end, inc=True)
expanded: vobject.base.Component = copy.copy(expanded_item.vobject_item) expanded: vobject.base.Component = copy.copy(expanded_item.vobject_item)
vevent_recurrence, vevents_overridden = _split_overridden_vevents(expanded, dt_format)
is_expanded_filled: bool = False is_expanded_filled: bool = False
i_overridden = 0
for recurrence_dt in recurrences: for recurrence_dt in recurrences:
recurrence_utc = recurrence_dt.astimezone(datetime.timezone.utc) recurrence_utc = recurrence_dt.astimezone(datetime.timezone.utc)
i_overridden, vevent = _find_overridden(i_overridden, vevents_overridden, recurrence_utc, dt_format)
vevent = copy.deepcopy(expanded.vevent) if not vevent:
vevent.recurrence_id = ContentLine( vevent = copy.deepcopy(vevent_recurrence)
name='RECURRENCE-ID', vevent.recurrence_id = ContentLine(
value=recurrence_utc.strftime(dt_format), params={} name='RECURRENCE-ID',
) value=recurrence_utc.strftime(dt_format), params={}
vevent.dtstart = ContentLine(
name='DTSTART',
value=recurrence_utc.strftime(dt_format), params={}
)
if duration:
vevent.dtend = ContentLine(
name='DTEND',
value=(recurrence_utc + duration).strftime(dt_format), params={}
) )
vevent.dtstart = ContentLine(
name='DTSTART',
value=recurrence_utc.strftime(dt_format), params={}
)
if duration:
vevent.dtend = ContentLine(
name='DTEND',
value=(recurrence_utc + duration).strftime(dt_format), params={}
)
if is_expanded_filled is False: if is_expanded_filled is False:
expanded.vevent = vevent expanded.vevent = vevent
@ -346,6 +351,29 @@ def _expand(
return element return element
def _convert_timezone(vevent: vobject.icalendar.RecurringComponent,
name_prop: str,
name_content_line: str):
prop = getattr(vevent, name_prop, None)
if prop:
if type(prop.value) is datetime.date:
date_time = datetime.datetime.fromordinal(
prop.value.toordinal()
).replace(tzinfo=datetime.timezone.utc)
else:
date_time = prop.value.astimezone(datetime.timezone.utc)
setattr(vevent, name_prop, ContentLine(name=name_content_line, value=date_time, params=[]))
def _convert_to_utc(vevent: vobject.icalendar.RecurringComponent,
name_prop: str,
dt_format: str):
prop = getattr(vevent, name_prop, None)
if prop:
setattr(vevent, name_prop, ContentLine(name=prop.name, value=prop.value.strftime(dt_format), params=[]))
def _make_vobject_expanded_item( def _make_vobject_expanded_item(
item: radicale_item.Item, item: radicale_item.Item,
dt_format: str, dt_format: str,
@ -381,33 +409,89 @@ def _make_vobject_expanded_item(
vevent.dtend = ContentLine(name='DTEND', value=end_utc, params={}) vevent.dtend = ContentLine(name='DTEND', value=end_utc, params={})
rruleset = None rruleset = None
if hasattr(item.vobject_item.vevent, 'rrule'): for i, vevent in enumerate(item.vobject_item.vevent_list):
rruleset = vevent.getrruleset() _convert_timezone(vevent, 'dtstart', 'DTSTART')
_convert_timezone(vevent, 'dtend', 'DTEND')
_convert_timezone(vevent, 'recurrence_id', 'RECURRENCE-ID')
# There is something strange behaviour during serialization native datetime, so converting manually if hasattr(vevent, 'rrule'):
vevent.dtstart.value = vevent.dtstart.value.strftime(dt_format) rruleset = vevent.getrruleset()
if dt_end is not None:
vevent.dtend.value = vevent.dtend.value.strftime(dt_format)
timezones_to_remove = [] # There is something strange behaviour during serialization native datetime, so converting manually
for component in item.vobject_item.components(): _convert_to_utc(vevent, 'dtstart', dt_format)
if component.name == 'VTIMEZONE': _convert_to_utc(vevent, 'dtend', dt_format)
timezones_to_remove.append(component) _convert_to_utc(vevent, 'recurrence_id', dt_format)
for timezone in timezones_to_remove: timezones_to_remove = []
item.vobject_item.remove(timezone) for component in item.vobject_item.components():
if component.name == 'VTIMEZONE':
timezones_to_remove.append(component)
try: for timezone in timezones_to_remove:
delattr(item.vobject_item.vevent, 'rrule') item.vobject_item.remove(timezone)
delattr(item.vobject_item.vevent, 'exdate')
delattr(item.vobject_item.vevent, 'exrule') try:
delattr(item.vobject_item.vevent, 'rdate') delattr(item.vobject_item.vevent_list[i], 'rrule')
except AttributeError: delattr(item.vobject_item.vevent_list[i], 'exdate')
pass delattr(item.vobject_item.vevent_list[i], 'exrule')
delattr(item.vobject_item.vevent_list[i], 'rdate')
except AttributeError:
pass
return item, rruleset return item, rruleset
def _split_overridden_vevents(
component: vobject.base.Component,
dt_format: str
) -> Tuple[
vobject.icalendar.RecurringComponent,
List[vobject.icalendar.RecurringComponent]
]:
vevent_recurrence = None
vevents_overridden = []
for vevent in component.vevent_list:
if hasattr(vevent, 'recurrence_id'):
vevents_overridden += [vevent]
elif vevent_recurrence:
raise ValueError(
f"component with UID {vevent.uid} "
f"has more than one vevent without a recurrence_id"
)
else:
vevent_recurrence = vevent
if vevent_recurrence:
return (
vevent_recurrence, sorted(
vevents_overridden,
key=lambda vevent: datetime.datetime.strptime(vevent.recurrence_id.value, dt_format)
)
)
else:
raise ValueError(
f"component with UID {vevent.uid} "
f"does not have a vevent without a recurrence_id"
)
def _find_overridden(
start: int,
vevents: List[vobject.icalendar.RecurringComponent],
dt: datetime.datetime,
dt_format: str
) -> Tuple[int, Optional[vobject.icalendar.RecurringComponent]]:
for i in range(start, len(vevents)):
dt_event = datetime.datetime.strptime(
vevents[i].recurrence_id.value,
dt_format
).replace(tzinfo=datetime.timezone.utc)
if dt_event == dt:
return (i + 1, vevents[i])
return (start, None)
def xml_item_response(base_prefix: str, href: str, def xml_item_response(base_prefix: str, href: str,
found_props: Sequence[ET.Element] = (), found_props: Sequence[ET.Element] = (),
not_found_props: Sequence[ET.Element] = (), not_found_props: Sequence[ET.Element] = (),

View file

@ -0,0 +1,35 @@
BEGIN:VCALENDAR
VERSION:2.0
BEGIN:VTIMEZONE
LAST-MODIFIED:20040110T032845Z
TZID:US/Eastern
BEGIN:DAYLIGHT
DTSTART:20000404T020000
RRULE:FREQ=YEARLY;BYDAY=1SU;BYMONTH=4
TZNAME:EDT
TZOFFSETFROM:-0500
TZOFFSETTO:-0400
END:DAYLIGHT
BEGIN:STANDARD
DTSTART:20001026T020000
RRULE:FREQ=YEARLY;BYDAY=-1SU;BYMONTH=10
TZNAME:EST
TZOFFSETFROM:-0400
TZOFFSETTO:-0500
END:STANDARD
END:VTIMEZONE
BEGIN:VEVENT
DTSTART;TZID=US/Eastern:20060102T120000
DURATION:PT1H
RRULE:FREQ=DAILY;COUNT=5
SUMMARY:Event #2
UID:event_daily_rrule_overridden
END:VEVENT
BEGIN:VEVENT
DTSTART;TZID=US/Eastern:20060104T140000
DURATION:PT1H
RECURRENCE-ID;TZID=US/Eastern:20060104T120000
SUMMARY:Event #2 bis
UID:event_daily_rrule_overridden
END:VEVENT
END:VCALENDAR

View file

@ -1632,184 +1632,6 @@ permissions: RrWw""")
calendar_path, "http://radicale.org/ns/sync/INVALID") calendar_path, "http://radicale.org/ns/sync/INVALID")
assert not sync_token assert not sync_token
def test_report_with_expand_property(self) -> None:
"""Test report with expand property"""
self.put("/calendar.ics/", get_file_content("event_daily_rrule.ics"))
req_body_without_expand = \
"""<?xml version="1.0" encoding="utf-8" ?>
<C:calendar-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
<D:prop>
<C:calendar-data>
</C:calendar-data>
</D:prop>
<C:filter>
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:time-range start="20060103T000000Z" end="20060105T000000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:filter>
</C:calendar-query>
"""
_, responses = self.report("/calendar.ics/", req_body_without_expand)
assert len(responses) == 1
response_without_expand = responses['/calendar.ics/event_daily_rrule.ics']
assert not isinstance(response_without_expand, int)
status, element = response_without_expand["C:calendar-data"]
assert status == 200 and element.text
assert "RRULE" in element.text
assert "BEGIN:VTIMEZONE" in element.text
assert "RECURRENCE-ID" not in element.text
uids: List[str] = []
for line in element.text.split("\n"):
if line.startswith("UID:"):
uid = line[len("UID:"):]
assert uid == "event_daily_rrule"
uids.append(uid)
assert len(uids) == 1
req_body_with_expand = \
"""<?xml version="1.0" encoding="utf-8" ?>
<C:calendar-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
<D:prop>
<C:calendar-data>
<C:expand start="20060103T000000Z" end="20060105T000000Z"/>
</C:calendar-data>
</D:prop>
<C:filter>
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:time-range start="20060103T000000Z" end="20060105T000000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:filter>
</C:calendar-query>
"""
_, responses = self.report("/calendar.ics/", req_body_with_expand)
assert len(responses) == 1
response_with_expand = responses['/calendar.ics/event_daily_rrule.ics']
assert not isinstance(response_with_expand, int)
status, element = response_with_expand["C:calendar-data"]
assert status == 200 and element.text
assert "RRULE" not in element.text
assert "BEGIN:VTIMEZONE" not in element.text
uids = []
recurrence_ids = []
for line in element.text.split("\n"):
if line.startswith("UID:"):
assert line == "UID:event_daily_rrule"
uids.append(line)
if line.startswith("RECURRENCE-ID:"):
assert line in ["RECURRENCE-ID:20060103T170000Z", "RECURRENCE-ID:20060104T170000Z"]
recurrence_ids.append(line)
if line.startswith("DTSTART:"):
assert line in ["DTSTART:20060103T170000Z", "DTSTART:20060104T170000Z"]
assert len(uids) == 2
assert len(set(recurrence_ids)) == 2
def test_report_with_expand_property_all_day_event(self) -> None:
"""Test report with expand property"""
self.put("/calendar.ics/", get_file_content("event_full_day_rrule.ics"))
req_body_without_expand = \
"""<?xml version="1.0" encoding="utf-8" ?>
<C:calendar-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
<D:prop>
<C:calendar-data>
</C:calendar-data>
</D:prop>
<C:filter>
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:time-range start="20060103T000000Z" end="20060105T000000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:filter>
</C:calendar-query>
"""
_, responses = self.report("/calendar.ics/", req_body_without_expand)
assert len(responses) == 1
response_without_expand = responses['/calendar.ics/event_full_day_rrule.ics']
assert not isinstance(response_without_expand, int)
status, element = response_without_expand["C:calendar-data"]
assert status == 200 and element.text
assert "RRULE" in element.text
assert "RECURRENCE-ID" not in element.text
uids: List[str] = []
for line in element.text.split("\n"):
if line.startswith("UID:"):
uid = line[len("UID:"):]
assert uid == "event_full_day_rrule"
uids.append(uid)
assert len(uids) == 1
req_body_with_expand = \
"""<?xml version="1.0" encoding="utf-8" ?>
<C:calendar-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
<D:prop>
<C:calendar-data>
<C:expand start="20060103T000000Z" end="20060105T000000Z"/>
</C:calendar-data>
</D:prop>
<C:filter>
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:time-range start="20060103T000000Z" end="20060105T000000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:filter>
</C:calendar-query>
"""
_, responses = self.report("/calendar.ics/", req_body_with_expand)
assert len(responses) == 1
response_with_expand = responses['/calendar.ics/event_full_day_rrule.ics']
assert not isinstance(response_with_expand, int)
status, element = response_with_expand["C:calendar-data"]
assert status == 200 and element.text
assert "RRULE" not in element.text
assert "BEGIN:VTIMEZONE" not in element.text
uids = []
recurrence_ids = []
for line in element.text.split("\n"):
if line.startswith("UID:"):
assert line == "UID:event_full_day_rrule"
uids.append(line)
if line.startswith("RECURRENCE-ID:"):
assert line in ["RECURRENCE-ID:20060103", "RECURRENCE-ID:20060104", "RECURRENCE-ID:20060105"]
recurrence_ids.append(line)
if line.startswith("DTSTART:"):
assert line in ["DTSTART:20060103", "DTSTART:20060104", "DTSTART:20060105"]
if line.startswith("DTEND:"):
assert line in ["DTEND:20060104", "DTEND:20060105", "DTEND:20060106"]
assert len(uids) == 3
assert len(set(recurrence_ids)) == 3
def test_propfind_sync_token(self) -> None: def test_propfind_sync_token(self) -> None:
"""Retrieve the sync-token with a propfind request""" """Retrieve the sync-token with a propfind request"""
calendar_path = "/calendar.ics/" calendar_path = "/calendar.ics/"

View file

@ -0,0 +1,200 @@
# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
"""
Radicale tests with expand requests.
"""
import os
from typing import ClassVar, List
from radicale.tests import BaseTest
from radicale.tests.helpers import get_file_content
ONLY_DATES = True
CONTAINS_TIMES = False
class TestExpandRequests(BaseTest):
"""Tests with expand requests."""
# Allow skipping sync-token tests, when not fully supported by the backend
full_sync_token_support: ClassVar[bool] = True
def setup_method(self) -> None:
BaseTest.setup_method(self)
rights_file_path = os.path.join(self.colpath, "rights")
with open(rights_file_path, "w") as f:
f.write("""\
[permit delete collection]
user: .*
collection: test-permit-delete
permissions: RrWwD
[forbid delete collection]
user: .*
collection: test-forbid-delete
permissions: RrWwd
[permit overwrite collection]
user: .*
collection: test-permit-overwrite
permissions: RrWwO
[forbid overwrite collection]
user: .*
collection: test-forbid-overwrite
permissions: RrWwo
[allow all]
user: .*
collection: .*
permissions: RrWw""")
self.configure({"rights": {"file": rights_file_path,
"type": "from_file"}})
def _test_expand(self,
expected_uid: str,
expected_recurrence_ids: List[str],
expected_start_times: List[str],
expected_end_times: List[str],
only_dates: bool,
nr_uids: int) -> None:
self.put("/calendar.ics/", get_file_content(f"{expected_uid}.ics"))
req_body_without_expand = \
"""<?xml version="1.0" encoding="utf-8" ?>
<C:calendar-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
<D:prop>
<C:calendar-data>
</C:calendar-data>
</D:prop>
<C:filter>
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:time-range start="20060103T000000Z" end="20060105T000000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:filter>
</C:calendar-query>
"""
_, responses = self.report("/calendar.ics/", req_body_without_expand)
assert len(responses) == 1
response_without_expand = responses[f'/calendar.ics/{expected_uid}.ics']
assert not isinstance(response_without_expand, int)
status, element = response_without_expand["C:calendar-data"]
assert status == 200 and element.text
assert "RRULE" in element.text
if not only_dates:
assert "BEGIN:VTIMEZONE" in element.text
if nr_uids == 1:
assert "RECURRENCE-ID" not in element.text
uids: List[str] = []
for line in element.text.split("\n"):
if line.startswith("UID:"):
uid = line[len("UID:"):]
assert uid == expected_uid
uids.append(uid)
assert len(uids) == nr_uids
req_body_with_expand = \
"""<?xml version="1.0" encoding="utf-8" ?>
<C:calendar-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
<D:prop>
<C:calendar-data>
<C:expand start="20060103T000000Z" end="20060105T000000Z"/>
</C:calendar-data>
</D:prop>
<C:filter>
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:time-range start="20060103T000000Z" end="20060105T000000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:filter>
</C:calendar-query>
"""
_, responses = self.report("/calendar.ics/", req_body_with_expand)
assert len(responses) == 1
response_with_expand = responses[f'/calendar.ics/{expected_uid}.ics']
assert not isinstance(response_with_expand, int)
status, element = response_with_expand["C:calendar-data"]
assert status == 200 and element.text
assert "RRULE" not in element.text
assert "BEGIN:VTIMEZONE" not in element.text
uids = []
recurrence_ids = []
for line in element.text.split("\n"):
if line.startswith("UID:"):
assert line == f"UID:{expected_uid}"
uids.append(line)
if line.startswith("RECURRENCE-ID:"):
assert line in expected_recurrence_ids
recurrence_ids.append(line)
if line.startswith("DTSTART:"):
assert line in expected_start_times
if line.startswith("DTEND:"):
assert line in expected_end_times
assert len(uids) == len(expected_recurrence_ids)
assert len(set(recurrence_ids)) == len(expected_recurrence_ids)
def test_report_with_expand_property(self) -> None:
"""Test report with expand property"""
self._test_expand(
"event_daily_rrule",
["RECURRENCE-ID:20060103T170000Z", "RECURRENCE-ID:20060104T170000Z"],
["DTSTART:20060103T170000Z", "DTSTART:20060104T170000Z"],
[],
CONTAINS_TIMES,
1
)
def test_report_with_expand_property_all_day_event(self) -> None:
"""Test report with expand property for all day events"""
self._test_expand(
"event_full_day_rrule",
["RECURRENCE-ID:20060103", "RECURRENCE-ID:20060104", "RECURRENCE-ID:20060105"],
["DTSTART:20060103", "DTSTART:20060104", "DTSTART:20060105"],
["DTEND:20060104", "DTEND:20060105", "DTEND:20060106"],
ONLY_DATES,
1
)
def test_report_with_expand_property_overridden(self) -> None:
"""Test report with expand property with overridden events"""
self._test_expand(
"event_daily_rrule_overridden",
["RECURRENCE-ID:20060103T170000Z", "RECURRENCE-ID:20060104T170000Z"],
["DTSTART:20060103T170000Z", "DTSTART:20060104T190000Z"],
[],
CONTAINS_TIMES,
2
)