diff --git a/CHANGELOG.md b/CHANGELOG.md
index ed827be8..1d01312e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,7 @@
# Changelog
## 3.5.4.dev
-* Improve: item filter enhanced for 3rd level supporting VALARM and VFREEBUSY (only component existence so far)
+* Improve: item filter enhanced for 3rd level supporting VALARM and honoring TRIGGER (offset or absolute)
## 3.5.3
* Add: [auth] htpasswd: support for Argon2 hashes
diff --git a/radicale/app/report.py b/radicale/app/report.py
index deb5a4ca..c6966777 100644
--- a/radicale/app/report.py
+++ b/radicale/app/report.py
@@ -177,7 +177,7 @@ def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
props: Union[ET.Element, List]
if root.find(xmlutils.make_clark("D:prop")) is not None:
- props = root.find(xmlutils.make_clark("D:prop")) # type: ignore[assignment]
+ props = root.find(xmlutils.make_clark("D:prop")) # type: ignore[assignment]
else:
props = []
diff --git a/radicale/auth/htpasswd.py b/radicale/auth/htpasswd.py
index 07f0b0fe..dd66dfbd 100644
--- a/radicale/auth/htpasswd.py
+++ b/radicale/auth/htpasswd.py
@@ -96,7 +96,7 @@ class Auth(auth.BaseAuth):
self._has_bcrypt = False
self._has_argon2 = False
self._htpasswd_ok = False
- self._htpasswd_not_ok_reminder_seconds = 60 # currently hardcoded
+ self._htpasswd_not_ok_reminder_seconds = 60 # currently hardcoded
(self._htpasswd_ok, self._htpasswd_bcrypt_use, self._htpasswd_argon2_use, self._htpasswd, self._htpasswd_size, self._htpasswd_mtime_ns) = self._read_htpasswd(True, False)
self._lock = threading.Lock()
diff --git a/radicale/auth/imap.py b/radicale/auth/imap.py
index 18af91d1..18ec527b 100644
--- a/radicale/auth/imap.py
+++ b/radicale/auth/imap.py
@@ -17,6 +17,8 @@
import imaplib
import ssl
+import sys
+from typing import Union
from radicale import auth
from radicale.log import logger
@@ -49,7 +51,10 @@ class Auth(auth.BaseAuth):
def _login(self, login, password) -> str:
try:
- connection: imaplib.IMAP4 | imaplib.IMAP4_SSL
+ if sys.version_info < (3, 10):
+ connection: Union[imaplib.IMAP4, imaplib.IMAP4_SSL]
+ else:
+ connection: imaplib.IMAP4 | imaplib.IMAP4_SSL
if self._security == "tls":
connection = imaplib.IMAP4_SSL(
host=self._host, port=self._port,
diff --git a/radicale/item/filter.py b/radicale/item/filter.py
index 5a430c22..0e62be8f 100644
--- a/radicale/item/filter.py
+++ b/radicale/item/filter.py
@@ -21,11 +21,12 @@
import math
+import sys
import xml.etree.ElementTree as ET
from datetime import date, datetime, timedelta, timezone
from itertools import chain
from typing import (Callable, Iterable, Iterator, List, Optional, Sequence,
- Tuple)
+ Tuple, Union)
import vobject
@@ -39,6 +40,11 @@ DATETIME_MAX: datetime = datetime.max.replace(tzinfo=timezone.utc)
TIMESTAMP_MIN: int = math.floor(DATETIME_MIN.timestamp())
TIMESTAMP_MAX: int = math.ceil(DATETIME_MAX.timestamp())
+if sys.version_info < (3, 10):
+ TRIGGER = Union[datetime, None]
+else:
+ TRIGGER = datetime | None
+
def date_to_datetime(d: date) -> datetime:
"""Transform any date to a UTC datetime.
@@ -88,8 +94,7 @@ def comp_match(item: "item.Item", filter_: ET.Element, level: int = 0) -> bool:
"""
- # TODO: Improve filtering for VALARM and VFREEBUSY
- # so far only filtering based on existence of such component is implemented
+ # TODO: Filtering VFREEBUSY is not implemented
# HACK: the filters are tested separately against all components
name = filter_.get("name", "").upper()
@@ -117,10 +122,11 @@ def comp_match(item: "item.Item", filter_: ET.Element, level: int = 0) -> bool:
return False
if ((level == 0 and name != "VCALENDAR") or
(level == 1 and name not in ("VTODO", "VEVENT", "VJOURNAL")) or
- (level == 2 and name not in ("VALARM", "VFREEBUSY"))):
+ (level == 2 and name not in ("VALARM"))):
logger.warning("Filtering %s is not supported", name)
return True
# Point #3 and #4 of rfc4791-9.7.1
+ trigger = None
if level == 0:
components = [item.vobject_item]
elif level == 1:
@@ -128,15 +134,19 @@ def comp_match(item: "item.Item", filter_: ET.Element, level: int = 0) -> bool:
elif level == 2:
components = list(getattr(item.vobject_item, "%s_list" % tag.lower()))
for comp in components:
- if not hasattr(comp, name.lower()):
+ subcomp = getattr(comp, name.lower(), None)
+ if not subcomp:
return False
+ if hasattr(subcomp, "trigger"):
+ # rfc4791-7.8.5:
+ trigger = subcomp.trigger.value
for child in filter_:
if child.tag == xmlutils.make_clark("C:prop-filter"):
if not any(prop_match(comp, child, "C")
for comp in components):
return False
elif child.tag == xmlutils.make_clark("C:time-range"):
- if not time_range_match(item.vobject_item, filter_[0], tag):
+ if not time_range_match(item.vobject_item, filter_[0], tag, trigger):
return False
elif child.tag == xmlutils.make_clark("C:comp-filter"):
if not comp_match(item, child, level=level + 1):
@@ -166,7 +176,7 @@ def prop_match(vobject_item: vobject.base.Component,
# Point #3 and #4 of rfc4791-9.7.2
for child in filter_:
if ns == "C" and child.tag == xmlutils.make_clark("C:time-range"):
- if not time_range_match(vobject_item, child, name):
+ if not time_range_match(vobject_item, child, name, None):
return False
elif child.tag == xmlutils.make_clark("%s:text-match" % ns):
if not text_match(vobject_item, child, name, ns):
@@ -180,9 +190,10 @@ def prop_match(vobject_item: vobject.base.Component,
def time_range_match(vobject_item: vobject.base.Component,
- filter_: ET.Element, child_name: str) -> bool:
+ filter_: ET.Element, child_name: str, trigger: TRIGGER) -> bool:
"""Check whether the component/property ``child_name`` of
``vobject_item`` matches the time-range ``filter_``."""
+ # supporting since 3.5.4 now optional trigger (either absolute or relative offset)
if not filter_.get("start") and not filter_.get("end"):
return False
@@ -193,6 +204,25 @@ def time_range_match(vobject_item: vobject.base.Component,
def range_fn(range_start: datetime, range_end: datetime,
is_recurrence: bool) -> bool:
nonlocal matched
+ if trigger:
+ # if trigger is given, only check range_start
+ if isinstance(trigger, timedelta):
+ # trigger is a offset, apply to range_start
+ if start < range_start + trigger and range_start + trigger < end:
+ matched = True
+ return True
+ else:
+ return False
+ elif isinstance(trigger, datetime):
+ # trigger is absolute, use instead of range_start
+ if start < trigger and trigger < end:
+ matched = True
+ return True
+ else:
+ return False
+ else:
+ logger.warning("item/filter/time_range_match/range_fn: unsupported data format of provided trigger=%r", trigger)
+ return True
if start < range_end and range_start < end:
matched = True
return True
diff --git a/radicale/tests/static/valarm1.ics b/radicale/tests/static/valarm1.ics
new file mode 100644
index 00000000..4f3b88f5
--- /dev/null
+++ b/radicale/tests/static/valarm1.ics
@@ -0,0 +1,15 @@
+BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//python-caldav//caldav//en_DK
+BEGIN:VEVENT
+SUMMARY:This is a test event
+DTSTART:20151010T060000Z
+DTEND:20161010T070000Z
+DTSTAMP:20250515T073149Z
+UID:a9cef952-315e-11f0-a30a-1c1bb5134174
+BEGIN:VALARM
+ACTION:AUDIO
+TRIGGER:-PT15M
+END:VALARM
+END:VEVENT
+END:VCALENDAR
diff --git a/radicale/tests/static/valarm2.ics b/radicale/tests/static/valarm2.ics
new file mode 100644
index 00000000..4e6c2554
--- /dev/null
+++ b/radicale/tests/static/valarm2.ics
@@ -0,0 +1,15 @@
+BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//python-caldav//caldav//en_DK
+BEGIN:VEVENT
+SUMMARY:This is a test event
+DTSTART:20151010T060000Z
+DTEND:20161010T070000Z
+DTSTAMP:20250515T073149Z
+UID:a9cef952-315e-11f0-a30a-1c1bb5134175
+BEGIN:VALARM
+ACTION:AUDIO
+TRIGGER;VALUE=DATE-TIME:20151010T033000Z
+END:VALARM
+END:VEVENT
+END:VCALENDAR
diff --git a/radicale/tests/test_base.py b/radicale/tests/test_base.py
index 3639ad2c..d2b26949 100644
--- a/radicale/tests/test_base.py
+++ b/radicale/tests/test_base.py
@@ -21,6 +21,7 @@ Radicale tests with simple requests.
"""
+import logging
import os
import posixpath
from typing import Any, Callable, ClassVar, Iterable, List, Optional, Tuple
@@ -745,7 +746,7 @@ permissions: RrWw""")
) -> List[str]:
filter_template = "%s"
create_collection_fn: Callable[[str], Any]
- if kind in ("event", "journal", "todo"):
+ if kind in ("event", "journal", "todo", "valarm"):
create_collection_fn = self.mkcalendar
path = "/calendar.ics/"
filename_template = "%s%d.ics"
@@ -764,10 +765,13 @@ permissions: RrWw""")
status, _, = self.delete(path, check=None)
assert status in (200, 404)
create_collection_fn(path)
+ logging.warning("Upload items %r", items)
for i in items:
+ logging.warning("Upload %d", i)
filename = filename_template % (kind, i)
event = get_file_content(filename)
self.put(posixpath.join(path, filename), event)
+ logging.warning("Upload items finished")
filters_text = "".join(filter_template % f for f in filters)
_, responses = self.report(path, """\
@@ -1304,6 +1308,49 @@ permissions: RrWw""")
"""], "todo", items=range(1, 9))
assert "/calendar.ics/todo7.ics" in answer
+ def test_time_range_filter_events_valarm(self) -> None:
+ """Report request with time-range filter on events having absolute VALARM."""
+ answer = self._test_filter(["""\
+
+
+
+
+
+
+"""], "valarm", items=[1, 2])
+ assert "/calendar.ics/valarm1.ics" not in answer
+ assert "/calendar.ics/valarm2.ics" in answer # absolute date
+ answer = self._test_filter(["""\
+
+
+
+
+
+
+"""], "valarm", items=[1, 2])
+ assert "/calendar.ics/valarm1.ics" not in answer
+ assert "/calendar.ics/valarm2.ics" not in answer
+ answer = self._test_filter(["""\
+
+
+
+
+
+
+"""], "valarm", items=[1, 2])
+ assert "/calendar.ics/valarm1.ics" not in answer
+ assert "/calendar.ics/valarm2.ics" not in answer
+ answer = self._test_filter(["""\
+
+
+
+
+
+
+"""], "valarm", items=[1, 2])
+ assert "/calendar.ics/valarm1.ics" in answer # -15 min offset
+ assert "/calendar.ics/valarm2.ics" not in answer
+
def test_time_range_filter_todos_completed(self) -> None:
answer = self._test_filter(["""\
diff --git a/radicale/utils.py b/radicale/utils.py
index e0ae92a0..4f759f58 100644
--- a/radicale/utils.py
+++ b/radicale/utils.py
@@ -102,7 +102,7 @@ def ssl_context_options_by_protocol(protocol: str, ssl_context_options):
ssl_context_options |= ssl.OP_NO_TLSv1_3
logger.debug("SSL cleared SSL context options: '0x%x'", ssl_context_options)
for entry in protocol.split():
- entry = entry.strip('+') # remove trailing '+'
+ entry = entry.strip('+') # remove trailing '+'
if entry == "ALL":
logger.debug("SSL context options, enable ALL (some maybe not supported by underlying OpenSSL, SSLv2 not enabled at all)")
ssl_context_options &= ~ssl.OP_NO_SSLv3
@@ -162,7 +162,7 @@ def ssl_context_options_by_protocol(protocol: str, ssl_context_options):
def ssl_context_minimum_version_by_options(ssl_context_options):
logger.debug("SSL calculate minimum version by context options: '0x%x'", ssl_context_options)
- ssl_context_minimum_version = ssl.TLSVersion.SSLv3 # default
+ ssl_context_minimum_version = ssl.TLSVersion.SSLv3 # default
if ((ssl_context_options & ssl.OP_NO_SSLv3) and (ssl_context_minimum_version == ssl.TLSVersion.SSLv3)):
ssl_context_minimum_version = ssl.TLSVersion.TLSv1
if ((ssl_context_options & ssl.OP_NO_TLSv1) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1)):
@@ -172,7 +172,7 @@ def ssl_context_minimum_version_by_options(ssl_context_options):
if ((ssl_context_options & ssl.OP_NO_TLSv1_2) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1_2)):
ssl_context_minimum_version = ssl.TLSVersion.TLSv1_3
if ((ssl_context_options & ssl.OP_NO_TLSv1_3) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1_3)):
- ssl_context_minimum_version = 0 # all disabled
+ ssl_context_minimum_version = 0 # all disabled
logger.debug("SSL context options: '0x%x' results in minimum version: %s", ssl_context_options, ssl_context_minimum_version)
return ssl_context_minimum_version
@@ -180,7 +180,7 @@ def ssl_context_minimum_version_by_options(ssl_context_options):
def ssl_context_maximum_version_by_options(ssl_context_options):
logger.debug("SSL calculate maximum version by context options: '0x%x'", ssl_context_options)
- ssl_context_maximum_version = ssl.TLSVersion.TLSv1_3 # default
+ ssl_context_maximum_version = ssl.TLSVersion.TLSv1_3 # default
if ((ssl_context_options & ssl.OP_NO_TLSv1_3) and (ssl_context_maximum_version == ssl.TLSVersion.TLSv1_3)):
ssl_context_maximum_version = ssl.TLSVersion.TLSv1_2
if ((ssl_context_options & ssl.OP_NO_TLSv1_2) and (ssl_context_maximum_version == ssl.TLSVersion.TLSv1_2)):