diff --git a/radicale/app/report.py b/radicale/app/report.py index 11c5bea4..43d89916 100644 --- a/radicale/app/report.py +++ b/radicale/app/report.py @@ -313,25 +313,30 @@ def _expand( recurrences = rruleset.between(start, end, inc=True) expanded: vobject.base.Component = copy.copy(expanded_item.vobject_item) + vevent_recurrence, vevents_overridden = _split_overridden_vevents(expanded, dt_format) + is_expanded_filled: bool = False + i_overridden = 0 for recurrence_dt in recurrences: recurrence_utc = recurrence_dt.astimezone(datetime.timezone.utc) + i_overridden, vevent = _find_overridden(i_overridden, vevents_overridden, recurrence_utc, dt_format) - vevent = copy.deepcopy(expanded.vevent) - vevent.recurrence_id = ContentLine( - name='RECURRENCE-ID', - value=recurrence_utc.strftime(dt_format), params={} - ) - vevent.dtstart = ContentLine( - name='DTSTART', - value=recurrence_utc.strftime(dt_format), params={} - ) - if duration: - vevent.dtend = ContentLine( - name='DTEND', - value=(recurrence_utc + duration).strftime(dt_format), params={} + if not vevent: + vevent = copy.deepcopy(vevent_recurrence) + vevent.recurrence_id = ContentLine( + name='RECURRENCE-ID', + value=recurrence_utc.strftime(dt_format), params={} ) + vevent.dtstart = ContentLine( + name='DTSTART', + value=recurrence_utc.strftime(dt_format), params={} + ) + if duration: + vevent.dtend = ContentLine( + name='DTEND', + value=(recurrence_utc + duration).strftime(dt_format), params={} + ) if is_expanded_filled is False: expanded.vevent = vevent @@ -346,6 +351,29 @@ def _expand( return element +def _convert_timezone(vevent: vobject.icalendar.RecurringComponent, + name_prop: str, + name_content_line: str): + prop = getattr(vevent, name_prop, None) + if prop: + if type(prop.value) is datetime.date: + date_time = datetime.datetime.fromordinal( + prop.value.toordinal() + ).replace(tzinfo=datetime.timezone.utc) + else: + date_time = prop.value.astimezone(datetime.timezone.utc) + + setattr(vevent, name_prop, ContentLine(name=name_content_line, value=date_time, params=[])) + + +def _convert_to_utc(vevent: vobject.icalendar.RecurringComponent, + name_prop: str, + dt_format: str): + prop = getattr(vevent, name_prop, None) + if prop: + setattr(vevent, name_prop, ContentLine(name=prop.name, value=prop.value.strftime(dt_format), params=[])) + + def _make_vobject_expanded_item( item: radicale_item.Item, dt_format: str, @@ -381,33 +409,89 @@ def _make_vobject_expanded_item( vevent.dtend = ContentLine(name='DTEND', value=end_utc, params={}) rruleset = None - if hasattr(item.vobject_item.vevent, 'rrule'): - rruleset = vevent.getrruleset() + for i, vevent in enumerate(item.vobject_item.vevent_list): + _convert_timezone(vevent, 'dtstart', 'DTSTART') + _convert_timezone(vevent, 'dtend', 'DTEND') + _convert_timezone(vevent, 'recurrence_id', 'RECURRENCE-ID') - # There is something strange behaviour during serialization native datetime, so converting manually - vevent.dtstart.value = vevent.dtstart.value.strftime(dt_format) - if dt_end is not None: - vevent.dtend.value = vevent.dtend.value.strftime(dt_format) + if hasattr(vevent, 'rrule'): + rruleset = vevent.getrruleset() - timezones_to_remove = [] - for component in item.vobject_item.components(): - if component.name == 'VTIMEZONE': - timezones_to_remove.append(component) + # There is something strange behaviour during serialization native datetime, so converting manually + _convert_to_utc(vevent, 'dtstart', dt_format) + _convert_to_utc(vevent, 'dtend', dt_format) + _convert_to_utc(vevent, 'recurrence_id', dt_format) - for timezone in timezones_to_remove: - item.vobject_item.remove(timezone) + timezones_to_remove = [] + for component in item.vobject_item.components(): + if component.name == 'VTIMEZONE': + timezones_to_remove.append(component) - try: - delattr(item.vobject_item.vevent, 'rrule') - delattr(item.vobject_item.vevent, 'exdate') - delattr(item.vobject_item.vevent, 'exrule') - delattr(item.vobject_item.vevent, 'rdate') - except AttributeError: - pass + for timezone in timezones_to_remove: + item.vobject_item.remove(timezone) + + try: + delattr(item.vobject_item.vevent_list[i], 'rrule') + delattr(item.vobject_item.vevent_list[i], 'exdate') + delattr(item.vobject_item.vevent_list[i], 'exrule') + delattr(item.vobject_item.vevent_list[i], 'rdate') + except AttributeError: + pass return item, rruleset +def _split_overridden_vevents( + component: vobject.base.Component, + dt_format: str +) -> Tuple[ + vobject.icalendar.RecurringComponent, + List[vobject.icalendar.RecurringComponent] +]: + vevent_recurrence = None + vevents_overridden = [] + + for vevent in component.vevent_list: + if hasattr(vevent, 'recurrence_id'): + vevents_overridden += [vevent] + elif vevent_recurrence: + raise ValueError( + f"component with UID {vevent.uid} " + f"has more than one vevent without a recurrence_id" + ) + else: + vevent_recurrence = vevent + + if vevent_recurrence: + return ( + vevent_recurrence, sorted( + vevents_overridden, + key=lambda vevent: datetime.datetime.strptime(vevent.recurrence_id.value, dt_format) + ) + ) + else: + raise ValueError( + f"component with UID {vevent.uid} " + f"does not have a vevent without a recurrence_id" + ) + + +def _find_overridden( + start: int, + vevents: List[vobject.icalendar.RecurringComponent], + dt: datetime.datetime, + dt_format: str +) -> Tuple[int, Optional[vobject.icalendar.RecurringComponent]]: + for i in range(start, len(vevents)): + dt_event = datetime.datetime.strptime( + vevents[i].recurrence_id.value, + dt_format + ).replace(tzinfo=datetime.timezone.utc) + if dt_event == dt: + return (i + 1, vevents[i]) + return (start, None) + + def xml_item_response(base_prefix: str, href: str, found_props: Sequence[ET.Element] = (), not_found_props: Sequence[ET.Element] = (), diff --git a/radicale/tests/static/event_daily_rrule_overridden.ics b/radicale/tests/static/event_daily_rrule_overridden.ics new file mode 100644 index 00000000..077d6cd4 --- /dev/null +++ b/radicale/tests/static/event_daily_rrule_overridden.ics @@ -0,0 +1,35 @@ +BEGIN:VCALENDAR +VERSION:2.0 +BEGIN:VTIMEZONE +LAST-MODIFIED:20040110T032845Z +TZID:US/Eastern +BEGIN:DAYLIGHT +DTSTART:20000404T020000 +RRULE:FREQ=YEARLY;BYDAY=1SU;BYMONTH=4 +TZNAME:EDT +TZOFFSETFROM:-0500 +TZOFFSETTO:-0400 +END:DAYLIGHT +BEGIN:STANDARD +DTSTART:20001026T020000 +RRULE:FREQ=YEARLY;BYDAY=-1SU;BYMONTH=10 +TZNAME:EST +TZOFFSETFROM:-0400 +TZOFFSETTO:-0500 +END:STANDARD +END:VTIMEZONE +BEGIN:VEVENT +DTSTART;TZID=US/Eastern:20060102T120000 +DURATION:PT1H +RRULE:FREQ=DAILY;COUNT=5 +SUMMARY:Event #2 +UID:event_daily_rrule_overridden +END:VEVENT +BEGIN:VEVENT +DTSTART;TZID=US/Eastern:20060104T140000 +DURATION:PT1H +RECURRENCE-ID;TZID=US/Eastern:20060104T120000 +SUMMARY:Event #2 bis +UID:event_daily_rrule_overridden +END:VEVENT +END:VCALENDAR diff --git a/radicale/tests/test_base.py b/radicale/tests/test_base.py index fd6e8c30..6440542f 100644 --- a/radicale/tests/test_base.py +++ b/radicale/tests/test_base.py @@ -1632,184 +1632,6 @@ permissions: RrWw""") calendar_path, "http://radicale.org/ns/sync/INVALID") assert not sync_token - def test_report_with_expand_property(self) -> None: - """Test report with expand property""" - self.put("/calendar.ics/", get_file_content("event_daily_rrule.ics")) - req_body_without_expand = \ - """ - - - - - - - - - - - - - - """ - _, responses = self.report("/calendar.ics/", req_body_without_expand) - assert len(responses) == 1 - - response_without_expand = responses['/calendar.ics/event_daily_rrule.ics'] - assert not isinstance(response_without_expand, int) - status, element = response_without_expand["C:calendar-data"] - - assert status == 200 and element.text - - assert "RRULE" in element.text - assert "BEGIN:VTIMEZONE" in element.text - assert "RECURRENCE-ID" not in element.text - - uids: List[str] = [] - for line in element.text.split("\n"): - if line.startswith("UID:"): - uid = line[len("UID:"):] - assert uid == "event_daily_rrule" - uids.append(uid) - - assert len(uids) == 1 - - req_body_with_expand = \ - """ - - - - - - - - - - - - - - - """ - - _, responses = self.report("/calendar.ics/", req_body_with_expand) - - assert len(responses) == 1 - - response_with_expand = responses['/calendar.ics/event_daily_rrule.ics'] - assert not isinstance(response_with_expand, int) - status, element = response_with_expand["C:calendar-data"] - - assert status == 200 and element.text - assert "RRULE" not in element.text - assert "BEGIN:VTIMEZONE" not in element.text - - uids = [] - recurrence_ids = [] - for line in element.text.split("\n"): - if line.startswith("UID:"): - assert line == "UID:event_daily_rrule" - uids.append(line) - - if line.startswith("RECURRENCE-ID:"): - assert line in ["RECURRENCE-ID:20060103T170000Z", "RECURRENCE-ID:20060104T170000Z"] - recurrence_ids.append(line) - - if line.startswith("DTSTART:"): - assert line in ["DTSTART:20060103T170000Z", "DTSTART:20060104T170000Z"] - - assert len(uids) == 2 - assert len(set(recurrence_ids)) == 2 - - def test_report_with_expand_property_all_day_event(self) -> None: - """Test report with expand property""" - self.put("/calendar.ics/", get_file_content("event_full_day_rrule.ics")) - req_body_without_expand = \ - """ - - - - - - - - - - - - - - """ - _, responses = self.report("/calendar.ics/", req_body_without_expand) - assert len(responses) == 1 - - response_without_expand = responses['/calendar.ics/event_full_day_rrule.ics'] - assert not isinstance(response_without_expand, int) - status, element = response_without_expand["C:calendar-data"] - - assert status == 200 and element.text - - assert "RRULE" in element.text - assert "RECURRENCE-ID" not in element.text - - uids: List[str] = [] - for line in element.text.split("\n"): - if line.startswith("UID:"): - uid = line[len("UID:"):] - assert uid == "event_full_day_rrule" - uids.append(uid) - - assert len(uids) == 1 - - req_body_with_expand = \ - """ - - - - - - - - - - - - - - - """ - - _, responses = self.report("/calendar.ics/", req_body_with_expand) - - assert len(responses) == 1 - - response_with_expand = responses['/calendar.ics/event_full_day_rrule.ics'] - assert not isinstance(response_with_expand, int) - status, element = response_with_expand["C:calendar-data"] - - assert status == 200 and element.text - assert "RRULE" not in element.text - assert "BEGIN:VTIMEZONE" not in element.text - - uids = [] - recurrence_ids = [] - for line in element.text.split("\n"): - if line.startswith("UID:"): - assert line == "UID:event_full_day_rrule" - uids.append(line) - - if line.startswith("RECURRENCE-ID:"): - assert line in ["RECURRENCE-ID:20060103", "RECURRENCE-ID:20060104", "RECURRENCE-ID:20060105"] - recurrence_ids.append(line) - - if line.startswith("DTSTART:"): - assert line in ["DTSTART:20060103", "DTSTART:20060104", "DTSTART:20060105"] - - if line.startswith("DTEND:"): - assert line in ["DTEND:20060104", "DTEND:20060105", "DTEND:20060106"] - - assert len(uids) == 3 - assert len(set(recurrence_ids)) == 3 - def test_propfind_sync_token(self) -> None: """Retrieve the sync-token with a propfind request""" calendar_path = "/calendar.ics/" diff --git a/radicale/tests/test_expand.py b/radicale/tests/test_expand.py new file mode 100644 index 00000000..708c99fd --- /dev/null +++ b/radicale/tests/test_expand.py @@ -0,0 +1,200 @@ +# This file is part of Radicale - CalDAV and CardDAV server +# Copyright © 2012-2017 Guillaume Ayoub +# Copyright © 2017-2019 Unrud +# +# This library is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Radicale. If not, see . + +""" +Radicale tests with expand requests. + +""" + +import os +from typing import ClassVar, List + +from radicale.tests import BaseTest +from radicale.tests.helpers import get_file_content + +ONLY_DATES = True +CONTAINS_TIMES = False + + +class TestExpandRequests(BaseTest): + """Tests with expand requests.""" + + # Allow skipping sync-token tests, when not fully supported by the backend + full_sync_token_support: ClassVar[bool] = True + + def setup_method(self) -> None: + BaseTest.setup_method(self) + rights_file_path = os.path.join(self.colpath, "rights") + with open(rights_file_path, "w") as f: + f.write("""\ +[permit delete collection] +user: .* +collection: test-permit-delete +permissions: RrWwD + +[forbid delete collection] +user: .* +collection: test-forbid-delete +permissions: RrWwd + +[permit overwrite collection] +user: .* +collection: test-permit-overwrite +permissions: RrWwO + +[forbid overwrite collection] +user: .* +collection: test-forbid-overwrite +permissions: RrWwo + +[allow all] +user: .* +collection: .* +permissions: RrWw""") + self.configure({"rights": {"file": rights_file_path, + "type": "from_file"}}) + + def _test_expand(self, + expected_uid: str, + expected_recurrence_ids: List[str], + expected_start_times: List[str], + expected_end_times: List[str], + only_dates: bool, + nr_uids: int) -> None: + self.put("/calendar.ics/", get_file_content(f"{expected_uid}.ics")) + req_body_without_expand = \ + """ + + + + + + + + + + + + + + """ + _, responses = self.report("/calendar.ics/", req_body_without_expand) + assert len(responses) == 1 + + response_without_expand = responses[f'/calendar.ics/{expected_uid}.ics'] + assert not isinstance(response_without_expand, int) + status, element = response_without_expand["C:calendar-data"] + + assert status == 200 and element.text + + assert "RRULE" in element.text + if not only_dates: + assert "BEGIN:VTIMEZONE" in element.text + if nr_uids == 1: + assert "RECURRENCE-ID" not in element.text + + uids: List[str] = [] + for line in element.text.split("\n"): + if line.startswith("UID:"): + uid = line[len("UID:"):] + assert uid == expected_uid + uids.append(uid) + + assert len(uids) == nr_uids + + req_body_with_expand = \ + """ + + + + + + + + + + + + + + + """ + + _, responses = self.report("/calendar.ics/", req_body_with_expand) + + assert len(responses) == 1 + + response_with_expand = responses[f'/calendar.ics/{expected_uid}.ics'] + assert not isinstance(response_with_expand, int) + status, element = response_with_expand["C:calendar-data"] + + assert status == 200 and element.text + assert "RRULE" not in element.text + assert "BEGIN:VTIMEZONE" not in element.text + + uids = [] + recurrence_ids = [] + for line in element.text.split("\n"): + if line.startswith("UID:"): + assert line == f"UID:{expected_uid}" + uids.append(line) + + if line.startswith("RECURRENCE-ID:"): + assert line in expected_recurrence_ids + recurrence_ids.append(line) + + if line.startswith("DTSTART:"): + assert line in expected_start_times + + if line.startswith("DTEND:"): + assert line in expected_end_times + + assert len(uids) == len(expected_recurrence_ids) + assert len(set(recurrence_ids)) == len(expected_recurrence_ids) + + def test_report_with_expand_property(self) -> None: + """Test report with expand property""" + self._test_expand( + "event_daily_rrule", + ["RECURRENCE-ID:20060103T170000Z", "RECURRENCE-ID:20060104T170000Z"], + ["DTSTART:20060103T170000Z", "DTSTART:20060104T170000Z"], + [], + CONTAINS_TIMES, + 1 + ) + + def test_report_with_expand_property_all_day_event(self) -> None: + """Test report with expand property for all day events""" + self._test_expand( + "event_full_day_rrule", + ["RECURRENCE-ID:20060103", "RECURRENCE-ID:20060104", "RECURRENCE-ID:20060105"], + ["DTSTART:20060103", "DTSTART:20060104", "DTSTART:20060105"], + ["DTEND:20060104", "DTEND:20060105", "DTEND:20060106"], + ONLY_DATES, + 1 + ) + + def test_report_with_expand_property_overridden(self) -> None: + """Test report with expand property with overridden events""" + self._test_expand( + "event_daily_rrule_overridden", + ["RECURRENCE-ID:20060103T170000Z", "RECURRENCE-ID:20060104T170000Z"], + ["DTSTART:20060103T170000Z", "DTSTART:20060104T190000Z"], + [], + CONTAINS_TIMES, + 2 + )