From 2d5dc5186befb6fc1612adffb635d89142ceb9ff Mon Sep 17 00:00:00 2001 From: Pieter Hijma Date: Tue, 5 Nov 2024 11:33:27 +0100 Subject: [PATCH] Expand overridden recurring events --- radicale/app/report.py | 148 ++++++++++++++---- .../static/event_daily_rrule_overridden.ics | 35 +++++ radicale/tests/test_base.py | 87 ++++++++++ 3 files changed, 238 insertions(+), 32 deletions(-) create mode 100644 radicale/tests/static/event_daily_rrule_overridden.ics diff --git a/radicale/app/report.py b/radicale/app/report.py index 11c5bea4..43d89916 100644 --- a/radicale/app/report.py +++ b/radicale/app/report.py @@ -313,25 +313,30 @@ def _expand( recurrences = rruleset.between(start, end, inc=True) expanded: vobject.base.Component = copy.copy(expanded_item.vobject_item) + vevent_recurrence, vevents_overridden = _split_overridden_vevents(expanded, dt_format) + is_expanded_filled: bool = False + i_overridden = 0 for recurrence_dt in recurrences: recurrence_utc = recurrence_dt.astimezone(datetime.timezone.utc) + i_overridden, vevent = _find_overridden(i_overridden, vevents_overridden, recurrence_utc, dt_format) - vevent = copy.deepcopy(expanded.vevent) - vevent.recurrence_id = ContentLine( - name='RECURRENCE-ID', - value=recurrence_utc.strftime(dt_format), params={} - ) - vevent.dtstart = ContentLine( - name='DTSTART', - value=recurrence_utc.strftime(dt_format), params={} - ) - if duration: - vevent.dtend = ContentLine( - name='DTEND', - value=(recurrence_utc + duration).strftime(dt_format), params={} + if not vevent: + vevent = copy.deepcopy(vevent_recurrence) + vevent.recurrence_id = ContentLine( + name='RECURRENCE-ID', + value=recurrence_utc.strftime(dt_format), params={} ) + vevent.dtstart = ContentLine( + name='DTSTART', + value=recurrence_utc.strftime(dt_format), params={} + ) + if duration: + vevent.dtend = ContentLine( + name='DTEND', + value=(recurrence_utc + duration).strftime(dt_format), params={} + ) if is_expanded_filled is False: expanded.vevent = vevent @@ -346,6 +351,29 @@ def _expand( return element +def _convert_timezone(vevent: vobject.icalendar.RecurringComponent, + name_prop: str, + name_content_line: str): + prop = getattr(vevent, name_prop, None) + if prop: + if type(prop.value) is datetime.date: + date_time = datetime.datetime.fromordinal( + prop.value.toordinal() + ).replace(tzinfo=datetime.timezone.utc) + else: + date_time = prop.value.astimezone(datetime.timezone.utc) + + setattr(vevent, name_prop, ContentLine(name=name_content_line, value=date_time, params=[])) + + +def _convert_to_utc(vevent: vobject.icalendar.RecurringComponent, + name_prop: str, + dt_format: str): + prop = getattr(vevent, name_prop, None) + if prop: + setattr(vevent, name_prop, ContentLine(name=prop.name, value=prop.value.strftime(dt_format), params=[])) + + def _make_vobject_expanded_item( item: radicale_item.Item, dt_format: str, @@ -381,33 +409,89 @@ def _make_vobject_expanded_item( vevent.dtend = ContentLine(name='DTEND', value=end_utc, params={}) rruleset = None - if hasattr(item.vobject_item.vevent, 'rrule'): - rruleset = vevent.getrruleset() + for i, vevent in enumerate(item.vobject_item.vevent_list): + _convert_timezone(vevent, 'dtstart', 'DTSTART') + _convert_timezone(vevent, 'dtend', 'DTEND') + _convert_timezone(vevent, 'recurrence_id', 'RECURRENCE-ID') - # There is something strange behaviour during serialization native datetime, so converting manually - vevent.dtstart.value = vevent.dtstart.value.strftime(dt_format) - if dt_end is not None: - vevent.dtend.value = vevent.dtend.value.strftime(dt_format) + if hasattr(vevent, 'rrule'): + rruleset = vevent.getrruleset() - timezones_to_remove = [] - for component in item.vobject_item.components(): - if component.name == 'VTIMEZONE': - timezones_to_remove.append(component) + # There is something strange behaviour during serialization native datetime, so converting manually + _convert_to_utc(vevent, 'dtstart', dt_format) + _convert_to_utc(vevent, 'dtend', dt_format) + _convert_to_utc(vevent, 'recurrence_id', dt_format) - for timezone in timezones_to_remove: - item.vobject_item.remove(timezone) + timezones_to_remove = [] + for component in item.vobject_item.components(): + if component.name == 'VTIMEZONE': + timezones_to_remove.append(component) - try: - delattr(item.vobject_item.vevent, 'rrule') - delattr(item.vobject_item.vevent, 'exdate') - delattr(item.vobject_item.vevent, 'exrule') - delattr(item.vobject_item.vevent, 'rdate') - except AttributeError: - pass + for timezone in timezones_to_remove: + item.vobject_item.remove(timezone) + + try: + delattr(item.vobject_item.vevent_list[i], 'rrule') + delattr(item.vobject_item.vevent_list[i], 'exdate') + delattr(item.vobject_item.vevent_list[i], 'exrule') + delattr(item.vobject_item.vevent_list[i], 'rdate') + except AttributeError: + pass return item, rruleset +def _split_overridden_vevents( + component: vobject.base.Component, + dt_format: str +) -> Tuple[ + vobject.icalendar.RecurringComponent, + List[vobject.icalendar.RecurringComponent] +]: + vevent_recurrence = None + vevents_overridden = [] + + for vevent in component.vevent_list: + if hasattr(vevent, 'recurrence_id'): + vevents_overridden += [vevent] + elif vevent_recurrence: + raise ValueError( + f"component with UID {vevent.uid} " + f"has more than one vevent without a recurrence_id" + ) + else: + vevent_recurrence = vevent + + if vevent_recurrence: + return ( + vevent_recurrence, sorted( + vevents_overridden, + key=lambda vevent: datetime.datetime.strptime(vevent.recurrence_id.value, dt_format) + ) + ) + else: + raise ValueError( + f"component with UID {vevent.uid} " + f"does not have a vevent without a recurrence_id" + ) + + +def _find_overridden( + start: int, + vevents: List[vobject.icalendar.RecurringComponent], + dt: datetime.datetime, + dt_format: str +) -> Tuple[int, Optional[vobject.icalendar.RecurringComponent]]: + for i in range(start, len(vevents)): + dt_event = datetime.datetime.strptime( + vevents[i].recurrence_id.value, + dt_format + ).replace(tzinfo=datetime.timezone.utc) + if dt_event == dt: + return (i + 1, vevents[i]) + return (start, None) + + def xml_item_response(base_prefix: str, href: str, found_props: Sequence[ET.Element] = (), not_found_props: Sequence[ET.Element] = (), diff --git a/radicale/tests/static/event_daily_rrule_overridden.ics b/radicale/tests/static/event_daily_rrule_overridden.ics new file mode 100644 index 00000000..077d6cd4 --- /dev/null +++ b/radicale/tests/static/event_daily_rrule_overridden.ics @@ -0,0 +1,35 @@ +BEGIN:VCALENDAR +VERSION:2.0 +BEGIN:VTIMEZONE +LAST-MODIFIED:20040110T032845Z +TZID:US/Eastern +BEGIN:DAYLIGHT +DTSTART:20000404T020000 +RRULE:FREQ=YEARLY;BYDAY=1SU;BYMONTH=4 +TZNAME:EDT +TZOFFSETFROM:-0500 +TZOFFSETTO:-0400 +END:DAYLIGHT +BEGIN:STANDARD +DTSTART:20001026T020000 +RRULE:FREQ=YEARLY;BYDAY=-1SU;BYMONTH=10 +TZNAME:EST +TZOFFSETFROM:-0400 +TZOFFSETTO:-0500 +END:STANDARD +END:VTIMEZONE +BEGIN:VEVENT +DTSTART;TZID=US/Eastern:20060102T120000 +DURATION:PT1H +RRULE:FREQ=DAILY;COUNT=5 +SUMMARY:Event #2 +UID:event_daily_rrule_overridden +END:VEVENT +BEGIN:VEVENT +DTSTART;TZID=US/Eastern:20060104T140000 +DURATION:PT1H +RECURRENCE-ID;TZID=US/Eastern:20060104T120000 +SUMMARY:Event #2 bis +UID:event_daily_rrule_overridden +END:VEVENT +END:VCALENDAR diff --git a/radicale/tests/test_base.py b/radicale/tests/test_base.py index fd6e8c30..d8d97f98 100644 --- a/radicale/tests/test_base.py +++ b/radicale/tests/test_base.py @@ -1810,6 +1810,93 @@ permissions: RrWw""") assert len(uids) == 3 assert len(set(recurrence_ids)) == 3 + def test_report_with_expand_property_overridden(self) -> None: + """Test report with expand property""" + self.put("/calendar.ics/", get_file_content("event_daily_rrule_overridden.ics")) + req_body_without_expand = \ + """ + + + + + + + + + + + + + + """ + _, responses = self.report("/calendar.ics/", req_body_without_expand) + assert len(responses) == 1 + + response_without_expand = responses['/calendar.ics/event_daily_rrule_overridden.ics'] + assert not isinstance(response_without_expand, int) + status, element = response_without_expand["C:calendar-data"] + + assert status == 200 and element.text + + assert "RRULE" in element.text + assert "BEGIN:VTIMEZONE" in element.text + + uids: List[str] = [] + for line in element.text.split("\n"): + if line.startswith("UID:"): + uid = line[len("UID:"):] + assert uid == "event_daily_rrule_overridden" + uids.append(uid) + + assert len(uids) == 2 + + req_body_with_expand = \ + """ + + + + + + + + + + + + + + + """ + + _, responses = self.report("/calendar.ics/", req_body_with_expand) + + assert len(responses) == 1 + + response_with_expand = responses['/calendar.ics/event_daily_rrule_overridden.ics'] + assert not isinstance(response_with_expand, int) + status, element = response_with_expand["C:calendar-data"] + + assert status == 200 and element.text + assert "RRULE" not in element.text + assert "BEGIN:VTIMEZONE" not in element.text + + uids = [] + recurrence_ids = [] + for line in element.text.split("\n"): + if line.startswith("UID:"): + assert line == "UID:event_daily_rrule_overridden" + uids.append(line) + + if line.startswith("RECURRENCE-ID:"): + assert line in ["RECURRENCE-ID:20060103T170000Z", "RECURRENCE-ID:20060104T170000Z"] + recurrence_ids.append(line) + + if line.startswith("DTSTART:"): + assert line in ["DTSTART:20060103T170000Z", "DTSTART:20060104T190000Z"] + + assert len(uids) == 2 + assert len(set(recurrence_ids)) == 2 + def test_propfind_sync_token(self) -> None: """Retrieve the sync-token with a propfind request""" calendar_path = "/calendar.ics/"