1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-08-01 18:18:31 +00:00

Expand overridden recurring events

This commit is contained in:
Pieter Hijma 2024-11-05 11:33:27 +01:00
parent 36ef753b0e
commit 2d5dc5186b
3 changed files with 238 additions and 32 deletions

View file

@ -313,25 +313,30 @@ def _expand(
recurrences = rruleset.between(start, end, inc=True)
expanded: vobject.base.Component = copy.copy(expanded_item.vobject_item)
vevent_recurrence, vevents_overridden = _split_overridden_vevents(expanded, dt_format)
is_expanded_filled: bool = False
i_overridden = 0
for recurrence_dt in recurrences:
recurrence_utc = recurrence_dt.astimezone(datetime.timezone.utc)
i_overridden, vevent = _find_overridden(i_overridden, vevents_overridden, recurrence_utc, dt_format)
vevent = copy.deepcopy(expanded.vevent)
vevent.recurrence_id = ContentLine(
name='RECURRENCE-ID',
value=recurrence_utc.strftime(dt_format), params={}
)
vevent.dtstart = ContentLine(
name='DTSTART',
value=recurrence_utc.strftime(dt_format), params={}
)
if duration:
vevent.dtend = ContentLine(
name='DTEND',
value=(recurrence_utc + duration).strftime(dt_format), params={}
if not vevent:
vevent = copy.deepcopy(vevent_recurrence)
vevent.recurrence_id = ContentLine(
name='RECURRENCE-ID',
value=recurrence_utc.strftime(dt_format), params={}
)
vevent.dtstart = ContentLine(
name='DTSTART',
value=recurrence_utc.strftime(dt_format), params={}
)
if duration:
vevent.dtend = ContentLine(
name='DTEND',
value=(recurrence_utc + duration).strftime(dt_format), params={}
)
if is_expanded_filled is False:
expanded.vevent = vevent
@ -346,6 +351,29 @@ def _expand(
return element
def _convert_timezone(vevent: vobject.icalendar.RecurringComponent,
name_prop: str,
name_content_line: str):
prop = getattr(vevent, name_prop, None)
if prop:
if type(prop.value) is datetime.date:
date_time = datetime.datetime.fromordinal(
prop.value.toordinal()
).replace(tzinfo=datetime.timezone.utc)
else:
date_time = prop.value.astimezone(datetime.timezone.utc)
setattr(vevent, name_prop, ContentLine(name=name_content_line, value=date_time, params=[]))
def _convert_to_utc(vevent: vobject.icalendar.RecurringComponent,
name_prop: str,
dt_format: str):
prop = getattr(vevent, name_prop, None)
if prop:
setattr(vevent, name_prop, ContentLine(name=prop.name, value=prop.value.strftime(dt_format), params=[]))
def _make_vobject_expanded_item(
item: radicale_item.Item,
dt_format: str,
@ -381,33 +409,89 @@ def _make_vobject_expanded_item(
vevent.dtend = ContentLine(name='DTEND', value=end_utc, params={})
rruleset = None
if hasattr(item.vobject_item.vevent, 'rrule'):
rruleset = vevent.getrruleset()
for i, vevent in enumerate(item.vobject_item.vevent_list):
_convert_timezone(vevent, 'dtstart', 'DTSTART')
_convert_timezone(vevent, 'dtend', 'DTEND')
_convert_timezone(vevent, 'recurrence_id', 'RECURRENCE-ID')
# There is something strange behaviour during serialization native datetime, so converting manually
vevent.dtstart.value = vevent.dtstart.value.strftime(dt_format)
if dt_end is not None:
vevent.dtend.value = vevent.dtend.value.strftime(dt_format)
if hasattr(vevent, 'rrule'):
rruleset = vevent.getrruleset()
timezones_to_remove = []
for component in item.vobject_item.components():
if component.name == 'VTIMEZONE':
timezones_to_remove.append(component)
# There is something strange behaviour during serialization native datetime, so converting manually
_convert_to_utc(vevent, 'dtstart', dt_format)
_convert_to_utc(vevent, 'dtend', dt_format)
_convert_to_utc(vevent, 'recurrence_id', dt_format)
for timezone in timezones_to_remove:
item.vobject_item.remove(timezone)
timezones_to_remove = []
for component in item.vobject_item.components():
if component.name == 'VTIMEZONE':
timezones_to_remove.append(component)
try:
delattr(item.vobject_item.vevent, 'rrule')
delattr(item.vobject_item.vevent, 'exdate')
delattr(item.vobject_item.vevent, 'exrule')
delattr(item.vobject_item.vevent, 'rdate')
except AttributeError:
pass
for timezone in timezones_to_remove:
item.vobject_item.remove(timezone)
try:
delattr(item.vobject_item.vevent_list[i], 'rrule')
delattr(item.vobject_item.vevent_list[i], 'exdate')
delattr(item.vobject_item.vevent_list[i], 'exrule')
delattr(item.vobject_item.vevent_list[i], 'rdate')
except AttributeError:
pass
return item, rruleset
def _split_overridden_vevents(
component: vobject.base.Component,
dt_format: str
) -> Tuple[
vobject.icalendar.RecurringComponent,
List[vobject.icalendar.RecurringComponent]
]:
vevent_recurrence = None
vevents_overridden = []
for vevent in component.vevent_list:
if hasattr(vevent, 'recurrence_id'):
vevents_overridden += [vevent]
elif vevent_recurrence:
raise ValueError(
f"component with UID {vevent.uid} "
f"has more than one vevent without a recurrence_id"
)
else:
vevent_recurrence = vevent
if vevent_recurrence:
return (
vevent_recurrence, sorted(
vevents_overridden,
key=lambda vevent: datetime.datetime.strptime(vevent.recurrence_id.value, dt_format)
)
)
else:
raise ValueError(
f"component with UID {vevent.uid} "
f"does not have a vevent without a recurrence_id"
)
def _find_overridden(
start: int,
vevents: List[vobject.icalendar.RecurringComponent],
dt: datetime.datetime,
dt_format: str
) -> Tuple[int, Optional[vobject.icalendar.RecurringComponent]]:
for i in range(start, len(vevents)):
dt_event = datetime.datetime.strptime(
vevents[i].recurrence_id.value,
dt_format
).replace(tzinfo=datetime.timezone.utc)
if dt_event == dt:
return (i + 1, vevents[i])
return (start, None)
def xml_item_response(base_prefix: str, href: str,
found_props: Sequence[ET.Element] = (),
not_found_props: Sequence[ET.Element] = (),