1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-08-04 18:22:26 +00:00
This commit is contained in:
Nate Harris 2025-07-21 19:52:52 +02:00 committed by GitHub
commit b0f9cca4d0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 388 additions and 124 deletions

View file

@ -1618,11 +1618,11 @@ When enabled, send one email to all attendee email addresses. When disabled, sen
Default: `False`
##### added_template
##### new_or_added_to_event_template
_(>= 3.5.5)_
Template to use for added/updated event email body.
Template to use for added/updated event email body (sent to an attendee when the event is created or they are added to a pre-existing event).
The following placeholders will be replaced:
- `$organizer_name`: Name of the organizer, or "Unknown Organizer" if not set in event
@ -1648,11 +1648,11 @@ You have been added as an attendee to the following calendar event.
This is an automated message. Please do not reply.
```
##### removed_template
##### deleted_or_removed_from_event_template
_(>= 3.5.5)_
Template to use for deleted event email body.
Template to use for deleted/removed event email body (sent to an attendee when the event is deleted or they are removed from the event).
The following placeholders will be replaced:
- `$organizer_name`: Name of the organizer, or "Unknown Organizer" if not set in event
@ -1669,7 +1669,7 @@ Default:
```
Hello $attendee_name,
You have been removed as an attendee from the following calendar event.
The following event has been deleted.
$event_title
$event_start_time - $event_end_time
@ -1678,6 +1678,38 @@ You have been removed as an attendee from the following calendar event.
This is an automated message. Please do not reply.
```
#### updated_event_template
_(>= 3.5.5)_
Template to use for updated event email body (sent to an attendee when non-attendee-related details of the event are updated).
Existing attendees will NOT be notified of a modified event if the only changes are adding/removing other attendees.
The following placeholders will be replaced:
- `$organizer_name`: Name of the organizer, or "Unknown Organizer" if not set in event
- `$from_email`: Email address the email is sent from
- `$attendee_name`: Name of the attendee (email recipient), or "everyone" if mass email enabled.
- `$event_name`: Name/summary of the event, or "No Title" if not set in event
- `$event_start_time`: Start time of the event in ISO 8601 format
- `$event_end_time`: End time of the event in ISO 8601 format, or "No End Time" if the event has no end time
- `$event_location`: Location of the event, or "No Location Specified" if not set in event
Providing any words prefixed with $ not included in the list above will result in an error.
Default:
```
Hello $attendee_name,
The following event has been updated.
$event_title
$event_start_time - $event_end_time
$event_location
This is an automated message. Please do not reply.
```
#### reporting
##### max_freebusy_occurrence

3
config
View file

@ -324,6 +324,9 @@
#smtp_password =
#from_email =
#mass_email = False
#new_or_added_to_event_template =
#deleted_or_removed_from_event_template =
#updated_event_template =
[reporting]

View file

@ -323,7 +323,7 @@ class Application(ApplicationPartDelete, ApplicationPartHead,
if "W" in self._rights.authorization(user, principal_path):
with self._storage.acquire_lock("w", user):
try:
new_coll = self._storage.create_collection(principal_path)
new_coll, _, _ = self._storage.create_collection(principal_path)
if new_coll:
jsn_coll = self.configuration.get("storage", "predefined_collections")
for (name_coll, props) in jsn_coll.items():

View file

@ -24,7 +24,7 @@ from typing import Optional
from radicale import httputils, storage, types, xmlutils
from radicale.app.base import Access, ApplicationBase
from radicale.hook import DeleteHookNotificationItem
from radicale.hook import HookNotificationItem, HookNotificationItemTypes
from radicale.log import logger
@ -82,10 +82,13 @@ class ApplicationPartDelete(ApplicationBase):
return httputils.NOT_ALLOWED
for i in item.get_all():
hook_notification_item_list.append(
DeleteHookNotificationItem(
access.path,
i.uid,
old_content=item.serialize() # type: ignore
HookNotificationItem(
notification_item_type=HookNotificationItemTypes.DELETE,
path=access.path,
content=i.uid,
uid=i.uid,
old_content=item.serialize(), # type: ignore
new_content=None
)
)
xml_answer = xml_delete(base_prefix, path, item)
@ -93,10 +96,13 @@ class ApplicationPartDelete(ApplicationBase):
assert item.collection is not None
assert item.href is not None
hook_notification_item_list.append(
DeleteHookNotificationItem(
access.path,
item.uid,
old_content=item.serialize() # type: ignore
HookNotificationItem(
notification_item_type=HookNotificationItemTypes.DELETE,
path=access.path,
content=item.uid,
uid=item.uid,
old_content=item.serialize(), # type: ignore
new_content=None,
)
)
xml_answer = xml_delete(

View file

@ -101,13 +101,17 @@ class ApplicationPartProppatch(ApplicationBase):
xml_answer = xml_proppatch(base_prefix, path, xml_content,
item)
if xml_content is not None:
content = DefusedET.tostring(
xml_content,
encoding=self._encoding
).decode(encoding=self._encoding)
hook_notification_item = HookNotificationItem(
HookNotificationItemTypes.CPATCH,
access.path,
DefusedET.tostring(
xml_content,
encoding=self._encoding
).decode(encoding=self._encoding)
notification_item_type=HookNotificationItemTypes.CPATCH,
path=access.path,
content=content,
uid=None,
old_content=None,
new_content=content
)
self._hook.notify(hook_notification_item)
except ValueError as e:

View file

@ -243,14 +243,31 @@ class ApplicationPartPut(ApplicationBase):
if write_whole_collection:
try:
etag = self._storage.create_collection(
path, prepared_items, props).etag
col, replaced_items, new_item_hrefs = self._storage.create_collection(
href=path,
items=prepared_items,
props=props)
for item in prepared_items:
hook_notification_item = HookNotificationItem(
HookNotificationItemTypes.UPSERT,
access.path,
item.serialize()
)
# Try to grab the previously-existing item by href
existing_item = replaced_items.get(item.href, None) # type: ignore
if existing_item:
hook_notification_item = HookNotificationItem(
notification_item_type=HookNotificationItemTypes.UPSERT,
path=access.path,
content=existing_item.serialize(),
uid=None,
old_content=existing_item.serialize(),
new_content=item.serialize()
)
else: # We assume the item is new because it was not in the replaced_items
hook_notification_item = HookNotificationItem(
notification_item_type=HookNotificationItemTypes.UPSERT,
path=access.path,
content=item.serialize(),
uid=None,
old_content=None,
new_content=item.serialize()
)
self._hook.notify(hook_notification_item)
except ValueError as e:
logger.warning(
@ -267,11 +284,15 @@ class ApplicationPartPut(ApplicationBase):
href = posixpath.basename(pathutils.strip_path(path))
try:
etag = parent_item.upload(href, prepared_item).etag
uploaded_item, replaced_item = parent_item.upload(href, prepared_item)
etag = uploaded_item.etag
hook_notification_item = HookNotificationItem(
HookNotificationItemTypes.UPSERT,
access.path,
prepared_item.serialize()
notification_item_type=HookNotificationItemTypes.UPSERT,
path=access.path,
content=prepared_item.serialize(),
uid=None,
old_content=replaced_item.serialize() if replaced_item else None,
new_content=prepared_item.serialize()
)
self._hook.notify(hook_notification_item)
except ValueError as e:

View file

@ -473,7 +473,7 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([
"value": "False",
"help": "Send one email to all attendees, versus one email per attendee",
"type": bool}),
("added_template", {
("new_or_added_to_event_template", {
"value": """Hello $attendee_name,
You have been added as an attendee to the following calendar event.
@ -483,20 +483,31 @@ You have been added as an attendee to the following calendar event.
$event_location
This is an automated message. Please do not reply.""",
"help": "Template for the email sent when an event is added or updated. Select placeholder words prefixed with $ will be replaced",
"help": "Template for the email sent when an event is created or attendee is added. Select placeholder words prefixed with $ will be replaced",
"type": str}),
("removed_template", {
("deleted_or_removed_from_event_template", {
"value": """Hello $attendee_name,
You have been removed as an attendee from the following calendar event.
The following event has been deleted.
$event_title
$event_start_time - $event_end_time
$event_location
This is an automated message. Please do not reply.""",
"help": "Template for the email sent when an event is deleted. Select placeholder words prefixed with $ will be replaced",
"help": "Template for the email sent when an event is deleted or attendee is removed. Select placeholder words prefixed with $ will be replaced",
"type": str}),
("updated_event_template", {
"value": """Hello $attendee_name,
The following event has been updated.
$event_title
$event_start_time - $event_end_time
$event_location
This is an automated message. Please do not reply.""",
"help": "Template for the email sent when an event is updated. Select placeholder words prefixed with $ will be replaced",
"type": str
})
])),
("web", OrderedDict([
("type", {

View file

@ -55,21 +55,26 @@ def _cleanup(path):
class HookNotificationItem:
def __init__(self, notification_item_type, path, content):
def __init__(self, notification_item_type, path, content=None, uid=None, new_content=None, old_content=None):
self.type = notification_item_type.value
self.point = _cleanup(path)
self.content = content
self._content_legacy = content
self.uid = uid
self.new_content = new_content
self.old_content = old_content
@property
def content(self): # For backward compatibility
return self._content_legacy or self.uid or self.new_content or self.old_content
@property
def replaces_existing_item(self) -> bool:
"""Check if this notification item replaces/deletes an existing item."""
return self.old_content is not None
def to_json(self):
return json.dumps(
self,
default=lambda o: o.__dict__,
{**self.__dict__, "content": self.content},
sort_keys=True,
indent=4
)
class DeleteHookNotificationItem(HookNotificationItem):
def __init__(self, path, uid, old_content=None):
super().__init__(notification_item_type=HookNotificationItemTypes.DELETE, path=path, content=uid)
self.old_content = old_content

View file

@ -2,6 +2,8 @@
# for email notifications
# Copyright © 2025-2025 Nate Harris
import enum
import hashlib
import json
import re
import smtplib
import ssl
@ -15,8 +17,8 @@ from typing import Any, Dict, List, Optional, Sequence, Tuple
import vobject
from radicale.hook import (BaseHook, DeleteHookNotificationItem,
HookNotificationItem, HookNotificationItemTypes)
from radicale.hook import (BaseHook, HookNotificationItem,
HookNotificationItemTypes)
from radicale.log import logger
PLUGIN_CONFIG_SCHEMA = {
@ -49,7 +51,7 @@ PLUGIN_CONFIG_SCHEMA = {
"value": "",
"type": str
},
"added_template": {
"new_or_added_to_event_template": {
"value": """Hello $attendee_name,
You have been added as an attendee to the following calendar event.
@ -61,10 +63,22 @@ You have been added as an attendee to the following calendar event.
This is an automated message. Please do not reply.""",
"type": str
},
"removed_template": {
"deleted_or_removed_from_event_template": {
"value": """Hello $attendee_name,
You have been removed as an attendee from the following calendar event.
The following event has been deleted.
$event_title
$event_start_time - $event_end_time
$event_location
This is an automated message. Please do not reply.""",
"type": str
},
"updated_event_template": {
"value": """Hello $attendee_name,
The following event has been updated.
$event_title
$event_start_time - $event_end_time
@ -129,14 +143,22 @@ SMTP_SSL_VERIFY_MODES: Sequence[str] = (SMTP_SSL_VERIFY_MODE_ENUM.NONE.value,
SMTP_SSL_VERIFY_MODE_ENUM.REQUIRED.value)
def ics_contents_contains_invited_event(contents: str):
def read_ics_event(contents: str) -> Optional['Event']:
"""
Check if the ICS contents contain an event (versus a VTODO or VJOURNAL).
Read the vobject item from the provided string and create an Event.
"""
v_cal: vobject.base.Component = vobject.readOne(contents)
cal: Calendar = Calendar(vobject_item=v_cal)
return cal.event if cal.event else None
def ics_contents_contains_event(contents: str):
"""
Check if the ICS contents contain an event (versus a VADDRESSBOOK, VTODO or VJOURNAL).
:param contents: The contents of the ICS file.
:return: True if the ICS file contains an event, False otherwise.
"""
cal = vobject.readOne(contents)
return cal.vevent is not None
return read_ics_event(contents) is not None
def extract_email(value: str) -> Optional[str]:
@ -151,6 +173,63 @@ def extract_email(value: str) -> Optional[str]:
return value if "@" in value else None
def determine_added_removed_and_unaltered_attendees(original_event: 'Event',
new_event: 'Event') -> (
Tuple)[List['Attendee'], List['Attendee'], List['Attendee']]:
"""
Determine the added, removed and unaltered attendees between two events.
"""
original_event_attendees = {attendee.email: attendee for attendee in original_event.attendees}
new_event_attendees = {attendee.email: attendee for attendee in new_event.attendees}
# Added attendees are those who are in the new event but not in the original event
added_attendees = [new_event_attendees[email] for email in new_event_attendees if
email not in original_event_attendees]
# Removed attendees are those who are in the original event but not in the new event
removed_attendees = [original_event_attendees[email] for email in original_event_attendees if
email not in new_event_attendees]
# Unaltered attendees are those who are in both events
unaltered_attendees = [original_event_attendees[email] for email in original_event_attendees if
email in new_event_attendees]
return added_attendees, removed_attendees, unaltered_attendees
def event_details_other_than_attendees_changed(original_event: 'Event',
new_event: 'Event') -> bool:
"""
Check if any details other than attendees and IDs have changed between two events.
"""
def hash_dict(d: Dict[str, Any]) -> str:
"""
Create a hash of the dictionary to compare contents.
This will ignore None values and empty strings.
"""
return hashlib.sha1(json.dumps(d).encode("utf8")).hexdigest()
original_event_details = {
"summary": original_event.summary,
"description": original_event.description,
"location": original_event.location,
"datetime_start": original_event.datetime_start.time_string() if original_event.datetime_start else None,
"datetime_end": original_event.datetime_end.time_string() if original_event.datetime_end else None,
"duration": original_event.duration,
"status": original_event.status,
"organizer": original_event.organizer
}
new_event_details = {
"summary": new_event.summary,
"description": new_event.description,
"location": new_event.location,
"datetime_start": new_event.datetime_start.time_string() if new_event.datetime_start else None,
"datetime_end": new_event.datetime_end.time_string() if new_event.datetime_end else None,
"duration": new_event.duration,
"status": new_event.status,
"organizer": new_event.organizer
}
return hash_dict(original_event_details) != hash_dict(new_event_details)
class ContentLine:
_key: str
value: Any
@ -401,6 +480,11 @@ class Event(VComponent):
"""Return the summary of the event."""
return self._get_content_lines("SUMMARY")[0].value
@property
def description(self) -> Optional[str]:
"""Return the description of the event."""
return self._get_content_lines("DESCRIPTION")[0].value
@property
def location(self) -> Optional[str]:
"""Return the location of the event."""
@ -596,8 +680,9 @@ class EmailConfig:
password: str,
from_email: str,
send_mass_emails: bool,
added_template: MessageTemplate,
removed_template: MessageTemplate):
new_or_added_to_event_template: MessageTemplate,
deleted_or_removed_from_event_template: MessageTemplate,
updated_event_template: MessageTemplate):
self.host = host
self.port = port
self.security = SMTP_SECURITY_TYPE_ENUM.from_string(value=security)
@ -606,10 +691,9 @@ class EmailConfig:
self.password = password
self.from_email = from_email
self.send_mass_emails = send_mass_emails
self.added_template = added_template
self.removed_template = removed_template
self.updated_template = added_template # Reuse added template for updated events
self.deleted_template = removed_template # Reuse removed template for deleted events
self.new_or_added_to_event_template = new_or_added_to_event_template
self.deleted_or_removed_from_event_template = deleted_or_removed_from_event_template
self.updated_event_template = updated_event_template
def __str__(self) -> str:
"""
@ -623,26 +707,17 @@ class EmailConfig:
def send_added_email(self, attendees: List[Attendee], event: EmailEvent) -> bool:
"""
Send a notification for added attendees.
Send a notification for created events (and/or adding attendees).
:param attendees: The attendees to inform.
:param event: The event the attendee is being added to.
:param event: The event being created (or the event the attendee is being added to).
:return: True if the email was sent successfully, False otherwise.
"""
ics_attachment = ICSEmailAttachment(file_content=event.ics_content, file_name=f"{event.file_name}")
return self._prepare_and_send_email(template=self.added_template, attendees=attendees, event=event,
return self._prepare_and_send_email(template=self.new_or_added_to_event_template, attendees=attendees,
event=event,
ics_attachment=ics_attachment)
def send_removed_email(self, attendees: List[Attendee], event: EmailEvent) -> bool:
"""
Send a notification for removed attendees.
:param attendees: The attendees to inform.
:param event: The event the attendee is being removed from.
:return: True if the email was sent successfully, False otherwise.
"""
return self._prepare_and_send_email(template=self.removed_template, attendees=attendees, event=event,
ics_attachment=None)
def send_updated_email(self, attendees: List[Attendee], event: EmailEvent) -> bool:
"""
Send a notification for updated events.
@ -652,17 +727,18 @@ class EmailConfig:
"""
ics_attachment = ICSEmailAttachment(file_content=event.ics_content, file_name=f"{event.file_name}")
return self._prepare_and_send_email(template=self.updated_template, attendees=attendees, event=event,
return self._prepare_and_send_email(template=self.updated_event_template, attendees=attendees, event=event,
ics_attachment=ics_attachment)
def send_deleted_email(self, attendees: List[Attendee], event: EmailEvent) -> bool:
"""
Send a notification for deleted events.
Send a notification for deleted events (and/or removing attendees).
:param attendees: The attendees to inform.
:param event: The event being deleted.
:param event: The event being deleted (or the event the attendee is being removed from).
:return: True if the email was sent successfully, False otherwise.
"""
return self._prepare_and_send_email(template=self.deleted_template, attendees=attendees, event=event,
return self._prepare_and_send_email(template=self.deleted_or_removed_from_event_template, attendees=attendees,
event=event,
ics_attachment=None)
def _prepare_and_send_email(self, template: MessageTemplate, attendees: List[Attendee],
@ -814,14 +890,18 @@ class Hook(BaseHook):
password=self.configuration.get("hook", "smtp_password"),
from_email=self.configuration.get("hook", "from_email"),
send_mass_emails=self.configuration.get("hook", "mass_email"),
added_template=MessageTemplate(
new_or_added_to_event_template=MessageTemplate(
subject="You have been added to an event",
body=self.configuration.get("hook", "added_template")
body=self.configuration.get("hook", "new_or_added_to_event_template")
),
removed_template=MessageTemplate(
subject="You have been removed from an event",
body=self.configuration.get("hook", "removed_template")
deleted_or_removed_from_event_template=MessageTemplate(
subject="An event you were invited to has been deleted",
body=self.configuration.get("hook", "deleted_or_removed_from_event_template")
),
updated_event_template=MessageTemplate(
subject="An event you are invited to has been updated",
body=self.configuration.get("hook", "updated_event_template")
)
)
logger.info(
"Email hook initialized with configuration: %s",
@ -845,6 +925,7 @@ class Hook(BaseHook):
:return: None
"""
logger.debug("Received notification item: %s", notification_item)
try:
notification_type = HookNotificationItemTypes(value=notification_item.type)
except ValueError:
@ -856,50 +937,105 @@ class Hook(BaseHook):
return
elif notification_type == HookNotificationItemTypes.UPSERT:
# Handle upsert notifications (POST request for new item and PUT for updating existing item)
# Handle upsert notifications
# We don't have access to the original content for a PUT request, just the incoming data
new_item_str: str = notification_item.new_content # type: ignore # A serialized vobject.base.Component
previous_item_str: Optional[str] = notification_item.old_content
item_str: str = notification_item.content # type: ignore # A serialized vobject.base.Component
if not ics_contents_contains_invited_event(contents=item_str):
# If the ICS file does not contain an event, we do not send any notifications.
if not ics_contents_contains_event(contents=new_item_str):
# If ICS file does not contain an event, do not send any notifications (regardless of previous content).
logger.debug("No event found in the ICS file, skipping notification.")
return
email_event: EmailEvent = _read_event(vobject_data=item_str) # type: ignore
email_event: EmailEvent = _read_event(vobject_data=new_item_str) # type: ignore
email_success: bool = self.email_config.send_updated_email( # type: ignore
attendees=email_event.event.attendees,
event=email_event
)
if not email_success:
logger.error("Failed to send some or all email notifications for event: %s", email_event.event.uid)
if not previous_item_str:
# Dealing with a completely new event, no previous content to compare against.
# Email every attendee about the new event.
logger.debug("New event detected, sending notifications to all attendees.")
email_success: bool = self.email_config.send_added_email( # type: ignore
attendees=email_event.event.attendees,
event=email_event
)
if not email_success:
logger.error("Failed to send some or all added email notifications for event: %s",
email_event.event.uid)
return
# Dealing with an update to an existing event, compare new and previous content.
new_event: Event = read_ics_event(contents=new_item_str) # type: ignore
previous_event: Optional[Event] = read_ics_event(contents=previous_item_str)
if not previous_event:
# If we cannot parse the previous event for some reason, simply treat it as a new event.
logger.warning("Previous event content could not be parsed, treating as a new event.")
email_success: bool = self.email_config.send_added_email( # type: ignore
attendees=email_event.event.attendees,
event=email_event
)
if not email_success:
logger.error("Failed to send some or all added email notifications for event: %s",
email_event.event.uid)
return
# Determine added, removed, and unaltered attendees
added_attendees, removed_attendees, unaltered_attendees = determine_added_removed_and_unaltered_attendees(
original_event=previous_event, new_event=new_event)
# Notify added attendees as "event created"
if added_attendees:
email_success: bool = self.email_config.send_added_email( # type: ignore
attendees=added_attendees,
event=email_event
)
if not email_success:
logger.error("Failed to send some or all added email notifications for event: %s",
email_event.event.uid)
# Notify removed attendees as "event deleted"
if removed_attendees:
email_success: bool = self.email_config.send_deleted_email( # type: ignore
attendees=removed_attendees,
event=email_event
)
if not email_success:
logger.error("Failed to send some or all removed email notifications for event: %s",
email_event.event.uid)
# Notify unaltered attendees as "event updated" if details other than attendees have changed
if unaltered_attendees and event_details_other_than_attendees_changed(original_event=previous_event,
new_event=new_event):
email_success: bool = self.email_config.send_updated_email( # type: ignore
attendees=unaltered_attendees,
event=email_event
)
if not email_success:
logger.error("Failed to send some or all updated email notifications for event: %s",
email_event.event.uid)
# Skip sending notifications to existing attendees if the only changes made to the event
# were the addition/removal of other attendees.
return
elif notification_type == HookNotificationItemTypes.DELETE:
# Handle delete notifications (DELETE requests)
# Handle delete notifications
# Ensure it's a delete notification, as we need the old content
if not isinstance(notification_item, DeleteHookNotificationItem):
return
deleted_item_str: str = notification_item.old_content # type: ignore # A serialized vobject.base.Component
item_str: str = notification_item.old_content # type: ignore # A serialized vobject.base.Component
if not ics_contents_contains_invited_event(contents=item_str):
if not ics_contents_contains_event(contents=deleted_item_str):
# If the ICS file does not contain an event, we do not send any notifications.
logger.debug("No event found in the ICS file, skipping notification.")
return
email_event: EmailEvent = _read_event(vobject_data=item_str) # type: ignore
email_event: EmailEvent = _read_event(vobject_data=deleted_item_str) # type: ignore
email_success: bool = self.email_config.send_deleted_email( # type: ignore
attendees=email_event.event.attendees,
event=email_event
)
if not email_success:
logger.error("Failed to send some or all email notifications for event: %s", email_event.event.uid)
logger.error("Failed to send some or all deleted email notifications for event: %s",
email_event.event.uid)
return

View file

@ -27,8 +27,8 @@ Take a look at the class ``BaseCollection`` if you want to implement your own.
import json
import xml.etree.ElementTree as ET
from hashlib import sha256
from typing import (Callable, ContextManager, Iterable, Iterator, Mapping,
Optional, Sequence, Set, Tuple, Union, overload)
from typing import (Callable, ContextManager, Dict, Iterable, Iterator, List,
Mapping, Optional, Sequence, Set, Tuple, Union, overload)
import vobject
@ -44,7 +44,8 @@ INTERNAL_TYPES: Sequence[str] = ("multifilesystem", "multifilesystem_nolock",)
# NOTE: change only if cache structure is modified to avoid cache invalidation on update
CACHE_VERSION_RADICALE = "3.3.1"
CACHE_VERSION: bytes = ("%s=%s;%s=%s;" % ("radicale", CACHE_VERSION_RADICALE, "vobject", utils.package_version("vobject"))).encode()
CACHE_VERSION: bytes = (
"%s=%s;%s=%s;" % ("radicale", CACHE_VERSION_RADICALE, "vobject", utils.package_version("vobject"))).encode()
def load(configuration: "config.Configuration") -> "BaseStorage":
@ -112,17 +113,18 @@ class BaseCollection:
invalid.
"""
def hrefs_iter() -> Iterator[str]:
for item in self.get_all():
assert item.href
yield item.href
token = "http://radicale.org/ns/sync/%s" % self.etag.strip("\"")
if old_token:
raise ValueError("Sync token are not supported")
return token, hrefs_iter()
def get_multi(self, hrefs: Iterable[str]
) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]:
def get_multi(self, hrefs: Iterable[str]) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]:
"""Fetch multiple items.
It's not required to return the requested items in the correct order.
@ -175,8 +177,11 @@ class BaseCollection:
return False
def upload(self, href: str, item: "radicale_item.Item") -> (
"radicale_item.Item"):
"""Upload a new or replace an existing item."""
Tuple)["radicale_item.Item", Optional["radicale_item.Item"]]:
"""Upload a new or replace an existing item.
Return the uploaded item and the old item if it was replaced.
"""
raise NotImplementedError
def delete(self, href: Optional[str] = None) -> None:
@ -188,10 +193,12 @@ class BaseCollection:
raise NotImplementedError
@overload
def get_meta(self, key: None = None) -> Mapping[str, str]: ...
def get_meta(self, key: None = None) -> Mapping[str, str]:
...
@overload
def get_meta(self, key: str) -> Optional[str]: ...
def get_meta(self, key: str) -> Optional[str]:
...
def get_meta(self, key: Optional[str] = None
) -> Union[Mapping[str, str], Optional[str]]:
@ -294,7 +301,7 @@ class BaseStorage:
def discover(
self, path: str, depth: str = "0",
child_context_manager: Optional[
Callable[[str, Optional[str]], ContextManager[None]]] = None,
Callable[[str, Optional[str]], ContextManager[None]]] = None,
user_groups: Set[str] = set([])) -> Iterable["types.CollectionOrItem"]:
"""Discover a list of collections under the given ``path``.
@ -328,7 +335,8 @@ class BaseStorage:
def create_collection(
self, href: str,
items: Optional[Iterable["radicale_item.Item"]] = None,
props: Optional[Mapping[str, str]] = None) -> BaseCollection:
props: Optional[Mapping[str, str]] = None) -> (
Tuple)[BaseCollection, Dict[str, "radicale_item.Item"], List[str]]:
"""Create a collection.
``href`` is the sanitized path.

View file

@ -19,7 +19,7 @@
import os
from tempfile import TemporaryDirectory
from typing import Iterable, Optional, cast
from typing import Dict, Iterable, List, Optional, Tuple, cast
import radicale.item as radicale_item
from radicale import pathutils
@ -30,9 +30,37 @@ from radicale.storage.multifilesystem.base import StorageBase
class StoragePartCreateCollection(StorageBase):
def _discover_existing_items_pre_overwrite(self,
tmp_collection: "multifilesystem.Collection",
dst_path: str) -> Tuple[Dict[str, radicale_item.Item], List[str]]:
"""Discover existing items in the collection before overwriting them."""
existing_items = {}
new_item_hrefs = []
existing_collection = self._collection_class(
cast(multifilesystem.Storage, self),
pathutils.unstrip_path(dst_path, True))
existing_item_hrefs = set(existing_collection._list())
tmp_collection_hrefs = set(tmp_collection._list())
for item_href in tmp_collection_hrefs:
if item_href not in existing_item_hrefs:
# Item in temporary collection does not exist in the existing collection (is new)
new_item_hrefs.append(item_href)
continue
# Item exists in both collections, grab the existing item for reference
try:
item = existing_collection._get(item_href, verify_href=False)
if item is not None:
existing_items[item_href] = item
except Exception:
# TODO: Log exception?
continue
return existing_items, new_item_hrefs
def create_collection(self, href: str,
items: Optional[Iterable[radicale_item.Item]] = None,
props=None) -> "multifilesystem.Collection":
props=None) -> Tuple["multifilesystem.Collection", Dict[str, radicale_item.Item], List[str]]:
folder = self._get_collection_root_folder()
# Path should already be sanitized
@ -44,11 +72,14 @@ class StoragePartCreateCollection(StorageBase):
self._makedirs_synced(filesystem_path)
return self._collection_class(
cast(multifilesystem.Storage, self),
pathutils.unstrip_path(sane_path, True))
pathutils.unstrip_path(sane_path, True)), {}, []
parent_dir = os.path.dirname(filesystem_path)
self._makedirs_synced(parent_dir)
replaced_items: Dict[str, radicale_item.Item] = {}
new_item_hrefs: List[str] = []
# Create a temporary directory with an unsafe name
try:
with TemporaryDirectory(prefix=".Radicale.tmp-", dir=parent_dir
@ -68,14 +99,20 @@ class StoragePartCreateCollection(StorageBase):
col._upload_all_nonatomic(items, suffix=".vcf")
if os.path.lexists(filesystem_path):
replaced_items, new_item_hrefs = self._discover_existing_items_pre_overwrite(
tmp_collection=col,
dst_path=sane_path)
pathutils.rename_exchange(tmp_filesystem_path, filesystem_path)
else:
# If the destination path does not exist, obviously all items are new
new_item_hrefs = list(col._list())
os.rename(tmp_filesystem_path, filesystem_path)
self._sync_directory(parent_dir)
except Exception as e:
raise ValueError("Failed to create collection %r as %r %s" %
(href, filesystem_path, e)) from e
# TODO: Return new-old pairs and just-new items (new vs updated)
return self._collection_class(
cast(multifilesystem.Storage, self),
pathutils.unstrip_path(sane_path, True))
pathutils.unstrip_path(sane_path, True)), replaced_items, new_item_hrefs

View file

@ -21,7 +21,7 @@ import errno
import os
import pickle
import sys
from typing import Iterable, Iterator, TextIO, cast
from typing import Iterable, Iterator, Optional, TextIO, Tuple, cast
import radicale.item as radicale_item
from radicale import pathutils
@ -36,10 +36,11 @@ class CollectionPartUpload(CollectionPartGet, CollectionPartCache,
CollectionPartHistory, CollectionBase):
def upload(self, href: str, item: radicale_item.Item
) -> radicale_item.Item:
) -> Tuple[radicale_item.Item, Optional[radicale_item.Item]]:
if not pathutils.is_safe_filesystem_path_component(href):
raise pathutils.UnsafePathError(href)
path = pathutils.path_to_filesystem(self._filesystem_path, href)
old_item = self._get(href, verify_href=False)
try:
with self._atomic_write(path, newline="") as fo: # type: ignore
f = cast(TextIO, fo)
@ -67,7 +68,7 @@ class CollectionPartUpload(CollectionPartGet, CollectionPartCache,
uploaded_item = self._get(href, verify_href=False)
if uploaded_item is None:
raise RuntimeError("Storage modified externally")
return uploaded_item
return uploaded_item, old_item
def _upload_all_nonatomic(self, items: Iterable[radicale_item.Item],
suffix: str = "") -> None: